代码实现(Maven 依赖:org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5):
/**
 * Entry point: streaming word count over text lines read from a TCP socket.
 *
 * Lines are read from 192.168.91.180:7777 in 3-second batches, split on runs
 * of whitespace, and the number of occurrences of each word within the batch
 * is printed.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // local[2]: per the Spark Streaming guide, a receiver-based source occupies
  // one thread, so at least two are needed to also process the received data.
  val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamDemo1")

  // Streaming context with a 3-second batch (collection) interval.
  val streamingContext = new StreamingContext(conf, Seconds(3))

  // Data source: text lines from a TCP socket.
  val socketLineStream: ReceiverInputDStream[String] =
    streamingContext.socketTextStream("192.168.91.180", 7777)

  // Process the data collected in each 3-second batch.
  // The backslash must be escaped ("\\s+"): "\s" is not a valid escape in a
  // regular Scala string literal. Empty tokens (produced by blank lines or
  // leading whitespace) are dropped so they are not counted as words.
  val wordStream: DStream[String] =
    socketLineStream.flatMap(line => line.split("\\s+")).filter(_.nonEmpty)
  val wordCountStream: DStream[(String, Int)] =
    wordStream.map(word => (word, 1)).reduceByKey(_ + _)
  wordCountStream.print()

  // Start the receiver and block until the streaming job terminates.
  streamingContext.start()
  streamingContext.awaitTermination()
}
运行结果:
先启动 nc 监听端口 7777:`nc -lk 7777`



