Importing a CSV File into InfluxDB with Java

1. Dependency
    <dependency>
      <groupId>org.influxdb</groupId>
      <artifactId>influxdb-java</artifactId>
      <version>2.10</version>
    </dependency>
2. Utility class
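import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class InfluxDBConnection {
    // User name
    private String username;
    // Password
    private String password;
    // Connection URL
    private String openurl;
    // Database
    private String database;
    // Retention policy
    private String retentionPolicy;

    private InfluxDB influxDB;

    public InfluxDBConnection(String username, String password, String openurl, String database,
                              String retentionPolicy) {

        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
        // Fall back to the default "autogen" policy when none is given
        this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
        influxDbBuild();
    }

    // Connect to InfluxDB. The original listing calls this method but omits its body;
    // this minimal version is an assumption.
    private void influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(openurl, username, password);
        }
        influxDB.setRetentionPolicy(retentionPolicy);
    }

    // Build one point. The test class calls this method, but the original listing omits it;
    // this version is an assumption. The time argument is in milliseconds.
    public Point pointBuilder(String measurement, long time, Map<String, String> tags,
                              Map<String, Object> fields) {
        return Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();
    }

    // Write a batch of line-protocol records in a single request
    public void batchInsert(final String database, final String retentionPolicy, final InfluxDB.ConsistencyLevel consistencyLevel,
                            final List<String> records) {
        influxDB.write(database, retentionPolicy, consistencyLevel, records);
    }

}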
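
The records passed to batchInsert are strings in InfluxDB line protocol, one line per point. To make that format concrete, here is a rough sketch of how a row with three tags and two string fields might be rendered by hand. It is a simplified illustration, not the library's implementation: the class and method names (LineProtocolSketch, escapeTag, quoteField, toLine) are hypothetical, the sample values are made up, and only the most common escaping rules are covered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LineProtocolSketch {

    // Escape commas, equals signs and spaces in tag keys, tag values and field keys.
    static String escapeTag(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Quote a string field value, escaping backslashes and double quotes.
    static String quoteField(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Render: measurement,tag=value,... field="value",... timestamp
    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, String> fields, long timestampNanos) {
        // Measurement names escape commas and spaces.
        String name = measurement.replace(",", "\\,").replace(" ", "\\ ");
        // TreeMap sorts by key so the output is deterministic.
        String tagPart = new TreeMap<>(tags).entrySet().stream()
                .map(e -> "," + escapeTag(e.getKey()) + "=" + escapeTag(e.getValue()))
                .collect(Collectors.joining());
        String fieldPart = new TreeMap<>(fields).entrySet().stream()
                .map(e -> escapeTag(e.getKey()) + "=" + quoteField(e.getValue()))
                .collect(Collectors.joining(","));
        return name + tagPart + " " + fieldPart + " " + timestampNanos;
    }

    public static void main(String[] args) {
        // Made-up sample row, for illustration only.
        Map<String, String> tags = new HashMap<>();
        tags.put("date", "2019-01-01");
        tags.put("latitude", "30.5");
        tags.put("longitude", "114.3");
        Map<String, String> fields = new HashMap<>();
        fields.put("index1", "1.2");
        fields.put("index2", "3.4");
        // prints: weather,date=2019-01-01,latitude=30.5,longitude=114.3 index1="1.2",index2="3.4" 1546300800000000000
        System.out.println(toLine("weather", tags, fields, 1546300800000000000L));
    }
}
```

In practice the library's Point builder produces these lines, so a hand-written formatter like this is mainly useful for checking what is being sent to the server.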
3. Test
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadCSVBatchInstallDB {
    public static void main(String[] args) throws IOException {
        InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "", "http://node04:8086", "test", "");

        // Backslashes in a Java string literal must be doubled
        String path = "D:\\Code\\IDEA\\influxdb\\input\\2019-01.csv";
        File file = new File(path);
        int count = 0; // records in the current batch
        int tt = 0;    // total lines read
        Map<String, String> tags = new HashMap<>();
        Map<String, Object> fields = new HashMap<>();
        List<String> records = new ArrayList<>();

        Timestamp start = new Timestamp(System.currentTimeMillis());
        // try-with-resources closes the reader when the loop finishes
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // The point timestamp is the time of reading, not the CSV date column
                long time = System.currentTimeMillis();
                tt++;
                //System.out.println(count+" "+line);
                // Skip blank lines; compare string content with isEmpty(), not !=
                if (!line.trim().isEmpty()) {
                    String[] arr = line.split(",");
                    tags.put("date", arr[0].trim());
                    tags.put("latitude", arr[1].trim());
                    tags.put("longitude", arr[2].trim());
                    fields.put("index1", arr[3].trim());
                    fields.put("index2", arr[4].trim());

                    // One record, converted to line protocol
                    Point point = influxDBConnection.pointBuilder("weather", time, tags, fields);
                    records.add(point.lineProtocol());
                    count++;
                    if (count == 1000) {
                        // Write every 1000 records
                        influxDBConnection.batchInsert("test", "", InfluxDB.ConsistencyLevel.ALL, records);
                        count = 0;
                        records.clear();
                        System.out.println(tt);
                    }
                }
            }
        }
        // Write the remaining records, if any
        if (!records.isEmpty()) {
            influxDBConnection.batchInsert("test", "", InfluxDB.ConsistencyLevel.ALL, records);
        }
        Timestamp end = new Timestamp(System.currentTimeMillis());
        System.out.println(tt);
        System.out.println(start);
        System.out.println(end);
    }
}
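
The batching pattern in the test (write every 1000 records, then write whatever is left) can be checked without a running InfluxDB instance. The sketch below is one possible way to isolate it: the class name BatchingSketch and the method forwardInBatches are hypothetical, and the Consumer stands in for the call to batchInsert.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class BatchingSketch {

    // Pass non-blank lines to the sink in batches of at most batchSize.
    // Returns the number of lines forwarded.
    static int forwardInBatches(Iterable<String> lines, int batchSize, Consumer<List<String>> sink) {
        List<String> batch = new ArrayList<>(batchSize);
        int total = 0;
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // blank lines do not count toward the batch size
            }
            batch.add(line);
            total++;
            if (batch.size() == batchSize) {
                sink.accept(new ArrayList<>(batch)); // copy, so the sink may keep the list
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch)); // flush the remainder
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "", "b", "c", "  ", "d", "e");
        List<Integer> sizes = new ArrayList<>();
        int total = forwardInBatches(lines, 2, b -> sizes.add(b.size()));
        // prints: 5 [2, 2, 1]
        System.out.println(total + " " + sizes);
    }
}
```

Counting only the lines that actually enter the batch avoids a subtle problem: if blank lines also advanced the counter, a blank line at the batch boundary could push the counter past the threshold without triggering a write.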
When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/328575.html