很久以前实习的时候接触flink,并且在此方面做了很多工作,但是过了几年时间,居然再次搭建 kafka-flink 会遇到那么多困难,无意间翻到那个时候写的文档,这里再次总结并公开一下,希望也能帮到需要的人。
效果描述首先必须强调一点:以下内容并不是以 flink 的job的方式在flink中运行,但是,可以打包成 jar ,然后提交到 flink 中运行。
因为本地测试的需要,一般这种入门级别,用来验证 kafka 的项目,本地运行即可。
当启动本地 IDE,即进入监听 kafka 的状态,然后进入云服务器,启动kafka 生产者脚本,手动输入消息到 kafka 中。IDE 中的效果就是消费这些消息,全部都打印出来。如图所示:
源码地址:https://gitee.com/smile-yan/flink-kafka-demo.git
环境介绍:
- kafka 版本:kafka_2.13-3.0.0,但是实际上跟 kafka 版本联系不大。相差不太大的版本应该都可以用。
- Oracle JDK 1.8
- Flink :flink-1.12.5 ,为了和 pom 保持此一致,最好使用 1.12 版本比较稳定,其他版本可以考虑试一下。
pom 文件,注意 要保持 kafka, flink 版本相一致,不要轻易尝试随便改其中某一个版本。
4.0.0 cn.smileyan.demo flink-kafka-demo 1.0-SNAPSHOT jar 2.15 3.13.1 1.12.1 2.11 8 8 3.2.4 local true compile prod false provided org.apache.flink flink-java ${flink.version} ${flink.scope} log4j * org.slf4j slf4j-log4j12 org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} ${flink.scope} log4j * org.slf4j slf4j-log4j12 org.apache.flink flink-clients_${scala.binary.version} ${flink.version} ${flink.scope} log4j * org.slf4j slf4j-log4j12 org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} log4j * org.slf4j slf4j-log4j12 org.apache.flink flink-connector-elasticsearch6_${scala.binary.version} ${flink.version} log4j * org.slf4j slf4j-log4j12 com.alibaba fastjson 1.2.69 org.apache.maven.plugins maven-shade-plugin ${maven-shade-plugin.version} package shade false com.google.code.findbugs:jsr305 org.slf4j:* log4j:* *:* meta-INF public class HelloFlink { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.put("bootstrap.servers", "服务器地址:9092"); props.put("zookeeper.connect", "服务器地址:2181"); props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer ( "metric", new SimpleStringSchema(), props)).setParallelism(1); dataStreamSource.print(); env.execute("Flink add data source"); } }
运行效果如上面第一个图所示,再次说明:图中的效果是本地IDE启动后,监听 kafka 的效果,并没有真正打包成 jar 项目并提交到 flink 而运行。但是这份源码是可以打包提交到 flink 运行的(已测有效)
总结由于技术需要,可能还会在这方面多花一些时间,主要会在流计算上花更多的时间。特意做个笔记,共勉。
Smileyan
2021.10.30 18:16



