栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot+kafka+ES实现信息数据同步管理(下)

SpringBoot+kafka+ES实现信息数据同步管理(下)

  上一篇文章主要介绍了项目的整体结构,这篇文章展示具体结构的实现

一、项目版本

SpringBoot  2.1.23    ES:6.7

引入jar


            org.elasticsearch.client
            elasticsearch-rest-client
            6.7.0
        

        
        
            org.elasticsearch.client
            elasticsearch-rest-high-level-client
            6.7.0
        
        
            org.elasticsearch
            elasticsearch
            6.7.0
            
                
                    com.fasterxml.jackson.dataformat
                    jackson-dataformat-smile
                
                
                    com.fasterxml.jackson.dataformat
                    jackson-dataformat-cbor
                
                
                    com.fasterxml.jackson.core
                    jackson-databind
                
            
        

        
        
            org.springframework.data
            spring-data-elasticsearch
            
            
                
                    io.netty
                    netty-codec
                
                
                    com.fasterxml.jackson.core
                    jackson-databind
                
            
        


        
        
            org.springframework.kafka
            spring-kafka
            2.2.0.RELEASE
        

        
            org.springframework.kafka
            spring-kafka-test
            test
            
                
                    com.fasterxml.jackson.core
                    jackson-databind
                
            
        
二、配置kafka

kafka配置yml文件

spring:
# kafka配置
    kafka:
        bootstrap-servers: kafka集群地址+端口

        producer:
            batch-size: 16384
            buffer-memory: 33554432
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            retries: 0
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
            # 指定默认消费者group id
            group-id: consumer-group
            auto-offset-reset: earliest
            enable-auto-commit: true
            auto-commit-interval: 5000
            # 指定消息key和消息体的编解码方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        #开发环境topic
        template:
            default-topic: topic_sys_dev

编写kafka发送消息接口:

public interface ISysSendToKafkaService
{
    void sendMsgToKafka(Sys sys,String type);

}

kafka消息实现类

@Service
public class SysSendToKafkaServiceImpl implements ISysSendToKafkaService {
    private static final Logger log = LoggerFactory.getLogger(SysSendToKafkaServiceImpl.class);

    @Value("${spring.kafka.template.default-topic}")
    private String kafkaTopic ;
    @Autowired
    KafkaTemplate kafkaTemplate;



    @Override
    public void sendMsgToKafka(Sys sys,String type) {
        JSonObject jsonObject = new JSonObject();
        jsonObject.put("type",type);
        jsonObject.put("sys",sys);
        kafkaTemplate.send(kafkaTopic,jsonObject.toJSonString());
    }
}

以上配置类之后,在SpringBoot的中,对TIDB数据处理完成之后,再调用该方法,将数据发送到kafka中去。

//调用存放到kafka
 sendToKafkaService.sendMsgToKafka(sys,"create");

此处,我对前端处理的消息做了标记,页面新增的数据用type=create,修改的数据type=update,删除的数据用type=delete,这样在消费kafka消息的时候明确对ES的数据做什么样的处理

三、kafka队列消息的消费处理

1、在yml中配置ES信息,ES配置的是集群地址

#开发环境es配置地址
qymp:
    elasticsearch:
        hostlist: 192.168.110.110:9200,192.168.110.111:9200,192.168.110.112:9200

2、配置ESconfig

@Configuration
public class ESConfig {
    @Value("${qymp.elasticsearch.hostlist}")
    private String hostlist;

    @Bean // 高版本客户端
    public RestHighLevelClient restHighLevelClient() {
        // 解析 hostlist 配置信息。假如以后有多个,则需要用 , 分开
        String[] split = hostlist.split(",");
        // 创建 HttpHost 数组,其中存放es主机和端口的配置信息
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        // 创建RestHighLevelClient客户端
        return new RestHighLevelClient(RestClient.builder(httpHostArray));
    }

    // 项目主要使用 RestHighLevelClient,对于低级的客户端暂时不用
    @Bean
    public RestClient restClient() {
        // 解析hostlist配置信息
        String[] split = hostlist.split(",");
        // 创建HttpHost数组,其中存放es主机和端口的配置信息
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        return RestClient.builder(httpHostArray).build();
    }
}

3、新建工具类KafkaConsumer,用于监听kafka队列消息,并消费,对ES数据进行同步处理,

@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    @Qualifier("restHighLevelClient")
    private RestHighLevelClient client;


    @KafkaListener(topics = {"${spring.kafka.template.default-topic}"})
    public void userConsumer(String message) throws IOException {
        JSonObject jsonObject = JSONObject.parseObject(message);
        String jsondata = (jsonObject.get("sys")).toString();
        Sys sys= FastJsonUtils.getJsonToBean(jsondata,Sys.class);
        String type = (jsonObject.get("type")).toString();

        if ("delete".equals(type)){
            //
            deleteData(sys);
        }else {
            delData(sys);
        }
    }

    //处理数据
    public void delData(Sys sysXx) throws IOException {
        //判断当前数据是否存在于ES数据库
        //1. 创建检索请求
        SearchRequest searchRequest0 = new SearchRequest();
        //1.1)指定索引
        searchRequest0.indices("es_sys");
        searchRequest0.types("base_sysxx");
        //1.2)构造检索条件
        SearchSourceBuilder sourceBuilder0 = new SearchSourceBuilder();

        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("uuid", sysXx.getQyuuid()))
                .mustNot(QueryBuilders.termQuery("yxbz","1"));

        sourceBuilder0.query(queryBuilder);//精确查询查询
        searchRequest0.source(sourceBuilder0);
        //2. 执行检索
        SearchResponse searchResponse0 = client.search(searchRequest0, RequestOptions.DEFAULT);
        JSonObject jsonObject = FastJsonUtils.toJsonObject(searchResponse0) ;
        JSonObject jsonhit = jsonObject.getJSonObject("hits");
        JSonArray hits= JSON.parseArray(jsonhit.getString("hits").toString());
        String id = null;

        for(int i=0;i0){
            //进行更新操作
            IndexRequest indexRequest = new IndexRequest("es_sys","base_sysxx")
                    .id(id)
                    .source(map);
            indexRequest.opType("index");//可选create或index
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            log.info("----更新ES同步数据----------uuid:"+id);
        }else {
            IndexRequest indexRequest = new IndexRequest("es_sys","base_sysxx")
                    .id(sysQympXx.getQyuuid().toString())
                    .source(map);
            indexRequest.opType("create");//可选create或index
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            log.info("----新增ES同步数据----------uuid:"+sysXx.getQyuuid());
        }
    }


    
    public void deleteData(Sys sysXx) throws IOException {

        //进行实体封装
        Map map = new HashMap();
        map.put("zt",sysXx.getZt());
        map.put("yxj",syXx.getYxj());
        map.put("whbz",sysXx.getWhbz());
        map.put("xxwzd",sysXx.getXxwzd());
        map.put("createBy",sysXx.getCreateBy());
        map.put("createTime",sysXx.getCreateTime());
        map.put("updateBy",sysXx.getUpdateBy());
        map.put("updateTime",sysXx.getUpdateTime());
        map.put("uuid",sysXx.getuuid());

        map.put("yxbz","1");//删除的标志是1

        //进行更新操作
        IndexRequest indexRequest = new IndexRequest("es_sys","base_sysxx")
                .id(sysXx.getQyuuid().toString())
                .source(map);
        indexRequest.opType("index");//可选create或index
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        log.info("----删除ES同步数据---(逻辑删除)-------IndexResponse:"+indexResponse);

    }



}
四、SpringBoot控制层调用ES查询方法举例
@GetMapping("/getXl")
    public Object getXl(String mc) throws IOException {
        List resultlist = new ArrayList<>();
        //1. 创建检索请求
        SearchRequest searchRequest = new SearchRequest();
        //1.1)指定索引
        searchRequest.indices("es_sys");
        searchRequest.types("base_sysxx");
        //1.2)构造检索条件
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                //.must(QueryBuilders.queryStringQuery(mc))
                .must(QueryBuilders.matchPhraseQuery("mc",mc))
                .mustNot(QueryBuilders.termQuery("yxbz", "1"));//模糊查询
        // sourceBuilder.query(queryBuilder);//左右模糊
        sourceBuilder.query(queryBuilder);

        // 设置源字段过虑,第一个参数结果集包括哪些字段,第二个参数表示结果集不包括哪些字段
        sourceBuilder.fetchSource(new String[]{"mc", "uuid", "yxj"}, new String[]{});
        sourceBuilder.size(10);
        searchRequest.source(sourceBuilder);
        //2. 执行检索
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        JSonObject jsonObject = FastJsonUtils.toJsonObject(searchResponse);
        JSonObject jsonhit = jsonObject.getJSonObject("hits");
        JSonArray hitsArray = JSON.parseArray(jsonhit.getString("hits").toString());
        for (int i = 0; i < hitsArray.size(); i++) {
            McIdVo vo = new McIdVo();
            JSonObject hi = (JSONObject) hitsArray.get(i);
            String key = (hi.get("id")).toString();
            vo.setMcId(key);
            Map map = (Map) hi.get("sourceAsMap");

            if (map.get("uuid")!=null){
                //获取UUId
                String uuidStr = (map.get("uuid")).toString();
                vo.setuuid(uuidStr);
            }
            resultlist.add(vo);
        }
        return AjaxResult.success(resultlist);
    }

最后问题就到这:

总结一下:在此次项目中主要问题点在于SpringBoot与ES版本的配置

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734812.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号