
Doris Stream Load: importing a CSV file or a string through an HTTP connection pool


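// Imports assume Apache HttpClient 4.x and Apache Commons Codec on the classpath.
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class DorisStreamLoader {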

    private String curl;
    private String authHeader;

    /** Builds the Stream Load URL and the HTTP Basic authorization header. */
    public void init(String host, int port, String dataBase, String table, String username, String password) {
        // Stream Load endpoint: http://<host>:<port>/api/<db>/<table>/_stream_load
        curl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                host, port, dataBase, table);
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        authHeader = "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    
    /**
     * Reads a CSV file whose first line is the column header and loads the
     * remaining lines into Doris through Stream Load.
     *
     * @param csvPath path of the CSV file
     * @return the Stream Load result, or "NULL" if the file is empty or could not be read
     */
    public String loadCsv(String csvPath) {
        String tableHeader = null;
        StringBuilder stringBuilder = new StringBuilder();
        // try-with-resources closes both readers, even when opening or reading fails
        try (FileReader fileReader = new FileReader(csvPath);
             BufferedReader buffer = new BufferedReader(fileReader)) {
            String firstLine = buffer.readLine();
            if (firstLine != null) {
                // Strip double quotes so the header can be sent as the "columns" value
                tableHeader = firstLine.replace("\"", "");
            }
            String line;
            while ((line = buffer.readLine()) != null) {
                stringBuilder.append(line.replace("\"", "")).append("\n");
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            return "NULL";
        } catch (IOException e) {
            // Do not load a partially read file
            e.printStackTrace();
            return "NULL";
        }
        if (tableHeader == null)
            return "NULL";
        return loadString(tableHeader, stringBuilder.toString());
    }

    private static PoolingHttpClientConnectionManager poolHttpClientConnManager =
            new PoolingHttpClientConnectionManager();

    static {
        poolHttpClientConnManager.setMaxTotal(300);
        poolHttpClientConnManager.setDefaultMaxPerRoute(150);
    }

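    // Build the client once and reuse it: each build() with evictIdleConnections
    // starts its own background eviction thread.
    private static final CloseableHttpClient HTTP_CLIENT = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // If the connection target is the FE, the 307 redirect
                    // must be followed for PUT requests as well.
                    return true;
                }
            })
            // Use the shared pooling connection manager
            .setConnectionManager(poolHttpClientConnManager)
            // Evict connections that have been idle for more than 40 seconds
            .evictIdleConnections(40, TimeUnit.SECONDS)
            // Disable automatic retries
            .disableAutomaticRetries()
            .build();

    private static CloseableHttpClient getCloseableHttpClient() {
        return HTTP_CLIENT;
    }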

    
    /**
     * Sends CSV content to Doris with a Stream Load PUT request.
     *
     * @param tableHeader comma-separated column names, sent as the "columns" header
     * @param content     CSV rows, one per line
     * @return the response body on HTTP 200, otherwise the status line,
     *         or "IOException" if the request failed
     */
    public String loadString(String tableHeader, String content) {
        CloseableHttpClient client = getCloseableHttpClient();
        HttpPut put = new HttpPut(curl);
        put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
        put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
        // A random label identifies this load job
        put.setHeader("label", UUID.randomUUID().toString());
        put.setHeader("columns", tableHeader);
        put.setHeader("column_separator", ",");
        put.setEntity(new ByteArrayEntity(content.getBytes(StandardCharsets.UTF_8)));
        // try-with-resources closes the response even if reading the body fails
        try (CloseableHttpResponse response = client.execute(put)) {
            if (response.getEntity() != null && response.getStatusLine().getStatusCode() == 200) {
                // HTTP 200 only means the request was processed; the "Status"
                // field of the JSON body reports whether the load succeeded.
                return EntityUtils.toString(response.getEntity());
            }
            return response.getStatusLine().toString();
        } catch (IOException e) {
            e.printStackTrace();
            return "IOException";
        }
    }


    public static void main(String[] args) throws Exception {
        DorisStreamLoader loader = new DorisStreamLoader();
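        long now = new Date().getTime();
        // Host, port, database, table and credentials are the author's test values
        loader.init("10.25.108.166", 8030, "baokangtest", "sc_g_motorvehicle_test", "baokang", "baokang");
        // Backslashes in a Windows path must be escaped in a Java string literal
        String statusInfo = loader.loadCsv("D:\\sc_g_motorvehicle_test_202205062204.csv");
        System.out.println("Elapsed: " + (new Date().getTime() - now) + " ms, status: " + statusInfo);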
    }
}
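
The main method above prints whatever loadString returns, but an HTTP 200 response does not by itself mean the rows were loaded. The sketch below shows one way to check the outcome. It assumes that the Stream Load response body is JSON with a "Status" field whose value is "Success" when the load succeeds, as described in the Doris documentation. StreamLoadStatus, status and isSuccess are hypothetical names that are not part of the original class, and the sample bodies are illustrative, not captured from a real cluster. A regular expression keeps the example free of extra dependencies; a real JSON parser would be the more robust choice.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of DorisStreamLoader above.
final class StreamLoadStatus {

    // Matches "Status": "<value>" in the response body (sketch only).
    private static final Pattern STATUS =
            Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    private StreamLoadStatus() {
    }

    /** Returns the value of the "Status" field, if the body contains one. */
    static Optional<String> status(String responseBody) {
        if (responseBody == null) {
            return Optional.empty();
        }
        Matcher m = STATUS.matcher(responseBody);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** True only when the body reports "Status": "Success". */
    static boolean isSuccess(String responseBody) {
        return status(responseBody).map("Success"::equals).orElse(false);
    }

    public static void main(String[] args) {
        // Illustrative bodies, not real cluster output.
        String ok = "{\"Status\": \"Success\", \"NumberLoadedRows\": 3}";
        String failed = "{\"Status\": \"Fail\", \"Message\": \"example message\"}";
        System.out.println(isSuccess(ok));
        System.out.println(isSuccess(failed));
        // Non-JSON results such as "IOException" or "NULL" count as failures.
        System.out.println(isSuccess("IOException"));
    }
}
```

With a helper like this, the caller could pass statusInfo to isSuccess before treating the import as finished, since the strings "NULL", "IOException" and a bare status line all evaluate to false.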
Source: www.mshxw.com
Article URL: https://www.mshxw.com/it/877264.html