栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink学习笔记-阿里云Blink

Flink学习笔记-阿里云Blink

因为项目环境限制,流处理引擎只能使用阿里云 Blink3.3.0 版本,翻阅阿里云官网 Blink 和 GitHub Flink 的Blink分支资料,成功构建并运行 Blink 的 DataStream 和 Table,记录在此作为笔记。 一、说明 1、据阿里云官网资料显示,Blink已经停售

https://help.aliyun.com/document_detail/168049.html

 

2、Blink3.X版本完全兼容Flink1.5版本

https://help.aliyun.com/document_detail/111873.html

 

3、Datastream完全兼容开源Flink1.5.2版本

https://help.aliyun.com/document_detail/156813.html

 

4、GitHub上的Blink分支显示兼容Flink1.5.1

https://github.com/apache/flink/tree/blink

 

5、当前 Apache Flink 版本是 1.14.0

https://flink.apache.org/news/2021/09/29/release-1.14.0.html

查阅Apache Flink官网版本资料发现,对阿里内部版本 Blink 合并后的首次版本发布是Apache Flink 1.9.0

二、构建Blink3.3.0工程

使用Maven3、IntelliJ IDEA构建开发环境

1、pom.xml文件

        2.11.12
        2.11
        blink-3.3.0
        1.8
        1.8
        1.8
    
    
     
        
            com.alibaba.blink
            flink-core
            ${blink.version}
            
        
        
            com.alibaba.blink
            flink-java
            ${blink.version}
            
        
        
            com.alibaba.blink
            flink-streaming-java_${scala.binary.version}
            ${blink.version}
            
                
                    slf4j-api
                    org.slf4j
                
            
            
        
        
            com.alibaba.blink
            flink-streaming-scala_${scala.binary.version}
            ${blink.version}
            
                
                    scala-library
                    org.scala-lang
                
            
            
        
        
            com.alibaba.blink
            flink-table_2.11
            ${blink.version}
            
                
                    commons-codec
                    commons-codec
                
                
                    commons-collections
                    commons-collections
                
                
                    slf4j-api
                    org.slf4j
                
                
                    snappy-java
                    org.xerial.snappy
                
                
                    antlr4-runtime
                    org.antlr
                
            
            
        
        
        
            junit
            junit
            4.8.1
            test
        
        
            org.scala-lang
            scala-library
            2.11.12
        
​
        
        
            org.slf4j
            slf4j-log4j12
            1.7.7
            runtime
        
        
            log4j
            log4j
            1.2.17
            runtime
        
        
            org.antlr
            antlr4-runtime
            4.7.2
        
    
2、Blink Datastream简单示例

使用Socket产生流数据:

public class SocketServerTest {
​
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(9000);
        Socket client = server.accept();
        System.out.println("客户端:" + client.getInetAddress().getHostAddress());
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
        while (true) {
            bw.write((int) (Math.random() * 100) % 10 + "n");
            bw.flush();
            Thread.sleep(1000L);
        }
    }
}

Flink socketTextStream处理流数据:

public class StreamWordCountTest {
​
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //生成DataSource
        DataStream dataSource = env.socketTextStream("localhost", 9000, "n");
        //解析数据:传入的数据使用Tuple2结构化,窗口周期5秒,按Tuple2第1个元素分组,第2个元素统计,
        DataStream> windowCounts = dataSource
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        System.out.println("value=" + value);
                        for (String word : value.split("\n")) {
                            out.collect(new Tuple2(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
​
        windowCounts.print().setParallelism(1);
​
        //执行
        env.execute("StreamTest");
    }
}
3、Blink Table简单示例
public class TableWordCountTest {
​
    public static void main(String[] args) throws Exception {
​
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getBatchTableEnvironment(env);
​
        //生成DataSource
        DataStreamSource elementSource = env.fromElements(
                new WordCount("Lily", 1),
                new WordCount("Lily", 1),
                new WordCount("Lily", 1),
                new WordCount("Tom", 1),
                new WordCount("Tom", 1),
                new WordCount("Jack", 1));
​
        //注册 WordCountTableTemp 表,设置字段
        tEnv.registerBoundedStream("TEMP_TABLE", elementSource, "word, num");
​
        Table table = tEnv.sqlQuery(
                "SELECt word, SUM(num) as num FROM TEMP_TABLE GROUP BY word");
        //输出
        table.printSchema();
        table.print();
        //执行
        tEnv.execute("TableTest");
    }
​
​
    public static class WordCount {
        public String word;
        public long num;
​
        public WordCount() {
        }
​
        public WordCount(String word, long num) {
            this.word = word;
            this.num = num;
        }
​
        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + ''' +
                    ", num=" + num +
                    '}';
        }
    }
}
三、总结

Blink 3.X 对应 Flink 1.5版本;

开源Flink1.9版本首次完成阿里Blink整合,但其实 Blink 的查询处理器的集成还没有完全完成;

开源Flink1.10版本完全整合Blink,增强了流式SQL处理能力和成熟的批处理能力;

开源Flink1.12版本,DataStream流批一体迈出了第一步;

流批一体化,也是大数据实时处理的趋势。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327175.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号