The data abstraction in Spark Streaming is called DStream. DStream is an abstract class that chops a continuous data stream into many small RDD blocks, known as "micro-batches"; all of Spark's stream processing is micro-batch processing. Internally, DStream relies on mechanisms such as the batch processing interval and sliding windows to guarantee that, within each micro-batch interval, the incoming data is handed to Spark as an RDD for further processing. Consequently, within one micro-batch interval a DStream produces exactly one RDD.
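To make "one RDD per micro-batch" concrete, here is a minimal sketch (host and port are placeholders) using foreachRDD, which Spark Streaming invokes once per batch interval, handing over that batch's single RDD:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OneRddPerBatch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("OneRddPerBatch")
    val ssc = new StreamingContext(conf, Seconds(3))
    // foreachRDD fires once per 3-second micro-batch; rdd is that batch's single RDD
    ssc.socketTextStream("localhost", 9999).foreachRDD { (rdd, time) =>
      println(s"batch at $time: ${rdd.count()} records in one RDD")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}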
1. Socket data source, Scala version: WordCount
package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCountScala {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    // 1. Initialize the entry point
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    // 2. Obtain the data stream, i.e. the data source
    // Read the data from a socket
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.244.130", 1234)
    // 3. Process the data
    // Equivalent one-liner:
    // val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val wordResult: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // 4. Output the results for inspection
    wordResult.print()
    // 5. Start the job
    ssc.start()            // start the computation
    ssc.awaitTermination() // block the main thread until the streaming context is stopped
    ssc.stop()             // shut down (only reached after termination)
  }
}
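To try this out, first start a netcat server on the source host and type some words into it; every 3 seconds the program prints the word counts for that batch:

nc -lk 1234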
Java 8 version: WordCount
package com.sparkjava.streaming;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

public class StreamingWordCountJava {
    public static void main(String[] args) throws InterruptedException {
        Logger.getLogger("org").setLevel(Level.WARN);
        // 1. Initialize the entry point
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountJava");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));
        // 2. Obtain the data stream, i.e. the data source
        // Note: the generic type parameters are required; with raw types the lambda
        // parameters below would be inferred as Object and line.split would not compile
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("192.168.244.130", 1234);
        // 3. Process the data
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> pair = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairDStream<String, Integer> wordCounts = pair.reduceByKey((i, j) -> i + j);
        // 4. Output the results
        wordCounts.print();
        // 5. Start the job
        jssc.start();
        jssc.awaitTermination(); // block until the streaming context is stopped
        jssc.stop();
    }
}
2. HDFS data source: WordCount
package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountFromHDFS {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    // 1. Initialize the entry point
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    // 2. Obtain the data stream; here the source is a directory
    // Only files newly added to the HDFS directory within each Seconds(3) batch
    // interval are counted, not everything already in it
    val lines: DStream[String] = ssc.textFileStream("hdfs://hadoop0/worddir")
    // 3. Process the data
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val wordResult: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // 4. Output the results
    wordResult.print()
    // 5. Start the job
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
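Note that textFileStream only picks up files that appear atomically in the monitored directory, so the usual pattern is to upload to a temporary path first and then move the file in (the paths below are placeholders):

hdfs dfs -put words.txt /tmp/words.txt
hdfs dfs -mv /tmp/words.txt /worddir/words.txt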
3. Kafka data source
To be studied in more detail later.
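As a placeholder until then, here is a minimal sketch of what the Kafka version will likely look like, based on the spark-streaming-kafka-0-10 integration; the broker address, group id, and topic name are assumptions:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountFromKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountFromKafka")
    val ssc = new StreamingContext(conf, Seconds(3))
    // Kafka consumer configuration; broker, group id, and topic are placeholders
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.244.130:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "wordcount-group",
      "auto.offset.reset" -> "latest"
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("wordcount-topic"), kafkaParams))
    // Same WordCount pipeline as before, applied to the Kafka record values
    val wordResult = stream.map(_.value).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordResult.print()
    ssc.start()
    ssc.awaitTermination()
  }
}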



