1. 依赖
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.10</version>
</dependency>
2. 工具类
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Holds the settings for one InfluxDB connection (credentials, URL, database, retention
 * policy) together with the {@link InfluxDB} client handle, and exposes a batch-write helper.
 *
 * <p>NOTE(review): the constructor calls {@code influxDbBuild()}, which is not defined in the
 * visible part of this class. Presumably it creates the client and assigns the
 * {@code influxDB} field — confirm that it exists before compiling.
 */
public class InfluxDBConnection {
    // User name
    private String username;
    // Password
    private String password;
    // Connection URL
    private String openurl;
    // Database name
    private String database;
    // Retention policy
    private String retentionPolicy;
    // Client handle; not assigned anywhere in the visible code (see class note).
    private InfluxDB influxDB;

    /**
     * Stores the connection settings and then calls {@code influxDbBuild()}.
     *
     * @param username        user name
     * @param password        password
     * @param openurl         connection URL
     * @param database        database name
     * @param retentionPolicy retention policy name; {@code null} or {@code ""} falls back to
     *                        {@code "autogen"}
     */
    public InfluxDBConnection(String username, String password, String openurl, String database,
            String retentionPolicy) {
        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
        this.retentionPolicy =
                (retentionPolicy == null || retentionPolicy.isEmpty()) ? "autogen" : retentionPolicy;
        influxDbBuild();
    }

    /**
     * Writes a batch of records with a single call to the client.
     *
     * <p>The database and retention policy passed here are forwarded as-is; the values stored
     * on this instance are not consulted.
     *
     * @param database         target database
     * @param retentionPolicy  retention policy to write under
     * @param consistencyLevel consistency level to request
     * @param records          records to write (the caller in this file passes line-protocol
     *                         strings); a {@code null} or empty list is a no-op
     */
    public void batchInsert(final String database, final String retentionPolicy,
            final InfluxDB.ConsistencyLevel consistencyLevel, final List<String> records) {
        // Nothing to send: skip the request instead of posting an empty body.
        if (records == null || records.isEmpty()) {
            return;
        }
        influxDB.write(database, retentionPolicy, consistencyLevel, records);
    }
}
3. 测试
public class ReadCSVBatchInstallDB {
public static void main(String[] args) throws IOException {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "", "http://node04:8086", "test", "");
String path = "D:\Code\IDEA\influxdb\input\2019-01.csv";
File file = new File(path);
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
String line = "";
ArrayList list = new ArrayList<>();
int count = 0;
int tt = 0;
HashMap tags = new HashMap();
HashMap fields = new HashMap();
ArrayList records = new ArrayList<>();
Timestamp start = new Timestamp(System.currentTimeMillis());
while ((line = reader.readLine()) != null) {
long time = System.currentTimeMillis();
count++;
tt++;
//System.out.println(count+" "+line);
if (line.trim() != "") {
String[] arr = line.split(",");
tags.put("date", arr[0].trim());
tags.put("latitude", arr[1].trim());
tags.put("longitude", arr[2].trim());
fields.put("index1", arr[3].trim());
fields.put("index2", arr[4].trim());
//一条记录值
Point point = influxDBConnection.pointBuilder("weather", time, tags, fields);
BatchPoints batchPoints = BatchPoints.database("test").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
if (count == 1000) {
influxDBConnection.batchInsert("test", "", InfluxDB.ConsistencyLevel.ALL, records);
count = 0;
records.clear();
System.out.println(tt);
}
}
}
influxDBConnection.batchInsert("test", "", InfluxDB.ConsistencyLevel.ALL, records);
Timestamp end = new Timestamp(System.currentTimeMillis());
System.out.println(tt);
System.out.println(start);
System.out.println(end);
}
}