我们之前学过的转换算子无法访问事件的时间戳信息和水位线信息。而这在一些应用场景下极为重要,例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前的事件时间。
基于此,DataStream API 提供了一系列的 Low-level 转换算子,可以访问时间戳、watermark 以及注册定时事件,还可以输出特定的一些事件,例如超时事件等。Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的 window 函数和转换算子无法实现)。例如,Flink SQL 就是使用 Process Function 实现的。
flink 提供了8个process function:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
2 KeyedProcessFunction
这里我们重点介绍KeyedProcessFunction。
KeyedProcessFunction 用来操作 KeyedStream。KeyedProcessFunction 会处理流中的每一个元素,输出 0 个、1 个或者多个元素。所有的 Process Function 都继承自 RichFunction 接口,所以都有 open()、close() 和 getRuntimeContext() 等方法。而 KeyedProcessFunction[KEY, IN, OUT] 还额外提供了两个方法:
processElement(v: IN, ctx: Context, out: Collector[OUT]):流中的每一个元素都会调用这个方法,调用结果通过 Collector 输出。Context 可以访问元素的时间戳、元素的 key 以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。
onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一个回调函数,当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发时间戳,Collector 用于输出结果。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间类型(事件时间或者处理时间)。
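3 定时器 TimerService
Context 和 OnTimerContext 所持有的 TimerService 对象提供了以下方法:
- currentProcessingTime(): Long 返回当前处理时间
- currentWatermark(): Long 返回当前水位线的时间戳
- registerProcessingTimeTimer(timestamp: Long): Unit 注册当前 key 的处理时间定时器,当处理时间到达定时时间时触发
- registerEventTimeTimer(timestamp: Long): Unit 注册当前 key 的事件时间定时器,当水位线大于等于定时器注册的时间时触发
- deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册的处理时间定时器,如果没有此时间戳的定时器,则不执行
- deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行
定时器触发时,会调用 onTimer 回调函数。注意:定时器只能在 KeyedStream 上使用。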
4 代码整体演示
package com.study.liucf.unbounded.process
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import com.study.liucf.bean.{LiucfSensorReding, LiucfSensorRedingAgg}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object LiucfProcessTest {
  val EXAMPLE_SIZE = 5L

  /**
   * Entry point of the keyed-process demo.
   *
   * Reads comma-separated lines `id,<long>,<double>` from a socket, turns each one into a
   * [[LiucfSensorReding]], keys the stream by sensor id and runs [[LiucfKeyedProcessFunction]].
   *
   * Program arguments: `--ip <host> --port <port>`.
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Every stream created from this environment uses event time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val params = ParameterTool.fromArgs(args)
    val host = params.get("ip")
    val portNumber = params.getInt("port")

    val lines: DataStream[String] = env.socketTextStream(host, portNumber)
    val readings = lines.map { line =>
      val fields = line.split(",")
      LiucfSensorReding(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    // A plain ProcessFunction could be applied to `readings` directly; keying by sensor id
    // first is what allows a KeyedProcessFunction (with keyed state and timers) to be used.
    val results = readings
      .keyBy(_.id)
      .process(new LiucfKeyedProcessFunction())

    results.print("result")
    env.execute(" liucf process test")
  }
}
/**
 * Demonstration KeyedProcessFunction showing what `Context` and its `TimerService` expose.
 *
 * Keys are sensor ids (String), inputs are [[LiucfSensorReding]], and the declared output type
 * is `(String, Double)`. This demo never emits anything; it only touches the context API and
 * registers an event-time timer.
 */
class LiucfKeyedProcessFunction extends KeyedProcessFunction[String, LiucfSensorReding, (String, Double)] {
  // Per-key value state, initialised in open(); not read or written by this demo.
  var myvalueState: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    myvalueState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("myvalueState", classOf[Int]))
  }

  /**
   * Called once per element of the keyed stream.
   *
   * @param value the current sensor reading
   * @param ctx   gives access to the current key, the element timestamp and the TimerService
   * @param out   collector for results (unused in this demo)
   */
  override def processElement(value: LiucfSensorReding,
                              ctx: KeyedProcessFunction[String, LiucfSensorReding, (String, Double)]#Context,
                              out: Collector[(String, Double)]): Unit = {
    ctx.getCurrentKey                    // key of the element being processed
    ctx.timerService().currentWatermark() // current event-time watermark

    // ctx.timestamp() is a nullable java.lang.Long: it is null when no timestamp was assigned
    // to the element (this job sets EventTime but has no timestamp/watermark assigner).
    // Unboxing it blindly would throw a NullPointerException for every element.
    val elementTs: java.lang.Long = ctx.timestamp()
    if (elementTs != null) {
      // Fire an event-time timer one minute after this element's timestamp.
      ctx.timerService().registerEventTimeTimer(elementTs.longValue() + 60000L)
    }
  }

  /**
   * Callback for timers registered in processElement.
   *
   * @param timestamp the time the firing timer was registered for
   */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, LiucfSensorReding, (String, Double)]#OnTimerContext,
                       out: Collector[(String, Double)]): Unit = {
    // Intentionally empty: this demo only shows how a timer is registered.
  }
}
5 KeyedProcessFunction-实例
需求:如果同一个传感器在连续 10 秒内采集到的温度值持续上升,就进行告警(下面代码中 interval 传入的是 10000 毫秒)。
package com.study.liucf.unbounded.process
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object LiucfKedProcessTest {
  val EXAMPLE_SIZE = 5L

  /**
   * Entry point of the "temperature keeps rising" alert demo.
   *
   * Parses socket lines `id,<long>,<double>` into [[LiucfSensorReding]], keys them by sensor id
   * and feeds them to [[LiucfKedProcessForTempIncreWraning]] with a 10 000 ms window.
   *
   * Program arguments: `--ip <host> --port <port>`.
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Every stream created from this environment uses event time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val params = ParameterTool.fromArgs(args)
    val host = params.get("ip")
    val portNumber = params.getInt("port")

    val lines: DataStream[String] = env.socketTextStream(host, portNumber)
    val alerts = lines
      .map { line =>
        val fields = line.split(",")
        LiucfSensorReding(fields(0), fields(1).toLong, fields(2).toDouble)
      }
      .keyBy(_.id)
      // Alert when a sensor's temperature rises continuously for 10 000 ms.
      .process(new LiucfKedProcessForTempIncreWraning(10000L))

    alerts.print("result")
    env.execute(" liucf process test")
  }
}
/**
 * Emits an alert when a sensor's temperature keeps rising for `interval` milliseconds of
 * processing time.
 *
 * On a rise with no pending timer, a processing-time timer is registered `interval` ms in the
 * future. A later drop in temperature cancels it. If the timer fires, the temperature never
 * dropped during the whole interval, and an alert string is emitted.
 *
 * Note: an unset Scala `Double`/`Long` value state reads back as 0 here (the
 * `timerTsValue == 0` check relies on this). As a result, each sensor's first reading is
 * compared against 0.0, so a positive first reading counts as a rise.
 *
 * @param interval length of the monitoring window, in milliseconds
 */
class LiucfKedProcessForTempIncreWraning(interval: Long) extends KeyedProcessFunction[String, LiucfSensorReding, String] {
  // Last temperature seen for the current key.
  lazy val lastTempState: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempState", classOf[Double]))
  // Fire time of the pending processing-time timer for the current key; 0 means "no timer".
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTsState", classOf[Long]))

  override def processElement(value: LiucfSensorReding,
                              ctx: KeyedProcessFunction[String, LiucfSensorReding, String]#Context,
                              out: Collector[String]): Unit = {
    println("开始processElement。。。")
    val lastTempValue = lastTempState.value() // previous temperature for this key
    val timerTsValue = timerTsState.value()   // pending timer timestamp, 0 if none
    println("lastTempValue="+lastTempValue+" timerTsValue="+timerTsValue)
    // Remember the new temperature so the next reading can be compared against it.
    lastTempState.update(value.temperature)

    if (value.temperature > lastTempValue && timerTsValue == 0) {
      // Temperature rose and no timer is pending: start the window, using processing time.
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      // Record the timer timestamp so it can be cancelled later.
      timerTsState.update(ts)
      println("开始定时。。。")
    } else if (value.temperature < lastTempValue) {
      // Temperature dropped: the "continuously rising" condition is broken, so cancel the
      // pending timer (only if one is recorded) and reset the timer state.
      if (timerTsValue != 0) {
        ctx.timerService().deleteProcessingTimeTimer(timerTsValue)
      }
      timerTsState.clear()
      println("温度开始下降...")
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, LiucfSensorReding, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    out.collect("传感器:"+ctx.getCurrentKey+" 在 "+timestamp +" 后连续 "+interval/1000 +"秒采集到的温度值连续升高")
    // The fired timer is gone already; only the stored timestamp must be cleared so the next
    // rise can start a new window.
    timerTsState.clear()
  }
}
6 ProcessFunction - 自定义侧输出流
需求:把采集到的温度分为主流(高温流)和侧边流(低温流)分别进行输出
package com.study.liucf.unbounded.process
import org.apache.flink.streaming.api.functions.ProcessFunction
import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object LiucfProcessSideOutputStreamTest {
val EXAMPLE_SIZE=5L
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为从此环境创建的所有流设置时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val paramTool = ParameterTool.fromArgs(args)
val ip = paramTool.get("ip")
val port = paramTool.getInt("port")
// val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
val transRes = sockerInput
.map(d=>{
val arr = d.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
})
val highTempDateStream = transRes
.process(new LiucfSideOutputStream(10))
//输出主流
highTempDateStream.print("high")
//输出侧边流
highTempDateStream
.getSideOutput(
new OutputTag[(String,Long,Double)]("low")
).print("low-Temperature")
// transRes.print("result")
env.execute(" liucf process test")
}
}
class LiucfSideOutputStream(threshold:Long) extends ProcessFunction[LiucfSensorReding,LiucfSensorReding]{
override def processElement(value: LiucfSensorReding,
ctx: ProcessFunction[LiucfSensorReding, LiucfSensorReding]#Context,
out: Collector[LiucfSensorReding]): Unit = {
if(value.temperature



