上一篇文章中的项目比较乱,并且只能对一个类中的方法进行远程调用,如果需要对其他类中的方法进行远程调用还需要进行修改,这一节把远程调用一些关键的信息提取出来,通过 RpcRequest 和 RpcResponse 来传输关键信息。
网络传输这里也为了之后升级为 netty 增加了 RpcRequestTransport 接口,现在先实现 Socket 传输的方法。
其他就是优化了 RpcClientProxy 客户端动态代理:因为将网络传输独立了出来,之前的 Proxy 类需要进行相应修改。还优化了 Server 服务端,将请求处理的部分独立成 RpcServerHandler 类,后续可以升级为异步处理。
升级后的项目结构如下:
RpcRequest 主要定义了远程调用Id、类名、方法名、参数、参数类型等。
RpcResponse 主要定义了调用Id、返回结果,message 等。
为了保证动态代理过程中可以获取到远程调用的类名以及网络传输方式,在代理类的创建过程中以构造函数的方式传入。
主要业务流程是:
- 服务端启动
- 客户端调用 RpcClientProxy 中的 getProxy 方法得到代理类,通过代理类发起远程调用
- 服务端接收到客户端请求后,将 RpcRequest 交给 RpcServerHandler 处理并返回结果
- 服务端生成 RpcResponse 返回给客户端
可以看到现在和真正的 Rpc 还有一些距离,下一节会增加基于 Zookeeper 的注册中心,服务端会将自己的服务注册到注册中心,而客户端会通过注册中心来获取服务地址,然后进行请求。
以下是关键类代码:
package common.dto;
import lombok.*;
import java.io.Serializable;
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder
@ToString
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 1905122041950251207L;
private String requestId;
private String interfaceName;
private String className;
private String methodName;
private Object[] parameters;
private Class>[] paramTypes;
private String version;
private String group;
public String getRpcServiceName() {
return this.getInterfaceName() + this.getGroup() + this.getVersion();
}
}
package common.dto; import lombok.*; import java.io.Serializable; @AllArgsConstructor @NoArgsConstructor @Getter @Setter @Builder @ToString public class RpcResponseimplements Serializable { private String requestId; private Integer code; private String message; private T data; public static RpcResponse success(T data, String requestId) { RpcResponse response = new RpcResponse<>(); response.setCode(200); response.setMessage("success!"); response.setRequestId(requestId); if (null != data) { response.setData(data); } return response; } }
package demo3;
import common.Hello;
import common.HelloService;
import common.ServiceImpl.HelloServiceImpl;
import common.transport.Socket.SocketRpcRequestTransport;
/**
 * Demo client: obtains a {@link HelloService} proxy backed by the socket
 * transport, performs one call and prints the returned string.
 */
public class Client3 {

    /**
     * Runs a single remote {@code hello} call and prints the result to
     * standard output without a trailing newline.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SocketRpcRequestTransport transport = new SocketRpcRequestTransport();
        RpcClientProxy proxyFactory = new RpcClientProxy(transport, new HelloServiceImpl());
        HelloService helloService = proxyFactory.getProxy(HelloService.class);

        Hello greeting = new Hello("111", "233");
        String reply = helloService.hello(greeting);
        System.out.print(reply);
    }
}
package demo3;
import common.dto.RpcRequest;
import common.dto.RpcResponse;
import common.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
@Slf4j
public class RpcClientProxy implements InvocationHandler {
private RpcRequestTransport rpcRequestTransport;
private Object service;
/**
 * Creates a proxy handler that sends requests through the given transport
 * and records the given service object.
 *
 * @param rpcRequestTransport transport used to send requests
 * @param service             object stored in the {@code service} field
 */
public RpcClientProxy(RpcRequestTransport rpcRequestTransport, Object service){
    this.service = service;
    this.rpcRequestTransport = rpcRequestTransport;
}

/**
 * Creates a proxy handler with a plain {@link Object} as the service object.
 *
 * @param rpcRequestTransport transport used to send requests
 */
public RpcClientProxy(RpcRequestTransport rpcRequestTransport){
    this(rpcRequestTransport, new Object());
}
@SuppressWarnings("unchecked")
public T getProxy(Class clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class>[]{clazz}, this);
}
/**
 * Handles a call made on the proxy by describing it as an {@link RpcRequest}.
 *
 * <p>The request carries the invoked method's name, its argument values and
 * parameter types, the name of the interface declaring the method, the class
 * name of the configured {@code service} object and a random UUID as request id.
 * The method name and the built request are logged at info level.
 *
 * <p>NOTE(review): the remainder of this method is missing from this listing
 * (it ends at the {@code RpcResponse} token), so how the request is sent and
 * how the response is unpacked cannot be confirmed from here.
 *
 * @param proxy  proxy instance the call was made on
 * @param method interface method that was invoked
 * @param args   argument values passed to the call
 */
@SneakyThrows
@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
log.info("invoked method: [{}]", method.getName());
// Collect everything visible about the call into a single request object.
RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
.parameters(args)
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.className(service.getClass().getName())
.requestId(UUID.randomUUID().toString())
.build();
log.info(rpcRequest.toString());
RpcResponse
package common.transport.Socket;
import common.Exceptions.RpcException;
import common.dto.RpcRequest;
import common.transport.RpcRequestTransport;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
 * {@link RpcRequestTransport} that sends each request over a new blocking
 * socket using Java object serialization and returns the object read back.
 */
public class SocketRpcRequestTransport implements RpcRequestTransport {
    private static final String DEFAULT_HOST = "127.0.0.1";
    private static final int DEFAULT_PORT = 9999;

    private final String host;
    private final int port;

    /** Creates a transport that connects to {@code 127.0.0.1:9999}. */
    public SocketRpcRequestTransport() {
        this(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Creates a transport that connects to the given endpoint.
     *
     * @param host server host name or address
     * @param port server port
     */
    public SocketRpcRequestTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Opens a connection, writes the request and reads one object in reply.
     *
     * @param rpcRequest request to send
     * @return the object deserialized from the server's reply
     * @throws RpcException if the connection, the write, or the read fails
     */
    @Override
    public Object sendRpcRequest(RpcRequest rpcRequest) {
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            // Send data to the server through the output stream
            objectOutputStream.writeObject(rpcRequest);
            // Push the request out before blocking on the reply.
            objectOutputStream.flush();
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            // Read RpcResponse from the input stream.
            // NOTE(review): native deserialization is only safe when the peer is trusted.
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RpcException("调用服务失败:", e);
        }
    }
}
package demo3;
import common.dto.RpcRequest;
import common.dto.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
@Slf4j
public class Server3 {
public static boolean running = true;
public static RpcServerHandler rpcServerHandler = new RpcServerHandler();
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(9999);
while (running){
Socket s = serverSocket.accept();
process(s);
s.close();
}
serverSocket.close();
}
public static void process(Socket socket) throws Exception {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = rpcServerHandler.handle(rpcRequest);
objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
log.error("occur exception:", e);
}
}
}
package demo3;
import common.Exceptions.RpcException;
import common.dto.RpcRequest;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
 * Executes an {@link RpcRequest} on the server side by reflectively creating
 * the requested class and invoking the requested method on it.
 */
@Slf4j
public class RpcServerHandler {

    /**
     * Instantiates the class named in the request and invokes the requested
     * method on the new instance.
     *
     * @param rpcRequest request describing class, method, parameter types and arguments
     * @return the value returned by the invoked method
     * @throws Exception if the class cannot be loaded or instantiated
     * @throws RpcException if the method cannot be found, accessed or fails
     */
    public Object handle(RpcRequest rpcRequest) throws Exception {
        // NOTE(review): the class name comes from the client; consider restricting it
        // to a set of registered services before instantiating.
        // Class.newInstance() is deprecated; go through the no-arg constructor instead.
        Object service = Class.forName(rpcRequest.getClassName()).getDeclaredConstructor().newInstance();
        return invokeTargetMethod(rpcRequest, service);
    }

    /**
     * Looks up the public method matching the request's name and parameter
     * types on the service object and invokes it with the request's arguments.
     */
    private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
        try {
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            Object result = method.invoke(service, rpcRequest.getParameters());
            log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
            return result;
        } catch (InvocationTargetException e) {
            // The wrapper's own message is normally null; describe the exception the method threw.
            throw new RpcException(String.valueOf(e.getTargetException()), e);
        } catch (NoSuchMethodException | IllegalArgumentException | IllegalAccessException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
}



