1. Wrapping MyKafkaUtil
Consuming real-time data from Kafka with Spark Streaming is a common stream-processing scenario. Below we hook Spark Streaming up to Kafka and implement a word count. Since I am not very familiar with Scala, I wrote the utility class in Java.
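Note that the kafka010 classes imported below come from the separate spark-streaming-kafka-0-10 integration module, so the project needs that dependency. A minimal Maven sketch, assuming Spark 2.x built against Scala 2.11 (adjust both versions to match your environment):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>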
package cn.streaming.kafkaStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MyKafkaUtil {
    public static InputDStream<ConsumerRecord<String, String>> getKafkaStream(StreamingContext ssc, List<String> topics) {
        Map<String, Object> params = new HashMap<>();
        // Fill in the Kafka connection parameters
        params.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092,hadoop002:9092");
        params.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        params.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        params.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka_source");
        // Create a direct stream against Kafka with KafkaUtils
        InputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, params)
        );
        return kafkaStream;
    }
}
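Two details of the utility are worth noting: LocationStrategies.PreferConsistent distributes the Kafka partitions evenly across the available executors and is the recommended default, while ConsumerStrategies.Subscribe subscribes the consumer group to a fixed collection of topics using the parameters set above.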
2. The main program
The utility above does nothing more than obtain the Kafka input stream, which keeps the main program simple. The main program is written in Scala.
package cn.streaming.kafkaStream
import java.util
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming_kafka {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka_source")
    // One batch every 3 seconds
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics: util.List[String] = util.Arrays.asList("kafka_source_stream")
    val kfkastream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(ssc, topics)
    // Take the value (message body) of each Kafka record
    val stres: DStream[String] = kfkastream.map(_.value())
    // Word count: split each line into words, pair each word with 1, and sum per word
    val rec: DStream[(String, Int)] = stres.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    rec.print()
    // Start the job and block until it is terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
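One caveat: reduceByKey above aggregates only within each 3-second batch, so the printed counts reset every batch. For a running total across batches, one option is updateStateByKey, which additionally requires a checkpoint directory. A minimal sketch to replace the counting lines above (the checkpoint path is an arbitrary example):

// A running word count across batches; keeping state requires checkpointing
ssc.checkpoint("./spark-checkpoint") // example path
val totals: DStream[(String, Int)] = stres
  .flatMap(_.split(" "))
  .map((_, 1))
  .updateStateByKey((newCounts: Seq[Int], state: Option[Int]) =>
    Some(newCounts.sum + state.getOrElse(0)))
totals.print()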
3. Start a console producer
Before starting the producer, we need to create the corresponding topic in Kafka, and then start the producer.
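For example, on older Kafka versions the topic can be created like this (on newer versions, replace --zookeeper with --bootstrap-server hadoop001:9092; the ZooKeeper address, partition count, and replication factor here are arbitrary examples):

bin/kafka-topics.sh --create --zookeeper hadoop001:2181 --topic kafka_source_stream --partitions 3 --replication-factor 2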
Producer start command: bin/kafka-console-producer.sh --broker-list hadoop001:9092,hadoop002:9092 --topic kafka_source_stream
4. Start the stream processing program
Start the program; here we run it directly in the local environment.
5. Observe the results
As the console output shows, the word count works as expected.
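For example, typing hello spark hello kafka into the producer console should yield a batch of output of roughly this shape (the timestamp will differ):

-------------------------------------------
Time: 1589086800000 ms
-------------------------------------------
(hello,2)
(spark,1)
(kafka,1)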



