栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息中间件RabbitMQ(五)——实现RPC调用

消息中间件RabbitMQ(五)——实现RPC调用

文章目录

1. RPC2. 实现原理3. 代码实现

3.1 客户端实现3.2 服务端实现3.3 测试 4. 小结

1. RPC

对于微服务开发者,对于 RPC(Remote Procedure Call Protocol 远程过程调用协议)并不会陌生吧, RESTful API、Dubbo、WebService等都是RPC的实现调用

在RabbitMQ中也提供了 RPC 功能,并且使用起来很简单,下面就来学习一下

2. 实现原理

再来熟悉下原理图

上图把RPC的过程描述的很清楚:

Client先发送一条消息,和普通的消息相比,消息多了两个关键内容:一个是 correlation_id,表示这条消息的唯一 id,一个是 reply_to,表示回复队列的名字Server从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to指定的回调队列中Client从回调队列中读取消息,就可知道执行结果 3. 代码实现 3.1 客户端实现

客户端配置文件:application.properties

server.port=8889
spring.rabbitmq.host=192.168.3.157
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 开启消息确认
spring.rabbitmq.publisher-/confirm/i-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

spring.rabbitmq.publisher-/confirm/i-type=correlated这项配置作用是:通过 correlated来确认消息。

只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id才能将发送的消息和返回值之间关联起来

客户端配置类:

package com.scorpios.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RPCRabbitMQConfig {


    // 交换机的名称
    public static final String SCORPIOS_RPC_EXCHANGE_NAME = "scorpios_rpc_exchange_name";


    // 发送队列名称
    public static final String SCORPIOS_RPC_MSG_QUEUE = "scorpios_rpc_msg_queue";

    // 返回队列名称
    public static final String SCORPIOS_RPC_REPLY_QUEUE = "scorpios_rpc_reply_queue";

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange(RPCRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue queueOne() {
        return new Queue(RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE,true,false,false);
    }

    @Bean
    Queue queueTwo() {
        return new Queue(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE,true,false,false);
    }


    
    @Bean
    Binding bindingMsg(){
        return BindingBuilder.bind(queueOne()).to(topicExchange()).with(RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE);
    }

    
    @Bean
    Binding bindingReply(){
        return BindingBuilder.bind(queueTwo()).to(topicExchange()).with(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
    }

    
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
       RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
       rabbitTemplate.setReplyAddress(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
       rabbitTemplate.setReplyTimeout(5000);
       return rabbitTemplate;
    }

    
    @Bean
    SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
        container.setMessageListener(rabbitTemplate(connectionFactory));
        return container;
    }

}

上面代码解释说明:

定义一个TopicExchange交换机,一个MsgQueue队列,一个ReplyQueue,并与交换机进行绑定自定义一个RabbitTemplate用户发送消息,虽然在 SpringBoot中,默认情况下系统自动提供RabbitTemplate,但是这里需要对该RabbitTemplate重新进行定制,因为要给RabbitTemplate添加返回队列,最后还需要给返回队列设置一个监听器

下面来编写消息发送代码:

@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/message")
    public String send(String message) {

        // 创建消息对象
        Message newMessage = MessageBuilder.withBody(message.getBytes()).build();

        log.info("Client 发送的消息为:{}", newMessage);

        // 客户端给消息队列发送消息,并返回响应结果
        Message result = rabbitTemplate.sendAndReceive(RPCRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME, RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE, newMessage);

        String response = "";
        if (result != null) {
            // 获取已发送的消息的 correlationId
            String correlationId = newMessage.getMessageProperties().getCorrelationId();
            log.info("发送消息的correlationId为:{}", correlationId);

            // 获取响应头信息
            HashMap headers = (HashMap) result.getMessageProperties().getHeaders();

            // 获取 server 返回的消息 correlationId
            String msgId = (String) headers.get("spring_returned_message_correlation");

            // 将已发送的消息的 correlationId与server返回的消息 correlationId进行对比,相同则取出响应结果
            if (msgId.equals(correlationId)) {
                response = new String(result.getBody());
                log.info("client 收到的响应结果为:{}", response);
            }
        }
        return response;
    }

}

解释说明:

消息发送调用 sendAndReceive方法,该方法自带返回值,返回值就是服务端返回的消息服务端返回的消息中,头信息中包含了 spring_returned_message_correlation字段,这就是消息发送时的 correlation_id,通过消息发送时的 correlation_id以及返回消息头中的 spring_returned_message_correlation字段值,就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容就是针对这个发送的消息的

注意:如果没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来

3.2 服务端实现

服务端配置文件 application.properties与客户端中的配置文件一致

服务端配置类:

@Configuration
public class RPCServerRabbitMQConfig {


    // 交换机的名称
    public static final String SCORPIOS_RPC_EXCHANGE_NAME = "scorpios_rpc_exchange_name";


    // 发送队列名称
    public static final String SCORPIOS_RPC_MSG_QUEUE = "scorpios_rpc_msg_queue";

    // 返回队列名称
    public static final String SCORPIOS_RPC_REPLY_QUEUE = "scorpios_rpc_reply_queue";

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange(RPCServerRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue queueOne() {
        return new Queue(RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE,true,false,false);
    }

    @Bean
    Queue queueTwo() {
        return new Queue(RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE,true,false,false);
    }


    @Bean
    Binding bindingMsg(){
        return BindingBuilder.bind(queueOne()).to(topicExchange()).with(RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE);
    }

    @Bean
    Binding bindingReply(){
        return BindingBuilder.bind(queueTwo()).to(topicExchange()).with(RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
    }

}

最后我们再来看下消息的消费:

@Slf4j
@Component
public class RpcServerConsumer {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 此消费者消费msgQueue队列中的消息
    @RabbitListener(queues = RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE)
    public void process(Message msg) {
        log.info("server 收到msgQueue队列中的消息为 : {}",msg.toString());
        Message response = MessageBuilder.withBody(("我是服务端Server,收到的消息为:"+new String(msg.getBody())).getBytes()).build();
        // 把收到的原消息的CorrelationId取出
        CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
        // 想replyQueue队列发送确认消息
        rabbitTemplate.sendAndReceive(RPCServerRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME, RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE, response, correlationData);
    }
    
}

解释说明:

服务端首先收到消息并打印出来服务端提取出原消息中的 correlation_id服务端调用 sendAndReceive方法,将消息发送给 replyQueue队列,同时带上 correlation_id参数 3.3 测试

启动Client与Server服务,并在浏览器中输入:http://localhost:8889/send/scorpios

Client服务日志:

Server服务日志:

浏览器响应结果:

4. 小结

再来看一下这个原理图:

定义一个Exchange交换机,两个队列:MsgQueue、ReplyQueueClient调用 sendAndReceive方法向MsgQueue队列中发送消息,该方法自带返回值,返回值就是服务端返回的消息在Server端消费MsgQueue队列消息后,往ReplayQueue中发送消息

代码地址:https://github.com/Hofanking/springboot-rabbitmq-example

springboot-rabbitmq-rpc-client

springboot-rabbitmq-rpc-server

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728892.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号