欢迎投稿

今日深度:

hbase学习,

hbase学习,


主要基于Thrift进行了HBase接口的封装。接口中主要提供了getgetMultiplegetScannerResults、分批返回数据、putputMultiple几个方法。

其中ByteBuffer tableHBase的表名;

TGetTScan类似于HBase原生的GetScan

keysvc是平台为每个应用分配的验证码和服务名,主要用于权限验证等。

1.   get方法

根据TGet从表table中查询出来一条记录

public TResult get(ByteBuffer table, TGet get, ByteBuffer key, ByteBuffer svc)

样例代码:

ByteBuffer table = ByteBuffer.wrap("test_large".getBytes());

ByteBuffer key = ByteBuffer.wrap("key".getBytes());

ByteBuffer svc = ByteBuffer.wrap("svc".getBytes());

 

//打开链接

TTransport transport = new TSocket(host, port, timeout);

TProtocol protocol = new TBinaryProtocol(transport);

THBaseService.Iface client = new THBaseService.Client(protocol);       

transport.open();

 

//设置TGet

TGet get = new TGet();

get.setRow("row_5000".getBytes());

//查询

TResult result = client.get(table, get, key, svc);

for (TColumnValue resultColumnValue : result.getColumnValues()) {

          System.out.println("family = " + new String(resultColumnValue.getFamily()));

          System.out.println("qualifier = " + new String(resultColumnValue.getFamily()));

          System.out.println("value = " + new String(resultColumnValue.getValue()));

          System.out.println("timestamp = " + resultColumnValue.getTimestamp());

}

transport.close();

 

2.   getMultiple方法

根据List<TGet>从表table中查询出来多条记录

public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets, ByteBuffer key, ByteBuffer svc)

样例代码:

TTransport transport = new TSocket(host, port, timeout);

TProtocol protocol = new TBinaryProtocol(transport);

THBaseService.Iface client = new THBaseService.Client(protocol);       

transport.open();

 

//设置TGet

TGet get = new TGet();

get.setRow("row_5000".getBytes());

List<TGet> gets = new ArrayList<TGet>();

gets.add(get);

get = new TGet();

get.setRow("row2".getBytes());

gets.add(get);

//查询

List<TResult> results = client.getMultiple(table, gets, key, svc);

3.   getScannerResults方法

根据TScan从表table中查询出来numRows条记录

public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows, ByteBuffer key, ByteBuffer svc)

样例代码:

TTransport transport = new TSocket(host, port, timeout);

TProtocol protocol = new TBinaryProtocol(transport);

THBaseService.Iface client = new THBaseService.Client(protocol);       

transport.open();

 

//设置Tscan

TScan paramTScan = new TScan();

paramTScan.setCaching(1000);

paramTScan.setStartRow(Bytes.toBytes("row_0"));

 

//查询

int rowNum =10;

List<TResult> results = client.getScannerResults(table, paramTScan, rowNum, key, svc);

transport.close();

4.   分批返回数据的方法

//使用scan取得scannerId

public int openScanner(ByteBuffer table, TScan scan, ByteBuffer key, ByteBuffer svc)

//根据scannerId取得numRows条记录

public List<TResult> getScannerRows(int scannerId, int numRows)

//关闭Scanner

public void closeScanner(int scannerId)

 

样例代码:

TTransport transport = new TSocket(host, port, timeout);

TProtocol protocol = new TBinaryProtocol(transport);

THBaseService.Iface client = new THBaseService.Client(protocol);       

transport.open();

 

//设置TScan

TScan paramTScan = new TScan();

paramTScan.setCaching(1000);

paramTScan.setStartRow(Bytes.toBytes("row_0"));

//取得scannerId

int scannerId = client.openScanner(table, paramTScan, key, svc);

List<TResult> results = null;

int rowNum = 1000;

while (true) {

     //根据scannerId取得numRows条记录

    results = client.getScannerRows(scannerId, rowNum);

    if (results == null || results.size() == 0) {

         break;

    }

       

    for (int i = 0; i < results.size(); i++) {

        TResult result = results.get(i);

        //TODO

    }

       

}

       

client.closeScanner(scannerId);

transport.close();

5.   TScan中的Filter设置

TScan中设置Filter时,只支持设置字符串格式。其中字符串格式和原生的HBase Shell中的格式相同。

样例代码:

TScan paramTScan = new TScan();

paramTScan.setCaching(1000);

paramTScan.setFilterString("PrefixFilter ('row5')".getBytes());

6.   put方法

put插入到表table

publicvoid put(ByteBuffer table, TPut put, ByteBuffer key, ByteBuffer svc)

 

样例代码:

TTransport transport = new TSocket(host, port, timeout);

TProtocol protocol = new TBinaryProtocol(transport);

THBaseService.Iface client = new THBaseService.Client(protocol);       

transport.open();

 

//拼装put

TPut put = new TPut();

//设置key

put.setRow("row3".getBytes());

 

//设置列   

TColumnValue cell  = new TColumnValue();

cell.setFamily("family1".getBytes());

cell.setQualifier("qualifier1".getBytes());

cell.setValue("value3".getBytes());

//组合多个列

List<TColumnValue> columnValues = new ArrayList<TColumnValue>();

columnValues.add(cell);

put.setColumnValues(columnValues);

 

//put插入到example表中

client.put(ByteBuffer.wrap("example".getBytes()), put, key, svc);

       

transport.close();

7.   putMultiple方法

puts批量插入到表table

publicvoid putMultiple(ByteBuffer table, List<TPut> puts, ByteBuffer key, ByteBuffer svc)

 

样例代码:

TTransport transport = new TSocket(host, port, timeout);

TProtocol protocol = new TBinaryProtocol(transport);

THBaseService.Iface client = new THBaseService.Client(protocol);       

transport.open();

//拼装一个put

TPut put = new TPut();

put.setRow("row4".getBytes());

       

TColumnValue cell  = new TColumnValue();

cell.setFamily("family1".getBytes());

cell.setQualifier("qualifier1".getBytes());

cell.setValue("value4".getBytes());

       

List<TColumnValue> columnValues = new ArrayList<TColumnValue>();

columnValues.add(cell);

put.setColumnValues(columnValues);

       

//put加入到List<TPut>

List<TPut> puts = new ArrayList<TPut>();

puts.add(put);

       

//拼装另一个put

put = new TPut();

put.setRow("row5".getBytes());

       

cell  = new TColumnValue();

cell.setFamily("family1".getBytes());

cell.setQualifier("qualifier1".getBytes());

cell.setValue("value5".getBytes());

       

columnValues = new ArrayList<TColumnValue>();

columnValues.add(cell);

put.setColumnValues(columnValues);

 

//put加入到List<TPut>

puts.add(put);

       

//List<TPut>批量插入到表example

client.putMultiple(ByteBuffer.wrap("example".getBytes()), puts, key, svc);

       

transport.close();

8.   客户端连接池

建议客户端使用连接池来管理TSocket,这样可以提供效率,稳定性。平台提供了一个连接池供使用。

GenericConnectionProvider 线程安全的。初始化一次GenericConnectionProvider后,可以在多线程中使用genericConnectionProvider. getConnection()方法取得TSocketTSocket使用完成后,调用GenericConnectionProvider. returnCon(TSocket)方法归还TSocket给连接池。

在进程结束或者tomcat关闭时,可以调用GenericConnectionProvider.destroy()方法,销毁连接池。

样例代码:

GenericConnectionProvider connectionProvider = new GenericConnectionProvider();

    connectionProvider.setServiceIP(TestAiThriftClient.host);

    connectionProvider.setServicePort(TestAiThriftClient.port);

    connectionProvider.setConTimeOut(TestAiThriftClient.timeout);

    connectionProvider.setMaxActive(20);

    connectionProvider.setTestOnBorrow(true);

    connectionProvider.setTestOnReturn(true);

connectionProvider.setTestWhileIdle(true);

 

//初始化连接池

connectionProvider.init();

 

//可以在多线程中,从连接池中取得连接

TSocket socket = connectionProvider.getConnection();

TProtocol protocol = new TBinaryProtocol(socket);

THBaseService.Iface client = new THBaseService.Client(protocol);

 

//执行查询逻辑

... ... ...

 

//查询完成后,归还连接给连接池。最好在tryfinally块中执行该动作。

connectionProvider.returnCon(socket);

 

//进程停止或tomcat关闭时,须关闭连接池

connectionProvider.destroy();


www.htsjk.Com true http://www.htsjk.com/hbase/42203.html NewsArticle hbase学习, 主要基于 Thrift 进行了 HBase 接口的封装。接口中主要提供了 get 、 getMultiple 、 getScannerResults 、分批返回数据、 put 、 putMultiple 几个方法。 其中 ByteBuffer table 指 HBase 的表名;...
相关文章
    暂无相关文章
评论暂时关闭