Flink对kafka消费的主代码、EnvUtil工具类、SinkString下沉组件类、KafkaConf工具类
Flink对kafka消费的主代码
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
 * Entry point of a Flink streaming job that reads string records from the Kafka topic
 * {@code KafkaConf.TOPIC} and hands every record to a {@code SinkString} sink.
 */
public class KafkaConsumer {

    /** Name used both for the source operator and for the submitted job. */
    private static final String JOB_NAME = "jingluohuanwanwu";

    /**
     * Builds the Kafka consumer configuration, wires source and sink, and starts the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = EnvUtil.getEnv();

        // Plain java.util.Properties carrying the Kafka client configuration.
        Properties props = new Properties();
        // bootstrap.servers: broker address(es) used to discover the Kafka cluster;
        // more than one address may be listed.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConf.ADDRESS);
        // group.id is required; its value is the app registered in the management console.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, KafkaConf.APP);
        // client.id is required as well; it reuses the same app value.
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConf.APP);
        // NOTE(review): records are decoded by the SimpleStringSchema passed to the consumer
        // below, so the connector may ignore these two deserializer settings - confirm
        // against the Flink Kafka connector version in use before relying on them.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Typed consumer instance: every record is surfaced as a String.
        FlinkKafkaConsumer<String> consumerInstance =
                new FlinkKafkaConsumer<>(KafkaConf.TOPIC, new SimpleStringSchema(), props);
        // TODO (2022/1/22): offset handling depends on whether checkpointing is enabled;
        // here offsets are requested to be committed on checkpoints.
        consumerInstance.setCommitOffsetsOnCheckpoints(true);

        // TODO (2022/1/22): document why name() is set. name() assigns the operator name;
        // a uid can be set as well.
        SingleOutputStreamOperator<String> source =
                env.addSource(consumerInstance).name(JOB_NAME);
        source.addSink(new SinkString())
                .name("SinkStr");

        env.execute(JOB_NAME);
    }
}
EnvUtil工具类
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Helper that creates the {@link StreamExecutionEnvironment} for the job with
 * checkpointing already configured.
 */
public class EnvUtil {

    /** Interval between checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 300_000;

    /** Maximum time a single checkpoint may take, in milliseconds. */
    private static final long CHECKPOINT_TIMEOUT_MS = 600_000;

    /**
     * Obtains the execution environment and enables exactly-once checkpointing on it.
     *
     * @return the configured execution environment
     */
    public static StreamExecutionEnvironment getEnv() {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint interval and mode.
        environment.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout.
        environment.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);

        return environment;
    }
}
SinkString下沉组件类
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Sink that writes every incoming string record to the log at INFO level.
 */
@Slf4j
public class SinkString implements SinkFunction<String> {

    /**
     * Logs the received record.
     *
     * @param value   the record delivered to this sink
     * @param context sink context supplied by the runtime (not used)
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Parameterized form produces the same text as "sink print:" + value.
        log.info("sink print:{}", value);
    }
}

// ---- KafkaConf utility class ----
/**
 * Kafka connection constants read by {@code KafkaConsumer}.
 */
public class KafkaConf {

    /** Topic the consumer subscribes to. */
    public static final String TOPIC = "xxx";

    /** Application identifier; used as both {@code group.id} and {@code client.id}. */
    public static final String APP = "xxx";

    /** Value supplied for {@code bootstrap.servers}. */
    public static final String ADDRESS = "test-nameserver:50088";
}



