Operating HBase with MapReduce
1. The Relationship Between HBase and MapReduce
HBase and MapReduce have no direct relationship; they belong to different projects. "MapReduce on HBase" here means using an HBase table as the input source and/or output sink of the MapReduce computation framework, so that MR's parallel computing power can be applied to the data stored inside HBase.
2. The Official HBase-MapReduce Jobs
(1) Print the classpath that HBase MapReduce jobs require
$ bin/hbase mapredcp
(2) Export the required environment variables
$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
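The hbase mapredcp command prints the jar paths that HBase MapReduce jobs depend on; exporting them through HADOOP_CLASSPATH lets the yarn launcher used below find the HBase classes.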
(3) Run a bundled MapReduce job
-- Case 1: count the number of rows in the student table
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
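When the job completes, the result shows up in the job counters printed by yarn; RowCounter reports it as the ROWS counter.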
(4) Case 2: import local data into HBase with MapReduce
1) Create a local TSV file named fruit.tsv (the fields must be separated by tab characters, which is what importtsv expects):
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
2) Create the HBase table
hbase(main):001:0> create 'fruit','info'
3) Create an input_fruit directory in HDFS and upload fruit.tsv
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
4) Run the importtsv MapReduce job to load the data into HBase's fruit table
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit
5) Use the scan command to check the imported data
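For example:
hbase(main):001:0> scan 'fruit'
Each of the three imported rows should come back with its info:name and info:color cells.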
3. Custom HBase-MapReduce, Part 1
Goal: use MR to migrate part of the data in the fruit table (only the info:name column) into the fruit_mr table.
Step-by-step implementation:
(0) Create a new project and add the dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.luomk</groupId>
    <artifactId>HBaseMapReduce1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
</project>
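Note: hbase-server is the artifact that contains the org.apache.hadoop.hbase.mapreduce classes (TableMapper, TableReducer, TableMapReduceUtil) used below, while hbase-client supplies the client-side types such as Put, Result, and Scan.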
(1) Build the HbaseMapper class to read data from the fruit table
package com.luomk;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Build a Put keyed on the same row as the input Result
        Put v = new Put(key.copyBytes());
        for (Cell cell : value.rawCells()) {
            // Keep only the cells whose qualifier is "name"
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                v.add(cell);
            }
        }
        // An empty Put would fail in the output format, so skip rows that have no name cell
        if (!v.isEmpty()) {
            context.write(key, v);
        }
    }
}
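TableMapper fixes the map input types to ImmutableBytesWritable (the row key) and Result (the cells of that row), so only the map output key/value types need to be declared in the class signature.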
(2) Build the HbaseReducer class to write the data read from the fruit table into the fruit_mr table
package com.luomk;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(key, value);
        }
    }
}
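TableReducer fixes the reduce output value type to Mutation, which is why the Put objects from the mapper can be forwarded unchanged; the third type parameter is the reduce output key, which the table output format ignores when writing.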
(3) Build the driver and submit the job
package com.luomk;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HbaseDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Build an HBase-aware configuration and wrap it in a Job
        Configuration conf = HBaseConfiguration.create(new Configuration());
        Job job = Job.getInstance(conf);
        // 2. Set the main class so YARN can locate the jar
        job.setJarByClass(HbaseDriver.class);
        // Scan over the source table; these caching settings are the usual recommendation for MR scans
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        // 3. Set the Mapper: read from the source table (args[0])
        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(args[0]), scan, HbaseMapper.class, ImmutableBytesWritable.class, Put.class, job);
        // 4. Optionally set the number of reducers
        // job.setNumReduceTasks(1);
        // 5. Set the Reducer: write to the target table (args[1])
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);
        // Submit and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
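The driver expects two arguments: args[0] is the source table to scan (here fruit) and args[1] is the target table to write to (here fruit_mr), as used in the run command below.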
(4) Package and run the job
$ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/HBaseMapReduce1-1.0-SNAPSHOT.jar com.luomk.HbaseDriver fruit fruit_mr
Tip: before running the job, create the target table if it does not already exist.
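For example, create the target table of this job first:
hbase(main):001:0> create 'fruit_mr','info'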
Tip: Maven packaging command: -P local clean package or -P dev clean package install (bundling third-party jars into the artifact requires the maven-shade-plugin).
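A minimal maven-shade-plugin configuration is sketched below. The plugin version, and the local/dev profiles mentioned above, are assumptions about the author's build setup; they do not appear in this project's pom.xml:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <!-- assumed version; use one compatible with your Maven -->
            <version>3.2.4</version>
            <executions>
                <execution>
                    <!-- bind the shade goal to the package phase -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>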
4. Custom HBase-MapReduce, Part 2
Goal: write data from HDFS into an HBase table.
Step-by-step implementation:
(0) Create a new project and add the dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.luomk</groupId>
    <artifactId>HBaseMapReducer2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
</project>
(1) Build the HbaseMapper class to read file data from HDFS
package com.luomk;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Reads tab-separated lines from HDFS and turns each one into a Put for the HBase table.
 */
public class HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    // Reused across map() calls, following the usual Hadoop writable-reuse idiom
    private ImmutableBytesWritable k = new ImmutableBytesWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split("\t");
        // Skip malformed lines rather than failing the task
        if (split.length < 3) {
            return;
        }
        // Column 0 is the row key; columns 1 and 2 become info:name and info:color
        k.set(Bytes.toBytes(split[0]));
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
        context.write(k, put);
    }
}
(2) Build the HbaseReducer class (identical to the one in section 3)
package com.luomk;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(key, value);
        }
    }
}
(3) Build the driver and submit the job
package com.luomk;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class HbaseDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Build an HBase-aware configuration and wrap it in a Job
        Configuration conf = HBaseConfiguration.create(new Configuration());
        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseDriver.class);
        // Plain MR mapper reading text lines from HDFS
        job.setMapperClass(HbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Reducer writes into the target HBase table (args[1])
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);
        // HDFS input directory (args[0])
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
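This driver also takes two arguments: args[0] is the HDFS input directory and args[1] is the target HBase table; the run command below reuses the /input_fruit directory and fruit table from section 2.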
(4) Package and run the job
$ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/HBaseMapReducer2-1.0-SNAPSHOT.jar com.luomk.HbaseDriver /input_fruit fruit
Tip: before running the job, create the target table if it does not already exist.
Tip: Maven packaging command: -P local clean package or -P dev clean package install (bundling third-party jars into the artifact requires the maven-shade-plugin; see the configuration sketch in section 3).
5. Source code download: https://github.com/luomingkui/hbase