Hadoop文件合并,
转自:http://acesdream.blog.51cto.com/10029622/1625442
整个代码的工作就是把本地目录下个若干个小文件,合并成一个较大的文件,写入到HDFS中。话不多说,代码如下:
补充说明:后来发现,书上的源代码是没有问题的,只不过是书上的源代码要打成jar包,放在集群的机器上去运行,如果在Eclipse下面调试运行的话,就会出现问题。出现问题的原因如下
| 1 2 3 4 5 6 7 |
//读取本地文件系统,如果要想正确运行,必须要打成jar包,在hadoop集群的机器上面运行
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);
//通过URI可以远程读取HDFS,所以Eclipse下面调试要使用这种写法,打成jar包这种形式也是可以的
FileSystem hdfs = FileSystem.get(URI.create(serverPath), conf);
FileSystem local = FileSystem.getLocal(conf);
|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
package com.hadoop.examples;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* @Package
* @ClassName: PutMerge
* @Description: 读取本地目录下的文件,写入到HDFS,在写入的过程中,
* 把这三个文件合成一个文件
* @author lxy
* @date 2015年3月25日 上午9:59:38
* @version V1.0
*/
public class PutMerge {
public static void main(String[] args) throws IOException {
// 输入目录,目录下有三个txt,文章最后面会儿给出文件内容
String localPathStr = "E:\\test";
// 输出目录,HDFS路径,文章最后面会给出合并之后的文件内容
String serverPath =
"hdfs://192.168.3.57:8020/user/lxy/mergeresult/merge.txt";
//输入目录,是一个本地目录
Path inputDir = new Path(localPathStr);
//输出目录,是一个HDFS路径
Path hdfsFile = new Path(serverPath);
Configuration conf = new Configuration();
/**
* Hadoop in Action的原代码如下
* FileSystem hdfs = FileSystem.get(conf);
* 但是这样的话,使用Eclipse调试时,执行下面的语句是就会报异常,因为它是读取本地
* 文件系统
* FSDataOutputStream out = hdfs.create(hdfsFile);
*/
// 根据上面的serverPath,获取到的是一个org.apache.hadoop.hdfs.DistributedFileSystem对象
FileSystem hdfs = FileSystem.get(URI.create(serverPath), conf);
FileSystem local = FileSystem.getLocal(conf);
try {
//获取输入目录下的文件以及文件夹列表
FileStatus[] inputFiles = local.listStatus(inputDir);
//在hdfs上创建一个文件
FSDataOutputStream out = hdfs.create(hdfsFile);
for (int i = 0; i < inputFiles.length; i++) {
System.out.println(inputFiles[i].getPath().getName());
//打开本地输入流
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0) {
//往hdfs上的文件写数据
out.write(buffer, 0, bytesRead);
}
//释放资源
in.close();
}
//释放资源
out.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
|
我的测试目录下有三个txt文件
1.txt
| 1 2 3 4 5 6 7 |
1 hello Hadoop
2 hello Hadoop
3 hello Hadoop
4 hello Hadoop
5 hello Hadoop
6 hello Hadoop
7 hello Hadoop
|
2.txt
| 1 2 3 4 5 6 7 |
8 hello Hadoop
9 hello Hadoop
10 hello Hadoop
11 hello Hadoop
12 hello Hadoop
13 hello Hadoop
14 hello Hadoop
|
3.txt
| 1 2 3 4 5 6 7 |
15 hello Hadoop
16 hello Hadoop
17 hello Hadoop
18 hello Hadoop
19 hello Hadoop
20 hello Hadoop
21 hello Hadoop
|
合并之后的文件如下所示:
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。