栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ消息中间件技术精讲(四)

RabbitMQ消息中间件技术精讲(四)

第四章 RabbitMQ与Spring整合

文章目录

第四章 `RabbitMQ`与`Spring`整合

4.1-`AMQP`核心组件4.2-`SprigAMQP`管理组件`RabbitAdmin`应用4.3-`SprigAMQP-RabbitMQ`声明式配置使用4.4-`SpringAMQP`消息模板组件-`RabbitTemplate`4.5-`SpringAMQP`简单消息监听容器-`SimpleMessageListenerContainer`4.6-`SpringAMQP`消息监听适配器-`MessageListenerAdapter`4.7-`SpringAMPQ`消息转换器-`MessageConverter`4.8-`RabbitMQ`与`SpringBoot2.0`整合

生产端消费端 4.9-`RabbitMQ`与`SpringCloud Stream`整合

4.1-AMQP核心组件

RabbitAdmin //管控组件SpringAMQP声明 //声明交换机、绑定、队列等操作,使用@Bean注解,直接加入IOCRabbitTemplate //消息模板SimpleMessageListenerContainer //简单消息监听容器,可以在消费者端进行详细配置MessageListenerAdapter //消息监听适配器,适用于添加完监听器之后MessageConverter // 消息转换器,比如消息序列号和反序列化 4.2-SprigAMQP管理组件RabbitAdmin应用

    作用:

    RabbitAdmin类可以很好的操作RabbitMQ,比如增加删除绑定交换机、队列,在Spring中直接进行注入即可使用

    //ConnectionFactory 必须是 org.springframework.amqp.rabbit.connection.ConnectionFactory;
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    

    注意:autoStartup必须要设置为true,否则Spring容器不会加载RabbitAdmin类

    RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey、Queue的@Bean声明,然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMq基础操作

    代码演示

      引入依赖

      
          com.rabbitmq
          amqp-client
          RELEASE
      
      
          org.springframework.boot
          spring-boot-starter-amqp
          2.3.2.RELEASE
      
      

      配置Bean

      import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
      import org.springframework.amqp.rabbit.connection.ConnectionFactory;
      //引入的是spring.amqp下的connectionFactory,而不是rabbit.client包下的
      @Configuration
      @ComponentScan({"com.cy.amqp"})
      public class RabbitMQConfig {
      @Bean
      public ConnectionFactory connectionFactory(){
          CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
          connectionFactory.setAddresses("127.0.0.1:5672");
          connectionFactory.setUsername("admin");
          connectionFactory.setPassword("admin");
          connectionFactory.setVirtualHost("/");
          return connectionFactory;
      }
      
      @Bean
      public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
          RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
          rabbitAdmin.setAutoStartup(true);
          return rabbitAdmin;
      }
      }
      

      编写测试类

      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class ApplicationTests {
          @Autowired
          private RabbitAdmin rabbitAdmin;
          @Test
          public void testRabbitAdmin(){
              rabbitAdmin.declareExchange(new DirectExchange("test.direct", false,false ));
              rabbitAdmin.declareQueue(new Queue("test.direct.queue",false ));
      
              rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", null));
              //BindingBuilder 链式操作,但在操作中,如果没有事先声明创建交换机和队列,会报错提示找不到
              rabbitAdmin.declareBinding(
                      BindingBuilder
                      .bind(new Queue("test.direct.queue", false))
                      .to(new DirectExchange("test.direct", false, false))
                      .with("direct")
              );
              rabbitAdmin.purgeQueue("test.direct.queue");
          }
      }
      

    RabbitAdmin源码

    实现了InitializingBean接口,表明在Bean配置加载完后再加载RabbitAdmin配置。找到afterPropertiesSet()方法中最要的initialize()初始化方法。可以看到Exchange、Queue、Binding都是从Spring容器中获取三种类型,加载到上方定义的contextExchanges、contextQueues、contextBindings三种容器中。 后续的源码中,也可以看出通过筛选Spring容器中RabbitMQ的信息之后,再去建立RabbitMQ服务器的连接。主要通过Spring以@Bean的方式,将配置加载到Spring容器之后,再从容器中获取相关信息,再去建立连接。

4.3-SprigAMQP-RabbitMQ声明式配置使用

在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列

// 用的是  amqp-client 依赖;
channel.exchangeDeclare()
channel.queueDeclare()
channel.queueBind()

使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式

// 用的是spring-boot-starter-amqp 依赖;
  
@Bean  
public TopicExchange exchange001() {  
    return new TopicExchange("topic001", true, false);  
}  

@Bean  
public Queue queue001() {  
    return new Queue("queue001", true); //队列持久  
}  

@Bean  
public Binding binding001() {  
    return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
}  

@Bean  
public TopicExchange exchange002() {  
    return new TopicExchange("topic002", true, false);  
}  

@Bean  
public Queue queue002() {  
    return new Queue("queue002", true); //队列持久  
}

@Bean  
public Binding binding002() {  
    return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
} 

@Bean  
public Queue queue003() {  
    return new Queue("queue003", true); //队列持久  
}

@Bean  
public Binding binding003() {  
    //同一个Exchange绑定了2个队列
    return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");  
} 

4.4-SpringAMQP消息模板组件-RabbitTemplate

消息模板,是与SpringAMQP整合的时候进行发送消息的关键类

该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口/confirm/iCallback,返回值确认接口ReturnCallback等。同样的我们需要进行注入到Spring容器中,然后直接使用

在与Spring整合时,需要实例化,但是在和Springboot整合时,在配置文件中添加配置即可

代码演示

RabbitMQConfig-配置参数

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    return rabbitTemplate;
}

测试类

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void sendMsg(){
    //创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.getHeaders().put("desc", "消息描述。。。");
    messageProperties.getHeaders().put("type", "自定义消息类型");
    //消息体
    Message message = new Message("hello rabbit".getBytes(), messageProperties);
    //转换并发送
    rabbitTemplate.convertAndSend("dir01", "spring.amqp", message, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
            message.getMessageProperties().getHeaders().put("attr", "额外新增的属性");
            return message;
        }
    });
}

简单写法

@Test
public  void sendmsg2(){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain"); 
    Message message = new Message("mq消息".getBytes(), messageProperties);
    rabbitTemplate.send("topic", "spring.abc", message);
    rabbitTemplate.convertAndSend("topic1", "spring.a", "hello,1");
}
4.5-SpringAMQP简单消息监听容器-SimpleMessageListenerContainer

    这个类非常强大,我们可以对它进行很多设置,对于消费者的配置项,这个类可以满足

    监听队列(多个队列)、自动启动、自动声明功能

    设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等

    设置消费者数量、最小最大数量、批量消费

    设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数

    设置消费者标签生成策略、是否独占模式、消费者属性等

    设置具体的监听器、消息转换器等

    SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量大小、接收消息的模式等

    很多基于RabbitMQ的自制后端管控台在进行动态设置的时候,也是根据这一特性去实现的

    主要使用方法:

messageListenerAdapter 消息监听适配器

defaultListenerMethod 默认监听方法

Delegate 委托对象,实际真实的委托人,进行处理消息

queueOrTagToMethodName 队列与方法进行绑定

代码演示

RabbitMQConfig-配置参数

 @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        listenerContainer.setConcurrentConsumers(1); // 设置当前消费者为1
        listenerContainer.setMaxConcurrentConsumers(5);// 设置最大消费者5
        listenerContainer.setQueues(queue001(),queue002(),queue003());// 设置监听的队列,此处使用了4.3 声明式配置中定义的队列
        listenerContainer.setDefaultRequeueRejected(false);// 设置是否重回队列
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置接收方式,AUTO-自动接收,MANUAL-手动接收,NULL-不接收
        listenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {  //设置消费者标签策略
            @Override
            public String createConsumerTag(String s) {
                return s+"_"+ UUID.randomUUID().toString();
            }
        });
        listenerContainer.setMessageListener(new MessageListener() {   // 监听消息
            @Override
            public void onMessage(Message message) {
                String msg = new String(message.getBody());
                System.out.println("消息:"+msg);
            }
        });
        return listenerContainer;
    }
4.6-SpringAMQP消息监听适配器-MessageListenerAdapter

RabbitMQconfig - 配置参数

//原始方法,可以直接new MessageListener
listenerContainer.setMessageListener(new MessageListener() {   // 监听消息
    @Override
    public void onMessage(Message message) {
        String msg = new String(message.getBody());
        System.out.println("消息:"+msg);
    }
});


// 1.1 默认是有自己的方法名字的:handleMessage

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
listenerContainer.setMessageListener(adapter);

// 1.2 可以自己指定一个方法的名字: consumeMessage
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
listenerContainer.setMessageListener(adapter);

// 2 也可以添加一个转换器: 从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new MessageConverter() { // 消息转换器
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        // java对象,转换成message对象
        return new Message(o.toString().getBytes(),messageProperties);
    }
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // message对象,转换为java对象
        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
});
listenerContainer.setMessageListener(adapter);

return listenerContainer;

MessageDelegate 消息代理

public class MessageDelegate {
	
    public void handleMessage(byte[] messageBody) {
        System.out.println("默认方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.out.println("字节数组方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }
    
    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + new String(messageBody));
    }
    
    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + new String(messageBody));
    }
    
    
    public void consumeMessage(Map messageBody) {
        System.err.println("map方法, 消息内容:" + messageBody);
    }
    

    public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() +
                ", name: " + order.getName() +
                ", content: "+ order.getContent());
    }
    
    public void consumeMessage(Packaged pack) {
        System.err.println("package对象, 消息内容, id: " + pack.getId() +
                ", name: " + pack.getName() +
                ", content: "+ pack.getDescription());
    }

    public void consumeMessage(File file) {
        System.err.println("文件对象 方法, 消息内容:" + file.getName());
    }
}

单元测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitAdminTest {
    @Autowired
    private  RabbitAdmin rabbitAdmin ;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 测试对应 RabbitMQConfig-1.1和1.2
    @Test
    public void testAdapter1() {
        MessageProperties messageProperties = new MessageProperties();
        Message message = new Message("hello rabbitmq-1".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("topic001", "spring.a", message);
    }

    // 测试对应 RabbitMQConfig-2 也可以添加一个转换器: 从字节数组转换为String
    @Test
    public void testAdapter2() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("hello rabbitmq-2".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("topic001", "spring.b", message);
    }
     // 测试对应 RabbitMQConfig-3 可以将队列名称,和我们自定义的方法名进行绑定
    @Test
    public void testAdapter3() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message1 = new Message("hello rabbitmq-3.1".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("topic001", "spring.c", message1);
        Message message2 = new Message("hello rabbitmq-3.2".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("topic002", "rabbit.a", message2);
    }
}
4.7-SpringAMPQ消息转换器-MessageConverter

发送消息时,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要使用MessageConverter自定义转换器,一般都需要实现接口重写两个方法

toMessage java对象转换为MessagefromMessage Message对象转换为java对象 除了转换string类型数据,还有以下

json转换器 Jackson2JsonMessageConverter 可以进行java对象的转换功能DefaultJackson2JavaTypeMapper映射器 可以进行java对象的映射关系自定义二进制转换器,比如图片、PDF、PPT、流媒体

RabbitMqConfig 配置参数

    // 1.1 支持json格式的转换器
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    listenerContainer.setMessageListener(adapter);

    // 1.2 支持java对象转换  DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    //添加包信任,否则报错:The class 'com.lc.study.spring.OrderEntity' is not in the trusted packages
    javaTypeMapper.setTrustedPackages("*");
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    listenerContainer.setMessageListener(adapter);

    // 1.3 支持java多对象映射转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    //添加包信任,否则报错:The class 'com.lc.study.spring.OrderEntity' is not in the trusted packages
    javaTypeMapper.setTrustedPackages("*");
    HashMap> classHashMap = new HashMap<>();
    classHashMap.put("order",com.lc.study.spring.OrderEntity.class);
    classHashMap.put("package",com.lc.study.spring.PackageEntity.class);
    javaTypeMapper.setIdClassMapping(classHashMap);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    listenerContainer.setMessageListener(adapter);

    // 1.4 text/image/pdf 转换器
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    //全局转换器,大的
    ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
    //文本转换器
    MessageConverter testConverter = new MessageConverter() {
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
            return new Message(o.toString().getBytes(),messageProperties);
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            if (null != contentType && contentType.contains("text")) {
                return new String(message.getBody());
            }
            return message.getBody();
        }
    };
    converter.addDelegate("text",testConverter);
    converter.addDelegate("html/text",testConverter);
    converter.addDelegate("xml/text",testConverter);
    converter.addDelegate("text/plain",testConverter);
    // json转换器
    Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
    converter.addDelegate("json",jsonMessageConverter);
    converter.addDelegate("application/json",jsonMessageConverter);
    // 图片转换器
    MessageConverter imageConverter = new MessageConverter() {
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
            throw  new MessageConversionException("图片转换异常");
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            Object _extName = message.getMessageProperties().getHeaders().get("extName");
            String extName = _extName == null ?"png":_extName.toString();
            System.out.println(extName);
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "C:/Users/24669/Downloads/workspace/" + fileName +"."+extName;
            File file = new File(path);
            try {
                Files.copy(new ByteArrayInputStream(body),file.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    };
    converter.addDelegate("image/png",imageConverter);
    converter.addDelegate("image",imageConverter);

    // pdf 转换器
    MessageConverter pdfConverter = new MessageConverter() {
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
            throw  new MessageConversionException("pdf转换异常");
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {

            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "C:/Users/24669/Downloads/workspace/" + fileName +".pdf";
            File file = new File(path);
            try {
                Files.copy(new ByteArrayInputStream(body),file.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    };
    converter.addDelegate("application/pdf",pdfConverter);
    adapter.setMessageConverter(converter);
    listenerContainer.setMessageListener(adapter);
    return listenerContainer;

MessageDelegate 消息代理 同上一章

单元测试

	//测试对应  1.1 支持json格式的转换器
    @Test
    public void testJsonMessage() throws JsonProcessingException {
        OrderEntity order = new OrderEntity();
        order.setId("0001");
        order.setName("订单消息");
        order.setContent("小笼包+豆浆+茶叶蛋");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writevalueAsString(order);
        System.out.println("json消息体:"+json);
        MessageProperties properties = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        properties.setContentType("application/json");
        Message message = new Message(json.getBytes(), properties);
        rabbitTemplate.convertAndSend("topic001","spring.json",message);
    }

    //测试对应  1.2 支持java对象格式的转换器
    @Test
    public void testJavaEntityMessage() throws JsonProcessingException {
        OrderEntity order = new OrderEntity();
        order.setId("0002");
        order.setName("订单消息");
        order.setContent("小笼包+豆浆+茶叶蛋");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writevalueAsString(order);
        System.out.println("java对象:"+json);
        MessageProperties properties = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        properties.setContentType("application/json");
        properties.getHeaders().put("__TypeId__","com.lc.study.spring.OrderEntity");
        Message message = new Message(json.getBytes(), properties);
        rabbitTemplate.convertAndSend("topic001","spring.entity",message);
    }

    //测试对应  1.3 支持java多对象映射的转换器
    @Test
    public void testJavaEntityMapperMessage() throws JsonProcessingException {
        OrderEntity order = new OrderEntity();
        order.setId("0003");
        order.setName("订单消息");
        order.setContent("小笼包+豆浆+茶叶蛋");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writevalueAsString(order);
        System.out.println("java对象:"+json);
        MessageProperties properties = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        properties.setContentType("application/json");
        properties.getHeaders().put("__TypeId__","order");
        Message message = new Message(json.getBytes(), properties);
        rabbitTemplate.convertAndSend("topic001","spring.entity",message);

        PackageEntity pack = new PackageEntity();
        pack.setId("0001");
        pack.setName("快递包裹信息");
        pack.setDescription("你的快递已送达");

        String json2 = mapper.writevalueAsString(pack);
        System.out.println("json2对象:"+json2);
        MessageProperties properties2 = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        properties2.setContentType("application/json");
        properties2.getHeaders().put("__TypeId__","package");
        Message message2 = new Message(json2.getBytes(), properties2);
        rabbitTemplate.convertAndSend("topic001","spring.entity2",message2);
    }

    //测试对应  1.4 text/image/pdf 的转换器
    @Test
    public void testTextJsonImagePdfMessage() throws Exception  {
        byte[] body1 = Files.readAllBytes(Paths.get("C:/Users/24669/Downloads", "picture.png"));
        MessageProperties properties1 = new MessageProperties();
        properties1.setContentType("image/png");
        properties1.getHeaders().put("extName","png");

        Message message1 = new Message(body1, properties1);
        rabbitTemplate.convertAndSend("topic001","spring.png",message1);

        byte[] body2 = Files.readAllBytes(Paths.get("C:/Users/24669/Downloads", "test.pdf"));
        MessageProperties properties2 = new MessageProperties();
        properties2.setContentType("application/pdf");
        Message message2 = new Message(body2, properties2);
        rabbitTemplate.convertAndSend("topic001","spring.pdf",message2);
    }
4.8-RabbitMQ与SpringBoot2.0整合 生产端

publisher-/confirm/is 实现一个监听器,用于监听Broker端给我们返回的确认请求 RabbitTemplate./confirm/iCallback

publisher-returns 保证消息对于Broker端可达,如果出现路由键不可达的情况,则使用监听器对不可达消息进行后续处理,保证消息的路由成功RabbitTemplate.ReturnCallback

在发送消息的时候,对template进行配置mandatory = true 保证监听有效

当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会将消息返回给生产者;当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

// pom文件

    spring-boot-starter-parent
    org.springframework.boot
    2.3.4.RELEASE


    
        org.springframework.boot
        spring-boot-starter-web
    
    
        org.springframework.boot
        spring-boot-starter-amqp
    
    
        org.springframework.boot
        spring-boot-starter-test
    

// 配置application.yml
server:
  port: 5555
spring:
  rabbitmq:
    addresses: 192.168.2.58:5672
    username: guest
    password: guest
    virtual-host: /
    publisher-/confirm/i-type: correlated
    publisher-returns: true
    template:
      mandatory: true
// 声明式创建元数据(交换机、队列、绑定)
@Configuration
@ComponentScan(value = {"com.lc.producer.*"})
public class ProducerConfig {
    //声明交换机和队列,以及绑定
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("springboot.exchange1", true, false);
    }
    @Bean
    public Queue queue1() {
        //如果你想创建一个只有自己可见的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
        //1-只对首次声明它的连接(Connection)可见;2-会在其连接断开的时候自动删除。
        // String name, boolean durable, boolean exclusive, boolean autoDelete
        return new Queue("spring.boot.queue1",true,false,false);
    }
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("springboot.#");
    }
}
// 自定义消费发送类,重写rabbitTemplate
@Component
public class RabbitSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RabbitConfirmCallback rabbit/confirm/iCallback;
    @Autowired
    private RabbitReturnCallback rabbitReturnCallback;
    public void sendMessage(Object message, MessageProperties properties) {
        Message msg = new Message(message.toString().getBytes(), properties);
        rabbitTemplate.setMandatory(true);
        //消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
        //消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
        rabbitTemplate.set/confirm/iCallback(rabbit/confirm/iCallback);
        rabbitTemplate.setReturnCallback(rabbitReturnCallback);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 第一次测试,使用正确的routingkey;第二次测试,使用错误的routingkey
        rabbitTemplate.convertAndSend("springboot.exchange1", "springboot.a", msg, correlationData);
    }
}
// 消息/confirm/iCallBack类,确认模式
@Component
public class Rabbit/confirm/iCallback implements RabbitTemplate./confirm/iCallback {
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        System.err.println("correlationdata:" + correlationData);
        System.err.println("ack:" + ack);
        if (!ack) {
            System.err.println("异常处理..."+cause);
        } else {
            System.err.println("消息已送达");
        }
    }
}
// 消息 returnCallBack类,退回模式
@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.err.println("消息未达的原因:" + s);
    }
}

// 测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqApplication.class)
public class ProducerTest {

    @Autowired
    private RabbitSender rabbitSender;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Test
    public void testSender() throws Exception {
        MessageProperties properties = new MessageProperties();
        properties.getHeaders().put("key","1231231");
        properties.getHeaders().put("send_time",simpleDateFormat.format(new Date()));
        rabbitSender.sendMessage("springBoot+rabbitmq",properties);
    }
}
消费端

手动签收数据

消费端使用@RabbitMQListener注解

server:
  port: 5556
spring:
  rabbitmq:
    addresses: 192.168.2.58:5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 手动签收消息
@Component
public class RabbitReceiver {
    // 使用注解进行时间监听绑定,也可以用作创建元数据
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "springboot.queue1",durable = "true"),
            exchange = @Exchange(value = "springboot.exchange1",durable = "true",type = "topic",ignoreDeclarationExceptions = "true"),
            key = "springboot.#"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws IOException {
        System.out.println("消费端:"+ new String((byte[]) message.getPayload()));
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // 手动ACK
        channel.basicAck(deliveryTag,false);
    }
}
4.9-RabbitMQ与SpringCloud Stream整合

支持消息的生产与发送,可以是不同的消息中间件(Rabbitmq---->Spring Cloud Stream---->Kafka)SpringCloud Stream整合消息中间件,会有输入端和输出端,中间绑定消息中间件可以是Rabbitmq,Kafka,相当于重写了出、入接口@Output 输出注解,用户定义发送消息接口@Input 输入注解,用于定义消费接口@StreamListener 用于定义监听方法注解缺点:不能100%实现消息的可靠性投递,会存在少量消息丢失问题。因为要兼容Kafka

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758475.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号