Hadoop中的Java接口---读取数据,hadoopjava接口---
从Haddoop URL读取数据
要从Hadoop文件系统读取文件,最简单的方法使用java.net.URL对象打开数据流,从中读取数据。
InputStream in = null;
try{
in = new URL("hdfs://host/path").openStream();
}finally{
IOUtils.closeStream(in);
}
让Java程序能够识别Hadoop的hdfs URL还需要通过FsUrlStreamHandlerFactory实例调用java.net.URL的setURLStreamHandlerFactory方法。每个Java虚拟机只能调用一次这个方法。
public class URLCat{
static {
URL.setStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String args[])throws Exception{
InputStream in = null;
try{
in = new URL(args[0]).openStream();
IOUtils.copyBytes(in, System.out, 4096, false);
}finally{
IOUtils.closeStream(in);
}
}
}
我们使用Hadoop提供的IOUtils类。copyBytes方法将InputStream中数据复制到系统标准输出中,即重定向输出到命令行。cpoyBytes方法的最后两个参数,第一个设置用于复制的缓冲区大小,第二个设置复制结束后是否关闭数据流。
通过FileSystemAPI读取数据
Hadoop文件系统中通过Hadoop Path对象来代表文件。可以将路径视为一个Hadoop文件系统URI,如hdfs://localhost/user/tom/quangle.txt
FileSystem是一个通用的文件API。获取FileStream实例有下面几个静态工程方法
public static FileSystem get(Configuration conf)throws IOException
public static FileSystem get(URI uri, Configuration conf)throws IOException
public static FileSystem get(URI uri, Configuration conf, String user)throws IOException
Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如 conf/core-site.xml)
- 第一个方法返回的是默认文件系统(在
conf/core-site.xml中指定的,如果没有指定,则使用默认的本地文件系统) - 第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统
- 第三个方法作为给定用户来访问文件系统,对安全来说是至关重要的
有了FileSystem实例之后,我们调用open()函数来获取文件的输入流。第一个方法使用默认的缓冲区大小4KB
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize)throws IOException
我们重写前面的例子:
public class FileSystemCat{
public static void main(String args[])throws Exception{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
InputStream in = null;
try{
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
}finally{
IOUtils.closeStream(in);
}
}
}
FSDateInputStream对象
FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream接口的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable
Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的查询方法
public interface Seekable{
void seek(long pos) throws IOException;
long getPos() throws IOException;
boolean seekToNewSource(long targetPos)throws IOException
}
seek()可以移动到文件中任意一个绝对位置,java.io.InputStream的skip()则只能相对于当前位置定位到另一个位置。seek()方法是一个相对高开销的操作,需要慎重使用。
public interface PositionReadable{
public int read(long position, byte[] buffer, int offset, int length) throws IOException
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException
public void readFully(long postion, byte[] buffer)throws IOException
FSDataInputStream类也实现了PositionReadable接口,从一个指定偏移量出读取文件的一部分。
read()方法从文件的指定position处读取至多为length字节的数据并出入缓冲区buffer的指定偏离量offset处。返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于指定的length长度readFully()方法将指定length长度的字节数数据读取到buffer中,除非已经读取到文件末尾,这种情况下将跑出EOFException异常