Spark: reading from Kafka and writing the data to Redis

The following standalone Scala program consumes messages from a Kafka topic with Spark Streaming (the kafka-0-10 direct stream API) and pushes each record onto a Redis list via Jedis.
package com.prince.demo.test

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
  * Created by prince on 2017/9/13.
  */
object SparkStreamingWriteRedis {

  // Silence Spark's verbose INFO logging.
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SparkStreamingWriteRedis")
      .master("local[*]")
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    // Broker list, consumer group and topic name come from application.conf.
    val conf = ConfigFactory.load

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> conf.getString("kafka.group"),
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array(conf.getString("kafka.topics"))

    // Direct stream against the Kafka 0.10 consumer API.
    val stream = KafkaUtils
      .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Keep only the message payload (already a String via StringDeserializer).
    val input = stream.map(record => record.value)

    input.foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        // One connection per partition: Jedis is not serializable, so it must
        // be created on the executor, not on the driver.
        val jedis = new Jedis("192.168.1.97", 6379, 3000)
        jedis.auth("123456")
        part.foreach(x => jedis.lpush("test_key", x))
        // Close once per partition, after all records have been pushed.
        jedis.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
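The three kafka.* keys are resolved by ConfigFactory.load from a Typesafe Config file, by default src/main/resources/application.conf on the classpath. A minimal sketch of that file, with purely placeholder broker addresses and names:

    # application.conf — values are placeholders, substitute your own cluster
    kafka.brokers = "host1:9092,host2:9092"
    kafka.group   = "spark-redis-demo"
    kafka.topics  = "test_topic"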
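Beyond Spark itself, the imports imply the kafka-0-10 streaming connector, Jedis, and Typesafe Config on the classpath. A build.sbt sketch; the version numbers here are assumptions, so align them with your Spark and Scala versions:

    // build.sbt — hypothetical versions, match your cluster
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"                  % "2.2.0" % "provided",
      "org.apache.spark" %% "spark-streaming"            % "2.2.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
      "redis.clients"    %  "jedis"                      % "2.9.0",
      "com.typesafe"     %  "config"                     % "1.3.1"
    )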
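Opening a fresh Jedis connection for every partition of every one-second batch works, but it churns TCP connections. A common refinement, sketched here as an assumption rather than anything from the original post, is a pool created lazily once per executor JVM (host, port, timeout and password are the same placeholder values as in the listing):

    import redis.clients.jedis.{JedisPool, JedisPoolConfig}

    // Initialized on first use inside each executor; the object is accessed
    // by name in the closure, so nothing is serialized from the driver.
    object RedisPool {
      lazy val pool: JedisPool =
        new JedisPool(new JedisPoolConfig, "192.168.1.97", 6379, 3000, "123456")
    }

Inside foreachPartition the loop then borrows and returns a connection:

    rdd.foreachPartition(part => {
      val jedis = RedisPool.pool.getResource
      try part.foreach(x => jedis.lpush("test_key", x))
      finally jedis.close() // returns the pooled connection instead of destroying it
    })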