Implementing Hot Search with Storm + Kafka + Redis

Earlier chapters covered integrating Storm with Kafka and with Redis separately. In this project there was a requirement to keep a running record of each customer's search history. Because searches arrive at a high rate and the app client needs to see its own history quickly, both Spark and Storm were candidates. Spark offers higher throughput but correspondingly higher latency (it suits large-volume statistics where real-time response is not critical), so to respond quickly to client calls we chose Storm, which also integrates very well with Kafka and Redis.

The design works as follows: Storm reads data from Kafka. If the queried key already exists in Redis, the new data is merged into the existing value; if it does not exist, it is inserted directly. This merge-or-insert pattern makes the Redis Hash type a natural fit.
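The merge-or-insert rule can be sketched on its own, independent of Storm and Redis. Here a plain `HashMap` stands in for the Redis hash (`HGETALL`/`HSET` in the real topology); the method name `mergeSearch` and the per-keyword count are illustrative assumptions, not part of the original code:

```java
import java.util.HashMap;
import java.util.Map;

public class SearchHistoryMerge {

    // Merge one search keyword into the user's history.
    // If the keyword already exists, update the stored count;
    // otherwise insert it with an initial count of 1.
    public static void mergeSearch(Map<String, Integer> history, String keyword) {
        history.merge(keyword, 1, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> history = new HashMap<>();
        mergeSearch(history, "storm");
        mergeSearch(history, "kafka");
        mergeSearch(history, "storm");
        System.out.println(history.get("storm")); // prints 2
        System.out.println(history.get("kafka")); // prints 1
    }
}
```

In the topology this logic lives between the Redis lookup bolt and the store bolt: the lookup result tells the conversion bolt whether to merge or insert.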

The topology is as follows:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.redis.bolt.RedisLookupBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class StormKafkaRedisTopo {

    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;
    private static final String KAFKA_SPOUT = "kafka_spout";
    private static final String KAFKA_BOLT = "kafka_bolt";
    private static final String REDIS_BOLT = "redis_bolt";
    private static final String CONVERT_REDIS_BOLT = "convert_redis_bolt";
    private static final String STORE_REDIS = "store_redis";

    public static void main(String[] args) {
        // Kafka spout: consume the "testPartion" topic via ZooKeeper
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "testPartion", "/zkkafkaspout", "kafkaspout");
        List<String> zkServers = new ArrayList<String>();
        zkServers.add("localhost");
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;

        Config config = new Config();
        Map<String, String> map = new HashMap<String, String>();
        map.put("metadata.broker.list", "localhost:9092");
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        config.put("kafka.broker.properties", map);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // Configure the Redis lookup side
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
        RedisLookupMapper lookupMapper = new RedisLookMapperBlot();
        RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

        KafkaMessageStroBlot kafkaMessageMapper = new KafkaMessageStroBlot();
        RedisStoreBolt redisStoreBolt = new RedisStoreBolt(poolConfig, kafkaMessageMapper);

        TopologyBuilder builder = new TopologyBuilder();
        // Read data from Kafka
        builder.setSpout(KAFKA_SPOUT, new KafkaSpout(spoutConfig));
        builder.setBolt(KAFKA_BOLT, new KafkaMessageBlot()).shuffleGrouping(KAFKA_SPOUT);
        // Look up existing data in Redis
        builder.setBolt(REDIS_BOLT, lookupBolt).shuffleGrouping(KAFKA_BOLT);
        // Convert the merged data and store it back into Redis
        builder.setBolt(CONVERT_REDIS_BOLT, new ConvertKafka2RedisBlot()).shuffleGrouping(REDIS_BOLT);
        builder.setBolt(STORE_REDIS, redisStoreBolt).shuffleGrouping(CONVERT_REDIS_BOLT);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaRedis", config, builder.createTopology());
    }
}
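The topology relies on mapper classes (`RedisLookMapperBlot`, `KafkaMessageStroBlot`) defined in earlier chapters. As a rough illustration of the contract the store mapper fulfills, the sketch below shows only the key/value extraction that `RedisStoreBolt` asks its mapper for. The `Tuple` interface here is a reduced stand-in for Storm's `ITuple` so the sketch is self-contained, and the field names `"keyword"` and `"count"` are assumptions:

```java
// Reduced stand-in for org.apache.storm.tuple.ITuple, limited to the
// single accessor this sketch needs; the real code uses Storm's interface.
interface Tuple {
    String getStringByField(String field);
}

// Illustrative sketch of the store-mapper role played by
// KafkaMessageStroBlot: for each incoming tuple, RedisStoreBolt asks
// which key and which value to write to Redis.
public class SearchStoreMapper {

    // Key to store under: the search keyword carried by the tuple
    public String getKeyFromTuple(Tuple tuple) {
        return tuple.getStringByField("keyword");
    }

    // Value to store for that key: the merged count
    public String getValueFromTuple(Tuple tuple) {
        return tuple.getStringByField("count");
    }
}
```

With the Redis data type declared as Hash, each keyword becomes a field of the hash and its merged count the field value, which matches the merge-or-insert design above.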



Source: http://www.htsjk.com/redis/36571.html