
Flink Core Programming: Sink

Like every other computing framework, Flink has a set of basic development steps and basic, core APIs. From the perspective of development steps, a program is mainly divided into four major parts.
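
In this series those four parts are, presumably, the execution environment, the source, the transformations, and the sink. A minimal sketch of that skeleton, as a point of reference only (the socket host/port and the trivial uppercase map are placeholders, not taken from the original article):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkSkeleton {
    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source: read lines from a socket (placeholder host/port)
        DataStreamSource<String> source = env.socketTextStream("localhost", 1111);

        // 3. Transform: a trivial map, just to show the stage
        SingleOutputStreamOperator<String> upper = source
                .map((MapFunction<String, String>) value -> value.toUpperCase())
                .returns(Types.STRING);

        // 4. Sink: print is itself a sink
        upper.print();

        env.execute();
    }
}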

A transformation operator turns one or more DataStreams into a new DataStream; a program can combine multiple transformations into complex dataflow topologies.

Sink literally means "to sink down". In Flink, a Sink essentially means persisting data, or, taken more broadly, the output operation that sends processed data to a specified external storage system. The print() used earlier is itself a sink.
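
Every example below parses socket lines into a WaterSensor POJO from the bean package. The article never shows that class; judging from how it is used (a String id, a Long ts and a Double vc with getters), it presumably looks roughly like this sketch:

package bean;

// Assumed shape of the WaterSensor bean; the real class is not shown in the original article
public class WaterSensor {
    private String id;  // sensor id
    private Long ts;    // event timestamp
    private Double vc;  // water level value

    // A public no-arg constructor and getters/setters are required for Flink to treat this as a POJO type
    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Double vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }

    public Double getVc() { return vc; }
    public void setVc(Double vc) { this.vc = vc; }
}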

1. Writing Data to Kafka

The code is as follows:

package day04;

import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class Flink01_Sink_Kafka {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a socket port
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);

        // Parse each line into a WaterSensor and serialize it as a JSON string
        SingleOutputStreamOperator<String> mapDS = socketDS.map((MapFunction<String, String>) value -> {
            String[] els = value.split(" ");
            return JSON.toJSONString(new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2])));
        }).returns(Types.STRING);

        // Write the data to Kafka
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.225:9092");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
        mapDS.addSink(new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties));

        env.execute();

    }
}

This is almost identical to using the Kafka Source.
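
For reference, the source side is symmetric. A sketch of reading the same topic back with FlinkKafkaConsumer, assuming the same broker address; the class name and the consumer group id here are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consumer properties: same broker as in the sink example; the group id is a placeholder
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.225:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-demo");

        // Read the "test" topic as plain strings
        DataStreamSource<String> kafkaDS = env.addSource(
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties));

        kafkaDS.print();

        env.execute();
    }
}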

2. Writing Data to Redis

When writing hash-structured data to Redis, note that for key-value structures such as string, set, and list only a key is needed, while the hash structure needs an additional field, i.e. key-(field-value).

The code is as follows:

package day04;

import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class Flink02_Sink_Redis {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a socket port
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);

        // Parse each line into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));

        // Write the data to Redis
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();


        mapDS.addSink(new RedisSink<>(jedisPoolConfig, new RedisMapper<WaterSensor>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // HSET writes into a Redis hash; "Sensor" is the hash key (the additional key)
                return new RedisCommandDescription(RedisCommand.HSET, "Sensor");
            }

            @Override
            public String getKeyFromData(WaterSensor data) {
                // For HSET this becomes the field inside the hash
                return data.getId();
            }

            @Override
            public String getValueFromData(WaterSensor data) {
                // For HSET this becomes the value stored under that field
                return Double.toString(data.getVc());
            }
        }));

        // Execute the job
        env.execute();
    }
}

For non-hash structures, getKeyFromData and getValueFromData specify the key and the value, and getCommandDescription specifies the write command; for hash structures, getCommandDescription also specifies the (hash) key, while getKeyFromData specifies the field.
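
For example, if the same readings were written as plain Redis strings instead (one key per sensor, no hash), only the command, the key and the value would be specified. A sketch reusing the mapDS stream and jedisPoolConfig from the example above:

        // Sketch: writing string values instead of a hash; RedisCommand.SET needs no additional key
        mapDS.addSink(new RedisSink<>(jedisPoolConfig, new RedisMapper<WaterSensor>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // For SET the description carries only the command
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(WaterSensor data) {
                // The Redis key
                return data.getId();
            }

            @Override
            public String getValueFromData(WaterSensor data) {
                // The Redis value stored under that key
                return Double.toString(data.getVc());
            }
        }));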

3. Writing Data to Elasticsearch

The code is as follows:

package day04;

import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Collections;
import java.util.List;

public class Flink03_Sink_ES {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a socket port
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);

        // Parse each line into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));

        // Write the data to Elasticsearch
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("localhost", 9200));

        ElasticsearchSink.Builder<WaterSensor> elasticsearchSink = new ElasticsearchSink.Builder<>(hosts, new ElasticsearchSinkFunction<WaterSensor>() {
            @Override
            public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {
                // Index each sensor as a JSON document in the "sensor" index, keyed by the sensor id
                IndexRequest request = Requests
                        .indexRequest("sensor")
                        .id(element.getId())
                        .source(JSON.toJSONString(element), XContentType.JSON);
                indexer.add(request);
            }
        });

        // Flush the bulk buffer after every single action so low-volume test data shows up immediately
        elasticsearchSink.setBulkFlushMaxActions(1);


        mapDS.addSink(elasticsearchSink.build());

        env.execute();
    }
}

For streaming data you need to configure when bulk requests are flushed:

// by number of buffered actions
elasticsearchSink.setBulkFlushMaxActions(1);
// by time interval, in milliseconds
elasticsearchSink.setBulkFlushInterval(1);
// by buffered data size, in MB
elasticsearchSink.setBulkFlushMaxSizeMb(100);

4. Writing Data to a Relational Database (JDBC)

Recent versions of Flink provide a JDBC sink; before that, writing data to a relational database such as MySQL required a custom sink. The code is as follows:

package day04;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Flink04_Sink_JDBC {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a socket port
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);

        // Parse each line into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));

        // Write the data to MySQL
        String sql = "insert into encryption.sensor(id,ts,vc) values(?,?,?)";

        mapDS.addSink(JdbcSink.sink(sql,
                (JdbcStatementBuilder<WaterSensor>) (ps, waterSensor) -> {
                    ps.setString(1, waterSensor.getId());
                    ps.setLong(2, waterSensor.getTs());
                    ps.setDouble(3, waterSensor.getVc());
                }, JdbcExecutionOptions.builder().withBatchSize(1).build()
                , new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://<undisclosed-cloud-db-host>:3306")
                        .withUsername("<hidden-user>")
                        .withPassword("<hidden-password>")
                        .build()));

        env.execute();
    }
}

Likewise, you need to specify when batches are flushed:

// by number of records per batch
JdbcExecutionOptions.builder().withBatchSize(int size);
// by flush interval, in milliseconds
JdbcExecutionOptions.builder().withBatchIntervalMs(long intervalMs);
// maximum number of retries on failure (not a flush trigger)
JdbcExecutionOptions.builder().withMaxRetries(int maxRetries);

5. Custom Sink

The code is as follows:

package day04;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class Flink05_Sink_Custom {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a socket port
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);

        // Parse each line into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));

        // Write to MySQL with a custom sink
        mapDS.addSink(new MySQLRichSinkFunction("jdbc:mysql://<undisclosed-cloud-db-host>:3306", "<hidden-user>", "<hidden-password>"));

        env.execute();
    }

    private static class MySQLRichSinkFunction extends RichSinkFunction<WaterSensor> {
        private final String url;
        private final String userName;
        private final String password;
        private Connection con;
        private PreparedStatement ps;

        public MySQLRichSinkFunction(String url, String userName, String password) {
            this.url = url;
            this.userName = userName;
            this.password = password;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Called once per parallel subtask: open the JDBC connection and prepare the INSERT statement
            con = DriverManager.getConnection(url, userName, password);
            ps = con.prepareStatement("insert into encryption.sensor(id,ts,vc) values(?,?,?)");
        }

        @Override
        public void close() throws Exception {
            // Called on shutdown: release the statement and the connection
            if (ps != null) {
                ps.close();
            }
            if (con != null) {
                con.close();
            }
        }

        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
            // Called once per record: bind the fields and execute the insert
            ps.setString(1, value.getId());
            ps.setLong(2, value.getTs());
            ps.setDouble(3, value.getVc());
            ps.execute();
        }
    }
}
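
In a RichSinkFunction, open() runs once per parallel subtask when the job starts, invoke() runs once for every element of the stream, and close() runs when the task shuts down, so the JDBC connection and prepared statement are created and released once per subtask rather than once per record.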