flink和hbase整合,flinkhbase整合
有两种方式
第一种,批处理模式整合,即从hbase查询数据并转为DataSet格式
首先引入flink整合hbase的jar(版本号请根据实际调整)
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>1.1.4</version> </dependency>2 在resource下添加hbase-site.xml文件
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hbase.zookeeper.quorum</name> <value>host1:2181,host2:2181,host3:2181</value> </property> <property> <name>hbase.zookeeper.property.clientPort</name> <value>2181</value> </property> <property> <name>zookeeper.znode.parent</name> <value>/hbase</value> </property> </configuration>3 通过如下方式获取hbase查询结果
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, String>> hbaseInput = env.createInput(new TableInputFormat<Tuple2<String, String>>(){
@Override
protected Scan getScanner() {
Scan result = new Scan();
return result;
}
@Override
protected String getTableName() {
return "logTest";
}
@Override
protected Tuple2<String, String> mapResultToTuple(Result result) {
Tuple2<String,String> tup = new Tuple2<String,String>();
tup.setField(Bytes.toString(result.getRow()),0);
tup.setField(Bytes.toString(result.getValue("cf".getBytes(), "AMOUNT".getBytes())), 1);
return tup;
}
});第二种方式,基于datastream整合
经测试,上述方法如果在DataStream流中执行时env变量会因为序列化问题报错,如果把env定义在DataStream的map函数内部也会报错,所以事先思路是在输入流的
map函数中通过hbase的API函数手动查询数据,并对结果解析后转为DataStream流,其实目的就是为了用流触发hbase查询,后续可以通过状态管理把上一次查询结果保存起来,每次应用启动只查询hbase一次,数据增量处理用状态管理实现
DataStream<String> ds1 = stream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
int sum=0;
int filter=0;
Map<String,Object> result = Util.scanTable("logTest");
List<HashMap<String,Object>> list = (List<HashMap<String,Object>>)result.get("data");
Iterator<HashMap<String,Object>> it = list.iterator();
while(it.hasNext()){
sum++;
if(Float.parseFloat(it.next().get("AMOUNT").toString())<5000.00){
filter++;
}
}
float rate = (float)filter/sum;
if (rate >0.5 ){
DecimalFormat decimalFormat=new DecimalFormat(".00");//格式化
Map<String, String> mapRate = new HashMap<String, String>();
mapRate.put("CONCERN_TYPE", "3");
mapRate.put("CUST_LOYALTY", String.valueOf(rate));
mapRate.put("CONCERN_DESC", "flink 实现:当前交易金额低于5000元的用户占比" + decimalFormat.format(rate * 100) + "%已经高于50%,请尽快针对性展开营销活动");
Util.insertRow("custConcern", "custConcern"+"3", "cf", mapRate);
// sendKafkaMsg("busiTopic",riskline._1(),riskline._1()+riskline._2());
}
return String.valueOf(rate);
}
});
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。