欢迎投稿

今日深度:

从kafka到flink到hbase的心酸路程示例(希望有用),kafkahbase

从kafka到flink到hbase的心酸路程示例(希望有用),kafkahbase


如果好的大家就给个赞啊,回粉下,给点鼓励。一定是效率文章

1.首先创建maven工程。

2.依赖文件如下

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>


    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>

</dependencies>

3.导入依赖接下来是代码编写

注意几点:代码里面需要配置zk的地址,以及hbase的zk。详细就不多说了直接贴代码


/**
 *
 * @author ${xiniu}
 * @version $Id: flink_hbase.java,
 */
public class flink_hbase {


    private static String hbaseZookeeperQuorum = "10.25.135.53,10.45.149.164,10.45.151.125";
    private static String hbaseZookeeperClinentPort = "2181";
    private static TableName tableName = TableName.valueOf("testflink");
    private static final String columnFamily = "cf1";



    public static void main(String[] args) {


        final String ZOOKEEPER_HOST = "10.25.135.53:2181,10.45.149.164:2181,10.45.151.125:2181";
        final String KAFKA_HOST = "10.45.151.125:9092,10.45.150.142:9092";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // 非常关键,一定要设置启动检查点!!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
        props.setProperty("bootstrap.servers", KAFKA_HOST);
        props.setProperty("group.id", "test-consumer-group");

        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<String>("test2", new SimpleStringSchema(), props));
        //DataStream<String> transction1 = env.addSource(new FlinkKafkaConsumer010<String>("test3",new SimpleStringSchema(), props));

        transction.rebalance().map(new MapFunction<String, Object>() {
           public String map(String value)throws IOException{


               writeIntoHBase(value);
               return value;
           }

        }).print();
        //transction.writeAsText("/home/admin/log2");
        // transction.addSink(new HBaseOutputFormat();
        try {
            env.execute();
        } catch (Exception ex) {

            Logger.getLogger(flink_hbase.class.getName()).log(Level.SEVERE, null, ex);
            ex.printStackTrace();
        }
    }

    public static void writeIntoHBase(String m)throws IOException
    {
        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();

        config.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        config.set("hbase.master", "10.45.151.26:60000");
        config.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClinentPort);
        config.setInt("hbase.rpc.timeout", 20000);
        config.setInt("hbase.client.operation.timeout", 30000);
        config.setInt("hbase.client.scanner.timeout.period", 200000);

        //config.set(TableOutputFormat.OUTPUT_TABLE, hbasetable);

        Connection c = ConnectionFactory.createConnection(config);

        Admin admin = c.getAdmin();
        if(!admin.tableExists(tableName)){
            admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(columnFamily)));
        }
        Table t = c.getTable(tableName);

        TimeStamp ts = new TimeStamp(new Date());

        Date date = ts.getDate();

        Put put = new Put(org.apache.hadoop.hbase.util.Bytes.toBytes(date.toString()));

        put.addColumn(org.apache.hadoop.hbase.util.Bytes.toBytes(columnFamily), org.apache.hadoop.hbase.util.Bytes.toBytes("test"),
                org.apache.hadoop.hbase.util.Bytes.toBytes(m));
        t.put(put);

        t.close();
        c.close();
    }
}

 当然当你上传到服务器的时候有好几种错误,贴出来


这种很明显的是因为slot不够用,那么你首先你去查看自己的从节点是不是有任务占用了,结束任务就好。

问题2


这个错误是因为你本地hosts文件中没有配置你的hbase主机,本地服务器或者机器不能找到,无法连接你的hbase。那你需要去本地hosts文件中

配置你的hbase服务器的 ip+主机名。

 当然这种还有可能是你的机器超时时常设置的太短了,那么可以一下代码进行设置:

	config.setInt("hbase.rpc.timeout", 20000);
        config.setInt("hbase.client.operation.timeout", 30000);
        config.setInt("hbase.client.scanner.timeout.period", 200000);

以上问题是常遇见,新手比较刺手的问题。

当然还有其他的一些小问题,提供几个解决思路,

1.首先有可能是因为你打包的时候依赖没有打进来

2.你机器磁盘占用了空间你的jar不能解压到机器上,带式无法识别一些依赖。

3.另外遇见哪些找不见主类的什么的,基本是打包或者那个细节没注意。


   上面的例子是可以跑通的,如果新手可以去直接去跑。另外遇见的问题应该上面都有,欢迎交流。有新问题或者建议可以留言!


 有用就赞,有问题就提出来,欢迎关注常来交流              

--犀牛。多多指导

www.htsjk.Com true http://www.htsjk.com/hbase/35143.html NewsArticle 从kafka到flink到hbase的心酸路程示例(希望有用),kafkahbase 如果好的大家就给个赞啊,回粉下,给点鼓励。一定是效率文章 1.首先创建maven工程。 2.依赖文件如下 dependencies dependency grou...
相关文章
    暂无相关文章
评论暂时关闭