Hadoop源代码分析之Hadoop RPC(Server),
1. Server :Hadoop RPC Server的实现,这是一个抽象类,只有一个抽象方法public abstract Writable call(Class<?> protocol,Writable param, long receiveTime) throws IOException; 具体的实现在RPC.Server中
其中又包括5个类 :
Call:用于存储客户端发来的请求
Listener: 监听类,用于监听客户端发来的请求,把数据封装成Call对象,添加到callQueue队列中。
Handler:请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。
Responder:响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection:连接类,真正的客户端请求读取逻辑在这个类中。其中Listener,Handler,Responder都继承了Thread,在服务器启动时同时启动这三个线程,下面看这个三个线程的run方法
Listener的关键代码:其实和一般的NIO服务器ServerChannal写法差不多。
public void run() {
...
while (running) {
SelectionKey key = null;
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key); //创建Connection对象,并把Connection做为key的attachment以便下面的doRead方法使用
else if (key.isReadable())
doRead(key);//主要调用Connection的readAndProcess方法,读取客户端发送过来的数据进行处理,分为三个步骤
//1.读取IPC连接魔数和协议版本号并完成版本检查
//2.进行连接头检查,主要调用了两个函数processHeader():保证服务器实现了IPC接口和获取用户信息
authorize():保证用户有相关的权限访问远程接口
//3.调用processData()方法处理数据,主要是新建一个Call对象,读取数据填充这个Call的成员变量,并加入到
callQueue队列中
}
} catch (IOException e) {
}
key = null;
}
}
...
}
Handler才是真正执行客户端发过来的远程调用,其关键代码如下:
public void run() {
...
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here,从callQueue队列中取出call对象进行处理 ... //Subject.doAs()是java的鉴权与授权服务(JAAS)中的方法
value = Subject.doAs(call.connection.user,
new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
return call(call.connection.protocol,call.param, call.timestamp); /**调用Server的那个抽象方法(当然是调用它子类的实现啦 (RPC.Server.call()))。其实现的关键代码如下: public Writable call(Class<?> protocol, Writable param, long receivedTime)throws IOException {
try {
Invocation call = (Invocation)param; Method method =protocol.getMethod(call.getMethodName(),call.getParameterClasses());
method.setAccessible(true); Object value = method.invoke(instance, call.getParameters());
... return new ObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) {
... } 这个就是我们常用的java反射调用方法啦 **/
}
}
);
}
... setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error); //把返回结果储存在这个Call中的response
responder.doRespond(call);//把数据回写给客户端,其中会调用一个Response中一个非常重要的方法processResponse() //这个方法只在通道空闲时响应(要处理的Call队列长度为1),忙时不会响应,而是交个Responder //进行集中处理和响应
}
...
} Responder的关键代码: public void run() {
... while (running) {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);//把数据写到缓存区从而发送给客户端,同样调用了processResponse()方法
}
} catch (IOException e) {
... } 下面来看看processResponse()方法 private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // 该通道上没有数据要发送
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
// If there are no items for this channel, then we are done
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
call = responseQueue.removeFirst();
... //异步写尽可能多的数据
int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
if (!call.response.hasRemaining()) { //应答数据已经写完
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
} else {
done = false; // more calls pending to be sent.
}
... } else {
//应答数据没有写完,插入队列头,等待再次发送
call.connection.responseQueue.addFirst(call);
if (inHandler) { //不在Response线程中,在Handler线程中,前面说过当通道空闲时,Handler线程也会调用这个方法往通道中写数据,同样如果 //数据没写完,就需要交给Responder处理,这是就需要把次没写完的数据标记为Responder感兴趣的事件,等待Responder的 //Selector选择出来并处理
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
//成员变量peding++,该变量表示现在有多少个线程在进行通道注册
incPending();
try {
// 唤醒在select()方法上等待的Responder线程 writeSelector.wakeup(); //这样才能调用这个注册方法进行注册
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();
}
}
... }
error = false; // everything went off well
}
} finally {
... }
return done;
}
我这里只是拿了Hadoop RPC Server最核心的三个类的最核心代码拿出来分析了一下,当然这个Server还包括了大量的异常处理和超时处理,这些都在源代码中有体现,我就不在多说了 -----亲爱的
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。