栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

Flink--Table API UDF函数之表函数

Flink--Table API UDF函数之表函数

首先自定义一个类,继承TableFunction类,返回值类型由TableFunction的泛型决定,由 protected collect 来发送结果数据。

在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。

在SQL 中,则需要 lateral table 来一起使用

使用起来的结果有点类似 hive 的 lateral view 

package com.ct.day08

import com.ct.day01.SensorSource
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row



object TableFunctionExample {

  def main(args: Array[String]): Unit = {


    val  env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.fromElements(
      "hello#world",
      "hi#flink"
    )


    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()

    val tEnv = StreamTableEnvironment.create(env,settings)

    val table = tEnv.fromDataStream(stream,'s)

    val mySplit = new split("#")

    table
        .leftOuterJoinLateral(mySplit('s) as ('word,'length))
      //上下两种写法等价  类似 hive的 lateral view
   //   .joinLateral(mySplit('s) as ('word,'length))
      .select('s,'word,'length)
      .toAppendStream[Row]
//      .print()

    tEnv.createTemporaryView("test",table)

    tEnv.registerFunction("mySplit",mySplit)

    tEnv.sqlQuery("select s,word,length from test,lateral table(mySplit(s)) as T(word,length)"
    )
        .toAppendStream[Row]
        .print()


    env.execute()
  }


  //TableFunction后边的泛型是返回值类型
  class split(sep : String) extends TableFunction[(String,Int)]{
    def eval(s : String) : Unit={

      //使用 collect 输出结果
      s.split(sep).foreach(x=>collect((x,x.length)))

    }

  }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278826.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号