栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【无标题】

【无标题】

Flink集成Kafka

Flink可以用于Kafka的生产者,也可以用于Kafka的消费者。

Flink作为生产者

需要创建FlinkKafkaProducer对象作为生产者向Kafka中发送消息

代码实现

public class MyFlinkKafkaProducer {

    public static void main(String[] args) throws Exception {

        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);//并行度的数量需要和Kafka主题中的分区数量一致,效率最高

        // TODO 2. 创建数据源
        DataStreamSource socketSource = env.socketTextStream("localhost", 9001);

        // TODO 3. 创建FlinkKafkaProducer对象
        Properties conf = new Properties();
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
        FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer("first", new SimpleStringSchema(), conf);
        // FlinkKafkaProducer kafkaProducer1 = new FlinkKafkaProducer<>("hadoop102:9092,hadoop103:9092,hadoop104:9092", "first", new SimpleStringSchema());

        // TODO 4. 绑定flinkafkaProducer对象
        socketSource.addSink(flinkKafkaProducer);

        // TODO 5. 启动执行
        env.execute();
    }
}
Flink作为消费者

需要创建FlinkKafkaConsumer对象作为消费者从Kafka中消费消息作为Flink程序的数据源。

代码实现

public class MyFlinkKafkaConsumer {

    public static void main(String[] args) throws Exception {

        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO 2. 创建数据源 - FlinkKafkaConsumer对象
        Properties conf = new Properties();
        conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), conf);

        // TODO 4. 绑定FlinkKafkaConsumer对象
        DataStreamSource kafkaDS = env.addSource(flinkKafkaConsumer);
        
        // TODO 4. 消费主题中数据并打印
        kafkaDS.print();

        // TODO 5. 启动执行
        env.execute();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746269.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号