Real-Time Ingestion of Kafka Data into the Apache Doris Data Warehouse with Flink (KFD)

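1. Overview

Apache Doris (formerly Baidu Palo) is a distributed SQL data warehouse built on massively parallel processing (MPP). Baidu open-sourced it in 2017, and it entered the Apache Incubator in August 2018.

Apache Doris is a modern MPP analytical database. It returns query results with sub-second response times, which makes it well suited to real-time data analysis. Its distributed architecture is simple and easy to operate, and it can handle very large datasets of more than 10 PB.

Apache Doris covers a range of analytical needs, such as fixed historical reports, real-time analysis, interactive analysis, and exploratory analysis.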

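2. Scenario

This article shows how to combine the Stream Load interface provided by Doris with the Flink compute engine to write data into Doris quickly and in real time.

The environment used is:

  MySQL 5.x/8.x (the main business database)
  Kafka 2.11 (message queue)
  Flink 1.10.1 (stream processing engine)
  Doris 0.14.7 (core data warehouse)
  Canal (MySQL binlog collection tool)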

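3. Implementation

The architecture combines offline processing of historical data with real-time processing of incremental data.

3.1 Offline processing of historical data

Historical data is loaded through Doris ODBC external tables: each MySQL table is mapped into Doris as an external table, and its rows are then copied into a Doris table with a statement of the form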

 insert into   select * from 
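
As a minimal sketch of this backfill step, the helper below only assembles an INSERT INTO ... SELECT statement and runs it over a JDBC connection that the caller has already opened to the Doris FE (Doris speaks the MySQL protocol). The class name HistoryBackfill and every table name used with it are hypothetical; they are not taken from the original setup.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

final class HistoryBackfill {

    // Builds "INSERT INTO <target> SELECT * FROM <source>".
    // Table names cannot be bound as JDBC parameters, so they are validated instead.
    public static String buildBackfillSql(String targetTable, String odbcExternalTable) {
        requireIdentifier(targetTable);
        requireIdentifier(odbcExternalTable);
        return "INSERT INTO " + targetTable + " SELECT * FROM " + odbcExternalTable;
    }

    // Accepts only "table" or "db.table" made of letters, digits and underscores.
    static void requireIdentifier(String name) {
        if (name == null
                || !name.matches("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?")) {
            throw new IllegalArgumentException("not a plain table name: " + name);
        }
    }

    // Runs the backfill on a connection the caller has opened to the Doris FE.
    public static void runBackfill(Connection dorisConn, String targetTable,
                                   String odbcExternalTable) throws SQLException {
        try (Statement stmt = dorisConn.createStatement()) {
            stmt.execute(buildBackfillSql(targetTable, odbcExternalTable));
        }
    }
}
```

For example, buildBackfillSql("ods_orders", "ext_mysql_orders") returns "INSERT INTO ods_orders SELECT * FROM ext_mysql_orders", where both table names are made up for illustration.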

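3.1.1 Creating the external table

First, Apache Doris 0.13.x or later is required.
Install the ODBC driver for the source database on every BE node.
Create the external table.
The details are covered in another article of mine, so they are not repeated here:

[How to use Apache Doris ODBC external tables]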

https://mp.weixin.qq.com/s/J0suRGPNkxD6oHSRFK6KTA

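3.2 Real-time processing of incremental data

Incremental data is processed in real time as follows. Canal monitors the MySQL binlog, parses it, and pushes the changes to a designated Kafka queue. Flink then consumes that queue in real time, and at this point you can apply whatever processing or algorithms you need. Finally, the detail records or the intermediate results of the real-time computation are written to the corresponding Doris table. This article uses Stream Load for that last step; the Flink Doris Connector is an alternative.

3.2.1 Implementing the Doris sink

We start by implementing a Flink sink for Doris.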

 import com.alibaba.fastjson.JSON;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.Arrays;
 import java.util.List;

 // Sink that writes each incoming JSON string to Doris through Stream Load.
 // RespContent is a POJO for the Stream Load JSON response; it is not shown in this article.
 public class DorisSink extends RichSinkFunction<String> {

     private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

     // Stream Load statuses that are treated as a successful import
     private static final List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout");

     private final DorisStreamLoad dorisStreamLoad;

     // value of the Stream Load "columns" header
     private final String columns;

     // value of the Stream Load "jsonpaths" header
     private final String jsonFormat;

     public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
         this.dorisStreamLoad = dorisStreamLoad;
         this.columns = columns;
         this.jsonFormat = jsonFormat;
     }

     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
     }

     // The load succeeded only if the status is accepted and every row was loaded
     public static boolean checkStreamLoadStatus(RespContent respContent) {
         return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                 && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
     }

     @Override
     public void invoke(String value, Context context) throws Exception {
         DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
         if (loadResponse != null && loadResponse.status == 200) {
             RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
             if (!checkStreamLoadStatus(respContent)) {
                 // Failures are only logged; throw here instead if the job should fail on a bad load
                 log.error("Stream Load failed: {}", loadResponse);
             }
         } else {
             log.error("Stream Load request failed: {}", loadResponse);
         }
     }
 }
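
The sink above takes a columns string and a jsonFormat string, which are passed on as the Stream Load "columns" and "jsonpaths" headers. As a small sketch of what those two values might look like, the helper below derives both from a list of field names. It assumes that the JSON keys in each record have the same names as the target columns; the class name StreamLoadParams and the field names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class StreamLoadParams {

    // Value for the "columns" header, for example: id,name,amount
    public static String columns(List<String> fields) {
        return String.join(",", fields);
    }

    // Value for the "jsonpaths" header, for example: ["$.id","$.name","$.amount"]
    public static String jsonPaths(List<String> fields) {
        return fields.stream()
                .map(f -> "\"$." + f + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "name", "amount");
        System.out.println(columns(fields));   // id,name,amount
        System.out.println(jsonPaths(fields)); // ["$.id","$.name","$.amount"]
    }
}
```

With these helpers, a sink could be created as new DorisSink(streamLoad, StreamLoadParams.columns(fields), StreamLoadParams.jsonPaths(fields)), so that the column list and the JSON paths always stay in the same order.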

3.2.2 Stream Load utility class

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.Serializable;
 import java.io.IOException;
 import java.io.BufferedOutputStream;
 import java.io.InputStream;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Calendar;
 import java.util.UUID;

 public class DorisStreamLoad implements Serializable {

     private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
     // Stream Load URL pattern; the request is sent to the FE first
     private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
     // FE address in host:port form
     private String hostPort;
     // database name
     private String db;
     // name of the table to load into
     private String tbl;
     // user name
     private String user;
     // password
     private String passwd;
     private String loadUrlStr;
     private String authEncoding;

     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.hostPort = hostPort;
         this.db = db;
         this.tbl = tbl;
         this.user = user;
         this.passwd = passwd;
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         // credentials for HTTP Basic authentication
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
     }
     // Build an HTTP connection that carries the Stream Load headers
     private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
         URL url = new URL(urlStr);
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         // the FE redirect is followed manually in loadBatch
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod("PUT");
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
         conn.addRequestProperty("label", label);
         conn.addRequestProperty("max_filter_ratio", "0");
         conn.addRequestProperty("strict_mode", "true");
         conn.addRequestProperty("columns", columns);
         conn.addRequestProperty("format", "json");
         conn.addRequestProperty("jsonpaths", jsonformat);
         conn.addRequestProperty("strip_outer_array", "true");
         conn.setDoOutput(true);
         conn.setDoInput(true);

         return conn;
     }

     public static class LoadResponse {
         public int status;
         public String respMsg;
         public String respContent;

         public LoadResponse(int status, String respMsg, String respContent) {
             this.status = status;
             this.respMsg = respMsg;
             this.respContent = respContent;
         }

         @Override
         public String toString() {
             StringBuilder sb = new StringBuilder();
             sb.append("status: ").append(status);
             sb.append(", resp msg: ").append(respMsg);
             sb.append(", resp content: ").append(respContent);
             return sb.toString();
         }
     }
     // Run one Stream Load request with the given payload
     public LoadResponse loadBatch(String data, String columns, String jsonformat) {
         Calendar calendar = Calendar.getInstance();
         // Load label; it must be unique, so a timestamp is combined with a random UUID
         String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 UUID.randomUUID().toString().replaceAll("-", ""));

         HttpURLConnection feConn = null;
         HttpURLConnection beConn = null;
         try {
             // build the request and send it to the FE
             feConn = getConnection(loadUrlStr, label, columns, jsonformat);
             int status = feConn.getResponseCode();
             // the FE answers with 307 TEMPORARY_REDIRECT and the location of a BE
             if (status != 307) {
                 throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
             }
             String location = feConn.getHeaderField("Location");
             if (location == null) {
                 throw new Exception("redirect location is null");
             }
             // build the request for the BE location
             beConn = getConnection(location, label, columns, jsonformat);
             // send the data to the BE, encoded explicitly as UTF-8
             try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
                 bos.write(data.getBytes(StandardCharsets.UTF_8));
             }

             // read the response
             status = beConn.getResponseCode();
             String respMsg = beConn.getResponseMessage();
             InputStream stream = beConn.getInputStream();
             StringBuilder response = new StringBuilder();
             try (BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
                 String line;
                 while ((line = br.readLine()) != null) {
                     response.append(line);
                 }
             }
             return new LoadResponse(status, respMsg, response.toString());

         } catch (Exception e) {
             String err = "Stream Load failed for label: " + label;
             log.warn(err, e);
             return new LoadResponse(-1, e.getMessage(), err);
         } finally {
             if (feConn != null) {
                 feConn.disconnect();
             }
             if (beConn != null) {
                 beConn.disconnect();
             }
         }
     }

 }
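
The sink in 3.2.1 calls loadBatch once per record, and each call is a separate Stream Load request. Because the request is sent with strip_outer_array set to true, one request can also carry a JSON array of many rows. The sketch below shows one possible way to buffer JSON object strings and hand them over as a single array. JsonArrayBatcher, maxRows, and the loader callback are hypothetical names and not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class JsonArrayBatcher {

    private final int maxRows;
    private final Consumer<String> loader;
    private final List<String> buffer = new ArrayList<>();

    // loader receives the finished payload, e.g. a call to DorisStreamLoad.loadBatch
    public JsonArrayBatcher(int maxRows, Consumer<String> loader) {
        if (maxRows <= 0) {
            throw new IllegalArgumentException("maxRows must be positive");
        }
        this.maxRows = maxRows;
        this.loader = loader;
    }

    // Buffers one JSON object string and flushes once the batch is full.
    public void add(String jsonObject) {
        buffer.add(jsonObject);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    // Sends all buffered rows as one JSON array, e.g. [{"id":1},{"id":2}].
    // The buffer is cleared only after the loader returns, so rows stay
    // buffered if the loader throws.
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        loader.accept("[" + String.join(",", buffer) + "]");
        buffer.clear();
    }

    // Number of rows waiting for the next flush.
    public int pending() {
        return buffer.size();
    }
}
```

In a sink, such a batcher would probably be created in open() rather than in the constructor, since it is not serializable, with add called from invoke and flush called from close. Rows still in the buffer are lost if the job fails before a flush, so a production version would also need to flush when Flink takes a checkpoint.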

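4. Summary

This article is only a starting point: it shows one way to ingest data with Stream Load, along with an example. Doris offers many other ingestion methods that are worth exploring.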
