栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringAMQP 消息转换器

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringAMQP 消息转换器

说明:在SpringAMQP的发送中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

ctrl+p,查看ConvertAndSend方法中的类型,发现都是Object。

测试:
在之前的配置类中声明一个队列object.queue

package com.yy.comsumer.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    //yy.fanout 声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("yy.fanout");
    }

    //fanout.queue1 声明第1个队列
    @Bean
    public Queue fanoutQueue(){
        return new Queue("fanout.queue1");
    }

    //绑定 绑定队列1和交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }


    //fanout.queue2 声明第2个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    //绑定 绑定队列2和交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }


    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }


}

重启生效

编写消息发送map

package com.yy.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue(){
        String queueName="simple.queue";
        String message="hello,string amqp!";
        rabbitTemplate.convertAndSend(queueName,message);
    }

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName="simple.queue";
        String message="hello,message__";
        for (int i = 1; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName,message + i);
            Thread.sleep(20);
        }

    }



    @Test
    public void testSendFanoutExchange()  {
        //交换机名称
        String exchangeName = "yy.fanout";
        // 消息
        String message = "Hi ya every one!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);

    }



    @Test
    public void testSendDirectExchange()  {
        //交换机名称
        String exchangeName = "yy.direct";
        // 消息
        String message = "Hi ya blue!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);

    }


    @Test
    public void testSendTopicExchange()  {
        //交换机名称
        String exchangeName = "yy.topic";
        // 消息
        String message = "Hi ya china NO.1!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message);

    }


    @Test
    public void testSendObjectQueue()  {
        Map msg = new HashMap<>();
        msg.put("name","张三");
        msg.put("age","21");
        // 发送消息
        rabbitTemplate.convertAndSend("object.queue",msg);

    }

}


查看消息,中文被java序列化。
此序列化数据长度太长,有安全等问题,建议调整。

Spring的对消息对象的处理是由org.springframwork.amqp.support.convert.MessageConvert来处理的。而默认实现是simpleMessageConverter,基于JDK的ObjectOutputStream完成序列化的。
如果要修改只需要定义一个MessageConverter类型的Bean即可。推荐用JSON方式序列化,步骤如下:

在父工程添加依赖


 com.fasterxml.jackson.core
    jackson-databind

在publisher服务声明MessageConvert
利用的SpringBoot自动封装的原理,声明的MessageConvert将覆盖默认的MessageConvert。
启动类也是配置类,所以可以放在这里。

package com.yy.spring;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;


@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

先删除掉之前的队列中消息,再重启publisher

接收Json

  1. 在consumer中引入jackson依赖(因为父工程已经引入,此处不需要重复引用)
  2. 在consumer定义MessageConvert(因为消息发送进行了转换,这里接收反过来也要转换)
  3. 定义消费者,监听Object.queue队列消息。

消息者定义MessageConvert

package com.yy.comsumer;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class);
    }

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

定义消费者,监听Object.queue队列消息

package com.yy.comsumer.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Map;

@Component
//声明为bean,让Spring能够找到
public class SpringRabbitListener {


    @RabbitListener(queues = "object.queue")
    //使用注解,声明队列,交换机,绑定的key,这样就不需要像fanout一样声明配置类
    public void listenObjectQueue(Map msg){
        System.err.println("接收到Object.queue的消息:" + msg);
    }

}

重启消费者后就能打印

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/672395.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号