最近使用flink sql做实时数仓,需要开发自定义函数,
特此总结出UDF/UDAF/UDTF三种函数的写法,并配以代码和注释说明。
一、UDF
介绍:自定义标量函数(User Defined Scalar Function),一行输入,一行输出。
import org.apache.flink.table.functions.ScalarFunction;
public class DemoUDF extends ScalarFunction {
public DemoUDF() {}
public String eval(String str) {
return str.toLowerCase();
}
}
二、UDAF
介绍:自定义聚合函数,多行输入,一行输出。
import org.apache.flink.table.functions.AggregateFunction; // AggregateFunction<聚合的最终结果类型,聚合期间的中间结果类型> public class DemoUDAF extends AggregateFunction{ //定义一个累加器,存放聚合的中间结果 public static class SumAccumulator{ public long sumPrice; } //初始化累加器 @Override public SumAccumulator createAccumulator() { SumAccumulator sumAccumulator = new SumAccumulator(); sumAccumulator.sumPrice=0; return sumAccumulator; } //根据输入,更新累加器 @Override public void accumulate(SumAccumulator accumulator,Long input){ accumulator.sumPrice += input; } //返回聚合的最终结果 @Override public Long getValue(SumAccumulator accumulator) { return accumulator.sumPrice; } }
三、UDTF
介绍:自定义<表函数>,这类函数是作用于表,而不是字段。
UDTF可以细分为3类:
一行输入,多行输出一列输入,多列输出一行输入,多行多列输出
UDTF之一行输入,多行输出
import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; public class DemoRowMore extends TableFunction{ public DemoRowMore() { } // 使用注解指定输出数据的名称和类型 // 没有返回值,使用collect收集数据,可以收集多次 @FunctionHint(output = @DataTypeHint("ROW
")) public void eval(String data,String split){ String[] arr = data.split(split); for (String s : arr) { collect(Row.of(s)); } } }
数据样例: a,b,c d,e Flink SQL> select word from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word); Flink SQL> a b c d e
UDTF之一列输入,多列输出
import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; public class DemoColumnMore extends TableFunction{ public DemoColumnMore() { } // 使用注解指定输出数据的名称和类型 @FunctionHint(output = @DataTypeHint("ROW
")) public void eval(String data, String split){ String[] arr = data.split(split); Row row = new Row(2); row.setField(0,arr[0]); row.setField(1,arr[1]); collect(row); } }
数据样例: '张三,上海' Flink SQL> select name,city from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city) Flink SQL> '张三','上海'
创作不易,希望对你有所帮助!
一健三联就是对我最大的鼓励,笔芯~
交流加微wex997520707~



