欢迎投稿

今日深度:

Hadoop源代码分析之Hadoop RPC(Server),

Hadoop源代码分析之Hadoop RPC(Server),


1. Server :Hadoop  RPC Server的实现,这是一个抽象类,只有一个抽象方法  
public abstract Writable call(Class<?> protocol,Writable param, long receiveTime)  throws IOException; 具体的实现在RPC.Server中
其中又包括5个类 :
Call:用于存储客户端发来的请求
Listener: 监听类,用于监听客户端发来的请求,把数据封装成Call对象,添加到callQueue队列中。
Handler:请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。
Responder:响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection:连接类,真正的客户端请求读取逻辑在这个类中。其中Listener,Handler,Responder都继承了Thread,在服务器启动时同时启动这三个线程,下面看这个三个线程的run方法

Listener的关键代码:其实和一般的NIO服务器ServerChannal写法差不多。
public void run() {
      ...
      while (running) {
        SelectionKey key = null;
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key); //创建Connection对象,并把Connection做为key的attachment以便下面的doRead方法使用                   
                else if (key.isReadable())
                  doRead(key);//主要调用Connection的readAndProcess方法,读取客户端发送过来的数据进行处理,分为三个步骤
                                         //1.读取IPC连接魔数和协议版本号并完成版本检查
                                         //2.进行连接头检查,主要调用了两个函数processHeader():保证服务器实现了IPC接口和获取用户信息
                                              authorize():保证用户有相关的权限访问远程接口                
                                         //3.调用processData()方法处理数据,主要是新建一个Call对象,读取数据填充这个Call的成员变量,并加入到
                                              callQueue队列中  
  
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } 
        ...
     
}

Handler才是真正执行客户端发过来的远程调用,其关键代码如下:
public void run() {
      ...
      while (running) {
        try {
          final Call call = callQueue.take(); // pop the queue; maybe blocked here,从callQueue队列中取出call对象进行处理
         ...          //Subject.doAs()是java的鉴权与授权服务(JAAS)中的方法
 
         value = Subject.doAs(call.connection.user, 
                           new PrivilegedExceptionAction<Writable>() {
                              @Override
                              public Writable run() throws Exception {
                                      return call(call.connection.protocol,call.param, call.timestamp); 
                                                  /**调用Server的那个抽象方法(当然是调用它子类的实现啦 (RPC.Server.call()))。其实现的关键代码如下:                                                   public Writable call(Class<?> protocol, Writable param, long receivedTime)throws IOException {
                                                            try {
                                                               Invocation call = (Invocation)param;
                                                               Method method =protocol.getMethod(call.getMethodName(),call.getParameterClasses());
                                                                method.setAccessible(true);
                                                               Object value = method.invoke(instance, call.getParameters());
                                                                 ...
                                                                return new ObjectWritable(method.getReturnType(), value);                                                                } catch (InvocationTargetException e) {
                                                                 ...
                                                             }  这个就是我们常用的java反射调用方法啦  **/
                              }
                           }
                          );
              
          }
          ...
          setupResponse(buf, call, 
                        (error == null) ? Status.SUCCESS : Status.ERROR, 
                        value, errorClass, error); //把返回结果储存在这个Call中的response
          responder.doRespond(call);//把数据回写给客户端,其中会调用一个Response中一个非常重要的方法processResponse()
                                                          //这个方法只在通道空闲时响应(要处理的Call队列长度为1),忙时不会响应,而是交个Responder                                                           //进行集中处理和响应
        }
         ...
    } 
  Responder的关键代码: public void run() {
     ...
      while (running) {
        try {
          waitPending();     // If a channel is being registered, wait.
          writeSelector.select(PURGE_INTERVAL);
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                  doAsyncWrite(key);//把数据写到缓存区从而发送给客户端,同样调用了processResponse()方法
              }
            } catch (IOException e) {
                  ...
          } 下面来看看processResponse()方法 private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
      boolean error = true;
      boolean done = false;       // 该通道上没有数据要发送
      int numElements = 0;
      Call call = null;
      try {
        synchronized (responseQueue) {
          // If there are no items for this channel, then we are done
          numElements = responseQueue.size();
          if (numElements == 0) {
            error = false;
            return true;              // no more data for this channel.
          }
           call = responseQueue.removeFirst();
         ...
          //异步写尽可能多的数据
          int numBytes = channelWrite(channel, call.response);
          if (numBytes < 0) {
            return true;
          }
          if (!call.response.hasRemaining()) {
           //应答数据已经写完
            call.connection.decRpcCount();
            if (numElements == 1) {    // last call fully processes.
              done = true;             // no more data for this channel.
            } else {
              done = false;            // more calls pending to be sent.
            }
           ...
          } else {
            //应答数据没有写完,插入队列头,等待再次发送
             call.connection.responseQueue.addFirst(call);
            if (inHandler) {
               //不在Response线程中,在Handler线程中,前面说过当通道空闲时,Handler线程也会调用这个方法往通道中写数据,同样如果                //数据没写完,就需要交给Responder处理,这是就需要把次没写完的数据标记为Responder感兴趣的事件,等待Responder的                //Selector选择出来并处理
              // set the serve time when the response has to be sent later
              call.timestamp = System.currentTimeMillis();
              //成员变量peding++,该变量表示现在有多少个线程在进行通道注册
              incPending();
              try {
                // 唤醒在select()方法上等待的Responder线程
                writeSelector.wakeup();                 //这样才能调用这个注册方法进行注册
                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
              } catch (ClosedChannelException e) {
                //Its ok. channel might be closed else where.
                done = true;
              } finally {
                decPending();
              }
            }
           ...
          }
          error = false;              // everything went off well
        }
      } finally {
       ...
      }
      return done;
    }

我这里只是拿了Hadoop RPC Server最核心的三个类的最核心代码拿出来分析了一下,当然这个Server还包括了大量的异常处理和超时处理,这些都在源代码中有体现,我就不在多说了   -----亲爱的

www.htsjk.Com true http://www.htsjk.com/Hadoop/39793.html NewsArticle Hadoop源代码分析之Hadoop RPC(Server), 1. Server :Hadoop  RPC Server的实现,这是一个抽象类,只有一个抽象方法   public abstract Writable call(Class? protocol,Writable param, long receiveTime)  throws IOExcep...
相关文章
    暂无相关文章
评论暂时关闭