栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JAVA 导入数据到Elasticsearch中

JAVA 导入数据到Elasticsearch中

导入数据到Elasticsearch中

文章目录

导入数据到Elasticsearch中

前言写入的几种方案具体操作实现

版本: 测试

es入门使用
es索引使用
es分词使用
es聚合使用
java操作es

前言

我们在使用es时候,需要手动将数据导入到es中,导入数据无非就是给es中写入数据,可以从mysql中写入,也可以从其他db或者excle中导入中间需要做一层转换,然后使用es的相关api批量写入es中。

写入的几种方案

写入方案有很多种:

业务代码中异步写入
如我们创单成功时,异步将订单数据写入es中数据同步到mq中然后mq在写入es中
如项目日志操作,kafka + es 收集日志操作将mysql指定表中数据写入到es
这里我们演示将某个表中数据写入es中订阅mysql binlog 异步导入es
cancel伪装bin log 将解析后的数据导入到es中 具体操作

一般随着业务的发展,db查询到一定程度就显得力不从心(当然这里指的是加索引,分库分表以及各种优化之后也无济于事,数据量还是很大),此时我们就需要使用es来提高查询效率,此时就需要从db将数据导入到es中。

读取db数据批量写入es依次循环,直到数据都被写完 实现

写入es方法有很多种,我们可以使用es的api,一条一条写,也可以批量操作。

这里我们使用es官方推荐的一个工具类Using Bulk Processor操作
官方文档地址

版本:

jdk: 11
es:7.1
sb : 2.1
mysql: 5.7

首先导入依赖

 
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        


        
            org.springframework.boot
            spring-boot-starter-test
        
        
            org.elasticsearch.client
            elasticsearch-rest-high-level-client
            7.3.0
            
                
                    org.elasticsearch
                    elasticsearch
                
            
        

        
            org.elasticsearch
            elasticsearch
            7.3.0
        


        
            org.springframework.boot
            spring-boot-starter-thymeleaf
        

        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        

        
            org.springframework.boot
            spring-boot-configuration-processor
            true
        


        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        

        
        
            org.apache.httpcomponents
            httpclient
            4.5.3
        

        
            com.alibaba
            fastjson
            1.2.58
        

        
            org.projectlombok
            lombok
            true
        

        
            mysql
            mysql-connector-java
            5.1.38
            
            compile
        

        
        
            org.apache.commons
            commons-lang3
            3.9
        
        
            junit
            junit
            4.12
            test
        


    

yml配置

spring:
  devtools:
    restart:
      enabled: true  #u8BBEu7F6Eu5F00u542Fu70EDu90E8u7F72
      additional-paths: src/main/java #u91CDu542Fu76EEu5F55
      exclude: WEB-INF/**
    freemarker:
      cache: false    #u9875u9762u4E0Du52A0u8F7Du7F13u5B58uFF0Cu4FEEu6539u5373u65F6u751Fu6548

  elasticsearch:
    rest:
      uris: 127.0.0.1:9200
server:
  port: 8080

logging:
  level:
    root: info
    com.xdclass.search: debug

我们需要初始化db,以及es(这里可以集群,也可以单机),然后写相关api
首先是db操作,这里直接单个db实例,简单一点,当然也可以使用其他的mybatis,jpa,jdbc等

package com.example.esdemo.config;

import java.sql.Connection;
import java.sql.DriverManager;

public class DBHelper {
    public static final String url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
//    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String name = "com.mysql.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";
    private  static Connection  connection = null;

    public  static   Connection  getConn(){
        try {
            Class.forName(name);
            connection = DriverManager.getConnection(url,user,password);
        }catch (Exception e){
            e.printStackTrace();
        }
        return  connection;
    }
}

es配置

package com.example.esdemo.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {


    @Value("${spring.elasticsearch.rest.uris}")
    private String hostlist;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        //解析hostlist配置信息
        String[] split = hostlist.split(",");
        //创建HttpHost数组,其中存放es主机和端口的配置信息
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        //创建RestHighLevelClient客户端
        return new RestHighLevelClient(RestClient.builder(httpHostArray));
    }//项目主要使用RestHighLevelClient,对于低级的客户端暂时不用

    @Bean
    public RestClient restClient() {
        // 解析hostlist配置信息
        String[] split = hostlist.split(",");
        //创建HttpHost数组,其中存放es主机和端口的配置信息
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        return RestClient.builder(httpHostArray).build();
    }
}

db 和es相关的表明和index名

 */
public class importDb2Es {

    private String dbTableName;

    private String esIndexName;
    //get  set 省略
}

然后是相关service,导入service

public interface importService {

    void importDb2Es(String dbName,String esIndexName);
}

实现

package com.example.esdemo.service.impl;

import com.example.esdemo.config.DBHelper;
import com.example.esdemo.imports.importDb2Es;
import com.example.esdemo.service.importService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizevalue;
import org.elasticsearch.common.unit.Timevalue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;



@Component
public class importServiceImpl implements importService {

    private static final Logger logger = LogManager.getLogger(importServiceImpl.class);

    @Autowired
    private RestHighLevelClient client;


    @Override
    public void importDb2Es(importDb2Es importDb2Es) {
        writeMySQLDataToES(importDb2Es.getDbTableName(),importDb2Es.getDbTableName());
    }


    private void writeMySQLDataToES(String tableName,String esIndeName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            connection = DBHelper.getConn();
            logger.info("start handle data:" + tableName);
            String sql = "select * from " + tableName;
            ps = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // 根据自己需要设置 fetchSize
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetmetaData colData = rs.getmetaData();
            ArrayList> dataList = new ArrayList<>();
            HashMap map = null;
            int count = 0;
            // c 就是列的名字   v 就是列对应的值
            String c = null;
            String v = null;
            while (rs.next()) {
                count++;
                map = new HashMap(128);
                for (int i = 1; i < colData.getColumnCount(); i++) {
                    c = colData.getColumnName(i);
                    v = rs.getString(c);
                    map.put(c, v);
                }
                dataList.add(map);
                // 每1万条 写一次   不足的批次的数据 最后一次提交处理
                if (count % 10000 == 0) {
                    logger.info("mysql handle data  number:" + count);
                    // 将数据添加到 bulkProcessor
                    for (HashMap hashMap2 : dataList) {
                        bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2));
                    }
                    // 每提交一次 清空 map 和  dataList
                    map.clear();
                    dataList.clear();
                }
            }
            // 处理 未提交的数据
            for (HashMap hashMap2 : dataList) {
                bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2));
            }
            bulkProcessor.flush();

        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                rs.close();
                ps.close();
                connection.close();
                boolean terinaFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
                logger.info(terinaFlag);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {

        BulkProcessor bulkProcessor = null;
        try {

            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    logger.info("Try to insert data number : "
                            + request.numberOfActions());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    logger.info("************** Success insert data number : "
                            + request.numberOfActions() + " , id: " + executionId);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
                }
            };

            BiConsumer> bulkConsumer = (request, bulkListener) -> client
                    .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizevalue(100L, ByteSizeUnit.MB));
            builder.setConcurrentRequests(10);
            builder.setFlushInterval(Timevalue.timevalueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(Timevalue.timevalueSeconds(1L), 3));
            // 注意点:让参数设置生效
            bulkProcessor = builder.build();

        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                logger.error(e1.getMessage());
            }
        }
        return bulkProcessor;
    }
}

这里我们提供一个http 测试

 @Autowired
    private importService importService;

    @RequestMapping("api/import")
    public Map imports(importDb2Es importDb2Es) {
        Map map = new HashMap<>();


        importService.importDb2Es(importDb2Es);
        map.put("code", 200);
        map.put("msg", "成功");

        return map;
    }

这里我们准备了一个表一些数据

测试

我们调用http接口

http://127.0.0.1:8080/api/import?dbTableName=position&esIndexName=position_index

可以看到我们的后台导入日志50条数据

我们去es中查询如下100条(这里我请求了2次,所以是100条)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/706882.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号