Netty框架设计为异步驱动。使用这种类比,它可以以最少的线程使用量处理大量连接。如果您要创建一个使用netty框架将调用分派到远程位置的api,则应该对调用使用相同的类比。
而不是让api直接返回值,而是使其返回a
Future<?>或a
Promise<?>。在您的应用程序中有多种实现此系统的方法,最简单的方法是创建一个自定义处理程序,该处理程序将传入的请求映射到
PromiseFIFO队列中的。
例如:
这在很大程度上基于这样的回答,我在过去的提交。
我们从out处理程序开始,该处理程序将请求映射到管道中的请求:
public class MyLastHandler extends SimpleInboundHandler<String> { private final SynchronousQueue<Promise<String>> queue; public MyLastHandler (SynchronousQueue<Promise<String>> queue) { super(); this.queue = queue; } // The following is called messageReceived(ChannelHandlerContext, String) in 5.0. @Override public void channelRead0(ChannelHandlerContext ctx, String msg) { this.queue.remove().setSuccss(msg); // Or setFailure(Throwable) }}然后,我们需要一种将命令发送到远程服务器的方法:
Channel channel = ....;SynchronousQueue<Promise<String>> queue = ....;public Future<String> sendCommandAsync(String command) { return sendCommandAsync(command, new DefaultPromise<>());}public Future<String> sendCommandAsync(String command, Promise<String> promise) { synchronized(channel) { queue.offer(promise); channel.write(command); } channel.flush();}完成方法之后,我们需要一种方法来调用它:
sendCommandAsync("USER anonymous", new DefaultPromise<>().addListener( (Future<String> f) -> { String response = f.get(); if (response.startWidth("331")) { // do something } // etc } ));如果被叫者想使用我们的api作为阻塞调用,他也可以这样做:
String response = sendCommandAsync("USER anonymous").get();if (response.startWidth("331")) { // do something}// etc请注意,如果Thread状态被中断,则
Future.get()可能抛出
InterruptedException,这与套接字读取操作不同,套接字读取操作只能通过套接字上的某些交互来取消。在中,此异常应该不是问题
FutureListener。



