一、State的存活时间
任何的 keyed state 都可以设置存活时间(TTL):如果配置了 TTL,且状态值已过期,Flink 会尽最大努力清除对应的值。
设置ValueState存活时间
package cn._51doit.flink.day08;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
//设置ValueState的存活时间
public class KeyedStateTTLDemo {
public static void main(String[] args) throws Exception{
//创建Flink流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
//设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
//创建DataStream
//Source
DataStreamSource lines = env.socketTextStream("localhost", 8888);
//调用Transformation开始
//调用Transformation
SingleOutputStreamOperator> wordAndOne = lines.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String line, Collector> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
if("error".equals(word)) {
throw new RuntimeException("出现异常了!!!!!");
}
//new Tuple2(word, 1)
collector.collect(Tuple2.of(word, 1));
}
}
});
//分组
KeyedStream, String> keyed = wordAndOne.keyBy(t -> t.f0);
keyed.map(new RichMapFunction, Tuple2>() {
private transient ValueState counter;
@Override
public void open(Configuration parameters) throws Exception {
//定义一个状态TTLCOnfig
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//想使用状态,先定义一个状态描述器(State的类型,名称)
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("wc-desc", Integer.class);
//关联状态描述器
stateDescriptor.enableTimeToLive(ttlConfig);
//初始化或恢复历史状态
counter = getRuntimeContext().getState(stateDescriptor);
}
@Override
public Tuple2 map(Tuple2 input) throws Exception {
//String word = input.f0;
Integer currentCount = input.f1;
//从ValueState中取出历史次数
Integer historyCount = counter.value(); //获取当前key对应的value
if(historyCount == null) {
historyCount = 0;
}
Integer total = historyCount + currentCount; //累加
//跟新状态(内存中)
counter.update(total);
input.f1 = total; //累加后的次数
return input;
}
}).print();
//启动执行
env.execute("StreamingWordCount");
}
}
二、案例统计去重人数
输入字段:用户ID,活动ID,事件类型(1:浏览,2:参与)。示例数据:
user1,A,1
user1,A,1
user1,A,2
user2,A,1
user2,B,1
统计需求:各个活动、各个事件类型的人数(去重)和次数。
package cn._51doit.flink.day08;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class ActivityCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource lines = env.socketTextStream("localhost", 8888);
//对数据进行整理
SingleOutputStreamOperator> tpDataStream = lines.map(new MapFunction>() {
@Override
public Tuple3 map(String line) throws Exception {
String[] fields = line.split(",");
return Tuple3.of(fields[0], fields[1], fields[2]);
}
});
KeyedStream, Tuple2> keyed = tpDataStream.keyBy(new KeySelector, Tuple2>() {
@Override
public Tuple2 getKey(Tuple3 value) throws Exception {
return Tuple2.of(value.f1, value.f2);
}
});
//KeyedStream, Tuple2> keyed = tpDataStream.keyBy(t -> Tuple2.of(t.f1, t.f2), TypeInformation.of(new TypeHint>() {}));
SingleOutputStreamOperator> result = keyed.process(new ActivityCountFunction());
result.print();
env.execute();
}
}
定义ActivityCountFunction:
package cn._51doit.flink.day08;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;

/**
 * Keyed by (activityId, eventType); for each input (userId, activityId, eventType)
 * emits (activityId, eventType, totalEventCount, distinctUserCount).
 */
public class ActivityCountFunction extends KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String, String, Long, Long>> {

    //total number of events seen for the current key
    private transient ValueState<Long> actCountState;
    //set of distinct user ids for the current key
    //NOTE(review): a whole HashSet inside one ValueState is (de)serialized on every access
    //and grows unbounded with distinct users — fine for a demo, risky in production
    private transient ValueState<HashSet<String>> userDisState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> stateDescriptor1 = new ValueStateDescriptor<>("ac-count", Long.class);
        actCountState = getRuntimeContext().getState(stateDescriptor1);
        //HashSet<String> is a generic type, so TypeHint is needed to capture the type info
        ValueStateDescriptor<HashSet<String>> stateDescriptor2 = new ValueStateDescriptor<>("dis-ac-count", TypeInformation.of(new TypeHint<HashSet<String>>() {}));
        userDisState = getRuntimeContext().getState(stateDescriptor2);
    }

    @Override
    public void processElement(Tuple3<String, String, String> value, Context ctx, Collector<Tuple4<String, String, Long, Long>> out) throws Exception {
        //count of events (null on first element for this key)
        Long historyCount = actCountState.value();
        if (historyCount == null) {
            historyCount = 0L;
        }
        Long totalCount = historyCount + 1;
        actCountState.update(totalCount);
        //distinct users: add this event's userId to the per-key set
        HashSet<String> disUserSet = userDisState.value();
        if (disUserSet == null) {
            disUserSet = new HashSet<>();
        }
        disUserSet.add(value.f0);
        userDisState.update(disUserSet);
        //emit (activityId, eventType, totalCount, distinctUserCount)
        out.collect(Tuple4.of(value.f1, value.f2, totalCount, (long) disUserSet.size()));
    }
}
三、Broadcast State的使用
实时关联数据库中的维度数据。



