HBase Source Code Analysis: Client-Side Operation Handling
HBase version: 1.1.5
To use HBase, a client must first establish a connection to the HBase cluster, so we start our tour of the HBase client source code there.
First, a typical snippet that opens a connection and a table:
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("table1"));
try {
// Use the table as needed, for a single operation and a single thread
} finally {
table.close();
connection.close();
}
The Connection interface
The Connection interface defines the following commonly used methods:
Table getTable(TableName tableName) // returns a Table implementation (HTable) for accessing an HBase table
RegionLocator getRegionLocator(TableName tableName) // returns the region locations of a table
Admin getAdmin() // returns an Admin implementation for managing the HBase cluster
Let's start with the Table interface and its implementation, HTable. First, the methods declared on Table; all table operations are defined here:
boolean exists(Get get)
Result get(Get get)
Result[] get(List<Get> gets)
ResultScanner getScanner(Scan scan)
void put(Put put)
void put(List<Put> puts)
void delete(Delete delete)
void mutateRow(final RowMutations rm)
The HTable implementation
Get
Now we turn to the HTable implementation itself, starting with the get methods.
First, the simplest case: a single Get request.
// HTable.java
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
// if we are changing settings to the get, clone it.
...
if (get.getConsistency() == Consistency.STRONG) {
// Good old call.
final Get getReq = get;
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getName(), get.getRow()) {
@Override
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) return null;
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
// Call that takes into account the replica
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
rpcControllerFactory, tableName, this.connection, get, pool,
connConfiguration.getRetriesNumber(),
operationTimeout,
connConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call();
}
The code shows that a get request directly constructs a RegionServerCallable. Its javadoc describes the class as implementing a call against a RegionServer. So how is the target RegionServer determined, and how is the connection to it established? The answer is in RegionServerCallable's prepare method:
// RegionServerCallable.java
/**
* Prepare for connection to the server hosting region with row from tablename. Does lookup
* to find region location and hosting server.
* @param reload Set this to true if connection should re-find the region
* @throws IOException e
*/
@Override
public void prepare(final boolean reload) throws IOException {
try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
this.location = regionLocator.getRegionLocation(row, reload);
}
if (this.location == null) {
throw new IOException("Failed to find location, tableName=" + tableName +
", row=" + Bytes.toString(row) + ", reload=" + reload);
}
setStub(getConnection().getClient(this.location.getServerName()));
}
Here we can see that regionLocator.getRegionLocation looks up the location of the region containing the row, including its hosting RegionServer. HConnection.getClient then establishes (or reuses) a connection to that RegionServer and returns a ClientProtocol proxy as the stub. With the stub in place, the client can wrap up a request, talk to the RegionServer, and invoke the get method of its RPC server.
// HRegionLocation.java
public class HRegionLocation implements Comparable<HRegionLocation> {
private final HRegionInfo regionInfo;
private final ServerName serverName;
private final long seqNum;
...
}
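Conceptually, this lookup is a floor search over the regions' sorted start keys: the region holding a row is the one with the greatest start key less than or equal to the row key (the real client resolves this against hbase:meta and caches the result). A minimal, self-contained sketch with made-up region boundaries and server names:

```java
import java.util.TreeMap;

// Sketch of how a row key maps to the server hosting its region: each region
// is identified by its start key, and the region that holds a row is the one
// with the greatest start key <= the row key. The region boundaries and
// server names below are invented for illustration.
public class RegionLocatorSketch {
    private static final TreeMap<String, String> serverByStartKey = new TreeMap<>();
    static {
        serverByStartKey.put("", "server-a");      // first region: [ "", "row-3" )
        serverByStartKey.put("row-3", "server-b"); // middle region: [ "row-3", "row-6" )
        serverByStartKey.put("row-6", "server-c"); // last region:  [ "row-6", +inf )
    }

    // Analogous to RegionLocator.getRegionLocation(row).getServerName()
    public static String serverFor(String row) {
        return serverByStartKey.floorEntry(row).getValue();
    }

    public static void main(String[] args) {
        System.out.println(serverFor("row-42")); // prints "server-b"
    }
}
```

The real RegionLocator adds caching, retries, and replica handling on top of this idea.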
That completes the client-side flow for a single Get. Now let's look at the get method that takes a List of Gets:
// HTable.java
public Result[] get(List<Get> gets) throws IOException {
if (gets.size() == 1) {
return new Result[]{get(gets.get(0))};
}
try {
Object [] r1 = batch((List)gets);
// translate from Object[] to Result[]
Result [] results = new Result[r1.length];
int i = 0;
for (Object o : r1) {
results[i++] = (Result)o;
}
return results;
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
Internally this just calls the batch method:
// HTable.java
protected AsyncProcess multiAp;
public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
batch submits the operations via AsyncProcess.submitAll:
// AsyncProcess.java
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
Put put = (Put) r;
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
}
Action<Row> action = new Action<Row>(r, posInList);
setNonce(ng, r, action);
actions.add(action);
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
After wrapping each Row in an Action and attaching nonce information for the batch, the requests are handed to AsyncRequestFutureImpl.groupAndSendMultiAction:
// AsyncProcess.java (AsyncRequestFutureImpl)
private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
boolean isReplica = false;
List<Action<Row>> unknownReplicaActions = null;
for (Action<Row> action : currentActions) {
RegionLocations locs = findAllLocationsOrFail(action, true);
if (locs == null) continue;
boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
if (isReplica && !isReplicaAction) {
// This is the property of the current implementation, not a requirement.
throw new AssertionError("Replica and non-replica actions in the same retry");
}
isReplica = isReplicaAction;
HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
if (loc == null || loc.getServerName() == null) {
if (isReplica) {
if (unknownReplicaActions == null) {
unknownReplicaActions = new ArrayList<Action<Row>>();
}
unknownReplicaActions.add(action);
} else {
// TODO: relies on primary location always being fetched
manageLocationError(action, null);
}
} else {
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
}
}
boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
if (!actionsByServer.isEmpty()) {
// If this is a first attempt to group and send, no replicas, we need replica thread.
sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
? currentActions : null, numAttempt > 1 && !hasUnknown);
}
if (hasUnknown) {
...
}
}
Here the actions are grouped by the RegionServer hosting each row's region, and then sendMultiAction is called. For each entry of actionsByServer, sendMultiAction builds a SingleServerRequestRunnable and submits it to the thread pool for execution. The flow from there matches the single get, except that it invokes the multi method of the RegionServer's RPC server.
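The grouping-then-dispatch step can be sketched with plain Java collections and an ExecutorService; the row-to-server mapping below is hard-coded for illustration, standing in for the real location lookup:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the batch path: group actions by the server hosting each row's
// region, then submit one runnable per server, mirroring how sendMultiAction
// builds a SingleServerRequestRunnable per entry of actionsByServer.
public class MultiActionSketch {
    public static Map<String, List<String>> groupByServer(List<String> rows,
                                                          Map<String, String> serverOfRow) {
        Map<String, List<String>> actionsByServer = new TreeMap<>();
        for (String row : rows) {
            actionsByServer.computeIfAbsent(serverOfRow.get(row), s -> new ArrayList<>()).add(row);
        }
        return actionsByServer;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> serverOfRow = Map.of("r1", "rs1", "r2", "rs2", "r3", "rs1");
        Map<String, List<String>> grouped = groupByServer(List.of("r1", "r2", "r3"), serverOfRow);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // one "multi" request per server, executed concurrently
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            pool.submit(() -> System.out.println("multi -> " + e.getKey() + ": " + e.getValue()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```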
Delete
// HTable.java
public void delete(final Delete delete)
public void delete(final List<Delete> deletes)
Like get, delete has a single and a batch variant, and its internal flow mirrors get's, so we won't repeat it here.
The append and increment methods likewise follow the single-get flow, except that they ultimately call the server-side mutate method.
Put
// HTable.java
@Override
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put);
if (autoFlush) {
flushCommits();
}
}
public void put(final List<Put> puts) throws IOException {
getBufferedMutator().mutate(puts);
if (autoFlush) {
flushCommits();
}
}
The single-Put and list-Put paths are identical: both delegate to getBufferedMutator().mutate(...), then flush immediately when autoFlush is on.
// HTable.java
BufferedMutator getBufferedMutator()
The javadoc for BufferedMutator says that, like Table, it communicates with a single HBase table, but it is meant only for asynchronous, batched mutations (puts). Let's see how that asynchrony is implemented:
// BufferedMutatorImpl.java
public synchronized void mutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
doMutate(m);
}
private void doMutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
if (closed) {
throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
}
if (!(m instanceof Put) && !(m instanceof Delete)) {
throw new IllegalArgumentException("Pass a Delete or a Put");
}
// This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed.
if (ap.hasError()) {
writeAsyncBuffer.add(m);
backgroundFlushCommits(true);
}
if (m instanceof Put) {
validatePut((Put) m);
}
currentWriteBufferSize += m.heapSize();
//private List<Row> writeAsyncBuffer = new LinkedList<>();
writeAsyncBuffer.add(m);
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false);
}
}
doMutate mainly appends the operation to writeAsyncBuffer; once the buffer's accumulated heap size exceeds writeBufferSize (configured via hbase.client.write.buffer, default 2097152 bytes, i.e. 2 MB), backgroundFlushCommits is called.
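The buffering rule can be sketched as follows; the numeric sizes and the flush counter are simplified stand-ins for Mutation.heapSize() and backgroundFlushCommits:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of BufferedMutatorImpl's buffering: mutations accumulate in a write
// buffer, and a flush is triggered once the estimated heap size exceeds the
// write buffer size (hbase.client.write.buffer, default 2 MB).
public class WriteBufferSketch {
    private final long writeBufferSize;
    private long currentWriteBufferSize = 0;
    private final List<Long> buffer = new ArrayList<>();
    public int flushes = 0;

    public WriteBufferSketch(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    public void mutate(long heapSize) {
        currentWriteBufferSize += heapSize;
        buffer.add(heapSize);
        while (currentWriteBufferSize > writeBufferSize) { // same check as doMutate
            flush();
        }
    }

    private void flush() {
        buffer.clear();                // pretend the whole buffer was sent
        currentWriteBufferSize = 0;
        flushes++;
    }
}
```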
// BufferedMutatorImpl.java
/**
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
* there is an error (max retries reached from a previous flush or a bad operation), it tries to
* send all operations in the buffer and throws an exception.
*
* @param synchronous - if true, sends all the writes and wait for all of them to finish before
* returning.
*/
private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
try {
if (!synchronous) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -"
+ " waiting for all operation in progress to finish (successfully or not)");
}
}
if (synchronous || ap.hasError()) {
while (!writeAsyncBuffer.isEmpty()) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
}
RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
if (error != null) {
if (listener == null) {
throw error;
} else {
this.listener.onException(error, this);
}
}
}
} finally {
currentWriteBufferSize = 0;
for (Row mut : writeAsyncBuffer) {
if (mut instanceof Mutation) {
currentWriteBufferSize += ((Mutation) mut).heapSize();
}
}
}
}
backgroundFlushCommits sends the buffered operations to the servers without waiting for the servers' answers (the synchronous parameter selects the mode; doMutate passes false, i.e. asynchronous). If a previous flush left errors behind, the remaining operations are resubmitted synchronously.
The key call is:
// BufferedMutatorImpl.java
protected AsyncProcess ap;
ap.submit(tableName, writeAsyncBuffer, true, null, false);
Now let's look at the submit method:
// AsyncProcess.java
/**
 * Extract from the rows list what we can submit. The rows we can not submit are kept in the
 * list. Does not send requests to replicas (not currently used for anything other
 * than streaming puts anyway).
 * @param pool ExecutorService to use.
 * @param tableName The table for which this request is needed.
 * @param callback Batch callback. Only called on success (94 behavior).
 * @param needResults Whether results are needed, or can be discarded.
 * @param rows - the submitted row. Modified by the method: we remove the rows we took.
 * @param atLeastOne true if we should submit at least a subset.
 */
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
boolean needResults) throws InterruptedIOException {
if (rows.isEmpty()) {
return NO_REQS_RESULT;
}
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
NonceGenerator ng = this.connection.getNonceGenerator();
long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
// Location errors that happen before we decide what requests to take.
List<Exception> locationErrors = null;
List<Integer> locationErrorRows = null;
do {
// Wait until there is at least one slot for a new task.
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
// Remember the previous decisions about regions or region servers we put in the
// final multi.
Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
int posInList = -1;
Iterator<? extends Row> it = rows.iterator();
while (it.hasNext()) {
Row r = it.next();
HRegionLocation loc;
try {
if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
// Make sure we get 0-s replica.
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
throw new IOException("#" + id + ", no location found, aborting submit for"
+ " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
}
loc = locs.getDefaultRegionLocation();
} catch (IOException ex) {
locationErrors = new ArrayList<Exception>();
locationErrorRows = new ArrayList<Integer>();
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state.
retainedActions.add(new Action<Row>(r, ++posInList));
locationErrors.add(ex);
locationErrorRows.add(posInList);
it.remove();
break; // Backward compat: we stop considering actions on location error.
}
if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
Action<Row> action = new Action<Row>(r, ++posInList);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
}
}
} while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
locationErrors, locationErrorRows, actionsByServer, pool);
}
This method is fairly long; roughly, it proceeds as follows:
1. Because the operations may target different RegionServers, it first resolves each row's region location and groups the actions by hosting server:
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
2. It then submits the grouped requests via submitMultiActions, which in turn calls sendMultiAction, the same method the batch path ends up in.
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
Object[] results, boolean needResults, List<Exception> locationErrors,
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
// Add location errors if any
if (locationErrors != null) {
...
}
ars.sendMultiAction(actionsByServer, 1, null, false);
return ars;
}
A few more notes:
Within a single submit pass, decisions already made about regions and RegionServers are remembered, so each one is evaluated only once:
Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
Concurrency is throttled by three limits:
maxTotalConcurrentTasks: hbase.client.max.total.tasks, default 100
maxConcurrentTasksPerServer: hbase.client.max.perserver.tasks, default 2
maxConcurrentTasksPerRegion: hbase.client.max.perregion.tasks, default 1
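The effect of these three limits can be sketched with simple counters; this is a simplification of AsyncProcess's canTakeOperation bookkeeping, not its actual code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of canTakeOperation-style throttling: an action is accepted only
// while the in-flight task counts stay below the total, per-server, and
// per-region limits. Limit values mirror the defaults quoted above.
public class TaskLimiterSketch {
    static final int MAX_TOTAL = 100, MAX_PER_SERVER = 2, MAX_PER_REGION = 1;
    private final AtomicInteger total = new AtomicInteger();
    private final Map<String, AtomicInteger> perServer = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> perRegion = new ConcurrentHashMap<>();

    public boolean tryTake(String server, String region) {
        if (total.get() >= MAX_TOTAL) return false;
        if (perServer.computeIfAbsent(server, s -> new AtomicInteger()).get() >= MAX_PER_SERVER) return false;
        if (perRegion.computeIfAbsent(region, r -> new AtomicInteger()).get() >= MAX_PER_REGION) return false;
        // accepted: bump all three counters (decremented on completion in the real code)
        total.incrementAndGet();
        perServer.get(server).incrementAndGet();
        perRegion.get(region).incrementAndGet();
        return true;
    }
}
```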
Scan
// HTable.java
public ResultScanner getScanner(final Scan scan) throws IOException {
if (scan.getBatch() > 0 && scan.isSmall()) {
throw new IllegalArgumentException("Small scan should not be used with batching");
}
if (scan.getCaching() <= 0) {
scan.setCaching(getScannerCaching());
}
if (scan.getMaxResultSize() <= 0) {
scan.setMaxResultSize(scannerMaxResultSize);
}
if (scan.isReversed()) {
if (scan.isSmall()) {
return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
} else {
return new ReversedClientScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
}
}
if (scan.isSmall()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
} else {
return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
this.rpcCallerFactory, this.rpcControllerFactory,
pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
}
}
Depending on isSmall and isReversed, one of four ClientScanner subclasses is returned.
isSmall: a small scan gives better performance because it reads with pread and, on the client side, folds openScanner, next, and closeScanner into a single RPC. In short, it gains performance by cutting network round trips, and it suits scans whose range fits within one data block (64 KB by default).
isReversed: scans the data in reverse order.
Let's look at ClientScanner first.
// ClientScanner.java
/**
* Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
* row may be changed.
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to scan
* @param connection Connection identifying the cluster
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
...
this.conf = conf;
initializeScannerInConstruction();
}
protected void initializeScannerInConstruction() throws IOException{
// initialize the scanner
nextScanner(this.caching, false);
}
/*
* Gets a scanner for the next region. If this.currentRegion != null, then
* we will move to the endrow of this.currentRegion. Else we will get
* scanner at the scan.getStartRow(). We will go no further, just tidy
* up outstanding scanners, if <code>currentRegion != null</code> and
* <code>done</code> is true.
* @param nbRows
* @param done Server-side says we're done scanning.
*/
protected boolean nextScanner(int nbRows, final boolean done)
throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
call(callable, caller, scannerTimeout);
this.callable = null;
}
// Where to start the next scanner
byte [] localStartKey;
// if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null) {
byte [] endKey = this.currentRegion.getEndKey();
if (endKey == null ||
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
checkScanStopRow(endKey) ||
done) {
close();
if (LOG.isTraceEnabled()) {
LOG.trace("Finished " + this.currentRegion);
}
return false;
}
localStartKey = endKey;
if (LOG.isTraceEnabled()) {
LOG.trace("Finished " + this.currentRegion);
}
} else {
localStartKey = this.scan.getStartRow();
}
if (LOG.isDebugEnabled() && this.currentRegion != null) {
// Only worth logging if NOT first region in scan.
LOG.debug("Advancing internal scanner to startKey at '" +
Bytes.toStringBinary(localStartKey) + "'");
}
try {
callable = getScannerCallable(localStartKey, nbRows);
// Open a scanner on the region server starting at the
// beginning of the region
call(callable, caller, scannerTimeout);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
} catch (IOException e) {
close();
throw e;
}
return true;
}
nextScanner obtains a scanner for the next region:
// ClientScanner.java
protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
int nbRows) {
scan.setStartRow(localStartKey);
ScannerCallable s =
new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
s.setCaching(nbRows);
ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
s, pool, primaryOperationTimeout, scan,
retries, scannerTimeout, caching, conf, caller);
return sr;
}
Result[] call(ScannerCallableWithReplicas callable,
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
throws IOException, RuntimeException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
return caller.callWithoutRetries(callable, scannerTimeout);
}
// RpcRetryingCaller.java
public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException {
// The code of this method should be shared with withRetries.
this.globalStartTime = EnvironmentEdgeManager.currentTime();
try {
callable.prepare(false);
return callable.call(callTimeout);
} catch (Throwable t) {
Throwable t2 = translateException(t);
ExceptionUtil.rethrowIfInterrupt(t2);
// It would be nice to clear the location cache here.
if (t2 instanceof IOException) {
throw (IOException)t2;
} else {
throw new RuntimeException(t2);
}
}
}
What ultimately runs is the ScannerCallable: its prepare method resolves the RegionServer hosting the region and sets up the RPC connection, and its call method then builds the scan RPC request and invokes the scan method of the RPC server. With that, ClientScanner initialization is complete.
// ScannerCallable.java
public void prepare(boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
id, getConnection(), getTableName(), getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
}
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
if (!instantiated || reload) {
checkIfRegionServerIsRemote();
instantiated = true;
}
...
}
Now let's look at ClientScanner's next method:
public Result next() throws IOException {
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
if (cache.size() == 0 && this.closed) {
return null;
}
if (cache.size() == 0) {
loadCache();
}
if (cache.size() > 0) {
return cache.poll();
}
// if we exhausted this scanner before calling close, write out the scan metrics
writeScanMetrics();
return null;
}
protected void loadCache() throws IOException {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
// We need to reset it if it's a new callable that was created with a countdown in nextScanner
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean retryAfterOutOfOrderException = true;
// We don't expect that the server will have more results for us if
// it doesn't tell us otherwise. We rely on the size or count of results
boolean serverHasMoreResults = false;
boolean allResultsSkipped = false;
do {
allResultsSkipped = false;
try {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = call(callable, caller, scannerTimeout);
// When the replica switch happens, we need to do certain operations again.
// The callable will openScanner with the right startkey but we need to pick up
// from there. Bypass the rest of the loop and let the catch-up happen in the beginning
// of the loop as it happens for the cases where we see exceptions.
// Since only openScanner would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) {
// Any accumulated partial results are no longer valid since the callable will
// openScanner with the correct startkey and we must pick up from there
clearPartialResults();
this.currentRegion = callable.getHRegionInfo();
continue;
}
retryAfterOutOfOrderException = true;
...
}
The interesting part is loadCache, which also goes through call to fetch the next batch of results from the server.
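The cache-first behavior of next() can be sketched with a plain queue; the pre-baked batches below stand in for real scan RPC responses:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Sketch of ClientScanner.next(): results are served from a client-side
// cache, and only when it runs dry does the scanner issue another RPC
// (loadCache) to refill it.
public class ScannerCacheSketch {
    private final Queue<String> cache = new ArrayDeque<>();
    private final Queue<List<String>> serverBatches;
    public int rpcCalls = 0;

    public ScannerCacheSketch(List<List<String>> batches) {
        this.serverBatches = new ArrayDeque<>(batches);
    }

    public String next() {
        if (cache.isEmpty()) loadCache();
        return cache.poll(); // null once server and cache are both exhausted
    }

    private void loadCache() {
        if (!serverBatches.isEmpty()) {
            rpcCalls++;                       // one scan RPC per refill
            cache.addAll(serverBatches.poll());
        }
    }
}
```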
Next, let's look at ClientSmallScanner:
// ClientSmallScanner.java
protected void initializeScannerInConstruction() throws IOException {
// No need to initialize the scanner when constructing instance, do it when
// calling next(). Do nothing here.
}
Nothing happens at construction time; the scanner is initialized lazily, with nextScanner invoked from next().
With that, we have covered the client side of HBase's basic operations.