栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink UDF/UDAF/UDTF

flink UDF/UDAF/UDTF

最近使用flink sql做实时数仓,需要开发自定义函数,
特此总结出UDF/UDAF/UDTF三种函数的写法,并配以代码和注释说明。

一、UDF
介绍:自定义标量函数(User Defined Scalar Function),一行输入,一行输出。

import org.apache.flink.table.functions.ScalarFunction;


public class DemoUDF extends ScalarFunction {

    public DemoUDF() {}

    public String eval(String str) {
        return str.toLowerCase();
    }
}

二、UDAF
介绍:自定义聚合函数,多行输入,一行输出。

import org.apache.flink.table.functions.AggregateFunction;


// AggregateFunction<聚合的最终结果类型,聚合期间的中间结果类型>
public class DemoUDAF extends AggregateFunction {

    //定义一个累加器,存放聚合的中间结果
    public static class SumAccumulator{
        public long sumPrice;
    }

    //初始化累加器
    @Override
    public SumAccumulator createAccumulator() {
        SumAccumulator sumAccumulator = new SumAccumulator();
        sumAccumulator.sumPrice=0;
        return sumAccumulator;
    }

    //根据输入,更新累加器
    @Override
    public void accumulate(SumAccumulator accumulator,Long input){
        accumulator.sumPrice += input;
    }

    //返回聚合的最终结果
    @Override
    public Long getValue(SumAccumulator accumulator) {
        return accumulator.sumPrice;
    }
}

三、UDTF
介绍:自定义<表函数>,这类函数是作用于表,而不是字段。
UDTF可以细分为3类:

一行输入,多行输出一列输入,多列输出一行输入,多行多列输出

UDTF之一行输入,多行输出

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;




public class DemoRowMore extends TableFunction {

    public DemoRowMore() {
    }

    // 使用注解指定输出数据的名称和类型
    // 没有返回值,使用collect收集数据,可以收集多次
    @FunctionHint(output = @DataTypeHint("ROW"))
    public void eval(String data,String split){
        String[] arr = data.split(split);
        for (String s : arr) {
            collect(Row.of(s));
        }
    }
}
数据样例:
a,b,c
d,e

Flink SQL>
select word
from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word);

Flink SQL>
a
b
c
d
e

UDTF之一列输入,多列输出

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


public class DemoColumnMore extends TableFunction {

    public DemoColumnMore() {
    }

    // 使用注解指定输出数据的名称和类型
    @FunctionHint(output = @DataTypeHint("ROW"))
    public void eval(String data, String split){
        String[] arr = data.split(split);
        Row row = new Row(2);
        row.setField(0,arr[0]);
        row.setField(1,arr[1]);
        collect(row);
    }
}
数据样例:
'张三,上海'

Flink SQL>
select name,city
from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city)

Flink SQL>
'张三','上海'

创作不易,希望对你有所帮助!

一健三联就是对我最大的鼓励,笔芯~

交流加微wex997520707~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758969.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号