栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink流处理——Sink

Flink流处理——Sink

1:Kafka

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
package com.atguigu.sink;

import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class SinkTest1_Kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
        DataStreamSource dataStream = env.readTextFile(path);

        DataStream inputStream = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2])).toString();
        });

        DataStreamSink second = inputStream.addSink(new FlinkKafkaProducer011(
                "hadoop112:9092", "second", new SimpleStringSchema()));
        

        env.execute();
    }
}
2:redis

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
package com.atguigu.sink;

import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest1_Redis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
        DataStreamSource dataStream = env.readTextFile(path);

        DataStream inputStream = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
        });

        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop112")
                .setPort(6379)
                .build();

        inputStream.addSink(new RedisSink<>(config, new RedisMapper() {
            @Override
            public RedisCommandDescription getCommandDescription() {

                return new RedisCommandDescription(RedisCommand.HSET, "sensor_tempe");
            }

            @Override
            public String getKeyFromData(SensorReading data) {
                return data.getId();
            }

            @Override
            public String getValueFromData(SensorReading data) {
                return data.getTemperature().toString();
            }
        }));

//        inputStream.addSink(new RedisSink(config, new MyRedisMapper()) );

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper {
        // 保存到 redis 的命令,存成哈希表
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_tempe");
        }
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
3:ES

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
package com.atguigu.sink;

import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkbase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkTest1_ES {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
        DataStreamSource dataStream = env.readTextFile(path);

        DataStream inputStream = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
        });


        // es 的 httpHosts 配置
        ArrayList httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop112",9200));

        ElasticsearchSink elasticsearchSink = new ElasticsearchSink.Builder<>(httpHosts, new MyEsSink()).build();
        inputStream.addSink(elasticsearchSink);


        env.execute();
    }

    public static class MyEsSink implements ElasticsearchSinkFunction{

        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            HashMap hashMap = new HashMap<>();

            hashMap.put("id",element.getId());
            hashMap.put("temp",element.getTemperature().toString());
            hashMap.put("ts",element.getTimestamp().toString());

            IndexRequest indexRequest = Requests.indexRequest()
                    .index("senor")
                    .type("info")
                    .source(hashMap);
            
            indexer.add(indexRequest);


        }
    }

}
4:JDBC自定义Sink

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
package com.atguigu.sink;

import com.atguigu.bean.SensorReading;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;

public class SinkTest1_JDBC {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
        DataStreamSource dataStream = env.readTextFile(path);

        DataStream inputStream = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
        });

        inputStream.addSink(new MyJdbc());

        env.execute();
    }

    public static class MyJdbc extends RichSinkFunction {

        Connection conn = null;
        String sql = "INSERT INTO `firstorder`(`phone_no`,`is_new`) VALUES(?,?)";
        String sql2 = "Update `firstorder` set is_new = ? where phone_no = ?";
        PreparedStatement insertStmt;
        PreparedStatement updateStmt;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
            insertStmt = conn.prepareStatement(sql);
            updateStmt=conn.prepareStatement(sql2);
        }

        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            updateStmt.setString(1,value.getTemperature().toString());
            updateStmt.setString(2,value.getId());

            updateStmt.executeUpdate();

            if(updateStmt.getUpdateCount() == 0){
                insertStmt.setString(1,value.getId());
                insertStmt.setString(2,value.getTemperature().toString());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            conn.close();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/467374.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号