HBase的RPC源码分析,HBaseRPC源码分析
RPC服务是指跨网络的服务调用,客户端发出服务请求,经过网络传输到服务端。服务端解析该请求,调用本地方法获取结果,然后将结果作为响应包通过网络发送回客户端,这样客户端在调用远程方法时就会像调用本地方法一样简单。
RPC调用时有两个问题需要解决,其一是client端与server端采用何种方式通信,其二是请求信息和结果以何种格式在网络上传输,也就是RPC通信协议和RPC通信框架。通信协议需要client端和server端约定好请求参数的类型、参数顺序以及相应结果的类型等等,通信框架则定义了两端通信的方式,可选的包括TCP/UDP通信,HTTP等。除此之外,RPC框架还需要解决server端服务调用超时、client端的重试、服务端限流和server端的调度等问题。
HBase中的master/regionserver/client等组件在RPC中的关系如下图中所示:
其中,client与master之间的通信主要是hbase的DML操作,包括table schema的更改,table region的迁移合并、region server的上线和下线以及集群负载均衡、Table的快照管理等功能;client与regionserver之间的通信用于实现数据读写请求,如get、multiGet、mutate、scan、bulkLoadHfile、执行coprocessor等;master与region
server之间的通信用于region server向master汇报自身的状态,包括自身管理的region和自身的当前状态等等。
下面分别从客户端和服务端的角度分析hbase中rpc调用的实现。在客户端中,hbase的rpc服务可以拆分为两个主要流程,分别是请求构建和请求处理。下面我们以hbase的get请求为例,以求清晰解释客户端的rpc流程。首先讲解请求是如何构建的。
上面这段代码就是get请求的构建流程,其将get请求封装进了一个RegionServerCallable对象,最后调用rpcCallerFactory中的callWithRetries方法开始rpc调用。这里有一个需要注意的伏笔,就是get请求最终是落在了下面这一句:
ClientProtos.GetResponse response = getStub().get(controller, request)
也就是说getStub中返回的ClientService.BlockingInterface管理了最终请求的处理。
上面的先按下不表,接着进入callWithRetries方法,其第一个try块中的代码就是其主要逻辑。如下图中所示:
简而言之就是分别调用了callable的prepare方法和call方法,call方法的定义我们在上面看到了,那么prepare方法做了什么事呢,进入prepare方法,分析其代码:
prepare方法接收一个reload参数以决定其每次链接regionserver失败时是否会重新定位region所在的位置,一般设为true。上面代码的关键是最后一句:
setStub(getConnection().getClient(this.location.getServerName()))setStub实际上是设置了callable中的ClientService.BlockingInterface类型成员变量。至此,呼应了前面埋下的伏笔。ClientService.BlockingInterface关联了rpc请求的处理。客户端发往服务端rpc请求可以划分两类,分别是涉及集群管理的DML操作请求和涉及数据读写的DDL操作请求,这两类操作请求分别由AdminService.BlockingInterface&ClientService.BlockingInterface两个类来处理。
getClient方法的主要代码如下图中所示:
getClient主要做了两件事,首先根据端口名,主机名&调用方法名构造了key,接着调用了rpcClient的createBlockingRpcChannel方法。Hbase中有多种不同类型的rpcClient,我们以最常见的AsyncRpcClient为例继续讲解。
在createBlockingRpcChannel内部实例化了一个BlockingRpcChannelImplementation类。该类调用了rpcClient的CallBlockingMethod方法,这个方法实际上是一个callable方法,不同的rpcClient中定义了不同的call实现,在AsyncRpcClient的call实现中的主要流程如下:
上图展示了主要代码,后面的都是错误处理的代码。可以看出这里用到了netty提供的异步模型Future/Promise方法。在这个模型中,Future表示一个可能还没有实际完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出对应的操作,而Promise交由任务执行者,任务执行者可以通过Promise标记任务完成或者失败。
这里的异步任务是经过callMethod方法包装的,callMethod方法接受一个MethodDescriptor类型的参数method,并将其封装为AsyncCall类型的call,然后调用writeRequest()。writeRequest()方法位于AsyncRpcChannel.java中,其将请求call的参数封装好写入到netty的channel中,并添加一个CallWriteListener用于监听channel的写入结果。
上述代码主要位于AsyncRpcChannel.java、AbstractRpcClient.java&AsyncRpcClient.java三个源文件中。通过分析上述代码,可以看到hbase的异步rpc请求在实现上是依赖了netty提供的promise/future模型的。
此外,还有一个问题就是client端与server端的socket连接是在什么时候建立起来的?
上面说了AbstractRpcClient这个类是个抽象类,各个rpcClient会覆写这个类中的方法,在这个类的callBlockingMethod中建立起了client端与server端的连接,具体是在下面一句:
val = call(pcrc,md,param,returnType,ticket,isa);在call方法中依次调用了connection.tracedWriteRequest->writeRequest->setupIOstreams->setupConnection,最终在setupConnection中建立起了本次rpc与服务端的连接,建立连接的主要代码主要代码如下:
while(true){
try{
this.socket=socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(tcpKeepAlive);
if(localAddr != null) {
this.socket.bind(localAddr);
}
NetUtils.connect(this.socket, remoteId.getAddress(),connectTO);
this.socket.setSoTimeout(readTO);
return;
}catch(SocketTimeoutException toe){
handleConnectionFailure(timeoutFailures++,maxRetries,toe);
}catch(IOException ie){
handleConnectionFailure(ioFailures++,maxRetries,toe);
}
}maxRetries定义了建连的重试次数,从源码可以看出,对待io错误和socket超时这里是分别计算重试次数的。
至此,rpc请求的处理简要叙述完毕。笔者水平有限,上述文中不妥的地方欢迎大家一起讨论。
参考资料:
1、http://www.binospace.com/index.php/in-depth-analysis-hbase-rpc-0-95-version-implementation-mechanism/ 2、http://www.fireflies.me/2014/01/%E4%B8%89%E3%80%81hbase%E7%9A%84rpc%E6%A1%86%E6%9E%B6/ 3、http://blog.csdn.net/JavaMan_chen/article/details/47039517