
Inserting Data into Doris via Flink Stream Load


1. Scenario

Simulated data is produced and sent to Kafka; Flink consumes the Kafka topic and then writes the records into Doris via Stream Load.

2. Table DDL
 CREATE TABLE `wudl_doris01` (
   `id` int NULL COMMENT "",
   `name` varchar(200) NULL COMMENT "",
   `address` string NULL COMMENT "",
   `city` varchar(2000) NULL COMMENT "",
   `phone` varchar(200) NULL COMMENT ""
 ) ENGINE=OLAP
 DUPLICATE KEY(`id`)
 COMMENT "flink sink test table"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "1",
 "in_memory" = "false",
 "storage_format" = "V2"
 );
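Before wiring up Flink, the table can be exercised directly against Doris's Stream Load HTTP endpoint. This is an illustrative command only: the FE address matches the host used later in this article, while the label and the data file are made up.

```shell
# Illustrative only: assumes a reachable Doris FE at 192.168.1.161:8090 and a file
# rows.json containing e.g. [{"id":1,"name":"a","address":"b","city":"c","phone":"d"}]
curl -v --location-trusted -u root: \
  -H "label:manual_smoke_test_001" \
  -H "format: json" \
  -H "strip_outer_array: true" \
  -T rows.json \
  http://192.168.1.161:8090/api/wudldb/wudl_doris01/_stream_load
```

If the load succeeds, the JSON response carries `"Status": "Success"` and matching row counts, which is exactly what the sink code below checks for.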
3. Flink insert code

Project structure

3.1 DorisBean
package com.wudl.flink.doris.bean;

import java.io.Serializable;

public class DorisBean  implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private String address;
    private String city;
    private String phone;

    @Override
    public String toString() {
        return "DorisBean{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", address='" + address + '\'' +
                ", city='" + city + '\'' +
                ", phone='" + phone + '\'' +
                '}';
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }



    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getPhone() {
        return phone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public DorisBean() {
    }

    public DorisBean(Integer id, String name, String address, String city, String phone) {
        this.id = id;
        this.name = name;
        this.address = address;
        this.city = city;
        this.phone = phone;
    }
}

3.2 RespContent (the Stream Load response bean)
package com.wudl.flink.doris.bean;

import java.io.Serializable;


public class RespContent implements Serializable {

    private static final long serialVersionUID = 1L;

    private int TxnId;

    private String Label;

    private String Status;

    private String ExistingJobStatus;

    private String Message;

    private long NumberTotalRows;

    private long NumberLoadedRows;

    private int NumberFilteredRows;

    private int NumberUnselectedRows;

    private long LoadBytes;

    private int LoadTimeMs;

    private int BeginTxnTimeMs;

    private int StreamLoadPutTimeMs;

    private int ReadDataTimeMs;

    private int WriteDataTimeMs;

    private int CommitAndPublishTimeMs;

    private String ErrorURL;

    public int getTxnId() {
        return TxnId;
    }

    public void setTxnId(int txnId) {
        TxnId = txnId;
    }

    public String getLabel() {
        return Label;
    }

    public void setLabel(String label) {
        Label = label;
    }

    public String getStatus() {
        return Status;
    }

    public void setStatus(String status) {
        Status = status;
    }

    public String getExistingJobStatus() {
        return ExistingJobStatus;
    }

    public void setExistingJobStatus(String existingJobStatus) {
        ExistingJobStatus = existingJobStatus;
    }

    public String getMessage() {
        return Message;
    }

    public void setMessage(String message) {
        Message = message;
    }

    public long getNumberTotalRows() {
        return NumberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
        NumberTotalRows = numberTotalRows;
    }

    public long getNumberLoadedRows() {
        return NumberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
        NumberLoadedRows = numberLoadedRows;
    }

    public int getNumberFilteredRows() {
        return NumberFilteredRows;
    }

    public void setNumberFilteredRows(int numberFilteredRows) {
        NumberFilteredRows = numberFilteredRows;
    }

    public int getNumberUnselectedRows() {
        return NumberUnselectedRows;
    }

    public void setNumberUnselectedRows(int numberUnselectedRows) {
        NumberUnselectedRows = numberUnselectedRows;
    }

    public long getLoadBytes() {
        return LoadBytes;
    }

    public void setLoadBytes(long loadBytes) {
        LoadBytes = loadBytes;
    }

    public int getLoadTimeMs() {
        return LoadTimeMs;
    }

    public void setLoadTimeMs(int loadTimeMs) {
        LoadTimeMs = loadTimeMs;
    }

    public int getBeginTxnTimeMs() {
        return BeginTxnTimeMs;
    }

    public void setBeginTxnTimeMs(int beginTxnTimeMs) {
        BeginTxnTimeMs = beginTxnTimeMs;
    }

    public int getStreamLoadPutTimeMs() {
        return StreamLoadPutTimeMs;
    }

    public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
        StreamLoadPutTimeMs = streamLoadPutTimeMs;
    }

    public int getReadDataTimeMs() {
        return ReadDataTimeMs;
    }

    public void setReadDataTimeMs(int readDataTimeMs) {
        ReadDataTimeMs = readDataTimeMs;
    }

    public int getWriteDataTimeMs() {
        return WriteDataTimeMs;
    }

    public void setWriteDataTimeMs(int writeDataTimeMs) {
        WriteDataTimeMs = writeDataTimeMs;
    }

    public int getCommitAndPublishTimeMs() {
        return CommitAndPublishTimeMs;
    }

    public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
        CommitAndPublishTimeMs = commitAndPublishTimeMs;
    }

    public String getErrorURL() {
        return ErrorURL;
    }

    public void setErrorURL(String errorURL) {
        ErrorURL = errorURL;
    }

    @Override
    public String toString() {
        return "RespContent{" +
                "TxnId=" + TxnId +
                ", Label='" + Label + '\'' +
                ", Status='" + Status + '\'' +
                ", ExistingJobStatus='" + ExistingJobStatus + '\'' +
                ", Message='" + Message + '\'' +
                ", NumberTotalRows=" + NumberTotalRows +
                ", NumberLoadedRows=" + NumberLoadedRows +
                ", NumberFilteredRows=" + NumberFilteredRows +
                ", NumberUnselectedRows=" + NumberUnselectedRows +
                ", LoadBytes=" + LoadBytes +
                ", LoadTimeMs=" + LoadTimeMs +
                ", BeginTxnTimeMs=" + BeginTxnTimeMs +
                ", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
                ", ReadDataTimeMs=" + ReadDataTimeMs +
                ", WriteDataTimeMs=" + WriteDataTimeMs +
                ", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
                ", ErrorURL='" + ErrorURL + '\'' +
                '}';
    }

}

3.4 DorisSink
package com.wudl.flink.doris.sink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wudl.flink.doris.bean.RespContent;
import com.wudl.flink.doris.utils.DorisStreamLoad;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;



public class DorisSink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;

    private String columns;

    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }


    
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // extract the "data" array from the incoming JSON envelope
        JSONObject data = JSONObject.parseObject(value);
        value = JSON.toJSONString(data.get("data"));
        System.out.println("value----"+value);
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
        if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load fail{}:", loadResponse);
            } else {
                log.info("Stream Load success{}:", loadResponse);
            }
        } else {
            log.error("Stream Load Request failed:{}", loadResponse);
        }
    }

}
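The acceptance rule in `checkStreamLoadStatus` can be exercised on its own: a load counts as successful only when the status is "Success" or "Publish Timeout" *and* no rows were dropped. A standalone sketch of the same logic:

```java
import java.util.Arrays;
import java.util.List;

public class StatusCheckDemo {
    static final List<String> SUCCESS = Arrays.asList("Success", "Publish Timeout");

    // Mirrors DorisSink.checkStreamLoadStatus: both conditions must hold.
    static boolean ok(String status, long totalRows, long loadedRows) {
        return SUCCESS.contains(status) && totalRows == loadedRows;
    }

    public static void main(String[] args) {
        System.out.println(ok("Success", 2000, 2000));      // true
        System.out.println(ok("Publish Timeout", 10, 10));  // true
        System.out.println(ok("Success", 2000, 1999));      // false: rows were filtered
        System.out.println(ok("Fail", 5, 5));               // false: bad status
    }
}
```

Note that "Publish Timeout" is treated as success: Doris has accepted the transaction and the data will become visible, it just was not yet published when the response was sent.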

3.5 GenerateData (mock data source)
package com.wudl.flink.doris.source;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wudl.flink.doris.bean.DorisBean;
import com.wudl.flink.doris.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.*;



public class GenerateData implements SourceFunction<String> {


        private boolean isRunning = true;

        String[] citys = {"北京","广东","山东","江苏","河南","上海","河北","浙江","香港","山西","陕西","湖南","重庆","福建","天津","云南","四川","广西","安徽","海南","江西","湖北","山西","辽宁","内蒙古"};

        Integer i = 0;
        List<DorisBean> list = new ArrayList<>();
        @Override
        public void run(SourceContext<String> ctx) throws Exception {

            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                Integer id =  i++;
                String name = df.format(new Date());
//                Integer name = id+100;
                String address = "1";
                String city = citys[random.nextInt(citys.length)];
//                int age = random.nextInt(25);
                String phone = getTel();
                DorisBean  dorisBean = new DorisBean(id,name,address,city,phone);
                list.add(dorisBean);
                if (list.size()==2000)
                {
                    Map<String, Object> map = new HashMap<>();
                    map.put("data",list);
                    String s = JSON.toJSONString(map);
                    System.out.println("map--->"+s);
                    list = new ArrayList<>();
                    ctx.collect(s);
//                    Thread.sleep(5000*2);
                }


            }
        }

        @Override
        public void cancel() {

            isRunning = false;
        }


    private static String[] telFirst="134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
    private static String getTel() {
        int index=getNum(0,telFirst.length-1);
        String first=telFirst[index];
        String second=String.valueOf(getNum(1,888)+10000).substring(1);
        String third=String.valueOf(getNum(1,9100)+10000).substring(1);
        return first+second+third;
    }

    public static int getNum(int start,int end) {
        return (int)(Math.random()*(end-start+1)+start);
    }

    public static void main(String[] args) throws Exception {
         String default_topic = "wudltopicdoris01";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataStreamSource<String> generateSource = env.addSource(new GenerateData());
        generateSource.print("--------");
        generateSource.addSink(MyKafkaUtil.getKafkaProducer(default_topic));

        env.execute();


    }

}
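The `getTel` helper above uses a small zero-padding trick: adding 10000 before `String.valueOf(...).substring(1)` guarantees a fixed-width digit block. A standalone sketch of the same generator:

```java
import java.util.Random;

public class PhoneDemo {
    static final String[] PREFIXES =
            "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
    static final Random RND = new Random();

    // Same padding trick as GenerateData.getTel: 10001..19100 -> substring(1)
    // yields exactly four zero-padded digits.
    static String tel() {
        String first = PREFIXES[RND.nextInt(PREFIXES.length)];
        String second = String.valueOf(RND.nextInt(888) + 1 + 10000).substring(1);
        String third = String.valueOf(RND.nextInt(9100) + 1 + 10000).substring(1);
        return first + second + third;   // 3 + 4 + 4 = 11 digits
    }

    public static void main(String[] args) {
        System.out.println(tel());  // e.g. 13800421234 (always 11 digits)
    }
}
```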

3.6 Doris Stream Load utility class
package com.wudl.flink.doris.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;


public class DorisStreamLoad implements Serializable {
    private static final long serialVersionUID = 1L;

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
    
    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    
    private String hostPort;
    
    private String db;
    
    private String table;
    
    private String username;
    
    private String password;
    
    private String loadUrlStr;
    
    private String authEncoding;

    public DorisStreamLoad(String hostPort, String db, String table, String username, String password) {
        this.hostPort = hostPort;
        this.db = db;
        this.table = table;
        this.username = username;
        this.password = password;
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, table);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8));
    }

    
    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "true");
        conn.addRequestProperty("columns", columns);
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("jsonpaths", jsonformat);
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
        }
    }

    
    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
        Calendar calendar = Calendar.getInstance();
        // load label; must be globally unique
        String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));

        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;

        try {
            // build request and send to fe
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            // fe send back http response code TEMPORARY_REDIRECT 307 and new be location
            if (status != 307) {
                throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new Exception("redirect location is null");
            }
            // build request and send to new be location
            beConn = getConnection(location, label, columns, jsonformat);
            // send data to be
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(data.getBytes());
            bos.close();

            // get respond
            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream) beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            return new LoadResponse(status, respMsg, response.toString());

        } catch (Exception e) {
            e.printStackTrace();
            String err = "failed to load audit via AuditLoader plugin with label: " + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

}
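`DorisStreamLoad` builds its `Authorization` header with `java.util.Base64`; the encoding can be checked in isolation. The credentials below are the article's sample root user with an empty password.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class AuthDemo {
    // Same encoding as the DorisStreamLoad constructor: "user:password" -> Base64.
    static String basicAuth(String user, String password) {
        String raw = String.format("%s:%s", user, password);
        return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String token = basicAuth("root", "");
        System.out.println("Authorization: Basic " + token);  // Authorization: Basic cm9vdDo=
        // Round-trip to confirm the header decodes back to the credentials.
        System.out.println(new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8));  // root:
    }
}
```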

3.7 Kafka utility class
package com.wudl.flink.doris.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;



public class MyKafkaUtil {
    private static String brokers = "192.168.1.161:6667";
    private static String default_topic = "wudltopic";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(brokers,
                topic,
                new SimpleStringSchema());
    }

    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaProducer<>(default_topic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {

        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaConsumer<>(topic,
                new SimpleStringSchema(),
                properties);

    }

    
    // build the Kafka connector properties fragment for a Flink SQL DDL WITH clause
    public static String getKafkaDDL(String topic, String groupId) {
        return  " 'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + brokers + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset'  ";
    }


}
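`getKafkaDDL` only assembles the WITH-clause fragment of a Flink SQL DDL; the caller is expected to wrap it in a `CREATE TABLE` statement. A standalone sketch of the same concatenation (topic and group id below are made-up values):

```java
public class KafkaDdlDemo {
    // Same WITH-clause fragment that MyKafkaUtil.getKafkaDDL produces.
    static String kafkaDdl(String brokers, String topic, String groupId) {
        return " 'connector' = 'kafka', " +
               " 'topic' = '" + topic + "'," +
               " 'properties.bootstrap.servers' = '" + brokers + "', " +
               " 'properties.group.id' = '" + groupId + "', " +
               " 'format' = 'json', " +
               " 'scan.startup.mode' = 'latest-offset'  ";
    }

    public static void main(String[] args) {
        String ddl = kafkaDdl("192.168.1.161:6667", "wudltopicdoris01", "flink_doris_group006");
        // The fragment slots into a table definition like so:
        System.out.println("CREATE TABLE kafka_source (...) WITH (" + ddl + ")");
    }
}
```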

3.8 Application entry point (DorisApp)
package com.wudl.flink.doris;

import com.wudl.flink.doris.sink.DorisSink;
import com.wudl.flink.doris.source.GenerateData;
import com.wudl.flink.doris.utils.DorisStreamLoad;
import com.wudl.flink.doris.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;



public class DorisApp {

    private static final String bootstrapServer = "192.168.1.161:6667";

    private static final String groupName = "flink_doris_group006";

    private static final String topicName = "wudltopicdoris01";

    private static final String hostPort = "192.168.1.161:8090";

    private static final String dbName = "wudldb";

    private static final String tbName = "wudl_doris01";

    private static final String userName = "root";

    private static final String password = "";

//    private static final String columns = "id,name,address,city,phone";
    // column order must match the jsonpaths order below
    private static final String columns = "address,city,id,name,phone";
    // sample record: {"address":"广东省","city":"海南","id":183,"name":"2022-01-03 00:41:37","phone":"15007840220"}

//    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
    private static final String jsonFormat = "[\"$.address\",\"$.city\",\"$.id\",\"$.name\",\"$.phone\"]";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("auto.offset.reset", "earliest");
//        props.put("max.poll.records", "10000");

        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//        blinkStreamEnv.setParallelism(1);
        blinkStreamEnv.enableCheckpointing(10000);
        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                simpleStringSchema,
                props);

//        DataStreamSource<String> dataStreamSource = blinkStreamEnv.socketTextStream("192.168.1.163", 9999);
//        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);
        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(new GenerateData());

        dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.println(s);
                return s;
            }
        });


        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);

        dataStreamSource.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));

        blinkStreamEnv.execute("flink kafka to doris");

    }
}

4. Results: after running for about half an hour, roughly 40 million rows had been inserted.
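Taking the stated figures at face value (about 40 million rows in roughly 30 minutes), the average throughput works out to about 22,000 rows per second:

```java
public class ThroughputDemo {
    public static void main(String[] args) {
        long rows = 40_000_000L;   // rows inserted, as reported above
        long seconds = 30L * 60L;  // half an hour
        long rowsPerSecond = rows / seconds;
        System.out.println(rowsPerSecond);  // 22222
    }
}
```

This is an average over the whole run; the 2000-row batching in GenerateData and the per-batch Stream Load round-trip mean the instantaneous rate fluctuates around it.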

5. pom.xml








<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.wudlflink13</groupId>
    <artifactId>wudl-flink-13</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>spring-plugin</id>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.13.3</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-runtime_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-jdbc-client</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-redis-client</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
