栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

SparkStreaming-解析

SparkStreaming-解析

1.初始化StreamingContext
//SparkStreaming程序入口
val conf: SparkConf = new SparkConf().setAppName("wordcount").setMaster("local[*]")//如果需要打包运行在服务器上,不需要指定master
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

        初始化完Context之后:

        1.定义消息输入源来创建DStreams;

        2.定义DStreams的转化操作和输出操作;

        3.通过streamingContext.start()来启动消息采集和处理;

        4.等待程序终止,可以通过streamingContext.awaitTermination()来设置;

        5.通过streamingContext.stop()来手动终止处理程序。

示例代码:

package chapter5
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //SparkStreaming程序入口
    val conf: SparkConf = new SparkConf().setAppName("wordcount").setMaster("local[*]")//因为要打包运行在服务器上,所以不需要指定master
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    //接收数据,socketTextStream第一个参数是数据源,第二个参数是自定义端口号,和数据源中的对应上即可
    val data: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999)
    //按照分隔符进行切分
    val spliFile: DStream[String] = data.flatMap(_.split(" "))
    //将每个单词记为1
    val wordAndOne: DStream[(String, Int)] = spliFile.map((_, 1))
    //聚合单词
    val wordAndCount: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //打印输出
    wordAndCount.print()
    //开启SparkStreaming
    ssc.start()
    //让程序一直处于接收状态,等待关闭
    ssc.awaitTermination()
  }
}

StreamingContext和SparkContext什么关系?

        StreamingContext一旦启动,对DStreaming的操作无法修改。在同一时间一个JVM中只有一个StreamingContext可以启动。stop()方法将同时停止StopContext,可以传入参数stopSparkContext用于只停止StreamingContext。在Spark1.4版本后,通过设置sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true")即可停止SparkStreaming并不会丢失数据。在StreamingContext的start方法中已经注册了Hook方法。

2.什么是DStreams

        Discretized Stream是SparkStreaming的基础抽象,代表持续性的数据流和经过各种Spark源语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,SparkStreaming把一系列连续的批次称为DStream。

DStream的特点:

        将流式计算分解成为一系列确定并且较小的批处理作业;可以将失败或执行较慢的任务在其他节点上执行;有较强的容错能力,基于lineage;DStream内含high-level operations进行处理;DStream内部实现为一个RDD序列。

        对数据的操作也是按照RDD为单位来进行的。

 

         计算过程由Sparkengine来完成。

 3.DStreams输入

        SparkStreaming原生支持一些不同的数据源。一些“核心”数据源已经被打包到Spark Streaming的Maven工件中,而其他的一些则可以通过spark-streaming-kafka等附加工件获取。

Streaming的数据源和接收器:

        数据源:

基本数据源:socket、file,akka actor。Steaming中自带了该数据源的读取API;

高级数据源:kafka,flume,kinesis,Twitter等其他的数据。必须单独导入集成的JAR包。

        接收器:

        Socket数据源,有一个接收器,用户接受socket的数据。每个接收器必须占用一个cores(线程)来接收数据。如果资源不足,那么任务会处于等待状态。

        每个接收器都以spark执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的CPU核心。此外,我们还需要有可用的CPU核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,如果我们想要在流计算应用中运行10个接收器,那么至少需要为应用分配11个CPU核心。所以如果在本地模式运行,不要使用local或者local[1],

        除核心数据源外,还可以用附加数据源接收器来从一些知名数据获取系统中接收的数据,这些接收器都作为SparkStreaming的组件进行独立打包了。它们仍然是spark的一部分,不过需要在构建文件中添加额外的包才能使用他们,现有的接收器包括Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及ZeroMQ。可以通过添加与Spark版本匹配 的 Maven 工件 spark-streaming-[projectname]_2.10 来引入这些附加接收器。

(1)Spark对Kafka两种连接方式的对比

        Kafka项目在版本0.8和0.10之间引入了一个新的消费者api,因此有两个独立的相应Spark Streaming包可用。请选择正确的包装; 请注意,0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。

spark-streaming-kafka-0-8

spark-streaming-kafka-0-10

Broker Version

0.8.2.1或更高

0.10.0或更高

Api Stability

不赞成

稳定的

Language Support

Scala,Java,Python

Scala,Java

Receiver DStream

没有

Direct DStream

SSL / TLS Support

没有

Offset Commit Api

没有

Dynamic Topic Subscription

没有

        Spark对Kafka的连接方式有两种,一种是Direct方式直连模式,另一种是Receiver方式接收器模式。Direct方式只在driver端接收数据,所以继承了InputDStream,是没有receivers的。主要通过KafkaUtils#createDirectStream以及KafkaUtiles#createStream这两个API来创建,除了要传入的参数不同外,接收kafka数据的节点、拉取数据的时机也完全不同。

(2)Receiver方式

        Receiver:接收器模式是使用Kafka高级ConsumerAPI实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在SparkExecutor的内存中,然后由SparkStreaming启动的job来处理数据。然而默认配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,确保零数据丢失,要启动SparkStreaming的预写日志机制(Write Ahead Log)。该机制会同步的将接收到的Kafka数据保存到分布式文件系统(比如HDFS)上的预写日志中,以便底层节点在发生故障时也可以使用预写日志中的数据进行恢复。

        单点读数据,读到的数据会缓存到executor的cache里,增大了内存的使用压力。

        在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但是由于SparkStreaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据的重复消费。

      特点:

        在spark的executor中,启动一个接收器,专门用于读取kafka的数据,然后存入到内存中,供sparkStreaming消费。

        1.为了保证数据0丢失,WAL,数据会保存两份,有冗余;

        2.Receiver是单点读数据,如果挂掉,程序则无法运行;

        3.数据读到executor内存中,增大了内存使用的压力,如果消费不及时,会造成数据挤压。

        对于使用Maven项目定义的scala/java应用程序时,我们需要添加相应的依赖包:


    org.apache.spark
    spark-streaming-kafka_2.11
    1.6.3

         在流应用程序代码中,导入KafkaUtils并创建输入DStream,如下所示。

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext, 
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

注意:

        1.Kafka中topic的partition与SparkStreaming中生成的RDD的partition无关,因此,在KafkaUtils.createStream()中,增加某个topic的partition的数量,只会增加单个Receiver消费topic的线程数,也就是读取Kafka中topic partition的线程数量,它不会增加Spark在处理数据时的并行性;

        2.可以使用不同的consumer group和topic创建多个Kafka输入DStream,以使用多个receivers并行接收数据;

        3.如果已使用HDFS等复制文件系统启用了“预读日志”,则接收的数据已在日志中复制。因此,输入流的存储级别的存储级别StorageLevel.MEMORY_AND_DISK_SER(即,使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。

(3)Direct方式

        Direct:直连模式,在spark1.3之后,引入了direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取kafka中每个topic的每个partition中的最新offsets,并且相应的定义要在每个batch中处理偏移范围,当启动处理数据的作业时,kafka的简单消费者API用于从kafka读取定义的偏移范围。

较Receiver优势:

        1.简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个DStream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取kafka数据 ,这种映射关系也更有利于理解和优化;

        2.高效:在Receiver模式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在kafka和日志中就保存了两份数据,浪费。而Direct中不存在,只要kafka中保存的数据够长,都可以从kafka中进行数据恢复;

        3.精确一次:在Receiver的方式中,使用的是kafka的高阶api接口从zookeeper中获取offset值,这是传统的从kafka中读取数据的方式,但由于SparkStreaming消费的数据和zookeeper中记录的offset不同步,这种方式偶尔会造成数据的重复消费,Direct方式,直接使用简单的kafka低阶api,offset则利用sparkstreaming的checkpoints进行记录,消除了这种不一致性。缺点是不会更新zookeeper中的offset值,因此基于zookeeper的kafka监视工具将不会显示进度,但是可以在每个批处理中访问此方法处理的偏移量,并自行更新zookeeper。

直连模式特点:batch time 每隔一段时间,去kafka读取一批数据,然后消费。

         简化并行度,rdd的分区数量=topic的分区数量

         数据存储于kafka中,没有数据冗余

         不存在单点问题

         效率高

可以实现仅消费一次的语义 exactly-once语义

对于使用Maven项目定义的Scala / Java应用程序时,我们需要添加相应的依赖包:


    org.apache.spark
    spark-streaming-kafka-0-10_2.11
2.3.1

//请注意,导入的命名空间包括版本org.apache.spark.streaming.kafka010

(3)SparkStreaming整合kafka

package chapter5

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import java.lang

object kafka_demo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("kafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    //kafka配置文件
    val kafkaParams: Map[String, Object] = Map[String, Object](
      //连接kafka集群
      "bootstrap.servers" -> "hadoop101:9092,hadoop102:9092,hadoop103:9092",
      //key的反序列化
      "key.deserializer" -> classOf[StringDeserializer],
      //value的反序列化
      "value.deserializer" -> classOf[StringDeserializer],
      //消费者组id
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      //从哪开始消费偏移量
      //latest:从最新偏移量开始消费数据
      //earlist:从最早偏移量开始消费数据
      "auto.offset.reset" -> "latest",
      //是否自动提交偏移量,默认否
      "enable.auto.commit" -> (false: lang.Boolean)
    )

    //消费kafka中哪个topic的数据,sparkStreming可以同时消费多个topic的数据
    val topics: Array[String] = Array("topicA", "topicB")

    //通过直连方式去kafka拉取数据
    val inputDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    //获取kafka当中的值
    val data: DStream[String] = inputDStream.map(_.value())
    //切分
    val spliFile: DStream[String] = data.flatMap(_.split(" "))
    //每个单词记为1
    val wordAndOne: DStream[(String, Int)] = spliFile.map((_, 1))
    //聚合单词
    val wordAndCount: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //打印输出
    wordAndCount.print()
    //开启sparkStreaming
    ssc.start()
    //等待关闭
    ssc.awaitTermination()
  }
}

        放在集群上执行,需要开始生产者消费者,参考昨天。

(4)DStreams转换

        DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updataStateByKey()、transform()以及各种Window相关的原语。

Transformation

Meaning

map(func)

将源DStream中的每个元素通过一个函数func从而得到新的DStreams。

flatMap(func)

和map类似,但是每个输入的项可以被映射为0或更多项。

filter(func)

选择源DStream中函数func判为true的记录作为新DStreams 

repartition(numPartitions)

通过创建更多或者更少的partition来改变此DStream的并行级别。

union(otherStream)

联合源DStreams和其他DStreams来得到新DStream 

count()

统计源DStreams中每个RDD所含元素的个数得到单元素RDD的新DStreams。

reduce(func)

通过函数func(两个参数一个输出)来整合源DStreams中每个RDD元素得到单元素RDD的DStreams。这个函数需要关联从而可以被并行计算。

countByValue()

对于DStreams中元素类型为K调用此函数,得到包含(K,Long)对的新DStream,其中Long值表明相应的K在源DStream中每个RDD出现的频率。

reduceByKey(func, [numTasks])

对(K,V)对的DStream调用此函数,返回同样(K,V)对的新DStream,但是新DStream中的对应V为使用reduce函数整合而来。Note:默认情况下,这个操作使用Spark默认数量的并行任务(本地模式为2,集群模式中的数量取决于配置参数spark.default.parallelism)。你也可以传入可选的参数numTaska来设置不同数量的任务。 

join(otherStream, [numTasks])

两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream。 

cogroup(otherStream, [numTasks])

两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStreams 

transform(func)

将RDD到RDD映射的函数func作用于源DStream中每个RDD上得到新DStream。这个可用于在DStream的RDD上做任意操作。 

updateStateByKey(func)

得到”状态”DStream,其中每个key状态的更新是通过将给定函数用于此key的上一个状态和新值而得到。这个可用于保存每个key值的任意状态数据。 

DStream的转化操作可以分为无状态(stateless)和有状态(stateful)两种。

        · 无状态转化操作:每个批次的处理不依赖于之前批次的数据。常见的RDD转化操作例如,map()、filter()、reduceByKey()等。

        · 有状态转化操作 :需要使用之前批次的数据或者是中间结果来计算当前批次数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。  

1)无状态转化操作

        无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每个RDD。部分无状态转化操作列在下表。针对键值对的DStream转化操作(比如reduceByKey())要添加import StreamingContext._ 才能在scala中使用。

           需要注意的是,尽管这些函数看起来作用在整个流上,但是事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作分别应用在每个RDD上。例如reduceByKey()会规约每个时间区间中的数据,但不会规约不用区间之间的数据。

        举个例子,在之前的wordcount程序中,我们只会统计1秒内接收到的数据的单词个数,而不会累加。

        无状态转化操作也能在多个DStream见整合数据,不过也是在各个时间区间内。例如,键值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()、join()、leftOuterJoin()等。我们可以在DStream上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。

        我们还可以像在常规的 Spark 中一样使用 DStream 的 union() 操作将它和另一个 DStream 的内容合并起来,也可以使用 StreamingContext.union() 来合并多个流。

2)有状态转化操作

特殊的Transformations

        追踪状态变化UpdateStateByKey 检查点 rdd.cache,persist,checkpoint

        UpdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

        updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。

        updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步: 
        1. 定义状态,状态可以是一个任意的数据类型。 
        2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

        使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

        更新版的wordcount:

package chapter5

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object updataStateByKey_Demo {

  def updateFunc(currentValue:Seq[Int], historyValue:Option[Int]) : Option[Int] = {
    val result: Int = currentValue.sum+historyValue.getOrElse(0)
    Some(result)
  }

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("updataStateByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    //设置检查点,用来保存状态
    ssc.checkpoint("./666")
    //接收数据
    val data: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999)
    //按照分隔符进行分割
    val spliFile: DStream[String] = data.flatMap(_.split(" "))
    //每个单词记为1
    val wordAndOne: DStream[(String, Int)] = spliFile.map((_, 1))
    //调用转化算子
    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    //打印输出
    wordAndCount.print()
    //开启SparkStreaming
    ssc.start()
    //一直开启
    ssc.awaitTermination()
  }
}

Window Operations

        Window Operations类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。

        基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

         所有基于窗口的操作都需要两个参数,分别是窗口时长以及滑动步长,两者都必须是StreamContext的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的windowDuration/batchInterval个批次。如果有一个以10秒为批次间隔的源DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。

两个参数:窗口大小;滑动间隔。

        窗口大小:设置一次执行多少个批次数据;

        滑动间隔:隔多久滑动一次窗口。

        窗口长度>滑动间隔:数据重复消费;

        窗口长度<滑动间隔:数据丢失;

        窗口长度=滑动间隔:数据消费且仅消费一次。

Transformation

Meaning

window(windowLengthslideInterval)

基于对源DStream窗化的批次进行计算返回一个新的DStream

countByWindow(windowLengthslideInterval)

返回一个滑动窗口计数流中的元素。

reduceByWindow(funcwindowLengthslideInterval)

通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。

reduceByKeyAndWindow(funcwindowLengthslideInterval, [numTasks])

当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来做grouping。你可以通过设置可选参数numTasks来设置不同数量的tasks。 

reduceByKeyAndWindow(funcinvFuncwindowLengthslideInterval, [numTasks])

这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可用。 

countByValueAndWindow(windowLength,slideInterval, [numTasks])

对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的值是其在滑动窗口中频率。如上,可配置reduce任务数量。

        reduceByWindow() 和 reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约操作。它们接收一个归约函数,在整个窗口上执行,比如 +。除此以外,它们还有一种特殊形式,通过只考虑新进入窗口的数据和离开窗口的数据,让 Spark 增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数可以大大提高执行效率。

package chapter5
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object reduceByKeyAndWindow_Demo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    //接收数据
    val data: DStream[String] = ssc.socketTextStream("hadoop101",9999)
    val spliFile: DStream[String] = data.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = spliFile.map((_, 1))
    //聚合,设置滑动窗口其中窗口大小20s,滑动间隔5s
    val wordAndCount: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(5))
    wordAndCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

重要操作

1) Transform Operation

        Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。 

        该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

package chapter5
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object transform_Demo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    //接收数据
    val file: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999)
    val spliFile: DStream[String] = file.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = spliFile.map((_, 1))
    val wordAndCount: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(5))
    //在聚合后的基础上通过transform排序,取前两名
    val result: DStream[(String, Int)] = wordAndCount.transform(rdd => {
      val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
      sortRDD.take(2).foreach(println)
      sortRDD
    })
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 2)foreachRDD       

        通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和transform() 有些类似,都可以让我们访问任意 RDD。在 foreachRDD() 中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

需要注意的:

        连接不能写在driver层面;

        如果写在foreach则每个RDD都创建,得不偿失;

        增加foreachPartition,在分区创建;

        可以考虑使用连接池优化。

package chapter5
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object transform_Demo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    //接收数据
    val file: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999)
    val spliFile: DStream[String] = file.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = spliFile.map((_, 1))
    val wordAndCount: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(5))
    //通过foreachRDD遍历(无返回值)和transform类似只是无返回值
    val result: Unit = wordAndCount.foreachRDD(rdd => {
      val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
      sortRDD.take(2).foreach(println)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

3) Join 操作

        连接操作(leftOuterJoin,rightOuterJoin,fullOuterJoin也可以),可以连接Stream-Stream,windows-stream to windows-stream、stream-datase。

package chapter5

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object join_Demo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    val file: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999)
    val spliFile: DStream[String] = file.flatMap(_.split(" "))
    //生成两个DStream
    val DStream1: DStream[(String, String)] = spliFile.map(word => (word, word + "one"))
    val DStream2: DStream[(String, String)] = spliFile.map(word => (word, word + "two"))
    val result: DStream[(String, (String, String))] = DStream1.join(DStream2)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

DStreams输出

        输出操作指定了对刘淑菊经转换操作得到的数据所要执行的操作(例如把结果推入到外部数据库或者输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream极其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定的输出操作,整个context都不会启动。

Output Operation

Meaning

print()

在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫pprint()。 

saveAsTextFiles(prefix, [suffix])

以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”. 

saveAsObjectFiles(prefix, [suffix])

以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。

saveAsHadoopFiles(prefix, [suffix])

将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". 
Python API Python中目前不可用。

foreachRDD(func)

这是最通用的输出操作,即将函数func用于产生于stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。

        dstream.foreachRDD内的代码在Driver端执行;

        rdd.foreachPartition内的代码在Executor端执行;

        rdd.foreach内的代码在Executor端执行。

        通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform()有些类似,都可以访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,把数据写入到MYSQL的外部数据库中,连接不能写在Driver端层面,如果写在foreach则每个rdd都创建,得不偿失。增加foreachPartition,在分区创建,可以使用连接池优化。

dstream.foreachRDD { rdd =>
  // error val connection = createNewConnection()  // executed at the driver 序列化错误

  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record) // executed at the worker
    )
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278569.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号