栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

手撸RPC框架

手撸RPC框架

使用Zookeeper作为注册中心,RMI作为连接技术,手写RPC框架

1.框架结构

● 连接器:提供默认连接信息配置,并提供连接
● 注册器:提供注册服务和获取代理对象(没有具体的注册信息)
● RPC静态工厂:创建注册器、获取连接、注册服务和获取代理对象(已经通过静态初始化注册信息)

2.依赖

        
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
        </dependency>
        
    
3.项目

3.1 连接器
package com.fyp.rpc.connection;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;


/**
 * Holds the ZooKeeper server address and session timeout, and opens client
 * sessions configured with them.
 */
public class ZkConnection {
    private String zkServer;
    private int sessionTimeout;

    /**
     * Creates a connection descriptor with the defaults:
     * server {@code localhost:2181} and session timeout {@code 10000}.
     */
    public ZkConnection() {
        this("localhost:2181", 10000);
    }

    /**
     * Creates a connection descriptor with explicit settings.
     *
     * @param zkServer       server address handed to the {@link ZooKeeper} constructor
     * @param sessionTimeout session timeout handed to the {@link ZooKeeper} constructor
     */
    public ZkConnection(String zkServer, int sessionTimeout) {
        this.zkServer = zkServer;
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Opens a ZooKeeper client using the stored settings.
     *
     * <p>Every call constructs a new {@link ZooKeeper} instance; nothing is cached here,
     * so the caller owns the returned client.
     *
     * @return a newly constructed ZooKeeper client
     * @throws IOException if the {@link ZooKeeper} constructor fails
     */
    public ZooKeeper getConnection() throws IOException {
        Watcher ignoreEvents = new IgnoringWatcher();
        return new ZooKeeper(zkServer, sessionTimeout, ignoreEvents);
    }

    /** Watcher that discards every event it receives. */
    private static final class IgnoringWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            // Intentionally empty: session events are not acted upon.
        }
    }
}
3.2 注册器
package com.fyp.rpc.registry;

import com.fyp.rpc.connection.ZkConnection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.util.List;


/**
 * Publishes RMI services in ZooKeeper and resolves client-side proxies for them.
 *
 * <p>A service is stored as a persistent znode {@code /fyp/rpc/<interface name>} whose
 * data is the service's RMI URL, {@code rmi://<ip>:<port>/<interface name>}.
 * The parent znode {@code /fyp/rpc} must already exist; this class does not create it.
 * {@code connection}, {@code ip} and {@code port} must be set through the setters
 * before any service is registered or looked up.
 */
public class FypRpcRegistry {
    /** Prefix of the znode path under which every service is published. */
    private static final String SERVICE_ROOT = "/fyp/rpc/";

    private ZkConnection connection;
    private String ip;
    private int port;

    /**
     * Binds {@code serviceObject} in the RMI registry and publishes its URL in ZooKeeper,
     * replacing any registration that already exists for the same interface.
     *
     * @param serviceInterface interface whose fully-qualified name identifies the service
     * @param serviceObject    remote object to bind
     * @throws IOException          if the RMI bind fails or the ZooKeeper client cannot be created
     * @throws KeeperException      if a ZooKeeper operation fails (for example the parent
     *                              {@code /fyp/rpc} node is missing)
     * @throws InterruptedException if the thread is interrupted during a ZooKeeper call
     */
    public void registerService(Class<?> serviceInterface, Remote serviceObject) throws IOException, KeeperException, InterruptedException {
        String serviceName = serviceInterface.getName();
        // e.g. rmi://ip:port/com.fyp.service.UsrService
        String rmi = "rmi://" + ip + ":" + port + "/" + serviceName;
        // ZooKeeper node named after the service interface
        String path = SERVICE_ROOT + serviceName;

        // Bind first: if binding fails, no unreachable address is advertised in ZooKeeper.
        Naming.rebind(rmi, serviceObject);

        // One session for the whole operation, closed afterwards so sessions do not pile up.
        ZooKeeper zk = connection.getConnection();
        try {
            // Remove a stale registration only when one actually exists.
            if (zk.exists(path, false) != null) {
                // Version -1 matches any version of the node.
                zk.delete(path, -1);
            }
            zk.create(path, rmi.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } finally {
            zk.close();
        }
    }

    /**
     * Looks up the RMI URL published for {@code serviceInterface} and returns the remote proxy.
     *
     * @param serviceInterface interface whose fully-qualified name identifies the service
     * @param <T>              type of the service interface
     * @return the object returned by the RMI lookup, cast to {@code serviceInterface}
     * @throws IOException          if the RMI lookup fails or the ZooKeeper client cannot be created
     * @throws NotBoundException    if nothing is bound under the published RMI URL
     * @throws KeeperException      if the service node cannot be read (for example it does not exist)
     * @throws InterruptedException if the thread is interrupted during a ZooKeeper call
     * @throws ClassCastException   if the looked-up object does not implement {@code serviceInterface}
     */
    public <T> T getServiceProxy(Class<T> serviceInterface) throws IOException, NotBoundException, KeeperException, InterruptedException {
        String path = SERVICE_ROOT + serviceInterface.getName();
        String rmi;
        ZooKeeper zk = connection.getConnection();
        try {
            byte[] data = zk.getData(path, false, null);
            rmi = new String(data, StandardCharsets.UTF_8);
        } finally {
            zk.close();
        }
        return serviceInterface.cast(Naming.lookup(rmi));
    }

    /** @return the host name or address used when building RMI URLs */
    public String getIp() {
        return ip;
    }

    /** @param ip the host name or address used when building RMI URLs */
    public void setIp(String ip) {
        this.ip = ip;
    }

    /** @return the port used when building RMI URLs */
    public int getPort() {
        return port;
    }

    /** @param port the port used when building RMI URLs */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the ZooKeeper connection source used by this registry */
    public ZkConnection getConnection() {
        return connection;
    }

    /** @param connection the ZooKeeper connection source used by this registry */
    public void setConnection(ZkConnection connection) {
        this.connection = connection;
    }
}
3.3 RPC静态工厂
package com.fyp.rpc;

import com.fyp.rpc.connection.ZkConnection;
import com.fyp.rpc.registry.FypRpcRegistry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

import java.io.IOException;
import java.io.InputStream;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.List;
import java.util.Properties;


public class FypRpcFactory {
    private static final Properties config = new Properties();
    private static final ZkConnection connection;
    private static final FypRpcRegistry registry;
    
    static {
        try {
            InputStream input = FypRpcRegistry.class.getClassLoader().getResourceAsStream("fyp-rpc.properties");
            config.load(input);
            String serverIp = config.getProperty("registry.ip") == null ? "localhost" : config.getProperty("registry.ip");
            int serverPort= config.getProperty("registry.port") == null ? 9090 : Integer.parseInt(config.getProperty("registry.port"));
            String zkServe = config.getProperty("zk.server") == null ? "localhost:2181" : config.getProperty("zk.server");
            int zkSessionTimeout = config.getProperty("zk.sessionTimeout") == null ? 10000 : Integer.parseInt(config.getProperty("zk.sessionTimeout"));
            connection = new ZkConnection(zkServe,zkSessionTimeout);
            registry = new FypRpcRegistry();
            registry.setIp(serverIp);
            registry.setPort(serverPort);
            registry.setConnection(connection);
            LocateRegistry.createRegistry(serverPort);

            List children = connection.getConnection().getChildren("/", false);
            if(!children.contains("fyp")) {
                connection.getConnection().create("/fyp",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            List fypChildren = connection.getConnection().getChildren("/fyp", false);
            if(!fypChildren.contains("rpc")) {
                connection.getConnection().create("/fyp/rpc",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 初始化发生异常,中断虚拟机
            throw new ExceptionInInitializerError(e);
        }
    }

    public static void registerService(Class serviceInterface, Remote serviceObject) throws IOException, InterruptedException, KeeperException {
        registry.registerService(serviceInterface, serviceObject);
    }

    public static  T getServiceProxy(Class serviceInterface) throws IOException, KeeperException, InterruptedException, NotBoundException {
        return registry.getServiceProxy(serviceInterface);
    }
}

总结:
说白了,RPC框架已经被实现了,最大众的dubbo大家应该都用过了,这篇文章就是基于RMI技术实现的简易版dubbo,后续会给出优化——服务自动发现注册、服务容错和负载均衡,想了解的不妨加个收藏。
最后,如果有需要先了解dubbo再来学习RPC框架的,可以参考学习下面这篇文章。
《Linux环境下Dubbo环境搭建及启动》

学习参考:
尚学堂RPC远程过程调用:https://www.bilibili.com/video/BV11i4y1N7LQ

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698488.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号