栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 实践教程-入门(9):Jar 作业开发

Flink 实践教程-入门(9):Jar 作业开发

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

Flink Jar 作业既支持使用 DataStream API 编程也支持使用 Table API/SQL 编程, Table API 和 SQL 也可以很容易地集成并嵌入到 DataStream 程序中,请参见 与 DataStream API 集成 [1] 章节了解如何将 DataStream 与 Table 之间的相互转化。

流计算 Oceanus 支持 Flink Jar 作业和 Flink SQL 作业,本文将向您详细介绍如何使用 Flink DataStream API 进行 Jar 作业开发,并在流计算 Oceanus 平台运行。

操作视频 前置准备 创建流计算 Oceanus 集群

进入 Oceanus 控制台 [2],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [3]。

创建消息队列 CKafka

进入 CKafka 控制台 [4],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [5]。

创建 Topic:

进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [6]。

开发 DataStream 作业 1. 新建 Maven 工程。

在本地 IDEA 中新建Maven Project,并配置 pom.xml 文件。pom.xml 文件内容如下:



    4.0.0

    com.oceanus
    jar_demos
    1.0-SNAPSHOT

    
        8
        8
    
    
        
        
            org.apache.flink
            flink-java
            1.13.2
            provided
        
        
            org.apache.flink
            flink-streaming-java_2.11
            1.13.2
            provided
        
        
        
            org.apache.flink
            flink-connector-kafka_2.11
            1.13.2
            provided
        

        
        
        
            org.apache.flink
            flink-clients_2.11
            1.13.2
            test
        
    

    
        
            
                org.apache.maven.plugins
                maven-jar-plugin
                3.2.0
                
                    
                    
                        
                            com.demos.HelloWorld
                        
                    
                
            
        
    

2. 代码编写

Flink DataStream 作业代码如下:

package com.demos;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // 1. 设置运行环境
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        List data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }

        // 2. 配置数据源读取数据
        // 预定义数据源支持从文件、套接字、集合读入数据;自定义数据源支持 Kafka、MySQL 等使用 addSource() 函数读入数据
        DataStreamSource> dataStream = sEnv.fromElements(data);

        // 3. 数据加工
        DataStream ds = dataStream.flatMap(new FlatMapFunction, String>() {
            @Override
            public void flatMap(List value, Collector out) throws Exception {
                value.forEach(v -> out.collect(v.toString()));
            }
        });

        // 4. 数据输出
        // 预定义目的端支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket;自定义目的端支持 Kafka、MySQL 等使用 addSink() 函数写出数据
        Properties sinkProps = new Properties();
        String hosts = "10.0.0.29:9092";
        sinkProps.setProperty("bootstrap.servers", hosts);
        String outTopic = "flink-demo9";
        FlinkKafkaProducer producer = new FlinkKafkaProducer(outTopic, new SimpleStringSchema(), sinkProps);
        ds.addSink(producer);
        // ds.print();

        // 5. 执行程序
        sEnv.execute("helloworld");
    }
}
打包 Jar 包

使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。
命令行打包命令:

mvn clean package

命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。

流计算 Oceanus 作业 1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。

2. 创建作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。

【主程序包】选择刚刚上传的依赖,并选择最新版本。参考 pom.xml 文件填写主类,此处填入 com.demos.HelloWorld。

3. 运行作业

点击【发布草稿】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

总结
  1. DataStream 作业支持各类异构数据源与数据目的端。自定义数据源支持 Kafka、MySQL 等,使用 addSource() 函数读入数据;自定义目的端支持 Kafka、MySQL 等,使用 addSink() 函数写出数据。

  2. 打包时无需打包 flink 核心依赖,流计算 Oceanus 平台已提供。

阅读参考

[1] 与 DataStream API 集成:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/
[2] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[3] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[4] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1
[5] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839
[6] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

​​

关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/652730.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号