## Hurricane 实时处理系统架构剖析
文/卢誉声
>本文介绍分布式存储系统以及分布式计算系统,进而引出分布式实时处理系统的基础知识及其在机器学习和深度学习领域的应用。之后介绍了 Hurricane 实时处理系统的整体结构和设计,并在此基础上,探讨结合深度学习的框架设计方案和实际解决方案。
分布式计算一般分为以下几步:
- 设计分布式计算模型:首先要规定分布式系统的计算模型。计算模型决定了系统中各个组件应该如何运行,组件之间应该如何进行消息通信,组件和节点应该如何管理等。
- 分布式任务分配:分布式算法不同于普通算法。普通算法通常是按部就班,完成任务,而分布式计算中计算任务是分摊到各个节点上的。该算法着重解决的是能否分配任务,或如何分配任务的问题。
- 编写并执行分布式程序:使用特定的分布式计算框架与计算模型,将分布式算法转化为实现,并尽量保证整个集群的高效运行。其中有一些难点。
### 计算任务的划分
分布式计算的特点就是多个节点同时运算,因此如何将复杂算法优化分解成适用于每个节点计算的小任务,并最后将节点的计算结果回收就成了问题。尤其是并行计算的最大特点是希望节点之间的计算互不相干,这样可以保证各节点以最快速度完成计算,一旦出现节点之间的等待,往往就会拖慢整个系统速度。
### 多节点之间的通信方式
另一个难点是节点之间如何高效通信。虽然我们知道划分计算任务时,最好确保任务互不相干,这样每个节点可以各自为政。但大多数时候节点之间还是需要互相通信,比如获取对方的计算结果等。一般有两种解决方案,一种是利用消息队列,将节点之间的依赖变成节点之间的消息传递。第二类是分布式存储系统,我们可以将节点的执行结果暂时存放在数据库中,其他节点等待或从数据库中获取数据。无论哪种方式只要符合实际需求都是可行的。
G. Coulouris 曾经对分布式系统下了一个简单定义:你会知道系统当中的某台电脑崩溃或停止运行了,但是你的软件却永远不会。这句话虽然简单,但是却道出了分布式系统的关键特性。分布式系统的特性包括容错、高可扩展、开放、并发处理能力和透明。
随着互联网业务数据规模的急剧增加,人们处理和使用数据的模式已然发生了天翻地覆的变化,传统的技术架构已经越来越无法适应当今海量数据处理的需求。MapReduce、Hadoop 以及 Spark 等技术的出现使得我们能处理的数据量比以前要多得多,这类技术解决了面对海量数据时的措手不及,也在一定程度上缓解了传统技术架构过时的问题。
但是,随着业务数据规模的爆炸式增长和对数据实时处理能力的需求越来越高,原本承载着海量数据处理任务的批处理系统在实时计算处理方面越发显得乏力。这么说的原因很简单,像 Hadoop 使用的 MapReduce 数据处理技术,其设计初衷并不是为了满足实时计算的需求。任务式计算模型与实时处理系统在需求上存在着本质的区别。要做到实时性,不仅需要及时推送数据以便处理,还要将数据划分成尽可能小的单位,而像 HDFS 存储推送数据的能力已经远不能满足实时性的需求。
因此,Apache Storm 实时处理系统的出现顺应了实时数据处理业务的需求。Apache Storm 是一个开源的、实时的计算平台,最初由社交分析公司 Backtype 的 Nathan Marz 编写。后来被 Twitter 收购,并作为开源软件发布。从整体架构上看,Apache Storm 和 Hadoop 非常类似。Apache Storm 从架构基础本身就实现了实时计算和数据处理保序的功能,而且从概念上看,Apache Storm 秉承了许多 Hadoop 的概念、术语和操作方法。
Apache Storm 是实时处理系统当中的典型案例,来了解下它的特点和优势:
- 高可扩展性:Apache Storm 可以每秒处理海量消息请求,同时该系统也极易扩展,只需增加机器并提高计算拓扑的并行程度即可。根据官方数据,在包含10个节点的 Apache Storm 集群中可以每秒处理一百万个消息请求,由此可以看出 Apache Storm 的实时处理性能优越。
高容错性:如果在消息处理过程中出现了异常,Apache Storm 的消息源会重新发送相关元组数据,确保请求被重新处理。
- 易于管理:Apache Storm 使用 ZooKeeper 来协调集群内的节点配置并扩展集群规模。
- 消息可靠性:Apache Storm 能够确保所有到达计算拓扑的消息都能被处理。
本文将用简短的篇幅,结合笔者自身的体会,由分布式系统为引子,逐步介绍分布式存储系统以及分布式计算系统,进而引出分布式实时处理系统的基础知识及其在机器学习和深度学习领域的应用。在了解了基本的分布式实时处理系统的特点之后,我们进入主题,一起来了解一下 Hurricane 实时处理系统的整体结构和设计,并在此基础上,探讨结合深度学习的框架设计方案和实际解决方案。
### 基本概念
Hurricane 实时处理系统是基于 C++ 实现并开源的分布式实时处理系统,与传统海量数据批处理系统不同,它采用流模型处理数据,同时为高性能浮点运算提供了原生接口。考虑到多语言支持的必要性,提供了对 Python、Java、JavaScript、Swift 编程语言的互操作接口。
我们先来介绍 Hurricane 中的一些基本概念:
- President:Hurricane 集群中的核心服务。该节点保存了整个集群的运行数据,并运行了许多必要的服务功能。
- Manager:Hurricane 集群中的工作节点。该节点负责执行具体的计算任务,并与 President 通信,不同的 Manager之间可以相互发送数据元组。
- Executor:执行器,每个 Manager 中包含多个 Executor,每个执行器负责执行一个具体的任务。
- Topology:拓扑结构,Hurricane 中计算任务的抽象,每一个完整的计算任务是一个 Topology。
- Task:Hurricane 中对具体计算与处理步骤的抽象,任务包括生成元组的 Spout 和处理元组的 Bolt。
- Tuple:Hurricane 中对数据的抽象,各个计算任务之间都使用元组进行数据交换传输。
- Stream:Stream 也是一种计算抽象,每个由 Spout 生成的一个元组会生成一个新的流,该元组派生出的所有元组都属于该流。
图1是整个集群中节点的拓扑结构,中心是一个 President 节点,外部是多个 Manager 节点。
图1 Hurricane 集群中节点拓扑结构
President 负责存储集群元数据并管理周围的 Manager 节点。
Manager 节点会接收 President 的指令,并完成具体运算。
每个 Manager 节点之间可以互相通信,通过元组传递数据。
Hurricane 的计算模型是计算拓扑(Topology),而计算拓扑的基本范式是基于 MapReduce 的概念构建出来的,这里简单介绍一下 MapReduce:MapReduce 是
Google 提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。概念 “Map(映射)”和“Reduce(归纳)”,及他们的主要思想,都是从函数式编程语言借来的,还有从矢量编程语言借来的特性。当前的软件实现是指定一个 Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的 Reduce(归纳)函数,用来保证所有映射的键值对中的每一个共享相同的键组。感兴趣的读者可以参考 https://zh.wikipedia.org/wiki/MapReduce。
每个 Topology 都是一个网络,该网络由计算任务和任务之间的数据流组成。
图2 Hurricane 和 Storm 计算模型
该模型中 Spout 负责产生新的元组,Bolt 负责处理前一级任务传递的元组,并将处理过的元组发送给下一级。
Spout 是元组的生成器,而 Bolt 则是元组的处理单元。
每个任务都会将数据封装为元组传递给其他的任务。
### Hurricane 架构
现在来看一下 Hurricane 的架构设计,如图3。
图3 Hurricane 基本架构
最上方的是 President,在介绍概念的时候我们提到过,这是整个集群的管理者,负责存储集群的所有元数据,所有 Manager 都需要与之通信并受其控制。
下方的是多个 Manager,每个 Manager 中会包含多个 Executor,每个 Executor 会执行一个任务,可能为 Spout 和 Bolt。
从任务的抽象角度来讲,每个 Executor 之间会相互传递数据,只不过都需要通过 Manager 完成数据的传递,Manager 会帮助 Executor 将数据以元组的形式传递给其他的 Executor。
Manager 之间可以自己传递数据(如果分组策略是确定的),有些情况下还需要通过 President 来得知自己应该将数据发送到哪个节点中。
#### President
了解整体架构后,我们来具体讲解一下 President 和 Manager 的架构。President 的架构如图4。
图4 President 中央控制节点结构
President 的底层是一个基于 Meshy 实现的 NetListener,该类负责监听网络,并将请求发送给事件队列,交由事件队列处理。
President 的核心是 EventQueue。这是一个事件队列,当没有计算任务的时候,会从事件队列中获取事件并进行处理。
用户需要在 EventQueue中 事先注册每个事件对应的处理函数,President 会根据事件类型调用对应的事件处理函数。
#### Manager
Manager 的架构如图5。
图5 Manager 节点结构
Manager 的架构相对来说较为复杂。考虑到性能优化等问题,这个架构修改了几次。
首先,最顶层和 President 一样,是一个事件队列,并使用一个基于 Meshy 的 NetListener 来完成 I/O 事件的响应(转换成事件放入事件队列)。不过考虑到 Manager 中每个 Executor 都是一个单独的线程,因此这个队列需要上锁,确保线程安全。
接下来有两个模块:
- 一个是 Metadata Manager,这是一个独立线程,会监听 EventQueue,接收元数据的同步事件,负责和 President 同步集群的元数据。
- 另一个是 Tuple Dispatcher,该线程负责响应 OnTuple 事件,接收其他节点发过来的元组,并将元组分发到响应的 Bolt Executor 的元组队列中。
再下一层就是 Executor。它分为 SpoutExecutor 和 BoltExecutor,每个 Executor 都是一个单独的线程,在系统初始化 Topology 的时候,Managert 会初始化 Executor,并设置其中的任务。SpoutExecutor 负责执行 Spout 任务,而 BoltExecutor 负责执行 Bolt 任务。
其中 BoltExecutor 需要接受来自其他 Executor 的 Tuple,因此包含一个 Tuple Queue。Tuple Dispatcher 会将 Tuple 投送到这个 Tuple Queue 中,而 Bolt 则从 Tuple Queue 中取出数据并执行任务。
Eexecutor 在执行完任务后,可能会将 Tuple 通过 OutputCollector 投送到 OutputQueue 中。我们又设计了一个 OutputDispatcher,从 OutputQueue 中获取 Tuple 并发送到其他节点。
OutputQueue 也是一个带锁的阻塞队列,是唯一用于输出的队列。
### 高级抽象元语 Squared
介绍完 Hurricane 的基本功能与架构之后,我们来介绍一下 Squared。
图6 Hurricane 基本的计算模型
图6是 Hurricane 基本的计算模型。在该计算模型中,系统是一个计算任务组成的网络。需要考虑每个节点的琐屑实现。但如果在日常任务中,使用这种模型相对来说会比较复杂,尤其当网络非常复杂的时候。
为了解决这个问题,看一下图7中的计算模型,这是对我们完成计算任务的再次抽象。
图7 计算任务的再次抽象
- 第一步是产生语句的数据源;
- 然后每条语句需要使用名为 SplitSentence 的函数处理将句子划分为单词;
- 接下来根据单词分组,使用 CountWord 这个统计操作完成单词的计数。
这里其实是将网络映射成了简单的数据操作流程,这样一来,解决问题和讨论问题都会变得更为简单直观。
这就是 Squared 所做的事情——将基于网络与数据流的模型转换成这种简单的流模型,让开发者更关注于数据的统计分析,脱离部分繁琐的工作。
Squared 中有下面几个基本组件:
1. Spout:对 Hurricane Spout 的封装,用于生成
Squared 使用 Tuple。
2. Dispatcher:Tuple 分发器,用来决定如何将 Tuple 分发给后续的计算操作。
- 目前有两种 Tuple 的分发器,一种是 Each,会将每一个 Tuple 都发送给后续的计算操作,后面一般连接 Function或 者 Filter;
- 第二种是 Aggregator,会将部分 Tuple 合并成一组(根据字段分组),并传递给后续的 Aggregator。
3. Operation:对 Tuple 的操作,所有的计算处理都封装在里面。我们根据一般的计算任务抽象出了三种
Operation,分别是
- Function:用于变换 Tuple;
- Filter:用于过滤 Tuple;
- Aggregator:用于对 Tuple 进行统计。
图8 将繁琐操作和分布式拓扑细节封装到 Squared 当中
图8中就是两类 Dispatcher。最上面的是 Each Dispatcher,这种 Dispatcher 会将所有的 Tuple 分别发送到 Function 和 Filter 中进行处理。下面是 GroupBy Dispatcher,这种 Dispatcher 会将部分
Tuple 分为一组,开发者可以指定分组使用的字段,GroupDispatcher 将 Tuple 分组传入 Aggregator 进行处理。
### 保序
在现实工作中,我们常常需要一个的特性就是保序。比如部分银行交易和部分电商订单处理,希望数据按照顺序进行处理,但是传统的数据处理系统往往不支持这个特性。
Squared 实现了保序功能。目前 Squared 的实现原理很简单,会在未来逐步改进和提高稳定性、可用性和效率。首先每个 Tuple 会一个一个 orderId 字段,orderId 是依据顺序生成的,然后所有对 Tuple 的操作都会检验该 orderId 之前的 Tuple 是否已经完成。
如果已经完成则处理该 Tuple,否则就将 Tuple 放在一个队列里,等待前面的 Tuple 处理完毕为止。
保序功能的架构,如图9。
图9 分布式消息保序基本结构
President 中有一个 OrderId Generator,作用是生成分布式的 OrderId。
Manager 中需要有一个特殊的 OrderSpout,生成带 OrderId 的 OrderTuple。
Manager 中的 OrderBolt 则需要使用 OrderId Sliding Window 判断该 OrderId 之前的 OrderId 是否都被处理过了。
假设我们有 A、B、C、D 这4个服务器逻辑节点:A 是 Spout,B、C、D 是数据操作(Operation)。
那么大致的数据请求过程可以描述为:
1. A 产生 TupleA1,请求 President 生成 OrderId 1;
2. A 将 TupleA1 发给 B,B 处理 TupleA1,发出 TupleB1,OrderId 为1,滑动窗口 B 移动1位;
3. A 产生 TupleA2,请求 President 生成 OrderId 2;
4. A 产生 TupleA3,请求 President 生成 OrderId 3;
5. B 收到 TupleA3,暂时无法处理,缓存;
6. B 收到 TupleA2,处理 TupleA2,发出 TupleB2,OrderId 为2,滑动窗口 B 移动1位;
7. B 扫描缓存,TupleA3 的 OrderId 为3,可以处理,处理 TupleA3,发出 TupleB3,OrderId 为3,滑动窗口 B 移动1位。
实现效果如图10所示。
图10 保序时序图
现在来看一下整个过程的实现,如图11:
图11 Hurricane 保序流程示意图
1. OrderSpout 在生成 Tuple 的时候都会向
President 申请一个新的 OrderId,这个 OrderId 是整个集群唯一而且持续递增的;
2. OrderSpout 使用 OrderId 和用户发送的数据生成一个 OrderTuple,并将 OrderTuple 发送到其目的节点中;
3. OrderBolt 接收到 OrderTuple 的时候,会利用分布式的 OrderId 滑动窗口检测该 Tuple 之前的元组是否已经都被处理过了,如果全部处理过了则处理该 Tuple,没有则缓存该 Tuple。
这样一来我们就实现了保序功能,图12为滑动窗口示意图。
图12 滑动窗口
### 子系统
作为一个可扩展的系统,类似于插件的子系统机制是必不可少的。但是,众所周知在 C/C++ 中实现可热插拔的子系统是一件较为困难的事情。为了便于用户在 Hurricane 中动态加载卸载子系统,方便 Hurricane 扩展,我们设计了一个独立的子系统机制。
每个子系统就像一个插件,你可以在需要的时候加载,一旦不需要就可以卸载插件。同时我们为子系统定义了一套跨平台的通用接口,这个接口本身是面向对象的,你可以使用面向对象的思路来定义与使用子系统。
子系统的架构如图13所示。
图13 Hurricane 子系统架构
Hurricane 在中间构建了一个轻量级的子系统层,并为子系统的实现者提供了子系统的定义与实现接口,为子系统的调用者提供了装载与调用接口。
为了保证跨平台性,子系统之间使用 Tuple 作为通信方式,因此子系统中传递参数与使用参数的方式也与 Hurricane 中非常相似。
现在我们来看一下子系统的 API。
每个子系统都会有一个接口定义,实现者需要将接口定义放在 HSS _ BEGIN _ CLASS 和 HSS _ END _ CLASS 中,并使用 HSS _ BEGIN _ CLASS 为子系统定义一个名字。
在接口定义区域中可以使用 HSS _ METHOD 为子系统定义一个接口方法,也可以使用 HSS _ PROPERTY 为子系统定义一个接口属性。
子系统需要编译成独立的 so 或者 dll,便于动态加载卸载。
调用方只需要使用 HssLoadClass 就可以加载一个子系统,HssUnloadClass 则用于卸载子系统。
使用 HssLoadClass 可以获得一个结构体,使用结构体中的 Create 函数可以创建一个对象,Destroy 用于销毁对象,除了两个固定接口其他的函数则要根据子系统的定义调用。
### BLAS 子系统
图14实现了一个官方的子系统,这是一个 BLAS 子系统。BLAS 是基础线性代数子程序,这是在 C/C++ 的科学计算程序中经常用到的组件。这套组件有一个基本的接口标准,但是在不同平台往往有不同实现,这些实现在运行速度以及适用情况上有很大不同。
这有两个问题:
图14 Hurricane BLAS 扩展
1. 部分程序实现会和部分实现密切绑定,在换编译器、实现库或者进行异构运算的时候较为麻烦;
2. BLAS 的标准接口对于 C++ 开发者来说较难使用,许多系统都有自己的 BLAS 抽象,因此我们需要做出一套面向对象的标准抽象接口。
为此,我们设计了 BLAS 这个子系统,支持许多常见的 BLAS 实现,支持同一系统中进行异构运算,并提供 C++ 的一套面向对象的标准接口。最终目的是为了让用户可以在 Hurricane 上更惬意地处理科学计算问题。
接着来简单了解下 Hurricane 实时处理系统的多语言支持。
毋庸置疑,一个庞大复杂的实际系统不可能整个系统都使用 C++ 编写。首先就是 C++ 的入门门槛高,平均开发效率无法和其他语言相比。其次,现在大部分的 Web 应用都是使用 Java 或者脚本语言开发,因此我们必须考虑 Hurricane 的多语言接口问题。
为此,Hurricane 的思想是以基本的 C++ 为后端,然后在 C++ 上面封装其他语言的接口。此外还提供 Bolt 和 Spout 的实现接口,让其他语言可以直接编写计算组件。当用户希望使用其他语言快速实现部分新的算法和模型的时候,这种特性就会非常有用。
### 深度学习与 Hurricane
深度学习目前是一个非常热门的领域,由于计算资源越来越充足,数据越来越多,因此深度学习可以应用的范围也越来越广。数据是深度学习的重要支撑,如果没有足够多的数据,是无法通过深度学习得到一个性能足够好的模型的,即使网络本身再好,也弥补不了数据数量带来的影响。
数据量的增加给数据管理与处理带来了很多麻烦。如何有效存储大量的零散数据、如何快速查找处理这些数据成了值得研究的问题。而 Hurricane 正可以有效解决快速处理数据的问题。
在我们的架构中,Hurricane 在深度学习的两个阶段扮演着非常重要的角色,一个是在数据预处理阶段,另一个是在使用深度学习训练模型提供服务的时候。
深度学习的训练数据往往是数据庞大的数据流,比如来自特定场景的图像数据、语音数据和文本数据等等,这些数据会被实时传输并存储在服务器中。在进行实验之前我们必须要整理数据,这个整理数据的过程就是预处理,包括数据清洗、数据集成、数据变换等各种操作,然后将其转换成可以由深度学习框架训练程序识别的程序。在传统过程中我们会收集大量的数据并使用批处理程序完成这些工作。但如果利用 Hurricane,可以将数据预处理实时化,在收集数据的过程中一边存储数据一边完成数据预处理,更加充分利用机器的性能,并节省数据整理和训练的时间。
而在深度学习训练完模型后,我们需要使用模型来提供服务。这个过程就是将用户提交的数据输入到网络中并得到网络的数据。通常我们会建立一个消息队列,然后多个工作线程从消息队列中抓取任务,调用深度学习框架使用网络计算得到网络输出,这个过程还要处理比如事务控制、错误处理、负载均衡等各种问题,确保功能正确稳定,并充分利用机器的计算能力。这种情况下每个计算任务一般不会到一秒钟,需要处理的数据流量非常大,希望为用户提供低延迟、高吞吐量的服务,这正是 Hurricane 的绝佳应用场景。开发者可以用 Hurricane 替代消息队列等各种服务,使用 Hurricane 调用深度学习框架计算出结果,并在业务系统中使用 DRPC 调用 Hurricane 的计算服务,如图15所示。
图15 深度学习框架 Caffe 与 Hurricane 集成
需要记住的是 Hurricane 只是一个辅助性工具,因此具体使用哪个深度学习框架完全取决于用户,Hurricane 自身支持 Caffe,而用户也可以自行编写程序调用 MXNet 和 TensorFlow 等框架,这个选择权完全交给开发者。
总之将 Hurricane 用在深度学习中可以将许多过程自动化,提升工作效率,改善整个工作流。
### 性能对比
性能也是 Hurricane 实时处理系统关注的一个点,因此也针对一些特定的应用场景进行了 Benchmark,当然,实际生产环境中的问题是复杂多变的,Hurricane 也会随着时间推移,逐步去解决和改进性能——这也是未来开发方向的侧重点之一。
图16是性能比较的结构。
图16 Hurricane 粗粒度的性能比对
现在我们控制网络结构,使用不同的工具和网络完成性能测试,测试结果如图17所示。
图17 简单性能比较
### 未来之路
Hurricane 实时处理系统通过 Squared 高级抽象元语实现基本的高级抽象元语。由于高级抽象元语的覆盖面十分广,因此我们仍然可以对 Squared 高级抽象元语的功能进行进一步地扩充,让 Hurricane 实时处理系统拥有功能更加完善的高层次抽象元语,进一步简化构建计算拓扑的工作量。目前,Squared 高级抽象元语的任务分配机制较为单一,尚未优化任务分配的过程。对于部分流操作来说,我们还可以在 Squared 当中增添更多的
Function/Filter。另外,我们已经在 Squared 高级抽象元语当中实现了可靠消息处理的核心算法,但是还需要对另外一些异常情况进行处理和优化,使得 Hurricane 实时处理系统更加健壮。最后,Squared 高级抽象元语只实现了基于内存的状态存储机制,之后需要进一步增加 Memcached、Cassandra、MongoDB 等流行的缓存和数据库的支持,也需要进一步丰富状态的存储和获取机制与策略。
我们讨论的 Hurricane 实时处理系统中包含了以下几个开源项目:
1. Hurricane Framework:实时处理实现;
2. libMeshy:网络库;
3. Kake:构建系统。
从以上讨论可以了解,大数据实时处理系统相较于传统的批处理系统在实时性方面拥有着得天独厚的优势,也很有可能为未来海量数据处理和机器学习提供基本支持,拥有着广阔的前景。
最后,这里献上开源代码仓库 GitHub 地址,如果你感兴趣或者希望成为项目贡献者,即刻 Push。