
HBase coprocessor endpoint application: how to query HBase data after salting (Salting)

1. Introduction

The previous article introduced how to salt HBase data and briefly sketched how to query it afterwards, but gave no concrete implementation. This article presents a concrete implementation based on an HBase coprocessor endpoint.

Coprocessors come in two types: system coprocessors, which are loaded globally for every table on a region server, and table coprocessors, which a user attaches to a specific table. To keep its behavior flexible, the coprocessor framework offers two kinds of plugins. One is the observer, which resembles a trigger in a relational database. The other is the endpoint, which resembles a stored procedure. The implementation described in this article is an endpoint application.

2. Implementation

2.1 Example

First, look at the row-counting example that ships with HBase, RowCountEndpoint.java; the source lives in the hbase-examples module, under the package org.apache.hadoop.hbase.coprocessor.example.

 

public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
        RpcCallback<ExampleProtos.CountResponse> done) {
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    ExampleProtos.CountResponse response = null;
    InternalScanner scanner = null;
    try {
        scanner = env.getRegion().getScanner(scan);
        List<Cell> results = new ArrayList<Cell>();
        boolean hasMore = false;
        byte[] lastRow = null;
        long count = 0;
        do {
            hasMore = scanner.next(results);
            for (Cell kv : results) {
                byte[] currentRow = CellUtil.cloneRow(kv);
                if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
                    lastRow = currentRow;
                    count++;
                }
            }
            results.clear();
        } while (hasMore);

        response = ExampleProtos.CountResponse.newBuilder()
                .setCount(count).build();
    } catch (IOException ioe) {
        ResponseConverter.setControllerException(controller, ioe);
    } finally {
        if (scanner != null) {
            try {
                scanner.close();
            } catch (IOException ignored) {}
        }
    }
    done.run(response);
}

The implementation is straightforward: each region scans its rows and returns a count, and the client sums the counts from all regions to obtain the total row count of the table.
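That client-side summation can be sketched with plain Java. The map shape below mirrors what HTable.coprocessorService returns (region name mapped to that region's result), but the helper class and its names are illustrative, not part of the HBase example:

```java
import java.util.HashMap;
import java.util.Map;

public class RowCountMerge {
    // sum the per-region counts returned by the row-count endpoint;
    // one entry per region, keyed by the region name bytes
    static long totalRows(Map<byte[], Long> perRegionCounts) {
        long total = 0;
        for (Long count : perRegionCounts.values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<byte[], Long> perRegion = new HashMap<byte[], Long>();
        perRegion.put("region-a".getBytes(), 120L);
        perRegion.put("region-b".getBytes(), 80L);
        System.out.println(totalRows(perRegion)); // prints 200
    }
}
```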

2.2 Server implementation

Next, modeled on RowCountEndpoint, we implement a query method for HBase data that has been salted (Salting).
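For context, the endpoint below assumes row keys were salted on write as salt + "_" + originalKey. A minimal sketch of that write-side key construction follows; the bucket count and the hash-based salt choice are assumptions for illustration, and the previous article's exact scheme may differ:

```java
public class SaltedKey {
    // number of salt buckets; chosen for illustration only
    static final int BUCKETS = 16;

    // prepend a salt derived from the key's hash, e.g. "07_user123",
    // so writes spread across regions pre-split on the salt prefix
    static String salt(String originalKey) {
        int bucket = Math.abs(originalKey.hashCode() % BUCKETS);
        return String.format("%02d_%s", bucket, originalKey);
    }

    public static void main(String[] args) {
        String salted = salt("user123");
        // the region-side code recovers the salt by splitting on "_"
        System.out.println(salted + " -> salt prefix " + salted.split("_")[0]);
    }
}
```

Because the salt is derived deterministically from the key, a point lookup can recompute it, while a range query must be fanned out to every salt bucket, which is exactly what the endpoint below does region by region.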

1) Protocol definition

Since HBase uses protobuf for its internal communication, we first need to generate the protocol classes. Analogous to ExampleProtos above, define our own protocol file for DataProtos:

package generated;

option java_package = "com.bigdata.coprocessor.endpoint.generated";
option java_outer_classname = "DataProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message DataQueryRequest {
  optional string tableName = 1;
  optional string startRow = 2;
  optional string endRow = 3;
  optional string rowKey = 4;
  optional bool incluedEnd = 5;
  optional bool isSalting = 6;
}

message DataQueryResponse {
  message Cell {
    required bytes value = 1;
    required bytes family = 2;
    required bytes qualifier = 3;
    required bytes row = 4;
  }
  message Row {
    optional bytes rowKey = 1;
    repeated Cell cellList = 2;
  }
  repeated Row rowList = 1;
}

service QueryDataService {
  rpc queryByStartRowAndEndRow(DataQueryRequest)
      returns (DataQueryResponse);
  rpc queryByRowKey(DataQueryRequest)
      returns (DataQueryResponse);
}

The file defines the request object DataQueryRequest and the response object DataQueryResponse, plus a service QueryDataService exposing two methods: one that queries by a start/stop row key range and one that queries by a single row key. The corresponding Java classes are then generated with protoc.

Running the command protoc.exe DataProtos.proto --java_out=e:\hbase\protoc-2.4.1 generates DataProtos.java. (I have also uploaded the protoc.exe tool for download.)

2) Implementing the coprocessor

Server-side code:

 

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.bigdata.coprocessor.endpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.bigdata.coprocessor.endpoint.generated.DataProtos;
import com.bigdata.coprocessor.endpoint.generated.DataProtos.DataQueryRequest;
import com.bigdata.coprocessor.endpoint.generated.DataProtos.DataQueryResponse;

public class QueryEndpoint extends DataProtos.QueryDataService implements
        Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    public QueryEndpoint() {
    }

    /**
     * Just returns a reference to this object, which implements the
     * QueryDataService interface.
     */
    @Override
    public Service getService() {
        return this;
    }

    /**
     * Scans the rows between startRow and endRow in the region where this
     * coprocessor is loaded, prepending the region's salt to the keys when
     * the table is salted.
     */
    @Override
    public void queryByStartRowAndEndRow(RpcController controller,
            DataProtos.DataQueryRequest request,
            RpcCallback<DataProtos.DataQueryResponse> done) {
        DataProtos.DataQueryResponse response = null;
        InternalScanner scanner = null;
        try {
            String startRow = request.getStartRow();
            String endRow = request.getEndRow();
            String regionStartKey = Bytes.toString(this.env.getRegion()
                    .getRegionInfo().getStartKey());
            if (request.getIsSalting()) { // if the data is salted, prepend the salt to the keys
                String startSalt = null;
                if (null != regionStartKey && !regionStartKey.isEmpty()) {
                    // keys are salted as salt + "_", so take the part before "_"
                    startSalt = regionStartKey.split("_")[0];
                }
                if (null != startSalt && null != startRow) {
                    startRow = startSalt + "_" + startRow;
                    endRow = startSalt + "_" + endRow;
                }
            }
            Scan scan = new Scan();
            if (null != startRow) {
                scan.setStartRow(Bytes.toBytes(startRow));
            }
            if (null != endRow) {
                if (request.getIncluedEnd()) {
                    Filter filter = new InclusiveStopFilter(Bytes.toBytes(endRow));
                    scan.setFilter(filter);
                } else {
                    scan.setStopRow(Bytes.toBytes(endRow));
                }
            }
            scanner = this.env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            DataProtos.DataQueryResponse.Builder responseBuilder =
                    DataProtos.DataQueryResponse.newBuilder();
            do {
                hasMore = scanner.next(results);
                DataProtos.DataQueryResponse.Row.Builder rowBuilder =
                        DataProtos.DataQueryResponse.Row.newBuilder();
                if (null != results && results.size() > 0) {
                    rowBuilder.setRowKey(ByteString.copyFrom(
                            CellUtil.cloneRow(results.get(0))));
                    for (Cell kv : results) {
                        DataProtos.DataQueryResponse.Cell.Builder cellBuilder =
                                DataProtos.DataQueryResponse.Cell.newBuilder();
                        cellBuilder.setFamily(ByteString.copyFrom(CellUtil.cloneFamily(kv)));
                        cellBuilder.setQualifier(ByteString.copyFrom(CellUtil.cloneQualifier(kv)));
                        cellBuilder.setRow(ByteString.copyFrom(CellUtil.cloneRow(kv)));
                        cellBuilder.setValue(ByteString.copyFrom(CellUtil.cloneValue(kv)));
                        rowBuilder.addCellList(cellBuilder);
                    }
                }
                responseBuilder.addRowList(rowBuilder);
                results.clear();
            } while (hasMore);
            response = responseBuilder.build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {}
            }
        }
        done.run(response);
    }

    @Override
    public void queryByRowKey(RpcController controller,
            DataQueryRequest request, RpcCallback<DataQueryResponse> done) {
        DataProtos.DataQueryResponse response = null;
        try {
            String rowKey = request.getRowKey();
            String regionStartKey = Bytes.toString(this.env.getRegion()
                    .getRegionInfo().getStartKey());
            if (request.getIsSalting()) { // if the data is salted, prepend the salt to the key
                String startSalt = null;
                if (null != regionStartKey && !regionStartKey.isEmpty()) {
                    // keys are salted as salt + "_", so take the part before "_"
                    startSalt = regionStartKey.split("_")[0];
                }
                if (null != startSalt && null != rowKey) {
                    rowKey = startSalt + "_" + rowKey;
                }
            }
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = this.env.getRegion().get(get);

            DataProtos.DataQueryResponse.Builder responseBuilder =
                    DataProtos.DataQueryResponse.newBuilder();
            DataProtos.DataQueryResponse.Row.Builder rowBuilder =
                    DataProtos.DataQueryResponse.Row.newBuilder();
            if (null != result && !result.isEmpty()) {
                List<KeyValue> list = result.list();
                if (null != list && !list.isEmpty()) {
                    rowBuilder.setRowKey(ByteString.copyFrom(list.get(0).getRow()));
                    for (KeyValue kv : list) {
                        DataProtos.DataQueryResponse.Cell.Builder cellBuilder =
                                DataProtos.DataQueryResponse.Cell.newBuilder();
                        cellBuilder.setFamily(ByteString.copyFrom(kv.getFamily()));
                        cellBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
                        cellBuilder.setRow(ByteString.copyFrom(kv.getRow()));
                        cellBuilder.setValue(ByteString.copyFrom(kv.getValue()));
                        rowBuilder.addCellList(cellBuilder);
                    }
                }
            }
            responseBuilder.addRowList(rowBuilder);
            response = responseBuilder.build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        }
        done.run(response);
    }

    /**
     * Stores a reference to the coprocessor environment provided by the
     * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from
     * the region where this coprocessor is loaded. Since this is a coprocessor
     * endpoint, it always expects to be loaded on a table region, so always
     * expects this to be an instance of {@link RegionCoprocessorEnvironment}.
     *
     * @param env the environment provided by the coprocessor host
     * @throws IOException if the provided environment is not an instance of
     *             {@code RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // nothing to do
    }
}

 

2.3 Client query implementation

package com.bigdata.coprocessor.client;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.bigdata.coprocessor.endpoint.generated.DataProtos;
import com.google.protobuf.ServiceException;

public class QueryDemo {

    protected static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "10.175.55.138,10.175.55.139,10.175.55.140");
    }

    public static List<DataProtos.DataQueryResponse.Row> queryByStartRowAndStopRow(
            String tableName, String startRow, String stopRow,
            boolean isIncludeEnd, boolean isSalting) {
        final DataProtos.DataQueryRequest.Builder requestBuilder =
                DataProtos.DataQueryRequest.newBuilder();
        requestBuilder.setTableName(tableName);
        requestBuilder.setStartRow(startRow);
        requestBuilder.setEndRow(stopRow);
        requestBuilder.setIncluedEnd(isIncludeEnd);
        requestBuilder.setIsSalting(isSalting);
        try {
            HTable table = new HTable(HBaseConfiguration.create(conf), tableName);
            // invoke the endpoint on every region of the table; the map is keyed by region name
            Map<byte[], List<DataProtos.DataQueryResponse.Row>> result = table.coprocessorService(
                    DataProtos.QueryDataService.class, null, null,
                    new Batch.Call<DataProtos.QueryDataService, List<DataProtos.DataQueryResponse.Row>>() {
                        public List<DataProtos.DataQueryResponse.Row> call(
                                DataProtos.QueryDataService service) throws IOException {
                            ServerRpcController controller = new ServerRpcController();
                            BlockingRpcCallback<DataProtos.DataQueryResponse> rpcCallback =
                                    new BlockingRpcCallback<DataProtos.DataQueryResponse>();
                            service.queryByStartRowAndEndRow(controller, requestBuilder.build(), rpcCallback);
                            DataProtos.DataQueryResponse response = rpcCallback.get();
                            if (controller.failedOnException()) {
                                throw controller.getFailedOn();
                            }
                            return response.getRowListList();
                        }
                    });
            table.close();

            // merge the rows returned from every region
            List<DataProtos.DataQueryResponse.Row> rets =
                    new LinkedList<DataProtos.DataQueryResponse.Row>();
            for (Map.Entry<byte[], List<DataProtos.DataQueryResponse.Row>> entry : result.entrySet()) {
                if (null != entry.getValue()) {
                    rets.addAll(entry.getValue());
                }
            }
            return rets;
        } catch (ServiceException e) {
            e.printStackTrace();
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) throws IOException, Throwable {
        List<DataProtos.DataQueryResponse.Row> rows =
                queryByStartRowAndStopRow("test", "00", "01", true, true);
        if (null != rows) {
            for (DataProtos.DataQueryResponse.Row row : rows) {
                System.out.println(row.getRowKey().toStringUtf8());
            }
        }
    }
}

3. Deployment

3.1 Packaging

Package the code into a jar; the name is arbitrary, e.g. query.jar.

3.2 Loading via hbase-site.xml

Add the following property, where the value is the fully qualified name of the endpoint class (here, our QueryEndpoint):

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>com.bigdata.coprocessor.endpoint.QueryEndpoint</value>
</property>

To configure multiple coprocessors, separate them with commas (,). The jar containing the class must be on HBase's classpath. A coprocessor loaded this way applies to all tables; if you only want it on some tables, use the following approach instead.

3.3 Loading via the shell

Upload query.jar to the HDFS directory /coprocessor/, then attach it to the table:

hbase(main):005:0> alter 't1', METHOD => 'table_att',
'coprocessor' => 'hdfs:///coprocessor/query.jar|com.bigdata.coprocessor.endpoint.QueryEndpoint|1001|arg1=1,arg2=2'

Updating all regions with the new schema...

1/1 regions updated.

Done.

0 row(s) in 1.0730 seconds

The coprocessor attribute format is:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+

FilePath is an HDFS path, e.g. hdfs:///coprocessor/query.jar. ClassName is the fully qualified name of the endpoint implementation class. Priority is an integer; the framework uses it to decide the execution order when multiple coprocessors are loaded. arguments are the parameters passed to the coprocessor. If the class is already on HBase's classpath, FilePath can be left empty.
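As a sanity check of the attribute format above, the four pipe-separated fields can be pulled apart with plain Java; this parser is only illustrative, since HBase does its own parsing internally:

```java
import java.util.HashMap;
import java.util.Map;

public class CoprocessorAttr {
    // split a coprocessor attribute of the form FilePath|ClassName|Priority|k=v[,k=v]+
    static String[] fields(String attr) {
        return attr.split("\\|");
    }

    // parse the arguments field into a key/value map
    static Map<String, String> args(String argsField) {
        Map<String, String> kv = new HashMap<String, String>();
        for (String pair : argsField.split(",")) {
            String[] p = pair.split("=");
            kv.put(p[0], p[1]);
        }
        return kv;
    }

    public static void main(String[] argv) {
        String attr = "hdfs:///coprocessor/query.jar|"
                + "com.bigdata.coprocessor.endpoint.QueryEndpoint|1001|arg1=1,arg2=2";
        String[] f = fields(attr);
        System.out.println(f[1]);                   // the endpoint class name
        System.out.println(args(f[3]).get("arg2")); // prints 2
    }
}
```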

3.4 Unloading

First run describe 'tableName' to find the ordinal of the coprocessor you want to unload, then run alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$3' (the coprocessor$3 ordinal varies).

 

