Hadoop Study Notes: Hadoop I/O
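1. Data integrity: any I/O system must preserve the integrity of the data it handles, and Hadoop in particular needs data to survive storage and processing without being lost or corrupted. The usual way to detect corruption is a checksum: it is computed when data enters the system and checked again whenever the data is read.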
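- Data integrity in HDFS: HDFS checksums all the data it stores. When a client writes, it computes the checksums and the datanodes verify them before storing the data; when a client reads, it verifies the checksums of the data it receives. Verification on read can be disabled by passing false to setVerifyChecksum() on the FileSystem before calling open().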
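- Local file system: Hadoop's LocalFileSystem performs client-side checksumming. When you write a file named filename, the file system client transparently creates a hidden file, .filename.crc, in the same directory. It holds the checksum of each chunk of the file, with the chunk size stored as metadata, and the checksums are verified when the file is read back.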
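- ChecksumFileSystem: the class that provides this behavior. LocalFileSystem is a subclass of ChecksumFileSystem, which wraps another FileSystem and adds checksum verification to it.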
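Checksums are kept per fixed-size chunk rather than per file, so a mismatch pinpoints the damaged region. The sketch below is plain Java (java.util.zip.CRC32), not Hadoop code: it assumes a 512-byte chunk, mirroring Hadoop's default bytes-per-checksum setting, and uses CRC-32, whereas newer HDFS releases default to CRC-32C. The class and method names are hypothetical.

```java
import java.util.zip.CRC32;

/** Hypothetical stdlib-only sketch of per-chunk checksumming; not Hadoop's implementation. */
final class ChunkChecksums {
    // Assumption: 512 bytes per checksum, mirroring Hadoop's usual default.
    static final int BYTES_PER_CHECKSUM = 512;

    /** Computes one CRC-32 value per chunk; the last chunk may be shorter. */
    static long[] compute(byte[] data) {
        int chunks = (data.length + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        long[] sums = new long[chunks];
        CRC32 crc = new CRC32();
        for (int i = 0; i < chunks; i++) {
            int off = i * BYTES_PER_CHECKSUM;
            int len = Math.min(BYTES_PER_CHECKSUM, data.length - off);
            crc.reset();
            crc.update(data, off, len);
            sums[i] = crc.getValue();
        }
        return sums;
    }

    /** Returns the index of the first chunk whose checksum differs, or -1 if all match. */
    static int firstBadChunk(byte[] data, long[] expected) {
        long[] actual = compute(data);
        if (actual.length != expected.length) {
            throw new IllegalArgumentException("data length does not match stored checksums");
        }
        for (int i = 0; i < actual.length; i++) {
            if (actual[i] != expected[i]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] data = new byte[1300];            // 3 chunks: 512 + 512 + 276 bytes
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }
        long[] stored = compute(data);           // conceptually what a .crc file holds
        data[700] ^= 1;                          // flip one bit inside chunk 1
        System.out.println(firstBadChunk(data, stored)); // prints 1
    }
}
```

Verification only says which chunk is bad; in HDFS the reader would then fall back to another replica of the block.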
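2. Compression: compression reduces the space needed to store files and speeds up data transfer across the network, which makes it especially important in Hadoop. Compression formats used with Hadoop: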
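| Compression format | Algorithm | File extension | Multiple files | Splittable |
| --- | --- | --- | --- | --- |
| DEFLATE | DEFLATE | .deflate | No | No |
| gzip | DEFLATE | .gz | No | No |
| ZIP | DEFLATE | .zip | Yes | Yes, at file boundaries |
| bzip2 | bzip2 | .bz2 | No | Yes |
| LZO | LZO | .lzo | No | No |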
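DEFLATE and gzip share an algorithm in the table because gzip is DEFLATE-compressed data inside a different container (header and trailer). This stdlib-only sketch uses java.util.zip rather than Hadoop's codecs; it compresses the same bytes into a zlib container and a gzip container so the different leading bytes are visible. The class name is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical demo: the same DEFLATE algorithm wrapped in two different containers. */
final class DeflateContainers {
    /** zlib container: 2-byte header, DEFLATE data, Adler-32 trailer. */
    static byte[] zlib(byte[] input) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (OutputStream out = new DeflaterOutputStream(bytes)) {
            out.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** gzip container: 10-byte header, DEFLATE data, CRC-32 and length trailer. */
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(bytes)) {
            out.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] text = "One, two, buckle my shoe. ".repeat(20).getBytes(StandardCharsets.UTF_8);
        byte[] z = zlib(text);
        byte[] g = gzip(text);
        // prints: zlib starts with 78, gzip starts with 1f 8b
        System.out.printf("zlib starts with %02x, gzip starts with %02x %02x%n", z[0], g[0], g[1]);
    }
}
```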
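- Codecs (compression and decompression): the following program compresses standard input to standard output using the codec class named on the command line.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
    /**
     * Compresses standard input to standard output.
     * @param args args[0] is the fully qualified codec class name,
     *             e.g. org.apache.hadoop.io.compress.GzipCodec
     */
    public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration configuration = new Configuration();
        // ReflectionUtils also passes the Configuration to the new codec instance
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
        CompressionOutputStream outputStream = codec.createOutputStream(System.out);
        // Copy with a 4 KB buffer; false = leave both streams open afterwards
        IOUtils.copyBytes(System.in, outputStream, 4096, false);
        // finish() completes the compressed stream without closing System.out
        outputStream.finish();
    }
}
```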
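The program above calls finish() rather than close() so that the compressed stream is completed without closing System.out. java.util.zip.GZIPOutputStream.finish() has the same contract, so the pattern can be tried without Hadoop on the classpath. This is a hypothetical stdlib analogue that only does gzip, not a replacement for CompressionCodec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical stdlib analogue of the codec program: copy, then finish() without closing. */
final class GzipStreamCopy {
    static void compress(InputStream in, OutputStream sink) throws IOException {
        GZIPOutputStream out = new GZIPOutputStream(sink);
        byte[] buffer = new byte[4096];          // same buffer size as the Hadoop example
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        out.finish();                            // writes the gzip trailer; sink stays open
    }

    /** Compresses data, then decompresses it again; the result should equal the input. */
    static byte[] roundTrip(byte[] data) {
        try {
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            compress(new ByteArrayInputStream(data), compressed);
            try (GZIPInputStream in =
                     new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
                return in.readAllBytes();        // Java 9+
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        compress(System.in, System.out);         // stdin -> gzip -> stdout, like the Hadoop version
        System.out.flush();
    }
}
```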
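- Compression and input splits: HDFS stores a file as a sequence of blocks, so whether a compression format supports splitting matters a great deal. If a large gzip file spans several blocks, MapReduce cannot start reading in the middle of it, so a single map task has to process every block; a splittable format such as bzip2 lets each block be processed by its own map task.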
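- Using compression in MapReduce: for example, to compress the output of a MapReduce job, set the mapred.output.compress property to true in the job configuration and set mapred.output.compression.codec to the codec class to use. (In Hadoop 2 and later the equivalent properties are mapreduce.output.fileoutputformat.compress and mapreduce.output.fileoutputformat.compress.codec.)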
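```java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Old ("mapred") API; MaxTemperatureMapper and MaxTemperatureReducer
// are assumed to be defined elsewhere in the same project.
public class MaxTemperatureWithCompression {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCompression <input path> <output path>");
            System.exit(-1);
        }
        JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
        conf.setJobName("Max temperature with output compression");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        // Compress the job output with gzip
        conf.setBoolean("mapred.output.compress", true);
        conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        JobClient.runJob(conf);
    }
}
```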
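Returning to splittability: under a simplified model in which each HDFS block of a splittable file becomes one map task and a non-splittable file always becomes exactly one, the cost of gzip on a large input is easy to quantify. The sketch assumes a 128 MB block size (the default in Hadoop 2 and later; older releases used 64 MB) and ignores FileInputFormat's split-size settings and its small slack for the final split. The class name is hypothetical.

```java
/** Hypothetical, simplified estimate of map tasks for one input file. */
final class SplitEstimate {
    static long mapTasks(long fileBytes, long blockBytes, boolean splittable) {
        if (!splittable || fileBytes <= blockBytes) {
            return 1;                                   // whole file goes to one map task
        }
        return (fileBytes + blockBytes - 1) / blockBytes; // ceiling division: one task per block
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        System.out.println(mapTasks(1024 * mb, 128 * mb, true));  // prints 8 (e.g. bzip2)
        System.out.println(mapTasks(1024 * mb, 128 * mb, false)); // prints 1 (e.g. gzip)
    }
}
```

With gzip, the seven other blocks of that 1 GB file are still read, but by the same single task, and most of them from remote datanodes.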
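3. Serialization: serialization turns structured objects into a byte stream for transmission over a network or for storage, and deserialization turns the byte stream back into structured objects. Hadoop uses serialization for interprocess communication, which is done through remote procedure calls (RPC). A good RPC serialization format is compact, fast, extensible and interoperable. Hadoop uses its own serialization format, Writable.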
The Writable interface:
```java
package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
    // Serializes this object's fields to the DataOutput stream
    void write(DataOutput out) throws IOException;

    // Reads this object's fields from the binary DataInput stream
    void readFields(DataInput in) throws IOException;
}
```
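Implementing the interface means writing fields in write() and reading them back in the same order in readFields(). So that the example runs without Hadoop, it declares a hypothetical local interface, WritableLike, with the same two methods, and a hypothetical IntPair type; with Hadoop on the classpath the class would implement org.apache.hadoop.io.Writable instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical local stand-in with the same two methods as org.apache.hadoop.io.Writable. */
interface WritableLike {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

/** Hypothetical example type: a pair of ints serialized as 8 big-endian bytes. */
final class IntPair implements WritableLike {
    int first;
    int second;

    IntPair() { }                        // no-arg constructor, needed when instances are created reflectively
    IntPair(int first, int second) { this.first = first; this.second = second; }

    @Override public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override public void readFields(DataInput in) throws IOException {
        first = in.readInt();            // read back in exactly the order written
        second = in.readInt();
    }

    static byte[] toBytes(WritableLike w) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static <T extends WritableLike> T fromBytes(T w, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            w.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return w;
    }
}
```

Note that nothing in the bytes identifies the type: the reader must already know it is deserializing an IntPair, which is part of what keeps Writables compact.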
A helper class for serializing and deserializing Writables in tests:

```java
package WritablePackage;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;

public class WritableTestBase {
    // Serializes a Writable into a byte array
    public static byte[] serialize(Writable writable) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
        writable.write(dataOutputStream);
        dataOutputStream.close();
        return outputStream.toByteArray();
    }

    // Populates an existing Writable from bytes; returns the input bytes unchanged
    public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        DataInputStream dataInputStream = new DataInputStream(inputStream);
        writable.readFields(dataInputStream);
        dataInputStream.close();
        return bytes;
    }

    // Serializes a Writable and renders the bytes as a lowercase hex string
    public static String serializeToString(Writable src) throws IOException {
        return StringUtils.byteToHexString(serialize(src));
    }

    // Copies src into des via serialization and returns the bytes as hex
    public static String writeTo(Writable src, Writable des) throws IOException {
        byte[] data = deserialize(des, serialize(src));
        return StringUtils.byteToHexString(data);
    }
}
```
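Writable wrapper classes for Java primitives:

| Java primitive | Writable implementation | Serialized size (bytes) |
| --- | --- | --- |
| boolean | BooleanWritable | 1 |
| byte | ByteWritable | 1 |
| int | IntWritable | 4 |
| int | VIntWritable | 1–5 |
| float | FloatWritable | 4 |
| long | LongWritable | 8 |
| long | VLongWritable | 1–9 |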
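The fixed-width types always use the same number of bytes, while VIntWritable and VLongWritable use fewer bytes for small magnitudes. Below is a stdlib-only re-implementation, written from my reading of Hadoop's WritableUtils.writeVLong (which writeVInt delegates to) and meant for illustration rather than as the library code: values from -112 to 127 take one byte; otherwise the first byte encodes the sign and how many bytes follow. IntWritable.write itself is simply DataOutput.writeInt. The class name is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical demo of fixed-width vs. variable-length integer encoding. */
final class VarLengthDemo {
    /** What IntWritable.write does: DataOutput.writeInt, 4 bytes, big-endian. */
    static byte[] fixedInt(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Variable-length encoding modelled on WritableUtils.writeVLong. */
    static byte[] vlong(long i) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (i >= -112 && i <= 127) {           // small values fit in a single byte
            out.write((byte) i);
            return out.toByteArray();
        }
        int len = -112;
        if (i < 0) {
            i ^= -1L;                          // one's complement; the sign goes into the first byte
            len = -120;
        }
        for (long tmp = i; tmp != 0; tmp >>= 8) {
            len--;                             // one step per significant byte
        }
        out.write(len);                        // first byte: sign and byte count
        int n = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = n; idx != 0; idx--) {   // payload bytes, most significant first
            out.write((int) (i >> ((idx - 1) * 8)));
        }
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(fixedInt(163)));  // prints 000000a3
        System.out.println(hex(vlong(163)));     // prints 8fa3
    }
}
```

So 163 costs 4 bytes as an IntWritable but 2 as a VIntWritable; the variable-length form only loses for large magnitudes (5 bytes for an int, 9 for a long).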
4. File-based data structures
- SequenceFile: a persistent data structure for binary key/value pairs. Writing and reading a SequenceFile:
```java
package WritablePackage;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {
    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };

    /**
     * Writes 100 IntWritable/Text records to the SequenceFile at args[0].
     */
    public static void main(String[] args) throws IOException {
        String url = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(url), conf);
        Path path = new Path(url);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                // getLength() is the current length of the file, printed before each append
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            // Errors propagate to the caller instead of being silently swallowed
            IOUtils.closeStream(writer);
        }
    }
}
```
```java
package WritablePackage;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
    /**
     * Prints every record of the SequenceFile at args[0] with its position;
     * a "*" marks records for which the reader passed a sync point.
     */
    public static void main(String[] args) throws IOException {
        String url = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(url), conf);
        Path path = new Path(url);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            // The key and value classes are read from the file header
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
```
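Both demos leave the on-disk layout to SequenceFile. As a rough model of what one uncompressed record contains (record length, key length, key bytes, value bytes), here is a hypothetical stdlib-only sketch. It leaves out the file header, sync markers and compression, and stores values as raw UTF-8, whereas Text adds its own length prefix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, heavily simplified model of uncompressed SequenceFile records. */
final class RecordFileSketch {
    static byte[] write(List<Map.Entry<Integer, String>> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Map.Entry<Integer, String> r : records) {
                byte[] value = r.getValue().getBytes(StandardCharsets.UTF_8);
                out.writeInt(4 + value.length);  // record length = key length + value length
                out.writeInt(4);                 // key length: an int key takes 4 bytes
                out.writeInt(r.getKey());        // key bytes
                out.write(value);                // value bytes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<Map.Entry<Integer, String>> read(byte[] data) {
        List<Map.Entry<Integer, String>> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                int recordLength = in.readInt();
                int keyLength = in.readInt();
                if (keyLength != 4) {
                    throw new IOException("unexpected key length " + keyLength);
                }
                int key = in.readInt();
                byte[] value = new byte[recordLength - keyLength];
                in.readFully(value);
                records.add(Map.entry(key, new String(value, StandardCharsets.UTF_8)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```

The length prefixes are what let a reader step from one record to the next, which is why the read demo can print a position for each record.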
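- MapFile: a sorted SequenceFile with an index, so entries can be looked up by key. A MapFile can be thought of as a persistent form of java.util.Map. Note that entries must be added in key order; appending a key that sorts before the previous one causes an IOException.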
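The sorted, indexed design can be sketched in memory. In this hypothetical class, every Nth appended key goes into a sparse index; a lookup binary-searches the index and then scans at most one interval of entries, which is the same idea MapFile applies on disk (its default index interval is 128 entries, as far as I know). Out-of-order appends are rejected with an IllegalArgumentException, where MapFile.Writer uses an IOException, and this sketch also rejects duplicate keys for simplicity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical in-memory sketch of a sorted file with a sparse key index; not Hadoop code. */
final class SparseIndexedMap {
    private final int indexInterval;
    private final List<Integer> keys = new ArrayList<>();
    private final List<String> values = new ArrayList<>();
    private final List<Integer> indexKeys = new ArrayList<>();      // every Nth key
    private final List<Integer> indexPositions = new ArrayList<>(); // where that key sits

    SparseIndexedMap(int indexInterval) {
        this.indexInterval = indexInterval;
    }

    void append(int key, String value) {
        if (!keys.isEmpty() && key <= keys.get(keys.size() - 1)) {
            throw new IllegalArgumentException("key out of order: " + key);
        }
        if (keys.size() % indexInterval == 0) {  // index entries 0, N, 2N, ...
            indexKeys.add(key);
            indexPositions.add(keys.size());
        }
        keys.add(key);
        values.add(value);
    }

    /** Binary-searches the sparse index, then scans forward at most one interval. */
    String get(int key) {
        int slot = Collections.binarySearch(indexKeys, key);
        if (slot < 0) {
            slot = -slot - 2;                    // last indexed key smaller than the target
            if (slot < 0) {
                return null;                     // target precedes every key
            }
        }
        int start = indexPositions.get(slot);
        int end = Math.min(keys.size(), start + indexInterval);
        for (int pos = start; pos < end; pos++) {
            if (keys.get(pos) == key) {
                return values.get(pos);
            }
        }
        return null;
    }
}
```

Only the index needs to live in memory, so a larger interval trades slower lookups for a smaller index.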
Articles on this site are shared or submitted by users of 和通数据库; reproduction in any form is welcome, but please credit the source.