## 基于 Spark 的大规模机器学习在微博的应用
文 / 吴磊,张拓宇
众所周知,自2015年以来微博的业务发展迅猛。如果根据内容来划分,微博的业务有主信息(Feed)流、热门微博、微博推送(Push)、反垃圾、微博分发控制等。每个业务都有自己不同的用户构成、业务关注点和数据特征。庞大的用户基数下,由用户相互关注衍生的用户间关系,以及用户千人千面的个性化需求,要求我们用更高、更大规模的维度去刻画和描绘用户。大体量的微博内容,也呈现出多样化、多媒体化的发展趋势。
一直以来,微博都尝试通过机器学习来解决业务场景中遇到的各种挑战。本文为新浪微博吴磊在 CCTC 2017云计算大会 Spark 峰会所做分享《基于 Spark 的大规模机器学习在微博的应用》主题的一部分,介绍微博在面对大规模机器学习的挑战时,采取的最佳实践和解决方案。
### Spark Mllib
针对微博近百亿特征维度、近万亿样本量的模型训练需求,我们首先尝试了 Apache Spark 原生实现的逻辑回归算法。采用该方式的优点显而易见,即开发周期短、试错成本低。我们将不同来源的特征(用户、微博内容、用户间关系、使用环境等)根据业务需要进行数据清洗、提取、离散化,生成 Libsvm 格式的可训练样本集,再将样本喂给 LR 算法进行训练。在维度升高的过程中,我们遇到了不同方面的问题,并通过实践提供了解决办法。
#### Stack overflow
栈溢出的问题在函数嵌套调用中非常普遍,但在我们的实践中发现,过多 Spark RDD 的 union 操作,同样会导致栈溢出的问题。解决办法自然是避免大量的 RDD union,转而采用其他的实现方式。
#### AUC=0.5
在进行模型训练的过程中,曾出现测试集 AUC 一直停留在0.5的尴尬局面。通过仔细查看训练参数,发现当 LR 的学习率设置较大时,梯度下降会在局部最优左右摇摆,造成训练出来的模型成本偏高,拟合性差。通过适当调整学习率可以避免该问题的出现。
#### 整型越界
整型越界通常是指给定的数据值过大,超出了整形(32bit Int)的上限。但在我们的场景中,导致整型越界的并不是某个具体数据值的大小,而是因为训练样本数据量过大、HDFS 的分片过大,导致 Spark RDD 的单个分片内的数据记录条数超出了整型上限,进而导致越界。Spark RDD 中的迭代器以整数(Int)来记录 Iterator 的位置,当记录数超过32位整型所包含的范围(2147483647),就会报出该错误。解决办法是在 Spark 加载 HDFS 中的 HadoopRDD 时,设置分区数,将分区数设置足够大,从而保证每个分片的数据量足够小,以避免该问题。可以通过公式(总记录数/单个分片记录数)来计算合理的分区数。
#### Shuffle fetch failed
在分布式计算中,Shuffle 阶段不可避免,在 Shuffle 的 Map 阶段,Spark 会将 Map 输出缓存到本机的本地文件系统。当 Map 输出的数据较大,且本地文件系统存储空间不足时,会导致 Shuffle 中间文件的丢失,这是 Shuffle fetch failed 错误的常见原因。但在我们的场景中,我们手工设置了 spark.local.dir 配置项,将其指向存储空间足够、I/O 效率较高的文件系统中,但还是碰到了该问题。通过仔细查对日志和 Spark UI 的记录,发现有个别 Executor 因任务过重、GC 时间过长,丢失了与 Driver 的心跳。Driver 感知不到这些 Executor 的心跳,便主动要求 Yarn 的 Application master 将包含这些 Executor 的
Container 杀掉。皮之不存、毛之焉附,Executor 被杀掉了,存储在其中的 Map
输出信息自然也就丢了,造成在 Reduce 阶段,Reducer 无法获得属于自己的那份
Map 输出。解决办法是合理地设置 JVM 的 GC 设置,或者通过将
spark.network.timeout 的时间(默认60s)设置为120s,该时间为 Driver 与 Executor 心跳通信的超时时间,给 Executor 足够的响应时间,让其不必因处理任务过重而无暇与 Driver 端通信。
通过各种优化,我们将模型的维度提升至千万维。当模型维度冲击到亿维时,因 Spark Mllib LR 的实现为非模型并行,过高的模型维度会导致海森矩阵呈指数级上涨,导致内存和网络 I/O 的极大开销。因此我们不得不尝试其他的解决方案。
### 基于 Spark 的参数服务器
在经过大量调研和初步的尝试,我们最终选择参数服务器方案来解决模型并行问题。参数服务器通过将参数分片以分布式形式存储和访问,将高维模型平均分配到参数服务器集群中的每一台机器,将 CPU 计算、内存消耗、存储、磁盘 I/O、网络 I/O 等负载和开销均摊。典型的参数服务器采用主从架构,Master 负责记录和维护每个参数服务器的心跳和状态;参数服务器则负责参数分片的存储、梯度计算、梯度更新、副本存储等具体工作。图1是我们采用的参数服务器方案。
图1 微博参数服务器架构图
蓝色文本框架即是采用主从架构的参数服务器集群,以 Yarn 应用的方式部署在 Yarn 集群中,为所有应用提供服务。在参数服务器的客户端,也是通过 Yarn 应用的方式,启动 Spark 任务执行 LR 分布式算法。在图中绿色文本框中,Spark 模型训练以独立的应用存在于 Yarn 集群中。在模型训练过程中,每个 Spark Executor 以数据分片为单位,进行参数的拉取、计算、更新和推送。
在参数服务器实现方面,业界至少有两种实现方式,即全同步与全异步。全同步的方式能够在理论层面保证模型收敛,但在分布式环境中,鉴于各计算节点的执行性能各异,加上迭代中需要彼此间相互同步,容易导致过早执行完任务的节点等待计算任务繁重的节点,引入通信边界,从而造成计算资料的浪费和开销。全异步方式能够很好地避免这些问题,因节点间无需等待和同步,可以充分利用各个节点的计算资源。虽然从理论上无法验证模型一定收敛,但是通过实践发现,模型每次的迭代速度会更快,AUC 的加速度会更高,实际训练出的模型效果可以满足业务和线上的要求。
在通过参数服务器进行 LR 模型训练时,我们总结了影响执行性能的关键因素,罗列如下:
#### Batch size
即 Spark 数据分片大小。前文提到,每个 Spark Executor 以数据分片为单位,进行参数的拉取和推送。分片的大小直接决定本次迭代需要拉取和通信的参数数量,而参数数量直接决定了本地迭代的计算量、通信量。因此分片大小是影响模型训练执行性能的首要因素。过大的数据分片会造成单次迭代任务过重,Executor 不堪重负;过小的分片虽然能够充分利用网络吞吐,但是会造成很多额外的开销。因此,选择合理的 Batch size,将会令执行性能的提升事半功倍。下文将以 Batch size 为例,对比不同设置下模型训练执行性能的差异。
#### PS server 数量
参数服务器的数量,决定了模型参数的存储容量。通过扩展参数服务器集群,理论上可以无限扩展存储容量。但是当集群大小达到瓶颈值时,过多的参数服务器带来的网络开销反而会令整体执行性能趋于平缓甚至下降。
#### 特征稀疏度
根据需要可以将原始业务特征(用户、微博内容、用户间关系、使用环境等)通过映射函数映射到高维模型,以这种方式提炼出区分度更佳的特征。特征稀疏度结合每次迭代数据分片的数据分布,决定了该分片本次迭代需要拉取和推送的参数数量,进而决定了本次迭代所需的计算资源和网络开销。
#### PS 分区策略
分区策略决定了模型参数在参数服务器的分布,好的分区策略能够使模型参数的分布更均匀,从而均摊每个节点的计算和通信负载。
#### Spark 内存规划
在 PS 的客户端,Spark Executor 需要保证有足够的内存容纳本次迭代分片所需的参数向量,才能完成后续的参数计算、更新任务。
表1所示为不同的 Batch size 下,各执行性能指标对比。Parameter(MB)表示一次迭代所需参数个数;Tx(MB)表示一次迭代的网络吞吐;Pull(ms)和Push(ms)分别表示一次迭代的拉取和推送时间消耗;Time(s)为一次迭代的整体执行时间。从表1中可见,参数个数与分片大小成正比、网络吞吐与分片大小成反比。分片越小,需要通信、处理的参数越少,但 PS 客户端与 PS 服务器通信更加频繁,因而网络吞吐更高。但是当分片过小时,会产生额外的开销,造成参数拉取、推送的平均耗时和任务的整体耗时上升。
表1 模型训练执行性能指标在不同 Batch size 下的对比
通过参数服务器的解决方案,我们解决了微博机器学习平台化进程中的大规模模型训练问题。众所周知,在机器学习流中,模型训练只是其中耗时最短的一环。如果把机器学习流比作烹饪,那么模型训练就是最后翻炒的过程,烹饪的大部分时间实际上都花在了食材、佐料的挑选,洗菜、择菜,食材再加工(切丁、切块、过油、预热)等步骤。在微博的机器学习流中,原始样本生成、数据处理、特征工程、训练样本生成、模型后期的测试、评估等步骤所需要投入的时间和精力,占据了整个流程的80%之多。如何能够高效地端到端进行机器学习流的开发,如何能够根据线上的反馈及时地选取高区分度特征,对模型进行优化,验证模型的有效性,加速模型迭代效率,满足线上的要求,都是我们需要解决的问题。在接下来的《weiflow——微博机器学习流统一计算框架》一文中,我们将为你一一解答。