栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

9.4.2、Spark Streaming

9.4.2、Spark Streaming

1、滑动数据(优化)

在使用滑动窗口时,每次计算的数据有部分是包含上一次计算的数据的,还要计算一次,会重复计算,所以可以每次将计算的结果保存起来,下一次计算时只需要将不要的数据删除,加上这次新来的数据,底层的优化,这部分的优化也是由框架自动实现的

worDS.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y //传入的聚合函数
  , (x: Int, y: Int) => x - y //传入的聚合函数,做减法,针对多余的数据做减法处理
  , Durations.seconds(10) //窗口的时间大小
  , Durations.seconds(5)) //多久一个任务,滑动一次,前面指定了
  .print()
2、稽查布控(spark Streaming的应用)

采集数据(生产数据)–>消息队列–>sparkStreaming(消费数据)–>filter(过滤操作)–>写回MySQL
—在查询数据的时候,根据黑名单输出指定的数据
—使用nc模拟数据采集

读取数据处理过程:
* 1、创建集合存储黑名单数据,运行使用发现List不合理,不能动态的添加删除黑名单的数据
* 2、使用MySQL读取数据,放在外面还是不能实现动态的添加删除数据
* 3、算子外面的代码在Driver端执行的,只是执行一次,所以需要将连接放在算子内部
* val nameList: List[String] = get_Conn()
* 4、因此将MySQL连接放在内部,在Executor端执行,但是每次执行都需要创建一个连接
* 来一条数据创建一个,如果数据量过大,会将MYSQL搞崩溃
* 5、优化使用高性能算子mapPartitions(),每一个分区创建一个连接,一个分区执行一次
* 6、使用transform()算子,transform内的代码每个batch会执行一次

处理完成写回MySQL过程:
* 1、将最终包含黑名单的数据结果写回MySQL数据库,使用foreachRDD,刚好和transform是一对一起使用的
* 2、由于连接不能序列化,不能在网络中传输,需要使用

foreachPartition,每个分区创建一个连接
object Demo04BuKong {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo04BuKong")
    conf.setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    //创建StreamingContext对象
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))
    //创建连接nc,模拟实时数据流
    val pointDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    

    //进行filter操作
    //pointDS是Dstream类型,底层还是RDD的转化
    val point_end: DStream[String] = pointDS.transform(rdd => {
      //每个batch执行一次,每5s执行一次
      //可以再优化,使用广播
      val nameList: List[String] = get_Conn()
      //直接使用上面的sc可能解析会有问题,通过rdd新建一个SparkContext连接用于广播
      val rddsc: SparkContext = rdd.sparkContext
      val brostlist: Broadcast[List[String]] = rddsc.broadcast(nameList)

      val filterRDD = rdd.filter(point => {
        val splits: Array[String] = point.split(",") //进来的一条条数据
        val mdn: String = splits(0) //将手机号切出来,获取手机号
        brostlist.value.contains(mdn) //返回集合中包含的手机号
      })
      filterRDD
    })

    

    //将最终结果写回mysql,使用foreachRDD,刚好和transform是一对一起使用的
    point_end.foreachRDD(rdd => {

      rdd.foreachPartition(rd => {
        Class.forName("com.mysql.jdbc.Driver")
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/shujiaku"
          , "root"
          , "123456")
        val statement = conn.prepareStatement("insert into result values(?)")

        rd.foreach(line => {
          statement.setString(1, line)
          statement.addBatch() //批处理插入数据
        })
        statement.executeBatch() //执行
        conn.close() //关闭连接
      })
    })
    //启动任务
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  
  def get_Conn(): List[String] = {
    //创建ListBuffer存储查询的数据
    val mdnList: ListBuffer[String] = new ListBuffer[String]()

    Class.forName("com.mysql.jdbc.Driver")
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/shujiaku"
      , "root"
      , "123456")
    val statement: Statement = conn.createStatement()
    val sql: String = "select * from mdnn"
    val result: ResultSet = statement.executeQuery(sql)
    while (result.next()) {
      val mdn: String = result.getString("mdn")
      mdnList.append(mdn)
    }
    conn.close()
    //返回list数据
    mdnList.toList
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673574.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号