栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq实现分布式事务(rabbitmq的六种模式)

rabbitmq实现分布式事务(rabbitmq的六种模式)

一,整合rabbitMQ
    导入maven依赖
   	
   		org.springframework.amqp
   		spring-rabbit
   	
    在application.yml中添加上rabbitmq的配置
spring:
  rabbitmq:
    virtual-host: /
    host: localhost
    username: guest
    password: guest
    port: 5672
      #    消息确认配置项
      #    确认消息发送到队列
    publisher-returns: true
      #    确认消息发送到交换机
    publisher-/confirm/i-type: correlated
    #调整监听使用手动
    listener:
      direct:
        acknowledge-mode: manual
二,配置消息队列和交换机

根据场景选择下面配置模式
1.配置直连(一对一消费)

@Configuration
public class DirectRabbitConfig {
    //返回一个队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue TestDirectRabbit(){
        return new Queue("operationLog",true,false,false,null);
    }
    //交换机起名
    @Bean
    DirectExchange TestDirectExchange(){
        return new DirectExchange("TestDirectExchange",true,false);
    }
    //交换机和队列绑定,提供路由
    @Bean
    Binding BindingDirect(){
        return BindingBuilder.bind(TestDirectRabbit()).to(TestDirectExchange()).with("TestDirectRouting");
    }
   }

这种方式是,发送一个消息到消息队列,只会一个交换机消费一次

    TOPIC型
    Topic型,当多个队列绑定到同一个交换机上时,根据routingkey来确定,与Direct不同的是,rountingkey有统配规则,例如topic#,表示所有前面是topic后面无论是什么,都会被捕获到,然后消费,达到了一个可以选择性直连或者广播的效果
@Configuration
public class TopicRabbitConfig {
   final static String man="topic.man";
   final static String woman="topic.woman";

   //队列1
   @Bean
   public Queue getQueue1(){
       return new Queue("TopicQueue1",true,false,false,null);
   }
   //队列2
   @Bean
   public Queue getQueue2(){
       return new Queue("TopicQueue2",true,false,false,null);
   }
   //交换机
   @Bean
   public TopicExchange getTopicExchange(){
       return new TopicExchange("TopicExchange");
   }
   //绑定
   @Bean
   public Binding getBinding1(){
       return BindingBuilder.bind(getQueue1()).to(getTopicExchange()).with(man);
   }
   @Bean
   public Binding getBinding2(){
       return BindingBuilder.bind(getQueue2()).to(getTopicExchange()).with("topic.#");
   }
}
    广播模式
    一个消息,可以被大家都消费一遍
@Configuration
public class FanoutRabbitConfig {
    //队列一
    @Bean
    public Queue getFanoutQueue1(){
        return new Queue("FanoutQueue1",true,false,false);
    }
    //队列二
    @Bean
    public Queue getFanoutQueue2(){
        return new Queue("FanoutQueue2",true,false,false);
    }
    //队列三
    @Bean
    public Queue getFanoutQueue3(){
        return new Queue("FanoutQueue3",true,false,false);
    }
    //交换机
    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange("FanoutExchange");
    }
    //绑定交换机与队列
    @Bean
    public Binding getFanoutBinding1(){
        return BindingBuilder.bind(getFanoutQueue1()).to(getFanoutExchange());
    }
    //绑定交换机与队列
    @Bean
    public Binding getFanoutBinding2(){
        return BindingBuilder.bind(getFanoutQueue2()).to(getFanoutExchange());
    }
    //绑定交换机与队列
    @Bean
    public Binding getFanoutBinding3(){
        return BindingBuilder.bind(getFanoutQueue3()).to(getFanoutExchange());
    }
}
三,监听消费
    编写一个手动监听的功能类
@Component
public class MyListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag=" + deliveryTag);
        //一次接收一条
        channel.basicAck(1, false);
        byte[] body = message.getBody();

        String s = new String(body);
      System.out.println("消费信息:" + s);

    }
}
    创建一个监听容器,把上面对象存入
@Configuration
public class SimpleMessageListenerConfig {
   @Autowired
   CachingConnectionFactory connectionFactory;
   @Autowired
   MyListener myListener;
   @Bean
   public SimpleMessageListenerContainer createSimpleMessageListenerContainer(){
       SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
       //监听为手动处理消费
       container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
       //设置需要监听的队列名
       container.setQueueNames("operationLog");
       //将监听处理类加入容器
       container.setMessageListener(myListener);
       return container;
   }


}
四,编写controller,发送消息给rabbitmq
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
    //注入rabbitTemplate来发送消息
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/log2")
    public void TopicRabbit1(){
        Map map = new HashMap<>();
        map.put("lmh", "测试666");
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TopicExchange", "topic.man",map);
        System.out.println("上传日志===");
    }

测试一下就好。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771666.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号