栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot 整合 RabbitMQ

SpringBoot 整合 RabbitMQ

SpringBoot整合RabbitMQ
    • 一、maven依赖
    • 二、RabbitMQConfig.java
    • 三、RabbitMQProducer.java
    • 四、RabbitMQConsumer.java
    • 五、application.yml
    • 六、DatabaseTest.java
    • 七、测试

GitHub: link. 欢迎star

注意:本篇博客风格(不多比比就是撸代码!!!)

一、maven依赖
        
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.6.1</version>
        </dependency>
        
二、RabbitMQConfig.java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;


@SpringBootConfiguration
public class RabbitMQConfig {

    // work模式
    @Value("${rabbitmq.work.queue-name}")
    private String queue_name_work;
    // 订阅模式
    @Value("${rabbitmq.fanout.queue-name-1}")
    private String queue_name_fanout_1;
    @Value("${rabbitmq.fanout.queue-name-2}")
    private String queue_name_fanout_2;
    @Value("${rabbitmq.fanout.exchange-name}")
    private String exchange_name_fanout;
    // topic模式
    @Value("${rabbitmq.topic.queue-name-1}")
    private String queue_name_topic_1;
    @Value("${rabbitmq.topic.routing-key-1}")
    private String routing_key_name_topic_1;
    @Value("${rabbitmq.topic.queue-name-2}")
    private String queue_name_topic_2;
    @Value("${rabbitmq.topic.routing-key-2}")
    private String routing_key_name_topic_2;
    @Value("${rabbitmq.topic.exchange-name}")
    private String exchange_name_topic;
    // 消息/confirm/i机制
    @Value("${rabbitmq./confirm/i.queue-name}")
    private String queue_name_/confirm/i;
    // 消息return机制
    @Value("${rabbitmq.return.queue-name}")
    private String queue_name_return;
    @Value("${rabbitmq.return.routing-key}")
    private String routing_key_name_return;
    @Value("${rabbitmq.return.exchange-name}")
    private String exchange_name_return;

    
    @Bean
    public Queue queueWork() {
        return new Queue(queue_name_work);
    }

    
    @Bean
    public Queue queueFanout1() {
        return new Queue(queue_name_fanout_1);
    }

    @Bean
    public Queue queueFanout2() {
        return new Queue(queue_name_fanout_2);
    }

    @Bean
    public FanoutExchange exchangeFanout() {
        return new FanoutExchange(exchange_name_fanout);
    }

    @Bean
    public Binding bindingFanoutExchange1() {
        return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
    }

    @Bean
    public Binding bindingFanoutExchange2() {
        return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
    }

    
    @Bean
    public Queue queueTopic1() {
        return new Queue(queue_name_topic_1);
    }

    @Bean
    public Queue queueTopic2() {
        return new Queue(queue_name_topic_2);
    }

    @Bean
    public TopicExchange exchangeTopic() {
        return new TopicExchange(exchange_name_topic);
    }

    @Bean
    public Binding bindingTopicExchange1() {
        return BindingBuilder.bind(queueTopic1()).to(exchangeTopic()).with(routing_key_name_topic_1);
    }

    @Bean
    public Binding bindingTopicExchange2() {
        return BindingBuilder.bind(queueTopic2()).to(exchangeTopic()).with(routing_key_name_topic_2);
    }

    
    @Bean
    public Queue queue/confirm/i() {
        return new Queue(queue_name_/confirm/i);
    }

    
    @Bean
    public Queue queueReturn() {
        return new Queue(queue_name_return);
    }

    @Bean
    public TopicExchange exchangeReturn() {
        return new TopicExchange(exchange_name_return);
    }

    @Bean
    public Binding bindingReturnExchange() {
        return BindingBuilder.bind(queueReturn()).to(exchangeReturn()).with(routing_key_name_return);
    }
}
三、RabbitMQProducer.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


@Slf4j
@Component
public class RabbitMQProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;
    // work模式
    @Value("${rabbitmq.work.queue-name}")
    private String queue_name_work;
    // 订阅模式
    @Value("${rabbitmq.fanout.exchange-name}")
    private String exchange_name_fanout;
    // topic模式
    @Value("${rabbitmq.topic.exchange-name}")
    private String exchange_name_topic;
    // 消息/confirm/i机制
    @Value("${rabbitmq./confirm/i.queue-name}")
    private String queue_name_/confirm/i;
    // 消息return机制
    @Value("${rabbitmq.return.exchange-name}")
    private String exchange_name_return;

    
    public void sendWork(String message) {
        rabbitTemplate.convertAndSend(queue_name_work, message);
    }

    
    public void sendFanout(String message) {
        rabbitTemplate.convertAndSend(exchange_name_fanout, "", message);
    }

    
    public void sendTopic(String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange_name_topic, routingKey, message);
    }

    
    public void send/confirm/i(Object message) {
        CorrelationData correlationData = new CorrelationData(Thread.currentThread().getId() + "_" + System.currentTimeMillis());
        rabbitTemplate.convertAndSend(queue_name_/confirm/i, message, correlationData);
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("消息/confirm/i成功!! id:{}", correlationData.getId());
                } else {
                    log.info("消息/confirm/i失败!! id:{} cause:{}", correlationData.getId(), cause);
                }
            }
        });
    }

    
    public void sendReturn(String routingKey, String message) {
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.info("returnedMessage >> exchange:{} routingKey:{} getMessage:{} replyCode:{} replyText:{}", returned.getExchange(), returned.getRoutingKey(), returned.getMessage(), returned.getReplyCode(), returned.getReplyText());
            }
        });
        rabbitTemplate.convertAndSend(exchange_name_return, routingKey, message);
    }
}
四、RabbitMQConsumer.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class RabbitMQConsumer {

    // work1
    @RabbitListener(queues = "${rabbitmq.work.queue-name}")
    public void consumeWork1(String message) {
        log.info("consumeWork1 message:{}", message);
    }

    // work2
    @RabbitListener(queues = "${rabbitmq.work.queue-name}")
    public void consumeWork2(String message) {
        log.info("consumeWork2 message:{}", message);
    }

    // 订阅1
    @RabbitListener(queues = "${rabbitmq.fanout.queue-name-1}")
    public void consumeFanout1(String message) {
        log.info("consumeFanout1 message:{}", message);
    }

    // 订阅2
    @RabbitListener(queues = "${rabbitmq.fanout.queue-name-2}")
    public void consumeFanout2(String message) {
        log.info("consumeFanout2 message:{}", message);
    }

    // topic1
    @RabbitListener(queues = "${rabbitmq.topic.queue-name-1}")
    public void consumeTopic1(String message) {
        log.info("consumeTopic1 message:{}", message);
    }

    // topic2
    @RabbitListener(queues = "${rabbitmq.topic.queue-name-2}")
    public void consumeTopic2(String message) {
        log.info("consumeTopic2 message:{}", message);
    }

    // 消息/confirm/i机制
    @RabbitListener(queues = "${rabbitmq./confirm/i.queue-name}")
    public void consume/confirm/i(String message) {
        log.info("consume/confirm/i message:{}", message);
    }

    // 消息return机制
    @RabbitListener(queues = "${rabbitmq.return.queue-name}")
    public void consumeReturn(String message) {
        log.info("consumeReturn message:{}", message);
    }
}
五、application.yml
spring:
  # RabbitMQ connection settings
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    # Virtual host
    virtual-host: /
    # Enable asynchronous publisher confirms (correlated with CorrelationData)
    publisher-confirm-type: correlated
    # Enable publisher returns
    publisher-returns: true
    # Manual acknowledgement for consumed messages
    # NOTE(review): this is set under the `direct` listener container type;
    # Spring Boot's default container type is `simple`, and the listeners in
    # this project never ack manually - confirm this setting is intended.
    listener:
      direct:
        acknowledge-mode: manual

# Queue, exchange and routing-key names used by the demo
rabbitmq:
  work:
    queue-name: queue_work
  fanout:
    queue-name-1: queue_fanout_1
    queue-name-2: queue_fanout_2
    exchange-name: exchange_fanout
  topic:
    queue-name-1: queue_topic_1
    routing-key-1: topic.#
    queue-name-2: queue_topic_2
    routing-key-2: topic.*
    exchange-name: exchange_topic
  confirm:
    queue-name: queue_confirm
  return:
    queue-name: queue_return
    exchange-name: exchange_return
    routing-key: return.*
六、DatabaseTest.java
import com.andon.springbootrabbitmq.rabbitmq.RabbitMQProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;


@RunWith(SpringRunner.class)
@SpringBootTest
public class DatabaseTest {

    @Resource
    private RabbitMQProducer rabbitMQProducer;

    @Test
    public void test05() {
        rabbitMQProducer.sendReturn("return.an.return", "return!!");
    }

    @Test
    public void test04() {
        rabbitMQProducer.send/confirm/i("/confirm/i!!");
    }

    @Test
    public void test03() {
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0) {
                rabbitMQProducer.sendTopic("topic.an.topic", "topic" + i);
            } else {
                rabbitMQProducer.sendTopic("topic.an", "topic" + i);
            }
        }
    }

    @Test
    public void test02() {
        for (int i = 0; i < 10; i++) {
            rabbitMQProducer.sendFanout("fanout" + i);
        }
    }

    @Test
    public void test01() {
        for (int i = 0; i < 10; i++) {
            rabbitMQProducer.sendWork("work" + i);
        }
    }
}
七、测试





GitHub: link. 欢迎star

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/650521.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号