栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Streaming 常见的输入数据源(以WordCount计算为例)

Spark Streaming 常见的输入数据源(以WordCount计算为例)

  SparkStreaming中的数据抽象叫做DStream。DStream是抽象类,它把连续的数据流拆成很多的小RDD数据块, 这叫做“微批次”, spark的流式处理, 都是“微批次处理”。 DStream内部实现上有批次处理时间间隔,滑动窗口等机制来保证每个微批次的时间间隔里, 数据流以RDD的形式发送给spark做进一步处理。因此, 在一个为批次的处理时间间隔里, DStream只产生一个RDD。

1、Socket数据源

Scala版本——WordCount计算

package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object StreamingWordCountScala {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    //一、初始化程序入口
    
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    //二、获取数据流,就是数据源
    //从 socket 中获取数据
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.244.130", 1234)

    //三、数据处理
    //val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val wordResult: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //四、数据输出查看
    wordResult.print()

    //五、启动任务
    ssc.start()  //启动
    ssc.awaitTermination()  //线程等待,等待处理下一批次任务
    ssc.stop()  //关闭
  }
}

Java8 版本——WordCount计算

package com.sparkjava.streaming;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;


public class StreamingWordCountJava {
    public static void main(String[] args) throws InterruptedException {
        Logger.getLogger("org").setLevel(Level.WARN);

        //一、初始化程序入口
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountJava");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));

        //二、获取数据流,就是数据源
        JavaReceiverInputDStream lines = jssc.socketTextStream("192.168.244.130", 1234);

        //三、数据处理
        JavaDStream words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream pair = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairDStream wordCounts = pair.reduceByKey((i, j) -> i + j);

        //四、数据输出
        wordCounts.print();

        //五、启动程序
        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}

2、HDFS数据源

WordCount计算

package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}


object WordCountFromHDFS {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    //一、初始化程序入口
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    //二、获取数据流,其实就是数据源,这个是目录
    //统计的是HDFS上的文件夹内在 Seconds(3) 时间段之内新增的数据,不是所有的
    val lines: DStream[String] = ssc.textFileStream("hdfs://hadoop0/worddir")

    //三、数据处理
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val wordResult: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //四、数据输出
    wordResult.print()

    //五、启动任务
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
3、Kafka数据源

后面研究研究

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742971.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号