Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数SQL 中支持的很多函数,Table API 和 SQL 都已经做了实现
常用函数类型有:比较函数、逻辑函数、算数函数
其他函数(Functions)
用户自定义函数(UDF)
用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力;在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用;函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它;
标量函数(Scalar Functions)用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值;为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类Scalar Function,并实现(一个或多个)求值(eval)方法;标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval;
下面看一个具体的案例代码
package com.congge.table.api.udf;
import com.congge.source.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
public class TestScalarFunction {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String path = "E:\code-self\flink_study\src\main\resources\sensor.txt";
// 1. 读取数据
DataStreamSource inputStream = env.readTextFile(path);
// 2. 转换成POJO
DataStream dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 3. 将流转换成表
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
// 4. 自定义标量函数,实现求id的hash值
// table API
HashCode hashCode = new HashCode(23);
// 需要在环境中注册UDF
tableEnv.registerFunction("hashCode", hashCode);
Table resultTable = sensorTable.select("id, ts, hashCode(id)");
// SQL
tableEnv.createTemporaryView("sensor", sensorTable);
Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");
// 打印输出
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
env.execute();
}
// 实现自定义的ScalarFunction
public static class HashCode extends ScalarFunction{
private int factor = 13;
public HashCode(int factor) {
this.factor = factor;
}
public int eval(String str){
return str.hashCode() * factor;
}
}
}
本次案例需求为,使用Flink Table Api 读取文件数据后,通过自定义函数,输出原字段同时,也将id的hash值打印输出,运行上面的代码,观察控制台输出效果



