栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink中自定义Rich函数实现

Flink中自定义Rich函数实现

在Flink中,我们知道map ,flatMap,reduce算子都可以自定义函数实现,比如MapFunction:

public class MyMapFunction implements MapFunction {
    @Override
    public Integer map(String s) throws Exception {
        return Integer.parseInt(s);
    }
}

同时,Flink中还提供了对应的Rich函数,比如RichMapFunction,RichFlatMapFunction,RichReduceFunciton,而Rich相关函数都会继承AbstractRichFunction,这个类中会实现如下几个方法:

// Flink在算子调用前会执行open方法
public void open(Configuration parameters) throws Exception 
// 获取Flink运行时上下文,每个并行的算子任务都有一个上下文,会记录算子执行过程中一些信息,包括算子的并行度、任务序号、广播数据、累加器、监控数据、以及重要的状态数据
public RuntimeContext getRuntimeContext() ;
public void close()

Rich算子是Flink中状态计算的实现入口,我们这里模拟实现一个:

public class MyRichMapFunction extends RichMapFunction {

    private MapState mapState;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor("testCount",String.class,Integer.class));
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public Integer map(String s) throws Exception {
        if(!mapState.contains(s)){
            mapState.put(s,0);
        }
        mapState.put(s,mapState.get(s)+1);
        return Integer.parseInt(s);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700533.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号