使用sparkstreaming消费Kafka的数据,实现word count
依赖（Maven）:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>
实现wordcount代码
// Spark Streaming word count over a Kafka topic: every 3 seconds, pull the
// newly arrived records, split message values into words, and print the
// per-batch word counts to stdout.
val sparkConf = new SparkConf()
  .setAppName("StreamWordCount")
  .setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(3))

// Kafka consumer configuration: broker address, string deserializers for
// both key and value, consumer group id, and start from the earliest
// available offset when no committed offset exists.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hadoop01:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",
  "auto.offset.reset" -> "earliest"
)

// Direct stream subscribed to the topic; PreferConsistent distributes
// Kafka partitions evenly across the available executors.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("topic01"), kafkaParams)
)

// Classic word count: take each record's value, tokenize on single spaces,
// pair every word with 1, and sum the pairs by key within each batch.
val lines: DStream[String] = stream.map(record => record.value())
val wordCounts = lines
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.print()

// Start the streaming computation and block until it is terminated.
ssc.start()
ssc.awaitTermination()
启动 Kafka，创建 Kafka producer 向 topic01 发送测试数据:
kafka-console-producer.sh --broker-list hadoop01:9092 --topic topic01



