- Flink的详细使用
- 1. 安装部署
- 安装
- 2. 执行任务
- Standalone 模式
- 启动/停止
- 执行任务
- Yarn 模式
- Session-cluster 模式
- 启动 yarn-session
- 执行任务
- Per-Job-Cluster 模式
- 3. 执行环境
- Environment
- getExecutionEnvironment(常用)
- createLocalEnvironment
- createRemoteEnvironment
- Source、Sink
- Transform(算子)
- map
- flatMap
- Filter
- KeyBy
- 滚动聚合算子
- Reduce
- Split 和 Select
- Connect 和 CoMap
- Union
- 广播(broadcast)
- 4. Window
- TimeWindow
- CountWindow
- 5. 时间语义与 Watermark
- 时间语义
- Watermark(和 Window 一起使用)
- 乱序时间的引用
- 顺序时间的引用
- 6. 状态管理
- 键控状态(keyed state)
- Keyed State 支持数据类型
- 例子:ValueState
- 7. ProcessFunction API
- KeyedProcessFunction
- TimerService 和 定时器(Timers)
- 侧输出流(SideOutput)
- 8. 检查点(CheckPoint)
- 9. 状态一致性
- 分类
- 端到端 exactly-once
- 事务写入
- 预写日志(不常用)
- 两阶段提交
- Flink+Kafka 端到端状态一致性的保证
- Maven(pom.xml)
-
第一步:将 flink-1.10.1-bin-scala_2.12.tgz 上传到服务器中并解压缩
-
第二步:修改 conf/flink-conf.yaml 文件
# 修改 jobmanager.rpc.address 参数,修改为 jobmanager 的机器 jobmanager.rpc.address: hadoop151
-
第三步:修改 conf/slaves 文件
# slave 机器 hadoop152 hadoop153
-
第四步:将 flink 整个目录分发到其他机器上
-
命令
# 启动 bin/start-cluster.sh # 停止 bin/stop-cluster.sh
-
访问 web 页面
- http://hadoop151:8081
# =================== 启动任务 =================== bin/flink run -c 全限定类名 –p 分区个数 jar包 # 示例 bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar # =================== 停止任务 =================== bin/flink cancel JobId # 示例 bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606Yarn 模式 Session-cluster 模式 启动 yarn-session
-
命令
# =================== 启动 yarn-session =================== # -n(--container):TaskManager 的数量 # -s(--slots): 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个 taskmanager 的 slot 的个数为 1,有时可以多一些 taskmanager,做冗余 # -jm:JobManager 的内存(单位 MB) # -tm:每个 taskmanager 的内存(单位 MB) # -nm:yarn 的 appName(现在 yarn 的 ui 上的名字) # -d:后台执行 bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d # =================== 停止 yarn-session =================== yarn application -kill Application-Id # 示例 yarn application -kill application_1633171918776_0003
-
访问 web 页面
- 启动 yarn-session 后会出现 web 地址,例如:http://hadoop153:42189
# =================== 启动任务 =================== bin/flink run -c 全限定类名 –p 分区个数 jar包 # 示例 bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar # =================== 停止任务 =================== bin/flink cancel JobId # 示例 bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606Per-Job-Cluster 模式
# =================== 启动任务 =================== bin/flink run –m yarn-cluster -c 全限定类名 –p 分区个数 jar包 # 示例 bin/flink run –m yarn-cluster -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar # =================== 停止任务 =================== bin/flink cancel JobId # 示例 bin/flink cancel f69fbd0650ae4202b2a46b3ad20896063. 执行环境 Environment getExecutionEnvironment(常用)
-
创建一个执行环境,表示当前执行程序的上下文。 getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式
// 普通运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 流式运行环境(常用) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
返回本地执行环境,需要在调用时指定默认的并行度
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//WordCount.jar");
- 官网地址
- 官网地址
DataStreamflatMapmapStram = dataStream.map(new MapFunction () { public Integer map(String value) throws Exception { } });
DataStreamFilterflatMapStream = dataStream.flatMap(new FlatMapFunction () { public void flatMap(String value, Collector out) throws Exception { } });
DataStreamKeyByfilterStream = dataStream.filter(new FilterFunction () { public boolean filter(String value) throws Exception { } });
- DataStream → KeyedStream
- 将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的
- sum()
- max()
- min()
- maxBy()
- minBy()
- KeyedStream → DataStream
- 一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果
Split
- DataStream → SplitStream
- 根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream
Select
- SplitStream→DataStream
- 从一个 SplitStream 中获取一个或者多个DataStream
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1";
DataStream inputDataStream = env.readTextFile(filePath);
DataStream map = inputDataStream.map(new MapFunction() {
public SensorsData map(String value) throws Exception {
String[] splits = value.split(" ");
return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
}
});
KeyedStream keyedStream = map.keyBy("id");
// split:分流
SplitStream splitStream = keyedStream.split(new OutputSelector() {
public Iterable select(SensorsData value) {
return value.getWendu() > 37 ? Collections.singletonList("h") : Collections.singletonList("d");
}
});
// select:选择一个或多个 DataStream
DataStream resultDataStream = splitStream.select("d");
env.execute();
}
Connect 和 CoMap
Connect
- DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立
CoMap
- ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1";
DataStream inputDataStream = env.readTextFile(filePath);
DataStream map = inputDataStream.map(new MapFunction() {
public SensorsData map(String value) throws Exception {
String[] splits = value.split(" ");
return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
}
});
KeyedStream keyedStream = map.keyBy("id");
SplitStream splitStream = keyedStream.split(new OutputSelector() {
public Iterable select(SensorsData value) {
return value.getWendu() > 37 ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
DataStream highDataStream = splitStream.select("high");
DataStream lowDataStream = splitStream.select("low");
// connect & CoMapFunction:合流
ConnectedStreams connectedStreams = highDataStream.connect(lowDataStream);
DataStream
Union
- DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream
- 连接流的类型一样
-
DataStream → DataStream:向每个分区广播元素
dataStream.broadcast();
官网地址
-
TimeWindow:按照时间生成 Window
-
CountWindow:按照指定的数据条数生成一个 Window,与时间无关
// 窗口大小为 10s,每次滑动 5s DataStreamCountWindow> resultDataStream = stringDataSource .flatMap(new MyFlatMapFunction()) .keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .sum(1);
// 窗口大小为 10 个数,每次滑动 5 个数 DataStream5. 时间语义与 Watermark 时间语义> resultDataStream = stringDataSource .flatMap(new MyFlatMapFunction()) .keyBy(0) .countWindow(10, 5) .sum(1);
- Event Time:是事件创建的时间
- Ingestion Time:是数据进入 Flink 的时间
- Processing Time:是每一个执行基于时间操作的算子的本地系统时间
时间语义的引入
// EventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // IngestionTime env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // ProcessingTime env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);Watermark(和 Window 一起使用)
- Watermark 是一种衡量 Event Time 进展的机制
- Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现
- Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时 时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行
-
接口:AssignerWithPeriodicWatermarks
-
使用前设置时间语义
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //周期性的生成 watermar,默认周期是 200 毫秒 env.getConfig().setAutoWatermarkInterval(5000); String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1"; DataStreaminputDataStream = env.readTextFile(filePath); DataStream map = inputDataStream.map(new MapFunction () { public SensorsData map(String value) throws Exception { String[] splits = value.split(" "); return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2])); } }); //乱序时间情况下的 watermark //Time.milliseconds(1000) :延迟时间,1000ms DataStream eventTimeDataStream = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor (Time.milliseconds(1000)) { @Override public long extractTimestamp(SensorsData element) { return element.getTimestamp(); } }); env.execute(); } // 类 public class SensorsData { private String id; private Long timestamp; private double wendu; }
-
接口:AssignerWithPunctuatedWatermarks
-
使用前设置时间语义
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1"; DataStreaminputDataStream = env.readTextFile(filePath); DataStream map = inputDataStream.map(new MapFunction () { public SensorsData map(String value) throws Exception { String[] splits = value.split(" "); return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2])); } }); //顺序时间情况下的 watermark DataStream eventTimeDataStream = map.assignTimestampsAndWatermarks(new AscendingTimestampExtractor () { @Override public long extractAscendingTimestamp(SensorsData element) { return element.getTimestamp(); } }); env.execute(); } // 类 public class SensorsData { private String id; private Long timestamp; private double wendu; }
- 键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)
- 存储一份状态值
- ValueState
保存单个的值,值的类型为 T - get 操作: ValueState.value()
- set 操作: ValueState.update(T value)
- ListState
保存一个列表,列表里的元素的数据类型为 T - ListState.add(T value)
- ListState.addAll(List
values) - ListState.get()返回 Iterable
- ListState.update(List
values)
- MapState
保存 Key-Value 对 - MapState.get(UK key)
- MapState.put(UK key, UV value)
- MapState.contains(UK key)
- MapState.remove(UK key)
-
我们可以利用 Keyed State,实现这样一个需求:检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警
public class Test { public static void main(String[] args) throws Exception { //创建执行环节 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreaminputDataStream = env.socketTextStream("localhost", 9999); SingleOutputStreamOperator myBeanDataStream = inputDataStream.map(new MapFunction () { @Override public MyBean map(String s) throws Exception { String[] split = s.split(" "); return new MyBean(split[0], Double.valueOf(split[1])); } }); SingleOutputStreamOperator > resultDataStream = myBeanDataStream .keyBy((KeySelector ) data -> data.getId()) .flatMap(new MyRichFlatMapFunction(10.0)); resultDataStream.print(); env.execute(); } } // 富方法:存储状态值 public class MyRichFlatMapFunction extends RichFlatMapFunction > { private ValueState myValueState; private Double abs; public MyRichFlatMapFunction(Double abs) { this.abs = abs; } @Override public void open(Configuration parameters) throws Exception { // 创建状态值 myValueState = getRuntimeContext().getState(new ValueStateDescriptor ("my-flatmap", Double.class)); } @Override public void flatMap(MyBean myBean, Collector > collector) throws Exception { // 获取状态值 Double lastWendu = myValueState.value(); if(lastWendu != null){ double absWebdu = Math.abs(myBean.getWendu() - lastWendu); if (absWebdu > abs){ collector.collect(new Tuple3<>(myBean.getId(), lastWendu, myBean.getWendu())); } } // 修改状态值 myValueState.update(myBean.getWendu()); } @Override public void close() throws Exception { // 清除状态值 myValueState.clear(); } } // 实体类 public class MyBean { private String id; private Double wendu; }
- DataStream API 提供了一系列的 Low-Level 转换算子。可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等
- Flink 提供了 8 个 Process Function
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
-
KeyedProcessFunction 用来操作 KeyedStream。KeyedProcessFunction 会处理流的每一个元素,输出为 0 个、1 个或者多个元素。所有的 Process Function 都继承自RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。
-
KeyedProcessFunction
还额外提供了两个方法 - processElement
- 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)
- onTimer
- 回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。onTimerContext 和processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)
class MyKeyedProcessFunction extends KeyedProcessFunction
{ @Override public void open(Configuration parameters) throws Exception { } @Override public void processElement(MyBean myBean, KeyedProcessFunction .Context context, Collector collector) throws Exception { collector.collect(myBean); } @Override public void onTimer(long timestamp, KeyedProcessFunction .onTimerContext ctx, Collector out) throws Exception { } @Override public void close() throws Exception { } } - processElement
-
Context 和 onTimerContext 所持有的 TimerService 对象拥有以下方法
- long currentProcessingTime() :返回当前处理时间
- long currentWatermark() :返回当前 watermark 的时间戳
- void registerProcessingTimeTimer(long timestamp) :会注册当前 key 的processing time 的定时器。当 processing time 到达定时时间时,触发 timer。
- void registerEventTimeTimer(long timestamp) :会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
- void deleteProcessingTimeTimer(long timestamp) :删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行
- void deleteEventTimeTimer(long timestamp) :删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行
class MyKeyedProcessFunction extends KeyedProcessFunction
{ @Override public void processElement(MyBean myBean, KeyedProcessFunction .Context context, Collector collector) throws Exception { long currentProcessingTime = context.timerService().currentProcessingTime(); long currentWatermark = context.timerService().currentWatermark(); context.timerService().registerProcessingTimeTimer(10000l); context.timerService().registerEventTimeTimer(10000l); context.timerService().deleteProcessingTimeTimer(10000l); context.timerService().deleteEventTimeTimer(10000l); } }
-
process function 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs
-
事例:监控传感器温度值,将温度值低于 30 度的数据输出到 side output
public class Test { public static void main(String[] args) throws Exception { //创建执行环节 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreaminputDataStream = env.socketTextStream("localhost", 9999); SingleOutputStreamOperator > myBeanDataStream = inputDataStream.map(new MapFunction >() { @Override public Tuple2 map(String s) throws Exception { String[] split = s.split(" "); return new Tuple2 (split[0], Double.valueOf(split[1])); } }); //定义侧输出流 OutputTag > outputTag = new OutputTag >("high-output") {}; // 使用自定义算子:ProcessFunction SingleOutputStreamOperator > resultDataStream = myBeanDataStream .process(new MyProcessFunction(30.0, outputTag)); resultDataStream.print("low-wendu"); // 获取侧输出流并输出 resultDataStream.getSideOutput(outputTag).print("high-wendu"); env.execute(); } private static class MyProcessFunction extends ProcessFunction , Tuple2 > { private Double wenduLimit; private OutputTag > outputTag; // 初始化 public MyProcessFunction(Double wenduLimit, OutputTag > outputTag) { this.wenduLimit = wenduLimit; this.outputTag = outputTag; } @Override public void processElement(Tuple2 myBean, ProcessFunction , Tuple2 >.Context context, Collector > collector) throws Exception { // 温度高于限制温度就将数据加入到侧输出流,否则正常输出 if(myBean.f1 > wenduLimit){ context.output(outputTag, myBean); }else { collector.collect(myBean); } } } }
- 在执行流应用程序期间,Flink 会定期保存状态的一致检查点
- 如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 检查点数据保存位置
// RocksDB
env.setStateBackend(new RocksDBStateBackend(new URI("hdfs:///tmp/check-point-test")));
// 文件
//env.setStateBackend(new FsStateBackend("file:///tmp/check-point-test"));
// 内存
//env.setStateBackend(new MemoryStateBackend());
// 检查点配置
// 设置 checkpoint 周期时间
env.enableCheckpointing(10000l);
// 设置 exactly-once 模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置检查点在被丢弃之前可能需要的最长时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 设置检查点尝试之间的最小暂停
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 设置可能同时进行的检查点尝试的最大次数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使检查点能够在外部持久化,当拥有的作业失败或暂停时
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 重启策略
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 60000));
}
9. 状态一致性
分类
- AT-MOST-ONCE(最多一次)
- 当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据。最多处理一次事件
- AT-LEAST-ONCE(至少一次)
- 在大多数的真实应用场景,我们希望不丢失事件。所有的事件都得到了处理,而一些事件还可能被处理多次
- EXACTLY-ONCE(精确一次)
- 恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次
- 内部保证 :checkpoint
- source :可重设数据的读取位置
- sink :从故障恢复时,数据不会重复写入外部系统
- 幂等写入
- 事务写入
- 构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
- 实现方式
- 预写日志
- 两阶段提交
- 把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统
- 简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink 系统,都能用这种方式一批搞定
- DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性 sink
- 对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里
- 然后将这些数据写入外部 sink 系统,但不提交它们 —— 这时只是“预提交”
- 当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入
- 内部
- heckpoint 机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
- source
- kafka consumer 作为 source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
- sink
- kafka producer 作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-sql-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} kafka-clients org.apache.kafka org.apache.flink flink-streaming-java_${scala.binary.version} slf4j-api org.slf4j commons-collections commons-collections ${flink.version} org.slf4j slf4j-api 1.7.25 org.slf4j slf4j-simple 1.7.25 mysql mysql-connector-java 8.0.13 com.google.guava guava 18.0 com.google.code.gson gson 2.8.5 com.fasterxml.jackson.core jackson-databind 2.9.4 org.apache.httpcomponents httpclient 4.5.2 commons-logging commons-logging org.projectlombok lombok 1.18.4 com.jayway.jsonpath json-path 2.4.0 compile joda-time joda-time 2.9.9 junit junit 4.12 test asm asm 3.3.1 asm asm-commons 3.3.1 asm asm-util 3.3.1 cglib cglib-nodep 2.2.2 org.apache.flink flink-statebackend-rocksdb_${scala.binary.version} compile slf4j-api org.slf4j ${flink.version} com.beust jcommander 1.72 com.alibaba druid 1.1.21 commons-dbutils commons-dbutils 1.7 com.alibaba fastjson 1.2.72 org.apache.kafka kafka_${scala.binary.version} ${kafka.version} compile org.apache.commons commons-collections4 4.1 com.github.oshi oshi-core 3.5.0



