为了展示 RPC 服务如何使用,我们将把配置文件的名称从 “Sender” 和 “Receiver” 更改为 “Client” 和 “Server”,当我们调用服务器时,我们将获得参数的斐波那契值。
Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", start++);
System.out.println(" [.] Got '" + response + "'");回调队列(Callback queue)一般来说通过 RabbitMQ 来实现 RPC 是很容易的。一个客户端发送请求信息,服务器端将其应用到一个回复信息中。为了接收到回复信息,客户端需要在发送请求的时候同时发送一个回调队列(callback queue)的地址。
当我们使用上面的 convertSendAndReceive() 方法时,Spring AMQP 的 RabbitTemplate 为我们处理回调队列,使用 RabbitTemplate 时无需做任何其他设置。
关联标识(Correlation Id)上边介绍的方法中,我们建议给每一个 RPC 请求新建一个回调队列。这不是一个高效的做法,幸好这儿有一个更好的办法 —— 我们可以为每个客户端只建立一个独立的回调队列。
这就带来一个新问题,当此队列接收到一个响应的时候它无法辨别出这个响应是属于哪个请求的。correlationId 就是为了解决这个问题而来的。我们给每个请求设置一个独一无二的值。稍后,当我们从回调队列中接收到一个消息的时候,我们就可以查看这条属性从而将响应和请求匹配起来。如果我们接手到的消息的 correlationId 是未知的,那就直接销毁掉它,因为它不属于我们的任何一条请求。
你也许会问,为什么我们接收到未知消息的时候不抛出一个错误,而是要将它忽略掉?这是为了解决服务器端有可能发生的竞争情况。尽管可能性不大,但 RPC 服务器还是有可能在已将应答发送给我们但还未将确认消息发送给请求的情况下死掉。如果这种情况发生,RPC 在重启后会重新处理请求。这就是为什么我们必须在客户端优雅的处理重复响应,同时 RPC 也需要尽可能保持幂等性。
总结我们的 RPC 如此工作:
当客户端启动的时候,它创建一个匿名独享的回调队列。
在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 replyTo 属性,另一个是设置唯一值的 correlationId 属性。
将请求发送到一个 rpc_queue(tut.rpc) 队列中。
RPC 工作者(又名:服务器)等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 replyTo 字段指定的队列。
客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlationId 属性。如果此属性的值与请求匹配,将它返回给应用。 整合到一起
斐波那契数列任务:
private int fib(int i) {
return (i == 0 || i == 1) ? i : (fib(i - 2) + fib(i - 1));
}我们定义一个斐波那契的方法,假定只有有效的正整数输入。(不要指望它为大数据工作,这可能是最慢的递归实现)
配置类@Profile({"tut6", "rpc"})
@Configuration
public class Tut6Config {
@Profile("client")
private static class ClientConfig {
@Bean
public DirectExchange exchange() {
return new DirectExchange("tut.rpc");
}
@Bean
public Tut6Client client() {
return new Tut6Client();
}
}
@Profile("server")
private static class ServerConfig {
@Bean
public Queue queue() {
return new Queue("tut.rpc.requests");
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("tut.rpc");
}
@Bean
public Binding binding(DirectExchange exchange, Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("rpc");
}
@Bean
public Tut6Server server() {
return new Tut6Server();
}
}
}服务端public class Tut6Server {
@RabbitListener(queues = "tut.rpc.requests")
public int process(int in) {
System.out.println(" [x] Received request for " + in);
int result = fib(in);
System.out.println(" [.] Returned " + result);
return result;
}
private int fib(int i) {
return (i == 0 || i == 1) ? i : (fib(i - 2) + fib(i - 1));
}
}客户端public class Tut6Client {
@Autowired
private AmqpTemplate template;
@Autowired
private DirectExchange exchange;
private int start = 0;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
System.out.println(" [x] Requesting fib(" + start + ")");
Integer response = (Integer) template
.convertSendAndReceive(exchange.getName(), "rpc", start++);
System.out.println(" [.] Got '" + response + "'");
}
}运行maven 编译
mvn clean package -Dmaven.test.skip=true
运行
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut6,server --tutorial.client.duration=60000 java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut6,client --tutorial.client.duration=60000



