13-flink-1.10.1-Flink State Management

Contents

1 State in Flink

2 Operator State

 2.1 Operator state types

3 Keyed State

 3.1 Keyed state data structures

3.2 Using keyed state

3.3  Example

3.3.1 First implementation

3.3.2 Second implementation


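1 State in Flink

  • All data that is maintained by a task and used to compute a result belongs to that task's state.
  • State can be thought of as a local variable that the task's business logic can access. (A plain local variable that we define ourselves is not enough: Flink is distributed, and keeping state consistent requires serialization and deserialization, a mechanism that Flink already implements internally.)
  • Flink manages state for you, including state consistency, failure handling, and efficient storage and access, so that developers can focus on application logic.
  • Simple operators such as map and filter can be stateless, whereas more complex ones such as reduce, window computations, or other aggregations need state. The result of a stateful computation depends on two pieces of data: the newly arrived input, and the result of the previous computation saved in a variable (that saved value is the state).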
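The difference between a stateless and a stateful computation can be sketched in plain Scala, without any Flink API. The names `RunningSumSketch`, `toFahrenheit`, and `runningSums` are made up for this illustration, and the `var` only stands in for what Flink would keep as managed state.

```scala
object RunningSumSketch {
  // Stateless: the output depends only on the current record.
  def toFahrenheit(celsius: Double): Double = celsius * 9 / 5 + 32

  // Stateful: each output depends on the current record AND on the
  // result saved from the previous computation (the "state").
  def runningSums(inputs: Seq[Double]): Seq[Double] = {
    var state = 0.0 // stand-in for state that Flink would manage
    inputs.map { x =>
      state = state + x // new state = previous state combined with new input
      state
    }
  }
}
```

In a real job the variable would not survive a failure or a restart, which is why the state has to be handed over to Flink instead of being kept in an ordinary field.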

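In Flink, state is always associated with a specific operator.

So that the Flink runtime knows about an operator's state, the operator must register its state in advance.

Overall there are two types of state:

Operator state

      Operator state is scoped to an operator task.

Keyed state

     Keyed state is maintained and accessed according to the key defined on the input data stream.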

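2 Operator State

  • Operator state is scoped to an operator task: all records processed by the same parallel task can access the same state.
  • The state is shared within a subtask. For example, every record processed by subtask task1 can access task1's state.
  • Operator state cannot be accessed by another subtask, whether of the same operator or of a different one. For example, subtask task1 cannot access the state of any other subtask, and vice versa.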

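 2.1 Operator state types

① List state

Represents the state as a list of entries.

② Union list state

Also represents the state as a list of entries. It differs from regular list state in how the state is restored after a failure or when the application is started from a savepoint: with regular list state the entries are split among the parallel tasks, whereas with union list state every task receives the complete list.

③ Broadcast state

If an operator has several tasks and the state of every task is identical, broadcast state is the best fit for this special case.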
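The restore behaviour of list state and union list state can be illustrated with a small plain-Scala simulation. This is not Flink code: `RedistributionSketch`, `evenSplit`, and `union` are hypothetical names, and the round-robin assignment in `evenSplit` is an assumption made for the sketch, since the exact assignment of entries to tasks is decided by Flink.

```scala
object RedistributionSketch {
  // Regular list state: the entries of all old subtasks are pooled and
  // then split across the new subtasks (round-robin is assumed here).
  def evenSplit[A](checkpointed: Seq[Seq[A]], newParallelism: Int): Seq[Seq[A]] = {
    val all = checkpointed.flatten
    (0 until newParallelism).map { task =>
      all.zipWithIndex.collect { case (entry, idx) if idx % newParallelism == task => entry }
    }
  }

  // Union list state: every new subtask receives the complete list and
  // decides for itself which entries to keep.
  def union[A](checkpointed: Seq[Seq[A]], newParallelism: Int): Seq[Seq[A]] = {
    val all = checkpointed.flatten
    Seq.fill(newParallelism)(all)
  }
}
```

For example, restoring two old subtasks holding `Seq(1, 2)` and `Seq(3, 4)` onto three new subtasks gives each new subtask a part of the four entries with `evenSplit`, and all four entries with `union`.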

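3 Keyed State

Keyed state is the type you will use most.

  • Keyed state is maintained and accessed according to the key defined on the input data stream.
  • Flink maintains one state instance per key and routes all records with the same key to the same operator task, which maintains and processes the state for that key.
  • When a task processes a record, it automatically restricts state access to the key of the current record.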
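The scoping rule described in these points can be imitated in plain Scala with a map from key to state. This is only a sketch of the idea, not how Flink stores state; `KeyedCounterSketch` and `process` are names invented for the illustration.

```scala
import scala.collection.mutable

class KeyedCounterSketch {
  // One state entry per key, mirroring "one state instance per key".
  private val countPerKey = mutable.Map.empty[String, Long]

  // Processing a record reads and writes only the state of that record's key.
  def process(key: String): Long = {
    val updated = countPerKey.getOrElse(key, 0L) + 1
    countPerKey(key) = updated
    updated
  }
}
```

Calling `process("sensor_1")` twice and `process("sensor_2")` once leaves the two counters at 2 and 1: records of one key never see the state of another key.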

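 3.1 Keyed state data structures

① Value state

Represents the state as a single value.

② List state

Represents the state as a list of entries.

③ Map state

Represents the state as a set of key-value pairs.

④ Aggregating state (reducing state & aggregating state)

Represents the state as the aggregate of all values added to it: each added value is merged into the stored result by the aggregation function.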
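How a reducing state behaves can be sketched in a few lines of plain Scala. `ReducingStateSketch` is a hypothetical stand-in written for this illustration, not Flink's `ReducingState`; in particular it returns an `Option` where Flink returns null for an empty state.

```scala
class ReducingStateSketch[A](reduce: (A, A) => A) {
  // Empty until the first value is added.
  private var current: Option[A] = None

  // The first value is stored as-is; later values are merged into the
  // stored aggregate with the reduce function.
  def add(value: A): Unit =
    current = Some(current.fold(value)(reduce(_, value)))

  // The aggregate of everything added so far, if any.
  def get: Option[A] = current
}
```

Note that the reduce function is not called for the very first value, which is why the first implementation in section 3.3 treats the first reading of each sensor separately.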

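3.2 Using keyed state

Keyed state is obtained through the runtime context, and the runtime context is available inside a RichFunction. Keyed state is therefore used inside a RichFunction; this is a prerequisite.

Sample test code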

package com.study.liucf.unbounded.state

import java.util.Map
import java.{lang, util}

import com.study.liucf.bean.LiucfSensorReding
import com.study.liucf.unbounded.transform.LiucfReduce
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


object LiucfStateTest {
  def main(args: Array[String]): Unit = {
    
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the time characteristic for all streams created from this environment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the socket host and port from the arguments: --ip <host> --port <port>
    val paramTool = ParameterTool.fromArgs(args)
    val ip = paramTool.get("ip")
    val port = paramTool.getInt("port")

    //    val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
    val sockerInput: DataStream[String] = env.socketTextStream(ip, port)

    val transRes = sockerInput
      .map(d=>{
        // Each input line has the form: id,timestamp,temperature
        val arr = d.split(",")
        LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
      })
      // Keyed state is only available on a keyed stream, so key by sensor id first
      .keyBy(_.id)
      .map(new LiucfRichMapFunction)

    transRes.print("result")

    env.execute("liucf state test")
  }
}


class LiucfRichMapFunction extends RichMapFunction[LiucfSensorReding,Tuple2[String,Double]]{

  // Option 1: declare the handle here and assign it in open(), because the
  // runtime context is not yet available while the object is being constructed
  var valueState:ValueState[Double] = _

  // Option 2: use a lazy val, so the runtime context is only touched on first access
  lazy val listState:ListState[Long] = getRuntimeContext.getListState(new ListStateDescriptor("listState",classOf[Long]))

  lazy val mapState:MapState[String,LiucfSensorReding] = getRuntimeContext.getMapState(new MapStateDescriptor("mapState",classOf[String],classOf[LiucfSensorReding]))

  lazy val reduceState:ReducingState[LiucfSensorReding] = getRuntimeContext.getReducingState(new ReducingStateDescriptor("reduceState",new LiucfReduce(),classOf[LiucfSensorReding]))

  override def open(parameters: Configuration): Unit = {
    // Register the value state and keep the handle for use in map()
    valueState = getRuntimeContext.getState(new ValueStateDescriptor("valueState",classOf[Double]))
  }

  override def map(value: LiucfSensorReding): (String, Double) = {

    val valueStateV = valueState.value() // Read the value state
    valueState.update(value.temperature) // Update the value state

    val listStateIterable = listState.get() // Read the list state
    val lst: util.ArrayList[Long]= new util.ArrayList()
    lst.add(10000L)
    lst.add(2000L)
    listState.addAll(lst) // Append a whole list to the existing list state
    listState.update(lst) // Replace the list state with the given list
    listState.add(value.timestamp) // Append a single element to the list state

    mapState.contains(value.id) // Check whether the map state contains the key
    val iterable: lang.Iterable[Map.Entry[String, LiucfSensorReding]] = mapState.entries() // Read all entries of the map state
    val reding: LiucfSensorReding = mapState.get(value.id) // Read the value stored under a key
    mapState.put(value.id,value) // Insert or overwrite one key-value pair in the map state

    val liucfSensorReding = reduceState.get() // Read the reducing state
    reduceState.add(value) // Merge a value into the reducing state
    (value.id,value.timestamp)
  }

}

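3.3  Example

Requirement:

     For each sensor, compute the average of the first 5 temperature readings. From the sixth reading on, if a reading differs from the average of the previous readings by less than 10 degrees, include it when recomputing the average; if it differs by 10 degrees or more, raise an excessive-jitter alert and exclude the reading from the average.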
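Before looking at the Flink implementations, the rule stated in the requirement can be written as a pure function from (previous state, new reading) to (new state, optional alert). The sketch below is plain Scala without Flink; `JitterAlertSketch`, `Stats`, `step`, and `run` are names invented here, and the state layout (count plus temperature sum) is one possible choice rather than the only one.

```scala
object JitterAlertSketch {
  // Count and sum of the readings that contribute to the average.
  final case class Stats(count: Long, sum: Double) {
    def avg: Double = sum / count
  }

  val SampleSize = 5L   // readings that are only sampled, never alerted on
  val Threshold = 10.0  // deviation in degrees that triggers an alert

  // Returns the new state and, if the reading triggers an alert,
  // the average it was compared against.
  def step(state: Option[Stats], temperature: Double): (Option[Stats], Option[Double]) =
    state match {
      case None =>
        (Some(Stats(1L, temperature)), None)
      case Some(s) if s.count < SampleSize =>
        (Some(Stats(s.count + 1, s.sum + temperature)), None)
      case Some(s) if math.abs(temperature - s.avg) >= Threshold =>
        (state, Some(s.avg)) // alert: the reading is excluded from the average
      case Some(s) =>
        (Some(Stats(s.count + 1, s.sum + temperature)), None)
    }

  // Feeds the readings of one sensor through step and collects
  // (reading, average) for every alert.
  def run(temperatures: Seq[Double]): Seq[(Double, Double)] =
    temperatures.foldLeft((Option.empty[Stats], Vector.empty[(Double, Double)])) {
      case ((state, alerts), t) =>
        val (next, alert) = step(state, t)
        (next, alerts ++ alert.map(avg => (t, avg)).toList)
    }._2
}
```

In a Flink job the `Option[Stats]` argument corresponds to the keyed state of the current sensor, so the same function is applied independently per key.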

3.3.1 First implementation

package com.study.liucf.unbounded.state

import java.util.Map
import java.{lang, util}

import com.study.liucf.bean.{LiucfSensorReding, LiucfSensorRedingAgg}
import com.study.liucf.unbounded.transform.LiucfReduce
import org.apache.flink.api.common.functions.{ReduceFunction, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector


object LiucfStateDemo1 {
  val EXAMPLE_SIZE=5L
  def main(args: Array[String]): Unit = {

    
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the time characteristic for all streams created from this environment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    
    val paramTool = ParameterTool.fromArgs(args)
    val ip = paramTool.get("ip")
    val port = paramTool.getInt("port")
    
    //    val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
    val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
    
    val transRes = sockerInput
      .map(d=>{
        val arr = d.split(",")
        LiucfSensorRedingAgg(arr(0),arr(1).toLong,arr(2).toDouble,0.0)
      })
      .keyBy(_.id)
        .flatMap(new LiucfFlatMapRichFunction(EXAMPLE_SIZE))

    
    transRes.print("result")

    
    env.execute(" liucf watermark test")
  }
}

class LiucfFlatMapRichFunction(exampleSize:Long) extends RichFlatMapFunction[LiucfSensorRedingAgg,LiucfSensorRedingAgg]{

  // Number of readings of the current sensor that have contributed to the average
  lazy val sensorCounterValueState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("sensorCounterValueState",classOf[Long]))

  // Running aggregate of the current sensor; its target field holds the temperature sum
  lazy val sensorReduceState:ReducingState[LiucfSensorRedingAgg] = getRuntimeContext.getReducingState(new ReducingStateDescriptor[LiucfSensorRedingAgg]("sensorReduceState",new LiucfReduceFunction(),classOf[LiucfSensorRedingAgg]))

  override def flatMap(value: LiucfSensorRedingAgg, out: Collector[LiucfSensorRedingAgg]): Unit = {

    // value() returns null for a key that has no state yet; Scala unboxes that to 0
    var sensorCounter = sensorCounterValueState.value()

    if(sensorCounter<exampleSize){ // The first exampleSize readings are samples only: count and sum them, never alert
      sensorCounter += 1 // A new reading has arrived: increment the count
      sensorCounterValueState.update(sensorCounter) // Store the incremented count

      if(null == sensorReduceState.get()){ // First reading of this sensor: the reducing state is still null and needs special handling
        // Seed the sum (target) with the first temperature
        val sensor = LiucfSensorRedingAgg(value.id,value.timestamp,value.temperature,value.temperature)
        sensorReduceState.add(sensor)
      }else{ // Not the first reading: add() merges it into the stored aggregate through the reduce function
        sensorReduceState.add(value)
      }
    }else{ // Sampling has finished: apply the alert rule
      // Sum of all temperatures included so far
      val aggTemperature = sensorReduceState.get().target
      // Average temperature
      val avgTemperature = aggTemperature / sensorCounter
      if((avgTemperature-value.temperature).abs>=10){
        // Deviation of 10 degrees or more: emit an alert and exclude this reading from the average
        out.collect(LiucfSensorRedingAgg(value.id,value.timestamp,value.temperature,avgTemperature))
      }else{
        // Within tolerance: include this reading in the sum and the count
        sensorReduceState.add(value)
        sensorCounter += 1
        sensorCounterValueState.update(sensorCounter)
      }
    }

  }
}


class LiucfReduceFunction() extends ReduceFunction[LiucfSensorRedingAgg]{

  // value1 is the stored aggregate and value2 the newly added reading:
  // keep the fields of the latest reading and add its temperature to the running sum in target
  override def reduce(value1: LiucfSensorRedingAgg, value2: LiucfSensorRedingAgg): LiucfSensorRedingAgg = {
    val newReduceStatevalue = LiucfSensorRedingAgg(value1.id,value2.timestamp,value2.temperature,value1.target+value2.temperature)
    printf("Former state: %s,new date: %s,new state:%s \n",value1,value2,newReduceStatevalue)
    newReduceStatevalue
  }
}

Test input:

➜ nc -l 9999
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,10
sensor_4,1630851514,45

Test output:

Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,40.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,80.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,80.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,120.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,120.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,160.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,160.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,200.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,200.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,240.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,240.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,280.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,280.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,320.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,320.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,360.0) 
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,360.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,400.0) 
result> LiucfSensorRedingAgg(sensor_4,1630851514,10.0,40.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,400.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,45.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,45.0,445.0) 

3.3.2 Second implementation

package com.study.liucf.unbounded.state

import java.util.Map
import java.{lang, util}
import com.study.liucf.bean.{LiucfSensorReding, LiucfSensorRedingAgg}
import com.study.liucf.unbounded.transform.LiucfReduce
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


object LiucfStateDemo2 {
  val EXAMPLE_SIZE=5L
  def main(args: Array[String]): Unit = {
    
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the time characteristic for all streams created from this environment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    
    val paramTool = ParameterTool.fromArgs(args)
    val ip = paramTool.get("ip")
    val port = paramTool.getInt("port")
    
    //    val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
    val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
    
    val transRes = sockerInput
      .map(d=>{
        val arr = d.split(",")
        LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
      })
      .keyBy(_.id)
      .flatMapWithState[LiucfSensorRedingAgg,(Long,Double)]({ // Type parameters: [output type, state type]; the state is (count, temperature sum)
        // No state yet (first reading of this key): emit nothing and initialize the state
        case (data:LiucfSensorReding,None)=>(List.empty,Some((1L,data.temperature)))
        // State exists: return (records to emit, new state)
        case (data:LiucfSensorReding,lastState:Some[(Long,Double)])=>{
          val lastStatValue = lastState.get
          if(lastStatValue._1<EXAMPLE_SIZE){ // Sampling phase (first EXAMPLE_SIZE readings): no alert, just accumulate count and sum
            (List.empty,Some((lastStatValue._1+1,lastStatValue._2+data.temperature)))
          }else{ // Alert phase
            val temperatureAvg = lastStatValue._2/lastStatValue._1 // Average temperature
            if((data.temperature-temperatureAvg).abs>=10){
              // Alert condition met: emit an alert and leave the state unchanged
              (List(LiucfSensorRedingAgg(data.id,data.timestamp,data.temperature,temperatureAvg)),lastState)
            }else{
              // No alert: keep accumulating the temperature sum and the count
              (List.empty,Some((lastStatValue._1+1,lastStatValue._2+data.temperature)))
            }
          }
        }
      })
    
    transRes.print("result")

    
    env.execute(" liucf watermark test")
  }
}

Test input:

➜  nc -l 9999
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,10
sensor_4,1630851514,45

Test output:

result> LiucfSensorRedingAgg(sensor_4,1630851514,10.0,40.0)

As shown, both implementations produce the same alert.
