栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringCloud商城day06 数据同步解决方案-2021-10-11

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringCloud商城day06 数据同步解决方案-2021-10-11

一.基于Canal, 开发数据监控微服务

1. Canal: 阿里开源, java开发, 监控数据库 -> 双机房数据同步

   (1) 工作原理: canal -> 伪装成mysql备份机 -> mysql数据改变 -> 数据写入 binary log -> 日志发送到Canal -> canal提取mysql更新数据

   (2) Canal 环境部署 - LINUX虚拟机:

1) mysql开启binlog模式, 值ON已开启, OFF未开启
    SHOW VARIABLES LIKE '%log_bin%'
    
2) 修改 /etc/my.cnf 开启binlog模式
    [mysql]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1

3) 重启mysql: mysql -h localhost -u root -p

4) 使用root账号 -> 创建用户 -> 授予权限
    create user canal@'%' IDENTIFIED by 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION, SUPER ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

5) canal服务端安装:
    a. 下载地址: https://github.com/alibaba/canal/releases/tag/canal-1.0.24
    b. 上传到linux系统, 解压到目录 /usr/local/canal
    c. 修改exmaple下的实例配置: 
       #position info
       canal.instance.master.address=192.168.200.128:3306
       #username/password
       canal.instance.dbusername=canal
       canal.instance.dbpassword=canal
    
       #注释掉这个参数, 开启扫描全库
       #canal.instance.defaultDatabaseName=

6) 启动服务: [root@localhost canal]# ./bin/startup.sh

7) 查看日志: cat /usr/local/canal/logs/canal/canal.log

   (3) 整合Canal+springboot源码包 -> 添加到maven仓库

1) 下载canal整合包spring-boot-start-canal:
    https://github.com/chengqian56131/spring-starter-canal

2) 解压进入starter-canal: cmd

3) cd spring-boot-starter-canal-master/starter-canal

4) mvn指令: mvn clean install -DskipTest

5) 进入本地仓库 -> 搜索xpand -> 已安装在目录 repository/com

2. 数据监控微服务 changgou_canal:

(1) 添加依赖: start-canal / spring-rabbit

(2) 启动类CanalApplication: 
    @EnableCanalClient   //声明是canal的客户端
    @SpringBootApplication 

(3) 配置application.properties:  
    canal.client.instances.example.host=192.168.200.128  //连接主机
    canal.client.instances.example.port=11111
    canal.client.instances.example.batchSize=1000  //批大小,每次数据库交互处理1000条数据
    spring.rabbitmq.host=192.168.200.128  //连接rabbitmq

(4) 测试监听类 -> com.changgou.canal.listener.BusinessListener

    @CanalEventListener  //声明当前类是canal监听类
    public class BusinessListener{

    @ListenPoint(schema="changgou_business", table="tb_ad")//监听方法注解, 监听的库名和表名
    public void addUpdate(CanalEntry.EventType e, CanalEntry.RowData r)  //监听方法
        //eventType: 当前操作数据库的类型
        //rowdata:  当前操作数据库的数据
        
        rowData.getBeforeColumnslist();  //改变前的数据
        rowData.getAfterColumnsList();   //改变后的数据

二. 基于Canal, 更新首页广告缓存

1. 需求: 广告表tb_ad数据改变 -> canal获取binlog数据 -> changgou_canal微服务获取字段"position" -> 发送消息到rabbitmq -> 运营微服务获取消息"position" -> 调用nginx中的ad_update.lua脚本 -> 更新缓存

2. 第一部分代码: tb_ad数据改变 -> canal监控 -> 发送到mq 

(1) canal微服务->添加 config包 ->创建 com.changgou.canal.config.RabbitMQConfig.class
    定义队列名psfs AD_UPDATE_QUEUE="ad_update_queue"-> 声明队列@bean(AD_UPDATE_QUEUE)

(2) 监听类BusinessListener.java:
    //注入rabbit
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //遍历改变后的数据, 查询字段"position"
    for(canalEntry.Column : rowData.getAfterColumsList()){
        if("position".equals(column.getName())){
            
            //发送消息
            rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE, colunm.getValue());
        }
    }

3. 第二部分:  changgou_service_business提取mq消息 -> 远程调用nginx -> 执行ad_update.lua -> 存储并更新到redis

(1) 在changgou_service_business微服务添加依赖: spring-boot-starter-amop / okhttp

(2) 修改yml文件: rabbitmq.host: 192.168.200.128

(3) 监听类 business/listener/AdListener.java
    @Component  //声明当前类是bean
    public class AdListener{
       
        @RabbitListener(queues="ad_update_queue")  //指定要监听的队列
        public void receiveMessage(String message){
            
            //发起远程调用
            OkHttpClient okHttpClient = new OkHttpClient();
            //请求路径, 虚拟机中nginx的ad_update.lua文件
            String url = "http://192.168.200.128/add_update?position="+message; 
            //OKHttp3 构建请求
            Request request = new Request().Builder().url(url).build();   
            //设置新的访问
            Call call = okHttpClient.newCall(request); 
            //发起请求
            call.enqueue(new Callback(){
                //重写回调方法
                @Override
                public void onFailure(Call call, IOException e){
                    //请求失败
                    e.printStackTrace();
                }
                @Override
                public void onResponse(Call call, Response response) throws IOException{
                    //请求成功
                   System.out.println("请求成功"+response.message());
        }
    }
       

三. 商品上架索引 -> 导入数据 -> 全文搜索引擎库ElasticSearch

1. 需求: 商品上架 -> skuList导入索引库

2. 步骤: canal微服务监控tb_spu表更改isMarketable -> 发送spuId到rabbitmq ->  RabbitMQ创建fanout交换机 -> search微服务接收消息spuId -> feign调用goods微服务 -> 获取商品skuList -> 调用ES的restAPI -> 导入skulist到索引库

3. 实现第一部分 -> 发送上架消息:

(1)添加RabbitMQConfig: 
    //定义交换机
    public static final String GOODS_UP_EXCHANGE="goods_up_exchange";
    //定义队列
    psfs SEARCH_ADD_QUEUE="search_add_queue";
    //声明队列
    @Bean(SEARCH_ADD_QUEUE)
    public Queue SEARCH_ADD_QUEUE(){
        return new Queue (SEARCH_ADD_QUEUE); 
    }
    //声明交换机
    @Bean(GOODS_UP_EXCHANGE)
    public Exchange GOODS_UP_EXCHANGE(){
        return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
    }
    //交换机和队列的绑定
    @Bean
    public Binding GOOGDS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue, @Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }    

(2) changgou_canal中添加监听类-> listener/SpuListener.java
    @CanalEventListener
    Public class SpuListener{
        @ListenPoint(schema="changgou_goods", table="tb_spu")
        public void goodsUp(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
            //获取改变前的数据 -> 转换为map
            rowData.getBeforeColumnsList().forEach(c)->new HashMap().put(c.getName(), c.getValue());
            //获取改变后的数据 -> 转换为map
            rowData.getBeforeColumnsList().forEach(c)->new HashMap().put(c.getName(), c.getValue());

            //获取最新上架的商品 状态由 oldData"0" -> newData"1"
               if("0".equals(oldData.get("is_marketable"))&&"1".equals(newData.get("is_marketable"))){
            //商品spuId发送到mq
            redisTemplate.convertAndSend(RabbitMQConfig.GOODS_UP_EXCAHNGE, "", newData.get("id");
            }

        }
    }

 4. 实现第二部分 -> 更新商品分词索引库

(1) ES访问方式:
    1) 端口: 192:168:200:128:9200
    2) Chrome插件 -> 更多工具 -> 扩展程序 -> 发送ES-head文件

(2) 创建 service_search_api
    1) 依赖: changgou_common / spring-boot-starter-data
    2) com.changgou.search.feign:
        a. 
    3) com.changgou.search.pojo
        a. SkuInfo.java -> 索引库的映射实体类 -> java代码操作实体类 -> 更新索引库数据
        b. @document(indexName="skuinfo", type="docs")  //定义index和type

(3) 创建搜索微服务-> changgou_service_search
    1) 依赖: common / eureka-client / elasticsearch / goods-api / search-api / amqp
    2) yml配置: #省略了格式, 节省空间
        server.port: 9009
        spring.application.name: search
        rabbitmq.host: 192.168.200.128
        redis.host: 192.168.200.128
        main: allow-bean-definition-overriding: true  #允许重名覆盖
        data.elasticsearch:
            cluster-name: elasticsearch
            cluster-nodes: 192.168.200.128:9300  #集群节点端口
        eureka:
            client.service-url.defaultZone: http://127.0.0.1.6868/eureka
            instance: prefer-ip-address: true   #将IP注册到Eureka Server上
        feign:
            hystrix.enabled: true
            client.config.default:
                connectTimeout: 600000  #消费者超时连接生产者
                readTimeout: 600000   #调用生产者连接超时
        hystrix.command.default.execution:
            timeout.enabled: false  #超时交给ribbon控制
            isolation.strategy: SEMAPHORE
    
    3) 启动类SearchApplication: @SpringBootApplication @EnableEurekaClient @EnableFeignClients(basePackages="com.changgou.goods.feign")
    4) 创建包com.changgou.search.config.RabbitMQConfig.java

        5)  商品服务提供接口 -> 给search微服务调用 -> 通过spuId查询SkuList

1) com.changgou.goods.controller.SkuController.java
    //定义远程接口
    @GetMapping("/spu/{skuID}") 
    public List findSkuListBySpuId(@PathVariable("spuId")String spuId){
        
        //封装条件的map
        Map searchMap = new HashMap<>();
        //规定传递的spuId不是特定的意义"all", 则携带spuId值查询
        if(!"all".equals(spuId){
            searchMap.put("spuId", spuId);
        }
        //传递的spuId值为"all"时, 只携带一个参数status="1", 返回全部审核通过的skuList
        searchMap.put("status", "1");
        //返回集合
        return skuService.findList(searchMap);
    }

2) 定义feign接口: com.changgou.goods.feign.SkuFeign.java
    a. 添加依赖: changgou_common
    @FeignClient(name="goods")
    public interface SkuFeign{
        GetMapping("/sku/spu/{spuId}")
        List findSkuListBySpuId(@PathVariable("spuId")String spuId);
    }

        6)  在search微服务中更新ES索引数据:

1) com.changgou.search.dao.ESManagerMapper.java
    //参数1: 当前操作的实体类;  参数2: 当前主键数据类型
    public interface ESManagerMapper Extends ElasticsearchRepository{}

2) search.service.impl.ESManagerServiceImpl.java
    @Service
    public class ESManagerServiceImpl implements ESManagerService{
        
        //创建索引库结构
        @Override
        public void createMappingAndIndex(){
            //创建索引
            elasticsearchTemplate.createTndex(SkuInfo.class); 
            //创建映射
            elasticsearchTemplate.putMapping(SkuInfo.class);
        }
        
        //导入全部sku集合到索引库
        @Override
        public void importAll(){
            //查询skuList
            List skuList = skuFeign.findSkuListBySpuId("all");
            
            if(skuList==null||skuList.size()<=0){
                throw new RuntimeException("当前数据未查询到,无法导入索引库");
            }

            //skuList转换为json [{key:value}, {}, ...]
            String JSonSkuList = JSON.toJSonString(skuList);
            //将JSON串转换为SkuInfo
            List skuInfoList = JSON.parseArray(jasonSkuLlist, SkuInfo.class);

            //转换规格信息 -> map
           for(SkuInfo skuInfo : skuInfoList){
               skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class));
           }
            //导入索引库
            esManagerMapper.saveAll(skuInfoList);
        }

        //根据spu查询skuList, 添加索引库 
        @Override
        public void importDataBySpuId(String spuId){
            -> 代码同上...
        }
    }
    
3) ESManagerController.java
    @RestController
    @RequestMapping("/manager")
    public class ESMnagerController{
        @Autrowired
        private ESManagerService esManagerService;

        @GetMapping("/create")
        pubic Result reate(){
            //创建索引映射
            esManagerService.createMappingAndIndex();
            return new Result(true, StatusCode.OK, "创建索引结构成功");
        }
        //导入全部数据
        @GetMapping("/importAll")
        public Result importAll(){
            esManagerService.importALL();
            return new Result(true, StatusCode.OK, "创建索引结构成功");
        }
    }

        7) search 微服务中添加消息监听类 -> 获取上架的spuId

1) serach微服务: com.changgou.search.listener.GoodsUpListener.java
   //监听mq 获取spuId
   @Component
   public class GoodsUpListener{
       @Autowired
       private ESManagerService esManagerService;
       
       @RabbitListener(queues=RabbitMQConfig.SEARCH_ADD_QUEUE)
        public void receiveMessage(String spuId){
            //接收到的消息spuId
            //查询skuList, 并导入suoyinku
            esManagerService.importDataBySpuId(spuId);
        }
   }

四. 商品下架索引 -> 删除数据 -> Canal+RabbitMQ+ES

1.需求: 商品下架 -> is_market="0" -> 索引库移除

2. canal微服务监控tb_spu -> 商品下架后 -> 发送spuId到rabbitMQ -> 新建商品下架exchange+索引删除queque -> 发送spuId到search微服务 -> 删除ES数据

3. 实现:

  1) 修改数据监控服务

(1) 修改canal微服务: com.changgou.canal.config.RabbitMQConfig.java
    //声明交换机
    psfs GOODS_DOWN_EXCHANGE="goods_down_exchange";
    //声明交换机
    @Bean(GOODS_DOWN_EXCHANGE)
    public Exchange exchange(){ //方法见名知意
        return ExchangeBuilder.fanoutExchange (GOODS_DOWN_EXCHANGE).durable(true).build(); //持久化
    }
    
    //声明队列
    public static final String SEARCH_DELETE_QUEUE="search_delete_queue";
    //定义队列
    @Bean(SEARCH_DELETE_QUEUE)
    public Queue queue(){ //方法见名知意
        return new Queue(SEARCH_DELETE_QUEUE);
    }
    //绑定交换机
    @Bean
    public Binding GDEBinding(@Qualifier(GOODS_DOWN_EXCHANGE)Exchange exchange, @Qualifier(SEARCH_DELETE_QUEUE)Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }



(2) 修改监听类: com.changgou.canal.listener.SpuListener.java
    //获取下架商品, is_marketable 原转态"1" -> 新状态"1"
    if("1".equals(oldData.get("is_marketable"))&&"0".equals.(newData.get("is_marketable"))){      
     rabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_DOWN_EXCHANGE,"", newData.get("id"));

  2) 在search微服务中监听消息, 删除索引库

(1) search微服务业务层添加删除方法: com.changgou.search.service.ESManagerServiceImpl.java
    //根据spuId删除索引库数据
    @Override
    public void delDataBySpuId(String spuId){
        List skuList = skuFeign.findSkuListBySpuId(spuId);
        //判断无数据, throw new RuntimeException();
        //遍历skuList
        for(Sku sku:skuList){
            //删除
            esManagerService.deleteById(Long.parse(sku.getId()));
        }
    }

(2) search微服务添加监听类: com.changgou.search.listener.GoodsDeleteListener.java
    @Component
    public class GoodsDeleteListener{
        @rabbitListener(queues="RrabbitMQConfig.SEARCH_DELETE_QUEUE)
        public void void receiveMmessage(String spuId){
            //接收消息队列spuId, 删除索引
            //调用业务层,删除索引库
            esManagerService.deleteDataBbySpuId(spuId);
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318064.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号