栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink流处理API

Flink流处理API

Flink 流处理API

尚硅谷Java版Flink

1 Environment执行环境 1.1 getExecutionEnvironment

​ 创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

1.2 createLocalEnvironment

​ 可以直接在本地创建一个执行环境,有一个参数是用来设置执行环境的并行度;

1.3 createLocalEnvironmentWithWebUI(new Configuration())

​ 在本地的web端直接创建一个执行环境,这样的好处是在env运行的时候可以直接在网络端口查看代码的运行情况,网络地址为localhost:8081;

1.3createRemoteEnvironment

​ 返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

// 代码样例EnironmentDemo
package StudyFlink.DataStream.API.Environmen;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class getExecutionEnvironment {
    public static void main(String[] args) {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1);

        // 返回本地的执行环境,需要在调用时指定默认的并行度
        LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
        // 创建一个本地的web执行环境,localhost:8081;可以在线查看jar包的执行情况;
        StreamExecutionEnvironment localEnvWithWebUI = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // 返回集群的执行环境,将Jar包提交到远程服务器。需要在调用时指定JobManager的Ip和端口号,并指定要在集群中运行的Jar包
        StreamExecutionEnvironment removteEnv = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH//wordcount.jar");
    }
}

2 Source读取数据源

定义一个POJO类;

package StudyFlink.DataStream.API.Beans;

public class SensorReading {
    // 属性
    private String id;
    private Long timeStamp;
    private Double temperature;
	// 无参构造方法
    public SensorReading() {
    }
	// 有参构造方法
    public SensorReading(String id, Long timeStamp, Double temperature) {
        this.id = id;
        this.timeStamp = timeStamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + ''' +
                ", timeStamp=" + timeStamp +
                ", temperature=" + temperature +
                '}';
    }
}

2.1 从集合中读取数据
package StudyFlink.DataStream.API.Source;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class collection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从集合中读取数据
        DataStreamSource dataSource = env.fromCollection(Arrays.asList(
                new SensorReading("01", 10000L, 36.2),
                new SensorReading("02", 10001L, 36.3)
        ));

        // 直接从元素中获取
        DataStreamSource elementSource = env.fromElements(1, 2, 3, 4, 5);
        
        // 输出的print可以传递参数;
        dataSource.print("SensorReading输出");
        elementSource.print("整数输出");
        // 执行任务,jobName
        env.execute("SourceJob");
    }
}

2.2 从文件中读取数据
// sensor.txt
"01",10000,33.6
"02",10001,33.4
"03",10002,33.9
"01",10003,33.0
"02",10004,33.8
package StudyFlink.DataStream.API.Source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class fromFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从文件中读取,但是这个读取之后的结果是一个字符串,需要在后续的操作中进行map操作
        DataStreamSource dataSource = env.readTextFile("resources\sensor.txt");

        dataSource.print();
        env.execute();
    }
}

2.3 从Kafka消息队列中读取数据

首先需要去启动zookeeper,然后启动kafka;

package StudyFlink.DataStream.API.Source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置与kafka之间的连接
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // 从kafka中获取消息队列
        // 因为kafka是一个外部组件,获取消息必须通过addSource连接器连接;
        // FlinkKafkaConsumer消费者,需要传递的参数:主题String、反序列化的方法、和Properties
        DataStreamSource dataSource = env.addSource(new FlinkKafkaConsumer011("sensor", new SimpleStringSchema(), properties));
        
        dataSource.print();
        env.execute();
    }
}

2.4 自定义Source

除了以上的source数据来源,我们还可以自定义source。需要做的,只是传入一个SourceFunction就可以。

// 希望可以随机生成传感器的温度数据
package StudyFlink.DataStream.API.Source;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.expressions.Rand;

import java.util.HashMap;
import java.util.Random;

public class mySensor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource dataStream = env.addSource(new MySensorSource());

        dataStream.print();

        env.execute();
    }

    // 实现自定义的SourceFunction,他需要实现SourceFunction函数
    private static class MySensorSource implements SourceFunction {
        // 定义一个标识位,用来控制数据的产生
        private boolean running = true;
        @Override
        public void run(SourceContext ctx) throws Exception {
            // 定义一个随机数生成器
            Random random = new Random();
            // 设置10个传感器的初始温度;
            HashMap sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                // 收集传感器的名称和一个高斯分布的随机初始温度值
                sensorTempMap.put("sensor_"+(i+1),60+random.nextGaussian()*20);
            }
            // 参数ctx是一个收集器;
            while (running){
                for (String sensorId : sensorTempMap.keySet()) {
                    // 在当前温度的基础上随机波动
                    double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId,newTemp);
                    // 收集新的数据
                    ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
                // 控制输出频率
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

3 Transform 转换算子

基本转换算子的定义:作用在数据流中的每一条单独的数据上的算子。

基本转换算子会针对流中的每一个单独的事件做处理,也就是说每一个输入数据会产生一个输出数据。

单值转换,数据的分割,数据的过滤,都是基本转换操作的典型例子。

3.1 map

来一个处理一个,中间只是做一些简单的处理;一对一进行处理;可以转换数据类型;

例如,将获取的温度传感器的字符串数据转换成sensorReading的格式;

3.2flatMap

和Map最大的不同之处就是flatMap是一对多的输出,而Map函数是一对一的输出;

flatmap是将数据打散,进行一些拆分操作;例如对输入的String字符串按照某种个数做拆分然后再进行输出;

输入为一个,输出为多个,中间使用收集器Collector进行收集数据;

例如,在wordCount案例当中,每次从socket中获取的字符串有可能是多个,需要按照逗号将其分开,然后利用Collector收集器收集分隔之后的单词,这样的过程就是一个输入是一个字符串,输出却是多个字符串的过程;

3.3 Filter

对数据按照一定的规则做一些筛选和过滤;例如,传感器的温度当中筛选出高于一定温度的数据,对这部分数据进行输出;

filter不能改变数据类型;

//E:FlinkDataFromMyself_ExcelDatasrcmainjavaStudyFlinkDataStreamAPItransformbase_transform.java
package StudyFlink.DataStream.API.transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Random;


public class base_transform {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 基本转换:map
        // 将读取的字符串,转换成该字符串的长度并进行输出
        DataStream mapStream = dataStream.map(new MapFunction() {
            @Override
            public Integer map(String value) throws Exception {
                // value是输入数据
                // 输出是返回一个Integer类型的数
                return value.length();
            }
        });
        

        // 基本转换:flatMap
        // 按照逗号切分字段,返回逗号分隔之后的每一个String
        SingleOutputStreamOperator flatMapStream = dataStream.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                // 该函数的返回值是void
                // 使用out进行收集数据
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });

        // 基本转换:filter
        // 过滤:条件的筛选,例如筛选sensor_1开头的数据
        // filter不能改变数据类型
        SingleOutputStreamOperator filterStream = dataStream.filter(new FilterFunction() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });

        // 打印输出
        mapStream.print();

        // 执行
        env.execute();
    }

    private static class MySensorFunction implements SourceFunction {
        // 创建一个数据产生的标识位
        private boolean running = true;
        @Override
        public void run(SourceContext ctx) throws Exception {
            // 定义一个随机数生成器
            Random random = new Random();
            // 设定传感器温度的初始值
            HashMap sensorMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                // 设定传感器的名称和初始温度
                sensorMap.put("sensor_"+i,random.nextGaussian()*20+60);
            }
            while (running){
                for (String sensorId : sensorMap.keySet()) {
                    // 传感器的温度在当前的基础上进行随机波动
                    double newTemp = random.nextGaussian() + sensorMap.get(sensorId);
                    sensorMap.put(sensorId,newTemp);
                    // 收集当前数据
                    ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
                // 控制输出频率
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

键控流转换算子 3.4 KeyBy

​ 很多流处理程序的一个基本要求就是要能对数据进行分组,分组后的数据共享某一个相同的属性。 DataStream API 提供了一个叫做 KeyedStream 的抽象,此抽象会从逻辑上对 DataStream 进行分区,分区后的数据拥有同样的 Key 值,分区后的流互不相关。

​ 针对 KeyedStream 的状态转换操作可以读取数据或者写入数据到当前事件 Key 所对应的状态中。这表明拥有同样 Key 的所有事件都可以访问同样的状态,也就是说所以这些事件可以一起处理。
​ KeyedStream 可以使用 map, flatMap 和 filter 算子来处理。接下来我们会使用 keyBy 算子来将 DataStream 转换成KeyedStream,并讲解基于 key 的转换操作:滚动聚合和 reduce算子。

​ DataStream→ KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。将所有相同的key的流放在同一个分区当中,但是一个分区中不一定只有一个key。

​ 虽然KeyedStream仍然是一个DataStream,但是基于KeyedStream所调用的api和基于DataStream所能够调用的api是不一样的;KeyedStream之后可以做很多的聚合操作;

3.5 滚动聚合算子(Rolling Aggregation)

​ 滚动聚合算子由 KeyedStream 调用,并生成一个聚合以后的 DataStream,例如: sum,minimum, maximum。一个滚动聚合算子会为每一个观察到的 key 保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。

DataStream API 提供了以下滚动聚合方法。 这些算子可以针对KeyedStream的每一个支流做聚合。

  • sum() 针对当前key所对应的数据做求和操作;
  • min() 针对当前key所对应的数据取最小值;
  • max() 针对当前key所对应的数据取最大值;
  • minBy() 取最小值的同时还会保留该条流数据的别的信息,例如取温度的最小值,同时还会保留传感器标签和时间戳,数据流print输出的时候所带有的信息是该温度值所对应的相关信息;
  • maxBy() 取最大值的同时还会保留该条流数据的别的信息;

滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。

package StudyFlink.DataStream.API.transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RollingAggregation {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        // lambda表达式实现map函数
        SingleOutputStreamOperator lambdaStream = dataStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // keyBy分组
        // 返回的Tuple是因为keyby分组可以传递多个参数,这时候就是用元祖包装的Tuple
        KeyedStream keyedStream = mapStream.keyBy("id");

        // 聚合操作
        SingleOutputStreamOperator maxStream = keyedStream.maxBy("temperature");
        SingleOutputStreamOperator maxByStream = keyedStream.maxBy("temperature");

        // 打印输出
        keyedStream.print();

        // 执行
        env.execute();
    }
}

3.6 Reduce

​ DataStream→ KeyedStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

​ reduce 算子是滚动聚合的泛化实现。它将一个 ReduceFunction 应用到了一个 KeyedStream 上面去。 reduce 算子将会把每一个输入事件和当前已经 reduce 出来的值做聚合计算。 reduce 操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。
​ reduce 函数可以通过实现接口 ReduceFunction 来创建一个类。 ReduceFunction 接口定义了 reduce() 方法,此方法接收两个输入事件,输出一个相同类型的事件。

package StudyFlink.DataStream.API.Transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class reduce {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });
        // 分组
        KeyedStream keyedStream = mapStream.keyBy("id");

        // reduce聚合,取最大的温度值,以及当前最新的时间戳
        // 数据类型不做转换
        SingleOutputStreamOperator reduceStream = keyedStream.reduce(new ReduceFunction() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimeStamp(), Math.max(value1.getTemperature(), value1.getTemperature()));
            }
        });

        // lambda表达式实现reduce函数
        SingleOutputStreamOperator reduceStream2 = keyedStream.reduce((curState, newData) ->
                new SensorReading(curState.getId(), newData.getTimeStamp(), Math.max(curState.getTemperature(), newData.getTemperature())));

        // 打印输出
        reduceStream.print();

        // 执行
        env.execute();
    }
}
3.7 Split和Select

​ split按照一定的特征,把数据流根据不同的特点分开,然后通过select将split分流操作之后数据分开;

​ DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

​ SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

​ 例如,将传感器的温度值按照30℃分开,将高于30℃的设置为高温,低于30℃的设置为低温。

package StudyFlink.DataStream.API.Transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.expressions.Lower;

import java.security.acl.LastOwnerException;
import java.util.Collections;

public class MultipliesTransform {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        // split分流操作,30℃为界;
        SplitStream splitStream = mapStream.split(new OutputSelector() {
            @Override
            public Iterable select(SensorReading value) {
                // Iterable字符串类型的迭代器,可以包含多个标签
                // split之后会改数据流上面盖戳,标志分类之后的数据类型
                // 返回的数据类型是一个可以迭代的类型,可以使用集合或者Collections
                if (value.getTemperature() > 30) {
                    return Collections.singletonList("High");
                } else {
                    return Collections.singletonList("Lower");
                }
            }
        });

        // select选择分流
        DataStream highStream = splitStream.select("High");
        DataStream lowerStream = splitStream.select("Lower");

        // 打印输出
        highStream.print();
        lowerStream.print();

        // 执行
        env.execute();
    }
}
3.8Connect和CoMap

​ DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

​ 数据类型可以不同。

​ ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

3.9 Union

​ DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream,但是要求两条流是同样的数据类型。

package StudyFlink.DataStream.API.Transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class UnionTransform {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        // split分流操作,30℃为界;
        SplitStream splitStream = mapStream.split(new OutputSelector() {
            @Override
            public Iterable select(SensorReading value) {
                // Iterable字符串类型的迭代器,可以包含多个标签
                // split之后会改数据流上面盖戳,标志分类之后的数据类型
                // 返回的数据类型是一个可以迭代的类型,可以使用集合或者Collections
                if (value.getTemperature() > 30) {
                    return Collections.singletonList("High");
                } else {
                    return Collections.singletonList("Lower");
                }
            }
        });

        // select选择分流
        DataStream highStream = splitStream.select("High");
        DataStream lowerStream = splitStream.select("Lower");

        // connect合流
        // 首先将高温流转换成二元组类型,与低温流连接合并之后,输出状态信息
        SingleOutputStreamOperator> warningStream = highStream.map(new MapFunction>() {
            @Override
            public Tuple2 map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });

        // 基于warningStream合并lowerStream
        // connect的返回值有两个泛型,分别对应输入流中的两个数据类型
        // 两条流的数据类型可以不相同
        ConnectedStreams, SensorReading> connectStream = warningStream.connect(lowerStream);

        // 然后再调用CoMapFunction合并两条流里面的信息
        SingleOutputStreamOperator coMapStream = connectStream.map(new CoMapFunction, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2 value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "High Temperature");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });
        
        // union合并数据流,要求两条流的数据类型相同
        DataStream unionStream = highStream.union(lowerStream);

        // 打印输出
        highStream.print();
        lowerStream.print();

        // 执行
        env.execute();
    }
}

 
分布式转换算子 

​ 分区操作对应于我们之前讲过的“数据交换策略”这一节。这些操作定义了事件如何分配到不同的任务中去。当我们使用 DataStream API 来编写程序时,系统将自动的选择 数据分区策略,然后根据操作符的语义和设置的并行度将数据路由到正确的地方去。有些时候,我们需要在应用程序的层面控制分区策略,或者自定义分区策略。例如,如果我们知道会发生数据倾斜,那么我们想要针对数据流做负载均衡,将数据流平均发送到接下来的操作符中去。又或者,应用程序的业务逻辑可能需要一个算子所有的并行任务都需要接收同样的数据。再或者,我们需要自定义分区策略的时候。在这一小节,我们将展示 DataStream 的一些方法,可以使我们来控制或者自定义数据分区策略。

keyBy() 方法不同于分布式转换算子。所有的分布式转换算子将产生 DataStream 数据类型。而 keyBy() 产生的类型是 KeyedStream,它拥有自己的 keyed state。

3.10 Random

​ 随机数据交换由 DataStream.shuffle()方法实现。 shuffle方法将数据随机的分配到下游算子的并行任务中去。

3.11 Round-Robin

​ rebalance() 方法使用 Round-Robin 负载均衡算法将输入流平均分配到随后的并行运行的任务中去。

3.14 Rescale

​ rescale()方法使用的也是 round-robin算法,但只会将数据发送到接下来的并行运行的任务中的一部分任务中。本质上,当发送者任务数量和接收者任务数量不一样时, rescale分区策略提供了一种轻量级的负载均衡策略。如果接收者任务的数量是发送者任务的数量的倍数时, rescale 操作将会效率更高。

​ rebalance() 和 rescale() 的根本区别在于任务之间连接的机制不同。 rebalance() 将会针对所有发送者任务和所有接收者任务之间建立通信通道,而 rescale() 仅仅针对每一个任务和下游算子的一部分子并行任务之间建立通信通道。 rescale 的示意图如下。

3.15 Boradcast

​ broadcast() 方法将输入流的所有数据复制并发送到下游算子的所有并行任务中去。

3.16 Global

​ global() 方法将所有的输入流数据都发送到下游算子的第一个并行任务中去。这个操作需要很谨慎,因为将所有数据发送到同一个 task,将会对应用程序造成很大的压力。

3.17 Custom

​ 当 Flink 提供的分区策略都不适用时,我们可以使用 partitionCustom() 方法来自定义分区策略。这个方法接收一个 Partitioner 对象,这个对象需要实现分区逻辑以及定义针对流的哪一个字段或者 key 来进行分区。

// 数据传输过程中的重分区
package StudyFlink.DataStream.API.Transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Partition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(4);

        // 从文件中读取数据
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        dataStream.print("input");

        // 1. shuffle操作,打乱分区
        DataStream shuffleStream = dataStream.shuffle();
        shuffleStream.print("shuffle");

        // 2.keyby
        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });
        // keyby,会将相同分组的数据放在同一个分区里面
        KeyedStream keyedStream = mapStream.keyBy("id");
        keyedStream.print("keyby");

        // 3.global
        // 把所有的数据全部放在一个分组里面
        dataStream.global().print("global");

        env.execute();


    }
}

4 支持的数据类型

​ Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

​ Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

​ Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。

4.1 基础数据类型

​ Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …

4.2 java和scala元祖类型 4.3 Scala样例类(case classes) 4.4 java简单对象(POJOS) 4.5 其它(Arrays,List,Maps,Enums,等等)

​ Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。

5 实现UDF函数——更细粒度的控制流 5.1 函数类(Function Classes)

​ Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。

下面例子实现了自定义的输入数据源接口:

package StudyFlink.DataStream.API.Source;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.expressions.Rand;

import java.util.HashMap;
import java.util.Random;

public class mySensor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
		
        // udf函数接口
        DataStreamSource dataStream = env.addSource(new MySensorSource());

        dataStream.print();

        env.execute();
    }
	
    // 实现了函数类接口
    // 实现自定义的SourceFunction,他需要实现SourceFunction函数
    private static class MySensorSource implements SourceFunction {
        // 定义一个标识位,用来控制数据的产生
        private boolean running = true;
        @Override
        public void run(SourceContext ctx) throws Exception {
            // 定义一个随机数生成器
            Random random = new Random();
            // 设置10个传感器的初始温度;
            HashMap sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                // 收集传感器的名称和一个高斯分布的随机初始温度值
                sensorTempMap.put("sensor_"+(i+1),60+random.nextGaussian()*20);
            }
            // 参数ctx是一个收集器;
            while (running){
                for (String sensorId : sensorTempMap.keySet()) {
                    // 在当前温度的基础上随机波动
                    double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId,newTemp);
                    // 收集新的数据
                    ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
                // 控制输出频率
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

5.2 匿名函数类(Lambda Functions)
package StudyFlink.DataStream.API.Transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class reduce {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });
        // 分组
        KeyedStream keyedStream = mapStream.keyBy("id");

        // lambda表达式实现reduce函数
        SingleOutputStreamOperator reduceStream2 = keyedStream.reduce((curState, newData) ->
                new SensorReading(curState.getId(), newData.getTimeStamp(), Math.max(curState.getTemperature(), newData.getTemperature())));

        // 打印输出
        reduceStream.print();

        // 执行
        env.execute();
    }
}

5.3 富函数(Rich Functions)

​ “富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

​ Rich Function有一个生命周期的概念。典型的生命周期方法有:

  • open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
package StudyFlink.DataStream.API.Transform;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunction {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        DataStream> resultStream = mapStream.map(new MyMapper());

        // 打印输出
        resultStream.print();

        // 执行
        env.execute();
    }
	// 实现普通的map函数类
    public static class MyMapper0 implements MapFunction>{
        @Override
        public Tuple2 map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(),value.getId().length());
        }
    }

    // 实现自定义的富函数类
    // 富函数里面会有定义有关上下文信息的内容,并且有一个open和close的操作,在使用数据库连接的时候可以方便使用;
    public static class MyMapper extends RichMapFunction>{
        @Override
        public Tuple2 map(SensorReading value) throws Exception {
            // getRuntimeContext()获取运行时的上下文信息
            return new Tuple2<>(value.getId(),getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // 初始化工作,一般是定义状态,或者跟外部数据库连接的时候在这里连接一次数据库
            super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            // 关闭连接和清空状态的收尾工作
            super.close();
        }
    }
}

6 Sink

​ Flink没有类似于spark中foreach方法,让用户进行迭代的操作。虽有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。

stream.addSink(new MySink())

​ 官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。

6.1 Kafka

pox.xml


    org.apache.flink
    flink-connector-kafka-0.11_2.11
    1.10.0

package StudyFlink.DataStream.API.sink;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 设置与kafka之间的连接
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // 从kafka中获取消息队列
        DataStreamSource dataStream = env.addSource(new FlinkKafkaConsumer011("sensor", new SimpleStringSchema(), properties));

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2])).toString();
            }
        });

        // 写入到kafka中
        // 因为前面已经做了toString的序列化操作了,所以在kafka序列化配置的时候直接写上new SimpleStringSchema()
        mapStream.addSink(new FlinkKafkaProducer011("localhost:9092","sinktest",new SimpleStringSchema()));

        dataStream.print();

        env.execute();
    }
}

启动一个kafka的消费者;

6.2 Redis

pox.xml


    org.apache.bahir
    flink-connector-redis_2.11
    1.0

package StudyFlink.DataStream.API.sink;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigbase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class redis {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        // 定义redis连接配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();
        mapStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        // 打印输出
        dataStream.print();

        // 执行
        env.execute();
    }

    // 自定义redisMapper
    public static class MyRedisMapper implements RedisMapper{
        // 定义保存数据到redis的命令,存成hashMap表,hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}

6.3 Elasticsearch

pom.xml


    org.apache.flink
    flink-connector-elasticsearch6_2.11
    1.10.0

package StudyFlink.DataStream.API.sink;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkbase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;

import java.util.ArrayList;
import java.util.HashMap;

public class ElasticSearch {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        // 定义es的连接配置
        ArrayList httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost",9200));

        mapStream.addSink(new ElasticsearchSink.Builder(httpHosts,new MyEsSinkFunction()).build());

        // 打印输出
        dataStream.print();

        // 执行
        env.execute();
    }

    // 实现自定义的ES的写入操作
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction{
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // 定义写入的数据source
            HashMap dataSource = new HashMap<>();
            dataSource.put("id",element.getId());
            dataSource.put("temp",element.getTemperature().toString());
            dataSource.put("ts",element.getTimeStamp().toString());

            // 创建请求作为向es发起的命令
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("redingdata")
                    .source(dataSource);

            // 用index发送请求
            indexer.add(indexRequest);
        }
    }
}

6.4 JDBC自定义sink

自定义写入mysql


    mysql
    mysql-connector-java
    5.1.44

E:FlinkDataFromMyself_ExcelDatasrcmainjavaStudyFlinkDataStreamAPIsinkmysql.java

package StudyFlink.DataStream.API.sink;

import StudyFlink.DataStream.API.Beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;

public class mysql {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 自定义数据源
        DataStreamSource dataStream = env.readTextFile("E:\Flink\DataFromMyself_ExcelData\src\main\resources\sensor.txt");

        // 将字符串数据转换成sensorReading
        SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        mapStream.addSink( new MySqlFunction());

        // 打印输出
        dataStream.print();

        // 执行
        env.execute();
    }

    // 实现自定义的SinkFunction
    private static class MySqlFunction extends RichSinkFunction {
        // 因为需要在open和invoke里面同时使用,所在首先在外边定义一下
        // 声明一些预编译器
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            // 建立数据库连接
            connection = DriverManager.getConnection("jdbc:mysql://localhost//3306/test","root","root");
            insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) values (?,?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // 每来一条数据,调用链接,执行sql
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // 直接执行更新语句,如果没有更新就插入
            updateStmt.setDouble(1,value.getTemperature());
            updateStmt.setString(2,value.getId());
            updateStmt.execute();

            // 查看是否更新成功
            // 如果更新不成功,就执行插入
            if (updateStmt.getUpdateCount() == 0){
                insertStmt.setString(1,value.getId());
                insertStmt.setDouble(2,value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            // 关闭所有连接
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号