Writing Data to HBase on a Cluster from the IDE (Real-Time Writes)

1. Create the wrapper bean (column entry object)

package com.aura.bean;

import java.io.Serializable;
// Bean representing one HBase column entry: a category and its click count
public class CategoryClickCount implements Serializable {
    // category that was clicked
    private String name;
    // number of clicks
    private long count;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public CategoryClickCount(String name, long count) {
        this.name = name;
        this.count = count;
    }
}

2. Define the DAO interface: insert and query

package com.aura.dao;



import com.aura.bean.CategoryClickCount;

import java.util.List;
// DAO interface: insert and query
public interface HBaseDao {
    // insert one record into HBase
    public void save(String tableName, String rowkey, String family, String q, long value);

    /**
     * Query and return data by rowkey. In an HBase query, supplying only
     * tableName and rowkey is faster than also supplying a column family or
     * timestamp, because those extra constraints degrade read performance.
     */
    public List<CategoryClickCount> count(String tableName, String rowkey);
}
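
For example, one click on category 电影 on 2018-05-23 is stored under rowkey 2018-05-23_电影, column f:name, with the cell value acting as a running counter. Because HBase matches rowkeys on the left prefix, count("aura", "2018-05-23") returns every category's total for that day.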

3. Write the implementation class

package com.aura.dao.impl;

import com.aura.bean.CategoryClickCount;
import com.aura.dao.HBaseDao;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
// Implementation class
public class HBaseImpl implements HBaseDao {
    HConnection htablePool = null; // HConnection holds the HBase connection, similar to a JDBC connection
    public HBaseImpl() { // the connection is created when the class is instantiated
        // program entry point: build the HBase configuration
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum used by HBase (host:port of a ZooKeeper node)
        conf.set("hbase.zookeeper.quorum", "hadoop02:2181");
        //conf.set("zookeeper.znode.parent", "/mybase");
        // HBase metadata would live under the /mybase znode in ZooKeeper;
        // this cluster does not configure that parameter, so the line stays commented out.
        try {
            htablePool = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a table handle by table name.
     * tableName: table name
     * return: table handle
     */
    public HTableInterface getTable(String tableName) {
        HTableInterface table = null;
        try {
            table = htablePool.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Insert one record into HBase.
     * @param tableName table name
     * @param rowkey row key
     * @param family column family
     * @param q qualifier (the category)
     * @param value how many times it occurred
     * Example row: 2018-12-12_电影  f  q  15
     * Note: updateStateByKey is more memory-hungry, reduceByKey less so;
     * this job uses reduceByKey per batch and lets HBase accumulate the totals.
     * HBase has only one data type: the byte array.
     */
    @Override
    public void save(String tableName, String rowkey, String family, String q, long value) {
        HTableInterface table = getTable(tableName); // get the table handle
        try {
            // incrementColumnValue atomically adds value to the cell at the same rowkey/family/qualifier
            table.incrementColumnValue(rowkey.getBytes(), family.getBytes(), q.getBytes(), value);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(table != null){
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }



    /**
     * Query and return data by rowkey prefix.
     * @param tableName table name
     * @param rowkey rowkey prefix
     * @return list of category click counts
     */
    @Override
    public List<CategoryClickCount> count(String tableName, String rowkey) {
        ArrayList<CategoryClickCount> list = new ArrayList<CategoryClickCount>();
        HTableInterface table = getTable(tableName);
        // PrefixFilter: HBase rowkeys support left-prefix matching
        PrefixFilter prefixFilter = new PrefixFilter(rowkey.getBytes());
        Scan scan = new Scan();
        scan.setFilter(prefixFilter); // attach the filter to the scan
        try {
            ResultScanner scanner = table.getScanner(scan); // run the scan to fetch matching rows
            for (Result result : scanner) { // wrap each returned cell into one object
                for (Cell cell : result.rawCells()) { // a Cell is one stored column value
                    byte[] date_name = CellUtil.cloneRow(cell); // cloneRow copies out the rowkey bytes
                    String name = new String(date_name).split("_")[1]; // rowkey format is "<date>_<category>"
                    byte[] value = CellUtil.cloneValue(cell); // cloneValue copies out the cell value bytes
                    long count = Bytes.toLong(value);
                    CategoryClickCount categoryClickCount = new CategoryClickCount(name, count);
                    list.add(categoryClickCount);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(table != null){
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return list;
    }
}
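
Note: HConnection, HConnectionManager, and HTableInterface belong to the old 0.9x-era HBase client API and were removed in later releases. As a minimal sketch, assuming an HBase 1.x+ client on the classpath (the class name HBaseNewApiImpl is made up for illustration), the same connection setup and save would look like this:

package com.aura.dao.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

// Illustrative sketch only: the save logic from above on the newer client API
public class HBaseNewApiImpl {
    private final Connection connection; // thread-safe, intended to be shared and long-lived

    public HBaseNewApiImpl() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop02:2181");
        // ConnectionFactory replaces the removed HConnectionManager
        connection = ConnectionFactory.createConnection(conf);
    }

    public void save(String tableName, String rowkey, String family, String q, long value) throws IOException {
        // Table handles are cheap; try-with-resources closes the handle while
        // the shared Connection stays open for reuse
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.incrementColumnValue(rowkey.getBytes(), family.getBytes(), q.getBytes(), value);
        }
    }
}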

4. Factory class

package com.aura.dao.factory;

import com.aura.dao.HBaseDao;
import com.aura.dao.impl.HBaseImpl;
// Factory class: hands out HBaseDao instances
public class HBaseFactory {
    public static HBaseDao getHBaseDao(){
        return new HBaseImpl();
    }
}

5. Test class

package com.aura.Test;

import com.aura.bean.CategoryClickCount;
import com.aura.dao.HBaseDao;
import com.aura.dao.factory.HBaseFactory;

import java.util.List;
// Writes data into HBase on the cluster from the IDE

/* Inserting one record into HBase:
 * tableName  table name         aura
 * rowkey     row key            2018-05-23_电影
 * family     column family      f
 * q          qualifier          name
 * value      occurrence count   10L, 20L, ...
 */
// Test class
public class Test {
    public static void main(String[] args) {

        HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
        // insert data into HBase (save calls currently commented out)
//        hBaseDao.save("aura",
//                "2018-05-23_电影","f","name",10L);
//        hBaseDao.save("aura",
//                "2018-05-23_电影","f","name",20L);
//        hBaseDao.save("aura",
//                "2018-05-21_电视剧","f","name",11L);
//        hBaseDao.save("aura",
//                "2018-05-21_电视剧","f","name",24L);
//
//        hBaseDao.save("aura",
//                "2018-05-23_电视剧","f","name",110L);
//        hBaseDao.save("aura",
//                "2018-05-23_电视剧","f","name",210L);

        List<CategoryClickCount> list = hBaseDao.count("aura", "2018-05-23");
        for(CategoryClickCount cc:list){
            System.out.println(cc.getName() + "  "+ cc.getCount());
        }

    }

}
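
For reference: if the commented-out save calls above are each executed exactly once, the prefix scan on "2018-05-23" should print 电影 30 and 电视剧 320, because incrementColumnValue accumulates 10+20 and 110+210 into the same cells, while the 2018-05-21 rows fall outside the prefix and are not returned.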

// Log-parsing utility class
package com.aura.utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class Utils {
    public static String getKey(String line){
        HashMap<String, String> map = new HashMap<String,String>();
        map.put("0","其他");
        map.put("1","电视剧");
        map.put("2","电影");
        map.put("3","综艺");
        map.put("4","动漫");
        map.put("5","纪录片");
        map.put("6","游戏");
        map.put("7","资讯");
        map.put("8","娱乐");
        map.put("9","财经");
        map.put("10","网络电影");
        map.put("11","片花");
        map.put("12","音乐");
        map.put("13","军事");
        map.put("14","教育");
        map.put("15","体育");
        map.put("16","儿童");
        map.put("17","旅游");
        map.put("18","时尚");
        map.put("19","生活");
        map.put("20","汽车");
        map.put("21","搞笑");
        map.put("22","广告");
        map.put("23","原创");
        // extract the category id
        String categoryid = line.split("&")[9].split("/")[4];
        // map the id to its category name
        String name = map.get(categoryid);
        // extract the timestamp of the user's visit
        String stringTime = line.split("&")[8].split("=")[1];
        // convert the timestamp to a date string
        String date = getDay(Long.valueOf(stringTime));

        return date+"_"+name;
    }

    public static String getDay(long time){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        return simpleDateFormat.format(new Date(time));
    }
}
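
A quick way to sanity-check getKey is to feed it a hand-built line shaped to match the split("&") and split("/") indices above. The field layout, URL, and timestamp below are fabricated for illustration, not real log data:

package com.aura.Test;

import com.aura.utils.Utils;

// Hypothetical smoke test for Utils.getKey
public class UtilsSmokeTest {
    public static void main(String[] args) {
        // fields 0-7 are placeholders; field 8 carries "time=<epoch millis>";
        // field 9 is a URL whose "/"-segment at index 4 is the category id ("2" = 电影)
        String line = "f0&f1&f2&f3&f4&f5&f6&f7"
                + "&time=1527076800000"              // 2018-05-23 12:00:00 UTC
                + "&http://video.example.com/a/2/b";
        // expected: 2018-05-23_电影 (the date part depends on the local timezone)
        System.out.println(Utils.getKey(line));
    }
}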

// Real-time insert job: Spark Streaming reads the data from Kafka
 
package com.aura.spark.category;
import com.aura.dao.HBaseDao;
import com.aura.dao.factory.HBaseFactory;
import com.aura.utils.Utils;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

/**
 * Spark Streaming job whose input is the Kafka topic "aura"
 */
public class CategoryRealCount {
    public static void main(String[] args) {
        // initialize the program entry point
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CategoryRealCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(3)); // 3-second batch interval
        /* Alternatively, let the streaming context create the SparkContext itself:
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3)); */

        // read the data
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "hadoop02:9092"); // Kafka broker list
        HashSet<String> topics = new HashSet<>(); // topics to subscribe to
        topics.add("aura");
        //JavaDStream<String> logDStream = null
        KafkaUtils.createDirectStream( // returns a direct (key, value) DStream
                ssc,
                String.class,        // key type
                String.class,        // value type
                StringDecoder.class, // key decoder
                StringDecoder.class, // value decoder
                kafkaParams,
                topics
        ).map(new Function<Tuple2<String, String>, String>() {
            // Function<Tuple2<String, String>, String>: input is one (key, value) record, output is the value String
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                // Kafka hands back key-value pairs: the key carries offset metadata,
                // the value is the actual log line. Only the value is needed, hence
                // tuple2._2 (the Scala equivalent would be .map(_._2)).
                return tuple2._2;
            }
        }).mapToPair(new PairFunction<String, String, Long>() { // input: one log line; output: a (String, Long) pair
            public Tuple2<String, Long> call(String line) throws Exception {
                return new Tuple2<String, Long>(Utils.getKey(line), 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long x, Long y) throws Exception {
                return x + y;
            }
        }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() { // write each batch's records into HBase
            @Override
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
                        while(partition.hasNext()){
                            Tuple2<String, Long> tuple = partition.next();
                            hBaseDao.save("aura",tuple._1,"f","name",tuple._2);
                            System.out.println(tuple._1 + "   "+ tuple._2);
                        }
                    }
                });
            }
        });

        // job logic ends above; for debugging, the stream could be printed instead:
        //logDStream.print();
        // start the application
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ssc.stop();

    }

}
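
Two details in CategoryRealCount are worth calling out: the DAO is created inside foreachPartition because the underlying HBase connection is not serializable and cannot be shipped from the driver to the executors, and creating it once per partition amortizes connection setup over all records in the partition instead of paying it per record.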

6. Running it (start the cluster services before running)

            1) Start ZooKeeper: zkServer.sh start

            2) Start HDFS: start-dfs.sh

            3) Start the Kafka HA cluster

                Note: ZooKeeper must be running first.

            [hadoop@hadoop02 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties

            [hadoop@hadoop03 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties

            [hadoop@hadoop04 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties

            [hadoop@hadoop05 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties

            4) Simulate data arriving in real time

            [hadoop@hadoop02 ~]$ cat data.txt | while read line   (read data.txt line by line)
                > do
                > echo "$line" >> data.log   (append each line to data.log)
                > sleep 0.5   (pause 0.5 seconds between lines)
                > done

            5) Start Flume:

                [hadoop@hadoop02 ~]$ flume-ng agent --conf conf --conf-file file2kafka.properties --name a1 -Dflume.root.logger=INFO,console

  

            6) Start HBase from its bin directory:

                [hadoop@hadoop02 bin]$ ./start-hbase.sh
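
                Before any writes, the target table must exist. The table and
                column-family names below come from the code above; if the table
                is missing, create it in the HBase shell:

                [hadoop@hadoop02 bin]$ hbase shell
                hbase(main):001:0> create 'aura', 'f'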

            7) Write data into HBase: run the CategoryRealCount real-time insert job.

            8) Run the Test class.

            9) To inspect the data by hand, open the HBase shell: hbase shell (not needed here).
