Hadoop源码学习-以创建目录为例,hadoop源码
之前提到,在终端输入命令hadoop fs -mkdir dir时,最后是转换成运行JAVA程序,执行类FsShell,并传递相应的参数。
在类FsShell里的执行过程
类FsShell是使用命令hadoop fs时执行的类,它的功能就是:运行一个通用文件系统客户端,能够对文件系统进行相关操作。
FsShell类的main方法如下:
/**
* main() has some simple utility methods
* @param argv the command and its arguments
* @throws Exception upon error
*/
public static void main(String argv[]) throws Exception {
FsShell shell = newShellInstance(); //创建实例
Configuration conf = new Configuration();
conf.setQuietMode(false);
shell.setConf(conf);
int res;
try {
// 解析参数,并执行命令
res = ToolRunner.run(shell, argv);
} finally {
shell.close();
}
System.exit(res);
}
在主函数中,先是创建一个FsShell实例,然后进行参数解析,最后运行命令。其中,类ToolRunner的run方法的源代码如下:
public static int run(Configuration conf, Tool tool, String[] args)
throws Exception{
if(conf == null) {
conf = new Configuration();
}
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
//set the configuration back, so that Tool can configure itself
tool.setConf(conf);
//get the args w/o generic hadoop args
String[] toolArgs = parser.getRemainingArgs();
return tool.run(toolArgs);
}
public static int run(Tool tool, String[] args)
throws Exception{
return run(tool.getConf(), tool, args);
}
类ToolRunner的run方法主要做了两件事情:
1)进行配置解析,即解析参数;
2)执行命令。虽然最后执行的是tool.run(toolArgs),但FsShell类实现了Tool接口,因此,这里执行的run方法,实际上就是在执行类FsShell里的run方法,此方法源码如下:
@Override
public int run(String argv[]) throws Exception {
// initialize FsShell
init();
int exitCode = -1;
if (argv.length < 1) {
printUsage(System.err);
} else {
String cmd = argv[0];
Command instance = null;
try {
instance = commandFactory.getInstance(cmd);
if (instance == null) {
throw new UnknownCommandException();
}
exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
} catch (IllegalArgumentException e) {
displayError(cmd, e.getLocalizedMessage());
if (instance != null) {
printInstanceUsage(System.err, instance);
}
} catch (Exception e) {
// instance.run catches IOE, so something is REALLY wrong if here
LOG.debug("Error", e);
displayError(cmd, "Fatal internal error");
e.printStackTrace(System.err);
}
}
return exitCode;
}
在这个run方法中,也同样是做了两件事:
1)通过CommandFactory工厂得到一个实例;
2)执行实例的run方法,并返回退出代码。
CommandFactory工厂负责将命令转换成类,例如,当我们在终端输入的是hadoop fs -mkdir dir时,这个工厂会将命令mkdir转换成类Mkdir。不同的命令会转换成不同的类,其后,会调用文件系统的相应方法进行操作,根据不同的情况,使用的文件系统也可能不一样,这与配置文件相关,会实例化相应的文件系统。具体的操作是在文件系统代码中实现的。
例如,以下是分布式文件系统public class DistributedFileSystem extends FileSystem,其实,这个就是HDFS.
在类Mkdir里的执行方法
接下来执行的是Mkdir类的processNonexistentPath方法。
processNonexistentPath方法源码如下:
protected void processNonexistentPath(PathData item) throws IOException {
// check if parent exists. this is complicated because getParent(a/b/c/) returns a/b/c, but
// we want a/b
if (!item.fs.exists(new Path(item.path.toString()).getParent()) && !createParents) {
throw new PathNotFoundException(item.toString());
}
if (!item.fs.mkdirs(item.path)) {
throw new PathIOException(item.toString());
}
}
这个方法的功能是:
判断这个目录的父目录是否存在,如果不存在且不创建父目录,则抛出PathNotFoundException异常,即没有找到路径。
否则,通过方法item.fs.mkdirs()创建目录。
PathData类的成员fs是抽象类FileSystem的实现类,其中,DistributedFileSystem类就是抽象类FileSystem的一个实现类,如下:
public class DistributedFileSystem extends FileSystem
但在上述mkdirs()方法调用的时候,具体使用哪个文件系统,取决于配置文件里的配置。
在类DistributedFileSystem里的执行方法
调用mkdirs()方法之后,最后会调用类DistributedFileSystem的mkdirsInternal方法,如下:
private boolean mkdirsInternal(Path f, final FsPermission permission,
final boolean createParent) throws IOException {
statistics.incrementWriteOps(1); // 更新统计数据,这个统计数据用于跟踪到目前为止在这个文件系统中有多少次读和写操作。这里统计的是写操作。
Path absF = fixRelativePart(f); // 将相对路径转换成绝对路径。
// 创建一个`FileSystemLinkResolver`类的对象,并重写此类的`doCall`和`next`方法。
// 然后调用对象的`resolve`方法。
return new FileSystemLinkResolver<Boolean>() {
@Override
public Boolean doCall(final Path p)
throws IOException, UnresolvedLinkException {
return dfs.mkdirs(getPathName(p), permission, createParent);
}
@Override
public Boolean next(final FileSystem fs, final Path p)
throws IOException {
// FileSystem doesn't have a non-recursive mkdir() method
// Best we can do is error out
if (!createParent) {
throw new IOException("FileSystem does not support non-recursive"
+ "mkdir");
}
return fs.mkdirs(p, permission);
}
}.resolve(this, absF);
}
resolve方法的功能是:
1) 尝试使用指定的文件系统和路径,去执行doCall方法。
2) 如果doCall方法调用失败,则尝试重新解析路径,然后执行next方法。
doCall方法
在doCall方法中,调用的是dfs.mkdirs(getPathName(p), permission, createParent)方法,dfs是一个DFSClient类的对象。
DFSClient可以连接到一个Hadoop文件系统,并执行一些基本的文件任务。它使用ClientProtocol协议与NameNode守护进程进行通信,能够直接连接到DataNodes读/写数据块。
Hadoop DFS用户应该获取一个DistributedFileSystem类的实例,这个实例使用DFSClient来处理文件系统任务。
执行类DFSClient里的方法
在这里,调用了DFSClient类的mkdirs方法,此方法的源代码如下:
public boolean mkdirs(String src, FsPermission permission,
boolean createParent) throws IOException {
if (permission == null) {
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
return primitiveMkdir(src, masked, createParent);
}
此方法实现的功能就是:使用给出的名字和权限,创建目录。
接下来,我们看看到底是怎么创建目录的,我们跟踪方法primitiveMkdir的内部实现,此方法的源码如下:
public boolean primitiveMkdir(String src, FsPermission absPermission,
boolean createParent)
throws IOException {
checkOpen(); // 检查客户端是否正在运行,即检查dfsClient是否运行,也相当于检查DistributedFileSystem(DFS)是否初始化,因此在DFS初始化的时候,会初始化dfsClient。
if (absPermission == null) {
absPermission =
FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
}
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + absPermission);
}
TraceScope scope = Trace.startSpan("mkdir", traceSampler);
try {
// 创建目录
return namenode.mkdirs(src, absPermission, createParent);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
InvalidPathException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
ParentNotDirectoryException.class,
SafeModeException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
} finally {
scope.close();
}
}
可以看到,实际上创建目录的任务是由namenode完成的。