## Heron:来自 Twitter 的新一代流处理引擎 (应用篇) 文/吴惠君,吕能,符茂松 >本文对比了 Heron 和常见的流处理项目,包括 Storm、Flink、Spark Streaming 和 Kafka Streams,归纳了系统选型的要点。此外实践了 Heron 的一个案例,以及讨论了 Heron 在这一年开发的新特性。 在今年6月期的“基础篇”中,我们通过学习 Heron 的基本概念、整体架构和核心组件等内容,对 Heron 的设计、运行等方面有了基本的了解。在这一期的“应用篇”中,我们将 Heron 与其他流行的实时流处理系统(Apache Storm 、Apache Flink、Apache Spark Streaming 和Apache Kafka Streams)进行比较。在此基础上,我们再介绍如何在实际应用中进行系统选型。然后我们将分享一个简单的案例应用。最后我们会介绍在即将完结的2017年里 Heron 有哪些新的进展。 ### 实时流处理系统比较与选型 当前流行的实时流处理系统主要包括 Apache 基金会旗下的 Apache Storm、Apache Flink、Apache Spark Streaming 和 Apache Kafka Streams 等项目。虽然它们和 Heron 同属于实时流处理范畴,但是它们也有各自的特点。 #### Heron 对比 Storm(包括 Trident) 在 Twitter 内部,Heron 替换了 Storm,是流处理的标准。 ##### 数据模型的区别 Heron 兼容 Storm 的数据模型,或者说 Heron 兼容 Storm 的 API,但是背后的实现完全不同。所以它们的应用场景是一样的,能用 Storm 的地方也能用 Heron。但是 Heron 比 Storm 提供更好的效率,更多的功能,更稳定,更易于维护。 Storm Trident 是 Storm 基础上的项目,提供高级别的 API,如同 Heron 的函数式 API。Trident 以 checkpoint 加 rollback 的方式实现了 exactly once;Heron 以 Chandy 和 Lamport 发明的分布式快照算法实现了 effectively once。 ##### 应用程序架构的区别 Storm 的 worker 在每个 JVM 进程中运行多个线程,每个线程中执行多个任务。这些任务的 log 混在一起,很难调试不同任务的性能。Storm 的 nimbus 无法对 worker 进行资源隔离,所以多个 topology 的资源之间互相影响。另外 ZooKeeper 被用来管理 heartbeat,这使得 ZooKeeper 很容易变成瓶颈。 Heron 的每个任务都是单独的 JVM 进程,方便调试和资源隔离管理,同时节省了整个 topology 的资源。ZooKeeper 在 Heron 中只存放很少量的数据,heartbeat 由 tmaster 进程管理,对 ZooKeeper 没有压力。 #### Heron 对比 Flink Flink 框架包含批处理和流处理两方面的功能。Flink 的核心采用流处理的模式,它的批处理模式通过模拟块数据的的流处理形式得到。 ##### 数据模型的区别 Flink 在 API 方面采用 declarative 的 API 模式。Heron 既提供 declarative 模式 API 或者叫做 functional API 也提供底层 compositional 模式的 API,此外 Heron 还提供 Python 和 C++ 的 API。 ##### 应用程序架构的区别 在运行方面,Flink 可以有多种配置,一般情况采用的是多任务多线程在同一个 JVM 中的混杂模式,不利于调试。Heron 采用的是单任务单 JVM 的模式,利于调试与资源分配。 在资源池方面,Flink 和 Heron 都可以与多种资源池合作,包括 Mesos/Aurora、YARN、Kubernetes 等。 #### Heron 对比 Spark Streaming Spark Streaming 处理 tuple 的粒度是 micro-batch,通常使用半秒到几秒的时间窗口,将这个窗口内的 tuple 作为一个 micro-batch 提交给 Spark 处理。而 Heron 使用的处理粒度是 tuple。由于时间窗口的限制,Spark Streaming 的平均响应周期可以认为是半个时间窗口的长度,而 Heron 就没有这个限制。所以 Heron 是低延迟,而 Spark Streaming 是高延迟。 Spark Streaming 近期公布了一项提案,计划在下一个版本2.3中加入一个新的模式,新的模式不使用 micro-batch 来进行计算。 ##### 数据模型的区别 语义层面上,Spark Streaming 和 Heron 都实现了 exactly once/effectively once。状态层面上,Spark Streaming 和 Heron 都实现了 stateful processing。API 接口方面,Spark Streaming 支持 SQL,Heron 暂不支持。Spark Streaming 和 Heron 都支持 Java、Python 接口。需要指出的是,Heron 的 API 是 pluggable 模式的,除了 Java 和 Python 以外,Heron 可以支持许多编程语言,比如 C++。 ##### 应用程序架构的区别 任务分配方面,Spark Streaming 对每个任务使用单个线程。一个 JVM 进程中可能有多个任务的线程在同时运行。Heron 对每个任务都是一个单独的 heron-instance 进程,这样的设计是为了方便调试,因为当一个 task 失败的时候,只用把这个任务进程拿出来检查就好了,避免了进程中各个任务线程相互影响。 资源池方面,Spark Streaming 和 Heron 都可以运行在 YARN 和 Mesos 上。需要指出的是 Heron 的资源池设计是 pluggable interface 的模式,可以连接许多资源管理器,比如 Aurora 等。读者可以查看参考资料[11]了解 Heron 支持的资源池。 #### Heron 对比 Kafka Streams Kafka Streams 是一个客户端的程序库。通过这个调用库,应用程序可以读取 Kafka 中的消息流进行处理。 ##### 数据模型的区别 Kafka Streams 与 Kafka 绑定,需要订阅 topic 来获取消息流,这与 Heron 的 DAG 模型完全不同。对于 DAG 模式的流计算,DAG 的结点都是由流计算框架控制,用户计算逻辑需要按照 DAG 的模式提交给这些框架。Kafka Streams 没有这些预设,用户的计算逻辑完全用户控制,不必按照 DAG 的模式。此外,Kafka Streams 也支持反压(back pressure)和 stateful processing。 Kafka Streams 定义了2种抽象:KStream 和 KTable。在 KStream 中,每一对 key-value 是独立的。在 KTable 中,key-value 以序列的形式解析。 ##### 应用程序架构的区别 Kafka Streams 是完全基于 Kafka 来建设的,与 Heron 等流处理系统差别很大。Kafka Streams 的计算逻辑完全由用户程序控制,也就是说流计算的逻辑并不在 Kafka 集群中运行。Kafka Streams 可以理解为一个连接器,从 Kafka 集群中读取和写入键值序列,计算所需资源和任务生命周期等等都要用户程序管理。而 Heron 可以理解为一个平台,用户提交 topology 以后,剩下的由 Heron 完成。 #### 选型 归纳以上对各个系统的比较,我们可以得到如上的表基于以上表格的比较,我们可以得到如下的选型要点: 表1 各系统比较 - Storm 适用于需要快速响应、中等流量的场景。Storm 和 Heron 在 API 上兼容,在功能上基本可以互换;Twitter 从 Storm 迁移到了 Heron,说明如果 Storm 和 Heron 二选一的话,一般都是选 Heron。 - Kafka Streams 与 Kafka 绑定,如果现有系统是基于 Kafka 构建的,可以考虑使用 Kafka Streams,减少各种开销。 - 一般认为 Spark Streaming 的流量是这些项目中最高的,但是它的响应延迟也是最高的。对于响应速度要求不高、但是对流通量要求高的系统,可以采用 Spark Streaming;如果把这种情况推广到极致就可以直接使用 Spark 系统。 - Flink 使用了流处理的内核,同时提供了流处理和批处理的接口。如果项目中需要同时兼顾流处理和批处理的情况,Flink 比较适合。同时因为需要兼顾两边的取舍,在单个方面就不容易进行针对性的优化和处理。 总结上面,Spark Streaming、Kafka Streams、Flink 都有特定的应用场景,其他一般流处理情况下可以使用 Heron。 ### Heron 案例学习 让我们在 Ubuntu 单机上来实践运行一个示例 topology,这包括如下几个步骤: - 安装 Heron 客户端,
启动一个 Heron 示例 topology,
其他 topology 操作命令。
- 安装 Heron 工具包,
运行 Heron Tracker,
运行 Heron UI。
#### 运行 topology 首先找到 Heron 的发布网页Heron找到最新的版本0.16.5。可以看到 Heron 提供了多个版本的安装文件,这些安装文件又分为几个类别:客户端 client、工具包 tools 和开发包 API 等。 ##### 安装客户端 下载客户端安装文件 heron-client-install-0.16.5-ubuntu.sh: ``` wget https://github.com/twitter/heron/releases/download/0.16.5/heron-client-install-0.16.5-darwin.sh ``` 然后执行这个文件: ``` chmod +x heron-*.sh ./heron-client-install-0.16.5--PLATFORM.sh --user ``` 其中--user 参数让 heron 客户端安装到当前用户目录 ~/.hedon,同时在 ~/bin 下创建一个链接指向 ~/.heorn/bin 下的可执行文件。 Heron 客户端是一个名字叫 heron 的命令行程序。可以通过 export PATH=~/bin:$PATH 让 heron 命令能被直接访问。运行如下命令来检测 heron 命令是否安装成功: ``` heron version ``` ##### 运行示例 topology 首先添加 localhost 到 /etc/hosts,Heron 在单机模式时会用 /etc/hosts 来解析本地域名。 Heron 客户端安装时已经包含了一个示例 topology 的 jar 包,在 ~/.heron/example 目录下。我们可以运行其中一个示例 topology 作为例子: ``` heron submit local ~/.heron/examples/heron-examples.jar \ com.twitter.heron.examples.ExclamationTopology ExclamationTopology \ --deploy-deactivated ``` heron submit 命令提交一个 topology 给 heron 运行。关于 heron submit 的命令的格式,可以用过 heron help submit 来查看。 当 Heron 运行在单机本地模式时,它会将运行状态和日志等信息存放在 ~/.herondata 目录下。我们可以可以查看刚才运行的示例 topology 目录,具体位置是: ``` ls -al ~/.herondata/topologies/local/${USER_NAME}/ExclamationTopology ``` ##### Topology 生命周期 一个 topology 的生命周期包括如下几个阶段: - submit:提交 topology 给 heron-scheduler。这时 topology 还没有处理 tuples,但是它已经准备好,等待被 activate; - activate/deactivate:让 topology 开始/停止处理 tuples; - restart:重启一个 topology,让资源管理器重新分配容器; - kill:撤销 topology, 释放资源。 这些阶段都是通过 heron 命令行客户端来管理的。具体的命令格式可以通过 heron help 查看。 #### Heron 工具包 Heron 项目提供了一些工具,可以方便查看数据中心中运行的 topology 状态。在单机本地模式下,我们也可以来试试这些工具。这些工具主要包括: - Tracker:一个服务器提供 restful API,监视每个 topology 的运行时状态; - UI:一个网站,调用 Tracker restful API 展示成网页。 一个数据中心内可以部署一套工具包来涵盖整个数据中心的所有 topology。 ##### 安装工具包 用安装 Heron 客户端类似的方法,找到安装文件,然后安装它: ``` wget https://github.com/twitter/heron/releases/download/0.16.5/heron-tools-install-0.16.5-darwin.sh chmod +x heron-*.sh ./heron-tools-install-0.16.5-PLATFORM.sh --user ``` ##### Tracker 工具 启动 Tracker 服务器:heron-tracker 验证服务器 restful api:在浏览器中打开 http://localhost:8888 图1  启动Tracker服务器 图1 启动 Tracker 服务器 ##### UI 工具 启动 UI 网站:heron-ui 验证 UI 网站:在浏览器中打开http://localhost:8889 图2  启动UI网站 图2 启动 UI 网站 ### Heron 新特性 自从2016年夏 Twitter 开源 Heron 以来,Heron 社区开发了许多新的功能,特别是2017年 Heron 增加了“在线动态扩容/缩容”、“effectively once 传输语义”、“函数式 API”、“多种编程语言支持”、“自我调节(self-regulating)”等。 #### 在线动态扩容/缩容 根据 Storm 的数据模型,topology 的并行度是 topology 的作者在编程 topology 的时候指定的。很多情况下,topology 需要应付的数据流量在不停的变化。topology 的编程者很难预估适合的资源配置,所以动态的调整 topology 的资源配置就是运行时的必要功能需求。 直观地,改变 topology 中结点的并行度就能快速改变 topology 的资源使用量来应付数据流量的变换。Heron 通过 update 命令来实现这种动态调整。Heron 命令行工具使用 packing 算法按照用户指定的新的并行度计算 topology 的新的 packing plan,然后通过资源池调度器增加或者减少容器数量,并再将这个 packing plan 发送给 tmaster 合并成新的 physical plan,使得整个 topology 所有容器状态一致。Heron 实现的并行度动态调整对运行时的 topology 影响小,调整快速。 #### Effectively once 传输语义 Heron 在原有 tuple 传输模式 at most once和at least once 以外,新加入了 effectively once。原有的 at most once和at least once 都有些不足之处,比如 at most once 会漏掉某些 tuple;而 at least once 会重复某些 tuple。所以 effectively once 的目标是,当计算是确定性(deterministic)的时候,结果精确可信。 Effectively once 的实现可以概括为两点: - 分布式状态 checkpoint; - topology 状态回滚。 tmaster 定期向 spout 发送 marker tuple。当 topology 中的一个结点收集齐上游的 marker tuple时,会将当时自己的状态写入一个 state storage,这个过程就是 checkpoint。当整个 topology 的所有结点都完成 checkpoint 的时候,state storage 就存储了一份整个 topology 快照。如果 topology 遇到异常,可以从 state storage读取快照进行恢复并重新开始处理数据。 #### 函数式 API (Functional API) 函数式编程是近年来的热点,Heron 适应时代潮流在原有 API 的基础上添加了函数式 API。Heron 的函数式 API 让 topology 编程者更专注于 topology 的应用逻辑,而不必关心 topology/spout/bolt 的具体细节。Heron 的函数式 API 相比于原有的底层 API 是一种更高层级上的 API,它背后的实现仍然是转化为底层 API 来构建 topology。 Heron 函数式 API 建立在 streamlet 的概念上。一个 streamlet 是一个无限的、顺序的 tuple 序列。Heron 函数式 API 的数据模型中,数据处理就是指从一个 streamlet 转变为另一个 streamlet。转变的操作包括:map、flatmap、join、filter 和 window 等常见的函数式操作。 #### 多种编程语言支持 以往 topology 编写者通常使用兼容 Storm 的 Java API 来编写 topology,现在 Heron 提供 Python 和 C++ 的 API,让熟悉 Python 和 C++ 的程序员也可以编写 topology。Python 和 C++ 的 API 设计与 Java API 类似,它们包含底层 API 用来构造 DAG,将来也会提供函数式 API 让 topology 开发者更专注业务逻辑。 在实现上,Python 和 C++ 的 API 都有 Python 和 C++ 的 heron-instance 实现。它们不与 heron-instance 的 Java 实现重叠,所以减少了语言间转化的开销,提高了效率。 #### 自我调节(self-regulating) Heron 结合 Dhalion 框架开发了新的 health manager 模块。Dhalion 框架是一个读取 metric 然后对 topology 进行相应调整或者修复的框架。Health manager 由2个部分组成:detector/diagnoser 和 resolver。Detector/diagnoser 读取 metric 探测 topology 状态并发现异常,resolver 根据发现的异常执行相应的措施让 topology 恢复正常。Health manager 模块的引入,让 Heron 形成了完整的反馈闭环。 现在常用的两个场景是:1. detector 监测 back pressure 和 stmgr 中队列的长度,发现是否有些容器是非常慢的;然后 resolver 告知 heron-scheduler 来重新调度这个结点到其他 host 上去;2. detector 监测所有结点的状态来计算 topology 在全局层面上是不是资源紧张,如果发现 topology 资源使用量很大,resolver 计算需要添加的资源并告知scheduler来进行调度。 ### 结束语 在本文中,我们对比了 Heron 和常见的流处理项目,包括 Storm、Flink、Spark Streaming 和 Kafka Streams,归纳了系统选型的要点,此外我们实践了 Heron 的一个案例,最后我们讨论了Heron在这一年开发的新特性。 最后,作者希望这篇文章能为大家提供一些 Heron 应用的相关经验,也欢迎大家向我们提供建议和帮助。如果大家对 Heron 的开发和改进感兴趣,可以查看 Heron 官网(http://heronstreaming.io/)和 代码(https://github.com/twitter/heron)。 *** **参考文献** [1] Maosong Fu, Ashvin Agrawal, Avrilia Floratou, Bill Graham, Andrew Jorgensen, Mark Li, Neng Lu, Karthik Ramasamy, Sriram Rao, and Cong Wang. "Twitter Heron: Towards Extensible Streaming Engines." In 2017 IEEE 33rd International Conference on Data Engineering (ICDE), pp. 1165-1172. IEEE, 2017. [2] Sanjeev Kulkarni, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, and Siddarth Taneja. "Twitter heron: Stream processing at scale." In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, pp. 239-250. ACM, 2015. [3] Maosong Fu, Sailesh Mittal, Vikas Kedigehalli, Karthik Ramasamy, Michael Barry, Andrew Jorgensen, Christopher Kellogg, Neng Lu, Bill Graham, and Jingwei Wu. "Streaming@ Twitter." IEEE Data Eng. Bull. 38, no. 4 (2015): 15-27. [4] http://storm.apache.org/ [5] http://storm.apache.org/releases/current/Trident-tutorial.html [6] https://flink.apache.org/ [7] https://spark.apache.org/streaming/ [8] https://kafka.apache.org/documentation/streams/ [9] https://twitter.github.io/heron/api/python/ [10] https://github.com/twitter/heron/tree/master/heron/instance/src/cpp [11] https://github.com/twitter/heron/tree/master/heron/schedulers/src/java/com/twitter/heron/scheduler