可以通过addSource方法来自定义SourceFunction,并可指定Timestamp和Watermark生成规则。addSource方法接收一个SourceFunction
SourceFunction中定义了一个run(SourceContext
- 数据源发送事件数据并生成Timestamp方法:collectWithTimestamp(T element,long timestamp)
element代表需发送的元素,timestamp代表这个元素对应的时间戳。该方法只在EventTime时有效,ProcessingTime时设置的timestamp直接忽略。 - 生成Watermark的方法:emitWatermark(Watermark mark)
当发送一个时间戳为T的mark时,表示该数据流上不会再有timestamp<=T的事件记录,一般来说,这个T是基于最大的timestamp来生成的,比如最大timestamp-1。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.Arrays;
//在SourceFunction函数中,指定Timestamp和生成Watermark示例
public class Test {
public static void main(String[] args) throws Exception{
//创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置事件时间EventTime语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//并行度为1
env.setParallelism(1);
//演示数据
Tuple3[] input = {
Tuple3.of("user1", 1000L, 1),
Tuple3.of("user1", 1999L, 2),
Tuple3.of("user1", 2000L, 3),
Tuple3.of("user1", 2100L, 4),
Tuple3.of("user1", 2130L, 5)
};
//通过示例数据生成DataStream
DataStream> source = env.addSource(
//SourceFunction中进行时间戳分配和水位线生成
new SourceFunction>() {
@Override
public void run(SourceContext> ctx) throws Exception {
//遍历数据集
Arrays.asList(input).forEach(tp3 -> {
//指定时间戳
ctx.collectWithTimestamp(tp3, (long) tp3.f1);
System.out.println("collectWithTimestamp:"+ (long) tp3.f1);
//发送水位线,当前元素时间戳-1
ctx.emitWatermark(new Watermark((long) tp3.f1 - 1));
System.out.println("emitWatermark:"+ ((long) tp3.f1 - 1));
System.out.println("**************************************");
});
//代表结束标志
ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
}
@Override
public void cancel() {}
});
//结果打印
source.print();
//执行程序
env.execute();
}
}
上面程序每接收到一个事件数据都会调用生成Timestamp和Watermark,Watermark值为当前事件的Timestamp-1



