Flink KeyedState: Using MapState with a Window

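Contents
  • (1) Steps for using MapState
  • (2) Verifying MapState
  • (3) Complete demo

(1) Steps for using MapState

Map state (MapState) represents state as a set of key-value pairs, with get() and put() methods similar to those of a HashMap: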

  • MapState.get(UK key)
  • MapState.put(UK key, UV value)
  • MapState.contains(UK key)
  • MapState.remove(UK key)
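
As a rough analogy only, the four operations above behave like the corresponding java.util.Map calls. The sketch below uses a plain HashMap as a stand-in; it is not Flink's MapState, and the class name MapStateAnalogy is made up for illustration. The one behavior it shares with the demo later in this article is that get() yields null for a key that has not been put.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the four MapState operations (illustrative only, not Flink code).
class MapStateAnalogy {
    // Runs get/put/contains/remove in order and returns what each step observed.
    static String walkThrough() {
        Map<String, Integer> state = new HashMap<>();
        StringBuilder log = new StringBuilder();
        log.append("get before put: ").append(state.get("12")).append('\n');
        state.put("12", 96);
        log.append("contains after put: ").append(state.containsKey("12")).append('\n');
        log.append("get after put: ").append(state.get("12")).append('\n');
        state.remove("12");
        log.append("contains after remove: ").append(state.containsKey("12"));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(walkThrough());
    }
}
```

The difference that matters in Flink is that a real MapState is managed by the runtime and scoped to the current stream key, which a local HashMap is not.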

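In real-world development, MapState is the state type used most often. Below we use MapState together with a window.

①: KeyedState must be used inside a RichFunction.

②: In the RichFunction's open() method, define the state descriptor (which specifies the state's name, storage types, and so on).

③: Declare the state variable.

④: Use the state.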

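(2) Verifying MapState

As covered earlier:

Keyed State can only be used on a KeyedStream (a data stream to which the keyBy() operator has been applied).

Keyed State is maintained and accessed per key defined in the data stream, and each key has its own state instance. Flink partitions records with the same key to the same operator task, and that task maintains and processes the state for that key. When a task processes a record, Flink automatically scopes state access to the key of the current record: a key can only access its own state, and different keys cannot access each other's state.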
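
The scoping rule can be pictured with a toy model in plain Java. This is a sketch under stated assumptions, not Flink's implementation: the class KeyScopedState and its setCurrentKey method are hypothetical names, and a nested HashMap stands in for the state backend. The idea it illustrates is that the runtime selects the current key before the user function runs, so reads and writes only ever touch that key's own map.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed-state scoping (illustrative only, not Flink's implementation).
class KeyScopedState {
    // One private map per stream key, created lazily on first write.
    private final Map<Integer, Map<String, String>> statePerKey = new HashMap<>();
    private Integer currentKey;

    // In this model, the runtime calls this before handing a record to user code.
    void setCurrentKey(Integer key) {
        currentKey = key;
    }

    void put(String userKey, String value) {
        statePerKey.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, value);
    }

    String get(String userKey) {
        Map<String, String> own = statePerKey.get(currentKey);
        return own == null ? null : own.get(userKey);
    }

    public static void main(String[] args) {
        KeyScopedState state = new KeyScopedState();
        state.setCurrentKey(1);
        state.put("last", "record-A");
        state.setCurrentKey(2);
        System.out.println(state.get("last")); // key 2 cannot see key 1's entry
        state.setCurrentKey(1);
        System.out.println(state.get("last")); // key 1 still sees its own entry
    }
}
```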

Keyed State data structure: because each key belongs to exactly one parallel instance of a keyed operator, the state can simply be viewed as <operator, key>.


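In this project the parallelism is set to 4, so the job opens four parallel SpeedAlarmWindow instances.

How MapState is used inside the window:

Here, window, apply, and sink are chained into one operator chain, and each parallel instance of that chain is executed by a single thread (for details, see: Flink Window Core Concepts: Keyed and Non-Keyed Windows).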

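To stress the point again: Flink partitions records with the same key to the same operator task.

So, as long as the program logic is correct:

If Key=1 is picked up by the thread of parallel subtask 1, every later record with Key=1 is routed to subtask 1's thread.

If Key=2 is also picked up by subtask 1's thread, every later record with Key=2 is routed to subtask 1's thread as well.

If Key=3 is picked up by the thread of parallel subtask 4, every later record with Key=3 is routed to subtask 4's thread.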
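
Why a key always lands on the same subtask can be sketched with a deterministic routing function. The formula below (hash modulo parallelism) is a deliberate simplification and an assumption made for illustration: Flink's real assignment goes through key groups and a different hash, so the actual subtask numbers will differ. What carries over is that the target subtask is a pure function of the key. The class and method names are hypothetical, and subtask indexes here are zero-based.

```java
// Simplified key-to-subtask routing (an assumption for illustration, not Flink's formula).
class KeyRouting {
    // Same key and same parallelism always give the same subtask index (0-based).
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Each key appears twice; both occurrences map to the same subtask.
        for (int key : new int[] {1, 2, 3, 1, 2, 3}) {
            System.out.println("key=" + key + " -> subtask " + subtaskFor(key, parallelism));
        }
    }
}
```

Several keys can share one subtask, as Key=1 and Key=2 do in the text, but one key is never split across subtasks.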

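Here we use print to stand in for Flink's sink operation.

Judging from the execution results…

Four windows were opened.

Thread 2 processed the records with Key=12 and stored the previous location, moving from "speeding started" to "speeding ongoing".

The detailed records for Key=12:

This is exactly what we expected! Such concise state operations free us from tedious state management so that we can focus on the business logic.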
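
The transitions described above (speeding started, ongoing, ended) can be traced without a cluster. The following is a minimal plain-Java sketch, assuming a HashMap in place of MapState and storing only the previous speed rather than a full Location; the class SpeedAlarmSketch and method onReading are hypothetical names, not part of the demo.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the speeding transitions (a HashMap stands in for MapState).
class SpeedAlarmSketch {
    // Last speeding reading per vehicle; absent means the vehicle is not speeding.
    private final Map<String, Integer> lastSpeeding = new HashMap<>();

    // Returns the alarm message for one reading, or null when nothing is emitted.
    String onReading(String vehicleId, int gpsSpeed, int limitSpeed) {
        Integer previous = lastSpeeding.get(vehicleId);
        boolean speeding = gpsSpeed > limitSpeed;
        if (previous == null) {
            if (!speeding) {
                return null; // not speeding before, not speeding now
            }
            lastSpeeding.put(vehicleId, gpsSpeed);
            return "speeding started";
        }
        if (speeding) {
            lastSpeeding.put(vehicleId, gpsSpeed);
            return "speeding ongoing (previous speed " + previous + ")";
        }
        lastSpeeding.remove(vehicleId);
        return "speeding ended (previous speed " + previous + ")";
    }

    public static void main(String[] args) {
        SpeedAlarmSketch alarm = new SpeedAlarmSketch();
        System.out.println(alarm.onReading("12", 96, 90));
        System.out.println(alarm.onReading("12", 98, 90));
        System.out.println(alarm.onReading("12", 85, 90));
    }
}
```

Storing the previous reading is what lets the "ongoing" and "ended" messages report the record that came before the current one.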

(3) Complete demo
package com.leilei;

import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.Random;
import java.util.UUID;


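/**
 * Demo: keyed MapState used inside a count window to detect speeding.
 * The Location class is not shown in this article; it is assumed to be a POJO
 * with a builder and the getters used below (for example, generated by Lombok).
 */
public class Flink_State_2_MapState {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // Four parallel subtasks, so four SpeedAlarmWindow instances are opened.
        env.setParallelism(4);
        final DataStreamSource<Location> locationSource = env.addSource(new LocationSource());
        // Key by vehicle ID; countWindow(1) fires the window for every single record.
        final WindowedStream<Location, Integer, GlobalWindow> windowedStream = locationSource
                .keyBy(Location::getVehicleId)
                .countWindow(1);
        DataStream<String> result = windowedStream.apply(new SpeedAlarmWindow());
        // print() stands in for a real sink.
        result.print();
        //result.addSink(new MysqlSink());
        env.execute("mapState-window");

    }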


    /** Window function that keeps the last speeding record per vehicle in MapState. */
    public static class SpeedAlarmWindow extends RichWindowFunction<Location, String, Integer, GlobalWindow> {
        // Keyed state: Flink scopes it to the key of the record being processed.
        MapState<String, Location> locationState;
        String uuid;

        @Override
        public void apply(Integer vehicleId, GlobalWindow window, Iterable<Location> locationList, Collector<String> out) throws Exception {
            for (Location location : locationList) {
                final String key = location.getVehicleId().toString();
                // Previous speeding record for this vehicle, or null if it is not currently speeding.
                final Location preLocation = locationState.get(key);
                if (preLocation == null) {
                    if (location.getGpsSpeed() > location.getLimitSpeed()) {
                        locationState.put(key, location);
                        out.collect(location.toString() + ">>speeding started");
                    }
                } else {
                    if (location.getGpsSpeed() > location.getLimitSpeed()) {
                        locationState.put(key, location);
                        out.collect(location.toString() + ">>speeding ongoing" + "\n" +
                                "previous speeding record:" + "\n" + preLocation.toString());
                    } else {
                        locationState.remove(key);
                        out.collect(location.toString() + ">>speeding ended" + "\n" + " location before the end:" + "\n" + preLocation.toString());
                    }
                }
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // The UUID only makes the four parallel instances distinguishable in the log.
            uuid = UUID.randomUUID().toString();
            System.out.println(uuid + " window opened");
            super.open(parameters);
            // The descriptor names the state and declares its key and value types.
            locationState = getRuntimeContext().getMapState(new MapStateDescriptor<>("locationState",
                    TypeInformation.of(String.class),
                    TypeInformation.of(Location.class)));

        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println(uuid + " window closed");
        }
    }

    /** Test source that emits one random Location every two seconds. */
    public static class LocationSource implements SourceFunction<Location> {
        // volatile: cancel() is called from a different thread than run().
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Location> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                // Vehicle IDs range from 1 to 20; keyBy() spreads them across the subtasks.
                int vehicleId = random.nextInt(20) + 1;
                Location location = Location.builder()
                        .vehicleId(vehicleId)
                        .plate("川A000" + vehicleId)
                        .color("green")
                        .date(Integer.parseInt(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE)))
                        .gpsSpeed(RandomUtil.randomInt(88, 100))
                        .limitSpeed(RandomUtil.randomInt(88, 95))
                        .devTime(System.currentTimeMillis())
                        .build();
                ctx.collect(location);
                Thread.sleep(2000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }


    /** Example JDBC sink (disabled in main); the host and password are placeholders. */
    public static class MysqlSink extends RichSinkFunction<String> {
        Connection conn = null;
        PreparedStatement ps = null;
        String url = "jdbc:mysql://xx:3306/learn?useUnicode=true&characterEncoding=utf-8&useSSL=false";
        String username = "root";
        String password = "xx";

        /** Opens the connection and prepares the insert statement once per subtask. */
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection(url, username, password);
            conn.setAutoCommit(false);
            String sql = "insert into demo (`test_save_point`) " +
                    "values(?)";
            ps = conn.prepareStatement(sql);
        }

        /** Closes the statement before the connection that owns it. */
        @Override
        public void close() throws Exception {
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
        }

        /** Writes one alarm message and commits it. */
        @Override
        public void invoke(String str, SinkFunction.Context context) throws Exception {
            ps.setString(1, str);
            ps.execute();
            conn.commit();
        }
    }
}

