服务器: Spring Boot 启动服务 (http://127.0.0.1:8080/)
package org.kayla.rpcfx.provider;
/**
 * Spring Boot entry point of the RPC provider, which doubles as its only REST controller.
 *
 * <p>A POST to {@code /} has its body bound to an {@link RpcfxRequest}; the request is passed
 * to the injected {@link RpcfxInvoker} and whatever the invoker returns is sent back.
 */
@Slf4j
@EnableAspectJAutoProxy
@SpringBootApplication
@RestController
public class RpcfxServerApplication {

    /** Collaborator that handles every incoming request; injected by Spring. */
    @Autowired
    RpcfxInvoker invoker;

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RpcfxServerApplication.class, args);
    }

    /**
     * Handles a POST to the root path by delegating to the invoker.
     *
     * @param request the request deserialized from the HTTP body
     * @return the invoker's response for that request
     */
    @PostMapping("/")
    public RpcfxResponse invoke(@RequestBody RpcfxRequest request) {
        final RpcfxResponse response = invoker.invoke(request);
        return response;
    }
}
客户端:
package org.kayla.rpcfx.core.client.netty.client;
@Slf4j
public class ClientBootStrap {
static final String HOST = "127.0.0.1";
static final int PORT = 8080;
public static void main(String[] args) throws Exception {
//创建reactor 线程组
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//1 设置reactor 线程组
b.group(workerGroup);
//2 设置nio类型的channel
b.channel(NioSocketChannel.class);
//3 设置监听端口
b.remoteAddress(HOST, PORT);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
//5 装配通道流水线
b.handler(new ClientInitializer());
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess()) {
log.info("EchoClient客户端连接成功!");
} else {
log.info("EchoClient客户端连接失败!");
}
});
// 阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
RpcfxRequest request = new RpcfxRequest();
request.setServiceClass("org.kayla.rpcfx.api.UserService");
request.setMethod("findById");
request.setParams(new Object[]{1});
String reqJson = JSON.toJSonString(request);
DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
"/");
req.setDecoderResult(DecoderResult.SUCCESS);
req.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json;charset=UTF-8");
req.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
// req.headers().add(HttpHeaderNames.ACCEPT, "application/json");
req.headers().add(HttpHeaderNames.HOST, "127.0.0.1:8080");
ByteBuf buffer = req.content().clear();
int p0 = buffer.writerIndex();
buffer.writeBytes(reqJson.getBytes());
int p1 = buffer.writerIndex();
int i = buffer.readableBytes();
System.out.println("buffer.readableBytes(): " + i);
System.out.println("p1 - p0: " + (p1 - p0) );
// req.headers().add(HttpHeaderNames.CONTENT_LENGTH, p1 - p0);
req.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
channel.writeAndFlush(req).sync();
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
package org.kayla.rpcfx.core.client.netty.client; public class ClientInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel sh) throws Exception { ChannelPipeline pipeline = sh.pipeline(); pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new ClientInboundHandler()); pipeline.addLast(new ClientOutboundHandler()); } } @Slf4j class ClientInboundHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { log.info("channelRead0"); ByteBuf content = msg.content(); int len = content.readableBytes(); byte[] arr = new byte[len]; content.getBytes(0, arr); log.info(new String(arr, "UTF-8")); } } @Slf4j class ClientOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("write"); if (msg instanceof FullHttpRequest) { FullHttpRequest request = (FullHttpRequest) msg; ByteBuf content = request.content(); int len = content.readableBytes(); byte[] arr = new byte[len]; content.getBytes(0, arr); log.info(new String(arr, "UTF-8")); } super.write(ctx, msg, promise); } }
http - Netty 5 sending JSON POST request - Stack Overflow



