Writing Data from the IDE into HBase on the Cluster (Real-Time Writes)
1、Create the wrapper bean (the column-family record)
package com.aura.bean;

import java.io.Serializable;

// HBase column-family record
public class CategoryClickCount implements Serializable {
    // clicked category
    private String name;
    // number of clicks
    private long count;

    public CategoryClickCount(String name, long count) {
        this.name = name;
        this.count = count;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}
2、Define the DAO interface (insert and query)
package com.aura.dao;

import com.aura.bean.CategoryClickCount;
import java.util.List;

// insert-and-query interface
public interface HBaseDao {

    // insert one record into HBase
    void save(String tableName, String rowkey, String family, String q, long value);

    /**
     * Query by rowkey and return the matching records.
     * Querying HBase by tableName and rowkey alone is faster than also
     * passing a column family or timestamp, because those extra filters
     * reduce query performance.
     */
    List<CategoryClickCount> count(String tableName, String rowkey);
}
3、Write the implementation class
package com.aura.dao.impl;

import com.aura.bean.CategoryClickCount;
import com.aura.dao.HBaseDao;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// implementation class
public class HBaseImpl implements HBaseDao {

    // HConnection holds the HBase connection, similar to a JDBC connection
    HConnection htablePool = null;

    public HBaseImpl() { // runs when the class is instantiated
        // program entry point: build the configuration
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum used by HBase
        conf.set("hbase.zookeeper.quorum", "hadoop02:2181");
        // If HBase stores its metadata under /mybase in ZooKeeper, set
        // "zookeeper.znode.parent" accordingly; this cluster uses the
        // default znode, so the setting is omitted.
        // conf.set("zookeeper.znode.parent", "/mybase");
        try {
            htablePool = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a table object by table name.
     * @param tableName table name
     * @return table object
     */
    public HTableInterface getTable(String tableName) {
        HTableInterface table = null;
        try {
            table = htablePool.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Insert one record into HBase, e.g. rowkey "2018-12-12_电影", family "f", qualifier "q", value 15.
     * Note: updateStateByKey needs more memory, reduceByKey needs less.
     * HBase has only one data type: the byte array.
     * @param tableName table name
     * @param rowkey    row key
     * @param family    column family
     * @param q         category qualifier
     * @param value     number of occurrences
     */
    @Override
    public void save(String tableName, String rowkey, String family, String q, long value) {
        HTableInterface table = getTable(tableName); // get the table object
        try {
            // incrementColumnValue atomically adds value to the cell with
            // the same rowkey/family/qualifier
            table.incrementColumnValue(rowkey.getBytes(), family.getBytes(), q.getBytes(), value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Return the records matching a rowkey prefix.
     * @param tableName table name
     * @param rowkey    rowkey prefix
     */
    @Override
    public List<CategoryClickCount> count(String tableName, String rowkey) {
        List<CategoryClickCount> list = new ArrayList<CategoryClickCount>();
        HTableInterface table = getTable(tableName);
        // PrefixFilter: HBase supports left-prefix matching on rowkeys
        PrefixFilter prefixFilter = new PrefixFilter(rowkey.getBytes());
        Scan scan = new Scan();
        scan.setFilter(prefixFilter); // attach the filter to the scan
        try {
            ResultScanner scanner = table.getScanner(scan); // fetch data with the scanner
            for (Result result : scanner) {          // each row yields one or more objects
                for (Cell cell : result.rawCells()) { // one Cell per qualifier
                    byte[] dateName = CellUtil.cloneRow(cell); // cloneRow copies the rowkey
                    String name = new String(dateName).split("_")[1];
                    byte[] value = CellUtil.cloneValue(cell);  // cloneValue copies the value
                    long count = Bytes.toLong(value);
                    list.add(new CategoryClickCount(name, count));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return list;
    }
}
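HConnection, HConnectionManager, and HTableInterface are deprecated from HBase 1.0 onward. If the cluster runs a newer client, the same save logic could be written against the Connection/Table API. A minimal sketch under that assumption (not part of the original code; class name HBaseImplV2 is mine):

package com.aura.dao.impl;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

// hypothetical HBase 1.x+ variant of the DAO's save path
public class HBaseImplV2 {
    private final Connection connection;

    public HBaseImplV2() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop02:2181");
        // ConnectionFactory replaces HConnectionManager in HBase 1.0+
        connection = ConnectionFactory.createConnection(conf);
    }

    public void save(String tableName, String rowkey, String family, String q, long value) throws IOException {
        // Table replaces HTableInterface; incrementColumnValue keeps the same semantics
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.incrementColumnValue(rowkey.getBytes(), family.getBytes(), q.getBytes(), value);
        }
    }
}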
4、Factory class
package com.aura.dao.factory;

import com.aura.dao.HBaseDao;
import com.aura.dao.impl.HBaseImpl;

// factory class
public class HBaseFactory {
    public static HBaseDao getHBaseDao() {
        return new HBaseImpl();
    }
}
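A design note: every getHBaseDao() call above builds a new HBaseImpl, and with it a new HBase connection. The streaming job later calls the factory once per partition per batch, so caching a single instance would reuse one connection per JVM. A possible variation (my sketch, not from the original post; the class name is hypothetical):

package com.aura.dao.factory;

import com.aura.dao.HBaseDao;
import com.aura.dao.impl.HBaseImpl;

// variation: cache one DAO (and one underlying HBase connection) per JVM
public class HBaseFactorySingleton {
    private static final HBaseDao INSTANCE = new HBaseImpl();

    public static HBaseDao getHBaseDao() {
        return INSTANCE;
    }
}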
5、Test class
package com.aura.Test;

import com.aura.bean.CategoryClickCount;
import com.aura.dao.HBaseDao;
import com.aura.dao.factory.HBaseFactory;

import java.util.List;

/*
 * Writes data into HBase on the cluster from the IDE.
 * Parameters for save:
 *   tableName  table name          aura
 *   rowkey     row key             2018-05-23_电影
 *   family     column family       f
 *   q          category qualifier  name
 *   value      occurrence count    10L, 20L, ...
 */
public class Test {
    public static void main(String[] args) {
        HBaseDao hBaseDao = HBaseFactory.getHBaseDao();

        // insert data into HBase
        // hBaseDao.save("aura", "2018-05-23_电影", "f", "name", 10L);
        // hBaseDao.save("aura", "2018-05-23_电影", "f", "name", 20L);
        // hBaseDao.save("aura", "2018-05-21_电视剧", "f", "name", 11L);
        // hBaseDao.save("aura", "2018-05-21_电视剧", "f", "name", 24L);
        // hBaseDao.save("aura", "2018-05-23_电视剧", "f", "name", 110L);
        // hBaseDao.save("aura", "2018-05-23_电视剧", "f", "name", 210L);

        // query by rowkey prefix
        List<CategoryClickCount> list = hBaseDao.count("aura", "2018-05-23");
        for (CategoryClickCount cc : list) {
            System.out.println(cc.getName() + " " + cc.getCount());
        }
    }
}
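Because incrementColumnValue accumulates into the same cell, running the commented-out save calls once against an empty table and then the query should print roughly the following (10 + 20 = 30 for 电影, 110 + 210 = 320 for 电视剧; the 2018-05-21 rows fall outside the "2018-05-23" prefix):

电影 30
电视剧 320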
// Log-parsing utility class
package com.aura.utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class Utils {

    public static String getKey(String line) {
        // category id -> category name
        HashMap<String, String> map = new HashMap<String, String>();
        map.put("0", "其他");
        map.put("1", "电视剧");
        map.put("2", "电影");
        map.put("3", "综艺");
        map.put("4", "动漫");
        map.put("5", "纪录片");
        map.put("6", "游戏");
        map.put("7", "资讯");
        map.put("8", "娱乐");
        map.put("9", "财经");
        map.put("10", "网络电影");
        map.put("11", "片花");
        map.put("12", "音乐");
        map.put("13", "军事");
        map.put("14", "教育");
        map.put("15", "体育");
        map.put("16", "儿童");
        map.put("17", "旅游");
        map.put("18", "时尚");
        map.put("19", "生活");
        map.put("20", "汽车");
        map.put("21", "搞笑");
        map.put("22", "广告");
        map.put("23", "原创");

        // extract the category id
        String categoryid = line.split("&")[9].split("/")[4];
        // look up the category name
        String name = map.get(categoryid);
        // extract the timestamp of the user's visit
        String stringTime = line.split("&")[8].split("=")[1];
        // format it as a date
        String date = getDay(Long.valueOf(stringTime));
        return date + "_" + name;
    }

    public static String getDay(long time) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        return simpleDateFormat.format(new Date(time));
    }
}
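The post never shows the raw log format; the parser only requires that field 8 (split on "&") carries "key=timestamp" and that field 9 carries a URL whose fifth "/"-segment is the category id. A quick check of Utils.getKey with a hypothetical line built to satisfy exactly that:

package com.aura.utils;

// Hypothetical smoke test for Utils.getKey; the sample line is invented
// for illustration, since the real log format is not shown in this post.
public class UtilsDemo {
    public static void main(String[] args) {
        String line = "f0&f1&f2&f3&f4&f5&f6&f7"  // fields 0-7: irrelevant placeholders
                + "&time=1527033600000"          // field 8: epoch millis for 2018-05-23 (UTC; formatting uses the JVM timezone)
                + "&http://www.aura.com/a/2";    // field 9: "/"-segment index 4 is category id "2" (电影)
        System.out.println(Utils.getKey(line));  // prints 2018-05-23_电影
    }
}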
// Real-time insert class: Spark Streaming reads the data from Kafka
package com.aura.spark.category;

import com.aura.dao.HBaseDao;
import com.aura.dao.factory.HBaseFactory;
import com.aura.utils.Utils;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

/**
 * Spark Streaming reads its data from the Kafka topic "aura".
 */
public class CategoryRealCount {
    public static void main(String[] args) {
        // initialize the entry point
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CategoryRealCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // batch interval of 3 seconds
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(3));
        /* Alternatively, let the streaming context create the SparkContext itself:
           JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3)); */

        // read the data
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "hadoop02:9092");
        HashSet<String> topics = new HashSet<>(); // topics to subscribe to
        topics.add("aura");

        KafkaUtils.createDirectStream(
                ssc,
                String.class,        // key type
                String.class,        // value type
                StringDecoder.class, // key decoder
                StringDecoder.class, // value decoder
                kafkaParams,
                topics
        ).map(new Function<Tuple2<String, String>, String>() {
            // Kafka delivers key-value pairs: the key carries offset
            // information we don't need, the value is the log line, so
            // keep only the value -- the Java equivalent of Scala's .map(_._2)
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._2;
            }
        }).mapToPair(new PairFunction<String, String, Long>() {
            // map each log line to ("date_category", 1L)
            @Override
            public Tuple2<String, Long> call(String line) throws Exception {
                return new Tuple2<String, Long>(Utils.getKey(line), 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long x, Long y) throws Exception {
                return x + y;
            }
        }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            // write every record of each batch into HBase
            @Override
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        // one DAO (and one HBase connection) per partition
                        HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
                        while (partition.hasNext()) {
                            Tuple2<String, Long> tuple = partition.next();
                            hBaseDao.save("aura", tuple._1, "f", "name", tuple._2);
                            System.out.println(tuple._1 + " " + tuple._2);
                        }
                    }
                });
            }
        });
        // for debugging, the parsed stream could instead be assigned to a
        // JavaDStream<String> and printed with logDStream.print()

        // start the application
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ssc.stop();
    }
}
6、Running it: start the cluster services before running the code.
1) Start ZooKeeper: zkServer.sh start
2) Start HDFS: start-dfs.sh
3) Start the Kafka HA cluster (note: ZooKeeper must already be running):
[hadoop@hadoop02 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
[hadoop@hadoop03 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
[hadoop@hadoop04 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
[hadoop@hadoop05 kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
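The streaming job and Flume both assume a topic named aura. If the brokers do not auto-create topics, it can be created by hand; the replication factor and partition count below are placeholder choices, not values from the original setup:

[hadoop@hadoop02 kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 2 --partitions 3 --topic aura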
4) Simulate data arriving in real time:
[hadoop@hadoop02 ~]$ cat data.txt | while read line   # read data.txt line by line
> do
>   echo "$line" >> data.log                          # append each line to data.log
>   sleep 0.5                                         # wait 0.5 seconds between lines
> done
5) Start Flume: [hadoop@hadoop02 ~]$ flume-ng agent --conf conf --conf-file file2kafka.properties --name a1 -Dflume.root.logger=INFO,console
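file2kafka.properties itself is not shown in the post. A minimal sketch of what it plausibly contains, assuming an exec source tailing data.log into a Kafka sink (the file path and the version-specific sink property names are assumptions to verify against the installed Flume):

# hypothetical file2kafka.properties for agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# tail the file the simulator appends to (path assumed)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data.log
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

# Kafka sink, Flume 1.7+ property names
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop02:9092
a1.sinks.k1.kafka.topic = aura
a1.sinks.k1.channel = c1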
6) Start HBase from the bin directory of the HBase installation: [hadoop@hadoop02 bin]$ ./start-hbase.sh
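incrementColumnValue fails if the target table does not exist, so on a fresh cluster the aura table with column family f has to be created first, for example from the HBase shell:

hbase(main):001:0> create 'aura', 'f'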
7) Write data into HBase: run the real-time insert class CategoryRealCount.
8) Run the Test class to query the results.
9) To inspect the data by hand, use the HBase shell: hbase shell (not needed for this run).
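If you do open the shell, the data can be checked like this; note that counters written by incrementColumnValue are stored as 8-byte big-endian longs, so scan shows raw bytes (e.g. \x00\x00\x00\x00\x00\x00\x00\x1E for 30) rather than decimal values:

hbase(main):002:0> scan 'aura'
hbase(main):003:0> get 'aura', '2018-05-23_电影'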