栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【ES】Java代码bulkProcessor同步mysql到es中(引用即用)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【ES】Java代码bulkProcessor同步mysql到es中(引用即用)

一、项目中添加一个工具类就可以了
  • 配置类不用改,需要改的地方就是业务需求的地方
  • 每两分钟同步一次,加了个定时任务 
  • 自已已应用到测试环境,是能正常运行的
package com.example.gauditdemo.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class ESConfigClient {

    public final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Bean
    public RestHighLevelClient esClient(){
        return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }

}

package com.example.gauditdemo.utils;

import com.example.gauditdemo.config.ESConfigClient;
import com.example.gauditdemo.dao.OperationDao;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizevalue;
import org.elasticsearch.common.unit.Timevalue;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;


@Component
public class ElasticSearchUtil {

    public final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private ESConfigClient esConfigClient;

    private BulkProcessor bulkProcessor;

    @Resource
    private OperationDao operationDao;

    @Value("${audit.index.prefix.env}")
    private String auditIndexPrefixEnv;

    @PostConstruct
    public void init() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                //重写beforeBulk,在每次bulk request发出前执行,在这个方法里面可以知道在本次批量操作中有多少操作数
                int numberOfActions = request.numberOfActions();
                logger.info("同步数量 Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                //重写afterBulk方法,每次批量请求结束后执行,可以在这里知道是否有错误发生。
                if (response.hasFailures()) {
                    logger.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
                } else {
                    logger.info("写入成功 Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                //重写方法,如果发生错误就会调用。
                logger.error("写入失败 Failed to execute bulk", failure);
            }
        };
        //在这里调用build()方法构造bulkProcessor,在底层实际上是用了bulk的异步操作
        this.bulkProcessor = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
            esConfigClient.esClient().bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
        }), listener)
                // 1000条数据请求执行一次bulk
                .setBulkActions(1000)
                // 3mb的数据刷新一次bulk
                .setBulkSize(new ByteSizevalue(3L, ByteSizeUnit.MB))
                // 并发请求数量, 0不并发, 1并发允许执行
                .setConcurrentRequests(1)
                // 固定1s必须刷新一次
                .setFlushInterval(Timevalue.timevalueSeconds(5L))
                // 重试5次,间隔100s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(Timevalue.timevalueSeconds(100L), 5))
                .build();
    }

    @PreDestroy
    public void destroy() {
        try {
            bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("Failed to close bulkProcessor", e);
        }
        logger.info("bulkProcessor closed!");
    }

    
    @Scheduled(cron = "0 */2 * * * ?")
    public void insertIntoEs() {
        long startTime = System.currentTimeMillis();
        List> mapList = operationDao.selectOperationAndChangeDate();
        logger.info("同步数据 tongBuSize:{}条", mapList.size());
        SimpleDateFormat sdFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar cal= Calendar.getInstance();
        if (!mapList.isEmpty()) {
            mapList.forEach(item -> {
                try {
                    // 自动生成的operationtime自动映射成date类型
                    cal.setTime(sdFormat.parse(item.get("operationtime").toString()));
                    // 插入es数据时间相差8小时
                    cal.add(Calendar.HOUR_OF_DAY,+8);
                    item.replace("operationtime", cal.getTime());
                } catch (ParseException e) {
                    logger.error("Failed to convert time", e);
                }
                this.bulkProcessor.add(new IndexRequest().index(auditIndexPrefixEnv + item.get("indexsuffix").toString()).source(item, XContentType.JSON));
            });
        }
        System.out.println(" use time: " + (System.currentTimeMillis() - startTime) / 1000 + "s");
    }

}
二、自己过程中遇到的问题及解决办法
  • 报错 bulk has been already closed 原因及解决办法:因为在定时任务那里,bulkProcessor添加完到es中,我就将es关闭掉了,导致下一次定时任务到的时候,就会报这个错。解决办法:就是将 client.close() 删除掉就可以了 
  • mysql时间类型与es中时间类型不一致 原因及解决办法:mysql中有一个时间date类型,同步到es中,这个时间类型在es中是text类型,导致查询会报错,预想应该在es中也是date类型才对。解决办法:mybatis中查询出来的结果将时间进行转换,代码中有,我写了注释。最好将es中的索引先删除掉,然后es会自动创建索引和字段类型的。
  • es中时间比mysql中查询出来的时间少了8个小时 原因及解决办法:同步的时候发现es中时间少8小时。解决办法:mybatis中查询出来的结果将时间加8个小时,代码中有,可以参考一下,我写了注释。
  • 定时任务起效果,需要三个条件 原因及解决办法:自己搜索0.0
  • 测试的时候,我是在windwos中安装的es和kibana 原因及解决办法:比较方便,方便我调式
  • 为什么同步es中要用bulkProcessor 原因及解决办法:当你数据量特别大的时候,不用bulkProcessor,如果一次性同步几百万条数据,会将es弄崩掉的。解决办法:加上bulkProcessor,可以1000条同步一次,或者几s自动同步一次,也比较方便
  • mybatis中查询使用流查询,添加 fetchSize="1000" 原因及解决办法:mybatis中一次性从mysql中查询几万条数据,会造成数据库压力。解决办法:加上fetchSize,流查询,缓解数据库压力。
  • 想不到还有什么坑,想到了我再添加,有不清楚的可以下面评论,看到我会解答的。
三、测试环境同步日志

四、我将demo放在了github上,有需要的可以自己参考一下

 定时同步mysql到es的demo项目

     

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/643297.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号