Flink作为生产者Flink可以用于Kafka的生产者,也可以用于Kafka的消费者。
需要创建FlinkKafkaProducer对象作为生产者向Kafka中发送消息
代码实现
public class MyFlinkKafkaProducer {
public static void main(String[] args) throws Exception {
// TODO 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);//并行度的数量需要和Kafka主题中的分区数量一致,效率最高
// TODO 2. 创建数据源
DataStreamSource socketSource = env.socketTextStream("localhost", 9001);
// TODO 3. 创建FlinkKafkaProducer对象
Properties conf = new Properties();
conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer("first", new SimpleStringSchema(), conf);
// FlinkKafkaProducer kafkaProducer1 = new FlinkKafkaProducer<>("hadoop102:9092,hadoop103:9092,hadoop104:9092", "first", new SimpleStringSchema());
// TODO 4. 绑定flinkafkaProducer对象
socketSource.addSink(flinkKafkaProducer);
// TODO 5. 启动执行
env.execute();
}
}
Flink作为消费者
需要创建FlinkKafkaConsumer对象作为消费者从Kafka中消费消息作为Flink程序的数据源。
代码实现
public class MyFlinkKafkaConsumer {
public static void main(String[] args) throws Exception {
// TODO 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. 创建数据源 - FlinkKafkaConsumer对象
Properties conf = new Properties();
conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), conf);
// TODO 4. 绑定FlinkKafkaConsumer对象
DataStreamSource kafkaDS = env.addSource(flinkKafkaConsumer);
// TODO 4. 消费主题中数据并打印
kafkaDS.print();
// TODO 5. 启动执行
env.execute();
}
}



