RabbitMQ: Simplifying Message Listening with the @RabbitListener Annotation

pom.xml:

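<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>

    <packaging>jar</packaging>

    <groupId>com.kaven</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>springboot</name>
    <description>springboot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>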

application.yml:

spring:
  rabbitmq:
    addresses: 192.168.1.9:5672
    username: admin
    password: admin

The User class (the entity carried as the message payload):

package com.kaven.springboot.rabbitmq;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Setter
@Getter
@ToString
@AllArgsConstructor
public class User {
    private String username;
    private String password;
    private String code;
}

The Json2UserMessageConverter class (a message converter that turns JSON data into a User object; the JSON string is built from the byte[] of the message body):

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component("json2UserMessageConverter")
public class Json2UserMessageConverter implements MessageConverter {

    private static final Gson GSON = new Gson();

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
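        // Pass the supplied properties through so headers such as expiration are not lost.
        return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8), messageProperties);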
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class);
    }
}
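The converter above names StandardCharsets.UTF_8 explicitly on both sides. The following is a minimal sketch, using only the JDK, of why that matters once a field contains non-ASCII text. The class name BodyCharsetDemo and the sample JSON are hypothetical and are not part of the original project.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: encodes and decodes a JSON string the same way the converter does.
final class BodyCharsetDemo {

    // Producer side: JSON text -> message body bytes.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: message body bytes -> JSON text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up payload with a non-ASCII username, written with Unicode escapes.
        String json = "{\"username\":\"\u51ef\u6587\",\"code\":\"908899\"}";
        byte[] body = encode(json);

        // Same charset on both sides: the text survives the round trip.
        System.out.println(decode(body).equals(json));

        // A mismatched charset on the consumer side garbles the text.
        System.out.println(new String(body, StandardCharsets.ISO_8859_1).equals(json));
    }
}
```

Calling getBytes() or new String(bytes) without a charset would fall back to the platform default, which can differ between the producer and consumer machines.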

The Consumer class (the message listener; the @RabbitListener annotation simplifies the listening setup):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue("queue.user"),
                            exchange = @Exchange(value = "exchange.user", type = ExchangeTypes.TOPIC),
                            key = {"*.user"}
                    )
            },
            messageConverter = "json2UserMessageConverter"

    )
    public void process(User user) {
        // The log text "接收消息" means "received message".
        System.out.println("Consumer - process 接收消息: " + user);
    }
}
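For comparison, here is a rough sketch of what the bindings attribute saves us from writing by hand. It assumes the same names as the listener above (queue.user, exchange.user, *.user). The class name UserQueueConfig is hypothetical and does not appear in the original project.

```java
package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class: declares by hand what @QueueBinding declares for us.
@Configuration
public class UserQueueConfig {

    // Topic exchange with the same name as in the @Exchange annotation.
    @Bean
    public TopicExchange userExchange() {
        return new TopicExchange("exchange.user");
    }

    // Queue with the same name as in the @Queue annotation.
    @Bean
    public Queue userQueue() {
        return new Queue("queue.user");
    }

    // Binding that routes messages whose routing key matches "*.user" to the queue.
    @Bean
    public Binding userBinding(Queue userQueue, TopicExchange userExchange) {
        return BindingBuilder.bind(userQueue).to(userExchange).with("*.user");
    }
}
```

With such a configuration in place, the listener would only need to name the queue, for example @RabbitListener(queues = "queue.user", messageConverter = "json2UserMessageConverter").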

The Producer class (used to publish messages):

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
public class Producer {

    private static final Gson GSON = new Gson();

    // Injected through the constructor below, so the field needs no @Resource annotation.
    private final RabbitTemplate rabbitTemplate;

    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMsg(User user) {
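        // Unique id that identifies this message in publisher confirms, if they are enabled.
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        MessageProperties messageProperties = new MessageProperties();
        // Per-message TTL in milliseconds, passed as a string (10 seconds here).
        messageProperties.setExpiration("10000");
        Message message = new Message(GSON.toJson(user).getBytes(StandardCharsets.UTF_8), messageProperties);
        // The routing key "new.user" matches the consumer's binding key "*.user".
        rabbitTemplate.send("exchange.user", "new.user", message, correlationId);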
    }
}
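The producer publishes with the routing key new.user, and the consumer's queue is bound to the topic exchange with the key *.user. The sketch below illustrates the topic matching rule in plain Java: keys are split into words on dots, * stands for exactly one word, and # stands for zero or more words. It only models the rule for illustration and is not the broker's implementation. The class name TopicKeyMatcher is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that models how a topic exchange compares binding and routing keys.
final class TopicKeyMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        List<String> pattern = Arrays.asList(bindingKey.split("\\.", -1));
        List<String> words = Arrays.asList(routingKey.split("\\.", -1));
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(List<String> pattern, int p, List<String> words, int w) {
        if (p == pattern.size()) {
            // The pattern is used up: it matches only if every word was consumed too.
            return w == words.size();
        }
        String token = pattern.get(p);
        if (token.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = w; next <= words.size(); next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.size()) {
            // A literal word or '*' needs one word, but none is left.
            return false;
        }
        if (token.equals("*") || token.equals(words.get(w))) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.user", "new.user"));      // true
        System.out.println(matches("*.user", "user"));          // false
        System.out.println(matches("*.user", "very.new.user")); // false
        System.out.println(matches("#.user", "very.new.user")); // true
    }
}
```

Under this rule a routing key such as very.new.user would not reach queue.user through the *.user binding. A binding key of #.user would be needed for that.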

The ProducerController class (the HTTP endpoint used to publish messages):

package com.kaven.springboot.rabbitmq;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController {

    @Resource
    private Producer producer;

    @GetMapping("/send")
    public String send(User user) {
        producer.sendMsg(user);
        return "Message sent successfully";
    }
}

The application entry point:

package com.kaven.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootApplication {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringbootApplication.class);
        application.run(args);
    }
}

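Start the application and call the /send endpoint with Postman, passing username, password and code as request parameters.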

Console output (the text "接收消息" means "received message"):

Consumer - process 接收消息: User(username=kaven, password=itkaven, code=908899)

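The result is as expected. The @RabbitListener annotation simplifies message listening: we do not have to define our own beans for the exchange, the queue, and the binding, because the annotation's bindings attribute declares them and Spring Boot manages them for us. That is all for this post. If I got anything wrong or you see things differently, feel free to add your thoughts in the comments.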
