在之前的Join算子中,一般使用的是coGroup算子,因为一个算子可以提供多种语义,但是也是有一些弊端的。因为coGroup只能实现在同一个窗口的两个数据流之间进行join,在实际的计算过程中,往往会遇到当req发生时,resp迟迟无法响应,这个时候,就会出现一个跨窗口的问题。也就是说经常会出现数据乱序,或者数据延迟的情况,导致两个流的数据是不同步的,也就会导致,join的过程中丢失数据问题。不在同一个窗口中的数据无法join,这个问题flink官方提供了另外一种join的方式,也就是interval join。他的核心思想就是,将两个流通过keyed分区。然后,按照key 在一个相对的时间段内进行Join。
Interval join用一个公共键连接两个流的元素(我们现在称它们为A&B),其中流B的元素的时间戳位于流A中元素的时间戳的相对时间间隔内。
这也可以更正式地表达为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]ora.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里要表达的意思也就是说,当两个流进行join的时候,会根据左流的时间戳在右流中寻找公共键。
其中a和b是a和b中共享公共密钥的元素。只要下限始终小于或等于上限,下限和上限都可以是负值或正值。间隔联接当前仅执行内部联接。
当一对元素传递给ProcessJoinFunction时,它们将被分配有两个元素的较大时间戳(可以通过ProcessJoinFunction.context访问)。
注意:Interval Join 仅支持event time
在上面的例子中,我们将两个流“橙色”和“绿色”连接起来,下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是。lowerBoundExclusive()和upperBoundExclusive可用于更改是否包含上下界。
再次使用更正式的符号,这将转化为
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
如三角形所示。
2、代码实现import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.windowing.time.Time; ... DataStream3、Interval Join源码实现原理orangeStream = ... DataStream greenStream = ... orangeStream .keyBy( ) .intervalJoin(greenStream.keyBy( )) .between(Time.milliseconds(-2), Time.milliseconds(1)) .process (new ProcessJoinFunction out) { out.collect(first + "," + second); } });
在Interval Join的实现当中,其中的核心实现为IntervalJoinOperator类,这个类提供了执行IntervalJoin的核心逻辑
构造方法
public IntervalJoinOperator( long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, TypeSerializerleftTypeSerializer, TypeSerializer rightTypeSerializer, ProcessJoinFunction udf) { super(Preconditions.checkNotNull(udf)); Preconditions.checkArgument(lowerBound <= upperBound, "lowerBound <= upperBound must be fulfilled"); // Move buffer by +1 / -1 depending on inclusiveness in order not needing // to check for inclusiveness later on // 这里根据是否包含上下界,来进行判断,是否执行 +1 或者 -1 操作 this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L; this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L; }
初始化状态
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
//构建 左流缓冲区,类型为keyedState的MapState 其中时间戳是key,值为BufferEntry 类型的List ArrayList
this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
LEFT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
));
//构建右流缓冲区 类型为keyedState的MapState 其中时间戳是key,值为BufferEntry类型的List ArrayList
this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
RIGHT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
));
}
处理左流和右流的数据
@Override public void processElement1(StreamRecordrecord) throws Exception { //左流 processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true); } @Override public void processElement2(StreamRecord record) throws Exception { //右流 processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false); }
我们可以明显的看出来,左流和右流的处理,都是依靠于processElement方法。
privatevoid processElement( final StreamRecord record, final MapState >> ourBuffer, final MapState >> otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception { //获取当前流的值,可以是左流也可以是右流 final THIS ourValue = record.getValue(); //获取当前元素的时间戳,左流 or 右流 final long ourTimestamp = record.getTimestamp(); //是否迟到,是否小于 当前的 watermark if (isLate(ourTimestamp)) { return; } //将该方法的实现写到下方了,这里的意思是将当前的元素写入,当前key 所属的 state中,也就是左流keyedstate 或者右流keyed state //addToBuffer(ourBuffer, ourValue, ourTimestamp); List > elemsInBucket = ourBuffer.get(ourTimestamp); if (elemsInBucket == null) { elemsInBucket = new ArrayList<>(); } elemsInBucket.add(new BufferEntry<>(ourValue, false)); ourBuffer.put(ourTimestamp, elemsInBucket); //遍历 其他流的state 。 for (Map.Entry >> bucket: otherBuffer.entries()) { final long timestamp = bucket.getKey(); //如果时间不在范围内 则看一下保存的元素 if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) { continue; } //如果在说明有值啊,当前值对应 other 多个元素的时候,会执行for循环,也就是 1 x n 的输出 for (BufferEntry entry: bucket.getValue()) { if (isLeft) { collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else { collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } } //计算清理时间是否到了,到了的话就清理 long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); } }
经过我们上述的分析,可以分析出来,Interval Join 是一种依赖于EventTime的一种join方式,它将左流和右流相同的key的数据按照时间戳来进行存储在不同的缓存里面,leftBuffer 和rightBuffer。
运行思路是这样的。
首先进来一个元素,这个元素可能是左流也可能是右流的元素,然后校验是否过期,过期就丢弃,不过期继续处理。
然后将他放入到左流或右流单独所属的cache中,时间戳为key,然后判断时间段,和他所对应的缓存 里面是否有值,如果有则返回,没有则等待右流将他唤醒。
然后会判断时间戳到达什么位置了。是否到了该清理的时候,如果到了,则会按照时间戳来进行清理。
public void onEventTime(InternalTimertimer) throws Exception { long timerTimestamp = timer.getTimestamp(); String namespace = timer.getNamespace(); logger.trace("onEventTime @ {}", timerTimestamp); switch (namespace) { case CLEANUP_NAMESPACE_LEFT: { long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound; logger.trace("Removing from left buffer @ {}", timestamp); leftBuffer.remove(timestamp); break; } case CLEANUP_NAMESPACE_RIGHT: { long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp; logger.trace("Removing from right buffer @ {}", timestamp); rightBuffer.remove(timestamp); break; } default: throw new RuntimeException("Invalid namespace " + namespace); } }
到这里,我想我们对Interval Join 有了一些深入的理解了。
1、根据时间段来进行join,可以处于边界,也可以不处于边界
2、根据双cache来进行存储数据,以及根据keyed来进行join逻辑的实现
3、他是内连接的,目前不支持左外链接,想做的话,可以手动指定清理策略(改源码重新打包,或者基于双亲委派机制的在项目中添加对应的类,来进行改造)。



