ch10.md 111.2 KB
Newer Older
V
Vonng 已提交
1 2 3 4
# 10. 批处理 

![](img/ch10.png)

M
MuAlex 已提交
5
> 带有太强个人色彩的系统无法成功。当最初的设计完成并且相对稳定时,不同的人们以自己的方式进行测试,真正的考验才开始。
V
Vonng 已提交
6 7
>
> ——高德纳
V
Vonng 已提交
8 9 10 11 12

---------------

[TOC]

M
MuAlex 已提交
13
在本书的前两部分中,我们讨论了很多关于请求和查询以及相应的响应或结果。许多现有数据系统中都采用这种数据处理方式:你发送请求指令,一段时间后(我们期望)系统会给出一个结果。数据库,缓存,搜索索引,Web服务器以及其他一些系统都以这种方式工作。
V
Vonng 已提交
14

M
MuAlex 已提交
15
像这样的线上系统,无论是浏览器请求页面还是调用远程API的服务,我们通常认为请求是由用户触发的,并且正在等待响应。他们不应该等太久,所以我们非常关注系统的响应时间(参阅“[描述性能](ch1.md)”)。
V
Vonng 已提交
16

M
MuAlex 已提交
17
Web和越来越多的基于HTTP/REST的API使交互的请求/响应风格变得如此普遍,以至于很容易将其视为理所当然。但我们应该记住,这不是构建系统的唯一方式,其他方法也有其优点。我们来看看三种不同类型的系统:
V
Vonng 已提交
18

M
MuAlex 已提交
19
 ***服务(线上系统)***
V
Vonng 已提交
20

M
MuAlex 已提交
21
- 服务等待客户的请求或指令到达。每收到一个,服务会试图尽快处理它,并发回一个响应。响应时间通常是服务性能的主要衡量指标,可用性通常非常重要(如果客户端无法访问服务,用户可能会收到错误消息)。
V
Vonng 已提交
22

M
MuAlex 已提交
23
***批处理系统(线下系统)***
V
Vonng 已提交
24

M
MuAlex 已提交
25
- 一个批处理系统有大量的输入数据,跑一个作业(job)来处理它,并生成一些输出数据,这往往需要一段时间(从几分钟到几天),所以通常不会有用户等待作业完成。相反,批量作业通常会定期运行(例如,每天一次)。批处理作业的主要性能衡量标准通常是吞吐量(处理特定大小的输入所需的时间)。本章中讨论的就是批处理。
V
Vonng 已提交
26 27 28

***流处理系统(近实时系统)***

M
MuAlex 已提交
29
- 流处理介于线上和线下(批处理)之间,所以有时候被称为近实时或近线(nearline)处理。像批处理系统一样,流处理消费输入并产生输出(并不需要响应请求)。但是,流式作业在事件发生后不久就会对事件进行操作,而批处理作业则需等待固定的一组输入数据。这种差异使流处理系统比起批处理系统具有更低的延迟。由于流处理基于批处理,我们将在第11章讨论它。
V
Vonng 已提交
30

M
MuAlex 已提交
31
正如我们将在本章中看到的那样,批处理是构建可靠,可扩展和可维护应用程序的重要组成部分。例如,2004年发布的批处理算法Map-Reduce(可能有点过于热门)被称为“造就Google大规模可扩展性的算法”)被称为“造就Google大规模可扩展性的算法”[2]。随后在各种开源数据系统中得到应用,包括Hadoop,CouchDB和MongoDB。
V
Vonng 已提交
32

M
MuAlex 已提交
33
与多年前为数据仓库开发的并行处理系统【3,4】相比,MapReduce是一个相当低级别的编程模型,但它在现有硬件水平基础上,迈出了处理大数据重要的一步。虽然MapReduce的重要性正在下降【5】,但它仍然值得理解,因为它提供了一个清晰的画面来阐述批处理为什么以及如何有用。
V
Vonng 已提交
34

M
MuAlex 已提交
35
实际上,批处理是一种非常古老的计算方式。早在可编程数字计算机诞生之前,打孔卡制表机(例如1890年美国人口普查【6】中使用的霍尔里斯机)实现了半机械化的批处理形式,从大量输入中汇总计算。 Map-Reduce与1940年代和1950年代广泛用于商业数据处理的机电IBM卡片分类机器有着惊人的相似之处[7]。正如我们所说,历史总是在不断重演。
V
Vonng 已提交
36

M
MuAlex 已提交
37
在本章中,我们将了解MapReduce和其他一些批处理算法和框架,并探索它们在现代数据系统中的作用。但首先我们将看看使用标准Unix工具的数据处理。即使你已经熟悉了它们,Unix的哲学也值得一读,Unix的思想和经验教训可以转移到大规模,异构的分布式数据系统中。
V
Vonng 已提交
38

V
Vonng 已提交
39 40 41 42


## 使用Unix工具的批处理

M
MuAlex 已提交
43
我们从一个简单的例子开始。假设您有一台Web服务器,每次处理请求时都会在日志文件中附加一行。例如,使用nginx默认访问日志格式,日志的一行可能如下所示:
V
Vonng 已提交
44

M
MuAlex 已提交
45
```
V
Vonng 已提交
46 47 48 49 50
216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1" 
200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"
```

M
MuAlex 已提交
51
(实际上这只是一行,分成多行只是为了便于阅读)。这一行中有很多信息。为了理解它,你需要看看日志格式的定义,如下所示:
V
Vonng 已提交
52

M
MuAlex 已提交
53
```
V
Vonng 已提交
54 55 56 57
 $remote_addr - $remote_user [$time_local] "$request"
 $status $body_bytes_sent "$http_referer" "$http_user_agent"
```

M
MuAlex 已提交
58
日志的这一行表明在2015年2月27日17:55:11 UTC,服务器从客户端IP地址216.58.210.78接收到对文件`/css/typography.css`的请求。用户没有被认证,所以`$remote_user`被设置为连字符(`-` )。响应状态是200(即请求成功),响应的大小是3377字节。网页浏览器是Chrome 40,并且加载了该文件,因为在URL `http://martin.kleppmann.com/`的页面中引用。
V
Vonng 已提交
59 60 61



V
Vonng 已提交
62 63
### 分析简单日志

M
MuAlex 已提交
64
很多工具可以利用这些日志文件产生关于你网站流量的漂亮的报告,但为了练手,让我们使用基本的Unix功能创建自己的工具。 例如,假设你想在你的网站上找到五个最受欢迎的网页。 你可以在Unix shell中这样做:[^i]
V
Vonng 已提交
65

M
MuAlex 已提交
66
[^i]: 有些人认为`cat`这里并没有必要,因为输入文件可以直接作为awk的参数。 但这种写法让线性管道更为显眼。
V
Vonng 已提交
67 68 69 70 71 72 73 74 75 76 77

```bash
cat /var/log/nginx/access.log | #1
	awk '{print $7}' | #2
	sort             | #3
	uniq -c          | #4
	sort -r -n       | #5
	head -n 5          #6
```

1. 读取日志文件
M
MuAlex 已提交
78 79 80 81 82
2. 将每一行按空格分割成不同的字段,每行只输出第七个字段,恰好是请求的URL。在我们的例子中是`/css/typography.css`
3. 按字母顺序排列请求的URL列表。如果某个URL被请求过n次,那么排序后,文件在一行中会重复出现n次该URL。
4. uniq命令通过检查两个相邻的行是否相同来过滤掉输入中的重复行。 -c则表示输出一个计数器:对于每个不同的URL,它会报告输入中出现该URL的次数。
5. 第二种排序按每行起始处的数字(-n)排序,这是URL的请求次数。然后逆序(-r)返回结果,大的数字在前。
6. 最后,只输出前五行(-n 5),并丢弃其余的。该系列命令的输出如下所示:
V
Vonng 已提交
83

M
MuAlex 已提交
84
```
V
Vonng 已提交
85 86 87 88 89 90 91
    4189 /favicon.ico
    3631 /2013/05/24/improving-security-of-ssh-private-keys.html
    2124 /2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html
    1369 /
     915 /css/typography.css
```

M
MuAlex 已提交
92
如果你不熟悉Unix工具,上面的命令行可能看起来有点吃力,但是它非常强大。它能在几秒钟内处理几GB的日志文件,并且您可以根据需要轻松修改命令。例如,如果要从报告中省略CSS文件,可以将awk参数更改为`'$7 !~ /\.css$/ {print $7}'`,如果想统计最多的客户端IP地址,可以把awk参数改为`'{print $1}'`等等。
V
Vonng 已提交
93

M
MuAlex 已提交
94
我们不会在这里详细探索Unix工具,但是它非常值得学习。令人惊讶的是,使用awk,sed,grep,sort,uniq和xargs的组合,可以在几分钟内完成许多数据分析,并且它们的性能相当的好[8]。
V
Vonng 已提交
95 96 97

#### 命令链与自定义程序

M
MuAlex 已提交
98
抛开Unix命令链,你可以写一个简单的程序来做同样的事情。例如,在Ruby中,它可能看起来像这样:
V
Vonng 已提交
99 100 101 102 103

```ruby
counts = Hash.new(0)         # 1
File.open('/var/log/nginx/access.log') do |file| 
    file.each do |line|
M
MuAlex 已提交
104
        url = line.split[6]  # 2
V
Vonng 已提交
105 106 107 108 109 110 111 112
        counts[url] += 1     # 3
    end
end

top5 = counts.map{|url, count| [count, url] }.sort.reverse[0...5] # 4
top5.each{|count, url| puts "#{count} #{url}" }                   # 5
```

M
MuAlex 已提交
113 114 115 116
1. `counts`是一个存储计数器次数的哈希表,初始情况下每个网址对应的计数为零。
2. 我们读取日志的每一行,把获取第七个空格分隔的字段(这里的数组索引是6,因为Ruby的数组从0开始)对应的URL。
3. 将日志当前行中URL的计数加一。
4. 按计数器值(降序)对哈希表进行排序,并取前五位。
V
Vonng 已提交
117 118
5. 打印出前五个条目。

M
MuAlex 已提交
119
这个程序并不像Unix管道那样简练,但是它的可读性很强,对于这两种方式每个人有不同的偏好。但是,除了表面上的语法差异,两者执行流程也有很大差异,文件越大则越明显。
V
Vonng 已提交
120

M
MuAlex 已提交
121
#### 排序 VS 内存中的聚合
V
Vonng 已提交
122

M
MuAlex 已提交
123
Ruby脚本在内存中保存着一个URL的哈希表,其中每个URL映射到它被统计的次数。 Unix流水线没有这样一个哈希表,而是依赖于对URL列表的排序,在这个URL列表中,同一个URL的只是简单的重复出现。
V
Vonng 已提交
124

M
MuAlex 已提交
125
哪种方法更好?这取决于你有多少个不同的URL。对于大多数中小型网站,您可能可以为所有不同网址提供一个计数器(假设我们使用1GB内存)。在此例中,作业的工作集(作业需要随机访问的内存大小)仅取决于不同URL的数量:如果单个URL有一百万个日志条目,则哈希中所需的空间表仍然只有一个URL加上一个计数器的大小。如果这个工作集足够小,那么内存哈希表工作正常,甚至在性能较差的笔记本电脑上也可以。
V
Vonng 已提交
126

M
MuAlex 已提交
127
另一方面,如果作业的工作集大于可用内存,则排序方法的优点是可以高效地使用磁盘。这与我们在第74页的“[SSTables和LSM树]”中讨论过的原理是一样的:数据块可以在内存中排序并分段写入磁盘,然后多个有序的分段可以合并为一个更大的有序文件。归并排序具有在磁盘上运行良好的顺序访问模式。(请记住,在顺序I/O中进行优化是第3章中反复出现的主题,这里我们再次提到。)
V
Vonng 已提交
128

M
MuAlex 已提交
129
Linux GNU 核心工具(Coreutils)中的排序自动把大于可用内存的数据集在磁盘进行分割,并且通过CPU的多核并行排序【9】。这意味着之前简单的Unix命令链很容易扩展到大数据集,并且不会耗尽内存。瓶颈更可能是从磁盘读取输入文件的速度。
V
Vonng 已提交
130 131 132



V
Vonng 已提交
133 134
### Unix哲学

M
MuAlex 已提交
135
我们可以非常容易地使用前一个例子中的命令链来分析日志文件,这并非巧合:这实际上是Unix的关键设计思想之一,而且它今天依然令人惊讶。让我们更深入地进行研究,这样我们可以从Unix中借鉴一些想法【10】。
V
Vonng 已提交
136

M
MuAlex 已提交
137
Unix管道的发明者道格·麦克罗伊(Doug McIlroy)在1964年首先做出了描述【11】:“我们需要一种像花园软管一样的连接程序的方法 - 可以把数据通过拧口传输到不同的片段中。这也是I/O的实现方式”。通过管道连接程序的想法成为了Unix的哲学的一部分,Unix哲学成为在开发人员和用户之间流行的设计准则。1978年, Unix哲学得到具体描述【12,13】:
V
Vonng 已提交
138

M
MuAlex 已提交
139 140 141 142
1. 让每个程序都做好一件事。如果有新的任务,重新建立一个新的程序,而不是在原有程序上增加“功能”使其复杂化。
2. 每个程序的输出应该成为另一个程序的输入,而不是未知程序。不要将输出与无关的信息混淆。避免使用严格的柱状或二进制输入格式。不要坚持交互式输入。
3. 设计和构建软件,甚至是操作系统时要尽早尝试,最好在几周内完成。毫不犹犹豫扔掉笨拙的部分并重建它们。
4. 优先使用工具而不是拙劣的帮助来减轻编程任务,即使您必须花费额外经历开发工具,并且其中一些工具可能今后再也不会用到。
V
Vonng 已提交
143

M
MuAlex 已提交
144
这种方式 - 自动化,快速原型设计,迭代式迭代,对实验友好,将大型项目分解成可管理的模块 - 听起来非常像今天的敏捷和DevOps概念。令人惊奇的是这个理念四十年来变化不大。
V
Vonng 已提交
145

M
MuAlex 已提交
146
排序工具是一个“做好一件事”很好的例子。它可以说比大多数编程语言的标准库(不会分割到磁盘并且不使用多线程,即使这么做有好处)中的实现更好。然而,排序工具几乎不会单独使用。它只能与其他Unix工具(如`uniq`)结合使用。
V
Vonng 已提交
147

M
MuAlex 已提交
148
像bash这样的Unix shell可以让我们轻松地将这些小程序组合成令人惊讶的强大数据处理作业。尽管这些程序有很多是由不同人群编写的,但它们可以灵活地组合在一起。 Unix如何实现这种可组合性?
V
Vonng 已提交
149 150 151

#### 统一的接口

M
MuAlex 已提交
152
如果您希望一个程序的输出成为另一个程序的输入,那意味着这些程序必须使用相同的数据格式 - 换句话说是一个兼容的接口。如果您希望能够将任何程序的输入和其他程序的输出连接起来,那意味着所有程序必须使用相同的I/O接口。
V
Vonng 已提交
153

M
MuAlex 已提交
154
在Unix中,该接口是一个文件(更准确地说,是一个文件描述符)。一个文件只是一个有序的字节序列。因为这是一个非常简单的接口,所以可以用来表示许多不同的东西:文件系统上的真实文件,到另一个进程(Unix套接字,stdin,stdout)的通信通道,设备驱动程序(比如`/dev/audio``/dev/lp0`),表示TCP连接的套接字等等。这看起来似乎理所当然,但实际上这些非常不同的事物可以共享一个统一接口是很牛逼的,这使它们可以很容易地连接在一起[^ii]。
V
Vonng 已提交
155

M
MuAlex 已提交
156
[^ii]: 统一接口的另一个例子是URL和HTTP,他们是Web的基石。 一个URL标识一个网站上的一个特定的东西(资源),你可以在一个网站上链接到任何网址。 浏览器用户因此可以通过链接在网站之间无缝跳转,即使服务器属于完全不相关的组织。 这个原则看起来平淡无奇,但它是网络取得今天成功的关键。 之前的系统并不是那么统一:例如,在广播公告系统(BBS)时代,每个系统都有自己的电话号码和波特率配置。 从一个BBS到另一个BBS的引用必须以电话号码和调制解调器设置的形式; 用户将不得不挂断,拨打其他BBS,然后手动找到他们正在寻找的信息,无法直接链接到另一个BBS中内容。
V
Vonng 已提交
157

M
MuAlex 已提交
158
按照惯例,许多(但不是全部)Unix程序将这个字节序列视为ASCII文本。我们的日志分析案例使用了这个事实:awk,sort,uniq和head都将它们的输入文件视为由`\n`(换行符,ASCII 0x0A)字符分隔的记录列表。 `\n`的选择是有争议的 - ASCII记录分隔符`0x1E`可能是一个更好的选择,因为它是为了这个目的而设计的【14】, 但是无论如何,所有这些程序使用相同的记录分隔符允许它们进行交互。
V
Vonng 已提交
159

M
MuAlex 已提交
160
每个记录(比如一行输入)的解析更加模糊。 Unix工具通常通过空格或制表符将行分割成字段,但也使用CSV(逗号分隔),管道分隔和其他编码。即使像`xargs`这样一个相当简单的工具也有六个命令行选项,用于指定如何解析输入。
V
Vonng 已提交
161

M
MuAlex 已提交
162
ASCII文本的统一接口大多数情况下好用,但它不优雅:我们的日志分析示例使用`{print $ 7}`来提取网址,这个命令的可读性很差。理想情况下,这可能是`{print $ request_url}`或类似的东西。我们稍后来看这个想法。
V
Vonng 已提交
163

M
MuAlex 已提交
164
尽管几十年后仍然不够完美,但统一的Unix接口还是非同凡响。与Unix工具一样,软件的交互操作并不是很多:你不能通过定制分析工具将你的邮件和在线购物记录通过管道导入电子表格并且发布到社交网络或者维基。像Unix工具一样流畅地将程序组合运行是一个例外,而不是规范。
V
Vonng 已提交
165

M
MuAlex 已提交
166
即使是具有相同数据模型的数据库,也往往不容易进行数据迁移。这种缺乏整合导致数据的巴尔干化[^译注i]。
V
Vonng 已提交
167

M
MuAlex 已提交
168
[^译注i]: ***巴尔干化(Balkanization)*** 是一个常带有贬义的地缘政治学术语,其定义为:一个国家或政区分裂成多个互相敌对的国家或政区的过程。
V
Vonng 已提交
169

V
Vonng 已提交
170 171


M
MuAlex 已提交
172
#### 逻辑和线路的分离
V
Vonng 已提交
173

M
MuAlex 已提交
174
Unix工具的另一个特点是使用标准输入(stdin)和标准输出(stdout)。如果你运行一个程序,而不指定任何其他的东西,将会从键盘获得并且标准输出到屏幕上。但是,您也可以从文件获得输入和/或将输出重定向到文件。管道允许您将一个进程的标准输出附加到另一个进程的标准输入上(通过较小的内存缓冲区,而不需要将整个中间数据写入磁盘)。
V
Vonng 已提交
175

M
MuAlex 已提交
176
程序仍然可以直接读取和写入文件,但如果程序不关注特定的文件路径,只使用标准输入和标准输出,则Unix的方法效果最好。这允许shell用户以任何他们想要的方式连接输入和输出;该程序不知道或不关心输入来自哪里以及输出到哪里。 (可以说这是一种松耦合,后期绑定【15】或控制反转【16】)。将输入/输出的连接与程序逻辑分开,可以更容易将小模块组合成更大的系统。
V
Vonng 已提交
177

M
MuAlex 已提交
178
您甚至可以编写自己的程序,并将它们与操作系统提供的工具组合在一起。你的程序只需要从标准输入读取输入,并将输出写入标准输出,并且可以参与数据处理流水线。在日志分析案例中,您可以编写一个将user-agent字符串转换为更易读的浏览器标识符,或者将IP地址转换为国家代码的工具,并将其插入管道。排序程序并不关心它和操作系统的一部分还是你写的程序进行通信。
V
Vonng 已提交
179

M
MuAlex 已提交
180
但是,使用stdin和stdout可以做什么是有限的。需要多个输入或输出的程序会很棘手。如果程序直接打开文件进行读写,或者启动一个子程序,或者打开网络连接,则无法将程序的输出传输到网络连接中【17,18】[^iii],这种情况下I/O操作由程序本身连接。它仍然可以配置(例如通过命令行选项),但是减少了在shell中连接输入和输出的灵活性。
V
Vonng 已提交
181

M
MuAlex 已提交
182
[^iii]: 除了使用一个单独的工具,如`netcat`或`curl`。 Unix开始试图将所有东西都表示为文件,但是BSD套接字API偏离了这个惯例[17]。研究型操作系统Plan 9和Inferno在使用文件方面更加一致:它们将TCP连接表示为`/net/tcp`中的文件[18]。
V
Vonng 已提交
183

M
MuAlex 已提交
184
#### 透明度和调试
V
Vonng 已提交
185

M
MuAlex 已提交
186
Unix工具如此成功的部分原因是它们使得查看正在发生的事情变得非常容易:
V
Vonng 已提交
187

M
MuAlex 已提交
188
- Unix命令的输入文件通常被视为不可变的。这意味着您可以随意运行命令,尝试各种命令行选项,而不会损坏输入文件。
V
Vonng 已提交
189

M
MuAlex 已提交
190 191 192
- 您可以在任何时候结束管道,将输出管道输送到`less`,然后查找它是否具有预期的数据。这种检查能力对调试非常有用。
 
- 您可以将一个流水线阶段的输出写入文件,并将该文件用作下一阶段的输入。这使您可以重新启动后面的阶段,而无需重新运行整个管道。
V
Vonng 已提交
193

M
MuAlex 已提交
194
因此,与关系数据库的查询优化器相比,即使Unix工具非常直接和简单,仍然非常有用,特别是对于调试来说。
V
Vonng 已提交
195

M
MuAlex 已提交
196
然而,Unix工具的最大局限在于它们只能在一台机器上运行 - 而Hadoop这样的工具就此应运而生。
V
Vonng 已提交
197

V
Vonng 已提交
198 199 200 201


## MapReduce和分布式文件系统

M
MuAlex 已提交
202
MapReduce有点像Unix工具,但分布在数千台机器上。像Unix工具一样,这是一个相当直接的,蛮力的,但却是令人惊讶的有效工具。一个MapReduce作业可以和一个Unix进程类似:它获取一个或多个输入,并产生一个或多个输出。
V
Vonng 已提交
203

M
MuAlex 已提交
204
和大多数Unix工具一样,运行MapReduce作业通常不会修改输入,除了生成输出外没有任何副作用。输出文件以顺序的方式写入一次(一旦生成输出,不会修改任何现有的文件)。
V
Vonng 已提交
205

M
MuAlex 已提交
206
Unix工具使用`stdin``stdout`作为输入和输出,MapReduce作业在分布式文件系统上读写文件。在Hadoop的Map-Reduce实现中,该文件系统被称为HDFS(Hadoop分布式文件系统),一个开源Google文件系统(GFS)的重新实现【19】。
V
Vonng 已提交
207

M
MuAlex 已提交
208
除HDFS外,还有各种其他分布式文件系统,如GlusterFS和Quantcast File System(QFS)【20】。诸如Amazon S3,Azure Blob存储和OpenStack Swift [21]等对象存储服务非常类似[^iv]。在本章中,我们将主要使用HDFS作为示例,但是原则适用于任何分布式文件系统。
V
Vonng 已提交
209

M
MuAlex 已提交
210
[^iv]: 对于HDFS,一个不同之处在于可以将计算任务安排在存储特定文件副本的计算机上运行,而对象存储通常将存储和计算分开。如果网络带宽是瓶颈,从本地磁盘读取有性能优势。但是请注意,如果是删除功能,局部存储将没有优势,因为来自多台机器的数据必须进行合并以重建原始文件[20]。
V
Vonng 已提交
211

M
MuAlex 已提交
212
与网络连接存储(NAS)和存储区域网络(SAN)架构的共享磁盘方法相比,HDFS基于无共享原则(参见第二部分的介绍)。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和专用网络基础设施(如光纤通道)。另一方面,无共享方法不需要特殊的硬件,只需要通过传统数据中心网络连接的计算机。
V
Vonng 已提交
213

M
MuAlex 已提交
214
HDFS包含在每台机器上运行的守护进程,暴露一个允许其他节点访问存储在该机器上的文件的网络服务(假设数据中心中的每台通用的计算机都有磁盘)。名为NameNode的中央服务器会跟踪哪个文件块存储在哪台机器上。因此在概念上,HDFS创建了一个大文件系统的守护进程,可以使用所有机器磁盘上的内容。
V
Vonng 已提交
215

M
MuAlex 已提交
216
为了容忍机器和磁盘故障,文件块被复制到多台机器上。复制可能意味着在多个机器上存在相同数据的多个副本,如第5章中所述,或者像Reed-Solomon代码这样的擦除编码方案,比完全复制花费更小的开销恢复丢失的数据【20, 22】。这些技术与RAID相似,可以在连接到同一台机器的多个磁盘上提供冗余;区别在于在分布式文件系统中,文件访问和复制是在传统的数据中心网络上完成的,没有特殊的硬件。
V
Vonng 已提交
217

M
MuAlex 已提交
218
HDFS发展迅猛:在撰写本文时,最大的HDFS部署运行在成千上万台机器上,总存储容量达数百peta-bytes 【23】。使用商品硬件和开源软件的HDFS上的数据存储和访问成本远低于同等容量的专用存储设备【24】使得如此大的规模存储变得可行。
V
Vonng 已提交
219 220 221

### MapReduce作业执行

M
MuAlex 已提交
222
MapReduce是一个编程框架,您可以使用它编写代码来处理HDFS等分布式文件系统中的大型数据集。理解它的最简单方法是参考第391页上的“简单日志分析”中的Web服务器日志分析示例。MapReduce中的数据处理模式与下例非常相似:
V
Vonng 已提交
223

M
MuAlex 已提交
224 225 226 227
1. 读取一组输入文件,并将其分解成多个记录。在Web服务器日志案例中,每条记录都是日志中的一行(即`\n`是记录分隔符)。
2. 调用Mapper函数从每个输入记录中提取一个键和值。在前面的例子中,mapper函数是`awk'{print $ 7}'`:它提取`URL($7)`作为主键,并将值保留为空。
3. 按键排序所有的键值对。在日志案例中,这由第一个排序命令完成。
4. 调用reducer函数遍历排序后的键值对。如果同一个键出现多次,排序使它们在列表中相邻,所以很容易组合这些值而不必在内存中保留很多中间状态。在前面的例子中,reducer是由uniq -c命令实现的,该命令使用相同的密钥来统计相邻记录的数量。
V
Vonng 已提交
228

M
MuAlex 已提交
229
这四个步骤可以由一个MapReduce作业执行。步骤2(map)和4(reduce)是您编写自定义数据处理代码的地方。步骤1(将文件分解成记录)由输入格式解析器处理。步骤3中的排序步骤隐含在MapReduce中 - 您不必编写它,因为mapper的输出始终在给予reducer之前进行排序。
V
Vonng 已提交
230

M
MuAlex 已提交
231
要创建MapReduce作业,您需要实现两个回调函数,mapper和reducer,其行为如下(另请参阅“MapReduce查询”(第46页)):
V
Vonng 已提交
232 233 234

***Mapper***

M
MuAlex 已提交
235
每个输入记录都会调用一次mapper,其工作是从输入记录中提取键和值。对于每个输入,它可以生成任意数量的键值对(包括没有)。它不会保留从一个输入记录到下一个中的状态,因此每个记录都是独立处理的。
V
Vonng 已提交
236 237 238

***Reducer***

M
MuAlex 已提交
239 240 241
MapReduce框架使用由mapper生成的键值对,收集属于同一个键的所有值,并使用迭代器调用reducer处理集合。 Reducer可以产生输出记录(例如相同URL的出现次数)。

在Web服务器日志案例中,第5步中出现了第二个排序命令,它按请求数对URL进行排序。在MapReduce中,如果您需要第二个排序阶段,则可以通过编写第二个MapReduce作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来,mapper的作用是将数据放入一个适合排序的表单中,而reducer的作用是处理已排序的数据。
V
Vonng 已提交
242 243 244

#### 分布式执行MapReduce

M
MuAlex 已提交
245
和Unix命令管道相比,MapReduce可以在多台机器上并行执行计算,而无需编写代码来明确并行处理。mapper和reducer一次只能处理一条记录;他们不需要知道他们的输入来自哪里或者输出到什么地方,所以框架可以处理数据在机器之间移动带来的复杂性。
V
Vonng 已提交
246

M
MuAlex 已提交
247
在分布式计算中可以使用标准的Unix工具作为mapper和reducer【25】,但是通常我们使用传统编程语言中的函数来实现,。在Hadoop MapReduce中,mapper和reducer都是实现特定接口的Java类。在MongoDB和CouchDB中,mapper和reducer都是JavaScript函数(请参阅第46页的“MapReduce查询”)。
V
Vonng 已提交
248

M
MuAlex 已提交
249
[图10-1]()显示了Hadoop MapReduce作业中的数据流。它的并行化基于分区(参见第6章):输入通常是HDFS中的一个目录,输入目录中的每个文件或文件块都被认为是一个单独的分区,可以单独进行map任务(图10-1中的m 1,m 2和m 3标记)。
V
Vonng 已提交
250

M
MuAlex 已提交
251
每个输入文件的大小通常是数百兆字节。 MapReduce调度器(图中未显示)试图在其中一台存储输入文件副本的机器上运行每个mapper,只要该机器有足够的备用RAM和CPU资源来运行映射任务【26】。这个原则被称为将计算放在数据附近【27】:节省通过网络复制输入文件开销,减少网络负载和增加局部计算。
V
Vonng 已提交
252 253 254 255 256

![](img/fig10-1.png)

**图10-1 具有三个Mapper和三个Reducer的MapReduce任务**

M
MuAlex 已提交
257
在大多数情况下,应该在map任务中运行的程序代码在分配运行的计算机上不存在,所以MapReduce框架首先复制代码(例如Java程序中的JAR文件)到适当的机器。然后启动map任务并开始读取输入文件,一次将一条记录传递给mapper的回调函数。mapper的输出由键值对组成。
V
Vonng 已提交
258

M
MuAlex 已提交
259
Reduer的计算也会分割。虽然Map任务的数量由输入文件块的数量决定,但Reducer的任务数量由作业执行者配置(可以不同于mapper任务的数量)。为了确保具有相同主键的所有键值对在相同的reducer处理,框架使用主键的哈希值来确定reducer处理的键值对(参见“通过主键哈希分区”)第203页)。
V
Vonng 已提交
260

M
MuAlex 已提交
261
键值对必须进行排序,但数据集可能太大,无法在单台机器上使用常规排序算法进行排序。替代方案是将排序分阶段进行。首先,每个map任务都基于主键的哈希,通过reducer将其输出分区。每一个分区都作为排序好的文件被写入mapper的磁盘上,使用的技术与我们在第76页的“SSTables and LSM-Trees”中讨论的类似。
V
Vonng 已提交
262

M
MuAlex 已提交
263
只要mapper读取输入文件并写入它排序后的输出文件,MapReduce调度器就会通知reducer可以从mapper获取输出文件。Reducer连接到每个mapper,并下载其对应分区的排序好的键值对文件。Reducer进行分区,排序,从mapper中拷贝对应分区到reducer一并称为洗牌(shuffle)【26】(一个令人困惑的术语 - 不像扑克中的洗牌,MapReduce中没有随机性)。
V
Vonng 已提交
264

M
MuAlex 已提交
265
Reduce任务从mapper获取文件,将它们合并在一起并保证有序。因此,如果不同的mapper使用相同的主键生成记录,则它们将在合并的reducer输入中相邻。
V
Vonng 已提交
266

M
MuAlex 已提交
267
Reducer通过一个迭代器对相同的主键进行逐步扫描(在某些情况下可能不是全部在内存中)。 Reducer可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出会写入到分布式文件系统上的文件(通常存在运行reducer的机器的本地磁盘,其他机器上则是这个文件的副本)。
V
Vonng 已提交
268

M
MuAlex 已提交
269
#### MapReduce工作流程
V
Vonng 已提交
270

M
MuAlex 已提交
271
单个MapReduce作业可以解决的问题范围有限。请参阅日志分析案例,一个MapReduce作业可以确定每个URL的页面浏览次数,但无法确定访问最多的URL,因为这需要第二轮排序。
V
Vonng 已提交
272

M
MuAlex 已提交
273
因此,将MapReduce作业链到工作流中非常常见,例如,一个作业的输出成为下一个作业的输入。 Hadoop Map-Reduce框架对工作流没有特别的支持,所以这个链接是通过目录名隐含完成的:第一个作业必须通过配置将其输出写入到HDFS中的指定目录,第二个作业必须是读取这个目录。从MapReduce框架的角度来看,他们是两个独立的工作。
V
Vonng 已提交
274

M
MuAlex 已提交
275
因此,被链接的MapReduce作业和Unix命令的流水线(它直接将一个进程的输出作为输入传递给另一个进程,只使用一个小的内存缓冲区)不太一样,它更像是一个命令序列,其中每个命令的输出会写入临时文件,下一个命令从临时文件中读取。这种设计有利有弊,我们将在第419页“中间状态的具体化”中讨论。
V
Vonng 已提交
276

M
MuAlex 已提交
277
只有当作业成功完成时,批处理作业的输出才被视为有效(MapReduce丢弃失败作业产生的部分输出)。因此,工作流中的一项工作只有在先前的工作 - 即产生它所需的输入工作成功完成时才能开始。为了处理这些作业之间的依赖关系执行,基于Hadoop开发了各种工作流调度器,包括Oozie,Azkaban,Luigi,Airflow和Pinball [28]。
V
Vonng 已提交
278

M
MuAlex 已提交
279
这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。在构建推荐系统[29]时,由50到100个MapReduce作业组成的工作流非常常见,而在大型组织中,不同的团队可能运行不同的作业来读取彼此的输出。工具支持对于管理这样复杂的数据流非常重要。
V
Vonng 已提交
280

M
MuAlex 已提交
281
Hadoop的各种高级工具(如Pig [30],Hive [31],Cascading [32],Crunch [33]和FlumeJava [34])也设置了多个MapReduce阶段的工作流程并且将他们自动连接在一起。
V
Vonng 已提交
282

M
MuAlex 已提交
283
### Reduce端连接(join)与分组(group)
V
Vonng 已提交
284

M
MuAlex 已提交
285
我们在第2章中讨论了数据模型和查询语言的连接,但是我们还没有深入探讨连接如何实现,现在我们来再次探讨。
V
Vonng 已提交
286

M
MuAlex 已提交
287
在许多数据集中,一条记录与另一条记录有连接的情况很常见:关系模型中的外键,文档模型中的文档引用,图模型中的边。只要有一些代码需要访问该连接两边的记录(包含引用的记录和被引用的记录),连接就是必需的。正如第2章所讨论的,反范式可以减少对连接的依赖,但很难将其完全移除[^v]。
V
Vonng 已提交
288

M
MuAlex 已提交
289
在数据库中,如果执行的查询只涉及少量记录,数据库通常会使用索引来快速定位(请参阅第3章)。如果查询涉及连接,则可能需要多个索引查找。然而,MapReduce没有索引的概念 - 至少不是通常意义上的索引。
V
Vonng 已提交
290

M
MuAlex 已提交
291
当MapReduce作业被赋予一组文件作为输入时,它读取这些文件的全部内容;数据库会把这个操作称为全表扫描。如果您只想读取少量的记录,与索引查找相比,全表扫描的成本非常高昂。但是,在分析查询中(请参阅第88页上的“处理事务或分析”),通常需要计算大量记录的聚合。在这种情况下,全表扫描可能比较合理,特别是可以在多台机器上并行处理。
V
Vonng 已提交
292

M
MuAlex 已提交
293
[^v]: 我们在本书中讨论的连接通常是等值连接,即最常见的连接类型,其中记录与其他记录通过主键(例如ID)连接。有些数据库支持更通用的连接类型,例如使用<而不是=,由于篇幅所限,我们不予讨论。
V
Vonng 已提交
294

M
MuAlex 已提交
295
当我们在批处理的背景下讨论连接时,意味着解析数据集内某个连接的所有事件。 例如,我们假设一个作业同时为所有用户处理数据,而不仅仅是为一个特定用户查找数据(可以通过索引更有效地完成)。
V
Vonng 已提交
296 297 298

#### 示例:分析用户活动事件

M
MuAlex 已提交
299
图10-2给出了一个批处理作业中使用join的典型例子。 在左侧是事件日志,描述登录用户在网站上做的事情(称为活动事件或点击流数据),右侧是用户数据库。 您可以将此示例看作是星型模式的一部分(请参阅“星号和雪花:分析的示意图”(第93页)):事件日志记录可以看做一个表,用户数据库是其中的一个维度(通过userId映射)。
V
Vonng 已提交
300

V
Vonng 已提交
301 302 303 304
![](img/fig10-2.png)

**图10-2 用户行为日志与用户档案的连接**

M
MuAlex 已提交
305
分析任务可能需要将用户活动与用户信息相连接:例如,用户资料包含年龄或出生日期,则系统可以确定不同年龄组中最受欢迎的页面。但是,活动事件仅包含用户ID,而不是完整的用户资料信息。在每一个活动事件中嵌入这个完整信息很可能非常浪费。因此,活动事件需要和用户资料数据库连接。
V
Vonng 已提交
306

M
MuAlex 已提交
307
最简单实现连接的方式是逐个遍历活动事件,并为每个用户ID查询用户数据库(在远程服务器上)。虽然可行,但是很可能性能非常差:处理吞吐量受限于访问数据库服务器的往返时间,本地缓存的有效性很大程度上取决于数据的分布情况,并行运行大量查询可能会轻易压垮数据库[35]。
V
Vonng 已提交
308

M
MuAlex 已提交
309
为了在批处理过程中达到良好的吞吐量,计算必须(尽可能)在一台机器上进行。通过网络随机访问要处理的每个记录太慢。而且,查询远程数据库意味着批处理作业结果会变得不确定,因为远程数据库中的数据可能会改变。
V
Vonng 已提交
310

M
MuAlex 已提交
311
所以更好的方法是获取用户数据库的副本(例如,使用ETL进程从数据库备份中提取数据,请参阅第91页上的“数据仓库”),并将其放入与用户活动事件的日志相同的分布式文件系统中。这样,用户数据库存在于HDFS中的一组文件中,用户活动记录在另一组文件中,您可以使用MapReduce将所有相关记录集中在一起并高效地处理它们。
V
Vonng 已提交
312 313 314

#### 排序合并连接

M
MuAlex 已提交
315
回想一下,mapper的作用是从每个输入记录中提取一个键和值。在图10-2中,键是用户ID:一组mapper负责活动事件(提取用户ID作为键和活动事件作为值),而另一组mapper将会检查用户数据库(提取用户ID作为键,用户的出生日期作为值)。这个过程如图10-3所示。
V
Vonng 已提交
316 317 318

![](img/fig10-3.png)

M
MuAlex 已提交
319
**图10-3 Reduce端基于user ID进行归并排序连接,如果输入数据集被分为多个文件,则每个都可以被多个mapper并行处理**
V
Vonng 已提交
320

M
MuAlex 已提交
321
当MapReduce框架通过主键对mapper输出进行分区,然后对键值对进行排序后,所有用户ID相同的活动事件和用户记录在reducer输入中彼此相邻。 Map-Reduce作业甚至可以通过排序使reducer始终最先看到用户信息,然后紧跟着按照时间排序的活动事件 - 这种技术被称为次级排序【26】。
V
Vonng 已提交
322

M
MuAlex 已提交
323
然后reducer可以很容易地执行真正的连接逻辑:对于每个用户ID都会调用一次reduce函数,并且由于次级排序,第一个值应该是来自用户数据库的出生日期记录。 Reducer将出生日期存储在本地变量中,然后使用相同的用户ID遍历活动事件,输出“已观看网址”和“观看者年龄”这一对数据。随后的Map-Reduce作业可以计算每个URL查看者的年龄分布,并按年龄组进行分簇。
V
Vonng 已提交
324

M
MuAlex 已提交
325
由于reducer一次处理一个特定用户ID的所有记录,因此每次只需要将一个用户记录保存在内存中,而不需要通过网络发起任何请求。这个算法被称为归并排序连接(sort-merge join),因为mapper输出按主键排序,然后reducer可以把两边排序好的记录列表合并在一起。
V
Vonng 已提交
326

M
MuAlex 已提交
327
#### 相关数据整合
V
Vonng 已提交
328

M
MuAlex 已提交
329
在归并排序连接中,mapper和排序过程确保将执行特定用户ID的连接操作的所有必需数据放在一起:然后只需调用一次reducer。因为预先排列了所有需要的数据,reducer可以是一个高吞吐量并且低内存消耗的相当简单,单线程的代码。
V
Vonng 已提交
330

M
MuAlex 已提交
331
看待这种体系结构的一种方法是mapper“发送消息”给reducer。当一个mapper发出一个键值对时,主键决定了值应该传递到的目标地址。尽管主键只是一个任意的字符串(不像IP地址和端口号那样的实际的网络地址),它的行为就像一个地址:所有具有相同主键的键值对将被发送到相同的目标(对同一个reducer的调用)。
V
Vonng 已提交
332

M
MuAlex 已提交
333
使用MapReduce编程模型将物理网络通信方面的计算(从正确的计算机获取数据)从应用程序逻辑(处理数据)中分离出来。这种分离与数据库的典型应用形成了鲜明的对比,从数据库中获取数据的请求通常发生在应用程序代码中的某处[36]。由于MapReduce能够处理所有的网络通信,因此它也避免了应用程序代码担心的部分故障问题,例如另一个节点的崩溃:MapReduce在不影响应用程序逻辑的前提下透明地重试失败的任务。
V
Vonng 已提交
334

M
MuAlex 已提交
335
### 分组(GROUP BY)
V
Vonng 已提交
336

M
MuAlex 已提交
337
除了连接(Join),“数据整合”的另一个常见用法是通过某个键(如SQL中的GROUP BY子句)对记录进行分组。相同的密钥的记录成为一个分组,并且下一步通常在每个组内进行某种聚合,例如:
V
Vonng 已提交
338

M
MuAlex 已提交
339 340 341
* 计算每个组中记录的数量(例如,在统计页面视图的示例中,在SQL中表示为COUNT(*)聚合)
* 选取特定字段(SUM(fieldname))进行累加
* 根据某些排序函数选择前k个记录
V
Vonng 已提交
342

M
MuAlex 已提交
343
使用MapReduce实现这种分组操作的最简单方法是设置mapper,可以依据需要的主键进行分组生成的键值对。然后分区和排序过程将所有相同键的记录一起发送给与同一个reducer。因此,在MapReduce上实现时,分组和连接看起来非常相似。
V
Vonng 已提交
344

M
MuAlex 已提交
345
分组的另一个常见用途是整理特定用户会话期间的所有活动事件,以便找出用户的一系列操作(称为会话化[37])。例如,可以使用这种分析来确定显示网站新版本的用户是否比那些显示旧版本(A / B测试)的用户更有可能进行购买,或计算某个营销活动是否值得。
V
Vonng 已提交
346

M
MuAlex 已提交
347
如果您有多个Web服务器处理用户请求,则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。您可以通过使用会话cookie,用户ID或类似的标识符作为分组主键来实现会话化,并将特定用户的所有活动事件进行整合,同时将不同用户的事件分配到不同的分区。
V
Vonng 已提交
348

M
MuAlex 已提交
349
#### 处理偏斜
V
Vonng 已提交
350

M
MuAlex 已提交
351
如果大量数据指向同一主键,则“将具有相同主键的所有记录放到相同位置”的模式将被破坏。例如,在社交网络中,大多数用户可能和几百人有连接,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为“关键对象”(linchpin objects)[38]或热键(hot keys)。
V
Vonng 已提交
352

M
MuAlex 已提交
353
在单个reducer中收集与名人相关的所有活动(例如回复他们发布的内容)可能导致严重的偏斜(也称为热点) - 也就是说,一个reducer必须处理比其他更多的记录(参见205页的“偏斜的工作负载和消除热点“)。由于MapReduce作业只有在其所有mapper和reducer都完成时才完成,所有后续作业必须等待最慢的reducer完成后才能启动。
V
Vonng 已提交
354

M
MuAlex 已提交
355
如果连接(join)输入有热键,则可以使用一些算法进行补偿。例如,Pig中的偏斜连接方法首先运行一个抽样作业来确定哪些是热键[39]。执行实际连接时,mapper随机发送到一系列主键相关的reducer中的一个(传统的MapReduce会选择一个基于主键哈希确定的reducer)。对于连接的其他输入,与热键相关的记录需要被复制到所有处理该密钥的reducer[40]。
V
Vonng 已提交
356

M
MuAlex 已提交
357
这种技术将处理热键的工作分散到多个reducer上,以把其他连接输入复制到多个reducer的代价使其更好地并行化。Crunch中的分片连接(sharded join)方法与之类似,但需要显式指定热键而不使用采样作业。这种技术也非常类似于我们在第205页的“偏斜的工作负载和消除热点”中讨论的技术,使用随机化来缓解(alleviate)分区数据库中的热点。
V
Vonng 已提交
358

M
MuAlex 已提交
359
Hive的偏斜连接优化采取了另一种方法。它需要在表元数据中明确指定热键,并将与这些键相关的记录与其余文件分开存放。在该表上执行连接时,它将使用map端连接(请参阅下一节)获取热键。
V
Vonng 已提交
360

M
MuAlex 已提交
361
使用热键对记录进行分组和聚合时,可以分两个阶段。第一个MapReduce阶段将记录发送到随机reducer,以便每个reducer对热键对应的子集执行分组,并为每个键输出更紧凑的聚合值。第二个Map-Reduce作业将来自所有第一阶段reducer的输出合并为每个键的单个值。
V
Vonng 已提交
362 363 364

### Map端连接

M
MuAlex 已提交
365 366 367
上一节描述的连接算法在reducer中执行实际的连接逻辑,因此被称为reduce端连接。mapper负责准备输入数据:从每个输入记录中提取键和值,将键值对分配给reducer分区,并按键排序。

reduce端连接的优点是不需要对输入数据做任何假设:无论其属性和结构如何,mapper都可以为连接准备数据。然而,不利的一面是,所有这些排序,复制到reducer以及合并reducer输入可能开销非常大。取决于可用的内存缓冲区,因为数据会经历MapReduce [37]的不同阶段,可能被多次写入磁盘。
V
Vonng 已提交
368

M
MuAlex 已提交
369
另一方面,如果您可以对输入数据进行某些假设,则可以通过使用所谓的map端连接来加快连接速度。这种方法使用了一个简化的MapReduce作业,其中没有reducer,也没有排序。相反,每个mapper只需从分布式文件系统读取一个输入文件块,然后将一个输出文件写入文件系统。
V
Vonng 已提交
370

M
MuAlex 已提交
371
#### 广播哈希连接
V
Vonng 已提交
372

M
MuAlex 已提交
373
执行map端连接最简单的方法适用于大数据集与小数据集连接的情况。特别是小数据集需要足够小,以便可以将其全部加载到每个mapper的内存中。
V
Vonng 已提交
374

M
MuAlex 已提交
375
例如,在图10-2的情况下,假设用户数据库足够小可以放入内存。当mapper启动时,它可以首先将用户数据库从分布式文件系统读取到内存中的哈希表中。然后mapper可以扫描用户活动事件,并简单地在哈希表查找中每个事件的用户ID[^vi]。
V
Vonng 已提交
376

M
MuAlex 已提交
377
[^vi]: 这个例子假定哈希表中的每个键只有一个记录,这对用户数据库(用户ID唯一标识一个用户)可能是正确的。通常,哈希表可能需要包含具有相同键的多个记录,并且连接运算符将输出主键的所有匹配。
V
Vonng 已提交
378

M
MuAlex 已提交
379
这种方式仍然可以有多个map任务:一个是大数据集用于连接的每个文件块(在图10-2的例子中,活动事件是大数据集),然后每个mapper都将小的数据集全部加载到内存中和之前进行连接。
V
Vonng 已提交
380

M
MuAlex 已提交
381
这种简单而有效的算法被称为广播哈希连接:“广播”反映了这样一个事实,即大数据集分区后的每个mapper都读取整个小数据集(所以小数据集有效地“广播”到大的数据),单词hash反映了它使用一个哈希表。 Pig(又叫做“replicated join”),Hive(“MapJoin”),Cascading和Crunch都支持这种连接方法。它也用于数据仓库查询引擎,如Impala [41]。 
V
Vonng 已提交
382

M
MuAlex 已提交
383
除了将连接中的小数据集加载到内存哈希表中以外,另一种方法是将它存储在本地磁盘上的只读索引中[42]。索引中经常使用的部分将保留在操作系统的页面缓存中,因此这种方法可以提供与内存中哈希表几乎一样快的随机访问查找,也并不需要把数据放入内存。
V
Vonng 已提交
384

M
MuAlex 已提交
385
#### 分区哈希连接
V
Vonng 已提交
386

M
MuAlex 已提交
387
如果map端连接的输入以相同的方式进行分区,那么哈希连接方法可以独立应用于每个分区。在图10-2的情况中,您可以根据用户ID的最后一位(假设是数字)进行分区(因此大小数据集每边都有10个分区)。例如,mapper 3首先将所有具有以3结尾的ID的用户加载到哈希表中,然后扫描ID为3的每个用户的所有活动事件。
V
Vonng 已提交
388

M
MuAlex 已提交
389
如果分区正确无误,您可以确定所有需要连接的记录都位于相同编号的分区中,因此每个mapper只需读取一个分区就足够了。这个方法的优点是每个mapper都只需将较少量的数据加载到其哈希表中。
V
Vonng 已提交
390

M
MuAlex 已提交
391
这种方法只适用于连接两边的输入具有相同数量的分区的情况,并且分区选取的主键和哈希方式相同。如果输入是由之前执行过这个分组的MapReduce作业生成的,那么这是一个可行的方案。
V
Vonng 已提交
392

M
MuAlex 已提交
393
在Hive中, 分区哈希连接被叫做桶映射连接(bucketed map joins )。
V
Vonng 已提交
394

M
MuAlex 已提交
395
#### Map端归并连接
V
Vonng 已提交
396

M
MuAlex 已提交
397
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序,则可以应用另一种map端连接的变体。在这种情况下,输入是否足够小可以放入内存并不重要,因为mapper可以执行通常由reducer执行的合并操作:按主键升序顺序递增读取两个输入文件,来匹配有相同主键的记录。
V
Vonng 已提交
398

M
MuAlex 已提交
399
如果可以进行map端归并连接,则可能意味着先前的MapReduce作业将输入数据集进行了分区和排序。原则上,这个连接可以在之前工作的reduce阶段进行。但是,在单独只有map的作业中执行归并连接仍然是有可能的,例如,除了上述特定连接之外,分区和排序好的数据集还有其他用途。
V
Vonng 已提交
400

M
MuAlex 已提交
401
#### 使用Map端连接的MapReduce工作流程
V
Vonng 已提交
402

M
MuAlex 已提交
403
当下游作业消费MapReduce连接完成后的输出时,map端或reduce端连接的选择会影响输出的结构。 reduce端连接的输出按连接主键进行分区和排序,而map端连接的输出与大数据集相同的方式进行分区和排序(因为每个map任务都基于连接的大数据集端的文件块进行启动,无论是使用分区连接还是广播连接)。
V
Vonng 已提交
404

M
MuAlex 已提交
405
如前所述,map端连接也对输入数据集的大小,排序和分区做出了更多的假设和限制。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;您还必须知道数据分区的数量和排序依据的主键。
V
Vonng 已提交
406

M
MuAlex 已提交
407
在Hadoop生态系统中,这种关于数据集分区的元数据经常在HCatalog和Hive Metastore中维护[37]。
V
Vonng 已提交
408

M
MuAlex 已提交
409
### 批处理工作流的输出
V
Vonng 已提交
410

M
MuAlex 已提交
411
我们已经谈了很多关于实现MapReduce工作流程的各种算法,但是我们忽略了一个重要的问题:所有处理完成的结果是什么?我们为什么要把这些工作放在前面?
V
Vonng 已提交
412

M
MuAlex 已提交
413
在数据库查询中,我们把事务处理(OLTP)和分析处理(OLAP)进行了区分(请参阅第90页上的“事务处理或分析?”)。我们看到,OLTP查询通常使用索引按键查找少量记录,以便将其呈现给用户(例如在网页上)。而另一方面,分析查询通常会扫描大量记录,执行分组和聚合,输出通常是报告的形式:显示某个随时间变化的指标的图表,或以某种排序得出的前10项,或把一些数量的数据按子类别分解。这种报告的用户通常是需要做出商业决策的分析师或经理。
V
Vonng 已提交
414

M
MuAlex 已提交
415
批处理适用在哪里?不是事务处理,也不是分析,但是与分析更接近,因为批处理通常需要扫描输入数据集的大部分。然而,MapReduce作业的工作流程与用于分析目的的SQL查询不同(请参阅第414页的“比较Hadoop与分布式数据库”)。批处理过程的输出通常不是报告,而是一些其他类型的结构。
V
Vonng 已提交
416 417 418

#### 建立搜索索引

M
MuAlex 已提交
419
Google最初使用的MapReduce为其搜索引擎建立索引,以5到10个MapReduce作业的工作流实现[1]。虽然Google后来不再使用MapReduce [43]构建搜索索引,但是如果从建立搜索索引的角度来看可以帮助理解MapReduce。 (即使在今天,Hadoop MapReduce仍然是构建Lucene / Solr索引的好方法。)
V
Vonng 已提交
420

M
MuAlex 已提交
421
我们在第88页的“全文搜索和模糊索引”中简要地看到了Lucene这样的全文搜索索引是如何工作的:它是一个文件(术语字典),您可以在其中高效地查找特定关键字并找到包含该关键字的所有文档ID列表(发布列表)。这是一个非常简单的搜索索引视图 - 实际上,它需要各种额外数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等,但这一原则是成立的。
V
Vonng 已提交
422

M
MuAlex 已提交
423
如果需要对一组固定文档执行全文搜索,则批处理是构建索引的一种非常有效的方法:mapper根据需要对文档集合进行分区,每个reducer构建其分区的索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引(请参阅“分区和二级索引”(第184页))非常有利于并行处理。
V
Vonng 已提交
424 425 426

由于按关键字查询搜索索引是只读操作,因此这些索引文件一旦创建就是不可变的。

M
MuAlex 已提交
427
如果索引的文档集合发生更改,则可以选择定期重新运行整个索引工作流程,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法可能会带来很高的计算成本,但是它的优点是索引更新过程很简单:文档变化,索引过期需更新。
V
Vonng 已提交
428

M
MuAlex 已提交
429
或者,可以增量建立索引。如第3章所述,如果要添加,删除或更新索引中的文档,Lucene会写出新的某段(segment)文件,并在后台异步合并和整理。我们将在第11章中看到更多这样的增量处理。
V
Vonng 已提交
430 431 432

#### 键值存储作为批处理输出

M
MuAlex 已提交
433
搜索索引只是批处理工作流程可能输出的一个示例。批量处理的另一个常见用途是构建机器学习系统,如分类器(例如,垃圾邮件过滤器,异常检测,图像识别)和推荐系统(例如,您可能认识的人,您可能感兴趣的产品或相关搜索)[29])。
V
Vonng 已提交
434

M
MuAlex 已提交
435
这些批处理作业的输出通常是某种数据库:例如,可以通过用户ID查询并获取该用户的推荐好友的数据库,或者可以通过产品ID查询的数据库并获取相关产品[45]。
V
Vonng 已提交
436

M
MuAlex 已提交
437
这些数据库需要从处理用户请求的Web应用程序中查询,这些请求通常与Hadoop基础架构分离。那么批处理过程的输出如何返回到Web应用程序可以查询的数据库中?
V
Vonng 已提交
438

M
MuAlex 已提交
439
最显然的选择可能是直接在mapper或reducer中使用客户端库作为您最喜欢的数据库,并从批处理作业直接写入数据库服务器,一次写入一条记录。这确实奏效(假设您的防火墙规则允许从您的Hadoop环境直接访问您的生产数据库),但由于以下几个原因,这是一个坏主意:
V
Vonng 已提交
440 441

* 正如前面讨论的连接一样,为每个记录提出一个网络请求比批处理任务的正常吞吐量要慢几个数量级。即使客户端库支持批处理,性能也可能很差。
M
MuAlex 已提交
442 443
* MapReduce作业经常并行运行许多任务。如果所有mapper或reducer都同时以批处理速率写入相同的输出数据库,那么该数据库可能很容易过载,并且其查询性能可能受到影响。这可能会导致系统其他部分的运行问题[35]。
* 通常情况下,MapReduce为作业输出提供了一个完全的“全有或全无”的保证:如果作业成功,则结果就是每个任务只一次成功执行的输出,如果某些任务失败则必须重试。如果整个作业失败,则不会生成任何输出。然而,从作业内部程序写入外部系统会产生不能被隐藏的外部可见性的副作用。因此,您不得不担心部分完成的作业在其他系统可见,以及Hadoop任务尝试和推测性执行的复杂性。
V
Vonng 已提交
444

M
MuAlex 已提交
445
更好的解决方案是在批处理作业中创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录中,就像上一节的搜索索引一样。这些数据文件一旦写入就不可变,可以批量加载到处理只读查询的服务器中。有很多基于键值存储的产品支持在MapReduce作业中构建数据库文件,包括Voldemort [46],Terrapin [47],ElephantDB [48]和HBase批量加载[49]。
V
Vonng 已提交
446

M
MuAlex 已提交
447
构建这些数据库文件是MapReduce的一个很好的用法:使用mapper提取一个键,然后使用该键进行排序已经成为构建索引所需的大部分工作。由于大多数这些键值存储是只读的(文件只能由批处理作业一次写入,而且是不可变的),所以数据结构非常简单。例如,它们不需要WAL(请参阅第82页的「使B树可靠」, Write-Ahead Logging - 预写式日志)。
V
Vonng 已提交
448

M
MuAlex 已提交
449
将数据加载到Voldemort时,服务器将继续向旧数据文件提供请求,同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成,服务器会自动切换到查询新文件。如果在这个过程中出现任何问题,它可以很容易地再次切换回旧的文件,因为它们仍然存在,并且是不变的[46]。
V
Vonng 已提交
450 451 452

#### 批量过程输出的哲学

M
MuAlex 已提交
453
本章前面讨论过的Unix哲学(第394页的“Unix哲学”)鼓励通过明确的数据流来进行实验:程序读取输入并写入输出。在这个过程中,输入保持不变,任何之前的输出都被新输出完全替换,这样没有其他副作用。这意味着您可以随心所欲对命令重新运行,调整或调试,而不会扰乱系统的状态。
V
Vonng 已提交
454

M
MuAlex 已提交
455
MapReduce作业的输出处理遵循相同的思想。将输入视为不可变来避免副作用(如写入外部数据库),批处理作业不仅性能良好,而且更容易维护:
V
Vonng 已提交
456

M
MuAlex 已提交
457 458 459 460 461
* 如果在代码中引入了一个错误,并且输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将再次正确。或者,甚至更简单,您可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了有Bug的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的思想被称为人为容错[50]。)
* 由于易于回滚,功能开发可以比“错误意味着不可挽回的损害”的环境更快地进行。这种使不可逆性最小化的原则有利于敏捷软件的开发[51]。
* 如果map或reduce任务失败,MapReduce框架将自动重新调度并在同一个输入上再次运行。如果失败是由于代码中的一个错误造成,那么它会一直崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于暂时的问题引起,那么故障是可以容忍的。这种自动重试是安全的,因为输入不可变,而失败任务的输出会被MapReduce框架丢弃。
* 同一组文件可用作各种不同作业的输入,其中包括计算指标并评估作业的输出是否达到预期的监控作业(例如,将其与前一次运行的输出进行比较并测量差异)。
* 与Unix工具类似,MapReduce作业将逻辑与串联(配置输入和输出目录)分开,这就提供了关注点的分离,并且带来代码重用的可能性:一个团队可以专注于实现一项工作其他团队可以决定何时何地运行这项工作。
V
Vonng 已提交
462

M
MuAlex 已提交
463
在这些领域,对Unix运行良好的设计原则似乎也适用于Hadoop,但Unix和Hadoop在某些方面也有所不同。例如,因为大多数Unix工具都没有指定文件的类型,所以必须做大量的输入解析(本章开头的日志分析示例使用{print $ 7}来提取URL)。在Hadoop上,通过使用更多结构化的文件格式来消除一些低价值的语法转换:Avro(请参阅第122页上的“Avro”)和Parquet(请参阅第95页上的“面向列的存储”)经常被使用,因为它们提供高效的基于schema的编码,并允许随着时间的推移模式的演变(见第4章)。
V
Vonng 已提交
464 465 466

### 比较Hadoop和分布式数据库

M
MuAlex 已提交
467
正如我们所看到的,Hadoop有点像Unix的分布式版本,其中HDFS对应文件系统,而MapReduce是Unix进程的特殊实现(在Unix中排序总是在map和reduce这两个阶段之间进行)。我们看到了如何在这些基础上实现各种连接和分组操作。
V
Vonng 已提交
468

M
MuAlex 已提交
469
当MapReduce论文[1]发表时,它在某种意义上说并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前的所谓的大规模并行处理(MPP)数据库中实现了[3,40]。例如,Gamma数据库机器,Teradata和Tandem NonStop SQL是这方面的先驱[52]。
V
Vonng 已提交
470

M
MuAlex 已提交
471
最大的区别是MPP数据库集中于在一组机器上并行执行分析SQL查询,而MapReduce和分布式文件系统[19]的组合则更像是一个可以运行任意程序的通用操作系统。
V
Vonng 已提交
472 473 474

#### 存储的多样性

M
MuAlex 已提交
475
数据库要求您根据特定的模型(例如关系或文档)来构造数据,而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码格式来编写。它们可能是数据库记录的集合,但同样可以是文本,图像,视频,传感器读数,稀疏矩阵,特征向量,基因组序列或任何其他类型的数据。
V
Vonng 已提交
476

M
MuAlex 已提交
477
说白了,Hadoop开放了将数据不加区分地转储到HDFS的可能性,之后才考虑如何进一步处理它[53]。相比之下,在将数据以特定格式导入数据库之前,MPP数据库通常需要对数据和查询模式进行仔细的前期建模。
V
Vonng 已提交
478

M
MuAlex 已提交
479
纯粹来看,这种谨慎的建模和导入似乎是可取的,因为这意味着数据库用户的数据质量更好。然而,在实践中,似乎只是简单地使数据可用 - 即使它有些怪异,形式原始,难以使用 - 通常也比尝试在前期确定理想的数据模型更有价值[54 ]。
V
Vonng 已提交
480

M
MuAlex 已提交
481
这个想法与数据仓库类似(请参阅第91页上的“数据仓库”):简单的将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以将之前完全不同的数据集关联在一起。 MPP数据库所要求的谨慎的模式设计减慢了数据集中收集速度;以原始形式收集数据,以后再去关注设计schema,使数据收集速度加快(这种理念有时被称为“数据湖”或“企业数据中心”【55】)。
V
Vonng 已提交
482

M
MuAlex 已提交
483
不加区别的数据倾泻转移了解析数据的责任:不再是强迫数据集的生产者将其转化为标准化的格式,数据的解析成为消费者的问题(读时建模方法【56】;请参阅第39页上的“文档模型中的模式灵活性”)。如果生产者和消费者是不同团队并且有不同优先级,这可能是一个优势。甚至可能不存在一个理想的数据模型,利用数据的目的不同带来处理方式的不同。以原始形式简单地转储数据可以进行多次这样的转换。这种方法被称为寿司原则:“原始数据而不是加工过的数据更好”【57】。
V
Vonng 已提交
484

M
MuAlex 已提交
485
因此,Hadoop经常被用于实现ETL过程(请参阅“数据仓库”第91页):事务处理系统中的数据以某种原始形式转储到分布式文件系统中,然后编写MapReduce作业来清理数据,将其转换为关系形式,并将其导入MPP数据仓库以进行分析。数据建模仍然会有,但在一个单独的步骤中,从数据收集中分离出来。这种解耦可以实现因为分布式文件系统支持以任何格式编码的数据。
V
Vonng 已提交
486

M
MuAlex 已提交
487
#### 处理模型的多样性
V
Vonng 已提交
488

M
MuAlex 已提交
489
MPP数据库是单一的,紧密集成的软件,负责磁盘上的存储布局,查询计划,调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化,因此整个系统可以在其设计的查询类型上取得非常好的性能。而且,SQL语言提供丰富查询的优雅机制,无需编写代码,业务分析师使用的图形工具(例如Tableau)可访问直接该语言。
V
Vonng 已提交
490

M
MuAlex 已提交
491
另一方面,并非所有类型的处理都容易使用SQL查询表达。例如,如果要构建机器学习和推荐系统,或者使用相关性排名模型的全文搜索索引,或者执行图像分析,则很可能需要更通用的数据处理模型。这些类型的处理通常对特定的应用程序非常具体(例如机器学习的特征工程,机器翻译的自然语言模型,欺诈预测的风险评估函数),因此它们不可避免地需要编写代码,而不仅仅是查询。
V
Vonng 已提交
492

M
MuAlex 已提交
493
MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce,那么你可以在它上面建立一个SQL查询执行引擎,事实上这正是Hive项目所做的[31]。而且对于不适合用SQL查询表示的批处理,也可以自己实现处理方式。
V
Vonng 已提交
494

M
MuAlex 已提交
495
随后,人们发现MapReduce对于某些类型的处理来说太过于限制,表现得很差,基于Hadoop开发了其他各种处理模型(我们将在第419页的“Beyond MapReduce”中看到其中的一些)。仅仅两种处理模型,SQL和MapReduce是不够的,我们需要更多不同的模型!而且由于Hadoop平台的开放性,实施一整套方法是可行的,而这在整体MPP数据库的范围内是不可能的[58]。
V
Vonng 已提交
496

M
MuAlex 已提交
497
至关重要的是,这些不同的处理模型都可以在一个共享用途的集群上运行,所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方法中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个群集内不同的工作负载。不需要移动数据使得从数据中取值变得容易得多,并且在新的处理模型进行实验更加容易。
V
Vonng 已提交
498

M
MuAlex 已提交
499
Hadoop生态系统包括随机访问的OLTP数据库,如HBase(请参阅第70页的“SSTables和LSM-Trees”)和MPP类型的分析数据库,如Impala [41]。 HBase和Impala都不使用MapReduce,但都使用HDFS进行存储。访问和处理数据的方式完全不同,但是它们可以共存并被集成到同一个系统中。
V
Vonng 已提交
500 501 502

#### 为频繁的故障而设计

M
MuAlex 已提交
503
在比较MapReduce和MPP数据库设计理念时,另外两个关键点是:故障处理;对内存和磁盘的使用。与在线系统相比,批处理对故障不太敏感,因为失败不会立即影响用户,并且任务可以再次运行。
V
Vonng 已提交
504

M
MuAlex 已提交
505
如果一个节点在执行查询时崩溃,大多数MPP数据库会中止整个查询,让用户重新提交查询或自动重新运行[3]。查询通常运行几秒钟最多几分钟,这种处理方法可以接受,因为重试的代价不是太大。 MPP数据库还倾向于在内存中保留尽可能多的数据(例如,使用哈希连接(join))以避免从磁盘读取的成本。
V
Vonng 已提交
506

M
MuAlex 已提交
507
另一方面,MapReduce通过以单个任务的粒度重试工作,可以容忍map或reduce任务的失败,而不会影响作业的整体。它也非常渴望将数据写入磁盘,一方面是为了容错,另一方面考虑数据集可能太大放不进内存。
V
Vonng 已提交
508

M
MuAlex 已提交
509
MapReduce方法更适用于较大的作业:处理很多数据并运行很长时间的作业,以至于在此过程中很可能遇到至少一个任务故障。在这种情况下,由于单个任务失败而重新运行整个工作太浪费了。即使以单个任务的粒度进行恢复带来的开销使得无故障处理更慢,如果任务失败率足够高,仍然可以进行合理的权衡。
V
Vonng 已提交
510

M
MuAlex 已提交
511
但是这些假设有多大可能性?在大多数集群中,机器故障确实发生,但是它们不是很频繁 - 可能很少,大多数工作都不会经历机器故障。为了容错真的值得引入这些额外开销吗?
V
Vonng 已提交
512

M
MuAlex 已提交
513
要了解MapReduce保守使用内存和任务级恢复的原因,需要明白最初设计MapReduce的背景。 Google拥有混合使用的数据中心,在线生产服务和离线批处理作业在相同的机器上运行。每个任务都有一个使用容器执行的资源分配(CPU核心,RAM,磁盘空间等)。每个任务也具有优先级,如果优先级较高的任务需要更多的资源,则可以终止(抢占)同一台机器上较低优先级的任务以释放资源。优先级还决定了计算资源的价格:团队必须为他们使用的资源付费,优先级更高的进程更贵[59]。
V
Vonng 已提交
514

M
MuAlex 已提交
515
这种架构允许为非生产(低优先级)投入过度的计算资源,因为系统知道如果必要的话它可以回收资源。与分离生产和非生产任务的系统相比,它可以更好地利用机器和提高效率。但是,如果MapReduce作业以低优先级运行,它随时都有被抢占的风险,因为优先级较高的进程需要其资源。批量工作有效地“拾取桌子下面的碎片”,利用高优先级进程已经占据以外的资源。
V
Vonng 已提交
516

M
MuAlex 已提交
517
在谷歌,运行一个小时的MapReduce任务有大约5%被终止的风险,为更高优先级的进程腾出空间。这个比率比由于硬件问题,机器重新启动或其他原因引起的故障率高出一个数量级[59]。按照这种抢占率,如果一个作业有100个任务,每个运行10分钟,那么至少有一个任务在完成之前将被终止的风险大于50%。
V
Vonng 已提交
518

M
MuAlex 已提交
519
这就是为什么MapReduce能够容忍频繁意外的任务终止的原因:这不是因为硬件特别不可靠,而是因为任意终止进程可以在计算集群中更好地利用资源。
V
Vonng 已提交
520

M
MuAlex 已提交
521
在开源的集群调度器中,抢占的使用较少。 YARN的CapacityScheduler支持抢占以平衡不同队列的资源分配[58],但在编写本文时,YARN,Mesos或Kubernetes不支持通用优先级抢占[60]。在任务不经常被终止的环境中,MapReduce的设计决策没有太多意义。在下一节中,我们将看看MapReduce的一些替代方案,这些方案做出了不同的设计决策。
V
Vonng 已提交
522

M
MuAlex 已提交
523
## 超越MapReduce
V
Vonng 已提交
524

M
MuAlex 已提交
525
虽然MapReduce在2010年前后变得非常流行并受到大量炒作,但它只是分布式系统众多编程模型的一种。根据数据量,数据结构和处理类型,可能其他工具更适合用于计算。
V
Vonng 已提交
526

M
MuAlex 已提交
527
尽管如此,我们在这一章花了很多时间讨论MapReduce,因为它是一个有用的学习工具,是分布式文件系统的一个相当清晰和简单的抽象。也就是说,易于理解它在做什么,而不是在易于使用。恰恰相反:使用原始的MapReduce API来实现复杂的处理工作实际上是非常困难和费力的 - 例如,您需要从头开始实现任何连接算法[37]。
V
Vonng 已提交
528

M
MuAlex 已提交
529
针对直接使用MapReduce的困难,基于MapReduce衍生出很多更高级的编程模型(Pig,Hive,Cascading,Crunch)的抽象。如果您了解MapReduce的工作原理,那么它们相当容易学习,而且它们的高级架构使许多常见的批处理任务更容易实现。
V
Vonng 已提交
530

M
MuAlex 已提交
531
但是,MapReduce执行模型本身也存在一些问题,这些问题并没有通过增加另一个抽象层次来解决,而且在某些类型的处理中性能很差。一方面,MapReduce非常强大:您可以使用它来处理任意大量的数据,即使用户系统不可靠并且会有频繁的任务终止(虽然速度可能很慢)。另一方面,对于某种类型的处理来说其他的工具有时候也会更快。
V
Vonng 已提交
532

M
MuAlex 已提交
533
在本章的其余部分中,我们将介绍另外一些批处理方法。在第十一章我们将转向流处理,这可以看作是加速批处理的另一种方法。
V
Vonng 已提交
534

M
MuAlex 已提交
535
### 中间状态具体化
V
Vonng 已提交
536

M
MuAlex 已提交
537
如前所述,每个MapReduce作业都独立于其他任何作业。作业与其他地方的接口是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入,则需要将第二个作业的输入目录配置为与第一个作业的输出目录,并且外部工作流调度程序必须保证第一份作业已经完成的情况下才会启动下一个作业。
V
Vonng 已提交
538

M
MuAlex 已提交
539
如果第一个作业的输出要在系统内广播,之前的设置非常合理。在这种情况下,您需要能够通过名称来引用它,并将其用作多个不同作业(包括由其他团队开发的作业)的输入。将数据发布到分布式文件系统中的众所周知的位置允许松耦合,这样作业就不需要知道是谁提供输入或消耗其输出(请参阅“分离逻辑和线路”,395页)。
V
Vonng 已提交
540

M
MuAlex 已提交
541
但是,在很多情况下,您知道一个作业的输出只能用作另一个作业的输入,并且他们由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的中间状态:一种将数据从一个作业传递到下一个作业的方式。在用于构建由50或100个MapReduce作业[29]组成的推荐系统的复杂工作流程中,存在很多这样的中间状态。
V
Vonng 已提交
542

M
MuAlex 已提交
543
将这个中间状态写入文件的过程称为具体化。 (我们在第101页的“聚合:数据立方体和具体化视图”中已经遇到了这个术语。它意味着要着重于计算某个操作的结果并写出来,而不是有需求时才进行计算。)
V
Vonng 已提交
544

M
MuAlex 已提交
545
相反,本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输出连接起来。管道并没有完全实现中间状态,而是只使用一个小的内存缓冲区,将输出增量流向输入。
V
Vonng 已提交
546 547 548

MapReduce的完全实现中间状态的方法与Unix管道相比存在不足:

M
MuAlex 已提交
549 550 551
* MapReduce作业只有在前面的作业(生成其输入)中的所有任务都完成时才能启动,而由Unix管道连接的进程同时启动,输出一旦生成就会被使用。不同机器上的偏差或不同的负荷意味着一份工作往往会有一些较慢的任务。必须等到所有前面的工作完成才能拖慢了整个工作流程的执行。
* mapper通常是多余的:它们只读取刚刚由reducer写入的相同文件,并为下一个分区和排序阶段做好准备。在许多情况下,mapper代码可能是之前的reducer的一部分:如果reducer输出被分区和排序的方式与mapper输出相同,那么reducer们可以直接链接在一起,而不需要在中间插入一个mapper。
* 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这对于这样的临时数据通常过于浪费。
V
Vonng 已提交
552 553 554

#### 数据流引擎

M
MuAlex 已提交
555
了解决MapReduce的这些问题,人们开发了几种用于分布式批量计算的新的执行引擎,其中最着名的是Spark [61,62],Tez [63,64]和Flink [65,66]。他们设计的方式有很多不同之处,但他们有一个共同点:他们把整个工作流作为一项工作来处理,而不是把它分解成独立的子作业。
V
Vonng 已提交
556

M
MuAlex 已提交
557
由于它们通过几个处理阶段明确地对数据流建模,所以这些系统被称为数据流引擎。像MapReduce一样,它们通过反复调用用户定义的函数,每次在单个线程上处理一条记录。他们通过对输入进行分区来并行工作,并将一个功能的输出复制到网络上,成为另一个功能的输入。
V
Vonng 已提交
558

M
MuAlex 已提交
559
与MapReduce不同,这些功能不需要mapper和reducer的严格交替,而是可以更灵活进行组合。我们把这些函数函数称为操作符,数据流引擎提供了几个不同的选项来连接一个操作符的输出到另一个的输入:
V
Vonng 已提交
560

M
MuAlex 已提交
561 562 563
* 一个选项是通过键对记录进行重新分区和排序,就像在MapReduce的洗牌阶段一样(请参阅“分布式执行MapReduce”)。此功能可以像在MapReduce中一样启用排序合并连接和分组。
* 另一种可能是获取几个输入,并以相同的方式进行分区,但跳过排序。这节省了分区哈希连接的工作,这要求记录的分区重要,但顺序无所谓,因为哈希表本身就是无序。
* 对于广播哈希连接,可以将一个运算符的相同输出发送到连接运算符的所有分区。
V
Vonng 已提交
564

M
MuAlex 已提交
565
这种处理引擎的风格基于像Dryad [67]和Nephele [68]这样的研究系统,与MapReduce模型相比,它提供了几个优点:
V
Vonng 已提交
566 567

* 排序等昂贵的工作只需要在实际需要的地方执行,而不是在每个Map和Reduce阶段之间默认发生。
M
MuAlex 已提交
568 569 570 571 572
* 没有不必要的map任务,因为mapper的工作通常可以合并到前面的reducer中(因为mapper不会更改数据集的分区)。
* 由于工作流程中的所有连接和数据依赖性都是明确声明的,因此调度程序会清楚在哪里需要哪些数据,因此可以进行本地优化。例如,它可以尝试将使用某些数据的任务放在与生成它的任务相同的机器上,以便可以通过共享内存缓冲区交换数据,而不必通过网络复制数据。
* 通常将运算符之间的中间状态保存在内存中比写入本地磁盘更为高效,这比写入HDFS需要更少的I/O(必须将其复制到多个计算机的磁盘上)。 MapReduce已经将这种优化用于mapper的输出,但是数据流引擎将该思想推广到了所有的中间状态。
* 运算符可以在输入准备就绪后立即开始执行;不需要等待整个前一阶段的完成。
* 与MapReduce(为每个任务启动一个新的JVM)相比,Java虚拟机(JVM)进程可以重用来运行新操作,从而减少启动开销。
V
Vonng 已提交
573

M
MuAlex 已提交
574
您可以使用数据流引擎来执行与MapReduce相同的计算,并且由于此处提到的优化,通常执行速度会明显更快。既然操作符是map和reduce的泛化,相同的处理代码可以在任一执行引擎上运行:Pig,Hive或Cascading中实现的工作流可以通过简单的配置更改从MapReduce切换到Tez或Spark,而无需修改代码[64]。
V
Vonng 已提交
575

M
MuAlex 已提交
576
Tez是一个相当瘦的库,它依赖于YARN shuffle服务来实现节点间数据的实际复制[58],而Spark和Flink则是拥有自己的网络通信层,调度器和面向用户的API的大型框架。我们随后会讨论这些API。
V
Vonng 已提交
577 578 579 580 581

#### 容错

完全实现中间状态到分布式文件系统的一个优点是它是持久的,这使得MapReduce中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。

M
MuAlex 已提交
582
Spark,Flink和Tez避免将中间状态写入HDFS,因此他们采取了不同的容错方案:如果一台机器发生故障,并且该机器上的中间状态丢失,则会从其他仍然可用的数据重新计算(优先使用之前的可以拿到的中间数据,然后才是HDFS上的原始输入数据)。
V
Vonng 已提交
583

M
MuAlex 已提交
584
为了实现重新计算,框架必须跟踪给定数据是如何计算的 - 使用哪个输入分区,以及哪个运算符被应用。 Spark使用弹性分布式数据集(RDD)抽象来追踪数据的祖先[61],而Flink为运算符状态设置检查点,允许在执行过程中遇到错误时重新运算[66]。
V
Vonng 已提交
585

M
MuAlex 已提交
586
在重新计算数据时,重要的是要知道计算是否是确定性的:也就是说,给定相同的输入数据,运算符是否始终生成相同的输出?如果一些丢失的数据已经发送给下游运算符,这个问题就很重要。如果运算符重新启动,重新计算的数据与原有的丢失数据不一致,下游运算很难解决新旧数据之间的矛盾。对于不确定性运算符来说,解决方案通常是同样停止下游运算,然后再运行新数据。
V
Vonng 已提交
587

M
MuAlex 已提交
588
为了避免这种级联故障,最好让运算符具有确定性。但是请注意,非确定性行为很容易不经意间发生:例如,许多编程语言在迭代哈希表的元素时不能保证任何特定顺序,许多概率和统计算法明确依赖于使用随机数,以及任何依赖系统时钟或外部数据源也是不确定的。为了可靠地从故障中恢复我们需要消除这种不确定性,例如通过使用固定种子产生伪随机数。
V
Vonng 已提交
589

M
MuAlex 已提交
590
通过重新计算数据从故障中恢复并不总是合适:如果中间数据比源数据小得多,或者如果计算量非常大(CPU开销大),那么将中间数据转化为文件可能比将其重新计算更便宜。
V
Vonng 已提交
591

M
MuAlex 已提交
592
#### 具体化的讨论
V
Vonng 已提交
593

M
MuAlex 已提交
594
回到与Unix的类比,我们看到MapReduce就像是将每个命令的输出写入临时文件,而数据流引擎看起来更像是Unix管道。尤其是Flink,围绕流水线执行的思想而建立:也就是说,将运算符的输出递增地传递给其他运算符,并且在开始处理之前不等待输入完成。
V
Vonng 已提交
595

M
MuAlex 已提交
596
排序操作不可避免地需要消耗其整个输入,然后才能生成任何输出,因为最后一个输入记录可能排在最前。任何需要排序的运算都需要至少暂时地累积(记录)状态。但是工作流程的许多其他部分可以以流水线方式执行。
V
Vonng 已提交
597

M
MuAlex 已提交
598
当作业完成时,它的输出需要持久化到某个地方,以便用户可以找到并使用它 - 很可能它会再次写入分布式文件系统。因此,在使用数据流引擎时,HDFS上的文件数据集通常仍是作业的输入和最终输出。和MapReduce一样,输入是不可变的,输出被完全替换。和MapReduce相比的好处是,您可以省去将所有中间状态写入文件系统的开销。
V
Vonng 已提交
599

V
Vonng 已提交
600 601
### 图与迭代处理

M
MuAlex 已提交
602
在第49页上的“类图形数据模型”中,我们讨论了使用图形来建模数据,并使用图形查询语言来遍历图形中的边和顶点。第2章的讨论集中在OLTP风格的使用上:快速执行查询来查找少量符合特定条件的顶点。
V
Vonng 已提交
603

M
MuAlex 已提交
604
图表在批处理环境中的应用也很有趣,其目标是在整个图表上执行某种离线处理或分析。这种需求经常出现在机器学习应用程序(如推荐引擎)或排序系统中。例如,最着名的图形分析算法之一是PageRank [69],它试图根据都有什么网页链接到这个网页来估算这个网页的流行度。它是被用于确定搜索引擎呈现结果顺序公式的一部分。
V
Vonng 已提交
605

M
MuAlex 已提交
606
> 像Spark,Flink和Tez这样的数据流引擎(参见第419页“中间状态的实现化”)通常将运算符作为有向无环图(DAG)排列在作业中。这与图形处理不一样:在数据流引擎中,从一个运算符到另一个运算符的数据流被构造成一个图,而数据本身通常由关系型元组(relational-style tuples)构成。在图形处理中,数据本身就是图形的形式。另一个不幸的命名混淆!
V
Vonng 已提交
607

M
MuAlex 已提交
608
许多图算法是一次遍历一个边,将一个顶点与相邻的顶点连接起来以便传播一些信息,并且重复直到满足一些条件为止 - 例如,直到后面没有更多的边,或者直到一些收敛条件。我们在图2-6中看到一个例子,它通过重复地跟踪后续的边来指示哪个位置在哪个位置当中(这种算法被称为传递闭包),从而列出了包含在数据库中的北美所有位置。
V
Vonng 已提交
609

M
MuAlex 已提交
610
可以在分布式文件系统(包含顶点和边的列表的文件)中存储图形,但是这种“重复直到完成”的思路不能用普通的MapReduce来表示,因为它只执行一次数据传递。因此这种算法经常以迭代方式实现:
V
Vonng 已提交
611

M
MuAlex 已提交
612 613
1. 外部调度程序运行批处理来计算算法中的一步。
2. 当批处理过程完成时,调度器检查它是否完成(基于完成条件 - 例如,没有更多后续的边,或者与上次迭代相比的变化低于某个阈值)。
V
Vonng 已提交
614 615
3. 如果尚未完成,则调度程序返回到步骤1并运行另一轮批处理。

M
MuAlex 已提交
616 617 618
这种方法可行,但是用MapReduce实现往往是非常低效的,因为MapReduce没有考虑算法的迭代性:它总是读取整个输入数据集并产生一个全新的输出数据集,即使与上次迭代相比,只有一小部分发生变化。

#### Pregel处理模型
V
Vonng 已提交
619

M
MuAlex 已提交
620
作为图形批处理的优化,批量同步并行(BSP)计算模型[70]已经流行起来。其中,它由Apache Giraph [37],Spark的GraphX API和Flink的Gelly API [71]实现。它也被称为Pregel模型,因为Google的Pregel论文使这种处理图的方法变得流行[72]。
V
Vonng 已提交
621

M
MuAlex 已提交
622
回想一下在MapReduce中,mapper在概念上“发送消息”给reducer的特定调用,因为框架将所有具有相同主键的mapper输出集中在一起。 Pregel背后的思想与其类似:一个顶点可以“发送消息”到另一个顶点,通常这些消息沿着图的边发送。
V
Vonng 已提交
623

M
MuAlex 已提交
624
在每次迭代中,每个顶点调用一个函数,将所有发送给它的消息传递给函数 - 就像调用reducer一样。与MapReduce的不同之处在于,在Pregel模型中,顶点在一次迭代到下一次迭代期间记忆它的状态,所以这个函数只需要处理新的传入消息。如果在图的某个部分没有发送消息,则不需要做任何工作。
V
Vonng 已提交
625

M
MuAlex 已提交
626
这与参与者(Actor)模型有些相似(请参阅第130页上的“分布式参与者框架”),如果我们把每个点当做一个参与者,除了顶点状态和顶点之间的消息具有容错性和持久性,并且通信以固定的方式进行:在每一次迭代中,框架传递在前一次迭代中发送的所有消息。参与者通常没有这样的时间保证。
V
Vonng 已提交
627 628 629

#### 容错

M
MuAlex 已提交
630
顶点只能通过消息传递进行通信(而不是直接相互查询)有助于提高Pregel作业的性能,因为消息可以批处理,而且等待通信的时间也减少了。唯一的等待是在迭代之间:由于Pregel模型保证所有在一次迭代中发送的消息都传递给下一个迭代,所以在下一个迭代开始之前,先前的迭代必须完全完成,并且所有的消息必须在网络上复制。
V
Vonng 已提交
631

M
MuAlex 已提交
632
即使底层网络可能导致消息丢失,重复或任意延迟(请参阅第267页上的“不可靠网络”),Pregel实施可保证在接下来的迭代中消息在其目标顶点处理并且仅处理一次。像MapReduce一样,该框架透明地从故障中恢复,以简化基于Pregel模型的算法。
V
Vonng 已提交
633

M
MuAlex 已提交
634
这种容错通过在迭代结束时定期检查所有顶点的状态来实现的,即将其全部状态写入持久存储。如果某个节点发生故障并且其内存中状态丢失,则最简单的解决方法是将整个图计算回滚到上一个检查点,然后重新启动计算。如果算法是确定性的并且记录了消息,那么也可以选择性地只恢复丢失的分区(就像我们之前在数据流引擎中所讨论那样)[72]。
V
Vonng 已提交
635

M
MuAlex 已提交
636
#### 并行执行
V
Vonng 已提交
637

M
MuAlex 已提交
638
顶点不需要知道它在哪个物理机器上执行;当它发送消息到其他顶点时,它只是发送到一个顶点ID。框架来决定图的分区,即确定哪个顶点运行在哪个机器上,以及如何通过网络路由消息,以便它们正确运行。
V
Vonng 已提交
639

M
MuAlex 已提交
640
由于编程模型一次仅处理一个顶点(有时称为“像顶点一样思考”),所以框架可以以任意方式划分图形。理想情况下,如果顶点之间需要进行大量的通信,那么它将被分配在同一台机器上。然而,寻找这样一个优化的分区在实践中是困难的,图形顶点经常被任意分配,而不会尝试将相关的顶点分在一起。
V
Vonng 已提交
641

M
MuAlex 已提交
642
因此,图算法通常会有很多跨机器通信,而中间状态(节点之间发送的消息)往往比原始图大。通过网络发送消息的开销增大会显着减慢分布式图算法的速度。
V
Vonng 已提交
643

M
MuAlex 已提交
644
出于这个原因,如果你的图可以放在一台计算机的内存中,那么单机(甚至可能是单线程)算法很可能会超越分布式批处理[73,74]。即使图形太大以至于无法放入内存,也可以放在单个计算机的磁盘上,使用GraphChi等框架进行单机处理是一个可行的选择[75]。如果图形太大而不适合单个机器,像Pregel这样的分布式方案是不可避免的。人们正在对有效的并行化图算法进行探索。
V
Vonng 已提交
645 646


V
Vonng 已提交
647 648
### 高级API和语言

M
MuAlex 已提交
649
从MapReduce流行到现在,分布式批处理的执行引擎已经成熟。到目前为止,基础设施已经足够强大,能够存储和处理超过10,000台机器群集上的PB级别的数据。由于在这种规模下物理操作批处理过程的问题已经或多或少得到了解决,所以我们更加关注其他方面:改进编程模型,提高处理效率,扩大这些技术可以解决的问题集。
V
Vonng 已提交
650

M
MuAlex 已提交
651
如前所述,Hive,Pig,Cascading和Crunch等高级语言和API由于减少了手工编写MapReduce作业带来的工作量而变得非常流行。随着Tez的出现,这些高级语言还可以移动到新的数据流执行引擎,而无需重写作业代码。 Spark和Flink,也包括他们自己的高级数据流API经常从FlumeJava中获得灵感[34]。
V
Vonng 已提交
652

M
MuAlex 已提交
653
这些数据流API通常使用关系型构建块来表达一个计算:基于某个字段的值连接数据集;按主键分组元组;通过一些条件过滤;并通过计数,求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。
V
Vonng 已提交
654

M
MuAlex 已提交
655
除了需要较少代码这个明显优势之外,这些高级接口还允许交互式使用,在这种交互式使用中,您可以将分析代码增量地写到shell中并经常运行,以观察它在做什么。这种开发风格在探索数据集和试验处理方法时非常有用。这也让人联想到Unix哲学,我们在第394页的“Unix哲学”中讨论过这个问题。
V
Vonng 已提交
656

M
MuAlex 已提交
657
此外,这些高级接口不仅使我们使用系统的效率更高,而且提高了机器的工作执行效率。
V
Vonng 已提交
658

M
MuAlex 已提交
659
#### 向声明式查询语言转变
V
Vonng 已提交
660

M
MuAlex 已提交
661
与拼接执行连接的代码相比,指定连接为关系运算符的优点是,框架可以分析连接输入的属性,并自动决定哪个上述连接算法最适合手头的任务。 Hive,Spark和Flink都有基于开销的查询优化器可以做到这一点,甚至可以改变连接顺序减少中间状态的数量最[66,77,78,79]。
V
Vonng 已提交
662

M
MuAlex 已提交
663
连接算法的选择可以对批处理作业的性能产生很大的影响,不必理解和记住本章中讨论的各种连接算法对用户来说更加友好。如果以声明的方式指定连接,则可以做到这点:应用程序简单地说明哪些连接是必需的,查询优化器决定如何最好地执行连接。我们以前在第42页的“数据的查询语言”中提到了这个想法。
V
Vonng 已提交
664

M
MuAlex 已提交
665
但是,在其他方面,MapReduce和之后的数据流引擎与SQL的完全声明性查询模型有很大不同。 MapReduce是围绕函数回调的思想构建的:对于每个记录或者一组记录,调用一个用户定义的函数(mapper或reducer),并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以在现有大型系统上进行解析,自然语言分析,图像分析以及运行数字或统计算法等。
V
Vonng 已提交
666

M
MuAlex 已提交
667
我们一直使用能否轻松运行任意代码来区分MapReduce和MPP数据库(参见“比较Hadoop和分布式数据库”一节,第414页)。虽然数据库具有编写用户定义函数的功能,但是它们通常使用起来很麻烦,而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统(例如Maven,npm,Rubygems)做不到很好的集成。
V
Vonng 已提交
668

M
MuAlex 已提交
669
但是,数据流引擎已经发现,除了连接之外,更多使用声明性特征有其他的优点。例如,如果一个回调函数只包含一个简单的过滤条件,或者只是从一条记录中选择了一些字段,那么在每条记录调用函数会带来相当大的CPU开销。如果以声明方式表示这样简单的过滤和映射操作,那么查询优化器可以利用面向列的存储布局(请参阅第95页的“面向列的存储”),并从磁盘只读取所需的列。 Hive,Spark DataFrames和Impala也使用向量化执行(请参阅第99页的“内存带宽和向量化处理”):在对CPU缓存很友好的内部循环中迭代数据,并避免函数调用。Spark生成JVM字节码[79],Impala使用LLVM为这些内部循环生成原生代码[41]。
V
Vonng 已提交
670

M
MuAlex 已提交
671
通过将声明式与高级API结合起来,查询优化器可以在执行期间利用这些声明式方法,批处理框架看起来更像MPP数据库(并且在性能上可以媲美)。同时,通过运行任意代码和以读取任意格式数据保持扩展性,它们保持了灵活性的优势。
V
Vonng 已提交
672

M
MuAlex 已提交
673
#### 不同领域专业化
V
Vonng 已提交
674

M
MuAlex 已提交
675
尽管能够运行任意代码的可扩展性是有用的,但是通常在标准处理模式不断重复发生,所以值得构建通用模块的可重用实现。传统上,MPP数据库满足了商业智能分析和业务报告的需求,但这只是批处理适用的众多领域之一。
V
Vonng 已提交
676

M
MuAlex 已提交
677
另一个越来越重要的领域是统计和数值算法,它们是机器学习应用(如分类和推荐系统)所需要的。可重用的实现正在出现:例如,Mahout在MapReduce,Spark和Flink之上实现了用于机器学习的各种算法,而MADlib在关系型MPP数据库(Apache HAWQ)中也实现了类似的功能[54]。
V
Vonng 已提交
678

M
MuAlex 已提交
679
空间算法例如k-近邻算法[80]也很有用,它在一些多维空间中搜索与目标接近的项 - 这是一种相似性搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串[81]。
V
Vonng 已提交
680

M
MuAlex 已提交
681
分布式中的批处理引擎执行被越来越多的算法使用。随着批处理系统拥有内置功能和高级声明式运算,并且MPP数据库变得更加可编程和灵活,两者开始看起来更相似:最终,它们都只是存储和处理数据的系统。
V
Vonng 已提交
682

V
Vonng 已提交
683 684 685
## 本章小结


M
MuAlex 已提交
686
在本章中,我们探讨了批处理话题。我们首先查看了诸如awk,grep和sort之类的Unix工具,然后我们看到了这些工具的设计理念是如何运用到MapReduce和更新的数据流引擎中的。其中一些设计原则有: 输入是不可变的,输出是为了成为另一个(还未知的)程序的输入,而复杂的问题是通过编写“做好一件事”的小工具来解决的。
V
Vonng 已提交
687

M
MuAlex 已提交
688
在Unix世界中,允许一个程序与另一个程序组合的统一接口是文件和管道;在MapReduce中,接口是一个分布式文件系统。我们看到数据流引擎添加了自己的管道式数据传输机制,以避免将中间状态持久化到分布式文件系统,但作业的初始输入和最终输出通常仍然是HDFS。
V
Vonng 已提交
689 690 691

分布式批处理框架需要解决的两个主要问题是:

M
MuAlex 已提交
692 693 694
- 分区

在MapReduce中,mapper根据输入文件块进行分区。mapper的输出被重新分区,排序,合并到可配置数量的reducer分区中。这个过程的目的是整合相关数据 - 例如,所有主键相同的记录都放在同一个地方。
V
Vonng 已提交
695

M
MuAlex 已提交
696
后MapReduce数据流引擎尽量避免排序,除非无法避免,但它们采取了大致类似的分区方法。
V
Vonng 已提交
697

M
MuAlex 已提交
698
- 容错
V
Vonng 已提交
699

M
MuAlex 已提交
700
MapReduce经常写入磁盘,可以从单个失败的任务中轻松地恢复,而无需重新启动整个作业,但在无故障的情况下减慢了执行速度。数据流引擎的中间状态较少持久化,而是保留在内存中,这意味着如果节点发生故障,重新计算需要更多的数据。确定性运算符减少了需要重新计算的数据量。
V
Vonng 已提交
701 702


M
MuAlex 已提交
703
我们讨论了几种MapReduce的连接算法,其中大多数也是在MPP数据库和数据流引擎中使用的。他们还很好地阐述了分区算法如何工作:
V
Vonng 已提交
704

M
MuAlex 已提交
705
- 排序合并连接
V
Vonng 已提交
706

M
MuAlex 已提交
707
每个正在连接的输入都通过一个mapper提取主键。通过分区,排序和合并,具有相同主键的所有记录最终都会进入同一个reducer的函数。这个函数可以输出连接后的记录。
V
Vonng 已提交
708

M
MuAlex 已提交
709
- 广播哈希连接
V
Vonng 已提交
710

M
MuAlex 已提交
711
连接的两个输入中有一个数量很少,所以它不需要分区并且可以被完全加载到一个哈希表中。因此,您可以为大的输入的每个分区启动一个mapper,将小输入的哈希表加载到每个mapper中,然后一次扫描大输入的一条记录,并在哈希表中进行查询。
V
Vonng 已提交
712

M
MuAlex 已提交
713
- 分区哈希连接
V
Vonng 已提交
714

M
MuAlex 已提交
715
如果连接的两个输入以相同的方式分区(使用相同的主键,相同的哈希函数和相同数量的分区),则可以独立地为每个分区使用哈希表。
V
Vonng 已提交
716

M
MuAlex 已提交
717
分布式批处理引擎有一个故意限制的编程模型:回调函数(比如mapper和reducer)被认为是无状态的,除了指定的输出外,没有外部可见的副作用。这个限制允许框架通过抽象抽象隐藏一些困难的分布式系统问题:对崩溃和网络问题,任务可以安全地重试,任何失败任务的输出都被丢弃。如果某个分区的多个任务成功,则只有其中一个真正使其输出可见。
V
Vonng 已提交
718

M
MuAlex 已提交
719
得益于这个框架,您在批处理作业中的代码无需担心实现容错机制:框架可以保证作业的最终输出与没有发生错误的情况相同,尽管可能不得不重新尝试各种任务。与处理用户请求并且写入数据库(这里可能给请求带来副作用)的在线服务相比,这种机制更为可靠。
V
Vonng 已提交
720

M
MuAlex 已提交
721
批量处理工作的显着特点是它读取一些输入数据并产生一些输出数据,而不修改输入 - 换句话说,输出来源于输入。重要的是,输入数据是有界的:它有一个已知的,固定的大小(例如,某个时间点的日志文件或数据库内容的快照)。因为它是有界的,一个工作知道什么时候它完成了整个输入的读取,和什么时候任务最终完成。
V
Vonng 已提交
722

M
MuAlex 已提交
723
在下一章中,我们将转向流处理,其中的输入是未知的 - 也就是说,你还有一个作业,但是它的输入是永无止境的数据流。在这种情况下,作业永远不会完成,因为在任何时候都可能有更多的任务进来。我们将看到流和批处理在某些方面类似,但是关于无边界的流的假设同样也给怎样构建系统带来了很多变化。
V
Vonng 已提交
724 725


V
Vonng 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025

## 参考文献

1.  Jeffrey Dean and Sanjay Ghemawat:
[MapReduce: Simplified Data Processing on Large Clusters](http://research.google.com/archive/mapreduce.html),” at *6th USENIX Symposium on Operating System Design
    and Implementation* (OSDI), December 2004.

1.  Joel Spolsky:
[The Perils of JavaSchools](http://www.joelonsoftware.com/articles/ThePerilsofJavaSchools.html),” *joelonsoftware.com*, December 25, 2005.

1.  Shivnath Babu and Herodotos Herodotou:
[Massively Parallel Databases and MapReduce Systems](http://research.microsoft.com/pubs/206464/db-mr-survey-final.pdf),” *Foundations and Trends in Databases*,
    volume 5, number 1, pages 1–104, November 2013.
    [doi:10.1561/1900000036](http://dx.doi.org/10.1561/1900000036)

1.  David J. DeWitt and Michael Stonebraker:
[MapReduce: A Major Step Backwards](https://homes.cs.washington.edu/~billhowe/mapreduce_a_major_step_backwards.html),” originally published at *databasecolumn.vertica.com*, January 17, 2008.

1.  Henry Robinson:
[The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google](http://the-paper-trail.org/blog/the-elephant-was-a-trojan-horse-on-the-death-of-map-reduce-at-google/),”
    *the-paper-trail.org*, June 25, 2014.

1.[The Hollerith Machine](https://www.census.gov/history/www/innovations/technology/the_hollerith_tabulator.html),” United States Census Bureau, *census.gov*.

1.[IBM 82, 83, and 84 Sorters Reference Manual](http://www.textfiles.com/bitsavers/pdf/ibm/punchedCard/Sorter/A24-1034-1_82-83-84_sorters.pdf),” Edition A24-1034-1, International Business
    Machines Corporation, July 1962.

1.  Adam Drake:
[Command-Line Tools Can Be 235x Faster than Your Hadoop Cluster](http://aadrake.com/command-line-tools-can-be-235x-faster-than-your-hadoop-cluster.html),” *aadrake.com*, January 25, 2014.

1.[GNU Coreutils 8.23 Documentation](http://www.gnu.org/software/coreutils/manual/html_node/index.html),” Free Software Foundation, Inc., 2014.

1.  Martin Kleppmann:
[Kafka, Samza, and the Unix Philosophy of Distributed Data](http://martin.kleppmann.com/2015/08/05/kafka-samza-unix-philosophy-distributed-data.html),” *martin.kleppmann.com*, August 5, 2015.

1.  Doug McIlroy:
    [Internal Bell Labs memo](http://cm.bell-labs.com/cm/cs/who/dmr/mdmpipe.pdf),
    October 1964. Cited in: Dennis M. Richie:
[Advice from Doug McIlroy](https://www.bell-labs.com/usr/dmr/www/mdmpipe.html),”
    *cm.bell-labs.com*.

1.  M. D. McIlroy, E. N. Pinson, and B. A. Tague:
[UNIX Time-Sharing System: Foreword](https://archive.org/details/bstj57-6-1899),”
    *The Bell System Technical Journal*, volume 57, number 6, pages 1899–1904,
    July 1978.

1.  Eric S. Raymond:
    <a href="http://www.catb.org/~esr/writings/taoup/html/">*The Art of UNIX Programming*</a>.
    Addison-Wesley, 2003. ISBN: 978-0-13-142901-7

1.  Ronald Duncan:
[Text File Formats – ASCII Delimited Text – Not CSV or TAB Delimited Text](https://ronaldduncan.wordpress.com/2009/10/31/text-file-formats-ascii-delimited-text-not-csv-or-tab-delimited-text/),”
    *ronaldduncan.wordpress.com*, October 31, 2009.

1.  Alan Kay:
[Is 'Software Engineering' an Oxymoron?](http://tinlizzie.org/~takashi/IsSoftwareEngineeringAnOxymoron.pdf),” *tinlizzie.org*.

1.  Martin Fowler:
[InversionOfControl](http://martinfowler.com/bliki/InversionOfControl.html),”
    *martinfowler.com*, June 26, 2005.

1.  Daniel J. Bernstein:
[Two File Descriptors for Sockets](http://cr.yp.to/tcpip/twofd.html),” *cr.yp.to*.

1.  Rob Pike and Dennis M. Ritchie:
[The Styx Architecture for Distributed Systems](http://doc.cat-v.org/inferno/4th_edition/styx),” *Bell Labs Technical Journal*, volume 4, number 2, pages
146–152, April 1999.

1.  Sanjay Ghemawat, Howard Gobioff, and Shun-Tak
    Leung: “[The Google File System](http://research.google.com/archive/gfs-sosp2003.pdf),”
    at *19th ACM Symposium on Operating Systems Principles* (SOSP), October 2003.
    [doi:10.1145/945445.945450](http://dx.doi.org/10.1145/945445.945450)

1.  Michael Ovsiannikov, Silvius Rus, Damian Reeves, et al.:
[The Quantcast File System](http://db.disi.unitn.eu/pages/VLDBProgram/pdf/industry/p808-ovsiannikov.pdf),” *Proceedings of the VLDB Endowment*, volume 6, number 11, pages 1092–1101, August 2013.
    [doi:10.14778/2536222.2536234](http://dx.doi.org/10.14778/2536222.2536234)

1.[OpenStack Swift 2.6.1 Developer Documentation](http://docs.openstack.org/developer/swift/),” OpenStack Foundation, *docs.openstack.org*, March 2016.

1.  Zhe Zhang, Andrew Wang, Kai Zheng, et al.:
[Introduction to HDFS Erasure Coding in Apache Hadoop](http://blog.cloudera.com/blog/2015/09/introduction-to-hdfs-erasure-coding-in-apache-hadoop/),” *blog.cloudera.com*, September 23, 2015.

1.  Peter Cnudde:
[Hadoop Turns 10](http://yahoohadoop.tumblr.com/post/138739227316/hadoop-turns-10),”
    *yahoohadoop.tumblr.com*, February 5, 2016.

1.  Eric Baldeschwieler:
[Thinking About the HDFS vs. Other Storage Technologies](http://hortonworks.com/blog/thinking-about-the-hdfs-vs-other-storage-technologies/),” *hortonworks.com*, July 25, 2012.

1.  Brendan Gregg:
[Manta: Unix Meets Map Reduce](http://dtrace.org/blogs/brendan/2013/06/25/manta-unix-meets-map-reduce/),” *dtrace.org*, June 25, 2013.

1.  Tom White: *Hadoop: The Definitive Guide*,
    4th edition. O'Reilly Media, 2015. ISBN: 978-1-491-90163-2

1.  Jim N. Gray:
[Distributed Computing Economics](http://arxiv.org/pdf/cs/0403019.pdf),” Microsoft
    Research Tech Report MSR-TR-2003-24, March 2003.

1.  Márton Trencséni:
[Luigi vs Airflow vs Pinball](http://bytepawn.com/luigi-airflow-pinball.html),”
    *bytepawn.com*, February 6, 2016.

1.  Roshan Sumbaly, Jay Kreps, and Sam Shah:
[The 'Big Data' Ecosystem at LinkedIn](http://www.slideshare.net/s_shah/the-big-data-ecosystem-at-linkedin-23512853),” at *ACM International Conference on Management of Data*
    (SIGMOD), July 2013.
    [doi:10.1145/2463676.2463707](http://dx.doi.org/10.1145/2463676.2463707)

1.  Alan F. Gates, Olga Natkovich, Shubham Chopra, et al.:
[Building a High-Level Dataflow System on Top of Map-Reduce: The Pig Experience](http://www.vldb.org/pvldb/2/vldb09-1074.pdf),” at *35th International Conference on Very Large Data
    Bases* (VLDB), August 2009.

1.  Ashish Thusoo, Joydeep Sen Sarma, Namit Jain, et al.:
[Hive – A Petabyte Scale Data Warehouse Using Hadoop](http://i.stanford.edu/~ragho/hive-icde2010.pdf),” at *26th IEEE International Conference on Data Engineering* (ICDE), March 2010.
    [doi:10.1109/ICDE.2010.5447738](http://dx.doi.org/10.1109/ICDE.2010.5447738)

1.[Cascading 3.0 User Guide](http://docs.cascading.org/cascading/3.0/userguide/),” Concurrent, Inc., *docs.cascading.org*, January 2016.

1.[Apache Crunch User Guide](https://crunch.apache.org/user-guide.html),” Apache Software Foundation, *crunch.apache.org*.

1.  Craig Chambers, Ashish Raniwala, Frances
    Perry, et al.: “[FlumeJava: Easy, Efficient Data-Parallel Pipelines](https://research.google.com/pubs/archive/35650.pdf),” at *31st ACM SIGPLAN Conference on Programming Language
    Design and Implementation* (PLDI), June 2010.
    [doi:10.1145/1806596.1806638](http://dx.doi.org/10.1145/1806596.1806638)

1.  Jay Kreps:
[Why Local State is a Fundamental Primitive in Stream Processing](https://www.oreilly.com/ideas/why-local-state-is-a-fundamental-primitive-in-stream-processing),” *oreilly.com*, July 31, 2014.

1.  Martin Kleppmann:
[Rethinking Caching in Web Apps](http://martin.kleppmann.com/2012/10/01/rethinking-caching-in-web-apps.html),” *martin.kleppmann.com*, October 1, 2012.

1.  Mark Grover, Ted Malaska, Jonathan
    Seidman, and Gwen Shapira: *[Hadoop Application Architectures](http://shop.oreilly.com/product/0636920033196.do)*. O'Reilly Media, 2015. ISBN: 978-1-491-90004-8

1.  Philippe Ajoux, Nathan Bronson,
    Sanjeev Kumar, et al.:
[Challenges to Adopting Stronger Consistency at Scale](https://www.usenix.org/system/files/conference/hotos15/hotos15-paper-ajoux.pdf),” at *15th USENIX Workshop on Hot Topics in
    Operating Systems* (HotOS), May 2015.

1.  Sriranjan Manjunath:
[Skewed Join](https://wiki.apache.org/pig/PigSkewedJoinSpec),” *wiki.apache.org*,
    2009.

1.  David J. DeWitt, Jeffrey F. Naughton, Donovan A.
    Schneider, and S. Seshadri: “[Practical Skew Handling in Parallel Joins](http://www.vldb.org/conf/1992/P027.PDF),” at *18th International Conference on Very Large Data Bases* (VLDB), August 1992.

1.  Marcel Kornacker, Alexander Behm, Victor
    Bittorf, et al.: “[Impala: A Modern, Open-Source SQL Engine for Hadoop](http://pandis.net/resources/cidr15impala.pdf),” at *7th Biennial Conference on Innovative Data Systems
    Research* (CIDR), January 2015.

1.  Matthieu Monsch:
[Open-Sourcing PalDB, a Lightweight Companion for Storing Side Data](https://engineering.linkedin.com/blog/2015/10/open-sourcing-paldb--a-lightweight-companion-for-storing-side-da),” *engineering.linkedin.com*, October 26, 2015.

1.  Daniel Peng and Frank Dabek:
[Large-Scale Incremental Processing Using Distributed Transactions and Notifications](https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Peng.pdf),” at *9th USENIX
    conference on Operating Systems Design and Implementation* (OSDI), October 2010.

1.["Cloudera Search User Guide,"](http://www.cloudera.com/documentation/cdh/5-1-x/Search/Cloudera-Search-User-Guide/Cloudera-Search-User-Guide.html) Cloudera, Inc., September 2015.

1.  Lili Wu, Sam Shah, Sean Choi, et al.:
[The Browsemaps: Collaborative Filtering at LinkedIn](http://ls13-www.cs.uni-dortmund.de/homepage/rsweb2014/papers/rsweb2014_submission_3.pdf),” at *6th Workshop on Recommender Systems and
    the Social Web* (RSWeb), October 2014.

1.  Roshan Sumbaly, Jay Kreps, Lei Gao, et al.:
[Serving Large-Scale Batch Computed Data with Project Voldemort](http://static.usenix.org/events/fast12/tech/full_papers/Sumbaly.pdf),” at *10th USENIX Conference on File and Storage
    Technologies* (FAST), February 2012.

1.  Varun Sharma:
[Open-Sourcing Terrapin: A Serving System for Batch Generated Data](https://engineering.pinterest.com/blog/open-sourcing-terrapin-serving-system-batch-generated-data-0),” *engineering.pinterest.com*, September 14, 2015.

1.  Nathan Marz:
[ElephantDB](http://www.slideshare.net/nathanmarz/elephantdb),” *slideshare.net*, May 30, 2011.

1.  Jean-Daniel (JD) Cryans:
[How-to: Use HBase Bulk Loading, and Why](http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/),” *blog.cloudera.com*, September 27, 2013.

1.  Nathan Marz:
[How to Beat the CAP   Theorem](http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html),” *nathanmarz.com*, October 13, 2011.

1.  Molly Bartlett Dishman and Martin Fowler:
[Agile   Architecture](http://conferences.oreilly.com/software-architecture/sa2015/public/schedule/detail/40388),” at *O'Reilly Software Architecture Conference*, March 2015.

1.  David J. DeWitt and Jim N. Gray:
[Parallel Database Systems: The Future of High Performance Database Systems](http://www.cs.cmu.edu/~pavlo/courses/fall2013/static/papers/dewittgray92.pdf),”
    *Communications of the ACM*, volume 35, number 6, pages 85–98, June 1992.
    [doi:10.1145/129888.129894](http://dx.doi.org/10.1145/129888.129894)

1.  Jay Kreps:
[But the multi-tenancy thing is actually really really hard](https://twitter.com/jaykreps/status/528235702480142336),” tweetstorm, *twitter.com*, October 31, 2014.

1.  Jeffrey Cohen, Brian Dolan, Mark Dunlap, et al.: “[MAD Skills: New Analysis Practices for Big Data](http://www.vldb.org/pvldb/2/vldb09-219.pdf),” *Proceedings of the VLDB Endowment*, volume 2, number
    2, pages 1481–1492, August 2009.
    [doi:10.14778/1687553.1687576](http://dx.doi.org/10.14778/1687553.1687576)

1.  Ignacio
    Terrizzano, Peter Schwarz, Mary Roth, and John E. Colino:
[Data Wrangling: The Challenging Journey from the Wild to the Lake](http://cidrdb.org/cidr2015/Papers/CIDR15_Paper2.pdf),” at *7th Biennial Conference on Innovative Data Systems
    Research* (CIDR), January 2015.

1.  Paige Roberts:
[To Schema on Read or to Schema on Write, That Is the Hadoop Data Lake Question](http://adaptivesystemsinc.com/blog/to-schema-on-read-or-to-schema-on-write-that-is-the-hadoop-data-lake-question/),” *adaptivesystemsinc.com*, July 2, 2015.

1.  Bobby Johnson and Joseph Adler:
[The Sushi Principle: Raw Data Is Better](https://vimeo.com/123985284),” at
    *Strata+Hadoop World*, February 2015.

1.  Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, et al.:
[Apache Hadoop YARN: Yet Another Resource Negotiator](http://www.socc2013.org/home/program/a5-vavilapalli.pdf),” at *4th ACM Symposium on Cloud Computing* (SoCC), October 2013.
    [doi:10.1145/2523616.2523633](http://dx.doi.org/10.1145/2523616.2523633)

1.  Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, et al.:
[Large-Scale Cluster Management at Google with Borg](http://research.google.com/pubs/pub43438.html),” at *10th European Conference on Computer Systems* (EuroSys), April 2015.
    [doi:10.1145/2741948.2741964](http://dx.doi.org/10.1145/2741948.2741964)

1.  Malte Schwarzkopf:
[The Evolution of Cluster Scheduler Architectures](http://www.firmament.io/blog/scheduler-architectures.html),” *firmament.io*, March 9, 2016.

1.  Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, et al.:
[Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing](https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf),” at *9th
    USENIX Symposium on Networked Systems Design and Implementation* (NSDI), April 2012.

1.  Holden Karau, Andy Konwinski, Patrick Wendell, and Matei
    Zaharia: *Learning Spark*. O'Reilly Media, 2015. ISBN: 978-1-449-35904-1

1.  Bikas Saha and Hitesh Shah:
[Apache Tez: Accelerating Hadoop Query Processing](http://www.slideshare.net/Hadoop_Summit/w-1205phall1saha),” at *Hadoop Summit*, June 2014.

1.  Bikas Saha, Hitesh Shah, Siddharth Seth, et al.:
[Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications](http://home.cse.ust.hk/~weiwa/teaching/Fall15-COMP6611B/reading_list/Tez.pdf),” at *ACM
    International Conference on Management of Data* (SIGMOD), June 2015.
    [doi:10.1145/2723372.2742790](http://dx.doi.org/10.1145/2723372.2742790)

1.  Kostas Tzoumas:
[Apache Flink: API, Runtime, and Project Roadmap](http://www.slideshare.net/KostasTzoumas/apache-flink-api-runtime-and-project-roadmap),” *slideshare.net*, January 14, 2015.

1.  Alexander Alexandrov, Rico Bergmann, Stephan Ewen, et al.:
[The Stratosphere Platform for Big Data Analytics](https://ssc.io/pdf/2014-VLDBJ_Stratosphere_Overview.pdf),” *The VLDB Journal*, volume 23, number 6, pages 939–964, May 2014.
    [doi:10.1007/s00778-014-0357-y](http://dx.doi.org/10.1007/s00778-014-0357-y)

1.  Michael Isard, Mihai Budiu, Yuan Yu, et al.:
[Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks](http://research.microsoft.com/en-us/projects/dryad/eurosys07.pdf),” at *European Conference on Computer
    Systems* (EuroSys), March 2007.
    [doi:10.1145/1272996.1273005](http://dx.doi.org/10.1145/1272996.1273005)

1.  Daniel Warneke and Odej Kao:
[Nephele: Efficient Parallel Data Processing in the Cloud](https://stratosphere2.dima.tu-berlin.de/assets/papers/Nephele_09.pdf),” at *2nd Workshop on Many-Task Computing on Grids and
    Supercomputers* (MTAGS), November 2009.
    [doi:10.1145/1646468.1646476](http://dx.doi.org/10.1145/1646468.1646476)

1.  Lawrence Page, Sergey Brin, Rajeev
    Motwani, and Terry Winograd: “<a href="http://ilpubs.stanford.edu:8090/422/">The <span class="keep-together">PageRank

1.  Leslie G. Valiant:
[A Bridging Model for Parallel Computation](http://dl.acm.org/citation.cfm?id=79181),”
    *Communications of the ACM*, volume 33, number 8, pages 103–111, August 1990.
    [doi:10.1145/79173.79181](http://dx.doi.org/10.1145/79173.79181)

1.  Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl:
[Spinning Fast Iterative Data Flows](http://vldb.org/pvldb/vol5/p1268_stephanewen_vldb2012.pdf),” *Proceedings of the VLDB Endowment*, volume 5, number 11, pages 1268-1279, July 2012.
    [doi:10.14778/2350229.2350245](http://dx.doi.org/10.14778/2350229.2350245)

1.  Grzegorz Malewicz, Matthew H.
    Austern, Aart J. C. Bik, et al.: “[Pregel: A System for Large-Scale Graph Processing](https://kowshik.github.io/JPregel/pregel_paper.pdf),” at *ACM International Conference on Management of
    Data* (SIGMOD), June 2010.
    [doi:10.1145/1807167.1807184](http://dx.doi.org/10.1145/1807167.1807184)

1.  Frank McSherry, Michael Isard, and Derek G. Murray:
[Scalability! But at What COST?](http://www.frankmcsherry.org/assets/COST.pdf),” at
    *15th USENIX Workshop on Hot Topics in Operating Systems* (HotOS), May 2015.

1.  Ionel Gog, Malte Schwarzkopf, Natacha Crooks, et al.:
[Musketeer: All for One, One for All in Data Processing Systems](http://www.cl.cam.ac.uk/research/srg/netos/camsas/pubs/eurosys15-musketeer.pdf),” at *10th European Conference on
    Computer Systems* (EuroSys), April 2015.
    [doi:10.1145/2741948.2741968](http://dx.doi.org/10.1145/2741948.2741968)

1.  Aapo Kyrola, Guy Blelloch, and Carlos Guestrin:
[GraphChi: Large-Scale Graph Computation on Just a PC](https://www.usenix.org/system/files/conference/osdi12/osdi12-final-126.pdf),” at *10th USENIX Symposium on Operating Systems
    Design and Implementation* (OSDI), October 2012.

1.  Andrew Lenharth, Donald Nguyen, and Keshav Pingali:
[Parallel Graph Analytics](http://cacm.acm.org/magazines/2016/5/201591-parallel-graph-analytics/fulltext),” *Communications of the ACM*, volume 59, number 5, pages 78–87, May
    2016. [doi:10.1145/2901919](http://dx.doi.org/10.1145/2901919)

1.  Fabian Hüske:
[Peeking into Apache Flink's Engine Room](http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html),” *flink.apache.org*, March 13, 2015.

1.  Mostafa Mokhtar:
[Hive 0.14 Cost Based Optimizer (CBO) Technical Overview](http://hortonworks.com/blog/hive-0-14-cost-based-optimizer-cbo-technical-overview/),” *hortonworks.com*, March 2, 2015.

1.  Michael Armbrust, Reynold S Xin, Cheng Lian, et al.:
[Spark SQL: Relational Data Processing in Spark](http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf),” at *ACM International Conference on Management of Data* (SIGMOD), June 2015.
    [doi:10.1145/2723372.2742797](http://dx.doi.org/10.1145/2723372.2742797)

1.  Daniel Blazevski:
[Planting Quadtrees for Apache Flink](http://insightdataengineering.com/blog/flink-knn/),” *insightdataengineering.com*, March 25, 2016.

1.  Tom White:
[Genome Analysis Toolkit: Now Using Apache Spark for Data Processing](http://blog.cloudera.com/blog/2016/04/genome-analysis-toolkit-now-using-apache-spark-for-data-processing/),” *blog.cloudera.com*, April 6, 2016.


V
Vonng 已提交
1026 1027
------

V
Vonng 已提交
1028 1029
| 上一章                            | 目录                            | 下一章                   |
| --------------------------------- | ------------------------------- | ------------------------ |
M
MuAlex 已提交
1030
| [第三部分:派生数据](part-iii.md) | [设计数据密集型应用](README.md) | [第十章:流处理](ch7.md) |