
Using Flink's ProcessWindowFunction and WindowFunction

Error 1:

overloaded method value aggregate with alternatives:
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[V,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$13: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$14: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] 
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[V], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$12: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] 
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.WindowFunction[V,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.example.hot.items.CountAgg, org.example.hot.items.MyWindowFunction)
      .aggregate(new CountAgg(), new MyWindowFunction() )

1. Aggregating with the aggregate() operator

class CountAgg extends AggregateFunction[UserBehavior, Long, Long]{
  override def createAccumulator(): Long = 0L

  override def add(in: UserBehavior, acc: Long): Long = {
    acc + 1L
  }

  override def getResult(acc: Long): Long = {
    acc
  }

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
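
To see how the four methods of an AggregateFunction fit together, the following minimal sketch drives one by hand, outside of any Flink job. The AggregateDriver object, its run helper, and the countStrings stand-in aggregate are hypothetical names introduced only for illustration. The call order shown is an approximation: in a real job the Flink runtime decides when, or whether, merge() is invoked.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Hypothetical helper, not part of Flink: drives an AggregateFunction by hand.
object AggregateDriver {

  // Folds every batch into its own accumulator, merges the partial
  // accumulators, and finally extracts the result.
  def run[IN, ACC, OUT](agg: AggregateFunction[IN, ACC, OUT], batches: Seq[Seq[IN]]): OUT = {
    val partials = batches.map { batch =>
      batch.foldLeft(agg.createAccumulator())((acc, in) => agg.add(in, acc))
    }
    val merged = partials.foldLeft(agg.createAccumulator())((a, b) => agg.merge(a, b))
    agg.getResult(merged)
  }

  // Stand-in aggregate with the same counting logic as CountAgg,
  // but over plain strings so the sketch needs no other types.
  val countStrings: AggregateFunction[String, Long, Long] =
    new AggregateFunction[String, Long, Long] {
      override def createAccumulator(): Long = 0L
      override def add(in: String, acc: Long): Long = acc + 1L
      override def getResult(acc: Long): Long = acc
      override def merge(a: Long, b: Long): Long = a + b
    }
}
```

With CountAgg and UserBehavior on the classpath, the same helper could be called as AggregateDriver.run(new CountAgg, batches), where batches is a Seq[Seq[UserBehavior]].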



class MyWindowFunction extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, w: TimeWindow, iterable: lang.Iterable[Long], collector: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = w.getEnd
    val count = iterable.iterator.next()
    collector.collect( ItemViewCount(itemId, windowEnd, count) )
  }
}


// DataStream processing logic: key by item, open a window, aggregate
val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv")
      .keyBy("itemId")
      .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
      .aggregate(new CountAgg(), new MyWindowFunction() )

2. Finding the cause

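The error message indicates a type mismatch: the KeyedStream[T, K] returned by keyBy() has K = org.apache.flink.api.java.tuple.Tuple, whereas MyWindowFunction declares its key type as Long. The cause is the keyBy() overload being called. In Flink 1.14.3, the Scala DataStream provides overloads including these three: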

@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
def keyBy(fields: Int*): KeyedStream[T, JavaTuple]
@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]

The code above calls def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple], so the key type K is org.apache.flink.api.java.tuple.Tuple rather than Long.
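
The difference in key type can be made visible by writing the return types out explicitly. This is a minimal sketch: the KeyTypeDemo object and the full field list of UserBehavior are assumptions for illustration, since the article only shows the itemId and behavior fields. The field-name overload is deprecated and will produce a compiler warning.

```scala
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.streaming.api.scala._

// Assumed record shape; only itemId and behavior appear in the article.
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

// Hypothetical demo object showing the key type each overload produces.
object KeyTypeDemo {

  // Deprecated field-name overload: the key is wrapped in a Java Tuple.
  def keyedByField(s: DataStream[UserBehavior]): KeyedStream[UserBehavior, JavaTuple] =
    s.keyBy("itemId")

  // Key-selector function overload: the key type is the function's result type.
  def keyedByFunction(s: DataStream[UserBehavior]): KeyedStream[UserBehavior, Long] =
    s.keyBy(_.itemId)
}
```

A window function applied after keyedByField must declare JavaTuple as its key type, while one applied after keyedByFunction can declare Long.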

Here I switched to def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K], partly out of personal habit and partly because the other two overloads are deprecated. The revised processing logic is:

// DataStream processing logic: key by item, open a window, aggregate
val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
      .aggregate(new CountAgg(), new MyWindowFunction() )

Error 2:

overloaded method value aggregate with alternatives:
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[V,R,Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$13: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$14: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] 
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[V], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$12: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] 
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.WindowFunction[V,R,Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.example.hot.items.CountAgg, org.example.hot.items.MyWindowFunction)
      .aggregate(new CountAgg(), new MyWindowFunction() )

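After changing the keyBy() call, the compiler still reported the error above, now with Long as the expected key type. After rechecking the code several times, I found that the WindowFunction extended by MyWindowFunction had been imported from the wrong package. The IDE offers two candidates: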

org.apache.flink.streaming.api.scala.function.WindowFunction
org.apache.flink.streaming.api.functions.windowing.WindowFunction

The correct import for the Scala API is the first one: org.apache.flink.streaming.api.scala.function.WindowFunction

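With the correct import in place, rewrite the apply() method of MyWindowFunction. Its input parameter is now a Scala Iterable[Long] instead of a java.lang.Iterable[Long]: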

class MyWindowFunction extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = window.getEnd
    val count = input.iterator.next()
    out.collect( ItemViewCount(itemId, windowEnd, count) )
  }
}
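
Putting both fixes together, a complete version of the job might look like the sketch below. The HotItemsSketch object, the sample events, the full field lists of the two case classes, and the choice of assignAscendingTimestamps are assumptions made so that the example is self-contained. The article does not show how the original dataStream is built or how timestamps are assigned.

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction // Scala variant
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Assumed record shapes; the article only shows some of these fields.
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

// Pre-aggregator: counts the events of one key in one window.
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: UserBehavior, acc: Long): Long = acc + 1L
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Receives the single pre-aggregated count and attaches key and window end.
class MyWindowFunction extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long],
                     out: Collector[ItemViewCount]): Unit =
    out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
}

object HotItemsSketch {
  def build(env: StreamExecutionEnvironment): DataStream[ItemViewCount] =
    env
      .fromElements( // hypothetical sample data
        UserBehavior(1L, 10L, 1, "pv", 1000L),
        UserBehavior(2L, 10L, 1, "pv", 1500L),
        UserBehavior(3L, 10L, 1, "buy", 1600L))
      .assignAscendingTimestamps(_.timestamp) // assumes ascending epoch milliseconds
      .filter(_.behavior == "pv")
      .keyBy(_.itemId) // key type is Long, matching MyWindowFunction
      .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
      .aggregate(new CountAgg(), new MyWindowFunction())

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    build(env).print()
    env.execute("hot-items-sketch")
  }
}
```

The key type produced by keyBy(_.itemId) and the third type parameter of WindowFunction must agree, and WindowFunction must come from the scala.function package. Those two points are exactly what the two compiler errors were about.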

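Summary:

1. How keyBy() is called: see the DataStream[T] source for the available overloads and their return types.

2. The import path of the WindowFunction class. For the Scala API it is:

        org.apache.flink.streaming.api.scala.function.WindowFunction

Note: in Flink 1.14.3, ProcessWindowFunction is recommended over WindowFunction. It receives a Context object, from which the window is obtained, instead of the bare window: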

class MyProcessWindowFunction extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow]{
  override def process(key: Long, context: Context, elements: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = context.window.getEnd
    val count = elements.iterator.next()
    out.collect( ItemViewCount(itemId, windowEnd, count) )
  }
}
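
ProcessWindowFunction can also be used on its own, without a pre-aggregator, through the process() method of the windowed stream. The sketch below reads both window bounds from the Context. The WindowSpanFunction class, the WindowSpanDemo object, and the (itemId, timestamp) tuple input are hypothetical and chosen only to keep the example short.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Input elements are (itemId, timestampMillis) pairs; the key is the itemId.
class WindowSpanFunction extends ProcessWindowFunction[(Long, Long), String, Long, TimeWindow] {
  override def process(key: Long, context: Context, elements: Iterable[(Long, Long)],
                       out: Collector[String]): Unit = {
    val w = context.window // the window is reached through the Context
    out.collect(s"item=$key window=[${w.getStart},${w.getEnd}) count=${elements.size}")
  }
}

object WindowSpanDemo {
  def build(env: StreamExecutionEnvironment): DataStream[String] =
    env
      .fromElements((10L, 1000L), (10L, 1500L)) // hypothetical sample data
      .assignAscendingTimestamps(_._2)          // assumes ascending epoch milliseconds
      .keyBy(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
      .process(new WindowSpanFunction)
}
```

Without a pre-aggregator, all elements of a window are buffered until the window fires. For plain counts, combining aggregate() with a window function as in the article is usually the cheaper choice.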
