栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Flink消费Fkafa】

【Flink消费Fkafa】

Flink消费kafka

Flink对kafka消费的主代码EnvUtil工具类SinkString下沉组件类KafkaConf工具类

Flink对kafka消费的主代码
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;


public class KafkaConsumer {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = EnvUtil.getEnv();

        // 此处的配置信息properties是引用的java的
        Properties props = new Properties();

        //链接kafka的信息参数,地址就是部分broker的信息地址,可以写多个。
        //bootstrap.servers这个参数是用来配置发现Kafka集群信息
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConf.ADDRESS);

        // 需指定group.id,值是管理端的app
        props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConf.APP);

        //需要指定client.id 值是管理端的APP
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConf.APP);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //创建一个消费者的实例
        FlinkKafkaConsumer consumerInstance = new FlinkKafkaConsumer(KafkaConf.TOPIC, new SimpleStringSchema(), props);


        // TODO: 2022/1/22  因为Flink消费kafka的数据需要区分是否进行了checkpoint功能。
        

        
        consumerInstance.setCommitOffsetsOnCheckpoints(true);

        // TODO: 2022/1/22 为啥设置name函数 
        // name 是算子名称  为什么需要起名字/源码的注释。也可以设置uid
        //visualization 直观的意思
        
        SingleOutputStreamOperator source = env.addSource(consumerInstance).name("jingluohuanwanwu");

        source.addSink(new SinkString())
                .name("SinkStr");

        env.execute("jingluohuanwanwu");


    }
}

EnvUtil工具类
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class EnvUtil {
    public static StreamExecutionEnvironment getEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置checkpoint 间隔和模式
        env.enableCheckpointing(300_000, CheckpointingMode.EXACTLY_ONCE);
        // 设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(600_000);
        return env;
    }
}
SinkString下沉组件类
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;





@Slf4j
public class SinkString implements SinkFunction {

    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info("sink print:" + value);
    }
}

KafkaConf工具类
public class KafkaConf {
    public final static String TOPIC = "xxx";
    public final static String APP = "xxx";
    public final static String ADDRESS = "test-nameserver:50088";
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711038.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号