栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink--Table API 之表聚合函数(TableAggregateFunction)

Flink--Table API 之表聚合函数(TableAggregateFunction)

自定义表聚合函数步骤如下:

自定义一个类继承TableAggregateFunction类,实现三个方法:

1.createAccumulator :用来初始化累加器

2.accumulate:对于每一个新来的数据,调用该方法来更新累加器的值

3.emitValue输出结果,使用 Collector对象的  .collect 方法输出

例子:

自定义一个表聚合函数,来更新获取每个传感器最大的两个温度

package com.ct.day08

import com.ct.day01.SensorSource
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector
import org.apache.flink.types.Row


object TableAggregateFunctionExample {

  def main(args: Array[String]): Unit = {


    val  env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()

    val tEnv = StreamTableEnvironment.create(env,settings)


    val stream = env.addSource(new SensorSource)


    val table: Table = tEnv.fromDataStream(stream,'id,'timestamp as 'ts,'temperature)

    val  top2Agg = new Top2Agg

    //table API形式

    table
      .groupBy('id)
      .flatAggregate(top2Agg('temperature) as ('temp,'rank))
      .select('id,'temp,'rank)
      .toRetractStream[Row]
      .print()



    env.execute()

  }

  class Top2Accumulator{
    var firstTemp : Double =0.0
    var secondTemp:Double =0.0
  }


  

  class Top2Agg extends TableAggregateFunction[(Double,Int),Top2Accumulator]{
    override def createAccumulator(): Top2Accumulator = new Top2Accumulator

    //传入两个参数, 一个是累加器,另一个是 温度值
    def accumulate(top2Acc:Top2Accumulator,temp:Double) :Unit={

      if(temp > top2Acc.firstTemp){
        top2Acc.secondTemp = top2Acc.firstTemp
        top2Acc.firstTemp=temp
      }
      else if(temp > top2Acc.secondTemp){
        top2Acc.secondTemp=temp
      }

    }

    //传入两个参数, 累加器  和  collector 对象,泛型即是输出数据的泛型
    def  emitValue(top2Acc:Top2Accumulator,out:Collector[(Double,Int)]):Unit={

      out.collect(top2Acc.firstTemp,1)
      out.collect(top2Acc.secondTemp,2)

    }

  }


}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/281551.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号