第2章:MapReduce

MapReduce是一个数据处理的编程模型。这个模型很简单,但也不是简单到不能够支持一些有用的语言。Hadoop能够运行以多种语言写成的MapReduce程序。在这一章中,我们将看看怎样用Java,Ruby,Python语言来写同一个例子。更重要的是,MapReduce程序天生并发运行,这就相当于把能够进行大数据分析的工具交到了某个拥有足够多机器的人手里。

##气候数据集
在我们的例子中,将会写一个程序来挖掘天气数据。天气传感器每一个小时都会在全球的许多地方收集数据,并且也收集了大量的日志数据。这些数据非常适合于用MapReduce分析。因为我们想要处理所有数据,并且这些数据是半结构化的和面向记录的。

###数据格式
我们所使用的数据来自于国家气候数据中心或称为NCDC。数据以行形式ASCII格式存储,每一行一条记录。这种格式支持丰富的气象属性集合,其中许多属性是可选的,长度可变的。简便起见,我们仅仅关注基本的属性,如温度。温度总是有值并且长度固定。

示例2-1显示了一行记录,并且将主要的属性进行了注释。这一行记录被分成了多行,每个属性一行。真实文件中,这些属性都会被放进一行,并且没有分隔符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
示例:2-1
0057
332130 # USAF 天气基站标识
99999 # WBAN 天气基站标识
19500101 # 观察日期
0300 # 观察时间
4
+51317 # 纬度 (角度 x 1000)
+028783 # 经度 (角度 x 1000)
FM-12
+0171 # 海拔 (米)
99999
V020
320 # 风向 (角度)
1 # 质量码
N
0072
1
00450 # 天空最高高度 (米)
1 # 质量码
C
N
010000 # 可见距离 (米)
1 # 质量码
N
9
-0128 # 空气温度 (摄氏度 x 10)
1 # 质量码
-0139 # 露点温度 (摄氏度 x 10)
1 # 质量码
10268 # 大气压 (百帕 x 10)
1 # 质量码

数据文件按照日期和天气基站整理。从1901到2001,每一年都有一个目录文件。每一个目录文件中包括每一个天气基站收集到的当年气候数据的压缩文件。例如1990年部分文件:

1
2
3
4
5
6
7
8
9
10
11
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz

由于有成千上万个天气基站,所以每一年都由大量的相关小文件组成。通常处理少量的大文件更容易和有效。所以这些数据需要被预处理,使每一年的所有记录都被放到一个文件中(附录C中有详细的方法说明)。

##使用Unix工具分析
如何获取每一年的全球最高温度呢?我们首先不使用Hadoop工具来回答这个问题。
这将会为我们提供一个性能基准线和检查我们往后的结果是否准确的方法。
经典的处理行结构数据的工具是awk。示例2-2向我们展示了如何获取每一年全球最高温度。

1
2
3
4
5
6
7
8
9
10
11
12
示例2-2
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done
`

这个脚本循环处理已经压缩的年文件,首先输出年度值,然后使用awk处理每一个文件。awk脚本从这些数据中提取出空气温度和质量码。空气温度通过加0转换成整数,下一步,判断温度(温度9999在NCDC中表示没检测到温度)和质量码是否有效。质量码表示此温度值是否准确或者错误。如果温度值没有问题,则与目前为止最高温度相比较,如果比目前最高温度高,则更新最高温度。当文件中所有行被处理之后,END块被执行,打印出最高温度。下面看看部分运行结果:

1
2
3
4
5
6
7
8
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
`

源文件中的温度值被扩大了10倍,所以1901年的最高温度是31.7摄氏度,由于在20世纪初读取到的气候值非常有限,所以这个结果只能是近似真实。在硬件是单个超大型高CPU EC2实例计算中跑完整个世纪的数据花了42分钟。

为了提高处理速度,我们需要并行运行部分程序。理论上,我们很容易想到可以使用计算机中所有可用的线程并行处理不同的年份数据。但是这样仍然存在一些问题。

首先,将整个处理工作进程等分为相同的部分并不简单或明显。在这个例子中,不同的年份的文件大小不一样,并且有的差别很大。所有一些处理进程将会完成地早一些,一些将会晚一些。即时完成早的进程再处理其它工作,整个运行时间仍然被最大的文件限制。一个更好的途径是将输入数据分成大小相等的块,并且处理每一个数据块。虽然这样可能造成更多的工作量。

第二,将每一个独立的处理结果合并在一起需要额外处理工作。在这个例子中,每一年的处理结果都是相互独立的。这些结果会被连接在一起,并且按年排序。如果通过数据量大小数据块途径,合并将更加容易出错。就这个例子而言,某一年的数据可能被分成多个数据块,每一个数据块都单独处理,并得到每一块的最高温度。最后,我们还需要找到某年中这些块中最高温度中的最高温度作为这一年的最高温度。

第三,你仍然会被单个计算机的处理能力限制。如果用单个计算机中所有的处理器,最快的处理时间是20分钟,那么,你不可能更快。而且有的数据集超过单个计算机的处理能力。当使用多台计算机一起处理时,一些其它的因素又会影响性性能,主要有协调性和可靠性两类。谁来执行所有的作业?我们将怎么处理失败的进程?

所以,虽然并行处理是可行的,但却是不那么容易控制的,是复杂的。使用像Hadoop这样的框架来处理这些问题极大地帮助了我们。

##使用Hadoop分析数据
为了充分利用Hadoop提供的并行处理优势,我们需要将我们的查询写在一个MapReduce作业中。在本地的,小数据量地测试后,我们将能够在集群中运行它。

###Map和Reduce
MapReduce将处理过程分成两阶段,map阶段和reduce阶段。每阶段将key-value键值对做为输入和输出。开发者可以选择输入输出参数类型,也能指定两个函数:map函数和reduce函数。

map阶段的输入数据是原始的NCDC数据。我们选择文本格式。文本中的每一行表示一条文本记录。key值是行开头距离当前文件开头的位移,但是我们不需要它,忽略即可。

map函数很简单。因为我们仅关心年份和温度,所以获取每行的年度和温度即可,其它属性不需要。这个例子中,仅仅是一个数据准备阶段,以某种方法准备reduce函数能够处理的数据。map函数还是一个丢弃坏记录的地方,例如那些没有测量到的,不准备的或错误的温度。

为了展现map怎么样工作的,选取少量的输入数据进行说明(为了适应页面宽度,一些没有使用到的列用省略号表示)
0067011990999991950051507004…9999999N9+00001+99999999999…
0043011990999991950051512004…9999999N9+00221+99999999999…
0043011990999991950051518004…9999999N9-00111+99999999999…
0043012650999991949032412004…0500001N9+01111+99999999999…
0043012650999991949032418004…0500001N9+00781+99999999999…
这些行以key-value的形式提供给map函数:
(0, 0067011990999991950051507004…9999999N9+00001+99999999999…)
(106, 0043011990999991950051512004…9999999N9+00221+99999999999…)
(212, 0043011990999991950051518004…9999999N9-00111+99999999999…)
(318, 0043012650999991949032412004…0500001N9+01111+99999999999…)
(424, 0043012650999991949032418004…0500001N9+00781+99999999999…)
关键值是行的位移,在map函数中我们可以忽略它。map函数仅仅需要获取到年度和温度值(以粗体表示的数据),然后输出。输出的时候将温度值转换成整数。
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)

map的输出结果在被送往reduce函数之前被MapReduce框架按照关键字排序合并处理。所以在进行下一步之前,reduce函数会接收到如下数据:
(1949, [111, 78])
(1950, [0, 22, −11])
如上所示,每一年的所有温度值都合并到一个列表中。reduce函数所要做的就是遍历每一年的温度,然后找到最高温度。
(1949, 111)
(1950, 22)
以上就是最终的输出:每一年的最高温度。
整个数据流程如图2-1所示。在图表底部是对应的Unix命令。它模拟整个MapReduce流程,我们将会在这章节的后面Hadoop Streaming中看到。图2-1 MapReduce逻辑数据流程图

###JAVA MapReduce
在知道了MapReduce程序怎么样工作了之后,下一步是用代码实现它。我们需要做三件事情:map函数,reduce函数,运行作业的代码。map功能以Mapper抽象类表示
,它申明了一个map()抽象方法。示例2-3显示了map函数的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
示例2-3
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}

Mapper类是一个泛型,有四个形参,分别表示输入key,输入值,输出key,和map函数输出值类型。就当前的例子来说,输入key是一个长整型的位移,输入值是一行文本,输出key是年份,输出会是是空气温度(整数)。Hadoop使用它自己的基本类型集而不使用JAVA内建的基本类型。因为Hadoop自己的基本类型对网络序列化进行了优化。这些基本类型可以在 org.apache.hadoop.io pack‐
age中找到。这里我们使用 LongWritable类型,它表示长文本类型,对应了Java的String类型,又使用了 IntWritable类型,对应于Java的Integer类型。

map函数被传了一个key值和一个value值,我们把包含输入的一行文本转换成Java String类型数据,并使用String的SubString方法取到我们感兴趣的列值。

map函数也提供了一个Context实例,以便将输出结果写入其中。在我们的这个例子中,我们把年份作为文本类型Key值写到Context中,把温度封闭成IntWritable类型也写入Context.并且只有温度有效并且质量码显示当前温度的获取是正常的时候才写入。

reduce功能类似地用Reduce抽象类表示,实例类见示例2-4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
示例2-4
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}

Reduce抽象类也是一个泛型类,也具有四个形参。reduce函数的输入类型必须匹配map的输出类型,即Text和IntWritable.此例子中,reduce函数的输出是Text和IntWritable类型,分别表示年份与当前年份最高温度。通过遍历温度值,将当前温度值与最高温度比较来找到当前年份的最高温度。

第三部分是运行MapReduce作业的代码,见示例2-5.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
示例2-5
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Job对象指明运行一个作业所需要的所有设置以及让你控制作业如何执行。当我们在一个Hadoop集群上运行这个作业的时候,我们需要将代码打包成JAR文件,Hadoop会把JAR文件在集群中分发。我们可以通过setJarByClass方法指定类文件,而不需要显示指明JAR文件的名字。Hadoop会搜索包含setJarByClass指定的类的相关JAR文件。

创建了一个实例Job后,指定输入和输出文件路径。通过调用 FileInputFormat 的静态方法addInputPath()指定输入路径,此路径可以是一个文件,也可以是一个目录。如果是一个目录,输入的数据包含此目录下所有文件。还可以是文件类型。就像方法名所表示的那样,addInputPath()可以被调用多次以便添加多个输入路径。

输出路径通过FileOutputFormat 的静态方法setOutputPath()指定。输出路径仅可以指定一次。它指定了一个目录。reduce会把它的输出结果的文件放到这个目录下。这个目录在运行Hadoop之前不应该存在。因为如果存在Hadoop将会报错并不会执行作业。这是为了预防数据丢失。因为如果不小心覆盖了同一目录下其它作业的输出结果将是非常令人懊恼的。

下一步使用 setMapperClass() 和setReducerClass()方法指定map和reduce类。setOutputKeyClass()和 setOutputValueClass()方法控制reduce函数输出参数的类型。必须和Reduce抽象类中参数的一致。map输出参数的类型默认是相同的类型。所以如果map和reduce函数有相同的输出参数类型时就不需要特别指定了。就像我们这个例子这样。然而,如果它们不相同,就需要通过 setMapOutputKeyClass() 和setMapOutputValueClass()函数来指定map的输出参数类型。

map函数的输入参数类型通过输入格式指定。我们没有显示地设置,因为我们使用了默认的TextInputFormat格式。

在指定了自定义的map和reduce函数之后,就可以准备执行作业了。Job类的waitForCompletion()方法用于提交作业,并用等待作业完成。这个方法需要一个参数,用以表示是否将作业日志详细信息输出到控制台。如果为true,就输出。这个方法的返回值是一个布尔类型,用于表示作业的执行成功与否。成功返回true,失败返回false。这里我们将成功与否转换成了0或1。

1
2
这部分使用的Java MapReduce API以及这本书所使用的所有API被称为"New API"。  
它代替了功能相同的老的API。这两种API区别请查看附录D,并且附录D有如何在这两种API转换的相关建议。当然你也能在这儿用旧的API完成相同功能的获取每年最高温度的应用。

###测试运行
在完成MapReduce作业编写之后,正常情况下使用少量数据集测试运行,方便立即检测出代码问题。首先以脱机模式安装Hadoop(附录A中有说明),这个模式下Hadoop使用本地文件生成本地作业运行。可以在这本书的网站上找到安装和编译这个示例的说明。
让我们使用上面五行数据运行这个作业,输出结果稍微调整了一下以便适应页面,并且有一些行被删除了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output
14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your application
with ToolRunner to remedy this.
14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job:
job_local26392882_0001
14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job:
http://localhost:8080/
14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner:
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task attempt_local26392882_0001_r_000000_0
28 | Chapter 2: MapReduce
is allowed to commit now
14/09/16 09:48:40 INFO output.FileOutputCommitter: Saved output of task
'attempt...local26392882_0001_r_000000_0' to file:/Users/tom/book-workspace/
hadoop-book/output/_temporary/0/task_local26392882_0001_r_000000
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce > reduce
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_r_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 running in uber
mode : false
14/09/16 09:48:41 INFO mapreduce.Job: map 100% reduce 100%
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 completed
successfully
14/09/16 09:48:41 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=377168
FILE: Number of bytes written=828464
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=61
Input split bytes=129
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=61
Reduce input records=5
Reduce output records=2
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=39
Total committed heap usage (bytes)=226754560
File Input Format Counters
Bytes Read=529
File Output Format Counters
Bytes Written=29

当我们在hadoop命令第一个参数填写一个类名的时候,会启动一个JVM(JAVA虚拟机),并执行这个类。hadoop命令添加hadoop库和库所依赖的其它库文件到Classpath变量,并且加载hadoop配置。为了将应用中的类文件添加到classpath中,我们定义了一个 HADOOP_CLASSPATH环境变量,来加载我们所写的hadoop脚本。

1
当以本地(脱机)模式运行时,这本书中所有程序都假设你已经以这种方法设置了 HADOOP_CLASSPATH环境变量。这条命令应该在示例代码所在目录运行。

作业运行日志提供了一些有用的信息。例如,我们能看到这个作业被给了一个作业ID:job_local26392882_0001.运行了一个map任务和一个reduce任务(ID分别是:attempt_local26392882_0001_m_000000_0 和attempt_local26392882_0001_r_000000_0)。知道作业和任务ID在调用MapReduce作业时将很有用。

最后还有一部分名为”Counters”的数据,这部分数据是Hadoop为每一个作业生成的统计信息。这些信息将对于检查处理的数据与预期的数据是否一样非常有用。例如,我们能知道通过系统各部分的记录数,5条map输入记录,5条map输出记录(可以看出map对于每一条有效的输入记录都有对应的一条输出记录)。还能看出以key值分成2组的5条reduce输入记录,以及2条输出记录。

输出结果写入输出目录。每一个reduce函数生成一个输出文件。这个作业只有一个reduce函数,所以只产生一个文件。名称是part-r-00000:
% cat output/part-r-00000
1949 111
1950 22

这个结果跟之前手工计算的一致。这个结果表示1949年最高温度是11.1摄氏度,1950是2.2度。

##扩展
你已经知道了MapReduce怎么样处理少量数据。现在是时候全局看系统,并且对于大数据处理的数据流。简单来说,到目前为止,我们所举的例子都用的本地计算机的文件。更进一步,我们将要在分布计算机(特别是HDFS,我们将在下一节中学到)中存储文件数据。使用Hadoop的资源管理系统YARN(第4节),Hadoop会将MapReduce计算过程分发到各个计算机中计算,而这些计算机每一台都保存着一部分数据。让我们来看看这些是如何发生的。

###工作流
首先,MapReduce作业是客户端需要去执行的工作单元。它包括输入数据,MapReduce程序以及一些配置信息。Hadoop会把这个作业分成多个任务步骤执行。有两种类型:map任务和reduce任务。这些任务通过YARN计划调度并在分布式系统节点上运行。如果一个任务失败了,YARN会把它放到另外一个节点上重新运行。

Hadoop会把输入数据化分成大小相同的数据片断(被称为输入片或均片),Hadoop会为每一个片创建一个map任务。map任务会一条条记录地循环执行用户自定义的map函数,直到这个片断中所有记录处理完毕。

很多片断意味着处理每一个片断的时间比一次处理整个输入数据的时间少。所以当我们并发地处理这些片断,而这些片断很小时,能够更好地负载均衡。所以一个性能好的机器比一个性能差些的机器能够相应在处理更多地片断。即使这些机器性能完全一样,失败的处理进程或者同时运行的作业使负载均衡成为可能(Even if the machines are identical, failed processes or other jobs running
concurrently make load balancing desirable)。并且当片断细粒度越高,负载均衡的质量也会越高。

别外一方面,如果片断过于小,管理片断和创建Map任务所花费的时候则会成为整个作业执行时间的瓶颈。对于大多数作业来说,一个好的片断大小趋向于一个HDFS块的大小,默认是128M。这个大小可以被集群(Cluster)改变(集群的改为会影响在机群中新创建的所有文件),或者文件新建时就指定。

Hadoop尽量会在输入数据存放的HDFS那个节点运行Map任务,因为这样不会占用宝贵的集群带宽资源。这被称为本地优化。然后,有时候拥有HDFS数据的节点上正运行着其它Map任务,作业调试器会尝试着在当前集群其它空闲的节点上创建一个Map任务。极少情况下,会到其它集群中的某个节点中创建一个Map任务,这样就需要集群间网络传输。这三种可能性在图表2-2中展示:图2-2

现在清楚了为什么最优的片断大小是设置成HDFS块大小。因为这样做是数据能被存储在一个节点上的最大数据量。如果一个片断跨两个块大小,任何一个HDFS节点都不太可能储存两个块大小的数据量,这个势必会造成片断的部分数据通过网络传输到正在运行Map任务的节点上。这明显的比直接在本地运行Map任务的性能差一些。

Map任务会将它的输出结果写入本地硬盘中,而不是HDFS,为什么要这样做?因为Map的输出只是中间的输出,后续它将会被Reduce任务处理产生最终输出结果。一旦作业完成了,Map的输出结果可以被丢弃,所以将Map的输出结果复制到HDFS中不必要的。如果在Reduce利用Map的输入结果前,节点运行失败了。Hadoop将在自动的在另外一个节点中重新执行这个Map任务,重新产生输入结果。

Reduce任务没有像Map任务那样利用数据本地化的优势,一个Reduce任务的输入往往来自所有Map任务的输出。就拿目前的例子来说,我们有一个Reduce任务,其输入数据来自所有的Map任务。因此存储的Map结果必须通过网络传输到运行Reduce的节点上。之后这些传过来的数据会被合并,并传到用户自定义的reduce函数中执行。Reduce的输出结果正常都会存储在HDFS中。就像第三节说明的,对于存储Reduce输出结果的每一个HDFS块,第一份复制的数据会存储在本地,其它复制的数据会存储在其它集群可靠的HDFS块中。因此存储Reduce的输出结果确定需要消耗网络带宽,但也仅仅和一个正常的HDFS输出通道消耗的一样多。

拥有一个Reduce任务的数据流在图表2-3中展示。虚线框表示节点,虚线箭头表示节点内的数据传输。实线的箭头表示节点间的数据传输。图2-3 单个Reduce任务的MapReduce数据流

Reduce任务的个数不是由输入数据量的大小决定,而是单独指定的。在”默认的MapReduce作业”那一节,你将会看到对于给定的作业,如何选择Reduce任务的个数。

当有多个reduce时,map任务会将它们的结果分区,每一个map任务会为每一个reduce任务创建一个分区。每一个分区里可以用很多个key和ke关联的值,但某一个key的所有记录必须在同一个分区里。分区这个过程能够被用户自定义的函数控制,但一般来讲,默认的分区函数已经能够工作地很好了。它使用哈希函数来将key分类。

多个reduce的一般数据流程图在图表2-4显示。这张图表清楚地显示了map和reduce之间的数据流为什么被通俗地叫做”洗牌”。”洗牌”的过程比这个图表显示的更复杂。你将会在”洗牌和排序”这一节中看到,调整它可以对作业的运行时间有很大影响。图2-4 多个reduce任务的MapReduce数据流

最后,也可以有零个reduce任务。这种情况发生在仅并发执行map任务就能够输出结果的时候。此时数据的传输仅发生在map的输出结果写入HDFS的时候(如图2-5)。
图2-5 零reduce任务的MapReduce数据流

###组合函数(Combiner Function)
许多MapReduce作业执行时间被集群的带宽资源限制。所以值得我们去尽量减少map与reduce之间传输的数据量。Hadoop允许用户指定一个组合函数,以便在map输出结果后执行。这个组合函数的输出形成了reduce任务的输入。由于组合函数是优化函数,所以Hadoop不能确保为每一个map输出记录调用多少次组合函数。也就是说,零次,一次或多次调用组合函数,reduce最终都应该输出相同的结果。

组合函数的这种特性限制了它能被使用的业务情形。用一个例子能更好说明。假设最大的温度,例如1950的,被两个map任务处理,因为1950年数据分布在不同的片断中。假如第一个map任务输出如下结果:
(1950,0)
(1950,20)
(1950,10)
第二个map输出如下结果:
(1950,25)
(1950,15)
Hadoop将会用以上所有值组成列表传给reduce
(1950,[0,20,10,25,15])
输出:
(1950,25)
既然25是当前列表最大的值。我们就像使用reduce函数一样用一个组合函数找出每一个map结果中的最大温度值。这样的话,reduce得到以下值:
(1950,[20,25])
并且产生与之前相同的结果。我们可以用一种更简洁的方式表示上面的过程:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
然而,并不是所有这样的处理都是合适的,例如,我们要计算平均温度,就不能在组合函数中计算平均温度,因为:mean(0, 20, 10, 25, 15) = 14,但是mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15。

组合函数不能代替Reduce函数(Reduce函数仍然需要用来处理来自不同map的含有相同key值的记录),但是它能帮助减少在map与reduce之间传递的数据量。因此,在你的MapReduce作业中,总是值得我们考虑是否使用组合函数。

####指定组合函数
回到之前JAVA MapReduce程序,组合函数使用Reduce类定义,在这个应用中,它与Reduce功能一样。我们唯一要做的就是在作业中设定组合类(示例2-6)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
示例2-6
public class MaxTemperatureWithCombiner {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCombiner
<input path> " +"<output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperatureWithCombiner.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

###运行一个分布式的MapReduce作业
相同的程序将在全量数据库执行。MapReduce特性是无形中扩大了能处理的数据量大小和硬件体积,运行在10个节点的EC2集群上,这个程序跑了6分钟。在第6节中我们将会看看在集群中运行程序具体的一些技术特性。

##Hadoop Streaming
Hadoop给MapReduce提供了API允许你用除了JAVA语言之外的其它语言写map和reduce函数。Hadoop流使用Unix系统标准流作业Hadoop和你的程序之间的接口,所以你能使用任意其它的能够读取Unix系统标准流输入数据并能够将数据写到标准输出的语言来写MapReduce程序。

流天生地就适用于文本处理。Map的输入数据通过标准的输入流输入到你自定义的map函数中。在map函数中,你将会一行一行的处理数据,然后将这些数据写入到输出流中。map会用Tab分隔key和value,并将它们做为键值对单独一行输出。这些数据将会以相同的格式做为reduce函数的输入。在输入之间,框架将会把它们按照键值排序,然后reduce会处理这些行,然后将结果输出到标准的输出流。

让我们以流的方式重写查找每一年最高温度的MapReduce程序来说明。

###Ruby
map函数以Ruby语言编写,见示例2-7

1
2
3
4
5
6
7
示例2-7
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

在示例2-7代码块中,Ruby从标准全局IO常量类型STDIN中读取输入数据,然后遍历每一行数据,找到行中相关的字段,如果有效,则输出到标准的输出流。

有必要看一下Streaming与Java MapReduce API之间的区别。 Java API会一条条记录地调用map函数,然后如果使用Streaming形式,map函数可以自己决定怎么样处理输入数据,可以多行一起处理也可以单行处理。JAVA map实现的函数是被推数据,但是它仍然可以考虑通过将多条记录放到一个实例变量中来实现一次处理多行的操作。这种情况下,你需要实现cleanup()方法,以便知道最后一条记录处理完的时候,能够结束处理。

由于示例2-7基于标准的输入输出操作,可以不通过Hadoop测试,直接通过Unix命令。

1
2
3
4
5
6
% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078

reduce函数稍微有点复杂,如示例2-8

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key

同map函数一样,reduce函数也会从标准输入中遍历行,但不一样的是,当处理每一个key组的时候,需要存储某个状态。在这个示例中,关键字是年,我们存储最后一次遍历的key,并保存每一个key组中最大的温度。MapReduce框架会确保输入数据会按照关键值排序,所以我们知道如果当前key值不同于上一次遍历的key值时,我们就进入了新的key组。当使用JAVA API时,reduce函数输入的数据就已经按照key值分好了组,而不像Streaming一样需要人为地去判断key组边界。

对于每一行,我们取得key和value值,然后看看是否到达了一组的最后( last_key && last_key != key), 如果到达了,我们记录下这组的Key和最高温度,以Tab制表符分隔,然后初始化最高温度,如果没有到达组的最后,则更新当前Key值的最高温度。最后一行作用是确保最后一个Key组的最高温度能够被记录。

我们现在能够用Unix命令来模拟整个的MapReduce传输通道(等效于图2-1中所示的Unix通道)。

1
2
3
4
5
% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \
sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22

输出结果与Java程序的一样。下一步使用Hadoop来运行。
Hadoop命令不支持流选项,不过,你可以在jar选项中指定Streaming JAR文件,然后指定输入和输出文件路径,以及map和redeuce脚本文件,看起来如下:

1
2
3
4
5
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

当在一个集群中基于大数据执行时,我们需要使用-combiner选项来指定组合函数。

1
2
3
4
5
6
7
8
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\
ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-input input/ncdc/all \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

注意我们使用了-files选项,当我们在集群上运行流程序时,需要将map和reduce脚本文件复制到集群中。

###Python
流程序支持任意能够从标准输入读取数据并将数据写入标准输出的语言。所以使用读者更熟悉的Python,再写一遍以上例子。map脚本如示例2-9,reduce脚本如示例2-10.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
示例2-9:map script
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)

示例:2-10 reduce script
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)

我们能像Ruby中一样以相同的方法来运行这个作业。

1
2
3
4
5
% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949 111
1950 22

本文是笔者翻译自《OReilly.Hadoop.The.Definitive.Guide.4th.Edition》第一部分第2节,后续将继续翻译其它章节。虽尽力翻译,但奈何水平有限,错误再所难免,如果有问题,请不吝指出!希望本文对你有所帮助。