自定义聚合函数的步骤如下:
自定义一个类,继承 AggregateFunction 类,实现其三个方法:
1.createAccumulator :用来初始化累加器
2.accumulate:对于每条输入数据,框架都会调用该方法来更新累加器(注意:该方法并未在 AggregateFunction 基类中声明,因此定义时不需要加 override,方法名必须为 accumulate)
3.getValue: 获取累加器的值
package com.ct.day08
import com.ct.day01.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row
/**
 * Example job showing a user-defined aggregate function that computes the
 * mean temperature per sensor id, wired up once through the Table API and
 * once through SQL.
 */
object AggregateFunctionExample {

  /**
   * Running state for the aggregate: the total of all temperatures seen so
   * far and how many readings contributed to that total.
   */
  class MyAccumulate {
    var count: Int = 0
    var sum: Double = 0.0
  }

  /**
   * Aggregate function producing `sum / count` of the accumulated
   * temperatures, using [[MyAccumulate]] as its accumulator.
   */
  class MyAggregateFunction extends AggregateFunction[Double, MyAccumulate] {

    /** Creates a fresh accumulator with both fields at zero. */
    override def createAccumulator(): MyAccumulate = new MyAccumulate

    /**
     * Folds one temperature reading into the accumulator.
     *
     * Not an override: the method is defined here without being declared
     * as an abstract member of the base class.
     *
     * @param myAccumulate accumulator to update in place
     * @param temp         temperature value of the incoming row
     */
    def accumulate(myAccumulate: MyAccumulate, temp: Double): Unit = {
      myAccumulate.sum += temp
      myAccumulate.count += 1
    }

    /**
     * Reads the current result out of the accumulator.
     *
     * @param acc accumulator holding the running sum and count
     * @return the sum divided by the count (floating-point division)
     */
    override def getValue(acc: MyAccumulate): Double = acc.sum / acc.count
  }

  /**
   * Builds the streaming job: reads sensor data, runs the aggregate via the
   * Table API and via SQL, prints the SQL result, and executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Streaming mode on the Blink planner.
    val plannerSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, plannerSettings)

    val readings: DataStream[SensorReading] = streamEnv.addSource(new SensorSource)
    // Expose the stream as a table; the `timestamp` field is renamed to `ts`.
    val sensorTable: Table =
      tableEnv.fromDataStream(readings, 'id, 'timestamp as 'ts, 'temperature)

    val avgTemperature = new MyAggregateFunction

    // --- Table API variant ---
    val tableApiResult: Table = sensorTable
      .groupBy('id)
      .aggregate(avgTemperature('temperature) as 'aggTemp)
      .select('id, 'aggTemp)
    // The retract stream is created but deliberately left without a sink;
    // append .print() here to see the Table API output.
    tableApiResult.toRetractStream[Row]

    // --- SQL variant: the table and the function must be registered first ---
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("aggregate", avgTemperature)
    val sqlResult: Table =
      tableEnv.sqlQuery(" select id,aggregate(temperature) from sensor group by id")
    sqlResult
      .toRetractStream[Row]
      .print()

    streamEnv.execute()
  }
}



