栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink-java案例:在一小时内每隔5分钟统计一次商品pv的TopN

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink-java案例:在一小时内每隔5分钟统计一次商品pv的TopN

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;


public class TopNTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .readTextFile("F:\idea_project\flink\src\main\resources\UserBehavior.csv")
                .map(new MapFunction() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] arr = value.split(",", -1); // 根据 ',' 进行切分,-1 参数表示没值也保留为空
                        return new UserBehavior(
                                arr[0], arr[1], arr[2], arr[3],
                                Long.parseLong(arr[4]) * 1000L // 将事件毫秒时间转成秒值
                        );
                    }
                })
                .filter(r -> r.behavior.equals("pv"))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)) // 水位线设置为0,因为数据已经排好序了
                                .withTimestampAssigner(new SerializableTimestampAssigner() {
                                    @Override
                                    public long extractTimestamp(UserBehavior value, long l) {
                                        return value.timestamp; // 指定事件时间的字段
                                    }
                                })
                )
                .keyBy(r -> r.itemId) // 根据key进行分流
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5))) //滑动窗口: 窗口总大小为1小时,滑动窗口大小为5分钟
                .aggregate(new CountAgg(), new WindowResult()) // 对分流进行聚合,并使用全窗口函数进行统计
                .keyBy(r -> r.windowEnd) // 根据key再次分流
                .process(new TopN(3)) // 使用分区窗口函数,统计top3
                .print(); // 在控制台打印

        env.execute();
    }

    public static class TopN extends KeyedProcessFunction{
        private ListState listState;
        private Integer n;

        public TopN(Integer n) {
            this.n = n;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            listState = getRuntimeContext().getListState(
                    new ListStateDescriptor("list-state", Types.POJO(ItemViewCount.class))
            );
        }

        @Override
        public void processElement(ItemViewCount value, Context ctx, Collector out) throws Exception {
            listState.add(value);
            ctx.timerService().registerEventTimeTimer(value.windowEnd + 1L);
        }

        @Override
        public void onTimer(long timestamp, onTimerContext ctx, Collector out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            ArrayList itemViewCountArrayList = new ArrayList<>();
            for (ItemViewCount ivc : listState.get()) itemViewCountArrayList.add(ivc);
            listState.clear();

            itemViewCountArrayList.sort(new Comparator() {
                @Override
                public int compare(ItemViewCount t2, ItemViewCount t1) {
                    return t1.count.intValue() - t2.count.intValue();
                }
            });

            StringBuilder result = new StringBuilder();
            result
                    .append("===========================================n")
                    .append("窗口结束时间:" + new Timestamp(timestamp - 1L))
                    .append("n");
            for (int i = 0; i < n; i++) {
                ItemViewCount curr = itemViewCountArrayList.get(i);
                result
                        .append("第" + (i + 1) + "名的商品id是:" + curr.itemId)
                        .append(",浏览次数是:" + curr.count)
                        .append("n");
            }
            result
                    .append("===========================================nn");
            out.collect(result.toString());
        }
    }

    public static class WindowResult extends ProcessWindowFunction {
        @Override
        public void process(String s, Context context, Iterable elements, Collector out) throws Exception {
            out.collect(new ItemViewCount(s, elements.iterator().next(), context.window().getStart(), context.window().getEnd()));
        }
    }

    public static class CountAgg implements AggregateFunction {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    // 每个商品在每个窗口中的浏览次数
    public static class ItemViewCount {
        public String itemId;
        public Long count;
        public Long windowStart;
        public Long windowEnd;

        public ItemViewCount() {
        }

        public ItemViewCount(String itemId, Long count, Long windowStart, Long windowEnd) {
            this.itemId = itemId;
            this.count = count;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }

        @Override
        public String toString() {
            return "ItemViewCount{" +
                    "itemId='" + itemId + ''' +
                    ", count=" + count +
                    ", windowStart=" + new Timestamp(windowStart) +
                    ", windowEnd=" + new Timestamp(windowEnd) +
                    '}';
        }
    }

    public static class UserBehavior {
        public String userId;
        public String itemId;
        public String categoryId;
        public String behavior;
        public Long timestamp;

        public UserBehavior() {
        }

        public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.categoryId = categoryId;
            this.behavior = behavior;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "UserBehavior{" +
                    "userId='" + userId + ''' +
                    ", itemId='" + itemId + ''' +
                    ", categoryId='" + categoryId + ''' +
                    ", behavior='" + behavior + ''' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/684704.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号