栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark:Streaming 实践 Dstream 转换算子、窗口、输出文件

Spark:Streaming 实践 Dstream 转换算子、窗口、输出文件

文章目录
    • 1、启动集群
    • 2、IDEA安装依赖
      • 2.1 启动服务端监听 Socket 服务
      • 2.2 实现 transform() 方法,分割多个单词
      • 2.3 UpdateStateByKeyTest 更新值
      • 2.4 Dstream 窗口操作
      • 2.5 DStream 输出操作

1、启动集群
启动zookeeper,hadoop,flume
# 1、三个节点
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start 
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status

# 2、master节点 启动hadoop
/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh

2、IDEA安装依赖
    
        2.11.8
        2.7.3
        2.4.0
        1.2.4
        2.1.1
    

    
        
        
            org.scala-lang
            scala-library
            ${scala.version}
        

        
        
            org.apache.spark
            spark-core_2.11
            ${spark.version}
        

        
            org.apache.spark
            spark-streaming_2.11
            2.4.0
        

        
            org.apache.spark
            spark-sql_2.11
            2.4.0
        

        
            org.apache.spark
            spark-mllib_2.11
            ${spark.version}
        

        
            org.apache.spark
            spark-streaming-kafka-0-8_2.11
            ${spark.version}
        

        
        
            org.apache.spark
            spark-streaming-flume_2.11
            2.3.0
        

        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        

        
            org.apache.hadoop
            hadoop-common
            2.7.3
        

        
            org.apache.hadoop
            hadoop-hdfs
            2.7.3
        

        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-common
            ${hbase.version}
        

        
            org.apache.hbase
            hbase-server
            ${hbase.version}
        

        
            org.apache.hbase
            hbase-protocol
            ${hbase.version}
        

        
            org.apache.hbase
            hbase-annotations
            ${hbase.version}
            test-jar
            test
        

        
        
            org.apache.hive
            hive-exec
            ${hive.version}
        
        
            org.apache.hive
            hive-jdbc
            ${hive.version}
        

        
        
            org.apache.kafka
            kafka-clients
            2.4.0
        

        
            org.apache.kafka
            kafka-streams
            2.4.0
        

        
        
            mysql
            mysql-connector-java
            5.1.46
        


        
            com.alibaba
            fastjson
            1.2.17
        

        
            com.huaban
            jieba-analysis
            1.0.2
        

        
            junit
            junit
            4.12
            test
        
    

    
        
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
            

            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.1
                
                    1.8
                    1.8
                
            

            
                org.apache.maven.plugins
                maven-surefire-plugin
                2.12.4
                
                    true
                
            
        
        compile
    

在之前安装过的只需要安装:

        
            org.apache.kafka
            kafka-streams
            2.4.0
        

2.1 启动服务端监听 Socket 服务

命令:nc -lk 9999


2.2 实现 transform() 方法,分割多个单词
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object TransformTest {
  def main(args: Array[String]): Unit = {
    // 1.创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("TransformTest").setMaster("local[2]")
    // 2.创建SparkContext对象,它是所有任务计算的源头
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3.设置日志级别
    sc.setLogLevel("WARN")
    // 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
    // 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
    // 以上是固定搭配结构
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
    
    // 6.使用RDD-to-RDD函数,返回新的DStream对象(即words),并空格切分每行
    val words: DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
    // 7.打印输出结果
    words.print()
    // 8.开启流式计算
    ssc.start()
    // 9.让程序一直运行,除非人为干预停止
    ssc.awaitTermination()
  }
}

2.3 UpdateStateByKeyTest 更新值

如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制。以便于在内存数据丢失的 时候,可以从checkpoint恢复数据。

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object UpdateStateByKeyTest {
  //newValues 表示当前批次汇总成的(word,1)中相同单词的所有1
  //runningCount 表示历史的所有相同key的value总和
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount =runningCount.getOrElse(0)+newValues.sum
    Some(newCount)
  }
  def main(args: Array[String]): Unit = {
    // 1.创建SparkConf对象 设置appName和master地址  local[2] 表示本地采用2个线程运行任务
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    // 2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3.设置日志级别
    sc.setLogLevel("WARN")
    // 4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(2))
    // 5.配置检查点目录,使用updateStateByKey方法必须配置检查点目录
    ssc.checkpoint("D:\data\ch1")
    // 6.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("master",9999)
    
    // 7.按空格进行切分每一行,并将切分的单词出现次数记录为1
    val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map(word =>(word,1))
    // 8.调用updateStateByKey操作,统计单词在全局中出现的次数
    var result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
    // 9.打印输出结果
    result.print()
    // 10.开启流式计算
    ssc.start()
    // 11.让程序一直运行,除非人为干预停止
    ssc.awaitTermination()
  }
}


当注释掉 ssc.checkpoint("D:\data\ch1"),则运行会报错。


2.4 Dstream 窗口操作
  • 事先设定一个滑动窗口的长度(也就是窗口的持续时间);
  • 设定滑动窗口的时间间隔(每隔多长时间执行一次计算),让窗口按照指定时间间隔在源DStream上滑动;
  • 每次窗口停放的位置上,都会有一部分Dstream(或者一部分RDD)被框入窗口内,形成一个小段的Dstream;
  • 可以启动对这个小段DStream的计算。

方法名称相关说明
window(windowLength, slideInterval)基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream;
countByWindow(windowLength, slideInterval)返回流中元素的一个滑动窗口数;
reduceByWindow(func, windowLength, slideInterval)返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])更加高效的 reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce‖操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)。
countByValueAndWindow(windowLength, slideInterval, [numTasks])当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream;每个key的值都是它们在滑动窗口中出现的频率。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
object WindowTest {
  def main(args: Array[String]): Unit = {
    // 1.创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("WindowTest ").setMaster("local[2]")
    // 2.创建SparkContext对象,它是所有任务计算的源头
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3.设置日志级别
    sc.setLogLevel("WARN")
    // 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
    // 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
    val dstream: ReceiverInputDStream[String] = ssc
      .socketTextStream("master",9999)
      
    // 6.按空格进行切分每一行
    val words: DStream[String] = dstream.flatMap(_.split(" "))
    // 7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
    val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
    // 8.打印输出结果
    windowWords.print()
    // 9.开启流式计算
    ssc.start()
    // 10.让程序一直运行,除非人为干预停止
    ssc.awaitTermination()
  }
}


把批处理时间间隔、窗口长度和滑动时间间隔进行变换,在其中两者非整数倍的情况下,会报错。


2.5 DStream 输出操作
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object SaveAsTextFilesTest {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    //1.创建SparkConf对象 设置appName和master地址  local[2] 表示本地采用2个线程运行任务
    val sparkConf: SparkConf = new SparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
    //2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    //3.设置日志级别
    sc.setLogLevel("WARN")
    //4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
    //5.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
    
    //6.调用saveAsTextFiles操作,将nc窗口输出的内容保存到HDFS上
    dstream.saveAsTextFiles("hdfs://master:8020//saveAsTextFiles/satf","txt")
    //7.开启流式计算
    ssc.start()
    //8.让程序一直运行,除非人为干预停止
    ssc.awaitTermination()
  }
}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651498.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号