栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

StructuredStreaming

StructuredStreaming

1 设置日志级别

Logger.getLogger("org").setLevel(Level.WARN)

session.sparkContext.setLogLevel("WARN")

案例

object _05StructuredStreamingDemo {
    def main(args: Array[String]): Unit = {
        // Entry point for Spark SQL / Structured Streaming.
        val spark: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")

        import spark.implicits._
        // Streaming source: lines pushed by `nc` from qianfeng01:10087.
        // readStream yields an unbounded source, not a one-shot read.
        val lines: Dataset[String] = spark.readStream
          .format("socket")
          .option("host", "qianfeng01")
          .option("port", 10087)
          .load()
          .as[String]

        // Tokenize each line, tag every word with 1, and group by the word.
        val grouped: KeyValueGroupedDataset[String, (String, Int)] =
            lines.flatMap(line => line.split(" ")).map(word => (word, 1)).groupByKey(pair => pair._1)
        // Running count of (word, 1) pairs per key.
        val wordCounts: Dataset[(String, Long)] = grouped.count()

        // A Structured Streaming query only runs once writeStream.start() is called.
        wordCounts.writeStream
          .outputMode(OutputMode.Complete())   // emit the full aggregated table on each trigger
          .format("console")
          .start()                             // launch the streaming computation
          .awaitTermination()                  // keep the app alive while no data arrives
    }
}

读取kafka里面的消息

package com.qf.sparkstreaming.day04

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}


object _03KafkaSourceJson {
    def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
        session.sparkContext.setLogLevel("ERROR")

        // Consume from Kafka; every record carries the fixed source schema:
        // key|value|topic|partition|offset|timestamp|timestampType
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers","qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
          .option("startingOffsets","earliest")
          .option("subscribe","student").load()

        // Schema of the JSON document stored in the Kafka record's value field.
        val last_event = new StructType()
          .add("has_sound",DataTypes.BooleanType)
          .add("has_motion",DataTypes.BooleanType)
          .add("has_person",DataTypes.BooleanType)
          .add("start_time",DataTypes.DateType)
          .add("end_time",DataTypes.DateType)

        val cameras = new StructType()
          .add("device_id",DataTypes.StringType)
          .add("last_event",last_event)

        val devices = new StructType()
          .add("cameras",cameras)

        val schema = new StructType()
          .add("devices",devices)

        // Timestamp pattern for the JSON parser. 'SSS' (not 'sss') denotes
        // milliseconds, matching payloads such as 2016-12-29T00:00:00.000Z.
        val jsonOptions = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")

        import session.implicits._
        import org.apache.spark.sql.functions._
        // Cast the binary value column to String, then parse it as JSON into
        // a nested struct column aliased "value".
        val frame1: DataFrame = frame.selectExpr("cast(value as String)")
          .select(from_json('value, schema, jsonOptions).alias("value"))

        // Project has_person / start_time / end_time, keep only rows where a
        // person was detected, then count per (has_person, start_time).
        val frame2: DataFrame = frame1.
          selectExpr("value.devices.cameras.last_event.has_person",
              "value.devices.cameras.last_event.start_time",
              "value.devices.cameras.last_event.end_time"
          )
            .filter($"has_person"===true)
            .groupBy($"has_person",$"start_time")
            .count()

        // Update mode: only rows whose aggregate changed are printed each trigger.
        frame2.writeStream
          .outputMode(OutputMode.Update())
          .format("console")
          .start()
          .awaitTermination()
    }
}

生产者测试

{"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}
{"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":false,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}
{"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}
{"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":false,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}
{"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}

//作为消费者,从kafka读取数据,获取到的数据有schema,
// 分别是 key|value|topic|partition|offset|timestamp|timestampType|

读取kafka的上的student主题,别忘记开启生产者进行测试

StructuredStreaming的Sink
HDFS sink

package com.qf.sparkstreaming.day04

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object _04SinkHdfs {
    def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder()
        .appName("test1")
        .master("local[*]").getOrCreate()
        session.sparkContext.setLogLevel("ERROR")

        // Consume from Kafka; records carry the fixed source schema:
        // key|value|topic|partition|offset|timestamp|timestampType
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers",
                  "qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
//          .option("startingOffsets","earliest")
          .option("subscribe","pet").load()

        // Keep only the record value, cast from bytes to String.
        val frame1: DataFrame = frame.selectExpr("cast(value as String)")

        // File sink: write each micro-batch as text files under the HDFS path.
        // A checkpoint location is required by file sinks for exactly-once output.
        frame1.writeStream
          .format("text")
//          .option("path","out4")   // alternative: write to the local disk
          .option("path","hdfs://qianfeng01/hdfssink")
          .option("checkpointLocation", "checkpoint")
          .start()
          .awaitTermination()
    }
}

Kafka sink

package com.qf.sparkstreaming.day04

import org.apache.spark.sql.{DataFrame, SparkSession}

object _05SinkKafka {
    def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder()
        .appName("test1").master("local[*]").getOrCreate()
        session.sparkContext.setLogLevel("ERROR")

        // Consume from Kafka; records carry the fixed source schema:
        // key|value|topic|partition|offset|timestamp|timestampType
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers",
                  "qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
//          .option("startingOffsets","earliest")
          .option("subscribe","pet").load()

        // Keep only the record value, cast from bytes to String; the Kafka
        // sink picks this single "value" column up as the produced message.
        val frame1: DataFrame = frame.selectExpr("cast(value as String)")

        // Kafka sink: forward the values into the "good" topic.
        frame1.writeStream
          .format("kafka")
          .option("checkpointLocation", "checkpoint")
          .option("topic","good")
          .option("kafka.bootstrap.servers",
                  "qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
          .start()
          .awaitTermination()
    }
}
package com.qf.sparkstreaming.day04

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object _06SinkKafka {
    def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
        session.sparkContext.setLogLevel("ERROR")

        // Consume from Kafka; records carry the fixed source schema:
        // key|value|topic|partition|offset|timestamp|timestampType
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers",
                  "qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
//          .option("startingOffsets","earliest")
          .option("subscribe","pet").load()

        import session.implicits._
        // Each record value looks like "id::name::info"; split it apart and
        // keep only the Comedy entries.
        val frame1: Dataset[String] = frame
        .selectExpr("cast(value as String)").as[String]
        val frame2: Dataset[String] = frame1.map(x => {
            val arr: Array[String] = x.split("::")
            (arr(0).toInt, arr(1), arr(2))
        }).as[(Int, String, String)]
        .filter(_._3.contains("Comedy")).toDF("id", "name", "info")
          // The Kafka sink persists a single "value" column, so a multi-column
          // row must be folded back into one string before writing.
          .map(row => {
              "" + row.getAs("id") + row.getAs("name") + row.getAs("info")
          })

//        frame2.writeStream
//          .format("console")
//          .start()
//          .awaitTermination()

        // Produce the flattened strings into the "good" topic.
        frame2.writeStream
          .format("kafka")
          .outputMode(OutputMode.Append())
          .option("checkpointLocation", "checkpoint")
          .option("topic","good")
          .option("kafka.bootstrap.servers",
                  "qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
          .start()
          .awaitTermination()
    }
}

Mysql Sink

package com.qf.sparkstreaming.day04

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

object _06SinkMysql {
    def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
        session.sparkContext.setLogLevel("ERROR")

        // Consume from Kafka; records carry the fixed source schema:
        // key|value|topic|partition|offset|timestamp|timestampType
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers","qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
//          .option("startingOffsets","earliest")
          .option("subscribe","pet").load()

        import session.implicits._
        // Each record value looks like "id::name::info"; split it apart and
        // keep only the Comedy entries as an (id, name, info) table.
        val frame1: Dataset[String] = frame.selectExpr("cast(value as String)").as[String]
        val frame2: DataFrame = frame1.map(x => {
            val arr: Array[String] = x.split("::")
            (arr(0).toInt, arr(1), arr(2))
        }).as[(Int, String, String)].filter(_._3.contains("Comedy")).toDF("id", "name", "info")

        // MySQL sink: each row is handed to the ForeachWriter below.
        frame2.writeStream
          .foreach(new MyWriter)
          .start()
          .awaitTermination()
    }
}
class MyWriter extends ForeachWriter[Row] {
    private var connection: Connection = _
    private var statement: PreparedStatement = _

    /** Opens a JDBC connection; returning true means rows of this partition/epoch get processed. */
    override def open(partitionId: Long, version: Long): Boolean = {
        // Load the MySQL JDBC driver.
        Class.forName("com.mysql.jdbc.Driver")
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/sz2003_db", "root", "123456")
        statement = connection.prepareStatement("insert into movie values (?,?,?)")
        true
    }

    /** Writes one (id, name, info) row; placeholders guard against SQL injection. */
    override def process(value: Row): Unit = {
        // Bind by column name (columns come from toDF("id", "name", "info")).
        statement.setInt(1, value.getAs[Int]("id"))
        statement.setString(2, value.getAs[String]("name"))
        statement.setString(3, value.getAs[String]("info"))
        statement.execute()
    }

    /** Releases JDBC resources; open() may have failed midway, so guard each handle. */
    override def close(errorOrNull: Throwable): Unit = {
        if (statement != null) statement.close()
        if (connection != null) connection.close()
    }
}

使用Statement

package com.qf.sparkstreaming.day04

import java.sql.{Connection, DriverManager, Statement}

import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

object _08SinkMysql {
    def main(args: Array[String]): Unit = {
        // 1. Create the SparkSession.
        val session = SparkSession.builder()
          .appName("hdfs_sink")
          .master("local[6]")
          .getOrCreate()

        import session.implicits._

        // 2. Consume from Kafka; records carry the fixed source schema:
        // key|value|topic|partition|offset|timestamp|timestampType
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers","qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
          //          .option("startingOffsets","earliest")
          .option("subscribe","pet").load()

        // 3. Each record value looks like "id::name::category"; split it into
        // an (id, name, category) table.
        val frame1: Dataset[String] = frame.selectExpr("cast(value as String)").as[String]
        val frame2: DataFrame = frame1.map(x => {
            val arr: Array[String] = x.split("::")
            (arr(0).toInt, arr(1), arr(2))
        }).as[(Int, String, String)].toDF("id", "name", "category")

        // 4. Sink to MySQL through a plain JDBC Statement.
        class MySQLWriter extends ForeachWriter[Row] {
            private val driver = "com.mysql.jdbc.Driver"
            private var connection: Connection = _
            private val url = "jdbc:mysql://localhost:3306/sz2003_db"
            private var statement: Statement = _

            override def open(partitionId: Long, version: Long): Boolean = {
                Class.forName(driver)
                connection = DriverManager.getConnection(url,"root","123456")
                statement = connection.createStatement()
                true
            }

            override def process(value: Row): Unit = {
                // NOTE: string-built SQL on external Kafka data; escape embedded
                // quotes so payloads cannot break or inject into the statement.
                // (A PreparedStatement, as in _06SinkMysql, is the safer choice.)
                val name = value.get(1).toString.replace("'", "''")
                val category = value.get(2).toString.replace("'", "''")
                statement.executeUpdate(s"insert into movie values(${value.get(0)}, '$name', '$category')")
            }

            override def close(errorOrNull: Throwable): Unit = {
                // open() may have failed midway, so guard each handle.
                if (statement != null) statement.close()
                if (connection != null) connection.close()
            }
        }

        frame2.writeStream
          .foreach(new MySQLWriter)
          .start()
          .awaitTermination()
    }
}

Trigger

package com.qf.sparkstreaming.day04


import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger



object _09Trigger {
    def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
        session.sparkContext.setLogLevel("ERROR")

        // Consume from Kafka; records carry the fixed source schema:
        // key|value|topic|partition|offset|timestamp|timestampType
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers","qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
          //          .option("startingOffsets","earliest")
          .option("subscribe","pet").load()

        // Keep only the record value, cast from bytes to String.
        val frame1: DataFrame = frame.selectExpr("cast(value as String)")

        // Console sink. Trigger.ProcessingTime(0) starts the next micro-batch
        // as soon as the previous one finishes (as-fast-as-possible mode).
        frame1.writeStream
          .format("console")
          .trigger(Trigger.ProcessingTime(0))
          .start()
          .awaitTermination()
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457798.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号