java 从零开始手写 RPC (01) 基于 socket 实现
java 从零开始手写 RPC (02)-netty4 实现客户端和服务端
java 从零开始手写 RPC (03) 如何实现客户端调用服务端?
java 从零开始手写 RPC (04) -序列化
前面我们的例子是一个固定的出参和入参,固定的方法实现。
本节将实现通用的调用,让框架具有更广泛的实用性。
基本思路所有的方法调用,基于反射进行相关处理实现。
服务端 核心类- RpcServer
调整如下:
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
// 打印日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
// 解码 bytes=>resp
.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
// request=>bytes
.addLast(new ObjectEncoder())
.addLast(new RpcServerHandler());
}
})
// 这个参数影响的是还没有被accept 取出的连接
.option(ChannelOption.SO_BACKLOG, 128)
// 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
.childOption(ChannelOption.SO_KEEPALIVE, true);
其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。
RpcServerHandlerpackage com.github.houbb.rpc.server.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class RpcServerHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFactory.getLog(RpcServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String id = ctx.channel().id().asLongText();
log.info("[Server] channel {} connected " + id);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final String id = ctx.channel().id().asLongText();
log.info("[Server] channel read start: {}", id);
// 接受客户端请求
RpcRequest rpcRequest = (RpcRequest)msg;
log.info("[Server] receive channel {} request: {}", id, rpcRequest);
// 回写到 client 端
DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);
ctx.writeAndFlush(rpcResponse);
log.info("[Server] channel {} response {}", id, rpcResponse);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {
DefaultRpcResponse rpcResponse = new DefaultRpcResponse();
rpcResponse.seqId(rpcRequest.seqId());
try {
// 获取对应的 service 实现类
// rpcRequest=>invocationRequest
// 执行 invoke
Object result = DefaultServiceFactory.getInstance()
.invoke(rpcRequest.serviceId(),
rpcRequest.methodName(),
rpcRequest.paramTypeNames(),
rpcRequest.paramValues());
rpcResponse.result(result);
} catch (Exception e) {
rpcResponse.error(e);
log.error("[Server] execute meet ex for request", rpcRequest, e);
}
// 构建结果值
return rpcResponse;
}
}
和以前类似,不过 handleRpcRequest 要稍微麻烦一点。
这里需要根据发射,调用对应的方法。
pojo其中使用的出参、入参实现如下:
RpcRequestpackage com.github.houbb.rpc.common.rpc.domain;
import java.util.List;
public interface RpcRequest extends baseRpc {
long createTime();
String serviceId();
String methodName();
List paramTypeNames();
// 调用参数信息列表
Object[] paramValues();
}
RpcResponse
package com.github.houbb.rpc.common.rpc.domain;
public interface RpcResponse extends baseRpc {
Throwable error();
Object result();
}
baseRpc
package com.github.houbb.rpc.common.rpc.domain;
import java.io.Serializable;
public interface baseRpc extends Serializable {
String seqId();
baseRpc seqId(final String traceId);
}
ServiceFactory-服务工厂
为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。
ServiceFactorypackage com.github.houbb.rpc.server.service;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.registry.ServiceRegistry;
import java.util.List;
public interface ServiceFactory {
ServiceFactory registerServices(final List serviceConfigList);
Object invoke(final String serviceId, final String methodName,
List paramTypeNames, final Object[] paramValues);
}
DefaultServiceFactory
作为默认实现,如下:
package com.github.houbb.rpc.server.service.impl;
import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.service.ServiceFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DefaultServiceFactory implements ServiceFactory {
private Map serviceMap;
private Map methodMap;
private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();
private DefaultServiceFactory(){}
public static DefaultServiceFactory getInstance() {
return INSTANCE;
}
@Override
public synchronized ServiceFactory registerServices(List serviceConfigList) {
ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");
// 集合初始化
serviceMap = new HashMap<>(serviceConfigList.size());
// 这里只是预估,一般为2个服务。
methodMap = new HashMap<>(serviceConfigList.size()*2);
for(ServiceConfig serviceConfig : serviceConfigList) {
serviceMap.put(serviceConfig.id(), serviceConfig.reference());
}
// 存放方法名称
for(Map.Entry entry : serviceMap.entrySet()) {
String serviceId = entry.getKey();
Object reference = entry.getValue();
//获取所有方法列表
Method[] methods = reference.getClass().getMethods();
for(Method method : methods) {
String methodName = method.getName();
if(ReflectMethodUtil.isIgnoreMethod(methodName)) {
continue;
}
List paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);
String key = buildMethodKey(serviceId, methodName, paramTypeNames);
methodMap.put(key, method);
}
}
return this;
}
@Override
public Object invoke(String serviceId, String methodName, List paramTypeNames, Object[] paramValues) {
//参数校验
ArgUtil.notEmpty(serviceId, "serviceId");
ArgUtil.notEmpty(methodName, "methodName");
// 提供 cache,可以根据前三个值快速定位对应的 method
// 根据 method 进行反射处理。
// 对于 paramTypes 进行 string 连接处理。
final Object reference = serviceMap.get(serviceId);
final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);
final Method method = methodMap.get(methodKey);
try {
return method.invoke(reference, paramValues);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RpcRuntimeException(e);
}
}
private String buildMethodKey(String serviceId, String methodName, List paramTypeNames) {
String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);
return serviceId+PunctuationConst.colon+methodName+PunctuationConst.colon
+param;
}
}
ServiceRegistry-服务注册类
接口
package com.github.houbb.rpc.server.registry;
public interface ServiceRegistry {
ServiceRegistry port(final int port);
ServiceRegistry register(final String serviceId, final Object serviceImpl);
ServiceRegistry expose();
}
实现
package com.github.houbb.rpc.server.registry.impl;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;
import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.core.RpcServer;
import com.github.houbb.rpc.server.registry.ServiceRegistry;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
import java.util.ArrayList;
import java.util.List;
public class DefaultServiceRegistry implements ServiceRegistry {
private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();
private int rpcPort;
private ProtocolConfig protocolConfig;
private List serviceConfigList;
private DefaultServiceRegistry(){
// 初始化默认参数
this.serviceConfigList = new ArrayList<>();
this.rpcPort = 9527;
}
public static DefaultServiceRegistry getInstance() {
return INSTANCE;
}
@Override
public ServiceRegistry port(int port) {
ArgUtil.positive(port, "port");
this.rpcPort = port;
return this;
}
@Override
@SuppressWarnings("unchecked")
public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {
ArgUtil.notEmpty(serviceId, "serviceId");
ArgUtil.notNull(serviceImpl, "serviceImpl");
// 构建对应的其他信息
ServiceConfig serviceConfig = new DefaultServiceConfig();
serviceConfig.id(serviceId).reference(serviceImpl);
serviceConfigList.add(serviceConfig);
return this;
}
@Override
public ServiceRegistry expose() {
// 注册所有服务信息
DefaultServiceFactory.getInstance()
.registerServices(serviceConfigList);
// 暴露 netty server 信息
new RpcServer(rpcPort).start();
return this;
}
}
ServiceConfig 是一些服务的配置信息,接口定义如下:
package com.github.houbb.rpc.server.config.service; public interface ServiceConfig测试 maven 引入{ String id(); ServiceConfig id(String id); T reference(); ServiceConfig reference(T reference); }
引入服务端的对应 maven 包:
服务端启动com.github.houbb rpc-server 0.0.6
// 启动服务
DefaultServiceRegistry.getInstance()
.register(ServiceIdConst.CALC, new CalculatorServiceImpl())
.expose();
这里注册了一个计算服务,并且设置对应的实现。
和以前实现类似,此处不再赘述。
启动日志:
[DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0xec4dc74f] REGISTERED 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler bind 信息: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE [INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口
ps: 写到这里忽然发现忘记添加对应的 register 日志了,这里可以添加对应的 registerListener 拓展。
小结为了便于大家学习,以上源码已经开源:
https://github.com/houbb/rpc
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次重逢。



