栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

大数据之flink状态

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

大数据之flink状态

一、State的存活时间

任何 keyed state 都可以设置存活时间(TTL)。如果配置了 TTL,且状态值已过期,Flink 会尽最大可能清除对应的值。

设置ValueState存活时间

package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//设置ValueState的存活时间
public class KeyedStateTTLDemo {

    public static void main(String[] args) throws Exception{

        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(10000);
        //设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        //创建DataStream
        //Source
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        //调用Transformation开始
        //调用Transformation
        SingleOutputStreamOperator> wordAndOne = lines.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String line, Collector> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if("error".equals(word)) {
                        throw new RuntimeException("出现异常了!!!!!");
                    }
                    //new Tuple2(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分组
        KeyedStream, String> keyed = wordAndOne.keyBy(t -> t.f0);

        keyed.map(new RichMapFunction, Tuple2>() {

            private transient ValueState counter;

            @Override
            public void open(Configuration parameters) throws Exception {
                //定义一个状态TTLCOnfig
                StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();

                //想使用状态,先定义一个状态描述器(State的类型,名称)
                ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("wc-desc", Integer.class);
                //关联状态描述器
                stateDescriptor.enableTimeToLive(ttlConfig);
                //初始化或恢复历史状态
                counter = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2 map(Tuple2 input) throws Exception {
                //String word = input.f0;
                Integer currentCount = input.f1;
                //从ValueState中取出历史次数
                Integer historyCount = counter.value(); //获取当前key对应的value
                if(historyCount == null) {
                    historyCount = 0;
                }
                Integer total = historyCount + currentCount; //累加
                //跟新状态(内存中)
                counter.update(total);
                input.f1 = total; //累加后的次数
                return input;
            }
        }).print();

        //启动执行
        env.execute("StreamingWordCount");

    }

}

二、案例:统计去重人数

输入数据格式:用户ID,活动ID,事件类型(1:浏览,2:参与)
user1, A, 1
user1, A, 1
user1, A, 2
user2, A, 1
user2, B, 1

统计:各个活动、各个事件类型的人数(按用户去重)和次数
package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ActivityCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource lines = env.socketTextStream("localhost", 8888);
        
        //对数据进行整理
        SingleOutputStreamOperator> tpDataStream = lines.map(new MapFunction>() {
            @Override
            public Tuple3 map(String line) throws Exception {
                String[] fields = line.split(",");
                return Tuple3.of(fields[0], fields[1], fields[2]);
            }
        });

        KeyedStream, Tuple2> keyed = tpDataStream.keyBy(new KeySelector, Tuple2>() {
            @Override
            public Tuple2 getKey(Tuple3 value) throws Exception {
                return Tuple2.of(value.f1, value.f2);
            }
        });

        //KeyedStream, Tuple2> keyed = tpDataStream.keyBy(t -> Tuple2.of(t.f1, t.f2), TypeInformation.of(new TypeHint>() {}));

        SingleOutputStreamOperator> result = keyed.process(new ActivityCountFunction());

        result.print();

        env.execute();


    }
}

定义ActivityCountFunction:

package cn._51doit.flink.day08;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;


public class ActivityCountFunction extends KeyedProcessFunction, Tuple3, Tuple4> {

    private transient ValueState actCountState;
    private transient ValueState> userDisState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor stateDescriptor1 = new ValueStateDescriptor<>("ac-count", Long.class);
        actCountState = getRuntimeContext().getState(stateDescriptor1);
        ValueStateDescriptor> stateDescriptor2 = new ValueStateDescriptor>("dis-ac-count", TypeInformation.of(new TypeHint>(){}));
        userDisState = getRuntimeContext().getState(stateDescriptor2);
    }
    @Override
    public void processElement(Tuple3 value, Context ctx, Collector> out) throws Exception {

        //计算次数
        Long historyCount = actCountState.value();
        if(historyCount == null) {
            historyCount = 0L;
        }
        Long totalCount = historyCount + 1;
        actCountState.update(totalCount);
        //计算人数
        HashSet disUserSet = userDisState.value();
        if (disUserSet == null) {
            disUserSet = new HashSet<>();
        }
        disUserSet.add(value.f0);
        userDisState.update(disUserSet);
        //输出结果
        out.collect(Tuple4.of(value.f1, value.f2, totalCount, (long) disUserSet.size()));
    }
}

三、Broadcast State的使用

实时关联数据库中的维度数据

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762664.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号