栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据之flink数据一致性

大数据之flink数据一致性

一、flink分析结果写入redis

1、下载 flink-hadoop 整合包(flink-shaded-hadoop uber jar),放入所有节点的 Flink lib 目录下(使用 hdfs:// 作为 StateBackend 路径时需要)

2、KafkaToRedisWordCount

package cn._51doit.flink.day08;

import cn._51doit.flink.day02.RedisSinkDemo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Properties;


public class KafkaToRedisWordCount {

    //--topic doit2021 --groupId g02 --redisHost node-3.51doit.cn 
	//--redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
    public static void main(String[] args) throws Exception{

        //System.setProperty("HADOOP_USER_NAME", "root");
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		 //可以间内存中的状态持久化到StateBackend
        env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));
        //设置状态存储的后端
        env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));
        //如果你手动cancel job后,不删除job的checkpoint数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //添加KafkaSource
        //设置Kafka相关参数
        Properties properties = new Properties();//设置Kafka的地址和端口
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
        //读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读
        properties.setProperty("auto.offset.reset", "earliest");
        //设置消费者组ID
        properties.setProperty("group.id", parameterTool.get("groupId"));
        //开启checkpoint,不然让flink的消费(source对他的subtask)自动提交偏移量
        properties.setProperty("enable.auto.commit", "false");
        //创建FlinkKafkaConsumer并传入相关参数
        FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("topic"), //要读取数据的Topic名称
                new SimpleStringSchema(), //读取文件的反序列化Schema
                properties //传入Kafka的参数
        );
        //kafkaConsumer.setCommitOffsetsonCheckpoints(false); //设置在checkpoint是不将偏移量保存到kafka特殊的topic中,可设可不设

        //使用addSource添加kafkaConsumer
        DataStreamSource lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator> wordAndOne = lines.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String line, Collector> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //new Tuple2(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分组
        KeyedStream, String> keyed = wordAndOne.keyBy(t -> t.f0);

        //聚合
        SingleOutputStreamOperator> summed = keyed.sum(1);

        //将聚合后的结果写入到Redis中
        //调用Sink
        //summed.addSink()
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(parameterTool.getRequired("redisHost"))
                .setPassword(parameterTool.getRequired("redisPwd"))
                .setDatabase(9).build();

        summed.addSink(new RedisSink>(conf, new RedisSinkDemo.RedisWordCountMapper()));

        env.execute();


    }

    private static class RedisWordCountMapper implements RedisMapper> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2 data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2 data) {
            return data.f1.toString();
        }
    }

}

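备注:若 Redis 挂了而 Flink 仍在继续写入,Redis 恢复之后,期间错过的数据依旧会被写进来。原因是:写入失败会使 Flink 作业失败并重启,作业从最近一次成功的 checkpoint 恢复状态和 Kafka 偏移量,重新消费这段数据;而 HSET 是覆盖写(幂等),重复写入不会影响最终结果。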
取消 Flink 作业时不删除 checkpoint 数据(其中保存了状态和偏移量),重启时指定上一次的 checkpoint 路径,就能接着之前的结果继续计算,上面的案例使用的就是这种方式;也可以使用 savepoint,在取消作业时手动触发保存。

二、从kafka读取数据,处理后写回kafka

package cn._51doit.flink.day09;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;


public class KafkaToKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //开启checkpointing
        env.enableCheckpointing(30000);
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));

        //设置Kafka相关参数
        Properties properties = new Properties();//设置Kafka的地址和端口
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
        //读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读
        properties.setProperty("auto.offset.reset", "earliest");
        //设置消费者组ID
        properties.setProperty("group.id", "g1");
        //没有开启checkpoint,让flink提交偏移量的消费者定期自动提交偏移量
        properties.setProperty("enable.auto.commit", "false");
        //创建FlinkKafkaConsumer并传入相关参数
        FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(
                "doit2021", //要读取数据的Topic名称
                new SimpleStringSchema(), //读取文件的反序列化Schema
                properties //传入Kafka的参数
        );
        //使用addSource添加kafkaConsumer
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false); //在checkpoint时,不将偏移量写入到kafka特殊的topic中

        DataStreamSource lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator filtered = lines.filter(e -> !e.startsWith("error"));

        //使用的是AtLeastOnce
//        FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(
//                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "out2021", new SimpleStringSchema()
//        );


        //写入Kafka的topic
        String topic = "out2021";
        //设置Kafka相关参数
        properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");

        //创建FlinkKafkaProducer
        FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
                topic, //指定topic
                new KafkaStringSerializationSchema(topic), //指定写入Kafka的序列化Schema
                properties, //指定Kafka的相关参数
                FlinkKafkaProducer.Semantic.EXACTLY_onCE //指定写入Kafka为EXACTLY_ONCE语义
        );

        filtered.addSink(kafkaProducer);

        env.execute();


    }
}

2、定义KafkaStringSerializationSchema

package cn._51doit.flink.day09;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.Charset;


/**
 * {@link KafkaSerializationSchema} that encodes each {@code String} element
 * with a configurable charset and sends it, without a key, to a fixed topic.
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    private final String topic;
    // Kept as a name rather than a Charset instance because
    // java.nio.charset.Charset does not implement Serializable.
    private final String charset;

    /**
     * Creates a schema that writes to {@code topic} using UTF-8.
     *
     * @param topic the Kafka topic every record is sent to
     */
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }

    /**
     * Creates a schema that writes to {@code topic} using the given charset.
     *
     * @param topic   the Kafka topic every record is sent to
     * @param charset name of the charset used to encode elements
     * @throws IllegalArgumentException if {@code charset} is null, not a legal
     *         charset name, or not supported by this JVM
     */
    public KafkaStringSerializationSchema(String topic, String charset) {
        // Fail at construction time instead of on the first record.
        Charset.forName(charset);
        this.topic = topic;
        this.charset = charset;
    }

    /**
     * Encodes {@code element} and wraps the bytes in a record for the
     * configured topic.
     *
     * @param element   the value to serialize
     * @param timestamp timestamp passed by the caller; not used
     * @return a record for the configured topic carrying the encoded bytes
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        byte[] bytes = element.getBytes(Charset.forName(charset));
        return new ProducerRecord<>(topic, bytes);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775840.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号