欢迎投稿

今日深度:

flink和hbase整合,flinkhbase整合

flink和hbase整合,flinkhbase整合


有两种方式

第一种,批处理模式整合,即从hbase查询数据并转为DataSet格式

首先引入flink整合hbase的jar(版本号请根据实际调整)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
2 在resource下添加hbase-site.xml文件

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>host1:2181,host2:2181,host3:2181</value>
    </property>

    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
</configuration>
3 通过如下方式获取hbase查询结果

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

 DataSet<Tuple2<String, String>> hbaseInput =  env.createInput(new TableInputFormat<Tuple2<String, String>>(){
            @Override
            protected Scan getScanner() {
                Scan result = new Scan();
                return result;
            }
            @Override
            protected String getTableName() {
                return "logTest";
            }
            @Override
            protected Tuple2<String, String> mapResultToTuple(Result result) {

                Tuple2<String,String> tup = new Tuple2<String,String>();
                tup.setField(Bytes.toString(result.getRow()),0);
                tup.setField(Bytes.toString(result.getValue("cf".getBytes(), "AMOUNT".getBytes())), 1);
                return tup;
            }
        });

第二种方式,基于datastream整合

经测试,上述方法如果在DataStream流中执行时env变量会因为序列化问题报错,如果把env定义在DataStream的map函数内部也会报错,所以事先思路是在输入流的

map函数中通过hbase的API函数手动查询数据,并对结果解析后转为DataStream流,其实目的就是为了用流触发hbase查询,后续可以通过状态管理把上一次查询结果保存起来,每次应用启动只查询hbase一次,数据增量处理用状态管理实现

 DataStream<String> ds1 = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                int sum=0;
                int filter=0;
                Map<String,Object> result =  Util.scanTable("logTest");
                List<HashMap<String,Object>> list = (List<HashMap<String,Object>>)result.get("data");
                Iterator<HashMap<String,Object>> it = list.iterator();
                while(it.hasNext()){
                    sum++;
                    if(Float.parseFloat(it.next().get("AMOUNT").toString())<5000.00){
                        filter++;
                    }
                }
                float rate = (float)filter/sum;
                if (rate >0.5 ){
                    DecimalFormat decimalFormat=new DecimalFormat(".00");//格式化
                    Map<String, String> mapRate = new HashMap<String, String>();
                    mapRate.put("CONCERN_TYPE", "3");
                    mapRate.put("CUST_LOYALTY", String.valueOf(rate));
                    mapRate.put("CONCERN_DESC", "flink 实现:当前交易金额低于5000元的用户占比" + decimalFormat.format(rate * 100) + "%已经高于50%,请尽快针对性展开营销活动");
                    Util.insertRow("custConcern", "custConcern"+"3", "cf", mapRate);
                    // sendKafkaMsg("busiTopic",riskline._1(),riskline._1()+riskline._2());
                }
                return String.valueOf(rate);
            }
        });




www.htsjk.Com true http://www.htsjk.com/hbase/25713.html NewsArticle flink和hbase整合,flinkhbase整合 有两种方式 第一种,批处理模式整合,即从hbase查询数据并转为DataSet格式 首先引入flink整合hbase的jar(版本号请根据实际调整) dependency groupId org.apache.flin...
相关文章
    暂无相关文章
评论暂时关闭