Spark (47): Spark UI Data Visualization and Metrics — How the Web UI and Metrics Are Initialized and Updated
How the Spark UI (HTTP server) is started
Let's dig into the source code to see how the Spark UI (HTTP server) is started and where the data shown on its pages comes from.
1) The HTTP server behind the Spark UI is Jetty. Jetty, written in Java, is a solid servlet engine and HTTP server that can be embedded into a user program, rather than requiring a standalone JVM process the way Tomcat or JBoss does.
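To make "embeddable" concrete, here is a minimal sketch of a Jetty server started from inside a plain Scala program. This is illustrative only, not Spark code: the port, path, and response text are made up, and it assumes the jetty-server and jetty-servlet artifacts are on the classpath.

import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

object EmbeddedJettyExample {
  def main(args: Array[String]): Unit = {
    val server = new Server(8080)               // illustrative port
    val context = new ServletContextHandler()
    context.setContextPath("/")
    // Register a trivial servlet; Spark registers its page/static handlers the same way.
    context.addServlet(new ServletHolder(new HttpServlet {
      override protected def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
        resp.setContentType("text/plain")
        resp.getWriter.write("hello from embedded jetty")
      }
    }), "/hello")
    server.setHandler(context)
    server.start()                              // runs inside this JVM, no separate container
    server.join()
  }
}

Spark wraps this same kind of setup behind JettyUtils and WebUI, as shown below.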
The Spark UI (HTTP server) is created when SparkContext is initialized:
class SparkContext(config: SparkConf) extends Logging {
  ...
  private var _ui: Option[SparkUI] = None
  ...
  private[spark] def ui: Option[SparkUI] = _ui

  def uiWebUrl: Option[String] = _ui.map(_.webUrl)
  ...
  try {
    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }

    // Set Spark driver host and port system properties. This explicitly sets the configuration
    // instead of relying on the default value of the config constant.
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")

    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten

    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }

    _listenerBus = new LiveListenerBus(_conf)

    // Initialize the app status store and listener before SparkEnv is created so that it gets
    // all events.
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)

    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)

    // If running the REPL, register the repl's output dir with the file server.
    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
      val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
      _conf.set("spark.repl.class.uri", replUri)
    }

    _statusTracker = new SparkStatusTracker(this, _statusStore)

    _progressBar =
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())

    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

    // Add each JAR given through the constructor
    if (jars != null) {
      jars.foreach(addJar)
    }

    if (files != null) {
      files.foreach(addFile)
    }

    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)

    // Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    // The Mesos scheduler backend relies on this environment variable to set executor memory.
    // TODO: Set this only in the Mesos scheduler.
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    executorEnvs ++= _conf.getExecutorEnv
    executorEnvs("SPARK_USER") = sparkUser

    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

    setupAndStartListenerBus()
    postEnvironmentUpdate()
    postApplicationStart()

    // Post init
    _taskScheduler.postStartHook()
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

    // Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    logDebug("Adding shutdown hook") // force eager creation of logger
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop()
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }
  } catch {
    case NonFatal(e) =>
      logError("Error initializing SparkContext.", e)
      try {
        stop()
      } catch {
        case NonFatal(inner) =>
          logError("Error stopping SparkContext after init error.", inner)
      } finally {
        throw e
      }
  }
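One line above worth highlighting is _env.metricsSystem.getServletHandlers.foreach(...): after the metrics system starts, its servlet handlers are attached to the web UI, so the driver's metrics registry is served through the UI port. A usage sketch follows; the host and port are illustrative, and /metrics/json is the MetricsServlet's default path.

import scala.io.Source

object MetricsProbe {
  def main(args: Array[String]): Unit = {
    // Fetch the driver metrics exposed through the web UI as JSON.
    // Assumes an application whose UI is bound to the default port 4040 on localhost.
    val json = Source.fromURL("http://localhost:4040/metrics/json").mkString
    println(json)
  }
}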
...
2) Calling SparkUI.create instantiates a SparkUI object, and the constructor body of SparkUI invokes its initialize() method.
2.1) initialize() loads and attaches all of the familiar Spark UI pages (tabs): JobsTab, StagesTab, StorageTab, EnvironmentTab, and ExecutorsTab;
/**
 * Top level user interface for a Spark application.
 */
private[spark] class SparkUI private (
    val store: AppStatusStore,
    val sc: Option[SparkContext],
    val conf: SparkConf,
    securityManager: SecurityManager,
    var appName: String,
    val basePath: String,
    val startTime: Long,
    val appSparkVersion: String)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
    conf, basePath, "SparkUI")
  with Logging
  with UIRoot {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  var appId: String = _

  private var streamingJobProgressListener: Option[SparkListener] = None

  /** Initialize all components of the server. */
  def initialize(): Unit = {
    val jobsTab = new JobsTab(this, store)
    attachTab(jobsTab)
    val stagesTab = new StagesTab(this, store)
    attachTab(stagesTab)
    attachTab(new StorageTab(this, store))
    attachTab(new EnvironmentTab(this, store))
    attachTab(new ExecutorsTab(this))
    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))

    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }

  initialize()

  ...
}
Besides the page-rendering classes, each of JobsTab, StagesTab, StorageTab, EnvironmentTab, and ExecutorsTab above also relies on static resources: HTML, JS, CSS, and other assets such as images (https://github.com/apache/spark/tree/branch-2.4/core/src/main/resources/org/apache/spark/ui/static).
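The addStaticHandler(SparkUI.STATIC_RESOURCE_DIR) call in initialize() is what mounts those files at /static. As a rough sketch of what JettyUtils.createStaticHandler does (reconstructed under assumptions rather than copied from Spark), it wraps a Jetty DefaultServlet whose resource base is resolved from the classpath:

import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}

object StaticHandlerSketch {
  // Mount a DefaultServlet whose resource base is a directory found on the
  // classpath, e.g. "org/apache/spark/ui/static", at a UI path such as "/static".
  def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
    val contextHandler = new ServletContextHandler()
    val holder = new ServletHolder(classOf[DefaultServlet])
    val res = Option(Thread.currentThread().getContextClassLoader.getResource(resourceBase))
      .getOrElse(throw new IllegalArgumentException(s"Could not find resource path: $resourceBase"))
    holder.setInitParameter("resourceBase", res.toString)
    contextHandler.setContextPath(path)
    contextHandler.addServlet(holder, "/")   // DefaultServlet serves the html/js/css/images below it
    contextHandler
  }
}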
2.2) As shown in the SparkContext initialization above, once the SparkUI object has been created, bind() is called on it. This bind() method is defined in WebUI, the parent class of SparkUI.
/**
 * The top level component of the UI hierarchy that contains the server.
 *
 * Each WebUI represents a collection of tabs, each of which in turn represents a collection of
 * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
 */
private[spark] abstract class WebUI(
    val securityManager: SecurityManager,
    val sslOptions: SSLOptions,
    port: Int,
    conf: SparkConf,
    basePath: String = "",
    name: String = "")
  extends Logging {

  protected val tabs = ArrayBuffer[WebUITab]()
  protected val handlers = ArrayBuffer[ServletContextHandler]()
  protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
  protected var serverInfo: Option[ServerInfo] = None
  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
    conf.get(DRIVER_HOST_ADDRESS))
  private val className = Utils.getFormattedClassName(this)

  def getBasePath: String = basePath
  def getTabs: Seq[WebUITab] = tabs
  def getHandlers: Seq[ServletContextHandler] = handlers
  def getSecurityManager: SecurityManager = securityManager

  /** Attaches a tab to this UI, along with all of its attached pages. */
  def attachTab(tab: WebUITab): Unit = {
    tab.pages.foreach(attachPage)
    tabs += tab
  }

  /** Detaches a tab from this UI, along with all of its attached pages. */
  def detachTab(tab: WebUITab): Unit = {
    tab.pages.foreach(detachPage)
    tabs -= tab
  }

  /** Detaches a page from this UI, along with all of its attached handlers. */
  def detachPage(page: WebUIPage): Unit = {
    pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
  }

  /** Attaches a page to this UI. */
  def attachPage(page: WebUIPage): Unit = {
    val pagePath = "/" + page.prefix
    val renderHandler = createServletHandler(pagePath,
      (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
    val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
      (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
    attachHandler(renderHandler)
    attachHandler(renderJsonHandler)
    val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
    handlers += renderHandler
  }

  /** Attaches a handler to this UI. */
  def attachHandler(handler: ServletContextHandler): Unit = {
    handlers += handler
    serverInfo.foreach(_.addHandler(handler))
  }

  /** Detaches a handler from this UI. */
  def detachHandler(handler: ServletContextHandler): Unit = {
    handlers -= handler
    serverInfo.foreach(_.removeHandler(handler))
  }

  /**
   * Detaches the content handler at `path` URI.
   *
   * @param path Path in UI to unmount.
   */
  def detachHandler(path: String): Unit = {
    handlers.find(_.getContextPath() == path).foreach(detachHandler)
  }

  /**
   * Adds a handler for static content.
   *
   * @param resourceBase Root of where to find resources to serve.
   * @param path Path in UI where to mount the resources.
   */
  def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = {
    attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
  }

  /** A hook to initialize components of the UI */
  def initialize(): Unit

  /** Binds to the HTTP server behind this web interface. */
  def bind(): Unit = {
    assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
    try {
      val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
      serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
      logInfo(s"Bound $className to $host, and started at $webUrl")
    } catch {
      case e: Exception =>
        logError(s"Failed to bind $className", e)
        System.exit(1)
    }
  }

  /** @return The url of web interface. Only valid after [[bind]]. */
  def webUrl: String = s"http://$publicHostName:$boundPort"

  /** @return The actual port to which this server is bound. Only valid after [[bind]]. */
  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)

  /** Stops the server behind this web interface. Only valid after [[bind]]. */
  def stop(): Unit = {
    assert(serverInfo.isDefined,
      s"Attempted to stop $className before binding to a server!")
    serverInfo.foreach(_.stop())
  }
}
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
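A detail worth noting in attachPage() above: every page gets two handlers, an HTML one at /<prefix> and a JSON twin at /<prefix>/json. As a hypothetical illustration (HelloPage does not exist in Spark), a minimal page that would plug into this machinery could look like:

package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest
import scala.xml.Node

// Hypothetical page, for illustration only: attaching it via attachPage()
// would serve HTML at /hello and a JSON twin at /hello/json (WebUIPage's
// renderJson defaults to JNothing unless overridden).
private[spark] class HelloPage extends WebUIPage("hello") {
  override def render(request: HttpServletRequest): Seq[Node] = {
    <h3>Hello from a custom WebUIPage</h3>
  }
}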
A few things to note about this bind() method:
a) It calls startJettyServer(...), a method defined in JettyUtils.scala, which again shows that the Spark UI runs on top of Jetty.
b) The call to startJettyServer(...) passes host and port parameters; these are the IP and port at which the Spark UI is reached, so let's see where exactly these two parameters are configured.
√ The code that obtains host:
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
spark-env.sh covers the relevant parameters (SPARK_LOCAL_IP, SPARK_PUBLIC_DNS):
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
https://github.com/apache/spark/blob/branch-2.4/conf/spark-env.sh.template
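So SPARK_LOCAL_IP controls the address Jetty binds to (the host read in bind() above), while SPARK_PUBLIC_DNS only affects the hostname advertised in webUrl (see publicHostName in WebUI). An illustrative spark-env.sh fragment, with made-up addresses:

export SPARK_LOCAL_IP=192.168.1.10          # bind address used by bind()
export SPARK_PUBLIC_DNS=spark.example.com   # hostname shown in webUrl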
√ The port is obtained in the SparkUI companion object:
private[spark] object SparkUI {
  val DEFAULT_PORT = 4040
  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
  val DEFAULT_POOL_NAME = "default"

  def getUIPort(conf: SparkConf): Int = {
    conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
  }

  /**
   * Create a new UI backed by an AppStatusStore.
   */
  def create(
      sc: Option[SparkContext],
      store: AppStatusStore,
      conf: SparkConf,
      securityManager: SecurityManager,
      appName: String,
      basePath: String,
      startTime: Long,
      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {

    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime,
      appSparkVersion)
  }
}
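Putting it together: getUIPort() reads spark.ui.port and falls back to the default 4040. A usage sketch (the port 4050 and app name are illustrative) that pins the port before creating the SparkContext, then reads back the bound address via the uiWebUrl accessor shown at the start of this article:

import org.apache.spark.{SparkConf, SparkContext}

object UiPortDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ui-port-demo")   // illustrative app name
      .setMaster("local[2]")
      .set("spark.ui.port", "4050") // override the 4040 default
    val sc = new SparkContext(conf)
    println(sc.uiWebUrl)            // e.g. Some(http://<driver-host>:4050)
    sc.stop()
  }
}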
How the Spark UI obtains its data
References:
《Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析》
《Spark Structured Streaming源码分析--(四)ProgressReporter每个流处理进度计算、StreamQueryManager管理运行的流》
