在使用滑动窗口时,每次计算的数据有部分是包含上一次计算的数据的,还要计算一次,会重复计算,所以可以每次将计算的结果保存起来,下一次计算时只需要将不要的数据删除,加上这次新来的数据,底层的优化,这部分的优化也是由框架自动实现的
worDS.reduceByKeyAndWindow( (x: Int, y: Int) => x + y //传入的聚合函数 , (x: Int, y: Int) => x - y //传入的聚合函数,做减法,针对多余的数据做减法处理 , Durations.seconds(10) //窗口的时间大小 , Durations.seconds(5)) //多久一个任务,滑动一次,前面指定了 .print()2、稽查布控(spark Streaming的应用)
采集数据(生产数据)–>消息队列–>sparkStreaming(消费数据)–>filter(过滤操作)–>写回MySQL
—在查询数据的时候,根据黑名单输出指定的数据
—使用nc模拟数据采集读取数据处理过程:
* 1、创建集合存储黑名单数据,运行使用发现List不合理,不能动态的添加删除黑名单的数据
* 2、使用MySQL读取数据,放在外面还是不能实现动态的添加删除数据
* 3、算子外面的代码在Driver端执行的,只是执行一次,所以需要将连接放在算子内部
* val nameList: List[String] = get_Conn()
* 4、因此将MySQL连接放在内部,在Executor端执行,但是每次执行都需要创建一个连接
* 来一条数据创建一个,如果数据量过大,会将MYSQL搞崩溃
* 5、优化使用高性能算子mapPartitions(),每一个分区创建一个连接,一个分区执行一次
* 6、使用transform()算子,transform内的代码每个batch会执行一次处理完成写回MySQL过程:
* 1、将最终包含黑名单的数据结果写回MySQL数据库,使用foreachRDD,刚好和transform是一对一起使用的
* 2、由于连接不能序列化,不能在网络中传输,需要使用
foreachPartition,每个分区创建一个连接
object Demo04BuKong {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("Demo04BuKong")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
//创建StreamingContext对象
val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))
//创建连接nc,模拟实时数据流
val pointDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)
//进行filter操作
//pointDS是Dstream类型,底层还是RDD的转化
val point_end: DStream[String] = pointDS.transform(rdd => {
//每个batch执行一次,每5s执行一次
//可以再优化,使用广播
val nameList: List[String] = get_Conn()
//直接使用上面的sc可能解析会有问题,通过rdd新建一个SparkContext连接用于广播
val rddsc: SparkContext = rdd.sparkContext
val brostlist: Broadcast[List[String]] = rddsc.broadcast(nameList)
val filterRDD = rdd.filter(point => {
val splits: Array[String] = point.split(",") //进来的一条条数据
val mdn: String = splits(0) //将手机号切出来,获取手机号
brostlist.value.contains(mdn) //返回集合中包含的手机号
})
filterRDD
})
//将最终结果写回mysql,使用foreachRDD,刚好和transform是一对一起使用的
point_end.foreachRDD(rdd => {
rdd.foreachPartition(rd => {
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/shujiaku"
, "root"
, "123456")
val statement = conn.prepareStatement("insert into result values(?)")
rd.foreach(line => {
statement.setString(1, line)
statement.addBatch() //批处理插入数据
})
statement.executeBatch() //执行
conn.close() //关闭连接
})
})
//启动任务
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
def get_Conn(): List[String] = {
//创建ListBuffer存储查询的数据
val mdnList: ListBuffer[String] = new ListBuffer[String]()
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/shujiaku"
, "root"
, "123456")
val statement: Statement = conn.createStatement()
val sql: String = "select * from mdnn"
val result: ResultSet = statement.executeQuery(sql)
while (result.next()) {
val mdn: String = result.getString("mdn")
mdnList.append(mdn)
}
conn.close()
//返回list数据
mdnList.toList
}
}



