欢迎投稿

今日深度:

MapReduce 操作 HBase,mapreducehbase

MapReduce 操作 HBase,mapreducehbase


1.HBase与MR关系

  HBase和MapReduce,这两者并没有直接关系,隶属于不同的项目。这里讲到的MapReduce on HBase是指利用HBase表做为MR计算框架的数据输入源或者输出源,使得能够利用MR的并行计算能力计算HBase的内部数据。

 

2.官方HBase-MapReduce

(1)查看HBase的MapReduce任务的执行

        $ bin/hbase mapredcp

(2)执行环境变量的导入

        $ export HBASE_HOME=/opt/module/hbase-1.3.1

        $ export HADOOP_HOME=/opt/module/hadoop-2.7.2

        $ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

(3)运行官方的MapReduce任务

        -- 案例一:统计Student表中有多少行数据

        $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student

(4)案例二:使用MapReduce将本地数据导入到HBase

1)在本地创建一个tsv格式的文件:fruit.tsv

    1001    Apple    Red

    1002    Pear        Yellow

    1003    Pineapple    Yellow

2)创建HBase表

    hbase(main):001:0> create 'fruit','info'

3)在HDFS中创建input_fruit文件夹并上传fruit.tsv文件

    $ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/

    $ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/

4)执行MapReduce到HBase的fruit表中

    $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \

    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \

    hdfs://hadoop102:9000/input_fruit

5)使用scan命令查看导入后的结果

 

 

3.自定义HBase-MapReduce1

    目标:将fruit表中的一部分数据,通过MR迁入到fruit_mr表中。

分步实现:

(0) 新建项目后在pom.xml中添加依赖:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>



    <!-- Coordinates for the HBase-to-HBase MapReduce example (section 3). -->
    <groupId>com.luomk</groupId>

    <artifactId>HBaseMapReduce1</artifactId>

    <version>1.0-SNAPSHOT</version>



    <dependencies>

        <!-- hbase-server provides TableMapper/TableReducer/TableMapReduceUtil;
             version 1.3.1 matches the HBase installation used in section 2. -->
        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-server</artifactId>

            <version>1.3.1</version>

        </dependency>



        <!-- hbase-client provides Put/Result/Scan and the connection classes. -->
        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>1.3.1</version>

        </dependency>

    </dependencies>



</project>

(1)构建HbaseMapper类,用于读取fruit表中的数据

package com.luomk;



import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;



public class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override

    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        //获取put对象

        Put v = new Put(key.copyBytes());



        for (Cell cell : value.rawCells()) {

            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {

                v.add(cell);

            }

        }

        context.write(key, v);

    }

}

(2)构建HbaseReducer类,用于将读取到的fruit表中的数据写入到fruit_mr表中

package com.luomk;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override

    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {

        for (Put value : values) {

            context.write(key,value);

        }



    }

}

(3)调用执行

package com.luomk;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;



public class HbaseDriver {



    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {



        //1.获取Hbase的conf&封装job

        Configuration configuration = new Configuration();

        Configuration conf = HBaseConfiguration.create(configuration);



        Job job = Job.getInstance(conf);



        //2.设置主类

        job.setJarByClass(HbaseDriver.class);



        Scan scan = new Scan();



        //3.设置Mapper类

        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(args[0]), scan, HbaseMapper.class, ImmutableBytesWritable.class, Put.class, job);



        //4.设置reducer个数

        // job.setNumReduceTasks(1);

       

        //5.设置Reducer

        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);



        //提交

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

(4)打包运行任务

$ ~/modules/hadoop-2.7.2/bin/yarn jar  ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar com.luomk.HbaseDriver

提示:运行任务前,如果待数据导入的表不存在,则需要提前创建。

提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

 

4.自定义HBase-MapReduce2

    目标:实现将HDFS中的数据写入到HBase表中。

    分步实现:

(0) 新建项目后在pom.xml中添加依赖:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>



    <!-- Coordinates for the HDFS-to-HBase import example (section 4). -->
    <groupId>com.luomk</groupId>

    <artifactId>HBaseMapReducer2</artifactId>

    <version>1.0-SNAPSHOT</version>



    <dependencies>

        <!-- hbase-server provides TableReducer/TableMapReduceUtil;
             version 1.3.1 matches the HBase installation used in section 2. -->
        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-server</artifactId>

            <version>1.3.1</version>

        </dependency>



        <!-- hbase-client provides Put/ImmutableBytesWritable and friends. -->
        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>1.3.1</version>

        </dependency>

    </dependencies>



</project>

(1)构建HbaseMapper类,用于读取HDFS中的文件数据

package com.luomk;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;



/**

* :实现将HDFS中的数据写入到HBase表中。

*/

public class HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {



    private ImmutableBytesWritable k = new ImmutableBytesWritable();



    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        String[] split = line.split("\t");

        k.set(Bytes.toBytes(split[0]));

        Put put = new Put(Bytes.toBytes(split[0]));

        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));

        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));

        context.write(k, put);

    }

}

(2)构建HbaseReducer类

package com.luomk;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;



public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override

    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {

        for (Put value : values) {

            context.write(key, value);

        }

    }

}

(3)调用执行

package com.luomk;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;



public class HbaseDriver {



    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {



        Configuration configuration = new Configuration();

        Configuration conf = HBaseConfiguration.create(configuration);



        Job job = Job.getInstance(conf);



        job.setJarByClass(HbaseDriver.class);



        job.setMapperClass(HbaseMapper.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);

        job.setMapOutputValueClass(Put.class);



        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);



        FileInputFormat.setInputPaths(job, new Path(args[0]));



        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

(4)打包运行

$ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar com.luomk.HbaseDriver

提示:运行任务前,如果待数据导入的表不存在,则需要提前创建之。

提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

 

5.源码下载地址:https://github.com/luomingkui/hbase

 

www.htsjk.Com true http://www.htsjk.com/hbase/31684.html NewsArticle MapReduce 操作 HBase,mapreducehbase 1.HBase与MR关系 HBase和MapReduce,这两者并没有直接关系,隶属于不同的项目。这里讲到的MapReduce on HBase是指利用HBase表做为MR计算框架的数据输入源或者输出...
相关文章
    暂无相关文章
评论暂时关闭