栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot集成RabbitMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot集成RabbitMQ

如今较为流行的消息中间件,例如RabbitMQ和Kafka,在此介绍使用SpringBoot集成RabbitMQ。

1.引入SpringBoot集成的RabbitMQ的启动包依赖,因为SpringBoot对RabbitMQ有着很好的集成,所以在导入依赖时,这里我的SpringBoot使用的是最新版的:


    org.springframework.boot
    spring-boot-starter-amqp


    org.springframework.boot
    spring-boot-starter-web

2.配置配置文件application.yml,在此注意我们的ip地址和端口号就行:

server:
  port: 8021
spring:
  application:
    name: rabbitmq-provider

  //配置我们的rabbitmq的主机地址,本机Windows的话就用127.0.0.1,其他主机的话就用ip地址,这里我使用的是我的服务器ip地址
  rabbitmq:
    host: 101.35.105.175
    //配置我们的rabbitmq的端口号,开放的一般是5672
    port: 5672
    username: admin
    password: 123
    #虚拟host 可以不设置,使用server默认host
    virtual-host: JCcccHost

3.创建好我们的工程之后,创建生产者:这里我们对各个不同的rabbitmq的工作模式分别创建不同:

3.1、HelloWorld模式:
生产者:这里我们使用RabbitTemplate 的模板对象,我们只需对其进行注入就能使用RabbitTemplate 对象对消息进行操作。

@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class HelloWorldDemo {
    @Autowired
    RabbitTemplate rt;

    @Test
    public void send() {
        rt.convertAndSend("hello-world","hello world!!!");
    }
}

消费者:
使用@RabbitListener(queuesToDeclare = @Queue(“hello-world”))去对我们的队列进行绑定。
这里的msg参数,是接收生产者产生的消息。

@Component
@RabbitListener(queuesToDeclare = @Queue("hello-world"))
public class HelloWorld {

    @RabbitHandler
    public void receive(String msg){
        System.out.println("msg:"+msg);
    }
}

执行后:


3.2、work模式:
生产者:

package com.example.springbootrabbitmq.helloworld;



import com.example.springbootrabbitmq.SpringbootRabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class HelloWorldDemo {
    @Autowired
    RabbitTemplate rt;

    @Test
    public void send() {
        for (int i = 0; i < 100; i++) {
            rt.convertAndSend("hello-world","hello world!!!"+i);
        }
    }
}

消费者:多个消费者,绑定一个消息队列,轮询方式消费信息

package com.example.springbootrabbitmq.helloworld;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloWorld {

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive01(String msg){
        System.out.println("receive01:"+msg);
    }
    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive02(String msg){
        System.out.println("receive02:"+msg);
    }
    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive03(String msg){
        System.out.println("receive03:"+msg);
    }
}

3.3、fanout模式——广播模式
生产者:
第一个参数:设置我们的交换机
第二个参数:设置routekey
第三个参数:消息体

@Test
    public void fanOutSend() {
        rt.convertAndSend("logs","","i am fanout");
    }

消费者:注解去绑定临时队列,绑定交换机,绑定消息类型

 @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "logs",type = "fanout"))
    })
    public void receive02(String msg){
        System.out.println("receive02:"+msg);
    }
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "logs",type = "fanout"))
    })
    public void receive03(String msg){
        System.out.println("receive03:"+msg);
    }

3.4、direct路由模式:绑定我们的路由key
生产者:

 @Test
    public void DirectSend() {
        rt.convertAndSend("route-directs","info","i am info");
        rt.convertAndSend("route-directs","error","i am error");
    }

消费者:

package com.example.springbootrabbitmq.direct;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Direct {
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info"},
                    exchange = @Exchange(value = "route-directs",type = "direct"))
    })
    public void receive01(String msg){
        System.out.println("receive01:"+msg);
    }
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info","error"},
                    exchange = @Exchange(value = "route-directs",type = "direct"))
    })
    public void receive02(String msg){
        System.out.println("receive02:"+msg);
    }

}

3.5、Topic(动态路由):
生产者:

@Test
    public void TopicSend() {
        rt.convertAndSend("route-Topic","info","i am info");
        rt.convertAndSend("route-Topic","info.save","i am info");
        rt.convertAndSend("route-Topic","info.save.send","i am info");
        rt.convertAndSend("route-Topic","error.save","i am error");
        rt.convertAndSend("route-Topic","error.save.send","i am error");
    }

消费者:

package com.example.springbootrabbitmq.topic;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Topic {
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info.*","error.*"},
                    exchange = @Exchange(value = "route-topic",type = "topic"))
    })
    public void receive01(String msg){
        System.out.println("receive01:"+msg);
    }
}

4、各位老板可以点个免费的关注吗 谢谢啦!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/298594.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号