import com.alibaba.fastjson.JSONObject;
import com.poizon.bigdata.flink.job.baseFlinkJob;
import com.poizon.bigdata.flink.utils.TimeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * Flink job that reads JSON events from Kafka and prints a running per-day unique-visitor (UV)
 * count.
 *
 * <p>Pipeline: Kafka source, then parse each line into {@link EventBean} (records that cannot be
 * parsed, or that lack a uid or time, go to the "Dirty" side output), then key by day string,
 * then deduplicate uids per day in keyed MapState while keeping the running distinct count in
 * keyed ValueState, then print.
 *
 * <p>Optional program arguments (read via {@link ParameterTool}):
 * <ul>
 *   <li>{@code --checkpoint.path}: checkpoint directory URI (defaults to the original local path)</li>
 *   <li>{@code --job.name}: job name (defaults to "ddd")</li>
 * </ul>
 *
 * <p>NOTE(review): the job is now keyed by day instead of by uid. State from savepoints taken by
 * the previous version is therefore not meaningful for this version.
 */
@Slf4j
public class UnifyStateJobUV4 extends baseFlinkJob {

    private static final Logger logger = LoggerFactory.getLogger(UnifyStateJobUV4.class);

    /** Checkpoint directory used when {@code --checkpoint.path} is not supplied. */
    private static final String DEFAULT_CHECKPOINT_PATH = "file:///Users/wangpengpeng/dewu/checkpoint";

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(30000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // Flink's checkpointing docs state that concurrent checkpoints cannot be combined with a
        // minimum pause between checkpoints, so with the 3 s pause below one-at-a-time is what applies.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        env.setStateBackend(new FsStateBackend(parameters.get("checkpoint.path", DEFAULT_CHECKPOINT_PATH)));

        // NOTE(review): the meaning of these two arguments is defined by MyKafkaUtil, which is not visible here.
        DataStreamSource<String> source = env.addSource(MyKafkaUtil.getKafkaConsumer("test", "wppp"));

        // Raw lines that cannot become a usable EventBean are emitted here instead of failing the job.
        OutputTag<String> dirtyTag = new OutputTag<String>("Dirty") {};

        // Convert each line into an EventBean; route malformed/incomplete records to the side output.
        SingleOutputStreamOperator<EventBean> events = source
                .process(new ProcessFunction<String, EventBean>() {
                    @Override
                    public void processElement(String line, Context ctx, Collector<EventBean> out) {
                        EventBean event;
                        try {
                            event = JSONObject.parseObject(line, EventBean.class);
                        } catch (RuntimeException e) {
                            logger.warn("Unparseable record routed to side output: {}", line, e);
                            ctx.output(dirtyTag, line);
                            return;
                        }
                        // Both fields are required downstream: time derives the key, uid is the visitor identity.
                        if (event == null || event.getUid() == null || event.getTime() == null) {
                            ctx.output(dirtyTag, line);
                            return;
                        }
                        out.collect(event);
                    }
                })
                .setParallelism(1);

        events.getSideOutput(dirtyTag).print("脏数据");

        SingleOutputStreamOperator<String> dailyUv = events
                // Key by day so each key's state covers exactly one day's visitors.
                .keyBy(new KeySelector<EventBean, String>() {
                    @Override
                    public String getKey(EventBean event) {
                        return TimeUtil.getShortDateFormat(event.getTime());
                    }
                })
                .process(new KeyedProcessFunction<String, EventBean, String>() {
                    // uid -> 1 for every visitor already counted under the current day key.
                    // NOTE(review): never cleared, so state grows with every past day; consider state TTL.
                    private transient MapState<String, Integer> seenUids;
                    // Number of distinct uids counted so far under the current day key.
                    private transient ValueState<Long> uvCount;

                    @Override
                    public void open(Configuration configuration) {
                        seenUids = getRuntimeContext().getMapState(
                                new MapStateDescriptor<>("uv", String.class, Integer.class));
                        uvCount = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("uv_cnt", Long.class));
                    }

                    /**
                     * Counts the event's uid once per day and emits the day's running UV.
                     * State-access failures propagate so Flink can fail over and restore from a checkpoint.
                     */
                    @Override
                    public void processElement(EventBean event, Context context, Collector<String> collector)
                            throws Exception {
                        String dateTime = TimeUtil.getShortDateFormat(event.getTime());
                        String uid = event.getUid();
                        Long stored = uvCount.value();
                        long uv = stored == null ? 0L : stored;
                        if (!seenUids.contains(uid)) {
                            seenUids.put(uid, 1);
                            uv++;
                            uvCount.update(uv);
                        }
                        logger.debug("date={}, uid={}, uv={}", dateTime, uid, uv);
                        collector.collect(dateTime + "的uv:" + uv);
                    }
                })
                .setParallelism(1);

        dailyUv.print("正常数据");
        env.execute(parameters.get("job.name", "ddd"));
    }
}
import lombok.Data;
import lombok.Getter;
/**
 * Event payload parsed from each Kafka record (UnifyStateJobUV4 builds it with
 * {@code JSONObject.parseObject(s, EventBean.class)}).
 *
 * <p>Lombok {@code @Data} generates the getters/setters, {@code equals}/{@code hashCode} and
 * {@code toString} from the fields below. Field names and declaration order therefore determine
 * the generated methods. fastjson presumably maps JSON keys onto these same names; confirm
 * before renaming anything. The fields are also public, so callers may read them directly as
 * well as through the generated accessors.
 */
@Data
public class EventBean {
/** User identifier; UnifyStateJobUV4 reads it via getUid() as the visitor identity. */
public String uid;
/** Event time, passed to TimeUtil.getShortDateFormat to derive a day string. Presumably epoch millis; TODO confirm. */
public Long time;
}