hbase笔记,
1.HBase(NoSQL)的数据模型1.1 表(table),是存储管理数据的。
1.2 行键(row key),类似于MySQL中的主键。
行键是HBase表天然自带的。
1.3 列族(column family),列的集合。
HBase中列族是需要在定义表时指定的,列是在插入记录时动态增加的。
HBase表中的数据,按列族来存储,每个列族单独一个文件存储。
(按行存储适合查询一行的数据,按列存储适合查询一列的数据)。
1.4 时间戳(timestamp),列(也称作标签、修饰符)的一个属性。
行键和列确定的单元格,可以存储多个数据,每个数据含有时间戳属性,数据具有版本特性。
如果不指定时间戳或者版本,默认取最新的数据。
1.5 存储的数据都是字节数组。(字节存储是可以存储任何形式的数据,图片,文本,音频等)
1.6 表中的数据是按照行键的顺序物理存储的。(行键的顺序,按行健的ascII码排序存储)。
1.7由{row key, column( =<family> + <label>), version} 唯一确定的单元。cell中的数据是没有类型的,全部是字节码形式存贮。
2.HBase的物理模型
2.1 HBase是适合海量数据(如20PB)的秒级简单查询的数据库。
2.2 HBase表中的记录,按照行键进行拆分, 拆分成一个个的region。
许多个region存储在region server(单独的物理机器)中的。
这样,对表的操作转化为对多台region server的并行查询。
3.HBase的体系结构
3.1 HBase是主从式结构,HMaster、HRegionServer
4.HBase伪分布安装
4.1 解压缩、重命名、设置环境变量
4.2 修改$HBASE_HOME/conf/hbase-env.sh,修改内容如下:
export JAVA_HOME=/usr/local/jdk
export HBASE_MANAGES_ZK=true
4.2 修改$HBASE_HOME/conf/hbase-site.xml,修改内容如下:
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop0:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop0</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
4.3 (可选)文件regionservers的内容为hadoop0
4.4 启动hbase,执行命令start-hbase.sh
******启动hbase之前,确保hadoop是运行正常的,并且可以写入文件*******
4.5 验证:(1)执行jps,发现新增加了3个java进程,分别是HMaster、HRegionServer、HQuorumPeer
(2)使用浏览器访问http://hadoop0:60010
===================================================================
create
create 'users','user_id','address','info'
disable 'users'
drop 'users'
describe 'users'
put 'users','xiaoming','info:age','24'
put 'users','xiaoming','info:birthday','1987-06-17'
put 'users','xiaoming','info:company','alibaba'
put 'users','xiaoming','address:contry','china'
put 'users','xiaoming','address:province','zhejiang'
put 'users','xiaoming','address:city','hangzhou'
put 'users','zhangyifei','info:birthday','1987-4-17'
put 'users','zhangyifei','info:favorite','movie'
put 'users','zhangyifei','info:company','alibaba'
put 'users','zhangyifei','address:contry','china'
put 'users','zhangyifei','address:province','guangdong'
put 'users','zhangyifei','address:city','jieyang'
put 'users','zhangyifei','address:town','xianqiao'
scan 'users'
get 'users','xiaoming'
get 'users','xiaoming' ,'info:birthday'
put 'users','xiaoming','info:age' ,'29'
get 'users','xiaoming','info:age'
put 'users','xiaoming','info:age' ,'30'
get 'users','xiaoming','info:age'
get 'users','xiaoming' ,'info'
获取单元格数据的版本数据
get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>1}
get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>2}
get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>3}
get 'users','xiaoming',{COLUMN=>'info:age',TIMESTAMP=>1436239985527}
get 'users','xiaoming','info:age'
delete 'users','xiaoming','info:age'
get 'users','xiaoming'
put 'users','xiaoming','info:age' ,'31'
deleteall 'users','xiaoming'
count 'users'
truncate 'users'
truncate 'users'
hbase的机群搭建过程(在原来的hadoop0上的hbase伪分布基础上进行搭建) 1.1 集群结构,主节点(hmaster)是hadoop0,从节点(region server)是hadoop1和hadoop2 1.2 修改hadoop0上的hbase的几个文件 (1)修改hbase-env.sh的最后一行export HBASE_MANAGES_ZK=false (2)修改hbase-site.xml文件的hbase.zookeeper.quorum的值为hadoop0,hadoop1,hadoop2 (3)修改regionservers文件(存放的region server的hostname),内容修改为hadoop1、hadoop2 1.3 复制hadoop0中的hbase文件夹到hadoop1、hadoop2中 复制hadoop0中的/etc/profile到hadoop1、hadoop2中,在hadoop1、hadoop2上执行source /etc/profile 1.4 首先启动hadoop,然后启动zookeeper集群。 最后在hadoop0上启动hbase集群。
1hbase java 搭建: 1.1.建立Java项目 1.2.把hbase的的压缩包解压,在把lib目录下的jar包导入到下面中。 2搭建环境遇到的问题: 2.1使用jdk需要在1.7以上的版本。不然报错
org/apache/hadoop/eclipse/NewMapReduceProjectWizard : Unsupported major.minor version 51.0解决办法
The selected wizard could not be started.
Plug-in org.apache.hadoop.eclipse was unable to load class org.apache.hadoop.eclipse.NewMapReduceProjectWizard.
org/apache/hadoop/eclipse/NewMapReduceProjectWizard : Unsupported major.minor version 51.0
Hadoop学习笔记—15.HBase框架学习(基础实践篇)
参考链接:http://www.cnblogs.com/edisonchou/p/4405906.htmlHBase -- 基于HDFS的开源分布式NoSQL数据库
参考链接:http://www.cnblogs.com/hanganglin/p/4128472.htmlHBase(Hadoop Database)是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,我们可以利用HBase技术在廉价的PC上搭建起大规模结构化存储集群。同Google的Bigtable基于GFS(Google FileSystem)所提供分布式数据存储服务一样,HBase基于HDFS之上也能提供类似的分布式数据存储服务。HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具。HBase为海量数据的及时查询提供了一个很好的开源解决方案,在大数据处理时代扮演者重要的角色。
一、HBase基础知识1、逻辑数据模型
HBase不支持条件查询和Order By等复杂查询,读取记录时只能按行键或全表扫描。HBase表中存储的数据都是字节数组,并且数据是以行键的物理顺序存储。它介于NoSQL与RDBMS之间,仅支持单行事务(可通过hive来支持实现多表join等复杂操作),主要用来存储非结构化和半结构化数据。
几个关键概念:
1) 表(table):用于存储管理数据,具有稀疏的、面向列的特点。HBase中的每一张表,就是所谓的大表(Bigtable),可以有上亿行,上百万列。对于为值为空的列,并不占用存储空间,因此表可以设计的非常稀疏。
2) 行键(RowKey):类似于MySQL中的主键,HBase根据行键来快速检索数据,一个行键对应一条记录。与MySQL主键不同的是,HBase的行键是天然固有的,每一行数据都存在行键。
3) 列族(ColumnFamily):是列的集合。列族在表定义时需要指定,而列在插入数据时动态指定。列中的数据都是以二进制形式存在,没有数据类型。在物理存储结构上,每个表中的每个列族单独以一个文件存储(参见图1.2)。
4) 时间戳(TimeStamp):是列的一个属性,是一个64位整数。由行键和列确定的单元格,可以存储多个数据,每个数据含有时间戳属性,数据具有版本特性。可根据版本(VERSIONS)或时间戳来指定查询历史版本数据,如果都不指定,则默认返回最新版本的数据。
5) 区域(Region):HBase自动把表水平划分成的多个区域,划分的区域随着数据的增大而增多。
图1.1 HBase-Regin存储结构
图1.2 HBase逻辑数据模型与物理数据模型
2、物理数据模型
HBase表中所有行按照行键的字典序列排序,并且在行的方向上分割为多个HRegion。
Region按大小分割的,每个表一开始只有一个Region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,HRegion就会等分为两个新的HRegion。当table中的行不断增多,就会有越来越多的HRegion。Hregion是Hbase中分布式存储和负载均衡的最小单元,最小单元就表示不同的Hregion可以分布在不同的HRegion server上,但一个Hregion是不会拆分到多个server上的。需要注意的是,HRegion虽然是分布式存储的最小单元,但并不是存储的最小单元。
图1.3 HBase的数据模型
3、架构体系
1) Client:①使用HBase RPC机制与HMaster和HRegionServer进行通信;②Client与HMaster进行通信进行管理类操作;③Client与HRegionServer进行数据读写类操作。
2) Zookeeper:①保证任何时候,集群中只有一个running master,避免单点问题;②存贮所有Region的寻址入口,包括-ROOT-表地址、HMaster地址;③实时监控Region Server的状态,将Region server的上线和下线信息,实时通知给Master;④存储Hbase的schema,包括有哪些table,每个table有哪些column family。
3) HMaster:可以启动多个HMaster,通过Zookeeper的Master Election机制保证总有一个Master运行。角色功能:①为Region server分配region;②负责region server的负载均衡;③发现失效的region serve并重新分配其上的region;④GFS上的垃圾文件回收;⑤处理用户对标的增删改查操作。
4) HRegionServer:HBase中最核心的模块,主要负责响应用户I/O请求,向HDFS文件系统中读写数据。作用:①维护Master分配给它的region,处理对这些region的IO请求;②负责切分在运行过程中变得过大的region。此外,HRegionServer管理一些列HRegion对象,每个HRegion对应Table中一个Region,HRegion由多个HStore组成,每个HStore对应Table中一个Column Family的存储,Column Family就是一个集中的存储单元,故将具有相同IO特性的Column放在一个Column Family会更高效。
5) HStore:HBase存储的核心,由MemStore和StoreFile组成。用户写入数据的流程为:client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 触发Compact合并操作 -> 多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除 -> 当StoreFiles Compact后,逐步形成越来越大的StoreFile -> 单个StoreFile大小超过一定阈值后,触发Split操作,把当前Region Split成2个Region,Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上,如图所示。
图1.4 HStore写入流程
6) HLog:在分布式系统环境中,无法避免系统出错或者宕机,一旦HRegionServer意外退出,MemStore中的内存数据就会丢失,引入HLog就是防止这种情况。工作机制:每个HRegionServer中都会有一个HLog对象,HLog是一个实现Write Ahead Log的类,每次用户操作写入Memstore的同时,也会写一份数据到HLog文件,HLog文件定期会滚动出新,并删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会通过Zookeeper感知,HMaster首先处理遗留的HLog文件,将不同region的log数据拆分,分别放到相应region目录下,然后再将失效的region重新分配,领取到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复。
架构体系如图1.5所示:
图1.5 HBase架构体系图
4、两张特殊的表
HBase中存有两张特殊的表,-ROOT-和.META.。
1) .META.:记录了用户表的Region信息,.META.可以有多个regoin。
2) -ROOT-:记录了.META.表的Region信息,-ROOT-只有一个region。Zookeeper中记录了-ROOT-表的location。
Client访问用户数据之前需要首先访问zookeeper,然后访问-ROOT-表,接着访问.META.表,最后才能找到用户数据的位置去访问,如图1.6所示:
图1.6 Client访问HBase数据顺序图
二、HBase安装本实例安装环境:① 操作系统:CentOS;② HBase版本:1.0.1(hbase-1.0.1-bin.tar.gz)。
1、伪分布式模式安装
1) 上传hbase压缩包到/usr/local下并解压,然后重新命名为hbase,执行命令:① tar -zxvf hbase-0.94.7-security.tar.gz;② mv hbase-0.94.7-security hbase;
2) 配置环境变量,增加一行hbase环境配置HBASE_HOME=/usr/local/hbase,并在PATH中增加$HBASE_HOME/bin,执行命令:① vi /etc/profile;② source /etc/profile;
3) 修改配置文件,执行命令:vi /usr/local/hbase/conf/hbase-env.sh,将# export JAVA_HOME=/usr/java/jdk1.6.0/行去掉注视,并更改为正确路径/usr/local/jdk/,然后去掉# export HBASE_MANAGES_ZK=true行的注视;
4) 修改文件hbase-site.xml,执行命令:vi /usr/local/hbase/conf/hbase-site.xml,增加如下内容:
<property><name>hbase.rootdir</name><value>hdfs://hadoop0:9000/hbase</value></property><property><name>hbase.cluster.distributed</name><value>true</value></property><property><name>hbase.zookeeper.quorum</name><value>hadoop0</value></property><property><name>dfs.replication</name><value>1</value></property>
5) (可选)修改regionservers为localhost;
6) 启动HBase,执行命令:start-hbase.sh,注意启动hbase之前必须确保hadoop已启动并可写入数据。启动后,会增加HMaster、HQuorumPeer、HRegionServer三个进程。
注:HBase也伪分布式模式也支持使用本地磁盘作为存储路径,不需要hdfs的支持。只需要修改hbase-env.sh中的JAVA_HOME和hbase-site.xml中的hbase.rootdir,其中hbase.rootdir的值以file://开头,如file:///Downloads/hbase_data,启动之后只有HMaster一个进程。
2、集群模式安装(在伪分布模式基础上进行搭建)
本集群安装实例结构为:主节点(hmaster)是hadoop0,从节点(region server)是hadoop1和hadoop2。
1) 修改主节点hadoop0上的几个hbase配置文件,修改明细如下:
▶ 修改hbase-env.sh的最后一行:export HBASE_MANAGES_ZK=false;
▶ 修改hbase-site.xml文件的hbase.zookeeper.quorum的值:hadoop0,hadoop1,hadoop2;
▶ 修改regionservers文件(存放的region server的hostname),内容修改为:hadoop1(\r\n换行)hadoop2;
2) 如果hbase.rootdir配置了NameNode HA的逻辑nameservice名称,那么需要将hadoop中的core-site.xml和hdfs-site.xml配置文件拷贝至hbase的conf目录下;
3) 复制hadoop0中的hbase文件夹到hadoop1、hadoop2中,在hadoop0上执行命令:scp /usr/local/hbase hadoop1:/usr/local/;
4) 复制hadoop0中的/etc/profile到hadoop1、hadoop2中,在hadoop0上执行命令:scp /etc/profile hadoop1:/etc/,然后分别在hadoop1和hadoop2上执行命令:source /etc/profile;
5) 启动,记住启动顺序:① 启动zookeeper集群;② 启动hadoop集群;③ 启动hbase集群;
6) 添加Backup Masters,选择节点(这里的节点必须不是hbase从节点,即regionservers中配置的节点,节点可以选择为非zookeeper节点)并执行命令:hbase-daemon.sh start master,可以在当前节点启动HMaster进程,多个HMaser节点依靠Zookeeper来协调,当active的HMaster挂掉后,其他Backup Master节点就会通过Zookeeper选择一个并激活为active状态;
注:可通过“http://hadoop0:16010/master-status”查看HBase集群状态,启动hbase后,会在HDFS根目录下新建hbase目录。这里不同版本查看的web默认端口可能不一样,早期版本端口号应该是60010,hbase-1.x版本之后默认改成了16010,默认配置见hbase-x.x.x-src/hbase-common/src/main/resources/hbase-default.xml中的hbase.master.info.port值,也可以在hbase-site.xml中配置自定义查看端口。
三、HBase的Shell命令操作进入hbase命令行终端,执行命令:hbase shell,退出命令:quit。 官方文档:http://abloz.com/hbase/book.html,一些常用HBase shell命令见下表:
1、创建表,关键字create
语法:create '表名称', '列族名称1','列族名称2','列族名称N'
范例:create 'users','user_id','address','info' --创建表user,该表有三个列族user_id,address,info
2、添加记录,关键字put
语法:put '表名称', '行名称', '列名称:', '值'
范例:put 'users','hans','info:age','28' --在表users中插入数据,行名称为hans,列族info下列age(动态新增)的值设置为28
3、查看记录,关键字get
语法:get '表名称', '行名称' [,'列名称']
范例:get 'user','hans','info:age' --查看表users中行名称为hans,列族info下列age的最新的值
4、查看所有记录,关键字scan
语法:scan '表名称'
scan '表名称',{COLUMNS=>'列族名称:列名称'} --查看表中某个列中的所有数据,COLUMNS必须大写
范例:scan 'users' --查看表users下所有的记录
5、删除记录,关键字delete
语法:delete '表名称' ,'行名称' , '列名称' --删除某行的某列记录
deleteall '表名称' ,'行名称' --删除整行记录
范例:delete 'user','hans','info:age' --删除表users中行名称为hans的列族info下列age的记录
6、删除表,关键字drop
语法:disable '表名称' --删除表之前必须先使其失效
drop '表名称' --删除表
7、查看表中的记录总数,关键字count
语法:count '表名称'
8、查看表结构信息,关键字describe
语法:describe '表名称'
注意:结果中列族的versions指定了最多版本数
9、其他操作
truncate '表名称' --清空表数据
exists '表名称' --是否存在某表
is_enabled '表名称' --某表是否可用
is_disabled '表名称' --某表是否已失效
help 或 help 'create' -帮助命令
四、使用Java API操作HBase1、获得特殊的Configuration,HBase操作的Configuration通过HBaseConfiguration.create()获取,并设置hbase.rootdir和hbase.zookeeper.quorum属性,获取方法如下:
//创建HBase的Configurationprivatestatic Configuration getConfiguration() { Configuration conf = HBaseConfiguration.create(); //客户端通过zookeeper去访问hbase conf.set("hbase.zookeeper.quorum", "hadoop0:2181,hadoop1:2181,hadoop2:2181"); return conf; }
2、创建、删除表使用HBaseAdmin,插入记录、查询记录、遍历记录使用HTable,详细操作实例如下所示:
package com.hicoor.hbase; import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; publicclass HBaseDemo { publicstaticfinal String TABLE_NAME = "table1"; publicstaticfinal String FAMILY_NAME = "family1"; publicstaticfinal String ROW_KEY = "rowkey1"; publicstaticvoid main(String[] args) throws Exception { Configuration conf = getConfiguration(); //创建、删除表使用HBaseAdmin HBaseAdmin hBaseAdmin = new HBaseAdmin(conf); //创建表 //createTable(hBaseAdmin,conf); //删除表 //deleteTable(hBaseAdmin); //插入记录、查询记录、遍历记录使用HTable HTable hTable = new HTable(conf,TABLE_NAME); //插入记录 //insertRecord(hTable); //查询数据 //searchRecord(hTable); //遍历数据 scanRecord(hTable); //关闭HTable //hTable.close(); } privatestaticvoid scanRecord(HTable hTable) throws IOException { Scan scan = new Scan(); ResultScanner scanner = hTable.getScanner(scan); //scanner包含所有行 result为一行(同一个行键)for (Result result : scanner) { List<Cell> cells = result.listCells(); //result.getRow()获取当前行键 System.out.println("行键:"+new String(result.getRow())); for(Cell cell:cells){ System.out.println("\t"+cell.toString()+"\t"+new String(CellUtil.cloneValue(cell))); } } } privatestaticvoid searchRecord(HTable hTable) throws IOException { Get get = new Get(ROW_KEY.getBytes()); //根据指定列查询 get.addColumn(FAMILY_NAME.getBytes(), "age".getBytes()); Result result = hTable.get(get); List<Cell> list = result.listCells(); for(Cell c:list){ System.out.println(c.toString()+"\t"+new String(CellUtil.cloneValue(c))); } } privatestaticvoid insertRecord(HTable hTable) throws IOException { //指定行键 Put put = new Put(ROW_KEY.getBytes()); //指定列族、列名、值 同hbase shell语句:put 'table1','rowkey1','family:age','25' put.addColumn(FAMILY_NAME.getBytes(), "age".getBytes(), "29".getBytes()); hTable.put(put); } privatestaticvoid deleteTable(HBaseAdmin hBaseAdmin) throws IOException { if(hBaseAdmin.tableExists(TABLE_NAME)){ //删除表之前先让表失效 hBaseAdmin.disableTable(TABLE_NAME); hBaseAdmin.deleteTable(TABLE_NAME); } } privatestaticvoid createTable(HBaseAdmin hBaseAdmin,Configuration conf) throws Exception { if(hBaseAdmin.tableExists(TABLE_NAME)){ System.out.println("table exists!"); } else { HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_NAME)); //添加列族 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); tableDescriptor.addFamily(family); hBaseAdmin.createTable(tableDescriptor); System.out.println("table create success"); } } //创建HBase的Configurationprivatestatic Configuration getConfiguration() { Configuration conf = HBaseConfiguration.create(); //客户端通过zookeeper去访问hbase conf.set("hbase.zookeeper.quorum", "hadoop0:2181,hadoop1:2181,hadoop2:2181"); return conf; } }
3、通过MapReduce批量导入数据到HBase,下述例子表示将手机流量数据上(格式详见《Hadoop学习(4)-- MapReduce》中的“手机流量数据统计分析”示例)传到HBase数据库中,源码如下所示(注意自定义reduder继承TableReducer):
package hbase; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; publicclass BatchImportHBaseByMR { publicstaticvoid main(String[] args) throws Exception { final Configuration configuration = new Configuration(); //设置zookeeper configuration.set("hbase.zookeeper.quorum", "hadoop0"); //设置hbase表名称 configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log"); //将该值改大,防止hbase超时退出 configuration.set("dfs.socket.timeout", "180000"); final Job job = new Job(configuration, "HBaseBatchImport"); job.setMapperClass(BatchImportMapper.class); job.setReducerClass(BatchImportReducer.class); //设置map的输出,不设置reduce的输出类型 job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); //不再设置输出路径,而是设置输出格式类型 job.setOutputFormatClass(TableOutputFormat.class); FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input"); job.waitForCompletion(true); } staticclass BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss"); Text v2 = new Text(); protectedvoid map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { final String[] splited = value.toString().split("\t"); try { final Date date = new Date(Long.parseLong(splited[0].trim())); final String dateFormat = dateformat1.format(date); String rowKey = splited[1]+":"+dateFormat; v2.set(rowKey+"\t"+value.toString()); context.write(key, v2); } catch (NumberFormatException e) { final Counter counter = context.getCounter("BatchImport", "ErrorFormat"); counter.increment(1L); System.out.println("出错了"+splited[0]+" "+e.getMessage()); } }; } staticclass BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{ protectedvoid reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException { for (Text text : values) { final String[] splited = text.toString().split("\t"); final Put put = new Put(Bytes.toBytes(splited[0])); put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1])); //省略其他字段,调用put.add(....)即可 context.write(NullWritable.get(), put); } }; } }
hbase java 端操作
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Mapper; public class HBaseApp {
/** * @param args */ private static Configuration getConfiguration() { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://hadoop0:9000/hbase"); //使用eclipse时必须添加这个,否则无法定位 conf.set("hbase.zookeeper.quorum", "hadoop0"); return conf; } //创建一张表 public static void create(String tableName, String columnFamily) throws IOException{ HBaseAdmin admin = new HBaseAdmin(getConfiguration()); if (admin.tableExists(tableName)) { System.out.println("table exists!"); }else{ HTableDescriptor tableDesc = new HTableDescriptor(tableName); tableDesc.addFamily(new HColumnDescriptor(columnFamily)); admin.createTable(tableDesc); System.out.println("create table success!"); } } //添加一条记录 public static void put(String tableName, String row, String columnFamily, String column, String data) throws IOException{ HTable table = new HTable(getConfiguration(), tableName); Put p1 = new Put(Bytes.toBytes(row)); p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data)); table.put(p1); System.out.println("put'"+row+"',"+columnFamily+":"+column+"','"+data+"'"); } //读取一条记录 public static void get(String tableName, String row) throws IOException{ HTable table = new HTable(getConfiguration(), tableName); Get get = new Get(Bytes.toBytes(row)); Result result = table.get(get); System.out.println("Get: "+result); }
//显示所有数据 public static void scan(String tableName) throws IOException{ HTable table = new HTable(getConfiguration(), tableName); Scan scan = new Scan(); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { System.out.println("Scan: "+result); } } //删除表 public static void delete(String tableName) throws IOException{ HBaseAdmin admin = new HBaseAdmin(getConfiguration()); if(admin.tableExists(tableName)){ try { admin.disableTable(tableName); admin.deleteTable(tableName); } catch (IOException e) { e.printStackTrace(); System.out.println("Delete "+tableName+" 失败"); } } System.out.println("Delete "+tableName+" 成功"); } public static void main(String[] args) throws IOException { String tableName="hbase_tb_test"; String columnFamily="cf"; // HBaseApp.create(tableName, columnFamily); // HBaseApp.put(tableName, "row1", columnFamily, "cl2", "asd"); // HBaseApp.get(tableName, "row1"); // HBaseApp.scan(tableName); HBaseApp.delete(tableName); } }
java代码
参考链接
|
Hbase调用JavaAPI实现批量导入操作
http://787141854-qq-com.iteye.com/blog/2067818 http://blog.pureisle.net/archives/1869.html |