在实际生产环境中,经常需要使用flink读取外部的数据源作为数据的输入流,其中kafka就是重要的实时数据源,flink可以通过消费kafka指定的topic数据达到实时处理的目的,下面演示如何使用flink读取kafka的数据
环境准备1、安装并启动zk服务
这个相信基本上都会了,就不再演示了
2、安装并启动kafka
本文为演示方便,直接使用docker快速启动一个kafka的容器,可以执行如下命令
docker run -d --name my_kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=ZK公网IP:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ZK公网IP:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
注意执行上面的命令之前,确保zk已经启动
docker命令执行完毕后,检查kafka容器是否创建成功
3、创建一个topic
进入上面的kafka的docker容器,创建一个topic,以供后面使用,执行下面命令进入容器:
docker exec -it my_kafka /bin/bash
进入bin目录,
cd /opt/kafka_2.13-2.8.1/bin
在该目录下创建一个topic,执行下面的命令进行topic的创建成功后,
./kafka-topics.sh --zookeeper ZK公网IP:2181 --create --topic zcy --partitions 2 --replication-factor 1
可通过下面命令查看已存在的topic列表
./kafka-topics.sh --zookeeper ZK公网IP:2181 --list
以上的准备工作完成后,下面开始编码实现
编码实现1、导入flink-kafka的依赖
org.apache.flink flink-connector-kafka-0.11_2.121.10.1
2、核心代码
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class SoureTest3 {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取kafka的数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","kafka公网IP:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource dataStreamSource = env.addSource(
new FlinkKafkaConsumer011<>(
"zcy",
new SimpleStringSchema(),
properties)
);
dataStreamSource.print();
env.execute();
}
}
然后使用下面的命令再在kafka的终端,开启生产者的shell窗口,
./kafka-console-producer.sh --broker-list 公网IP:9092 --topic zcy
效果如下:
启动上面的程序,观察控制台,这时等待接收外部topic的数据
然后从kafka的终端发送一条消息,可以看到,数据就能成功输出到控制台了,几乎是近实时的



