
Flink Accumulators, Broadcast Variables, Broadcast Streams, and the Distributed Cache



1. Accumulators


An Accumulator plays much the same role as a MapReduce counter: both let you observe how data changes while a task runs. You can use accumulators inside the operator functions of a Flink job, but the final accumulator value is only available after the job has finished. Counter is one concrete Accumulator implementation; the commonly used counters are IntCounter, LongCounter, and DoubleCounter.

Usage:

    1. Create the accumulator
    private IntCounter numLines = new IntCounter();
    2. Register the accumulator
    getRuntimeContext().addAccumulator("num-lines", this.numLines);
    3. Use the accumulator
    this.numLines.add(1);
    4. Fetch the accumulator result
    myJobExecutionResult.getAccumulatorResult("num-lines")

Example: count how many records a map operator processes.

package Flink_API;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

public class BatchCounterTest {
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> dataSource = env.fromElements("1", "2", "3", "4", "5");

        DataSet<String> map = dataSource.map(new RichMapFunction<String, String>() {
            // 1: create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                // 2: register the accumulator
                getRuntimeContext().addAccumulator("num-lines", numLines);
            }

            @Override
            public String map(String s) throws Exception {
                // 3: use the accumulator
                numLines.add(1);
                return s;
            }
        }).setParallelism(5);
        map.print(); // in the DataSet API, print() triggers execution of the job

        // 4: fetch the accumulator result from the finished job
        JobExecutionResult result = env.getLastJobExecutionResult();
        System.out.println("num-lines = " + result.getAccumulatorResult("num-lines"));
    }
}
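Because the accumulator lives in every parallel subtask, the value you read back is the merged sum of all partial counts, and it only exists once the job has finished. A plain-Java toy model of that merge step (no Flink involved; all names here are invented for illustration):

```java
import java.util.stream.IntStream;

// Toy model of accumulator merging: each parallel subtask keeps its own partial
// count, and the final value is the sum of all partial counts -- which is why
// Flink only exposes accumulator results after the job completes.
public class CounterMergeSketch {

    // Sum the per-subtask partial counts, as the JobManager does at job completion.
    static int mergedCount(int parallelism, int recordsPerSubtask) {
        return IntStream.range(0, parallelism)
                .map(subtask -> recordsPerSubtask) // each subtask's local IntCounter value
                .sum();
    }

    public static void main(String[] args) {
        // Parallelism 5, one record per subtask, as in the example job above.
        System.out.println("num-lines = " + mergedCount(5, 1));
    }
}
```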

2. Broadcast Variables: Flink ships a broadcast variable to every TaskManager, where the processing tasks can read it locally.

Steps for using a broadcast variable:
    1. Prepare the data
    DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
    2. Broadcast the data (i.e. register it, on whichever operator needs it)
    operator.withBroadcastSet(toBroadcast, "broadcastSetName");
    3. Retrieve the data inside that operator
    Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");


Example program: the data source only gives us each user's name, and we need to print the user's name together with their age.

package Flink_API;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class BatchBroadcastTest {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Prepare the data that will be broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("wtt", 29));
        broadData.add(new Tuple2<>("lidong", 30));
        broadData.add(new Tuple2<>("hengda", 40));
        DataSource<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);

        // Convert the data set into Maps whose key is the user's name and value is the age
        DataSet<HashMap<String, Integer>> toBroadCast = tupleData.map(
                new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> tuple) throws Exception {
                HashMap<String, Integer> hashMap = new HashMap<>();
                hashMap.put(tuple.f0, tuple.f1);
                return hashMap;
            }
        }).setParallelism(3); // the broadcast data is now ready

        // Note: a RichMapFunction is needed here to access the broadcast variable
        // The source carries nothing but names
        DataSource<String> nameDataSource = env.fromElements("wtt", "lidong", "hengda");

        DataSet<String> data = nameDataSource.map(new RichMapFunction<String, String>() {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
            HashMap<String, Integer> allMap = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Retrieve the broadcast data
                broadCastMap = getRuntimeContext().getBroadcastVariable("toBroadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map); // final shape: {name: age}
                }
            }

            @Override
            public String map(String name) throws Exception {
                // Enrich each name with the age from the broadcast copy
                return name + "," + allMap.get(name);
            }
        }).withBroadcastSet(toBroadCast, "toBroadCastMapName"); // register the broadcast set
        data.print();
    }
}
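The essence of the example above is a map-side join: every task receives a full copy of the small name-to-age map and enriches incoming names locally, with no shuffle. A plain-Java toy model of that lookup (no Flink; the class and method names are invented for illustration):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of the broadcast-variable join: each task holds a full local copy
// of the small data set and enriches every incoming record against it.
public class BroadcastJoinSketch {

    // Mirrors the RichMapFunction above: name in, "name,age" out.
    static List<String> enrich(List<String> names, Map<String, Integer> broadcastCopy) {
        return names.stream()
                .map(name -> name + "," + broadcastCopy.get(name))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> ages = new HashMap<>();
        ages.put("wtt", 29);
        ages.put("lidong", 30);
        ages.put("hengda", 40);
        enrich(Arrays.asList("wtt", "lidong", "hengda"), ages).forEach(System.out::println);
    }
}
```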

3. Broadcast Streams: what the broadcast variable is to batch processing, the broadcast stream is to stream processing.

package Flink_API;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

// Broadcast stream
public class FlinkBroadcastStream {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamps carried by the data itself
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // 1. The main stream: user browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. The control stream: the user blacklist
        DataStream<BlackUser> blackUserDataStream = getUserBlackUserDataStream(env);

        // (1) Define a MapStateDescriptor describing the format of the broadcast data
        final MapStateDescriptor<String, BlackUser> descriptor =
                new MapStateDescriptor<>("userBlackList", String.class, BlackUser.class);

        // (2) Register the configuration stream as a broadcast stream
        BroadcastStream<BlackUser> broadcastStream = blackUserDataStream.broadcast(descriptor);

        // (3) Connect the main stream with the broadcast stream
        DataStream<UserBrowseLog> filterDataStream = browseStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<UserBrowseLog, BlackUser, UserBrowseLog>() {
                    @Override
                    public void processElement(UserBrowseLog value, ReadOnlyContext readOnlyContext, Collector<UserBrowseLog> collector) throws Exception {
                        // Look the key up in the broadcast state
                        ReadOnlyBroadcastState<String, BlackUser> broadcastState = readOnlyContext.getBroadcastState(descriptor);
                        BlackUser blackUser = broadcastState.get(value.getUserID());
                        if (blackUser != null) {
                            System.out.print("User " + value.getUserID() + " is blacklisted; dropping their browse event");
                        } else {
                            collector.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(BlackUser value, Context context, Collector<UserBrowseLog> collector) throws Exception {
                        // Update the broadcast state in real time
                        BroadcastState<String, BlackUser> broadcastState = context.getBroadcastState(descriptor);
                        broadcastState.put(value.getUserID(), value);
                        System.out.print("------------------> current broadcast state: ---------------->");
                        System.out.print(broadcastState);
                    }
                });
        filterDataStream.print();
        env.execute("FlinkBroadcastStream");
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });
        // TODO: assign timestamps and watermarks here (left unimplemented in the original)
        return processData;
    }

    private static DataStream<BlackUser> getUserBlackUserDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");

        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<BlackUser> processData = dataStreamSource.process(new ProcessFunction<String, BlackUser>() {
            @Override
            public void processElement(String s, Context context, Collector<BlackUser> collector) throws Exception {
                try {
                    BlackUser blackUser = com.alibaba.fastjson.JSON.parseObject(s, BlackUser.class);
                    if (blackUser != null) {
                        collector.collect(blackUser);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse BlackUser JSON: " + e.getMessage());
                }
            }
        });
        return processData;
    }
    // Blacklist configuration entry
    public static class BlackUser implements Serializable{
        private String userID;
        private String userName;
        public BlackUser(){

        }

        public BlackUser(String userID, String userName) {
            this.userID = userID;
            this.userName = userName;
        }

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getUserName() {
            return userName;
        }

        public void setUserName(String userName) {
            this.userName = userName;
        }
    }
    // Browse event class
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
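Stripped of the Flink plumbing, the connect/process pattern above comes down to this: the broadcast side writes entries into a shared map ("broadcast state"), and the element side drops every record whose key is present in that map. A plain-Java toy model (no Flink; class and method names are invented for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the broadcast-state blacklist: the control stream populates a
// userID->entry map, and the main stream keeps only users absent from it.
public class BlacklistFilterSketch {

    // Mirrors processElement(): emit only users that are not blacklisted.
    static List<String> filter(List<String> userIds, Map<String, String> blacklistState) {
        List<String> out = new ArrayList<>();
        for (String id : userIds) {
            if (!blacklistState.containsKey(id)) {
                out.add(id);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors processBroadcastElement(): the control stream updates the state.
        Map<String, String> blacklist = new HashMap<>();
        blacklist.put("user2", "blocked");
        System.out.println(filter(Arrays.asList("user1", "user2", "user3"), blacklist));
    }
}
```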

4. The Flink Distributed Cache

Flink provides a distributed cache, similar to Hadoop's, that lets parallel user functions conveniently read a local file: the file is placed on each TaskManager node so that tasks do not fetch it repeatedly. It works as follows: the program registers a file or directory (on a local or remote file system such as HDFS or S3) through the ExecutionEnvironment, giving it a name. When the program runs, Flink automatically copies the file or directory to the local file system of every TaskManager node, exactly once. User code can then look the file or directory up by that name and access it on the TaskManager's local file system.

Registering:

// Get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1: register a file; it can live on HDFS, or be a local file for testing
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");

Using:

 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");

 Contents of the a.txt file:


hello flink hello Flink

Full example:

package Flink_API;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class DisCacheTest {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1: register a file; it can live on HDFS, or be a local file for testing
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: use the cached file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.err.println("distributed cache line: " + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList is now available to the business logic
                System.err.println("using dataList: " + dataList + "------------" + value);
                return dataList + ":" + value;
            }
        });
        result.printToErr();
    }
}
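The cache mechanics above can be mimicked in plain Java (no Flink; every name below is invented for illustration): a file is registered once under a logical name, and each task resolves that name to a local copy and reads it in open() before processing any records.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the distributed-cache pattern: register a file under a logical
// name, then resolve the name to a local path and read it before processing.
public class DistributedCacheSketch {

    // Mirrors getDistributedCache().getFile(name) followed by FileUtils.readLines().
    static List<String> readCached(Map<String, Path> registry, String name) throws IOException {
        return Files.readAllLines(registry.get(name));
    }

    public static void main(String[] args) throws IOException {
        // "Register": write the sample file and remember it under the name "a.txt".
        Path file = Files.createTempFile("cache", ".txt");
        Files.write(file, Collections.singletonList("hello flink hello Flink"));
        Map<String, Path> registry = new HashMap<>();
        registry.put("a.txt", file);

        // "Use": resolve the logical name, read the local copy, combine with each record.
        List<String> dataList = new ArrayList<>(readCached(registry, "a.txt"));
        for (String value : new String[]{"a", "b", "c", "d"}) {
            System.out.println(dataList + ":" + value);
        }
        Files.deleteIfExists(file);
    }
}
```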

 
