栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

flink打包运行

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

flink打包运行

一个简单的flink打包运行的demo

第一步:环境准备

首先你要有个flink运行环境,我这里是使用虚拟机搭建的单机模式,启动flink,在8081端口就可以看见flink UI,在这里就可以进行flink作业的管理:

第二步:flink代码的编写

我使用flink SQL编写的代码,代码很简单,使用SQL读入kafka中一个topic的消息,写入另一个kafka topic中:

package com.ms.flinksql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Kafka2Kafka {
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String ddlSource = "CREATE TABLE user_behavior (n" +
                "    user_id BIGINT,n" +
                "    item_id BIGINT,n" +
                "    category_id BIGINT,n" +
                "    behavior STRING,n" +
                "    ts TIMESTAMP(3)n" +
                ") WITH (n" +
                "    'connector.type' = 'kafka',n" +
                "    'connector.version' = 'universal',n" +
                "    'connector.topic' = 'user_behavior',n" +
                "    'connector.startup-mode' = 'latest-offset',n" +
                "    'connector.properties.zookeeper.connect' = '192.168.126.128:2181',n" +
                "    'connector.properties.bootstrap.servers' = '192.168.126.128:9092',n" +
                "    'format.type' = 'json'n" +
                ")";

        String ddlSink = "CREATE TABLE user_behavior_sink (n" +
                "    user_id BIGINT,n" +
                "    item_id BIGINTn" +
                ") WITH (n" +
                "    'connector.type' = 'kafka',n" +
                "    'connector.version' = 'universal',n" +
                "    'connector.topic' = 'user_behavior_sink',n" +
                "    'connector.properties.zookeeper.connect' = '192.168.126.128:2181',n" +
                "    'connector.properties.bootstrap.servers' = '192.168.126.128:9092',n" +
                "    'format.type' = 'json',n" +
                "    'update-mode' = 'append'n" +
                ")";

        //提取读取到的数据,然后只要两个字段,重新发送到 Kafka 新 topic
        String sql = "insert into user_behavior_sink select user_id, item_id from user_behavior";

        tableEnv.executeSql(ddlSource);
        tableEnv.executeSql(ddlSink);
        tableEnv.executeSql(sql);
    }
}

maven的依赖如下:这里一些依赖是scope是provided,是啥意思捏?其实就是scope标注为provided在编译阶段会起作用,这样你的代码就不会出现找不到依赖的情况,可以通过编译从而打包,但是这些依赖不会真的打到jar里面。为啥要这样做呢,因为flink-table-api-java-bridge,flink-streaming-scala...这些包实际上flink环境是自带的,你根部不需要在项目的jar里打进去,这样jar包整的这么大还不讨好,何必呢?


    
        org.apache.flink
        flink-table-api-java-bridge_${scala.version}
        ${flink.version}
        provided
    
    
        org.apache.flink
        flink-streaming-scala_${scala.version}
        ${flink.version}
        provided
    
    
        org.apache.flink
        flink-table-common
        ${flink.version}
        provided
    
    
        org.apache.flink
        flink-table-planner_${scala.version}
        ${flink.version}
        provided
    

    
        org.apache.flink
        flink-table-planner-blink_${scala.version}
        ${flink.version}
        provided
    

    
        org.apache.flink
        flink-connector-kafka_${scala.version}
        ${flink.version}
    

    
        org.apache.flink
        flink-json
        ${flink.version}
    

    
        org.apache.flink
        flink-clients_${scala.version}
        ${flink.version}
        provided
    


第三步:打包

为了将需要的依赖都打进jar包里面,在maven的pom.xml里面指定打包工具:


   
        
            
            maven-compiler-plugin
            3.8.1
            
                1.8
                1.8
            
        

        
            maven-assembly-plugin 
            
                
                    
                    jar-with-dependencies
                
                
                    
                        
                        com.ms.flinksql.Kafka2Kafka
                    
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    

使用idea打包

第四步:把jar包上传到flink环境运行


之后我们就可以在任务界面看到我们的任务啦:

第五步:测试一下任务

打开kafka,在source topic写入数据,sink topic中就会实时输出:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/851233.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号