栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

在Java中简单的将Mysql中的数据添加到es中并且定时检查更新

在Java中简单的将Mysql中的数据添加到es中并且定时检查更新

废话不说 先上pom依赖

    <这个是统一的依赖管理>

    

        7.13.0

    

    <下面是pom依赖>

    

        org.elasticsearch.client

        elasticsearch-rest-high-level-client

        7.13.0

        

            

                org.elasticsearch

                elasticsearch

            

            

                org.elasticsearch.client

                elasticsearch-rest-client

            

        

    

    

        org.elasticsearch

        elasticsearch

        ${elasticsearch.version}

    

    

        org.elasticsearch.client

        elasticsearch-rest-client

        ${elasticsearch.version}

        

            

                commons-logging

                commons-logging

            

        

    

    

    

        com.alibaba

        fastjson

    

我这边用的是多模块SpringBoot  这个是service的es依赖

相信码友们不需要其他的依赖了

依赖弄完之后我们开始Service的代码了

首先思路 是先到Mysql中查询到我们想要的数据

然后将查询到的数据转换成json数据然后es就可以进行存储了

代码如下

package com.tm.config;

import org.apache.http.HttpHost;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class config {

    public static final RequestOptions COMMON_OPTIONS;

    static {

        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();

        COMMON_OPTIONS = builder.build();

    }

    

    @Bean

    public static RestHighLevelClient esRestClient(){

        RestHighLevelClient client = new RestHighLevelClient(

                RestClient.builder(

                        //集群配置法

                        new HttpHost("192.168.206.133",19200,"http")));

        return client;

    }

}

上面的是连接es的配置

package com.tm.service.impl;

import com.alibaba.fastjson.JSONObject;

import com.tm.config.config;

import com.tm.mapper.EsSyncGoodsSpuMapper;

import com.tm.model.entity.EsSyncGoodsSpuEntity;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.common.xcontent.XContentType;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.util.List;

 //这三个注解是定时器的

@EnableScheduling

@Component

@Configuration

public class EsSyncGoodsSpuServiceImpl {

    //注入一下

    @Resource

    private EsSyncGoodsSpuMapper esSyncGoodsSpuMapper;

    //这个定时器注解就是每过去5秒执行一下这个aaa的方法

    @Scheduled(cron = "0/5 * * * * ?")

    public void aaa() {

        //这边是查询mysql的数据

        List list = esSyncGoodsSpuMapper.aaa();

        //调用高层对象

        RestHighLevelClient restHighLevelClient = config.esRestClient();

        //然后我这边使用forEach循环将数据添加到es中

        list.forEach(a -> {

            //创建一个索引请求(这里面写的是我们想要添加的索引)

            IndexRequest index = new IndexRequest("goods_spu");

            //这边是获取到我们查询得到的数据将这个查询的id当成我们es中_id(不要es自带的)

            index.id(a.getSpuId().toString());

            //创建批量操作对象

            BulkRequest request = new BulkRequest();

            //这里我将查询到的数据循环转换成json

            index.source(JSONObject.toJSonString(a), XContentType.JSON);

            //将转换成json的数据添加到我们创建的对象中去

            request.add(index);

            try {

                //将数据通过bulk操作进入es。。

                restHighLevelClient.bulk(request, config.COMMON_OPTIONS);

            } catch (Exception e) {

                e.printStackTrace();

            }

            //打印......

            System.out.println(list);

        });

    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/605028.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号