public class DorisStreamLoader {
private String curl;
private String authHeader;
public void init(String host, int port, String dataBase, String table, String username, String password) {
curl = String.format("http://%s:%s/api/%s/%s/_stream_load",
host, port, dataBase, table);
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
authHeader = "Basic " + new String(encoded);
}
public String loadCsv(String csvPath) {
FileReader fileReader = null;
BufferedReader buffer = null;
String tableHeader = null;
StringBuilder stringBuilder = null;
try {
fileReader = new FileReader(csvPath);
buffer = new BufferedReader(fileReader);
tableHeader = buffer.readLine().replace(""", "");
stringBuilder = new StringBuilder();
while (true) {
String line = buffer.readLine();
if (line == null)
break;
else {
line = line.replace(""", "");
stringBuilder.append(line + "n");
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
try {
fileReader.close();
buffer.close();
} catch (IOException e) {
}
if (tableHeader == null || stringBuilder == null)
return "NULL";
return loadString(tableHeader, stringBuilder.toString());
}
private static PoolingHttpClientConnectionManager poolHttpClientConnManager =
new PoolingHttpClientConnectionManager();
static {
poolHttpClientConnManager.setMaxTotal(300);
poolHttpClientConnManager.setDefaultMaxPerRoute(150);
}
private static CloseableHttpClient getCloseableHttpClient() {
HttpClientBuilder clientbuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
// If the connection target is FE, you need to deal with 307 redirect。
return true;
}
})
// 自定义连接管理器
.setConnectionManager(poolHttpClientConnManager)
// 删除空闲连接时间
.evictIdleConnections(40, TimeUnit.SECONDS)
.disableAutomaticRetries(); // 关闭自动重试
return clientbuilder.build();
}
public String loadString(String tableHeader, String content) {
CloseableHttpClient client = getCloseableHttpClient();
HttpPut put = new HttpPut(curl);
put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
put.setHeader("label", UUID.randomUUID().toString());
put.setHeader("columns", tableHeader);
put.setHeader("column_separator", ",");
put.setEntity(new ByteArrayEntity(content.getBytes()));
CloseableHttpResponse response = null;
String loadResult = "";
try {
response = client.execute(put);
if (response.getEntity() != null && response.getStatusLine().getStatusCode() == 200) {
loadResult = EntityUtils.toString(response.getEntity());
} else {
loadResult = response.getStatusLine().toString();
}
response.close();
return loadResult;
} catch (IOException e) {
e.printStackTrace();
return "IOException";
}
}
public static void main(String[] args) throws Exception {
DorisStreamLoader loader = new DorisStreamLoader();
Long now = new Date().getTime();
loader.init("10.25.108.166", 8030, "baokangtest", "sc_g_motorvehicle_test", "baokang", "baokang");
String statusInfo = loader.loadCsv("D:\sc_g_motorvehicle_test_202205062204.csv");
System.out.println("耗时" + (new Date().getTime() - now) + "毫秒,运行状态" + statusInfo);
}
}