栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink之State状态编程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink之State状态编程

文章目录

1.State分类2.算子状态(Operator State)

2.1 算子状态的数据结构(non-keyed state) 3.键控状态(keyed State)

3.1 键控状态数据结构 4.状态后端(state backends)5.状态编程

1.State分类


State[ValueState、ReadOnlyBroadcastState、MapState、AppendingState]

AppendingState[FoldingState、MergingState]

MergingState[ListState、AggregatingState、ReducingState]

在flink中,状态始终与特定算子相关联,像reduce、sum等算子都是默认带状态的,而map、flatmap本身时不带状态的,如果需要用到状态,可以自定义

为了使运行的flink了解算子的状态,算子需要预先注册其状态

算子状态(Operator State): 算子状态的作用范围限定为算子任务键控状态(keyed State):生产中应用案例较多, 根据输入数据流中定义的key来维护和访问

不管是哪种类型的State,都有2种不同的状态,raw(原生状态)、managed(Flink设定好的状态)

managed状态由Flink-runtime控制,类似于RocksDB、HashTable、Fs;类似于ValueState、ListState,Flink-runtime能将状态进行特定的编码,然后写入到检查点,所有的算子都能使用managed-stateraw状态而是将state维护在自己的数据结构,当checkpoint的时候,只会将state以序列化的形式写进checkpoint,flink只能看到原生的字节,而对state的数据结构一无所知 2.算子状态(Operator State)

2.1 算子状态的数据结构(non-keyed state)

列表状态(list state)
联合列表状态(union list state)
广播状态(broadast)

3.键控状态(keyed State)

3.1 键控状态数据结构

值状态(value state)
列表状态 list state
映射状态 map state
聚合状态(reducing state & aggregating state)

4.状态后端(state backends)

MemoryStateBackend(一般用于测试环境)

FsStateBackend(将checkpoint存在文件系统中,本地状态还是存在taskmanager本地内存中,不适合超大状态的存储)

RocksDBStateBackend(将所有状态序列化后,存入本地RocksDB(kv存储介质)中存储)

5.状态编程
package com.shufang.flink.state

import com.shufang.flink.bean.SensorReading
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object StateDemo {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(300)
    //    env.setStateBackend(new MemoryStateBackend())

    val sensorStream: DataStream[SensorReading] = env.socketTextStream("localhost", 9999)
      .map(a => {
        val strings: Array[String] = a.split(",")
        SensorReading(strings(0), strings(1).trim.toLong, strings(2).trim.toDouble)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
      override def extractTimestamp(element: SensorReading): Long = {
        element.timeStamp
      }
    })

    val processedStream: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id)
      .process(new MyProcessFunction01)

    val processedStream02: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id)
      .flatMap(new MyFlatMapFunction)

    
    val processStream03: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id)
      .flatMapWithState[(String, Double, Double), Double] {
        //如果状态为空,那么只更新state为当前temp
        case (sensor: SensorReading, None) => (List.empty, Some(sensor.temperture))
        //实际上,这里使用Option来维持状态的,没有状态保存而获取的话,就相当于getorelse(0)

        case (sensor: SensorReading, pretemp: Some[Double]) =>
          val lastTemp: Double = pretemp.get
          val diff: Double = (sensor.temperture - lastTemp).abs
          if (diff > 10) {
            (Seq((sensor.id, lastTemp, sensor.temperture)), Some(sensor.temperture))
          } else {
            (List(), Some(sensor.temperture))
          }
      }
    processStream03.print("flatMapWith-result")
//    processedStream02.print("报警信息-温度波动过大")
    sensorStream.print("输入数据")

    env.execute("state")
  }
}



class MyFlatMapFunction extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  private var preTemp: ValueState[Double] = _
  //利用open函数的特性,在初始化的时候就执行
  override def open(parameters: Configuration): Unit = {
    preTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  }

  override def flatMap(value: SensorReading,
                       out: Collector[(String, Double, Double)]): Unit = {
    val lastTemp: Double = preTemp.value()

    if ((value.temperture - lastTemp).abs > 10) {
      out.collect((value.id, lastTemp, value.temperture))
    }
    preTemp.update(value.temperture)
  }
}



class MyProcessFunction01 extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] {
  //声明State
  lazy val pretemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("pretemp", classOf[Double]))

  override def processElement(
                               value: SensorReading,
                               ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context,
                               out: Collector[(String, Double, Double)]): Unit = {
    //调用state
    val lastTemp: Double = pretemp.value()
    val currentTemp: Double = value.temperture

    if ((currentTemp - lastTemp).abs > 10) {
      out.collect((value.id, lastTemp, currentTemp))
    }

    //更新state
    pretemp.update(currentTemp)

  }
}

state设置ttl

// 设置ttl的配置
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
 //声明状态描述器   
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
//state.enableTimeToLive(ttl配置)
  stateDescriptor.enableTimeToLive(ttlConfig)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735346.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号