栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink读取kafka数据

flink读取kafka数据

前言

在实际生产环境中,经常需要使用flink读取外部的数据源作为数据的输入流,其中kafka就是重要的实时数据源,flink可以通过消费kafka指定的topic数据达到实时处理的目的,下面演示如何使用flink读取kafka的数据

环境准备

1、安装并启动zk服务

这个相信基本上都会了,就不再演示了

2、安装并启动kafka

本文为演示方便,直接使用docker快速启动一个kafka的容器,可以执行如下命令

docker run -d --name my_kafka 
-p 9092:9092 
-e KAFKA_BROKER_ID=0 
-e KAFKA_ZOOKEEPER_ConNECT=ZK公网IP:2181 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ZK公网IP:9092 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

注意执行上面的命令之前,确保zk已经启动

docker命令执行完毕后,检查kafka容器是否创建成功

 3、创建一个topic

进入上面的kafka的docker容器,创建一个topic,以供后面使用,执行下面命令进入容器:

docker exec -it my_kafka /bin/bash

进入bin目录,

cd /opt/kafka_2.13-2.8.1/bin

在该目录下创建一个topic,执行下面的命令进行topic的创建成功后,

./kafka-topics.sh --zookeeper ZK公网IP:2181 
 --create --topic zcy 
 --partitions 2 --replication-factor 1

可通过下面命令查看已存在的topic列表

./kafka-topics.sh --zookeeper ZK公网IP:2181 --list

 

以上的准备工作完成后,下面开始编码实现

编码实现

1、导入flink-kafka的依赖

        
            org.apache.flink
            flink-connector-kafka-0.11_2.12
            1.10.1
        

2、核心代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;


public class SoureTest3 {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //读取kafka的数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","kafka公网IP:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource dataStreamSource = env.addSource(
                new FlinkKafkaConsumer011<>(
                        "zcy",
                        new SimpleStringSchema(),
                        properties)
        );
        dataStreamSource.print();
        env.execute();
    }

}

然后使用下面的命令再在kafka的终端,开启生产者的shell窗口,

./kafka-console-producer.sh --broker-list 公网IP:9092 --topic zcy

效果如下:

 启动上面的程序,观察控制台,这时等待接收外部topic的数据

 然后从kafka的终端发送一条消息,可以看到,数据就能成功输出到控制台了,几乎是近实时的

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701663.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号