第5章:Hadoop I/O

Hadoop有一些数据I/O方面操作的工具,其中一些比Hadoop使用的都更普遍。例如数据完整性和压缩。但是当使用这些工具处理多达几TB数据的时候,仍然需要特别注意。其它的一些工具或APIs构建成了模块用于开发分布式系统,例如:序列化框架和硬盘上的数据结构化。

数据完整性

Hadoop的使用者肯定都希望数据在存储或处理的过程中不会丢失或损坏。然而,由于硬盘或网络的每一次I/O操作都有可能在读或写的过程中出错,Hadoop经常处理的数据量都相当大,当这样地大量数据在系统中传输时,数据损坏的概率还是挺高的。

通常用于判断数据是否损坏所使用的方法是当数据第一次进入系统时候计算一个校验和,并不管在任何时候,数据被传输通过一个不可信的通道后,再一次计算校验和,就可以判断数据是否损坏。如果新生成的校验和与原始的校验和不一样,就认为数据遭到了损坏。这个技术不能够修复数据,仅仅是检查数据是否损坏。(这也是不使用低档的硬件的原因,特殊情况下,肯定要使用ECC内存)。注意:有时校验和也可能损坏,而数据没有损坏,但这种可能性很低,因为校验和比数据小多了。

通常使用的错误检测码是CRC-32(32位循环冗余校验),不管输入的数据量多大,都计算出一个32的整数校验和。CRC-32在Hadoop的ChecksumFileSystem类中用于计算校验和。然而HDFS使用的是另一种更有效的方式,叫做CRC-32C。

HDFS中的数据完整性

HDFS透明地计算写入的所有数据的校验和,并且默认情况下会在读取数据时验证校验和。每个用dfs.bytes-per-checksum属性指定大小的数据块都会单独生成一个校验和。默认是512字节。因为CRC-32C校验和是4个字节,所以存储开销少于1%。((12810241024)/5124/12810241024)100%=0.78%。

数据节点有责任在接收到数据后存储数据前校验数据,并计算校验和,不管从客户端接收数据还是在复制复本数据期间从其它节点接收数据。正在写入数据到HDFS的客户端将校验和传入数据节点的管道中(如第3章介绍的那样),管道中的最后一个数据节点验证校验和。如果数据节点检测到错误,客户端会收到一个IOExcption子类的异常,客户端应该采取应用程序特定的措施,比如,重新尝试写入。

当客户端从数据节点读取数据的时候,它们也要验证校验和,将读取的数据计算出的校验和和存储在数据节点中的校验和进行比较。每一个数据节点都永久保存了一份验证校验和的历史日志。所以HDFS知道每一个数据块最近验证校验和的时间。当客户端成功地校验了某一块数据的时候,它会告诉数据节点,数据节点会更新日志。保存这些校验的信息可以用于检测坏的硬盘。

除了客户端读取数据时的块校验,每一个数据节点都会以一个后台线程方式执行一个DataBlockScanner,周期性检测存储在这个数据节点上的数据块是否损坏。这是为了防止由于物理存储媒介的”位衰减”造成的数据损坏。可以看第11章的”数据节点块扫描”相关内容详细了解如何获取扫描的报告。

由于HDFS存储着数据块的复本,所以它能够通过复制一个好的复本生成一个新的,没有损坏的复本来修复已经损坏的块。具体实现方法是客户端在读取块数据时如果检测到一个错误,它会向名称节点报告这个坏的块和块所在的数据节点,然后抛出一个ChecksumException异常。名称节点会将这个块状态标记为”损坏”,并不再将其它客户端提供这个块的地址,然后在另外一个数据节点中生成这个块的一个复本,直到达到设置的块复本数,一旦完成后,这个损坏的块就会被删除。

我们可以在调用FileSystem的open方法打开文件之前,调用它的setVerifyChecksum方法,传递一个false值,就可以关闭校验和的验证。如果使用shell命令,我们可以在-get或-copyToLocal命令中添加-ignoreCrc选项同样可以关闭校验和的验证。如果你想要看看这部分损坏的数据,再进行相应处理的话,这个功能将是有用的。例如,你也许想在删除损坏的块数据之前想要看看是否能够恢复块中的数据。

你可以通过使用hadoop fs -checksum来得到一个文件的校验和。这对于检查HDFS中两个文件是否有相同的内容是有用的(distcp命令也可以做这些事情),可以参看”使用distcp命令并发复制”小节举的示例。

LocalFileSystem

Hadoop LocalFileSystem在客户端验证校验和。这意味着当你向一个叫”filename”的文件中写入数据时,文件系统客户端会在文件相同目录透明的创建一个隐藏的文件,叫做 .filename.crc,这个文件包含每一段文件数据的检验和。段的大小由属性file.bytes-per-checksum定义,默认是512字节。段大小做为元数据存储在.crc后缀的文件中,所以即使段大小的设置以后改变了,文件仍然可以完好无损地读取。当读取文件时,会验证检验和,如果出现错误,LocalFileSystem抛出ChecksumException异常。

计算校验和的代价很小(JAVA中,使用native代码计算),不会对读取或写入文件造成什么影响。对于大多数应用来说,为了保证数据的完整性这是完全可以承受的代价。然而,如果底层的文件系统本身支持校验,就可以关闭LocalFileSystem的校验。此时可以使用RawLocalFileSysem代替LocalFileSystem。要想在应用的全局范围内关闭校验和,可以将属性fs.file.impl的值设置为org.apache.hadoop.fs.RawLocalFileSystem。如果你仅仅想对某一些读取操作关闭检验和验证时,也可以直接创建一个RawLocalFileSystem实例。例如:

1
2
3
Configuration conf = ...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);

ChecksumFileSystem

LocalFileSystem使用CheckFileSystem来实现校验和的功能。这个类可以很方便地在其它没有校验和功能的文件系统中加入校验和功能。ChecksumFileSystem仅仅是FileSystem的一个封装类。一般的写法如下:

1
2
FileSystem rawFs = ...
FileSystem checksummedFs = new ChecksumFileSystem(rawFs);

底层的文件系统叫做原生文件系统,可以通过ChecksumFileSystem的getRawFileSystem方法获得。ChecksumFileSystem还有一些和校验和相关的有用的方法,例如getChecksumFile()方法用于获取任意文件的校验和文件的路径。可以查看相关参考资料看看其它的方法。

如果在读取文件的过程中,ChecksumFileSystem检测到一个错误。它将会调用它的reportChecksumFailure()方法。这个方法默认的实现没有做任何事情。但是LocalFileSystem将检验失败的文件和它的校验和单独移到同一设备中叫做”bad_files”的目录中。管理员应该定期地检查这个目录中是否有坏的文件并处理。

压缩

文件压缩有两个主要的好处:减少存储文件所需的空间;加快数据在网络传输的速率和从硬盘读写文件的速度。当处理海量数据时,这些好处带来的效果是很可观的,所以它值得我们仔细考虑如何在Hadoop使用压缩。

有多种不同的压缩格式,工具和算法。每一种都有不同的特性。表5-1例举出了Hadoop常用的一些压缩工具/格式。

压缩格式 工具 算法 文件扩展名 可切片?
DEFLATE^a N/A DEFLATE .deflate No
gzip gzip DEFLATE .gz No
bzip2 bzip2 bzip2 .bz2 Yes
LZO lzop LZO .lzo No^b
LZ4 N/A LZ4 .lz4 No
Snappy N/A Snappy .snappy No

所有的压缩算法都会在空间与时间两方面进行权衡:更快地压缩与解压缩算法通常以节约更小的空间为代价。表5-1所列的压缩工具一般都会提供9个不同的选择来在空间与时间上进行权衡。-1意思是优化速度,-9意思是优化空间。例如下面的命令以最快的压缩速度创建了一个叫做file.gz的压缩文件。

1
% gzip -1 file

不同的压缩工具有着非常不同的压缩特性。通常选择gzip压缩格式,所有压缩工具中,gzip在空间与时间权衡方面性能居中。bzip2比gzip有更高的压缩率,但是更慢。bzip2的解压速度比它的压缩速度快,但是还是比其它压缩格式要慢。另一方面,LZO、LZ4和Snappy都优化了速度,在速度上按照顺序依次更快,且都比gzip快很多。但是压缩率不如gzip高。Snappy和LZ4比LZO的解压速度要快很多^c

表5-1的”可切片”一列表示压缩格式是否支持切片(也就是说你可以定位到数据流的任意一点,并从这一点开始读取数据)。可切片的压缩格式特别适合MapReduce。可以看稍后的”压缩与切片”小节进一步讨论。

编解码器

编解码器是压缩-解压算法的实现。Hadoop中,一个编解码器是CompressionCodec接口的一个实现类。例如:GzipCodec封装了gzip的压缩与解压算法。表5-2列举出了Hadoop中的编解码器。

压缩格式 Hadoop CompressCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec

LZO库是遵循GPL协议的,所以在Apache发布的Hadoop中也许没有包含,所以需要单独从Google(或GitHub,包含bug修复和更多工具)下载。
LzopCodec是一个兼容lzop工具的实现类,本质上,是带有额外头文件的LZO压缩工具类。也有一个纯LZO压缩格式的实现类叫做LzoCodec,它使用的是.lzo_deflate文件扩展名(就如同DEFLATE,是一个没有头文件的gzip)。

使用CompressionCodec压缩与解压流
CompressionCodec有两个方法,可以让你很容易的压缩或解压数据。为了在写入输出流时压缩数据,你可以使用createOutputStream(OutputStream out)方法创建一个CompressionOutputStream输出流,然后你将未压缩的数据写入,CompressionOutputStream就可以让数据以压缩的形式写入底层输出流中。相应地,要想从一个输入流中解压数据,调用createInputStream(InpuStream in)方法获取一个CompressionInputStream对象,这个对象可以让你从底层流的压缩数据中读取出解压后的数据。

CompressionOutputStream和CompressionInputStream与java.util.zip.DeflaterOutputStream和java.util.zip.DeflaterInputStream相似。不过,前两者比后两者多个功能,可以重置底层的压缩或解压器。这对于想分块压缩数据的应用来说是很重要的。例如本章后面将讲解的SequenceFile。

示例5-1展示了如何将从标准输入流读取的数据压缩,然后写入标准输出流中。

1
2
3
4
5
6
7
8
9
10
11
12
13
示例5-1:压缩读取的数据并写入标准输出流的程序
public class StreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
}

这个应用需要一个 CompressionCodec实现类的全类名做为命令行第一个参数。我们使用ReflectionUtils创建一个编码类的实例,然后获取一个System.out的压缩封装类,然后我们调用IOUtils的copyBytes方法将输入流中的数据复制到输出流中。在输出之前,数据会经过 CompressionOutputStream压缩。最后调用 CompressionOutputStream的finish方法告诉压缩器停止向输出流中写入压缩的数据,但不关闭输出流。我们可以用下面的命令进行尝试,压缩”Text”字符串,调用StreamCompressor类,传入 GzipCodec压缩器,然后用gunzip进行解压。

1
2
3
% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
| gunzip -
Text

使用CompressionCodecFactory推断出压缩编码
如果你需要读取一个压缩文件,正常情况下你会观察压缩文件的扩展名来推断出所使用的压缩编码,.gz结尾的文件可以使用 GzipCodec读取等等。每一种压缩编码的扩展名都在表5-1中列举。CompressionCodecFactory通过它的getCodec()方法提供了一种通过扩展名得到相应压缩编码的方法。示例5-2显示了如何使用这种方法解压文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
示例:5-2 从文件扩展名推断出解压缩编码来对文件进行解压的程序
public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri =
CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}

一旦推断出了编码,就会使用 CompressionCodecFactory的 removeSuffix() 静态方法去掉文件的后缀得到输出文件名。通过这种方法,使用如下命令,一个叫做file.gz的文件会被解压成file。

1
% hadoop FileDecompressor file.gz

CompressionCodecFactory会加载表5-2中除了LZO之外的所有编码,和表5-3中属性 io.compression.codecs所列举的所有编码,默认情况下,这个属性值为空,如果你希望注册一个自定义的编码(例如这个外部的LZO编码),你就需要修改这个属性。每一种编码都知道它自己的默认文件扩展名,因此CompressionCodecFactory就可以通过搜索所有注册的编码找到与给定文件名匹配的编码。

表5-3:压缩编码配置属性

属性名 类型 默认值 描述
io.compression.codecs 逗号分隔的类名 外部的 CompressionCodec压缩与解压缩类列表

本地库
从性能考虑,推荐使用本地库来进行压缩与解压缩。例如一项测试表明,使用本地库中的gzip比使用java内建的gzip实现至多减少50%解压缩时间和大约10%的压缩时间。表5-4显示了对于所有压缩格式,java与本地库是否提供了实现。所有的格式都有本地实现,但并不是都有java实现,例如LZO压缩格式。

表5-4:压缩库实现

压缩格式 java实现 本地库实现
DEFLATE Yes Yes
gzip Yes Yes
bzip2 Yes Yes
LZO No Yes
LZ4 No Yes
Snappy No Yes

Apache Hadoop的二进制tar包带有预建的针对64位Linux系统的压缩编码本地库实现,叫做 libhadoop.so。对于其它平台,你需要按照Hadoop源码顶级目录中BUILDING.txt文件说明的那样编译自己的库文件。

通过设置java系统属性 java.library.path可以找到本地库,可以在hadoop的etc/hadoop目录中进行设置。如果你不想在hadoop配置文件中设置,你就需要在应用程序中设置。

默认情况下,Hadoop会自动寻找与它运行平台相对应的本地库,如果发现了,就会自动加载。这就意味着你不需要为了使用本地库而做任何配置。然后,在某些情况下,你希望不使用本地库,例如你想调度一个压缩相关的问题时。你可以通过将属性io.native.lib.available设置成false来不使用本地库,而使用Java内建的编码(如果可以找得到的话)。

编码池
如果你正在使用本地库,而且在你的应用程序中需要做大量的压缩与解压缩工作,可以考虑使用编码池(CodecPool)。编码池可以让你重用解压缩对象,而省去了创建这些对象的开销。

示例5-3中的代码显示了编码池的应用。虽然本例实际上只需要创建一个解压缩对象,没有必要使用编码池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
示例5-3:从标准输入中读取数据,使用编码池压缩器将数据写入标准输出中
public class PooledStreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
Compressor compressor = null;
try {
compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out =
codec.createOutputStream(System.out, compressor);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
} finally {
CodecPool.returnCompressor(compressor);
}
}
}

对于给定的 CompressionCodec,我们可以从编码池中获取压缩实例。然后在CompressionCodec的 createOutputStream() 方法中使用这个压缩实例。通过使用finally块,我们就可以确保,这个压缩实例在使用完以后能够回到编码池中,即使在从标准输入流复制数据到标准输出流中时,出现了IOException异常。

未完待续………