栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

第四章 FlinkAPI基础练习之Kafka

第四章 FlinkAPI基础练习之Kafka

kafka Connetor对接 1、运行环境说明
查看topic:  bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
创建topic:  bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 3 --partitions 1 --topic first

发送消息:bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first


消费消息:  bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

消费者组名: kafka-consumer-groups.sh 
--bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 
--list;
查看消费者组消费状态 :./kafka-consuemr-groups.sh --bootstrap-server hadoop102:9092 --group console-consumer-8000 --describe 
2、Kafka Source
  • 相关API:FlinkKafkaConsumer-》FlinkKafkaConsumerbase-》 RichParallelSourceFunction(富函数-并⾏读取kafka多分区)

(1)依赖配置


 org.apache.flink
 flink-connector-kafka_${scala.version}
 ${flink.version}

(2)代码实战

  • 创建kafkaSource - 需要知道 - kafka生产者节点&端口号,所属消费者组,topic
package com.lihaiwei.text1.app;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class flink04kafka {
    public static void main(String[] args) throws Exception {
        //1、构建执⾏任务环境以及任务的启动的⼊⼝, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、配置kafka连接参数
        Properties props = new Properties();
        //kafka地址
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        //消费者组名
        props.setProperty("group.id", "console-consumer-8000");
        //消费消息时字符串序列化和反序列化规则
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //消费消息时offset重置规则
        props.setProperty("auto.offset.reset", "latest");
        //⾃动提交
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //有后台线程每隔10s检测⼀下Kafka的分区变化情况,新分区配置后可以拉区数据
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        // 3、定义kafka的消费者
        FlinkKafkaConsumer consumer =new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), props);

        // 4、设置从记录的消费者组内的offset开始消费
        consumer.setStartFromGroupOffsets();

        // 5、设置kafka作为source
        DataStream ds = env.addSource(consumer);

        // 6、打印输出
        ds.print("打印后");
        // 7、DataStream需要调⽤execute,可以取个名称
        env.execute("custom source job");
    }
}
  • 运行结果

(1)kafka生产端发送消息

(2)运行结果

3、Kafka Sink
  • 相关API:

(1)依赖配置

创建topic :bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 3 --partitions 1 --topic second

开启消费者:  bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic second

(2)代码实战

  • 需求:消费first中的数据,写入到second中
package com.lihaiwei.text1.app;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class flink04kafka {
    public static void main(String[] args) throws Exception {
        //1、构建执⾏任务环境以及任务的启动的⼊⼝, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、配置kafka连接参数
        Properties props = new Properties();
        //kafka地址
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        //消费者组名
        props.setProperty("group.id", "console-consumer-8000");
        //消费消息时字符串序列化和反序列化规则
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //消费消息时offset重置规则
        props.setProperty("auto.offset.reset", "latest");
        //⾃动提交
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //有后台线程每隔10s检测⼀下Kafka的分区变化情况
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        // 3、定义kafka的消费者的
        FlinkKafkaConsumer consumer =new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), props);

        // 4、设置从记录的消费者组内的offset开始消费
        consumer.setStartFromGroupOffsets();

        // 5、设置kafka作为source
        DataStream ds = env.addSource(consumer);

        // 6、打印输出
        ds.print("打印后");

        // 7、处理,拼接字符串
        DataStream mapDS = ds.map(new MapFunction() {
            @Override
            public String map(String value) throws Exception {
                return "sink"+value;
            }
        });

        // 8、输出到kafka中
        FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>("second", new SimpleStringSchema(), props);
        mapDS.addSink(kafkaSink);

        // 9、DataStream需要调⽤execute,可以取个名称
        env.execute("custom source job");
    }
}
  • 运行结果

①生产者发送消息

②程序运行结果

③消费者消费消息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584419.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号