欢迎投稿

今日深度:

HBase rowkey设计实例,

HBase rowkey设计实例,


需求:绘制渠道用户的每日趋势(每分钟一组数据一天1440组,2000+个渠道,区分新/老用户,2*1440*2000+=576万+/每天),需要保存90天。

查询条件:渠道号、新or老用户、日期

rowkey:渠道_日期_新or老用户_小时分钟(hhmm)

 

连接HBase

from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket
from thrift.transport import TTransport
from hbase import Hbase

def hbase_connect():
    transport = TSocket.TSocket('*', 9090)
    # transport = TSocket.TSocket('10.50.14.105', 9090)
    hbase_transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    hbase_client = Hbase.Client(protocol)
    hbase_transport.open()
    return hbase_transport, hbase_client

创建表:

def create_hbase_table():
    transport, client = hbase_connect()
    tables = client.getTableNames()
    print tables
    client.createTable('client_rt_pv', [Hbase.ColumnDescriptor(name = 'default')])
    tables = client.getTableNames()
    print tables

 

插入数据:

mutationsbatch = []

#### loop
rowkey = '_'.join([tmp_pub, dayStr, 'ac', tmp_ct])
mutations = [
    Hbase.Mutation(column="default:pv", value=str(tmp_pv)),
    Hbase.Mutation(column="default:uv", value=str(tmp_uv)),
    Hbase.Mutation(column="default:pvdivuv", value=str('%.2f' % (tmp_pv/float(tmp_uv) if tmp_uv != 0 else 0, ))),
    Hbase.Mutation(column="default:tm", value=str(tmp_ct)),
    Hbase.Mutation(column="default:pub", value=str("".join([tmp_pub, ' ']))),
    Hbase.Mutation(column="default:pubname", value=pub_id.get(tmp_pub, 'unknown'))]

mutationsbatch.append(Hbase.BatchMutation(row=rowkey, mutations=mutations))

### end loop
hbase_client.mutateRows("client_rt_pv", mutationsbatch, None)
hive_transport.close()

 

www.htsjk.Com true http://www.htsjk.com/hbase/41805.html NewsArticle HBase rowkey设计实例, 需求:绘制渠道用户的每日趋势(每分钟一组数据一天1440组,2000+个渠道,区分新/老用户,2*1440*2000+=576万+/每天),需要保存90天。 查询条件:渠道号、新or老用户...
评论暂时关闭