栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ 学习记录

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ 学习记录

RabbitMQ使用
      • 安装rabbitMQ
          • 1. 安装erlang的yum仓库
          • 2. 安装rabbitMQ的yum仓库
          • 3. 启动rabbitMQ服务
          • 4. 添加用户,开放端口
      • rabbitMQ四种交换机
          • 1. Direct Exchange
          • 2. Fanout Exchange
          • 3. Topic Exchange
          • 4. Headers Exchanges
      • springboot整合RabbitMQ
          • 1. 添加maven依赖
          • 2. yml配置对应参数
          • 3. 项目中使用其他配置
          • 4. 使用示例

安装rabbitMQ

需要rabbitMQ版本与erlang版本匹配,最好是从官网下载

1. 安装erlang的yum仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install -y epel-release
yum install -y e
2. 安装rabbitMQ的yum仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install -y r
3. 启动rabbitMQ服务
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
4. 添加用户,开放端口
rabbitmqctl add_user admin admin #添加用户
rabbitmqctl add_user_tags admin administrator #给用户增加标签
rabbitmqctl set_permissions -p '/' 'admin' '.*' '.*' '.*' #给用户增加访问权限
#开放5672、15672端口
rabbitMQ四种交换机 1. Direct Exchange

将队列绑定到交换机上,同一个队列可以用不同的路由键绑定同一个交换机,消息发送需要路由键完全匹配。

2. Fanout Exchange

扇形交换机接收到的消息,会分发到所有绑定到该交换机的所有队列,与路由键不相关。

3. Topic Exchange

主题交换机,支持路由键通过通配符匹配。

4. Headers Exchanges

不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。

匹配规则x-match有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

springboot整合RabbitMQ 1. 添加maven依赖

        
            org.springframework.boot
            spring-boot-starter-amqp
            2.5.2
        
2. yml配置对应参数
spring:
   rabbitmq:
    host: 192.168.0.203
    port: 5672
    username: admin
    password: admin
    virtual-host: /test
3. 项目中使用其他配置
public class RabbitConnection extends AbstractConnectionFactory {
    public RabbitConnection(ConnectionFactory rabbitConnectionFactory) {
        super(rabbitConnectionFactory);
    }

    @Override
    public Connection createConnection() throws AmqpException {
        return this.createBareConnection();
    }
}

获取rabbitTemplate,可以通过注解获取yml配置的链接,也可以自己配置

@Component
public class RabbitTemplateFactory {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public RabbitTemplate getRabbitTemplate(){
        return this.rabbitTemplate;
    }

    public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplateMy = new RabbitTemplate();
        rabbitTemplateMy.setConnectionFactory(connectionFactory);
        rabbitTemplateMy.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplateMy;
    }

    public RabbitTemplate getRabbitTemplate(String host,int port,String username,String password,String virHost){
        com.rabbitmq.client.ConnectionFactory rabbitConnection = new com.rabbitmq.client.ConnectionFactory();
        rabbitConnection.setHost(host);
        rabbitConnection.setPort(port);
        rabbitConnection.setUsername(username);
        rabbitConnection.setPassword(password);
        rabbitConnection.setVirtualHost(virHost);
        RabbitConnection connection = new RabbitConnection(rabbitConnection);
        RabbitTemplate rabbitTemplateMy = new RabbitTemplate();
        rabbitTemplateMy.setConnectionFactory(connection);
        rabbitTemplateMy.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplateMy;
    }

    public RabbitAdmin getRabbitAdmin(){
        return new RabbitAdmin(this.rabbitTemplate);
    }

    public RabbitAdmin getRabbitAdmin(RabbitTemplate rabbitTemplate){
        return new RabbitAdmin(rabbitTemplate);
    }
}
4. 使用示例
@RestController
@RequestMapping("mes/rabbitDemo")
@Api(tags="rabbitMQ使用demo")
public class RabbitMQDemo {

    //如果使用默认配置,可以直接使用rabbitTemplate,如果有其他不同配置可以使用下面自己配置
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RabbitTemplateFactory rabbitTemplateFactory;


    @GetMapping("sendMessageByRoute")
    @ApiOperation("通配符匹配路由")
    public Result sendMessageByRoute(){
        JSONObject o = new JSONObject();
        o.put("test1","按鼠标");
        o.put("test2","发生了");
        o.put("是客服","sdfkj");
        RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
        rabbitTemplate1.convertAndSend("amq.topic","topic.1",o);
        return new Result();
    }

    @GetMapping("sendMessageByExchange")
    @ApiOperation("指定交换机路由")
    public Result sendMessageByExchange(){
        JSONObject o = new JSONObject();
        o.put("test1","按鼠标");
        o.put("test2","发生了");
        o.put("是客服","sdfkj");
        RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
        rabbitTemplate1.convertAndSend("test.direct","1",o);
        return new Result();
    }

    @GetMapping("sendMessageWithOutRoute")
    @ApiOperation("扇形交换机广播")
    public Result sendMessageWithOutRoute(){
        JSONObject o = new JSONObject();
        o.put("test1","按鼠标");
        o.put("test2","发生了");
        o.put("是客服","sdfkj");
        RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
        rabbitTemplate1.convertAndSend("amq.fanout","*",o);
        return new Result();
    }


    @GetMapping("getMessageByQueueName")
    @ApiOperation("通过队列名获取消息")
    public Result getMessageByQueueName(){
        JSONObject o = new JSONObject();
        o= JSONObject.parseObject(rabbitTemplate.receiveAndConvert("test.que1").toString());
        System.out.println(o.toJSONString());
        return new Result();
    }

    //根据队列监听
    @RabbitListener(queues = {"test.que2","test.que3"})
    public void listenRabbit(String msg){
        JSONObject o = new JSONObject();
        o= JSONObject.parseObject(msg);
        System.out.println(o.toJSONString());
    }

    //根据路由监听
    @RabbitListener(bindings = {
            @QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue("test.que1"),
                    exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "test.direct"),
                    key = {"9"})})
    public void listenRabbit(Object msg){
        JSONObject o = new JSONObject();
//        o= JSONObject.parseObject(msg);
        System.out.println(msg.toString());
    }
    @RabbitListener(bindings = {
            @QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue("test.que1"),
                    exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "test.direct"),
            key = {"1"})})
    public void listenRabbits(Object msg){
        JSONObject o = new JSONObject();
//        o= JSONObject.parseObject(msg);
        System.out.println("s"+msg.toString());
    }


    @GetMapping("createExchangeAndQueue")
    @ApiOperation("创建交换机并绑定队列")
    public Result createExchangeAndQueue(){
        DirectExchange directExchange = new DirectExchange("exchangeCreateByCode");
        RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
        AmqpAdmin amqpAdmin = rabbitTemplateFactory.getRabbitAdmin(rabbitTemplate1);
        Queue queue = new Queue("test.que9");
        amqpAdmin.declareExchange(directExchange);
        amqpAdmin.declareQueue(queue);
        amqpAdmin.declareBinding(new Binding("test.que9",Binding.DestinationType.QUEUE,"exchangeCreateByCode","9",null));
        return new Result();
    }

    //需要添加监听器才能自动创建
    //创建队列
    @Bean("helloQueue")
    public Queue helloQueue(){
        return new Queue("helloQueue");
    }
    //创建交换机
    @Bean("helloTopic")
    public TopicExchange topicExchange(){
        return new TopicExchange("helloTopic");
    }
    //绑定队列交换机
    @Bean
    public Binding bindingExchangeMessage(@Qualifier("helloQueue") Queue helloQueue,@Qualifier("helloTopic") TopicExchange topicExchange){

        return BindingBuilder.bind(helloQueue).to(topicExchange).with("topic.test");
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/845900.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号