栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ实现数据库与ElasticSearch的数据同步

RabbitMQ实现数据库与ElasticSearch的数据同步

生产者消费者导入MQ的依赖
 
     org.springframework.boot
     spring-boot-starter-amqp
 

添加配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: myHost

RabbitMQ的配置信息
  1. 两个消息队列 分别是数据增改 和 删除的队列
  2. 创建主题(topic)类型的交换机,并绑定刚刚创建的两个消息队列,并分别设置相应的key,消费者可以通过不同的key来判断该消费那一条消息队列的数据。
生产者的配置

所谓的生产者就是我们数据库的服务方,当我们对数据库的数据进行增删改的时候,我们应该像消息队列发送消息来通知ES我们进行了增删改操作,以便ES进行数据的同步。

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() {
        return new Queue(QUEUE_COURSE_SAVE);
    }

    @Bean
    public Queue queueCourseRemove() {
        return new Queue(QUEUE_COURSE_REMOVE);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(COURSE_EXCHANGE);
    }

    @Bean
    public Binding bindCourseSave() {
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    }

    @Bean
    public Binding bindCourseRemove() {
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    }
}
生产者控制层

生产者发送消息的主要方法

@Autowired
RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend(交换机的名称,消息的key,消息内容);
@Slf4j
@RestController
public class CourseController {



    @Autowired
    private ICourseService courseService;

    @Autowired
    RabbitTemplate rabbitTemplate;


    @PostMapping("/course-upload")
    public ResponseEntity upload(MultipartFile file) throws IOException {
        //创建文件输入流
        InputStream inputStream = file.getInputStream();
        //获得文件名
        String filename = file.getOriginalFilename();
        //调用文件上传方法
        OSSUtil.upload(inputStream,filename);
        //回调上传的文件
        String url = OSSUtil.getURL(filename);
        //返回前端
        return ResponseEntity.ok(url);
    }

    @GetMapping("/courses")
    public ResponseEntity> findAllPage(@RequestParam("current") Integer current, @RequestParam("PAGE_SIZE") Integer PAGE_SIZE){
        Page page = new Page<>(current,PAGE_SIZE);
        return ResponseEntity.ok(courseService.page(page));
    }

    @GetMapping("/course")
    public ResponseEntity findOne(@RequestParam("id") Integer id){
        return ResponseEntity.ok(courseService.findOne(id));
    }

    @PostMapping("/course")
    public ResponseEntity add(@RequestBody Course course){
        courseService.save(course);
        rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE, JSON.toJSONString(course));
        return ResponseEntity.ok("ok");
    }

    @PutMapping("/course")
    public ResponseEntity modify(@RequestBody Course course){
        courseService.updateById(course);
        rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE,JSON.toJSONString(course));
        return ResponseEntity.ok("ok");
    }

    @DeleteMapping("/course/{id}")
    public ResponseEntity deleteProductHandovers(@PathVariable("id") Integer id){
        courseService.removeById(id);
        rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_REMOVE,id);
        return ResponseEntity.ok("ok");
    }
}
消费者对消息队列进行监听

所谓的消费者就是ES服务的操作方,通过实时的对消息队列的监听,通过消息队列对应的key值来进行选择服务的调用,不同的key调用不同的服务,获取服务方传输的数据,然后进行数据的同步。

@Slf4j
@Component
public class CourseMQListener {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "course.exchange";
    
    @Autowired
    ICourseService courseService;

    
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_SAVE)})
    public void receiveCourseSaveMessage(String message) {
        try {
            log.info("课程添加:{}",message);
            Course course = JSON.parseObject(message,Course.class);
            //将消息转为课程,保存到es中
            courseService.saveOrUpdate(course);
            log.info("添加完成:{}",course);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_REMOVE)})
    public void receiveCourseDeleteMessage(Long id) {
        try {
            courseService.removeById(id);
            log.info("课程删除完成:{}",id);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/688098.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号