https://help.aliyun.com/document_detail/168049.html
2. Blink 3.X is fully compatible with Flink 1.5
https://help.aliyun.com/document_detail/111873.html
3. The DataStream API is fully compatible with open-source Flink 1.5.2
https://help.aliyun.com/document_detail/156813.html
4. The Blink branch on GitHub indicates compatibility with Flink 1.5.1
https://github.com/apache/flink/tree/blink
5. At the time of writing, the latest Apache Flink release is 1.14.0
https://flink.apache.org/news/2021/09/29/release-1.14.0.html
The release history on the Apache Flink website shows that the first release after Alibaba's internal Blink version was merged is Apache Flink 1.9.0.
II. Building a Blink 3.3.0 project
Set up the development environment with Maven 3 and IntelliJ IDEA.
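1. pom.xml

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <blink.version>blink-3.3.0</blink.version>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${blink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${blink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${blink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${blink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>${blink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-collections</groupId>
                <artifactId>commons-collections</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.antlr</groupId>
                <artifactId>antlr4-runtime</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr4-runtime</artifactId>
        <version>4.7.2</version>
    </dependency>
</dependencies>

2. Simple Blink DataStream example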
Generate stream data with a socket server:
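import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketServerTest {
    public static void main(String[] args) throws Exception {
        // Listen on port 9000 and wait for the Flink job to connect
        try (ServerSocket server = new ServerSocket(9000);
             Socket client = server.accept();
             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()))) {
            System.out.println("Client: " + client.getInetAddress().getHostAddress());
            // Send one random digit (0-9) per line, once per second
            while (true) {
                bw.write((int) (Math.random() * 100) % 10 + "\n");
                bw.flush();
                Thread.sleep(1000L);
            }
        }
    }
}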
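The server frames its output with a line terminator, and a line-oriented receiver only sees a record once that delimiter arrives. The plain-Java sketch below does not use Flink; its helper `splitRecords` is hypothetical and assumes records are cut exactly at a non-empty delimiter string. Flink's own socket source may differ in details.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterFramingSketch {

    // Hypothetical helper: cut the received text into records at each delimiter.
    // Text after the last delimiter is an incomplete record and is not emitted.
    static List<String> splitRecords(String received, String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        List<String> records = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = received.indexOf(delimiter, start)) >= 0) {
            records.add(received.substring(start, idx));
            start = idx + delimiter.length();
        }
        return records;
    }

    public static void main(String[] args) {
        // Server writes "\n" and the receiver splits on "\n": one record per digit
        System.out.println(splitRecords("3\n7\n3\n", "\n")); // [3, 7, 3]
        // The last digit has no terminator yet, so it is held back
        System.out.println(splitRecords("3\n7", "\n"));       // [3]
        // Server writes the letter "n" but the receiver expects "\n": nothing is emitted
        System.out.println(splitRecords("3n7n3n", "\n"));     // []
    }
}
```

The third call illustrates why the delimiter passed to socketTextStream has to match the terminator the server actually writes.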
Process the stream with Flink's socketTextStream:
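import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamWordCountTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the data source: read lines from the socket server, one record per "\n"
        DataStream<String> dataSource = env.socketTextStream("localhost", 9000, "\n");
        // Parse: wrap each word in a Tuple2(word, 1), key by field 0,
        // and sum field 1 over 5-second windows
        DataStream<Tuple2<String, Integer>> windowCounts = dataSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        System.out.println("value=" + value);
                        // Each record is already a single line; split it into words on whitespace
                        for (String word : value.split("\\s+")) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<>(word, 1));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        windowCounts.print().setParallelism(1);
        // Execute the job
        env.execute("StreamTest");
    }
}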
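The keyBy(0).timeWindow(Time.seconds(5)).sum(1) chain counts each key within 5-second tumbling windows. As a plain-Java illustration only (not Flink's implementation), the sketch below assigns hypothetical timestamped events to windows aligned to the epoch via start = ts - ts % size, assuming a zero window offset, and counts occurrences per word in each window. The `Event` type and `countPerWindow` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Hypothetical record: the word plus the time (ms) it was observed
    static final class Event {
        final String word;
        final long timestampMs;

        Event(String word, long timestampMs) {
            this.word = word;
            this.timestampMs = timestampMs;
        }
    }

    // Window start -> (word -> count); TreeMaps keep the output ordered
    static Map<Long, Map<String, Integer>> countPerWindow(List<Event> events, long sizeMs) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            // Tumbling windows aligned to the epoch (no offset)
            long windowStart = e.timestampMs - (e.timestampMs % sizeMs);
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                  .merge(e.word, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("3", 1_000));
        events.add(new Event("7", 2_000));
        events.add(new Event("3", 4_000));
        events.add(new Event("3", 6_000));
        System.out.println(countPerWindow(events, 5_000)); // {0={3=2, 7=1}, 5000={3=1}}
    }
}
```

In the actual job, processing time is the default time characteristic, and each window's counts are emitted only when that window closes rather than all at once as in this sketch.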
3. Simple Blink Table example
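import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
// Assumed package of BatchTableEnvironment in Blink 3.x; confirm with your IDE
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class TableWordCountTest {
    public static void main(String[] args) throws Exception {
        // Build the stream and table environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getBatchTableEnvironment(env);
        // Create the data source
        DataStreamSource<WordCount> elementSource = env.fromElements(
                new WordCount("Lily", 1),
                new WordCount("Lily", 1),
                new WordCount("Lily", 1),
                new WordCount("Tom", 1),
                new WordCount("Tom", 1),
                new WordCount("Jack", 1));
        // Register the bounded stream as table TEMP_TABLE with fields word, num
        tEnv.registerBoundedStream("TEMP_TABLE", elementSource, "word, num");
        Table table = tEnv.sqlQuery(
                "SELECT word, SUM(num) AS num FROM TEMP_TABLE GROUP BY word");
        // Output
        table.printSchema();
        table.print();
        // Execute
        tEnv.execute("TableTest");
    }

    // POJO: public fields and a public no-arg constructor
    public static class WordCount {
        public String word;
        public long num;

        public WordCount() {
        }

        public WordCount(String word, long num) {
            this.word = word;
            this.num = num;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", num=" + num +
                    '}';
        }
    }
}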
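To sanity-check what the SQL query should return, the same aggregation can be written with plain Java streams. This is an illustration, not how Blink executes the query. The nested `WordCount` class mirrors the POJO above, `sumByWord` is a hypothetical helper, and a TreeMap is used only so the output order is stable, since SQL GROUP BY guarantees no ordering.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupBySumSketch {

    // Same shape as the WordCount POJO in the Table example
    static final class WordCount {
        final String word;
        final long num;

        WordCount(String word, long num) {
            this.word = word;
            this.num = num;
        }
    }

    // Equivalent of: SELECT word, SUM(num) AS num FROM TEMP_TABLE GROUP BY word
    static Map<String, Long> sumByWord(List<WordCount> rows) {
        return rows.stream()
                .collect(Collectors.groupingBy(
                        r -> r.word,
                        TreeMap::new,                        // sorted keys for stable output
                        Collectors.summingLong(r -> r.num)));
    }

    public static void main(String[] args) {
        List<WordCount> rows = Arrays.asList(
                new WordCount("Lily", 1), new WordCount("Lily", 1), new WordCount("Lily", 1),
                new WordCount("Tom", 1), new WordCount("Tom", 1),
                new WordCount("Jack", 1));
        System.out.println(sumByWord(rows)); // {Jack=1, Lily=3, Tom=2}
    }
}
```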
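III. Summary
Blink 3.X corresponds to Flink 1.5;
Open-source Flink 1.9 was the first release to merge Alibaba's Blink, although the integration of Blink's query processor was not yet complete;
Open-source Flink 1.10 completed the Blink integration, strengthening streaming SQL processing and bringing mature batch processing;
Open-source Flink 1.12 took the first step toward unified stream and batch processing in the DataStream API;
Unified stream and batch processing is also the direction in which real-time big data processing is heading.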



