flink实现标量函数(scalarFunction)的方式是,自定义一个类,继承 ScalarFunction类,定义一个eval 函数,直接用def 不是override
注意tableAPI的形式下,不需要注册该函数,可以直接调用实例化的对象
但是在SQL形式下,需要先注册该函数
package com.ct.day08
import com.ct.day01.SensorSource
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
object ScalarFunctionExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build()
val tEnv = StreamTableEnvironment.create(env,settings)
val stream = env.addSource(new SensorSource)
val table: Table = tEnv.fromDataStream(stream,'id,'timestamp as 'ts,'temperature)
val hashcode = new myHashcode(10)
//table API 不需要注册函数
table
.select('id,hashcode('id))
.toRetractStream[Row]
// .print()
//SQl写法 需要注册函数
tEnv.registerFunction("myhashcode",hashcode)
//注册表
tEnv.createTemporaryView("sensor",table)
tEnv.sqlQuery("select id,myhashcode(id) from sensor")
.toRetractStream[Row]
.print()
env.execute()
}
//编写一个 标量自定义函数,将hashcode * 参数
class myHashcode(factor : Int) extends ScalarFunction {
def eval(s:String) : Int = {
factor * s.hashCode
}
}
}



