栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

021 Spark Streaming

021 Spark Streaming

1、简介

Spark Streaming抽象、架构与原理
Spark Streaming 背压(Back Pressure)机制

2、Spark检查点存储到HDSF

环境配置同020 Spark SQL,并启动HIVE

package com.atguigu.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {
  def main(args: Array[String]) {

	System.setProperty("HADOOP_USER_NAME", "atguigu")
	
    // 定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("./ck")

    // Create a DStream that will connect to hostname:port, like hadoop102:9999
    val lines = ssc.socketTextStream("hadoop102", 9998)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))

    // 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
    val stateDstream = pairs.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    //ssc.stop()
  }
}

nc -lk 9999

3、滑动窗口操作

Spark Streaming之window(窗口操作)
优雅关闭spark streaming job填坑之路
sparkStreaming 连接数据库 --设计模式
回调方法?钩子方法?模板模式?
scala学习笔记-case class

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760855.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号