使用Zookeeper作为注册中心,RMI作为连接技术,手写RPC框架
1.框架结构
● 连接器:提供默认连接信息配置并提供连接
● 注册器:提供注册服务和获取代理对象(没有具体的注册信息)
● RPC静态工厂:创建注册器、获取连接、注册服务和获取代理对象(已经通过静态初始化注册信息)
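3.项目
3.1 连接器
Maven依赖:
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>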
package com.fyp.rpc.connection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
/**
 * Holds the ZooKeeper server address and session timeout, and opens a
 * client session with those settings on request.
 */
public class ZkConnection {
    private final String zkServer;
    private final int sessionTimeout;

    /** Creates a connector that targets {@code localhost:2181} with a timeout of 10000. */
    public ZkConnection() {
        this("localhost:2181", 10000);
    }

    /**
     * Creates a connector with explicit settings.
     *
     * @param zkServer       connect string handed to the ZooKeeper client
     * @param sessionTimeout session timeout handed to the ZooKeeper client
     */
    public ZkConnection(String zkServer, int sessionTimeout) {
        this.zkServer = zkServer;
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Opens a new ZooKeeper client on every call; nothing is cached here.
     *
     * @return a freshly constructed client
     * @throws IOException if the client cannot be constructed
     */
    public ZooKeeper getConnection() throws IOException {
        // Watch events are deliberately ignored by this connector.
        Watcher ignoreEvents = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // intentionally empty
            }
        };
        return new ZooKeeper(zkServer, sessionTimeout, ignoreEvents);
    }
}
3.2 注册器
package com.fyp.rpc.registry;
import com.fyp.rpc.connection.ZkConnection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.util.List;
public class FypRpcRegistry {
private ZkConnection connection;
private String ip;
private int port;
public void registerService(Class extends Remote> serviceInterface, Remote serviceObject) throws IOException, KeeperException, InterruptedException {
// rmi = rmi://ip:port/com.fyp.service.UsrService
String rmi = "rmi://" + ip + ":" + port + "/" + serviceInterface.getName();
// 拼接一个有规则的zk存储节点命名
String path = "/fyp/rpc/" + serviceInterface.getName();
List children = connection.getConnection().getChildren("/fyp/rpc", false);
if(!children.contains(serviceInterface.getName())) {
Stat stat = new Stat();
connection.getConnection().getData(path, false, stat);
connection.getConnection().delete(path, stat.getCversion());
}
connection.getConnection().create(path,rmi.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Naming.rebind(rmi, serviceObject);
}
public T getServiceProxy(Class serviceInterface) throws IOException, NotBoundException, KeeperException, InterruptedException {
String path = "/fyp/rpc/" + serviceInterface.getName();
byte[] datas = connection.getConnection().getData(path, false, null);
String rmi = new String(datas);
Object obj = Naming.lookup(rmi);
return (T) obj;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public ZkConnection getConnection() {
return connection;
}
public void setConnection(ZkConnection connection) {
this.connection = connection;
}
}
3.3 RPC静态工厂
package com.fyp.rpc;
import com.fyp.rpc.connection.ZkConnection;
import com.fyp.rpc.registry.FypRpcRegistry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import java.io.IOException;
import java.io.InputStream;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.List;
import java.util.Properties;
public class FypRpcFactory {
private static final Properties config = new Properties();
private static final ZkConnection connection;
private static final FypRpcRegistry registry;
static {
try {
InputStream input = FypRpcRegistry.class.getClassLoader().getResourceAsStream("fyp-rpc.properties");
config.load(input);
String serverIp = config.getProperty("registry.ip") == null ? "localhost" : config.getProperty("registry.ip");
int serverPort= config.getProperty("registry.port") == null ? 9090 : Integer.parseInt(config.getProperty("registry.port"));
String zkServe = config.getProperty("zk.server") == null ? "localhost:2181" : config.getProperty("zk.server");
int zkSessionTimeout = config.getProperty("zk.sessionTimeout") == null ? 10000 : Integer.parseInt(config.getProperty("zk.sessionTimeout"));
connection = new ZkConnection(zkServe,zkSessionTimeout);
registry = new FypRpcRegistry();
registry.setIp(serverIp);
registry.setPort(serverPort);
registry.setConnection(connection);
LocateRegistry.createRegistry(serverPort);
List children = connection.getConnection().getChildren("/", false);
if(!children.contains("fyp")) {
connection.getConnection().create("/fyp",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
List fypChildren = connection.getConnection().getChildren("/fyp", false);
if(!fypChildren.contains("rpc")) {
connection.getConnection().create("/fyp/rpc",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
// 初始化发生异常,中断虚拟机
throw new ExceptionInInitializerError(e);
}
}
public static void registerService(Class extends Remote> serviceInterface, Remote serviceObject) throws IOException, InterruptedException, KeeperException {
registry.registerService(serviceInterface, serviceObject);
}
public static T getServiceProxy(Class serviceInterface) throws IOException, KeeperException, InterruptedException, NotBoundException {
return registry.getServiceProxy(serviceInterface);
}
}
总结:
说白了,RPC框架已经被实现了,最大众的dubbo大家应该都用过了,这篇文章就是基于RMI技术实现的简易版dubbo,后续会给出优化————服务自动发现注册、服务容错和负载均衡,想了解的不妨加个收藏。
最后,如果有需要先了解dubbo再来学习RPC框架的,可以参考学习下面这篇文章。
《Linux环境下Dubbo环境搭建及启动》
学习参考:
尚学堂RPC远程过程调用:https://www.bilibili.com/video/BV11i4y1N7LQ



