## 使用 Marathon 管理 Spark 2.0.2 实现运行 文/李雪岩,徐磊,吕晓旭 ### 背景 去年10月,我们(去哪网)在 Mesos 资源管理框架上实现了 Spark 1.5.2 版本的运行。Spark 版本更新后又对其进行了小升级,沿用之前修改过的代码重新编译,替换一下包,把历史任务全部发一遍就能很好地升级到现在的1.6.1集群版本,1.6.2改动不大也就没有继续升级。到现在正好一年的时间,线上已注册了44个 Spark 任务,其中28个 Streaming 任务。在运行这些任务的过程中我们遇到了很多问题,其中最大的一点是动态扩容问题,即当业务线增加了更复杂的代码逻辑时或者业务增长导致处理量上升时,Spark 会面临计算资源不足的情况,这时如果没有做流量控制那 Spark 任务会因内存承受不了而失败,如果做了流量控制则 Kafka 的 Lag 会有堆积,这时就需要增加 executor 来处理,但是数量的多少不好判断,因此要反复修改并重新发布来找到合理的配置。 我们在 Marathon 上使用 Logstash 时也有过类似问题。接入的日志较大时,流量会急剧增加从而导致 Logstash 无法应对,Kafka 的 Lag 产生堆积,这时只需点击 Marathon 界面的 Scale 然后填入更大的实例数字就能启动 Logstash 实例自动处理了。发现慢结点时,只需把 Marathon 对应的任务 Kill 掉就会自动补发替代任务。那么 Spark 可以实现 Logstash 的这些功能吗?我们决定在 Spark 2.0 版本中进行尝试,同时改进其它一些问题,另外 Spark2.0 是一次较大的版本升级,配置与之前的1.6.1不同,不能通过所有任务重发一遍来做到全部升级。 图1  使用Logstash的管理架构 图1 使用 Logstash 的管理架构 ### Mesos-dispacher 架构与问题 首先介绍一些 Mesos 相关的概念。Mesos 的 Framework 是资源分配与调度的发起者;Spark 自带一个 spark-mesos-dispacher 的 Framework,用来管理 Spark 的资源调度;而 Marathon 也是一个 Framework,本质上和 mesos-dispacher、spark schedular 相同。 图2中,首先得向 mesos 注册一个 mesos-dispacher 的 Framework,然后通过 spark-sumbit 脚本向 mesos-dispacher 发布任务,mesos-dispacher 接到任务后调度一个 Spark Driver,然后 driver 在 mesos 模式下,会再次向 mesos 注册这个任务的 Framework 也就是我们看到的 Spark UI,也可以理解为自身就是个调度器,然后这个 Framework 根据配置向 Mesos 申请资源来发出一些 Spark Executor。 图2  Mesos-dispacher架构 图2 Mesos-dispacher 架构 图3显示 mesos-dispacher 提供的一些功能: 1. 仅提供一个查看配置的界面,可以看到资源分配的信息,点进去后可以看到 SparkConf 的一些参数,但我们在业务线发布时就已经拿到了这些配置,这里只能用于确认 Driver 配置是否正确,而且在 Spark UI 上也能看到; 2. 自带一个 Driver 队列,能够按顺序依次发布,当资源不足时会在队列里等待; 3. 自带一个 Driver 的 HA 功能,但当提交的 Driver 代码有问题时会反复重发,比较难杀掉(最终能杀掉并且没有次数限制)。所以这个功能一般不开放。 图3  Mesos-dispacher功能截图 图3 Mesos-dispacher 功能截图 所以 mesos-dispacher 并不是一个完备的 Framework,在使用过程中存在以下问题: 1. 在发布 Spark 时需要向 mesos-dispacher 提供一个 SPARK _ EXECUTOR _ URI 配置,为 SPARK 运行环境提供地址。我们最初使用了 HTTP 方式,但在一次需要发60个 executor 的时候流量打满,原因是编译的 Spark 环境包约250MB,60台机器同时拉取环境就把流量打爆了。因此我们的解决方案是在每台机器上都部署 Spark 的环境,把 SPARK _ EXECUTOR _ URI 设成本地目录来解决这个问题。 2. 界面上的配置并不会真正同步到 driver 或 executor。由于 SPARK 的配置很灵活:启动 mesos-dispacher 时会读取 spark-defalut.conf 来加载配置;每次发布时又会从 spark-env.conf 里读取配置;发 driver 的时候,driver 会从 jar 包里的配置读取配置;用户自己也可以设置 sparkConf 的配置;executor 的 jar 包里同样也有配置。最终会发现有些配置生效了,有些配置的设置没有传递反而会造成配置混乱。 3. mesos-dispacher 基本功能缺失。mesos-dispacher 虽是专为 mesos 设计的,但它对 mesos 的基本功能(如 role 和 constrain)并不能很好支持,必须通过修改代码来实现,关于这个我提交了一个 PR 并且在 Spark2.0 已经没有这个问题了。mesos-dispacher 在运行时不能修改配置,必须重启。比如我们上了一些新机器,打了其它一些标签或者是多标签,如果想使其生效必须停止 mesos-dispacher 再启动才能生效,无法在运行时修改。mesos-dispacher 在非 HA 模式下默认工作,因此启动 mesos-dispacher 时需要加上 Mesosr 的 zk,这样当 mesos-dispacher 停止后,mesos-dispacher 上的任务也不会受到影响,并在重启时自动接管任务。 4. 没有动态扩容功能。我们希望做到的是让 Spark 在运行时增加或减少实例,但受于架构限制 mesos-dispacher 只能管理 driver,如果改 mesos-dispacher 的代码也只能实现动态扩 driver,意义不大。 5. 此外还有另一种方案,即帮助 Spark 改进 Framework 使其更强大,但后来发现有了 Marathon 这一优秀的 Framework 就可以了,重复造轮子的成本较大。同时也不希望对 Spark 代码有过多修改,不利于升级。 ### Marathon+Docker 统一架构 Mesos 有多种发布模式,我们主要考察了其中2种。 #### 独立集群模式 该模式下需要启动一个 master 作为发布入口,再对实例分别启动 slave,每个 slave 在启动时的资源就已经固定了。再增加资源时需启动新的 slave,然后停止之前的任务以及修改资源配置数重发,这种模式的好处是提供单独的界面,可以直接给业务线这个独立集群模式的界面使用,界面上可以根据自己固定的资源发多个任务,并且在 Spark UI 上可以直接看到日志。另外它还是预先占资源模式,在发布时不会有资源争抢导致资源不够的情况,但是缺点就在于做不到运行时的动态扩容。 #### 仿 mesos-dispacher 模式 该模式下,我们使用 Marathon 这个 Framework 来模仿 mesos-dispacher 所做的事,就是先发一个 driver 然后再发 executor 挂载到 driver 来执行任务。关于日志,我们仍使用之前调用 Mesos 接口的方式来获得日志。当需要增加资源的时候直接往结点继续挂 executor 就可以,当需要删除结点的时候直接停止 executor 即可。 ### 实施过程 #### 如何实现仿 mesos-dispacher 模式 我们要做的事实际上是把图2的架构变成图4的模式,其中 Step 1 和 Step 2 需要模仿,而 Step 0 则不需要,因为 Step 0 只是启动 Framework。通过观察 meos-dispacher 可以发现 Step 1 所做的实际上是调用 Spark Submit 向 Mesos 注册一个 Framework,然后再由 driver 来负责调度,我们利用 mesos 的 constraints 特性,设置一个不存在的不可调度策略,例如 colo:none,这样一来 driver 就无法管理资源,可以使用 Marathon 发布 Spark Executor 来挂到 driver 上,进而实现 Marathon 控制 Spark 的资源调度策略。由于 Mesos 把 Offer 推送给了 Framework,我们使用的这种方式也不会有性能问题。 图4  仿mesos-dispacher模式 图4 仿mesos-dispacher模式 那么图2中的 Step 2 如何实现?通过分析 Spark 源代码发现,Spark 2.0.2 在 Executor 挂到 drvier 上是通过图5的命令来做到的,所以通过 Marathon 发布 Spark Executor 的基本原理就是模仿图5的代码。 图5  主要代码 图5 主要代码 从图6可以看出 Marathon 发布的时候先发 Spark Driver,拿到 mesos 分配的 Spark Driver 的 IP 和 PORT 填入脚本,这个参数是 Driver 与 Executor 之间通信的通道,在发 Spark Executor 的时候需要提供,这个 Driver 的 IP 我们通过 Mesos 接口可以拿到,因为 Driver 会向 Mesos 注册一个 Framework,我们拿到 Framework 的信息就可以拿到 IP 和 PORT,同时我们还可以得到 FrameworkID,那这个 PORT 是在制作 Docker 镜像时随机分配的一个环境变量 PORT0,然后通过 spark.driver.port 指定,这样 Executor 这端就可以调用 Marathon 的 REST API 来拿到 driver 的 Port。 图6  Executor发布示意图 图6 Executor发布示意图 而参数 executor-id 是 Spark Driver 调度时按顺序分配的 ID,从0开始每次递增1,如何生成 executor-id 呢?这个由 Spark Executor 自己生成一个不超过 int 范围的、不重复的随机数即可,该 ID 不会影响其它行为。hostname 可以直接通过命令获取。cores 是通过用户提交的配置计算得出,这个 Core 需要填 spark.executor.cores,也就是每个 Spark Executor 的正常使用的 Core 与 spark.mesos.extra.cores 分配给每个 Spark Executor 之和。 最后一项 app-id,通过研究发现直接通过 Mesos 接口就可以获得 Framework ID。这样我们就完成了 Executor 的发布,通过上述命令来把 Spark Executor 挂到了 Driver 上,但在实际生产应用中还存在 Driver 和 Executor 的同步问题。 #### Spark Receiver 的平衡问题 这里介绍一下在 Kafka 使用高阶 API 时,影响 Spark 性能的 Receiver 平衡问题,使用低阶 API 不会有这个问题。如果使用 Spark 提供的 Kafka 高阶 API,会在代码里预先指定好 Receiver 的数量,然后再做一个 Union。在 Spark 代码中实际上是这样做的:先等待 Executor 连上 Driver,默认 30s 如果超过了则开始进行 Receiver 调度,而调度策略是 ReceiverPolicy 类里写死的,ReceiverPolicy 的调度策略可以概括为尽量保证均匀地分配给每个 Host 一定量的 Recevier。 举个例子如图7,当启动3个 Spark Executor、代码里指定启动1个 Executor 时,如果每个 Executor 启动在不同的 Host 下,Spark 在 Receiver 调度开始时会随机指定一个 Executor 启动 Receiver 并分配1个 Core 给这个 Task。但如果代码里指定为2个 Receiver 而2个 Executor 启动在了同1个 Host1 上,另一个启动在了 Host2 上,也就是 Receiver 的数量等于 Host Unique 数量,则会在 Host1 中保证其中的一个 Executor 启动1个 Receiver,Host2 中启动一个 Receiver。如果 Receiver 的数量大于 Host Unique 的数量如第三张图,则会在 Host1 或者 Host2 中随机开 Receiver,这就带来了一个问题。分析 Spark 源代码可知 Spark Driver 和 Spark Executor 间通过运行一个 DummyJob(也就是一个 MapReduce 任务)来保证同步,但这种做法只能保证一个 Spark Executor 挂在 Spark Driver 上,不能够保证所有的 Executor,比如只有一个 Spark Executor 挂在 Spark Driver 上时才开始 Receiver 调度。 图7  启动3个Spark Executor 示例 图7 启动3个 Spark Executor 示例 #### 如何保证 Driver 和 Executor 之间同步 Spark 官方文档中提供了2个参数去解决这个问题,分别是 spark.scheduler.maxRegisteredResourcesWaitingTime(用来设置一个等待 Executor 挂上的时间) 和 spark.scheduler.minRegisteredResourcesRatio(用来检查资源分配的比例),但在我们的方式下两个参数都不起作用了。因为 Spark 在实现过程中通过 DummyJob 的运行来保证挂载方式的同步,这也是为什么第一个任务一定是70个 Task 的原因,但这种方式在一个 Executor 挂上去后才开始调度 Recevier。因此我们对源代码进行了修改,主要是 ReceiverTracker 部分,通过一个自定义配置让 Executor 数量达到指定个数后才开始发布,这样在 Receiver 调度时才会保证均匀分配在各结点,从而实现最好的性能。另外对于业务线写的 jar 包,要求打成 assembly 包然后提交到我们的发布系统,发布系统会上传到 swift 上,在发布时我们会先在容器里把包下载下来,然后启动 Spark Driver,而当 Spark Executor 挂在 Spark Driver 上的时候,它们会自动从 Spark Driver 获取对应的 jar 包。 #### 如何保证容器的时间和编码的准确性让配置同步 之前在部署1.6.1的 mesos-dispacher 架构时,我们发现 Spark 打出的中文日志会产生乱码,即使做了各种实验、设置 JVM 参数、或是使用代码进行内部转换都解决不了乱码问题,在新架构的 Docker 环境中也不例外,不过最终还是解决了。我们发现通过设置 JAVA _ TOOL _ OPTIONS 这个环境变量,JAVA 虚拟机的参数才真正修改生效,于是我们在容器启动时配置了 file.encoding=UTF-8,乱码问题才得以解决。此外 Docker 镜像中系统时间也不准确,默认是 UTC 时间,而系统时间对代码的影响也很大,有可能写入到 HDFS 的文件是以时间戳生成的,我们一开始以只读的方式在 Docker 中挂载宿主机上的 /etc/localtime 来修正,但是发现时间还是不正确,因为 Spark 内部还会根据时区自动修正时间为 UTC,所以还需要给 JVM 加一个环境变量,设置 user.timezone=PRC,这样才可以保证时间是对的。另外使用这种架构时 spark.driver.extraJavaOptions 和 spark.executor.extraJavaOptions 这两个参数也不会生效,需要用户通过发布配置传过来,然后在容器中追加到 JAVA _ TOOL _ OPTIONS。值得注意的是 SPARK_EXECUTOR_MEMORY 也不会同步,需要手动进设置。 #### 如何保证 driver 和 executor 失败时同步 虽然解决了 marathon 发布 driver 和 executor 之间的连接问题,但是由于 mesos 接口慢,在实际测试中发30个 executor 就可以把 mesos 打挂,因此我们想了另一个办法来解决这个问题。我们首先修改了 Spark 代码,让 Spark Driver 在不依赖 mesos-dispacher 的情况下实现 driver 的 HA,HA 的实现原理大概就是每次在 Spark Driver 启动注册 Framework 时,把 Framework ID 存到 zk 里,然后在程序挂掉后保持 Framework 与 Mesos 的连接,在下次启动的时候重新注册这个 Framework,这样的话 Framework ID 可以基本保持不变,在发布 Spark Executor 的时候就可以固定住这个 Framework ID,在 Executor 挂掉时 marathon 拉起来也能保证重连,而 driver 如果挂掉的话它会重新注册,获得的 Framework ID 不变,又可以继续运行,这样做只需要在 Spark Driver 发布完成以后调用一次 Mesos 接口拿到 Framework ID,再分发给 Spark Executor 就可以了。另外 Spark Executor 拿 Spark Driver 的 ip 和 port 是通过调 Marathon 接口实现的,而 Marathon 接口速度很快,不会有这个问题。 #### 如何升级 Spark 版本 对业务线的任务来说升级 Spark 是一件麻烦的事,主要原因是需要改代码,不过从改代码的角度来说其实变化不大,Spark 版本和 Scala 版本改变下,再对部分 API 做一些调整。另外一个原因是之前没有使用过 Marathon+Docker 的模式,如果之前使用了,在以后的升级中我们只需要制作新镜像就可以,非常方便迁移,并且可以跑在任何集群上。现在为了过渡到这种模式,再结合之前发布的经验,我们使用的是旧的和新的各有一套配置,然后通过在 git 上打 tag 的方式,在旧配置里加入升级信息,然后发布逻辑改为优先读取是否要升级,如果需要升级则发在新集群上,如果不需要则保持不变,我们会先让业务线进行测试,同时保持旧的任务在线,测试通过之后再停止旧的任务,把改好的新版本发到新集群上,有问题时可以用原来的 tag 进行回滚,因为原来 tag 里的配置会先判断是否需要升级,而之前的配置没有需要升级的选项。 #### 如何监控 Spark 的运行状态 Spark 自身有一套 metric 监控,新版本也不例外。我们集群中唯一的变更就是把不靠谱的 udp 改成了 tcp,另外因为使用了 Docker 容器,我们还有另一套监控,用于分析 cgroup 里的数据,使用的是我们开源的 pyadvisor。我们可以通过监控来观察 CPU 和内存的使用情况,提出优化资源使用的建议。另外,对于业务线们,我们推荐使用 Spark 自带的 Accumulator,先在 Spark Driver 上做个聚合1分钟的指标,然后再往 watcher 上打他们的业务指标,这样既不会有之前不同 host 间聚合指标的问题,同时也给 watcher 减轻了压力。 ### 总结 以上就是我们所做的新 Spark 架构,综合看来有以下优点: 1. 无需环境配置与部署,走 Docker。对于以后升级也较为方便,可以复用之前 Dockerfile; 2. 以直接启动的方式,配置绝对生效,不会出现复杂配置的问题; 3. 自动平衡 executor。没有 Receiver 不平衡的问题,在某些场景下可以动态增减 executor,不会有失败过多而不再拉 executor 的现象,也不会有多发或少发 executor 的现象; 4. 由于使用 Marathon 的原因,可以支持多标签复杂调度,例如业务线有时需要指定的机运行 Spark 开白名单,同时也为以后的迁移提供了更多便利。