Spark Notes 12: Executor, where a task finally ends up
Published: 2019-06-14


A Spark Executor is the container in which tasks are executed. The concept is similar to a Java executor.
===================start executor runs task============================
->CoarseGrainedExecutorBackend::receiveWithLogging --receives the messages sent by CoarseGrainedSchedulerBackend
->case LaunchTask(data) =>  --handles the message that launches a task
->val taskDesc = ser.deserialize[TaskDescription](data.value) --deserializes the received TaskDescription
->executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask) --calls the executor's launchTask method
  
->Executor::launchTask(    --the method the Executor uses to run a task
->val tr = new TaskRunner(context, taskId, taskName, serializedTask)  --creates a new TaskRunner so the task can be run on its own thread
->override def run() {
->val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)  --splits the serialized task into its files, JARs, and task bytes
->updateDependencies(taskFiles, taskJars)  //Download any missing dependencies if we receive a new set of files and JARs from the SparkContext. Also adds any new JARs we fetched to the class loader.
->for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp)   --fetches the dependency files
->Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
->for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp)  --fetches the dependency JARs
->Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
->val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
->urlClassLoader.addURL(url)
->task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)  --deserializes the task
->val value = task.run(taskId.toInt)  --calls the task's run function directly
->val valueBytes = resultSer.serialize(value) --serializes the task result
->val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)  --builds the task result to send back
->val serializedDirectResult = ser.serialize(directResult)   --serializes the result to send back
->execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)  --sends the result back (serializedResult is derived from serializedDirectResult; that step is omitted in this trace)
->driver ! StatusUpdate(executorId, taskId, state, data) --the status message goes to the driver, where CoarseGrainedSchedulerBackend handles it
->env.shuffleMemoryManager.releaseMemoryForThisThread() // Release memory used by this thread for shuffles
->env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()// Release memory used by this thread for unrolling blocks
->runningTasks.remove(taskId)
->runningTasks.put(taskId, tr)
->threadPool.execute(tr)
  ===========================end======================
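The dependency check in the trace above (the two `for` loops inside updateDependencies) comes down to a timestamp comparison: a file or JAR is fetched only if the executor has never seen it or holds an older version. The following is a minimal sketch of that idea, not Spark's actual code. `DependencySketch` and the `fetched` buffer are hypothetical names; `fetched` stands in for the real download done by `Utils.fetchFile`.

```scala
import scala.collection.mutable

object DependencySketch {
  // name -> timestamp of the version this executor has already fetched
  val currentFiles = mutable.HashMap[String, Long]()

  // Hypothetical stand-in for the download step: records what would be fetched
  val fetched = mutable.ArrayBuffer[String]()

  def updateDependencies(newFiles: collection.Map[String, Long]): Unit = synchronized {
    // Same guard as in the trace: unknown names default to -1L, so they always qualify
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      fetched += name
      currentFiles(name) = timestamp
    }
  }
}
```

Calling `updateDependencies` twice with the same timestamps fetches nothing the second time; only an entry with a newer timestamp is fetched again.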
/**
 * Spark executor used with Mesos, YARN, and the standalone scheduler.
 */
private[spark] class Executor(
    executorId: String,
    slaveHostname: String,
    properties: Seq[(String, String)],
    isLocal: Boolean = false)
  extends Logging {
Key field:
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
Task status is reported to the driver through heartbeats:
def startDriverHeartbeater() {
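As a rough illustration of the heartbeat idea, the sketch below periodically reports how many tasks are currently running. It is an assumption-laden stand-in rather than Spark's implementation: `HeartbeatSketch`, the `send` callback, and the use of a `ScheduledExecutorService` are all choices made for this example, and the real heartbeat carries more than a task count.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledExecutorService, TimeUnit}

object HeartbeatSketch {
  // Stand-in for the executor's runningTasks map (task id -> task name here)
  val runningTasks = new ConcurrentHashMap[Long, String]()

  // Calls `send` with the number of running tasks every `intervalMs` milliseconds.
  // `send` is a hypothetical hook for whatever delivers the heartbeat to the driver.
  def startDriverHeartbeater(intervalMs: Long)(send: Int => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val beat = new Runnable {
      override def run(): Unit = send(runningTasks.size())
    }
    scheduler.scheduleAtFixedRate(beat, 0L, intervalMs, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

The returned scheduler should be shut down (for example with `shutdownNow()`) when the executor stops, otherwise its thread keeps the JVM alive.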
At last we reach the familiar executor logic. This is what ultimately runs our task.
def launchTask(
    context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
class TaskRunner(     execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
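The pattern behind launchTask and TaskRunner is: wrap the work in a Runnable, register it in a concurrent map, hand it to a thread pool, and deregister it when it finishes. Here is a minimal self-contained sketch of that pattern. It assumes a simplified `TaskRunner` that takes a plain function instead of a serialized task, and `LaunchSketch`, `finished`, and `shutdownAndWait` are hypothetical names added for the example.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object LaunchSketch {
  private val threadPool = Executors.newCachedThreadPool()

  // Tasks that have been launched and have not yet finished
  val runningTasks = new ConcurrentHashMap[Long, TaskRunner]()

  // Hypothetical stand-in for the result that is reported back to the driver
  val finished = new ConcurrentHashMap[Long, String]()

  class TaskRunner(val taskId: Long, taskName: String, body: () => String) extends Runnable {
    override def run(): Unit = {
      try {
        finished.put(taskId, body())
      } finally {
        // Always deregister, even if the task body throws
        runningTasks.remove(taskId)
      }
    }
    override def toString: String = s"TaskRunner($taskId, $taskName)"
  }

  def launchTask(taskId: Long, taskName: String, body: () => String): Unit = {
    val tr = new TaskRunner(taskId, taskName, body)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  // Stops accepting tasks and waits for the submitted ones to complete
  def shutdownAndWait(): Boolean = {
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Registering the runner before `execute` means a task is visible in `runningTasks` for its whole lifetime, which is what makes the map usable for status reporting.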
CoarseGrainedExecutorBackend is the direct counterpart of CoarseGrainedSchedulerBackend on the driver side. The two communicate directly with each other to deliver tasks and to send results back.
private[spark] class CoarseGrainedExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    sparkProperties: Seq[(String, String)])
  extends Actor with ActorLogReceive with ExecutorBackend with Logging {
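The exchange between the two backends can be pictured as two message types: a LaunchTask going to the executor side and a StatusUpdate coming back. The sketch below models this with plain method calls instead of actors, so it leaves out Akka, serialization, and threading entirely. `BackendSketch`, `DriverSide`, and `ExecutorSide` are hypothetical names, the message fields are simplified to strings, and upper-casing the payload is just a placeholder for running the task.

```scala
import scala.collection.mutable

object BackendSketch {
  sealed trait Message
  case class LaunchTask(taskId: Long, payload: String) extends Message
  case class StatusUpdate(executorId: String, taskId: Long, state: String, data: String)
    extends Message

  // Plays the role of the driver-side backend: collects status updates
  class DriverSide {
    val updates = mutable.ArrayBuffer[StatusUpdate]()
    def receive(msg: Message): Unit = msg match {
      case s: StatusUpdate => updates += s
      case _ => ()
    }
  }

  // Plays the role of the executor-side backend: runs the task, reports back
  class ExecutorSide(executorId: String, driver: DriverSide) {
    def receive(msg: Message): Unit = msg match {
      case LaunchTask(taskId, payload) =>
        val result = payload.toUpperCase // placeholder for executing the task
        statusUpdate(taskId, "FINISHED", result)
      case _ => ()
    }

    // Mirrors `driver ! StatusUpdate(executorId, taskId, state, data)` from the trace
    def statusUpdate(taskId: Long, state: String, data: String): Unit =
      driver.receive(StatusUpdate(executorId, taskId, state, data))
  }
}
```

Sending `LaunchTask(7L, "abc")` to an `ExecutorSide` named "exec-1" leaves one `StatusUpdate("exec-1", 7L, "FINISHED", "ABC")` in the driver's `updates` buffer.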
Posted on 2015-01-25 01:55

Reposted from: https://www.cnblogs.com/zwCHAN/p/4247690.html
