栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink之Java入门

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink之Java入门

介绍

Flink是一个处理流数据的组件,在实时计算等场景下可以发挥巨大的作用。
流数据一般分为:

  • 有界数据流(知道数据的起点和终点,例如一个txt文件的数据)
  • 无界数据流(不知道数据的终点,例如kafka消息、socket数据)
java demo 添加依赖

    1.8
    1.12.2


        
            org.apache.flink
            flink-clients_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-scala_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-java
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-scala_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-scala-bridge_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge_2.12
            ${flink.version}
        


示例代码 无界数据流demo

配置netcat, 网上下载nc.exe 点我下载
在安装目录打开cmd,输入如下命令,配置端口为9000

nc -L -p 9000 -v

执行成功后如图所示

接下来编写代码,本demo实现了一个将字符串数据先按照逗号分割,然后转为大写的逻辑

package com.greenutility.mask.util;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkTest {

    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.加载数据源
        DataStreamSource elementsSource = env.socketTextStream("127.0.0.1", 9000);
        // 3.数据转换
        DataStream flatMap = elementsSource.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String element, Collector out) {
                String[] wordArr = element.split(",");
                for (String word : wordArr) {
                    out.collect(word);
                }
            }
        });
        //DataStream 下边为DataStream子类
        SingleOutputStreamOperator source = flatMap.map(new MapFunction() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        // 4.数据输出
        source.print();
        // 5.执行程序
        env.execute("flink-hello-world");
    }
}

执行main方法,flink已经开始监听netcat的socket数据了,此时我们在cmd里输入一些字符串

然后观察控制台的输出

我们可以看到,来自socket的字符串数据已经成功按照预期进行了处理

有界数据流demo

我们首先在本地新建一个test.txt文件,随便输入一些字符串

然后将上个demo中的加载数据源那一行代码替换为

DataStreamSource elementsSource = env.readTextFile("D:\test.txt");

执行main方法

数据处理成功!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/877161.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号