栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Rabbitmq

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Rabbitmq

Rabbitmq rabbitmq安装

​ docker 安装 rabbitmq

  • 下载镜像

docker pull rabbitmq:3.7.7-management

  • 启动镜像(用户名和密码设置为 guest guest)
docker run -dit --name rabbitmq3.8.0 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:3.8.0-management
  • 访问 rabbitmq 管理界面

    http://127.0.0.1:15672

    guest/guest

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KvmoLAsy-1652499874763)(…up-9de29c6b93228b96111a601d377264d053e.webp)]

docker 安装 rabbitMQ 延时队列插件(delayed_message_exchange)
    1. 查找 Docker 容器中的 RabbitMQ 镜像

    docker ps -a

[root@linux ~]# docker ps -a
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
8efd6f3add3c        chenchuxin/dubbo-admin   "catalina.sh run"        6 weeks ago         Up 5 weeks          0.0.0.0:9090->8080/tcp                                                                       dubbo-admin
6939b83d0942        zookeeper                "/docker-entrypoint.…"   6 weeks ago         Up 5 weeks                                                                                                       zookeeper01
2aec2548a9f8        525bd2016729             "docker-entrypoint.s…"   6 weeks ago         Up 5 weeks          0.0.0.0:27017->27017/tcp                                                                     docker_mongodb
    1. 上传 /opt/rabbitmq_delayed_message_exchange-3.8.0.ez 插件到 Linux 文件夹中
    1. 拷贝插件文件到 rabbitMQ 的 Docker 容器中
[root@linux ~]# docker cp opt/rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq3.8.0:/plugins
    1. 进入 rabbitMQ 的 Docker 容器中 docker exec -it rabbitmq3.8.0 bash
[root@linux ~]# docker exec -it rabbitmq3.8.0 bash
root@myRabbit:/# 
    1. 查看插件列表
rabbitmq-plugins list
    1. 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    1. 创建交换器


rabbitmq基本原理

RabbitMQ:我们通常谈到消息队列,就会联想到这其中的三者:生产者、消费者和消息队列,生产者将消息发送到消息队列,消费者从消息队列中获取消息进行处理。对于RabbitMQ,它在此基础上做了一层抽象,引入了交换器exchange的概念,交换器是作用于生产者和消息队列之间的中间桥梁,它起了一种消息路由的作用,也就是说生产者并不和消息队列直接关联,而是先发送给交换器,再由交换器路由到对应的队列,至于它是根据何种规则路由到消息队列的,就是我们下面需要介绍的内容了。这里的生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange(交换器)的Channel(信道),将消息发送给Exchange,Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息,消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。

AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

rabbitmq代码实现 rabbitmq客户端

1、交换器构造器的创建

public class DelayExchangeBuilder {
    
    public final static  String DEFAULT_DELAY_EXCHANGE = "secp.delayed.exchange";
    
    public final static  String DELAY_EXCHANGE = "secp.direct.exchange";

    
    public static CustomExchange buildExchange() {
        Map args = new HashMap();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DEFAULT_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    
    public static CustomExchange buildDefaultExchange() {
        return new CustomExchange(DELAY_EXCHANGE, ExchangeTypes.DIRECT, true, false);
    }
}

2、发送默认消息并且创建队列绑定队列

@Slf4j
@Configuration
public class RabbitMqClient {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMqClient.class);

    private final RabbitAdmin rabbitAdmin;

    private final RabbitTemplate rabbitTemplate;


    @Resource
    private SimpleMessageListenerContainer messageListenerContainer;

    @Resource
    BusProperties busProperties;
    @Resource
    private ApplicationEventPublisher publisher;


    @Resource
    private ApplicationContext applicationContext;


    @Bean
    public void initQueue() {
        Map beansWithRqbbitComponentMap = this.applicationContext.getBeansWithAnnotation(RabbitComponent.class);
        Class clazz = null;
        for (Map.Entry entry : beansWithRqbbitComponentMap.entrySet()) {
            log.info("初始化队列............");
            //获取到实例对象的class信息
            clazz = entry.getValue().getClass();
            Method[] methods = clazz.getMethods();
            RabbitListener rabbitListener = clazz.getAnnotation(RabbitListener.class);
            if (ObjectUtil.isNotEmpty(rabbitListener)) {
                createQueue(rabbitListener);
            }
            for (Method method : methods) {
                RabbitListener methodRabbitListener = method.getAnnotation(RabbitListener.class);
                if (ObjectUtil.isNotEmpty(methodRabbitListener)) {
                    createQueue(methodRabbitListener);
                }
            }

       
    }


    private Map sentObj = new HashMap<>();


    @Autowired
    public RabbitMqClient(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate) {
        this.rabbitAdmin = rabbitAdmin;
        this.rabbitTemplate = rabbitTemplate;
    }




    
    public void sendMessage(String queueName, Object params) {
        log.info("发送消息到mq");
        try {
            Queue queue = new Queue(queueName);
            addQueue(queue);
            CustomExchange customExchange = DelayExchangeBuilder.buildDefaultExchange();
            rabbitAdmin.declareExchange(customExchange);
            Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
            rabbitAdmin.declareBinding(binding);
            messageListenerContainer.setQueueNames(queueName);
            rabbitTemplate.convertAndSend(DelayExchangeBuilder.DELAY_EXCHANGE, queueName, params, message -> {
                return message;
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }




    public RabbitMqClient put(String key, Object value) {
        this.sentObj.put(key, value);
        return this;
    }

    
    public void sendMessage(String queueName, Object params, Integer expiration) {
        this.send(queueName, params, expiration);
    }

    private void send(String queueName, Object params, Integer expiration) {
        Queue queue = new Queue(queueName);
        addQueue(queue);
        CustomExchange customExchange = DelayExchangeBuilder.buildExchange();
        rabbitAdmin.declareExchange(customExchange);
        Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
        rabbitAdmin.declareBinding(binding);
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("发送时间:" + sf.format(new Date()));
        messageListenerContainer.setQueueNames(queueName);

        rabbitTemplate.convertAndSend(DelayExchangeBuilder.DEFAULT_DELAY_EXCHANGE, queueName, params, message -> {
            if (expiration != null && expiration > 0) {
                message.getMessageProperties().setHeader("x-delay", expiration);
            }
            return message;
        });
    }



}

3、消费消息

@Slf4j
@RabbitListener(queues = SecpDataConstant.SECP_DATA_SYNC)
@RabbitComponent(value = "testReceiver2")
public class TestReceiver2 extends BaseRabbiMqHandler {

    @RabbitHandler
    public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        super.onMessage(baseMap, deliveryTag, channel, new MqListener() {
            @Override
            public void handler(BaseMap map, Channel channel) {
                //业务处理
                String orderId = map.get(SecpDataConstant.SECP_DATA_USER_INFO).toString();
                log.info("MQ Receiver2,orderId : " + orderId);
            }
        });
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/884720.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号