栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink自定义source(flink窗口函数)

flink自定义source(flink窗口函数)

函数(Functions)

Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数SQL 中支持的很多函数,Table API 和 SQL 都已经做了实现

常用函数类型有:比较函数、逻辑函数、算数函数

 

其他函数(Functions)

 

用户自定义函数(UDF)

用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力;在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用;函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它;

标量函数(Scalar Functions)

用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值;为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类Scalar Function,并实现(一个或多个)求值(eval)方法;标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval;

下面看一个具体的案例代码

package com.congge.table.api.udf;
import com.congge.source.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class TestScalarFunction {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String path = "E:\code-self\flink_study\src\main\resources\sensor.txt";

        // 1. 读取数据
        DataStreamSource inputStream = env.readTextFile(path);

        // 2. 转换成POJO
        DataStream dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 3. 将流转换成表
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

        // 4. 自定义标量函数,实现求id的hash值
        // table API
        HashCode hashCode = new HashCode(23);
        // 需要在环境中注册UDF
        tableEnv.registerFunction("hashCode", hashCode);
        Table resultTable = sensorTable.select("id, ts, hashCode(id)");

        // SQL
        tableEnv.createTemporaryView("sensor", sensorTable);
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");

        // 打印输出
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");

        env.execute();
    }

    // 实现自定义的ScalarFunction
    public static class HashCode extends ScalarFunction{
        private int factor = 13;

        public HashCode(int factor) {
            this.factor = factor;
        }

        public int eval(String str){
            return str.hashCode() * factor;
        }
    }
}

本次案例需求为,使用Flink Table Api 读取文件数据后,通过自定义函数,输出原字段同时,也将id的hash值打印输出,运行上面的代码,观察控制台输出效果

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771460.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号