最近在学习Netty框架,为了学以致用,决定自己实现一个简单的RPC框架,该项目采用Netty进行通信,使用Spring进行搭建,目前已经实现了RPC框架的基本功能,其余的改进尚在进一步完善中,github地址:https://github.com/13350747533/HahaRpc
在这里记录一下简单的思路:
服务注册与发现:
首先是服务的注册与发现,本项目使用的是zookeeper来作为注册中心,使用ZKClient来对zookeeper的节点进行管理。
服务发现的代码:
public String discovery(String serviceName) {
//创建zookeeper节点
ZkClient zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT);
LOGGER.debug("connect zookeeper");
try{
//获取service节点
String servicePath = Constant.ZK_REGISTRY_PATH + "/" + serviceName;
if(!zkClient.exists(servicePath)){
throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
}
List addressList = zkClient.getChildren(servicePath);
if(CollectionUtil.isEmpty(addressList)){
throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
}
//获取 address 节点
String address;
int size = addressList.size();
if(size == 1) {
//若只有一个地址
address = addressList.get(0);
LOGGER.debug("get only address node: {}", address);
}else{
address = addressList.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("get random address node: {}", address);
}
//获取address节点的值
String addressPath = servicePath + "/" + address;
return zkClient.readData(addressPath);
}finally {
zkClient.close();
}
}
服务注册的代码:
private final ZkClient zkClient;
public ZooKeeperServiceRegistry(String zkAddress) {
zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT,Constant.ZK_CONNECTION_TIMEOUT);
LOGGER.debug("connect zookeeper");
}
@Override
public void registry(String serviceName, String serviceAddress) {
//创建registry节点 (持久)
String registryPath = Constant.ZK_REGISTRY_PATH;
if(!zkClient.exists(registryPath)){
zkClient.createPersistent(registryPath);
LOGGER.debug("create registry node :{}", registryPath);
}
//创建Service节点 (持久)
String servicePath = registryPath + "/" + serviceName;
if(!zkClient.exists(servicePath)){
zkClient.createPersistent(servicePath);
LOGGER.debug("create service node : {}", servicePath);
}
//创建Address节点 (临时)
String addressPath = servicePath + "/address-";
String addressNode = zkClient.createEphemeralSequential(addressPath,serviceAddress);
LOGGER.debug("create address node : {}", addressNode);
}
处理完了服务的注册与发现以后,我们先来看一下服务端的代码。
首先我们要确定哪些方法需要被远程调用,为此我们自定义一个RPCService注解。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
Class> value();
String version() default "";
// String serlize() default "protostuff";
}
该注解继承了spring的@component,因此在spring容器被初始化的时候所有带有@RPCService注解的方法都会被扫描到。
我们定义一个NettyServer类,它要实现ApplicationContextAware, InitializingBean这两个接口。然后实现setApplicationContext方法,这个方法会完成上面提到的扫描。
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//扫描带有RpcService注解的类并初始化 handleMapduixiang
Map serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if(MapUtils.isNotEmpty(serviceBeanMap)){
for(Object serviceBean : serviceBeanMap.values()) {
RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
String serviceName = rpcService.value().getName();
String serviceVersion = rpcService.version();
// String serialize = rpcService.serlize();
if (StringUtil.isNotEmpty(serviceVersion)){
serviceName += "-" + serviceVersion;
}
handlerMap.put(serviceName, serviceBean);
// LOGGER.debug("handlerMap is {}", handlerMap);
}
}
}
同时,我们还要实现 afterPropertiesSet()方法,该方法在spring的bean的生命周期中会在属性填充之后调用,我们利用这个方法来开启netty服务端:
public void afterPropertiesSet() throws Exception {
super.start();
}
在父类中,有我们的netty服务器的相关代码和zookeeper注册的相关代码:
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//创建并初始化Netty服务器 BootStrap 对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler((new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//解码,编码,并处理请求
pipeline.addLast(new RpcDecoder(RpcRequest.class, serlizer));
pipeline.addLast(new RpcEncoder(RpcResponse.class, serlizer));
pipeline.addLast(new RpcServerHandler(handlerMap, threadPoolExecutor));
}
}));
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
//获取RPC服务器的IP和端口号
String[] addressArray = StringUtil.split(serviceAddress,":");
String ip = addressArray[0];
int port = Integer.parseInt(addressArray[1]);
//启动RPC服务器
ChannelFuture future = bootstrap.bind(ip, port).sync();
//注册RPC服务器
// LOGGER.debug("serviceRegistry is {}", serviceRegistry);
if (serviceRegistry != null) {
LOGGER.debug("1 handleMap is {}", handlerMap);
for(String interfaceName : handlerMap.keySet()){
serviceRegistry.registry(interfaceName, serviceAddress);
LOGGER.debug("registry service: {} => {}", interfaceName, serviceAddress);
}
}
LOGGER.debug("server started on port {}", port);
//关闭 RPC服务器
future.channel().closeFuture().sync();
}finally{
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
});
thread.start();
这里我们可以看到自定义的解码和编码的类RpcEncoder和RpcDecoder,以及Rpc的响应和请求的自定义类RpcRequest,RpcResponse,现在让我们展时先放一放,稍后再来看看这几个类。
现在服务端收到了来自客户端的请求,我们需要对其进行处理,定义RpcServerHandler类继承自SimpleChannelInboundHandler,在其中重写channelRead0方法:
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
threadPoolExecutor.execute(new Runnable() {
public void run() {
//创建并初始化RPC响应对象
RpcResponse response = new RpcResponse();
LOGGER.debug("cannelRead0start, request is {}", request);
response.setRequestId(request.getRequestId());
try{
Object result = handle(request);
response.setResult(result);
} catch (Exception e) {
LOGGER.error("handler result failuer", e);
response.setException(e);
}
LOGGER.debug("channelread0 voer, response is {}", response);
//写入RPC响应对象并自动关闭连接
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
});
}
在handle方法中,使用反射的方法进行调用:
private Object handle(RpcRequest request) throws Exception {
//获取服务对象
// LOGGER.debug("Request is {}", request);
String serviceName = request.getInterfaceName();
String serviceVersion = request.getServiceVersion();
if(StringUtil.isNotEmpty(serviceVersion)) {
serviceName += "-" + serviceVersion;
}
Object serviceBean = handlerMap.get(serviceName);
if (serviceBean == null) {
throw new RuntimeException(String.format("can not find service bean by key : %s", serviceName));
}
// 获取反射调用所需的参数
Class> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class>[] parameterTypes = request.getParametersTypes();
Object[] parameters = request.getParameters();
//反射调用方法
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, parameters);
//使用cglib
// FastClass serviceFastClass = FastClass.create(serviceClass);
// FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// return serviceFastMethod.invoke(serviceBean, parameters);
}
服务端的代码到此为止,接下来让我们看看client端的代码。
Client目前只有两个类,首先是比较简单的RpcClient:
public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建并初始化netty客户端Bootstrap对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class, serializer)); //编码RPC请求
pipeline.addLast(new RpcDecoder(RpcResponse.class, serializer)); //解码rpc响应
pipeline.addLast(RpcClient.this); //处理RPC响应
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//连接RPC服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
//写入RPC请求数据并关闭连接
Channel channel = future.channel();
channel.writeAndFlush(request).sync();
channel.closeFuture().sync();
//返回RPC响应对象
return response;
} finally {
group.shutdownGracefully();
}
}
就是一个标准的netty客户端的写法,接下来的是RpcProxy:
publicT create(final Class> interfaceClass, final String serviceVersion) { //创建动态代理对象 return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class>[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 创建RPC请求对象并设置请求属性 RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(method.getDeclaringClass().getName()); request.setServiceVersion(serviceVersion); request.setMethodName(method.getName()); request.setParametersTypes(method.getParameterTypes()); request.setParameters(args); //获取 RPC服务地址 if (serviceDiscovery != null) { String serviceName = interfaceClass.getName(); if (StringUtil.isNotEmpty(serviceVersion)) { serviceName += "-" + serviceVersion; } serviceAddress = serviceDiscovery.discovery(serviceName); LOGGER.debug("discovery service: {} => {}", serviceName, serviceAddress); } if (StringUtil.isEmpty(serviceAddress)) { throw new RuntimeException("server address is empty"); } //从RPC服务地址中解析主机名与端口号 String[] array = StringUtil.split(serviceAddress,":"); String host = array[0]; int port = Integer.parseInt(array[1]); //创建RPC客户端对象并发送RPC请求 RpcClient client = new RpcClient(host, port); long time = System.currentTimeMillis(); RpcResponse response = client.send(request); LOGGER.debug("time: {}ms", System.currentTimeMillis() - time); if(response == null){ throw new RuntimeException("response is null"); } //返回RPC响应结果 if(response.hasException()){ return response.getException(); }else{ return response.getResult(); } } } ); }
在create方法里面,封装请求体,调用send方法,请求服务提供者。
最后在让我们来看看之前的几个类:
public class RpcRequest {
private String requestId;
private String interfaceName;
private String serviceVersion;
private String methodName;
private Class>[] parametersTypes;
private Object[] parameters;
}
public class RpcResponse {
private String requestId;
private Exception exception;
private Object result;
public boolean hasException(){
return exception != null;
}
}
public class RpcDecoder extends ByteToMessageDecoder {
private Class> genericClass;
Serializer serializer;
public RpcDecoder(Class> genericClass, Serializer serializer){
this.serializer = serializer;
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
public class RpcEncoder extends MessageToByteEncoder {
private Class> genericClass;
private Serializer serializer;
public RpcEncoder(Class> genericClass, Serializer serializer) {
this.serializer = serializer;
this.genericClass = genericClass;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
if(genericClass.isInstance(o)) {
byte[] data = SerializationUtil.serialize(o);
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
}
}
}
RpcEncoder和RpcDecoder都继承自netty自带的解码编码器,我们可以为其指定自己的序列化方式。



