HBase coprocessor endpoint in practice: querying HBase data after salting
1. Introduction
The previous article introduced how to salt HBase rowkeys and briefly sketched how to query salted data, but it gave no concrete implementation. This article presents one, built on an HBase coprocessor endpoint.
Coprocessors come in two scopes: system coprocessors are loaded globally and apply to every table on a region server, while table coprocessors are loaded by the user for a specific table. To keep its behavior flexible, the coprocessor framework offers two kinds of plugins. One is the observer, which resembles a trigger in a relational database. The other is the endpoint, which is roughly analogous to a stored procedure. The implementation in this article is an application of the endpoint kind.
2. Implementation
2.1 Example
First, look at the row-count example that ships with HBase, RowCountEndpoint.java; the source is in hbase-examples under org.apache.hadoop.hbase.coprocessor.example:
public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
    RpcCallback<ExampleProtos.CountResponse> done) {
  Scan scan = new Scan();
  scan.setFilter(new FirstKeyOnlyFilter());
  ExampleProtos.CountResponse response = null;
  InternalScanner scanner = null;
  try {
    scanner = env.getRegion().getScanner(scan);
    List<Cell> results = new ArrayList<Cell>();
    boolean hasMore = false;
    byte[] lastRow = null;
    long count = 0;
    do {
      hasMore = scanner.next(results);
      for (Cell kv : results) {
        byte[] currentRow = CellUtil.cloneRow(kv);
        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
          lastRow = currentRow;
          count++;
        }
      }
      results.clear();
    } while (hasMore);
    response = ExampleProtos.CountResponse.newBuilder()
        .setCount(count).build();
  } catch (IOException ioe) {
    ResponseConverter.setControllerException(controller, ioe);
  } finally {
    if (scanner != null) {
      try {
        scanner.close();
      } catch (IOException ignored) {}
    }
  }
  done.run(response);
}
The implementation is straightforward: each region scans all of its rows and returns its row count, and the client sums the counts from all regions to get the total for the table.
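For reference, here is a minimal client-side sketch of that summation, assuming the HBase 0.96-era coprocessorService API and the ExampleProtos classes generated for RowCountEndpoint in hbase-examples; the table name "test" and the method wrapper are illustrative:

// A minimal sketch of the client-side summation (HBase 0.96-era API), using
// the ExampleProtos classes from hbase-examples. Names here are illustrative.
public static long countTableRows(Configuration conf) throws Throwable {
  HTable table = new HTable(conf, "test");
  final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
  // invoke getRowCount on every region (null start/end keys = all regions)
  Map<byte[], Long> perRegion = table.coprocessorService(
      ExampleProtos.RowCountService.class, null, null,
      new Batch.Call<ExampleProtos.RowCountService, Long>() {
        public Long call(ExampleProtos.RowCountService counter) throws IOException {
          ServerRpcController controller = new ServerRpcController();
          BlockingRpcCallback<ExampleProtos.CountResponse> callback =
              new BlockingRpcCallback<ExampleProtos.CountResponse>();
          counter.getRowCount(controller, request, callback);
          ExampleProtos.CountResponse response = callback.get();
          if (controller.failedOnException()) {
            throw controller.getFailedOn();
          }
          return response.hasCount() ? response.getCount() : 0L;
        }
      });
  long total = 0;
  for (Long count : perRegion.values()) {
    total += count;  // table total = sum of per-region counts
  }
  table.close();
  return total;
}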
2.2 Server implementation
Next, modeled on RowCountEndpoint, here is an implementation for querying HBase data after salting.
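As a quick reminder of the salting scheme assumed throughout (salt + "_" + original key, as described in the previous article), here is an illustrative write-side sketch; the bucket count and the hash-mod salt derivation are assumptions for illustration, not the only option:

// An illustrative sketch of the assumed salting scheme: a deterministic
// salt derived from the original key, prepended as salt + "_" + key.
// BUCKETS and the hash-mod choice are assumptions; any deterministic salt
// works as long as readers and writers agree on it.
public class Salting {
  private static final int BUCKETS = 16;

  public static String saltedKey(String originalKey) {
    int salt = (originalKey.hashCode() & Integer.MAX_VALUE) % BUCKETS;
    return salt + "_" + originalKey;  // e.g. "user123" might become "7_user123"
  }
}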
1) Protocol definition
Since HBase uses protobuf for its internal RPC, the first step is to generate the protocol classes, like ExampleProtos above. Define our own protocol file, DataProtos.proto:
package generated;

option java_package = "com.bigdata.coprocessor.endpoint.generated";
option java_outer_classname = "DataProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message DataQueryRequest {
  optional string tableName = 1;
  optional string startRow = 2;
  optional string endRow = 3;
  optional string rowKey = 4;
  optional bool incluedEnd = 5;
  optional bool isSalting = 6;
}

message DataQueryResponse {
  message Cell {
    required bytes value = 1;
    required bytes family = 2;
    required bytes qualifier = 3;
    required bytes row = 4;
  }
  message Row {
    optional bytes rowKey = 1;
    repeated Cell cellList = 2;
  }
  repeated Row rowList = 1;
}

service QueryDataService {
  rpc queryByStartRowAndEndRow(DataQueryRequest)
      returns (DataQueryResponse);
  rpc queryByRowKey(DataQueryRequest)
      returns (DataQueryResponse);
}
This defines the request object DataQueryRequest, the response object DataQueryResponse, and a service QueryDataService exposing two methods: one queries by a start/stop rowkey range, the other by a single rowkey. Next, use protoc.exe to generate the corresponding Java classes.
Running the command protoc.exe DataProtos.proto --java_out=e:\hbase\protoc-2.4.1 generates DataProtos.java. (I have uploaded the protoc.exe tool as well; feel free to download and use it.)
2) Implementing the coprocessor
Server-side code:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.bigdata.coprocessor.endpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import com.bigdata.coprocessor.endpoint.generated.DataProtos;
import com.bigdata.coprocessor.endpoint.generated.DataProtos.DataQueryRequest;
import com.bigdata.coprocessor.endpoint.generated.DataProtos.DataQueryResponse;

public class QueryEndpoint extends DataProtos.QueryDataService implements
    Coprocessor, CoprocessorService {

  private RegionCoprocessorEnvironment env;

  public QueryEndpoint() {
  }
  /**
   * Just returns a reference to this object, which implements the
   * QueryDataService interface.
   */
  @Override
  public Service getService() {
    return this;
  }
  /**
   * Scans the rows between startRow and endRow within the region where this
   * coprocessor is loaded, prepending the region's salt prefix to both keys
   * when the data is salted.
   */
  @Override
  public void queryByStartRowAndEndRow(RpcController controller,
      DataProtos.DataQueryRequest request,
      RpcCallback<DataProtos.DataQueryResponse> done) {
    DataProtos.DataQueryResponse response = null;
    InternalScanner scanner = null;
    try {
      String startRow = request.getStartRow();
      String endRow = request.getEndRow();
      String regionStartKey = Bytes.toString(this.env.getRegion()
          .getRegionInfo().getStartKey());
      if (request.getIsSalting()) { // if the data is salted, prepend the salt to the keys
        String startSalt = null;
        if (null != regionStartKey && !regionStartKey.isEmpty()) {
          // keys are salted as salt + "_", so take the part before the underscore
          startSalt = regionStartKey.split("_")[0];
        }
        if (null != startSalt && null != startRow) {
          startRow = startSalt + "_" + startRow;
          endRow = startSalt + "_" + endRow;
        }
      }
      Scan scan = new Scan();
      if (null != startRow) {
        scan.setStartRow(Bytes.toBytes(startRow));
      }
      if (null != endRow) {
        if (request.getIncluedEnd()) {
          Filter filter = new InclusiveStopFilter(Bytes.toBytes(endRow));
          scan.setFilter(filter);
        } else {
          scan.setStopRow(Bytes.toBytes(endRow));
        }
      }
      scanner = this.env.getRegion().getScanner(scan);
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      DataProtos.DataQueryResponse.Builder responseBuilder =
          DataProtos.DataQueryResponse.newBuilder();
      do {
        hasMore = scanner.next(results);
        if (null != results && results.size() > 0) {
          DataProtos.DataQueryResponse.Row.Builder rowBuilder =
              DataProtos.DataQueryResponse.Row.newBuilder();
          rowBuilder.setRowKey(ByteString.copyFrom(results.get(0).getRow()));
          for (Cell kv : results) {
            DataProtos.DataQueryResponse.Cell.Builder cellBuilder =
                DataProtos.DataQueryResponse.Cell.newBuilder();
            cellBuilder.setFamily(ByteString.copyFrom(kv.getFamily()));
            cellBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
            cellBuilder.setRow(ByteString.copyFrom(kv.getRow()));
            cellBuilder.setValue(ByteString.copyFrom(kv.getValue()));
            rowBuilder.addCellList(cellBuilder);
          }
          // only add non-empty rows to the response
          responseBuilder.addRowList(rowBuilder);
        }
        results.clear();
      } while (hasMore);
      response = responseBuilder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ignored) {
        }
      }
    }
    done.run(response);
  }
  @Override
  public void queryByRowKey(RpcController controller,
      DataQueryRequest request, RpcCallback<DataQueryResponse> done) {
    DataProtos.DataQueryResponse response = null;
    try {
      String rowKey = request.getRowKey();
      String regionStartKey = Bytes.toString(this.env.getRegion()
          .getRegionInfo().getStartKey());
      if (request.getIsSalting()) { // if the data is salted, prepend the salt to the key
        String startSalt = null;
        if (null != regionStartKey && !regionStartKey.isEmpty()) {
          // keys are salted as salt + "_", so take the part before the underscore
          startSalt = regionStartKey.split("_")[0];
        }
        if (null != startSalt && null != rowKey) {
          rowKey = startSalt + "_" + rowKey;
        }
      }
      Get get = new Get(Bytes.toBytes(rowKey));
      Result result = this.env.getRegion().get(get);
      DataProtos.DataQueryResponse.Builder responseBuilder =
          DataProtos.DataQueryResponse.newBuilder();
      if (null != result && !result.isEmpty()) {
        List<KeyValue> list = result.list();
        if (null != list && !list.isEmpty()) {
          DataProtos.DataQueryResponse.Row.Builder rowBuilder =
              DataProtos.DataQueryResponse.Row.newBuilder();
          rowBuilder.setRowKey(ByteString.copyFrom(list.get(0).getRow()));
          for (KeyValue kv : list) {
            DataProtos.DataQueryResponse.Cell.Builder cellBuilder =
                DataProtos.DataQueryResponse.Cell.newBuilder();
            cellBuilder.setFamily(ByteString.copyFrom(kv.getFamily()));
            cellBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
            cellBuilder.setRow(ByteString.copyFrom(kv.getRow()));
            cellBuilder.setValue(ByteString.copyFrom(kv.getValue()));
            rowBuilder.addCellList(cellBuilder);
          }
          // only add a row to the response when the Get actually found one
          responseBuilder.addRowList(rowBuilder);
        }
      }
      response = responseBuilder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }
  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from
   * the region where this coprocessor is loaded. Since this is a coprocessor
   * endpoint, it always expects to be loaded on a table region, so always
   * expects this to be an instance of {@link RegionCoprocessorEnvironment}.
   *
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   *     {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}
2.3 Client query implementation
package com.bigdata.coprocessor.client;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.bigdata.coprocessor.endpoint.generated.DataProtos;
import com.google.protobuf.ServiceException;

public class QueryDemo {

  protected static Configuration conf = null;
  static {
    conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.175.55.138,10.175.55.139,10.175.55.140");
  }
  public static List<DataProtos.DataQueryResponse.Row> queryByStartRowAndStopRow(
      String tableName, String startRow, String stopRow,
      boolean isIncludeEnd, boolean isSalting) {
    final DataProtos.DataQueryRequest.Builder requestBuilder =
        DataProtos.DataQueryRequest.newBuilder();
    requestBuilder.setTableName(tableName);
    requestBuilder.setStartRow(startRow);
    requestBuilder.setEndRow(stopRow);
    requestBuilder.setIncluedEnd(isIncludeEnd);
    requestBuilder.setIsSalting(isSalting);
    try {
      HTable table = new HTable(HBaseConfiguration.create(conf), tableName);
      // invoke the endpoint on every region of the table (null start/end
      // keys = all regions) and collect the per-region results
      Map<byte[], List<DataProtos.DataQueryResponse.Row>> result =
          table.coprocessorService(DataProtos.QueryDataService.class, null, null,
              new Batch.Call<DataProtos.QueryDataService, List<DataProtos.DataQueryResponse.Row>>() {
                public List<DataProtos.DataQueryResponse.Row> call(
                    DataProtos.QueryDataService service) throws IOException {
                  ServerRpcController controller = new ServerRpcController();
                  BlockingRpcCallback<DataProtos.DataQueryResponse> rpcCallback =
                      new BlockingRpcCallback<DataProtos.DataQueryResponse>();
                  service.queryByStartRowAndEndRow(controller,
                      requestBuilder.build(), rpcCallback);
                  DataProtos.DataQueryResponse response = rpcCallback.get();
                  if (controller.failedOnException()) {
                    throw controller.getFailedOn();
                  }
                  return response.getRowListList();
                }
              });
      // merge the rows returned by each region
      List<DataProtos.DataQueryResponse.Row> rets =
          new LinkedList<DataProtos.DataQueryResponse.Row>();
      for (Map.Entry<byte[], List<DataProtos.DataQueryResponse.Row>> entry : result.entrySet()) {
        if (null != entry.getValue()) {
          rets.addAll(entry.getValue());
        }
      }
      return rets;
    } catch (ServiceException e) {
      e.printStackTrace();
    } catch (Throwable e) {
      e.printStackTrace();
    }
    return null;
  }

  public static void main(String[] args) throws IOException, Throwable {
    List<DataProtos.DataQueryResponse.Row> rows =
        queryByStartRowAndStopRow("test", "00", "01", true, true);
    if (null != rows) {
      for (DataProtos.DataQueryResponse.Row row : rows) {
        System.out.println(row.getRowKey().toStringUtf8());
      }
    }
  }
}
3. Deployment
3.1 Packaging
Package the code into a jar; the name is arbitrary, for example query.jar.
3.2 Adding via hbase-site.xml
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>com.bigdata.coprocessor.endpoint.QueryEndpoint</value>
</property>
To configure multiple coprocessors, separate the class names with commas.
The jar containing the class must be on HBase's classpath.
A coprocessor loaded this way applies to every table; if you only want it on certain tables, use the following approach instead.
3.3 Loading via the HBase shell
Upload query.jar to the HDFS directory /coprocessor/, then add the coprocessor:
hbase(main):005:0> alter 't1', METHOD => 'table_att', 'coprocessor' => 'hdfs:///coprocessor/query.jar|com.bigdata.coprocessor.endpoint.QueryEndpoint|1001|arg1=1,arg2=2'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.0730 seconds
The coprocessor attribute has the format:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+
FilePath is an HDFS path, e.g. /coprocessor/query.jar.
ClassName is the fully qualified name of the endpoint implementation class.
Priority is an integer; the framework uses it to decide the execution order when multiple coprocessors are loaded.
arguments are the parameters passed to the coprocessor.
If the class is already on HBase's classpath, FilePath may be left empty.
3.4 Unloading
First run describe 'tableName' to find the attribute number of the coprocessor you want to remove, then unset it with alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$3' (the coprocessor$3 part varies by table).
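For example, a hypothetical session (the attribute name that describe shows for your table may differ, and unrelated output is elided):

hbase(main):006:0> describe 't1'
...
'coprocessor$1' => 'hdfs:///coprocessor/query.jar|com.bigdata.coprocessor.endpoint.QueryEndpoint|1001|arg1=1,arg2=2'
...
hbase(main):007:0> alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'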