在之前,我们实现了基于传统BIO方式的传输模式。这里,我们将使用效率更高、更方便的Netty进行传输。
实现过程如下: 一、导入依赖 1.导入Netty依赖
RPC-CORE
Netty 依赖坐标:io.netty : netty-all : 4.1.50.Final
2.导入jackson依赖
jackson 依赖坐标(groupId 均为 com.fasterxml.jackson.core,版本均为 2.11.0):jackson-core、jackson-databind、jackson-annotations
二、序列化 1.序列化接口
创建CommonSerializer接口(switch case 方便后续添加其他序列化方法)
package com.t598.core.serializer;
public interface CommonSerializer {
byte[] serializer(Object object);
Object deserialize(byte[] bytes, Class> clazz) throws Exception;
int getCode();
static CommonSerializer getByte(int code) {
switch (code) {
case 1:
return new JsonSerializer();
default:
return null;
}
}
}
2.实现JSON的序列化器
使用JSON序列化器无法保证反序列化后仍然为原实例类型,后续会修改
package com.t598.core.serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.t598.common.entity.RpcRequest;
import com.t598.common.enumeration.SerializerCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class JsonSerializer implements CommonSerializer{
private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serializer(Object object) {
try {
// 将Object值序列化为字节数组
return objectMapper.writevalueAsBytes(object);
} catch (JsonProcessingException e) {
logger.error("序列化时发生错误:{}", e.getMessage());
e.printStackTrace();
return null;
}
}
@Override
public Object deserialize(byte[] bytes, Class> clazz) throws Exception {
try {
// 根据clazz将bytes反序列换为对应的实例,还需要判断反序列化后实例类型是否正确
// 因为RepRequest中有一个Object的数据,进行反序列时可能会失败,
Object obj = objectMapper.readValue(bytes, clazz);
if (obj instanceof RpcRequest) {
obj = handleRequest(obj);
}
return obj;
} catch (IOException e) {
logger.error("反序列化时发生错误:{}", e.getMessage());
e.printStackTrace();
return null;
}
}
@Override
public int getCode() {
return SerializerCode.valueOf("JSON").getCode();
}
private Object handleRequest(Object obj) throws IOException {
RpcRequest rpcRequest = (RpcRequest) obj;
for (int i = 0; i < rpcRequest.getParamTypes().length; i++) {
Class> clazz = rpcRequest.getParamTypes()[i];
// 确定此Class对象表示的类或接口是否与指定的Class参数表示的类或接口相同,
// 或者是其超类或超接口。 如果是,则返回true ; 否则返回false 。
if (!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
// 如果不对应,则将这个参数重新反序列化为对应的参数类型。
byte[] bytes = objectMapper.writevalueAsBytes(rpcRequest.getParameters()[i]);
rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
}
}
return rpcRequest;
}
}
三、协议与传输
1.自定义协议(注:自定义协议及编解码器 CommonEncoder/CommonDecoder 的代码在此处缺失,待补充)
2.NettyServer(下方代码为 NettyServer 的 start 方法)
/**
 * Starts the Netty server on the given port and blocks until the server
 * channel is closed. The boss group accepts connections; the worker group
 * handles I/O on the accepted channels.
 *
 * @param port TCP port to bind
 */
@Override
public void start(int port) {
    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    NioEventLoopGroup workGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .option(ChannelOption.SO_BACKLOG, 256)       // length of the pending-connection queue
                .option(ChannelOption.SO_KEEPALIVE, true)    // TCP keep-alive on the server channel
                .childOption(ChannelOption.TCP_NODELAY, true) // disable Nagle: send data immediately
                // Fix: the initializer must be parameterized with SocketChannel,
                // otherwise initChannel(SocketChannel) does not override anything.
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new CommonEncoder(new JsonSerializer()));
                        pipeline.addLast(new CommonDecoder());
                        pipeline.addLast(new NettyServerHandler());
                    }
                });
        ChannelFuture future = serverBootstrap.bind(port).sync();
        // Block until the server channel is closed.
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        logger.error("启动服务器时有错误发生: ", e);
        // Restore the interrupt status so callers can observe it.
        Thread.currentThread().interrupt();
    } finally {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}
}
NettyClient
package com.t598.core.transport.netty.client;
import com.t598.common.entity.RpcRequest;
import com.t598.common.entity.RpcResponse;
import com.t598.core.codec.CommonDecoder;
import com.t598.core.codec.CommonEncoder;
import com.t598.core.serializer.JsonSerializer;
import com.t598.core.transport.RpcClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Netty-based RPC client. A single static Bootstrap (and event loop group)
 * is shared by all instances; each sendRequest opens a fresh connection,
 * writes the request, waits for the channel to close, and reads the
 * response from a channel attribute set by NettyClientHandler.
 */
public class NettyClient implements RpcClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    private final String host;
    private final int port;
    private static final Bootstrap bootstrap;

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static {
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                // Fix: parameterize the initializer so initChannel(SocketChannel)
                // actually overrides the abstract method.
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new CommonDecoder());
                        pipeline.addLast(new CommonEncoder(new JsonSerializer()));
                        pipeline.addLast(new NettyClientHandler());
                    }
                })
                .option(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Sends the request and returns the response data, or null on failure.
     * The write is asynchronous; the listener only logs success/failure.
     */
    @Override
    public Object sendRequest(RpcRequest rpcRequest) {
        try {
            ChannelFuture future = bootstrap.connect(host, port).sync();
            logger.info("客户端连接到服务器:{}:{}", host, port);
            Channel channel = future.channel();
            if (channel != null) {
                channel.writeAndFlush(rpcRequest).addListener(future1 -> {
                    if (future1.isSuccess()) {
                        logger.info(String.format("客户端发送消息:%s", rpcRequest.toString()));
                    } else {
                        // Fix: report the cause of the failed WRITE future
                        // (future1), not the already-succeeded connect future.
                        logger.error("发送消息时有错误发生:", future1.cause());
                    }
                });
                // Wait until the handler closes the channel after the response arrives.
                channel.closeFuture().sync();
                // The response was stored as a channel attribute by the handler.
                AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
                RpcResponse rpcResponse = channel.attr(key).get();
                if (rpcResponse == null) {
                    // Channel closed without any response being attached.
                    return null;
                }
                return rpcResponse.getData();
            }
        } catch (InterruptedException e) {
            logger.error("发送消息时有错误发生:", e);
            // Restore the interrupt status so callers can observe it.
            Thread.currentThread().interrupt();
        }
        return null;
    }
}
3.Handler
NettyClientHandler
package com.t598.core.transport.netty.client;

import com.t598.common.entity.RpcRequest;
import com.t598.common.entity.RpcResponse;
import com.t598.common.factory.SingletonFactory;
import com.t598.core.serializer.CommonSerializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

/**
 * Client-side inbound handler: completes the pending future for each
 * received RpcResponse, and sends a heartbeat packet when the connection
 * has been write-idle for too long.
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    private final UnprocessedRequests unprocessedRequests;

    public NettyClientHandler() {
        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
        try {
            logger.info(String.format("客户端接收到消息: %s", msg));
            // Complete the future that the sender is waiting on.
            unprocessedRequests.complete(msg);
        } finally {
            // Release the reference-counted message to avoid a buffer leak.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log with the full stack trace instead of printStackTrace().
        logger.error("过程调用时有错误发生:", cause);
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // Nothing written for a while: send a heartbeat to keep the connection alive.
                logger.info("发送心跳包 [{}]", ctx.channel().remoteAddress());
                Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress(),
                        CommonSerializer.getByte(CommonSerializer.DEFAULT_SERIALIZER));
                RpcRequest rpcRequest = new RpcRequest();
                rpcRequest.setHeartBeat(true);
                channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
NettyServerHandler
package com.t598.core.transport.netty.server; import com.t598.common.entity.RpcRequest; import com.t598.common.entity.RpcResponse; import com.t598.common.factory.SingletonFactory; import com.t598.core.handler.RequestHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyServerHandler extends SimpleChannelInboundHandler4.额外的类{ private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private final RequestHandler requestHandler; public NettyServerHandler() { this.requestHandler = SingletonFactory.getInstance(RequestHandler.class); } @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception { try { if(msg.getHeartBeat()) { logger.info("接收到客户端心跳包..."); return; } logger.info("服务器接收到请求: {}", msg); Object result = requestHandler.handle(msg); if (ctx.channel().isActive() && ctx.channel().isWritable()) { ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId())); } else { logger.error("通道不可写"); } } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("处理过程调用时有错误发生:"); cause.printStackTrace(); ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.READER_IDLE) { logger.info("长时间未收到心跳包,断开连接..."); ctx.close(); } } else { super.userEventTriggered(ctx, evt); } } }
PackageType(RPC-COMMON)
package com.t598.common.enumeration;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Identifies the kind of payload carried by a protocol packet.
 * Explicit constructor and getter instead of Lombok-generated ones.
 */
public enum PackageType {
    REQUEST_PACK(0),
    RESPONSE_PACK(1);

    private final int code;

    PackageType(int code) {
        this.code = code;
    }

    public int getCode() {
        return code;
    }
}
SerializerCode(RPC-COMMON)
package com.t598.common.enumeration;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Numeric codes identifying the available serializers; the code is written
 * into the protocol header so the peer knows how to deserialize.
 * Explicit constructor and getter instead of Lombok-generated ones.
 */
public enum SerializerCode {
    KRYO(0),
    JSON(1),
    HESSIAN(2),
    PROTOBUF(3);

    private final int code;

    SerializerCode(int code) {
        this.code = code;
    }

    public int getCode() {
        return code;
    }
}



