栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark进阶(七):spark streaming 用法[上]

spark进阶(七):spark streaming 用法[上]

Spark Streaming是Spark Core API(Spark RDD)的扩展,支持对实时数据流进行可伸缩、高吞吐量及容错处理。数据可以从Kafka、Flume、Kinesis或TCPSocket等多种来源获取,并且可以使用复杂的算法处理数据,这些算法由map()、reduce()、join()和window()等高级函数表示。

Spark Streaming提供了一种高级抽象,称为DStream(Discretized Stream)。 在内部,对输入数据流拆分成的每个批次实际上是一个RDD,一个DStream由多个RDD组成,相当于一个RDD序列。

与RDD类似,许多普通RDD上可用的操作算子DStream也支持。使用这些算子可以修改输入DStream中的数据,进而创建一个新的DStream。对DStream的操作主要有3种:无状态操作、状态操作、窗口操作。

状态操作是指,需要把当前时间批次和历史时间批次的数据进行累加计算,即当前时间批次的处理需要使用之前批次的数据或中间结果。使用updateStateByKey()算子可以保留key的状态,并持续不断地用新状态更新之前的状态。

在DStream上使用persist()方法可以将该DStream的每个RDD持久化到内存中。这在DStream中的数据需要被计算多次(例如,对同一数据进行多次操作)时非常有用。对于基于窗口的操作(如reduceByWindow()、reduceByKeyAndWindow()),以及基于状态的操作(如updateStateByKey(),都默认开启了persist()。

Spark Streaming应用程序必须全天候运行,因此与应用程序逻辑无关的故障(例如系统故障、JVM崩溃等)不应该对其产生影响。为此,Spark Streaming需要对足够的数据设置检查点,存储到容错系统中,使其能够从故障中恢复。

一、接收websock数据并累加处理
  • 创建流context
  • 设置checkpoint
  • 连接端口
  • 进行累加处理
import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

object SparkStream {
  val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values:Seq[Int], state:Option[Int]) => {
    val currentCount = values.sum  // 累加当前批次单词
    val previousCount = state.getOrElse(0)  //获取上一批次单词,默认为0
    Some(currentCount+previousCount)  // 加和
  }
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WordCount")
    val ssc = new StreamingContext(conf, Seconds(10))  // 设置10秒统计一次
    ssc.checkpoint("hdfs://localhost:9000/spark-ck")  // 设置检查点

    val lines = ssc.socketTextStream("localhost", 9999)  // DStream通过连接端口获取数据
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val result:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)  // 进行累加操作
    result.print()  // 打印前10行

    ssc.start()   // 启动
    ssc.awaitTermination()   // 等待完成
  }
}

打开终端,开启9999端口:

(base) [~/softwares/kafka_2.12-2.8.1]$ nc -lk 9999
1 2 3 4 5 6
hello world

二、kafka源的统计字数处理

首先添加依赖:


  org.apache.spark
  spark-streaming-kafka-0-10_2.12
  3.1.2

  • 创建StringContext
  • 对kafka进行配置
  • 通过 KafkaUtils.createDirectStream进行直连
  • 获取DStream进行累加处理
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer


object StreamKafka {
  val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values:Seq[Int], state:Option[Int]) => {
    val currentCount = values.sum  // 累加当前批次单词
    val previousCount = state.getOrElse(0)  //获取上一批次单词,默认为0
    Some(currentCount+previousCount)  // 加和
  }
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamKafka")

    val ssc = new StreamingContext(conf, Seconds(10))  // 设置10秒统计一次
    ssc.checkpoint("hdfs://localhost:9000/spark-ck")  // 设置检查点

    val kafkaTopics = Array("topictest")  // 设置使用的kafka topic 可以是多个

    // kafka 的 配置
    val kafkaParams = Map[String, Object] (
      "bootstrap.servers" -> "ffzs-ub:9092",
      "key.deserializer"->classOf[StringDeserializer],   // 设置 key ,value 反序列化使用的类
      "value.deserializer"->classOf[StringDeserializer],
      "group.id"->"1",  // 设置消费者组ID,ID相同的消费者属于同一组
      "enable.auto.commit"->(false:java.lang.Boolean)  // kafka不自动提交偏移量,通过spark操作, 默认是true
    )

    // 创建DStream
    val inputStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      Subscribe[String, String](kafkaTopics,kafkaParams)
    )

    // 解析取出key和value
    val linesDStream = inputStream.map(record => (record.key(), record.value()))
    val word = linesDStream.map(_._2)
      .flatMap(_.split(" "))
      .map(it => (it, 1))

    val result = word.updateStateByKey(updateFunc)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

终端打开kafka生产者:

(base) [~/softwares/kafka_2.12-2.8.1]$ kafka-console-producer.sh --broker-list ffzs-ub:9092 --topic topictest
>a b b b b bbc c
>hello world
>ffzs is a good man

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/311340.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号