栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

flink-kafka 简单例子(java)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

flink-kafka 简单例子(java)

问题描述

很久以前实习的时候接触flink,并且在此方面做了很多工作,但是过了几年时间,居然再次搭建 kafka-flink 会遇到那么多困难,无意间翻到那个时候写的文档,这里再次总结并公开一下,希望也能帮到需要的人。

效果描述

首先必须强调一点:以下内容并不是以 flink 的job的方式在flink中运行,但是,可以打包成 jar ,然后提交到 flink 中运行。

因为本地测试的需要,一般这种入门级别,用来验证 kafka 的项目,本地运行即可。

当启动本地 IDE,即进入监听 kafka 的状态,然后进入云服务器,启动kafka 生产者脚本,手动输入消息到 kafka 中。IDE 中的效果就是消费这些消息,全部都打印出来。如图所示:

源码

源码地址:https://gitee.com/smile-yan/flink-kafka-demo.git

环境介绍:

  • kafka 版本:kafka_2.13-3.0.0,但是实际上跟 kafka 版本联系不大。相差不太大的版本应该都可以用。
  • Oracle JDK 1.8
  • Flink :flink-1.12.5 ,为了和 pom 保持此一致,最好使用 1.12 版本比较稳定,其他版本可以考虑试一下。

pom 文件,注意 要保持 kafka, flink 版本相一致,不要轻易尝试随便改其中某一个版本。



    4.0.0

    cn.smileyan.demo
    flink-kafka-demo
    1.0-SNAPSHOT
    jar
    
        2.15
        3.13.1
        1.12.1
        2.11
        8
        8
        3.2.4
    
    
        
            local
            
                true
            
            
                compile
            
        
        
            prod
            
                false
            
            
                provided
            
        
    

    
        
            org.apache.flink
            flink-java
            ${flink.version}
            ${flink.scope}
            
                
                    log4j
                    *
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        
        
            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}
            ${flink.scope}
            
                
                    log4j
                    *
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        
        
            org.apache.flink
            flink-clients_${scala.binary.version}
            ${flink.version}
            ${flink.scope}
            
                
                    log4j
                    *
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        
        
            org.apache.flink
            flink-connector-kafka_${scala.binary.version}
            ${flink.version}
            
                
                    log4j
                    *
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        
        
            org.apache.flink
            flink-connector-elasticsearch6_${scala.binary.version}
            ${flink.version}
            
                
                    log4j
                    *
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        

        
            com.alibaba
            fastjson
            1.2.69
        
    
    
    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                ${maven-shade-plugin.version}
                
                    
                        package
                        
                            shade
                        
                        
                            false
                            
                                
                                    com.google.code.findbugs:jsr305
                                    org.slf4j:*
                                    log4j:*
                                
                            
                            
                                
                                    
                                    *:*
                                    
                                        meta-INF
public class HelloFlink {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "服务器地址:9092");
        props.put("zookeeper.connect", "服务器地址:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer(
                "metric",
                new SimpleStringSchema(),
                props)).setParallelism(1);

        dataStreamSource.print();
        env.execute("Flink add data source");
    }

}

运行效果如上面第一个图所示,再次说明:图中的效果是本地IDE启动后,监听 kafka 的效果,并没有真正打包成 jar 项目并提交到 flink 而运行。但是这份源码是可以打包提交到 flink 运行的(已测有效)

总结

由于技术需要,可能还会在这方面多花一些时间,主要会在流计算上花更多的时间。特意做个笔记,共勉。

Smileyan
2021.10.30 18:16

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/358039.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号