栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

Flink--Table API UDF函数之标量函数

Flink--Table API UDF函数之标量函数

flink实现标量函数(scalarFunction)的方式是,自定义一个类,继承 ScalarFunction类,定义一个eval 函数,直接用def 不是override

注意tableAPI的形式下,不需要注册该函数,可以直接调用实例化的对象

但是在SQL形式下,需要先注册该函数

package com.ct.day08

import com.ct.day01.SensorSource
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row


object ScalarFunctionExample {

  def main(args: Array[String]): Unit = {


    val  env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()

    val tEnv = StreamTableEnvironment.create(env,settings)


    val stream = env.addSource(new SensorSource)


    val table: Table = tEnv.fromDataStream(stream,'id,'timestamp as 'ts,'temperature)

    val hashcode = new myHashcode(10)

    //table API   不需要注册函数
    table
      .select('id,hashcode('id))
      .toRetractStream[Row]
//      .print()

    //SQl写法  需要注册函数

    tEnv.registerFunction("myhashcode",hashcode)
    //注册表
    tEnv.createTemporaryView("sensor",table)

    tEnv.sqlQuery("select id,myhashcode(id) from sensor")
        .toRetractStream[Row]
        .print()


    env.execute()

  }


  //编写一个 标量自定义函数,将hashcode * 参数
  class myHashcode(factor : Int) extends ScalarFunction {

    def eval(s:String) : Int = {
      factor * s.hashCode
    }
  }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278821.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号