【Hadoop权威指南】Hadoop I/O,
Hadoop I/O
Hadoop自带一套原子操作用于数据I/O。
数据完整性
检测数据是否损坏的常见措施是,在数据第一次引入系统时计算校验和,并在数据通过一个不可靠的通道进行传输时再次计算检验和。如果计算所得的新校验和不匹配,那么就认为数据已损坏。该技术只能检测却不能修复数据。
注意,校验和也可能损坏。常用的错误检测码是:循环冗余校验,任何大小的数据输入均计算得到一个32位的整数校验和。
HDFS的数据完整性
HDFS会对写入的所有数据计算校验和,并在读取数据时验证校验和。
datanode负责在验证收到的数据后存储数据及其校验和。它在收到客户端的数据或复制期间其他datanode的数据时执行这个操作。
客户端从datanode读取数据时,也要验证校验和。每个datanode均持久保存一个用于验证的检验和日志。
每个datanode也会在一个后台线程中运行一个datablockScanner,从而定期验证存储在这个datanode上是所有数据块。
由于HDFS存储每个数据块的复本,因此它可以复制完好的数据复本来修复损坏的数据块。客户端在读取数据块时,如果检测到错误,就向namenode报告已损坏的数据块及其正在尝试读操作的这个datanode,最后才抛出ChecksumException。namenode将这个已损坏的数据块的复本标记为已损坏。之后,它安排这个数据块的一个复本复制到另一个datanode。此后,已损坏的数据块复本被删除。
LocalFileSytem
Hadoop的LocalFileSystem执行客户端的校验和验证。在读取文件时需要验证校验和,如果检测到错误,LocalFileSystem将抛出一个ChecksumException。
如果要禁用校验和,使用RawLocalFileSystem代替LocalFileSystem。
ChecksumFileSystem
LocalFileSystem通过ChecksumFileSystem来完成自己的任务,ChecksumFileSystem继承自FileSystem类。
压缩
压缩:压缩后能够节省空间和减少网络中的传输。所以在hadoop中压缩是非常重要的。
codec实现了一种压缩-解压缩算法。在Hadoop中,对一个CompressionCodec接口的一个实现代表一个codec。
Compression format Hadoop CompressionCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
可以用ComressionCodec轻松的压缩和解压缩。我们可以用CompressionOutput创建一个CompressionOutputStream(未压缩的数据写到此)。相反,可以用compressionInputStream进行解压缩。
public static void main(String[] args) throws Exception
{
// TODO Auto-generated method stub
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration configuration = new Configuration();
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
CompressionOutputStream outputStream = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, outputStream, 4096,false);
outputStream.finish();
}
通过使用getCodec()方法,CompressionCodecFactory提供了一种方法可以将文件扩展名映射到一个CompressionCodec,该方法去文件的Path对象作为参数。
压缩和切分
因为HDFS默认是以块的来存储数据的,所以在压缩时考虑是否支持分割时非常重要的。
在MapReduce使用压缩:例如要压缩MapReduce作业的输出,需要将配置文件中mapred.output.compress的属性设置为true,mapred.output.compression.codec属性设置为打算使用的的压缩codec的类名。
//对查找最高气温作业所产生输出进行压缩
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
"<output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
conf.setJobName("Max temperature with output compression");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
/*[*/conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class,
CompressionCodec.class);/*]*/
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);
JobClient.runJob(conf);
}
对map任务的输出进行压缩,采用LZO这样的快速压缩方式,可以获得性能的提升。
序列化
序列化,就是讲结构化对象转化为字节流,以便在网络上传输或者写到磁盘永久存储。反序列化是将字节流转回结构化对象的逆过程。序列化常用于:进程间通信和永久存储。
Hadoop中,系统中多个节点上进程间的通信是通过“远程过程调用”RPC实现的。RPC协议将消息序列化为二进制流后发送到远程节点,远程节点接着反序列化。
Hadoop使用自己的序列化格式Writable。
Writable接口定义了两个方法:一个将其状态写入到DataOutput二进制流,另一个从DataInput二进制流读取其状态,
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
Hadoop自带的org.apache.hadoop.io 包中有广泛的Writable类供选择。
Apache Avro是一个独立于编程语言的数据序列化系统。
1. Avro规范精确定义所有实现必须支持的二进制格式
2. 有丰富的数据模式解析能力
3. 为一系列对象指定一个对象容器格式
4. 还可用于RPC
基于文件的数据结构
对于基于MapReduce的数据处理,将每个二进制数据的大对象融入自己的文件中并不能实现很高的可扩展性,针对上述情况,Hadoop开发了一组更高层次的容器。
SequenceFile
记录文本等顺序文件。
//写入SequenceFile对象
public class SequenceFileWriteDemo {
private static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequanceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, conf, path,
key.getClass(), value.getClass());
for(int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i%DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}
//读取SequenceFile
public class SequenceFileReadDemo {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable)
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable)
ReflectionUtils.newInstance(reader.getValueClass(), conf);
long posiotion = reader.getPosition();
while(reader.next(key, value)) {
String syncSeen = reader.syncSeen()?"*":"";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition();
}
} finally {
IOUtils.closeStream(reader);
}
}
}
MapFile
MapFile是已经排序的SequenceFile,它已加入用于搜索键的索引。
对MapFile的写入和读取都类似与SequenceFile