状态:将计算结果保存起来,任务下次启动时还会保留上次的历史记录
(1)单值状态(valueState)
(2)集合状态(listState)
(3)map状态(mapState)
(4)reducing状态(reducingState)
不使用状态时,也可以使用普通的集合保存结果,但是集合(map)是放在TaskManager的内存中的(数据在TaskManager中执行),数据量大时可能会内存溢出
—每一条数据处理一次,每个key是独立的
/**
 * Streaming word count that keeps its running totals in an ordinary
 * `mutable.HashMap` field instead of Flink managed state.
 *
 * Lines are read from the socket `master:8888`, split on commas, and every
 * word is emitted together with its updated count each time it is seen.
 * The counts live only in the memory of the process function instance.
 */
object Demo01NoState {

  def main(args: Array[String]): Unit = {
    // Build the Flink streaming environment and run everything in one task.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // socket lines -> words -> (word, 1) pairs -> grouped by word
    val keyedWords: KeyedStream[(String, Int), String] =
      env
        .socketTextStream("master", 8888)
        .flatMap(_.split(","))
        .map((_, 1))
        .keyBy(_._1)

    val runningCounts = keyedWords.process(
      new KeyedProcessFunction[String, (String, Int), (String, Int)] {
        // Plain collection held by this function instance; a single map stores
        // the totals of every word this instance processes.
        val counts = new mutable.HashMap[String, Int]()

        // Invoked once per record: add the record's count to the word's total
        // and forward the new total downstream.
        override def processElement(value: (String, Int), ctx: KeyedProcessFunction[String, (String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
          val (word, increment) = value
          val updated = counts.getOrElse(word, 0) + increment
          counts.update(word, updated)
          out.collect((word, updated))
        }
      })

    runningCounts.print()

    env.execute()
  }
}
4、valueState状态
–状态本质就是变量(value、list、map)。和普通变量的区别是:状态会被checkpoint持久化到hdfs中,如果任务中途失败,重新启动后可以接着计算
—定义状态,由于做了keyby,每个key都会创建一个状态,独立的
—获取flink的环境,父类的方法 val context: RuntimeContext = getRuntimeContext
/**
 * Streaming word count whose running totals are kept in Flink `ValueState`.
 *
 * Lines are read from the socket `master:8888`, split on commas, grouped by
 * word, and counted by [[MymapFunction]], which stores the count in state.
 */
object Demo02ValueState {

  def main(args: Array[String]): Unit = {
    // Build the Flink streaming environment and run everything in one task.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // socket lines -> words -> (word, 1) pairs -> grouped by word
    val keyedWords: KeyedStream[(String, Int), String] =
      env
        .socketTextStream("master", 8888)
        .flatMap(_.split(","))
        .map((_, 1))
        .keyBy(_._1)

    // Count the words with a stateful map operator and print each update.
    keyedWords
      .map(new MymapFunction)
      .print()

    env.execute()
  }
}
/**
 * Rich map function that turns `(word, n)` records into `(word, runningTotal)`
 * by accumulating `n` into a `ValueState[Int]` registered under the name "count".
 */
class MymapFunction extends RichMapFunction[(String, Int), (String, Int)] {

  // State handle; assigned in open() before any record is mapped.
  var valuestate: ValueState[Int] = _

  /** Registers the "count" value state through the runtime context of the parent class. */
  override def open(parameters: Configuration): Unit = {
    valuestate = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("count", classOf[Int])
    )
  }

  /**
   * Adds the record's count to the stored total, saves the new total back
   * into the state and emits it together with the word.
   */
  override def map(value: (String, Int)): (String, Int) = {
    val (word, increment) = value
    // NOTE(review): for a word with nothing stored yet, value() is presumably
    // read as 0 here -- confirm against the ValueState contract.
    val total = increment + valuestate.value()
    valuestate.update(total)
    // Forward the updated total downstream.
    (word, total)
  }
}
5、ListState状态
自定义类实现学生平均年龄
/**
 * Streaming job that prints the running average student age per class.
 *
 * Student records are read as comma-separated lines from the socket
 * `master:8888`; field 2 is used as the age and field 4 as the class.
 * The averaging itself is done by [[MyRichFuntions]].
 */
object Demo03ListState {

  def main(args: Array[String]): Unit = {
    // Build the Flink streaming environment and run everything in one task.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Turn each student line into a (class, age) pair.
    // Assumes every line has at least five comma-separated fields and a
    // numeric age; a line that does not will throw inside this operator.
    val classAges: DataStream[(String, Double)] =
      env.socketTextStream("master", 8888).map { line =>
        val fields: Array[String] = line.split(",")
        val rawAge = fields(2)
        val className = fields(4)
        className -> rawAge.toDouble
      }

    // Group by class, compute the running average and print every update.
    classAges
      .keyBy(_._1)
      .map(new MyRichFuntions)
      .print()

    env.execute()
  }
}
/**
 * Rich map function that turns `(class, age)` records into
 * `(class, averageAge)`. Every age seen is appended to a `ListState[Double]`
 * registered under the name "ages", and the average is recomputed from the
 * whole list on each record.
 */
class MyRichFuntions extends RichMapFunction[(String, Double), (String, Double)] {

  // List state holding all ages recorded so far; assigned in open().
  var liststate: ListState[Double] = _

  /** Registers the "ages" list state through the runtime context. */
  override def open(parameters: Configuration): Unit = {
    liststate = getRuntimeContext.getListState(
      new ListStateDescriptor[Double]("ages", classOf[Double])
    )
  }

  /**
   * Stores the incoming age, then walks every stored age to compute the
   * current average, which is emitted together with the class name.
   */
  override def map(value: (String, Double)): (String, Double) = {
    val (clazz, age) = value

    // Record the new age first so that it is part of the average below.
    liststate.add(age)

    val stored = liststate.get().iterator()

    // Sum and count the stored ages in iteration order.
    @scala.annotation.tailrec
    def accumulate(sum: Double, count: Int): (Double, Int) =
      if (stored.hasNext) accumulate(sum + stored.next(), count + 1)
      else (sum, count)

    val (ageSum, ageCount) = accumulate(0.0, 0)
    (clazz, ageSum / ageCount)
  }
}



