`
baishuo491
  • 浏览: 77254 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

spark源码分析--Master和worker建立连接

阅读更多
原创,转载请注明出处 http://baishuo491.iteye.com/blog/1990242。作者邮箱 vc_java@hotmail.com
Spark的master启动后,等待work通过spark://master'ip:7077的url去连接Master.  
在worker的回调函数preStart(Worker.scala)里面,调用了函数connectToMaster,这个函数完成了向Master节点注册work的工作。执行的方法是向master发送一个RegisterWorker消息
master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)

再来看Master.scala
在这个类的recieve函数里,我们可以看到当Master收到RegisterWorker消息后如何处理
case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
      ........
      if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {//如果idToWorker里面没有,成功注册
        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
        ........
        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
        schedule()
      }
    }

如果idToWorker这个hashmap里面,已经存在了相同的id,给发出请求的worker,发送RegisterWorkerFailed消息。如果不存在相同的id,执行addWorker操作后,向发出消息的worker,发送RegisteredWorker消息。之后调用schedule函数,进行job的重新分配
再回到Worker.scala,看worker收到RegisteredWorker消息后的动作
      case RegisteredWorker(url) =>
      .......
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
        master ! Heartbeat(workerId)
      }

很简单,就以HEARTBEAT_MILLIS (默认是15秒(15000毫秒) --System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4)为时间间隔,定期向master发送心跳,
而master 每隔WORKER_TIMEOUT(默认60秒(60000毫秒) val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000),检查一次超时。发送CheckForWorkerTimeOut消息给自己(也就是master),收到这个消息后,调用timeOutDeadWorkers清理超过WORKER_TIMEOUT时间间隔,仍未收到心跳的worker
  • 大小: 55.9 KB
分享到:
评论
1 楼 lzh8189146 2014-04-02  
楼主给力,在读源码之前先看一下楼主的帖子,事半功百倍!!!!!
        

相关推荐

Global site tag (gtag.js) - Google Analytics