Integrating Spring Boot with Elasticsearch and Synchronizing Data into Elasticsearch

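Spring Data Elasticsearch is Spring's data-access layer for Elasticsearch. It wraps most of the basic operations, which makes it easy to work with Elasticsearch data. This project uses ElasticsearchRepository and ElasticsearchRestTemplate for queries. The article also covers using Logstash to synchronize data from MySQL to Elasticsearch.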

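Versions

The Spring Data Elasticsearch, Elasticsearch, and Spring Boot versions must match one another before development can start. The versions I used are listed below.

I use Spring Boot 2.3.7.RELEASE, which calls for Elasticsearch 7 or later. My Elasticsearch version is 7.4.2, and spring-boot-starter-data-elasticsearch is 2.6.2.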

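<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.6.2</version>
</dependency>

<!-- This import belongs inside <dependencyManagement><dependencies> -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>2.3.7.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
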
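Configuration

Configuration file

Elasticsearch uses two ports. Port 9200 serves HTTP and is mainly used for external communication; it can be changed. Port 9300 is the TCP transport port used for communication between Elasticsearch nodes, and the Java client configured below also connects through it. It is best to leave the default ports unchanged, but because my server also runs an Elasticsearch 6 instance, I set the transport port to 9301.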

spring:
  data:
    elasticsearch:
      cluster-name: zgw-es
      cluster-nodes: 39.105.205.63:9301
      repositories:
        enabled: true

Main class

Add an annotation to the main application class that sets the package to scan for repositories:

@EnableElasticsearchRepositories(basePackages = "com.chinaunicom.unicmdb.promsync.repository")

Querying

Basic use of ElasticsearchRepository

domain

package com.chinaunicom.unicmdb.promsync.entity;

import com.alibaba.fastjson.annotation.JSONField;
import com.chinaunicom.common.core.annotation.Excel;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;

// Machine-room record stored in the "room" index.
// The @Excel names are export column headers and are kept as in the original.
@Document(indexName = "room")
public class SpaceRoom {

    // Room ID; doubles as the Elasticsearch document id
    @Id
    @JSONField(name = "GID")
    @Excel(name = "机房ID")
    private String GID;

    // Room name
    @Field(store = true, index = true)
    @JSONField(name = "NAME")
    @Excel(name = "机房名称")
    private String NAME;

    // ID of the owning data center
    @Field(store = true, index = true)
    @JSONField(name = "STATION_ID")
    @Excel(name = "所属数据中心ID")
    private String STATION_ID;

    // Name of the owning data center
    @Field(store = true, index = true)
    @JSONField(name = "STATION_NAME")
    @Excel(name = "所属数据中心")
    private String STATION_NAME;

    // ID of the owning region
    @Field(store = true, index = true)
    @JSONField(name = "REGION_ID")
    @Excel(name = "所属区域ID")
    private String REGION_ID;

    // Province
    @Field(store = true, index = true)
    @JSONField(name = "province")
    @Excel(name = "所属省份")
    private String province;

    // Alias
    @Field(store = true, index = true)
    @JSONField(name = "ALIAS")
    @Excel(name = "别名")
    private String ALIAS;

    // Whether IDC is supported
    @Field(store = true, index = true)
    @JSONField(name = "IS_SUPPORTIDC")
    @Excel(name = "是否支持IDC")
    private Integer IS_SUPPORTIDC;

    // Room class (labelled "room class name" in the export)
    @Field(store = true, index = true)
    @JSONField(name = "CLASS_ID")
    @Excel(name = "机房等级名称")
    private Long CLASS_ID;

    // Room type ID
    @Field(store = true, index = true)
    @JSONField(name = "TYPE_ID")
    @Excel(name = "机房类型id")
    private Long TYPE_ID;

    // Room type name
    @Field(store = true, index = true)
    @JSONField(name = "TYPE_DESC")
    @Excel(name = "机房类型名称")
    private Long TYPE_DESC;

    // Creation time
    @Field(store = true, index = true)
    @JSONField(name = "CREATE_DATE")
    @Excel(name = "创建时间")
    private String CREATE_DATE;

    // Address
    @Field(store = true, index = true)
    @JSONField(name = "ADDRESS")
    @Excel(name = "地址")
    private String ADDRESS;

    // Resource specification
    @Field(store = true, index = true)
    @JSONField(name = "RES_TYPE_ID")
    @Excel(name = "资源规格")
    private Long RES_TYPE_ID;

    // Level
    @Field(store = true, index = true)
    @JSONField(name = "CLASS_DESC")
    @Excel(name = "级别")
    private String CLASS_DESC;

    // Maintenance mode
    @Field(store = true, index = true)
    @JSONField(name = "MNT_TYPE_ID")
    @Excel(name = "维护方式")
    private Integer MNT_TYPE_ID;

    // Person in charge of the room
    @Field(store = true, index = true)
    @JSONField(name = "link_MAN")
    @Excel(name = "机房负责人")
    private String link_MAN;

    // Maintenance unit
    @Field(store = true, index = true)
    @JSONField(name = "MAINTENANCE_UNIT")
    @Excel(name = "维护单位")
    private String MAINTENANCE_UNIT;


    public String getGID() {
        return GID;
    }

    public void setGID(String GID) {
        this.GID = GID;
    }

    public String getNAME() {
        return NAME;
    }

    public void setNAME(String NAME) {
        this.NAME = NAME;
    }

    public String getSTATION_ID() {
        return STATION_ID;
    }

    public void setSTATION_ID(String STATION_ID) {
        this.STATION_ID = STATION_ID;
    }

    public String getSTATION_NAME() {
        return STATION_NAME;
    }

    public void setSTATION_NAME(String STATION_NAME) {
        this.STATION_NAME = STATION_NAME;
    }

    public String getREGION_ID() {
        return REGION_ID;
    }

    public void setREGION_ID(String REGION_ID) {
        this.REGION_ID = REGION_ID;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getALIAS() {
        return ALIAS;
    }

    public void setALIAS(String ALIAS) {
        this.ALIAS = ALIAS;
    }

    public Integer getIS_SUPPORTIDC() {
        return IS_SUPPORTIDC;
    }

    public void setIS_SUPPORTIDC(Integer IS_SUPPORTIDC) {
        this.IS_SUPPORTIDC = IS_SUPPORTIDC;
    }

    public Long getCLASS_ID() {
        return CLASS_ID;
    }

    public void setCLASS_ID(Long CLASS_ID) {
        this.CLASS_ID = CLASS_ID;
    }

    public Long getTYPE_ID() {
        return TYPE_ID;
    }

    public void setTYPE_ID(Long TYPE_ID) {
        this.TYPE_ID = TYPE_ID;
    }

    public Long getTYPE_DESC() {
        return TYPE_DESC;
    }

    public void setTYPE_DESC(Long TYPE_DESC) {
        this.TYPE_DESC = TYPE_DESC;
    }

    public String getCREATE_DATE() {
        return CREATE_DATE;
    }

    public void setCREATE_DATE(String CREATE_DATE) {
        this.CREATE_DATE = CREATE_DATE;
    }

    public String getADDRESS() {
        return ADDRESS;
    }

    public void setADDRESS(String ADDRESS) {
        this.ADDRESS = ADDRESS;
    }

    public Long getRES_TYPE_ID() {
        return RES_TYPE_ID;
    }

    public void setRES_TYPE_ID(Long RES_TYPE_ID) {
        this.RES_TYPE_ID = RES_TYPE_ID;
    }

    public String getCLASS_DESC() {
        return CLASS_DESC;
    }

    public void setCLASS_DESC(String CLASS_DESC) {
        this.CLASS_DESC = CLASS_DESC;
    }

    public Integer getMNT_TYPE_ID() {
        return MNT_TYPE_ID;
    }

    public void setMNT_TYPE_ID(Integer MNT_TYPE_ID) {
        this.MNT_TYPE_ID = MNT_TYPE_ID;
    }

    public String getlink_MAN() {
        return link_MAN;
    }

    public void setlink_MAN(String link_MAN) {
        this.link_MAN = link_MAN;
    }

    public String getMAINTENANCE_UNIT() {
        return MAINTENANCE_UNIT;
    }

    public void setMAINTENANCE_UNIT(String MAINTENANCE_UNIT) {
        this.MAINTENANCE_UNIT = MAINTENANCE_UNIT;
    }
}

repository

ElasticsearchRepository provides basic methods that can be called directly, and you can declare additional query methods as your requirements dictate.

import com.chinaunicom.unicmdb.promsync.entity.SpaceRoom;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

import java.util.List;

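// Type parameters: the entity type and the type of its @Id field
public interface SpaceRoomRepository extends ElasticsearchRepository<SpaceRoom, String> {
    // Search by address, with a fuzzy (like) match on alias
    List<SpaceRoom> queryAllByADDRESSAndALIASIsLike(String address, String alias);
}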

ElasticsearchRestTemplate

As of version 2.6.2, ElasticsearchTemplate is no longer recommended; use ElasticsearchRestTemplate for advanced searches instead.

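// Imports needed by the fragment below (Spring MVC annotations,
// java.io.IOException and the project's own AjaxResult are omitted)
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;

    @Autowired
    private ElasticsearchRestTemplate restTemplate;

    @GetMapping("es/search/room/{address}")
    public AjaxResult SearchRoomToEs(@PathVariable("address") String address,
                                     @RequestParam("name") String name,
                                     @RequestParam("pageSize") Integer pageSize,
                                     @RequestParam("pageNum") Integer pageNum) throws IOException {

        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        // Phrase query on NAME; slop(6) tolerates up to 6 position moves between terms
        QueryBuilder builder1 = QueryBuilders.matchPhraseQuery("NAME", name).slop(6);
        // Match query on province; both clauses must match
        QueryBuilder builder2 = QueryBuilders.matchQuery("province", address);
        builder.must(builder1).must(builder2);
        // Paging; note that PageRequest.of takes a zero-based page index
        Pageable page = PageRequest.of(pageNum, pageSize);
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        // Highlight matches in NAME; the tag strings are empty in the source, so supply your own markup
        highlightBuilder.field("NAME").preTags("").postTags("");

        Query query = new NativeSearchQueryBuilder().withQuery(builder).withPageable(page)
                .withSort(SortBuilders.fieldSort("STATION_ID").order(SortOrder.DESC))
                .withHighlightBuilder(highlightBuilder).build();

        SearchHits<SpaceRoom> data = restTemplate.search(query, SpaceRoom.class);
        return AjaxResult.success(data);
    }
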
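One detail in the controller above is easy to miss: PageRequest.of(page, size) in Spring Data takes a zero-based page index, so passing a 1-based pageNum straight from the front end skips the first page. The following is a minimal sketch of the conversion, written as plain Java so that it runs without Spring. The class name PageMath and both of its methods are hypothetical helpers, not part of Spring Data.

```java
// Hypothetical helper (not part of Spring Data): converts a 1-based page
// number from the UI into the zero-based index that PageRequest.of expects.
final class PageMath {
    private PageMath() {}

    /** Converts a 1-based page number to a zero-based index, clamping at 0. */
    static int toZeroBasedPage(int pageNum) {
        return Math.max(pageNum - 1, 0);
    }

    /** Offset of the first hit on a zero-based page, i.e. how many hits are skipped. */
    static long firstHitOffset(int zeroBasedPage, int pageSize) {
        if (zeroBasedPage < 0 || pageSize < 1) {
            throw new IllegalArgumentException("page must be >= 0 and size >= 1");
        }
        return (long) zeroBasedPage * pageSize;
    }
}
```

Assuming the client sends 1-based page numbers, the controller would call PageRequest.of(PageMath.toZeroBasedPage(pageNum), pageSize). If the client already sends zero-based values, no conversion is needed.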
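Synchronizing MySQL data to Elasticsearch

Using Logstash

Logstash can query data from MySQL and write it to Elasticsearch. A scheduled task runs the sync periodically and records the result of each run. Each run inserts new documents or updates existing ones; with an update_time column as the tracking field, each query fetches only the rows changed since the previous run.

The MySQL JDBC driver jar must be uploaded to the Logstash server.

Installing and starting Logstash is not covered here; instructions are easy to find online.

logstash-db-sync.conf configuration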
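The incremental idea described above can be sketched in a few lines of plain Java. This is an in-memory model only, not Logstash code: the class IncrementalSync, its Row type, and the method names are all hypothetical, and the sketch keeps the largest update_time seen as its marker, which is a simplification.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of timestamp-based incremental sync. All names here are
// hypothetical; this only illustrates the idea, it is not Logstash code.
final class IncrementalSync {

    /** One source row: primary key, payload, and last-modified time. */
    static final class Row {
        final String id;
        final String name;
        final Instant updateTime;

        Row(String id, String name, Instant updateTime) {
            this.id = id;
            this.name = name;
            this.updateTime = updateTime;
        }
    }

    // Stands in for the Elasticsearch index: document id -> document.
    private final Map<String, Row> index = new LinkedHashMap<>();
    // Stands in for the marker value persisted between runs.
    private Instant lastValue = Instant.EPOCH;

    /** One scheduled run: select changed rows, upsert them, advance the marker. */
    int runOnce(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        for (Row row : table) {
            // Mirrors a filter of the form: update_time >= last marker value
            if (!row.updateTime.isBefore(lastValue)) {
                changed.add(row);
            }
        }
        for (Row row : changed) {
            // Writing under the same id overwrites the old document,
            // so an updated row never produces a duplicate.
            index.put(row.id, row);
            if (row.updateTime.isAfter(lastValue)) {
                lastValue = row.updateTime;
            }
        }
        return changed.size();
    }

    Map<String, Row> index() { return index; }

    Instant lastValue() { return lastValue; }
}
```

Because the filter uses >=, the row sitting exactly on the marker is selected again on the next run. That is harmless here, since writing a document under an existing id simply overwrites it.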

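input {
    jdbc {
        # Tag events so that the output section can route them
        type => "portal_user"
        # JDBC URL of the MySQL/MariaDB server, including the database name
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/portal-cloud?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=UTC"
        # Database user name and password
        jdbc_user => "****"
        jdbc_password => "*****"
        # Location of the JDBC driver jar; absolute or relative path
        jdbc_driver_library => "/usr/local/logstash6.6.0/sync/mysql-connector-java-8.0.22.jar"
        # Driver class name
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        # Enable paging
        jdbc_paging_enabled => "true"
        # Rows per page; adjust as needed
        jdbc_page_size => "1000"
        # Path of the SQL file to execute
        statement_filepath => "/usr/local/logstash6.6.0/sync/sys_user.sql"
        # Cron-style schedule. The five fields are minute, hour, day of month, month, day of week; all * runs the job every minute
        schedule => "* * * * *"
        # The original also set type => "_doc" here as an "index type". It conflicts with the
        # type set above, which the output conditional relies on, so it is commented out.
        # type => "_doc"
        # Use the value of tracking_column (rather than the time of the last run) as :sql_last_value
        use_column_value => true
        # File in which the last tracked value is stored between runs
        last_run_metadata_path => "/usr/local/logstash6.6.0/sync/track_time"
        # Column to track when use_column_value is true; an auto-increment id or a timestamp
        tracking_column => "update_time"
        # Type of the tracking column
        tracking_column_type => "timestamp"
        # Whether to discard the stored value in last_run_metadata_path; true re-reads every row from the start on each run
        clean_run => false
        # Whether to convert column names to lowercase; false keeps aliases such as userId intact
        lowercase_column_names => false
    }
}
output {
    if [type] == "portal_user" {
        elasticsearch {
            # Elasticsearch address
            hosts => ["127.0.0.1:9208"]
            # Target index name
            index => "sys_user"
            # Use the row's userId as the document _id, so a re-synced row overwrites its document
            document_id => "%{userId}"
            # document_id => "%{itemId}"

            # Custom index template. Note that with manage_template => false Logstash does not
            # install the template, so these settings only take effect if it is switched to true.
            template_name => "myik"
            template => "/usr/local/logstash6.6.0/sync/logstash-ik.json"
            template_overwrite => true
            manage_template => false
        }
        # Print each event to the console
        stdout {
            codec => json_lines
        }
    }
}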

sys_user.sql

Query the data and alias the columns to the Elasticsearch field names:

SELECT DISTINCT user_id AS userId, user_name AS userName, nick_name AS nickName, email, avatar, phonenumber, `password`, sex, `status`, del_flag, login_ip, login_date, create_by, create_time, remark, password_code, secret, update_time FROM sys_user WHERE update_time >= :sql_last_value

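Data deletion

The setup above only inserts and updates; it cannot detect rows that were deleted from MySQL. One option is to add a logical-delete column and purge the flagged documents periodically.

ElasticsearchRepository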

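Queried data can also be saved to Elasticsearch through ElasticsearchRepository, but this approach uses a lot of memory and is slow, so it is not recommended.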
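Returning to the logical-delete idea under data deletion, a periodic purge first has to decide which documents to remove. The following plain-Java sketch shows that selection step only. The class SoftDeletePurge, its Doc type, and the flag value "1" are assumptions made for illustration; the real meaning of del_flag depends on the schema.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the selection step of a periodic purge
// driven by a logical-delete flag.
final class SoftDeletePurge {

    /** Minimal view of a synced document: its id and its del_flag value. */
    static final class Doc {
        final String id;
        final String delFlag;

        Doc(String id, String delFlag) {
            this.id = id;
            this.delFlag = delFlag;
        }
    }

    // Assumption: this value marks a row as deleted; adjust to the real schema.
    static final String DELETED = "1";

    /** Returns the ids of documents whose flag marks them as deleted. */
    static List<String> idsToPurge(List<Doc> docs) {
        List<String> ids = new ArrayList<>();
        for (Doc doc : docs) {
            // Constant-first equals also handles a missing (null) flag
            if (DELETED.equals(doc.delFlag)) {
                ids.add(doc.id);
            }
        }
        return ids;
    }
}
```

The returned ids could then be handed to a delete call, for example the repository's deleteById, from a scheduled job.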
