栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

Flink --Table API 之聚合函数(AggregateFunction)

Flink --Table API 之聚合函数(AggregateFunction)

自定义聚合函数的步骤如下:

自定义一个类,继承 AggregateFunction 类,并实现以下三个方法(其中 createAccumulator 和 getValue 是需要 override 的抽象方法;accumulate 并不是重写方法,而是由 Flink 按方法名和参数签名通过反射调用):

1.createAccumulator:创建并初始化一个空的累加器

2.accumulate:对于每个输入的数据,调用该方法来更新累加器

3.getValue:根据累加器中的数据计算并返回最终的聚合结果

package com.ct.day08

import com.ct.day01.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row


/**
 * Demonstrates a user-defined aggregate function (UDAGG) that computes the
 * average temperature per sensor id. The function is invoked both through
 * the Table API and through SQL.
 */
object AggregateFunctionExample {

  /**
   * Builds and runs the streaming job.
   *
   * The job reads sensor readings, converts them to a table, and prints the
   * per-id result of [[MyAggregateFunction]] (SQL variant) as a retract stream.
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()

    val tEnv = StreamTableEnvironment.create(env, settings)

    val stream: DataStream[SensorReading] = env.addSource(new SensorSource)

    // Expose the stream as a table with columns id, ts (renamed from timestamp), temperature.
    val table: Table = tEnv.fromDataStream(stream, 'id, 'timestamp as 'ts, 'temperature)

    val myAggregateFunction = new MyAggregateFunction

    // Table API style: the function instance is applied directly, no registration needed.
    // The result is not printed (sink commented out), so only the SQL variant below emits output.
    table
      .groupBy('id)
      .aggregate(myAggregateFunction('temperature) as 'aggTemp)
      .select('id, 'aggTemp)
      .toRetractStream[Row]
    //  .print()

    // SQL style: the function must first be registered under a name.
    tEnv.createTemporaryView("sensor", table)

    tEnv.registerFunction("aggregate", myAggregateFunction)

    tEnv.sqlQuery(" select id,aggregate(temperature) from sensor group by id")
      .toRetractStream[Row]
      .print()

    env.execute()
  }

  /**
   * Mutable accumulator holding the running temperature sum and reading count
   * for one group (one sensor id).
   *
   * `count` is a Long rather than an Int. An Int would wrap negative after
   * 2^31 - 1 readings on a long-running unbounded stream and corrupt the average.
   * NOTE(review): changing a field type changes the accumulator's state schema,
   * so restoring from a savepoint taken with the old Int field may not work.
   */
  class MyAccumulate {
    var sum: Double = _
    var count: Long = _
  }

  /**
   * Aggregate function computing the average of the accumulated temperatures.
   *
   * Only `createAccumulator` and `getValue` are overrides of abstract members.
   * `accumulate`, `retract` and `merge` are not declared on AggregateFunction.
   * Flink looks them up by name and signature.
   */
  class MyAggregateFunction extends AggregateFunction[Double, MyAccumulate] {

    /** Creates an empty accumulator (sum = 0.0, count = 0). */
    override def createAccumulator(): MyAccumulate = new MyAccumulate

    /**
     * Returns the average temperature held by `acc`.
     * For an empty accumulator it returns Double.NaN, the same value 0.0 / 0 yields,
     * stated explicitly instead of relying on floating-point division.
     */
    override def getValue(acc: MyAccumulate): Double =
      if (acc.count == 0L) Double.NaN else acc.sum / acc.count

    /** Adds one temperature reading to the accumulator. */
    def accumulate(myAccumulate: MyAccumulate, temp: Double): Unit = {
      myAccumulate.count += 1
      myAccumulate.sum += temp
    }

    /**
     * Removes a previously accumulated reading.
     * Flink requires this when the aggregated input contains retractions
     * (e.g. bounded OVER windows or aggregating an updating table).
     */
    def retract(myAccumulate: MyAccumulate, temp: Double): Unit = {
      myAccumulate.count -= 1
      myAccumulate.sum -= temp
    }

    /**
     * Folds the partial accumulators in `others` into `myAccumulate`.
     * Flink requires this for session/hop windows and two-phase (local/global) aggregation.
     */
    def merge(myAccumulate: MyAccumulate, others: java.lang.Iterable[MyAccumulate]): Unit = {
      val it = others.iterator()
      while (it.hasNext) {
        val other = it.next()
        myAccumulate.count += other.count
        myAccumulate.sum += other.sum
      }
    }
  }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278816.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号