栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

手写一个简单rpc框架(一)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

手写一个简单rpc框架(一)

        扑街前言:前面说了netty的基本运用、Java的NIO等一系列的知识,这些知识已经可以做一个简单的rpc框架,本篇和下篇我们一起了解一个怎么完成一个rpc框架,当然个只是为了更好的了解rpc框架的基本逻辑,并不是真的可以用于业务使用。(认识到自己是菜鸟的第47天,今天突然记起来是多少天了)


        在编写具体代码之前,我们要了解什么是rpc框架,它是由什么结构组成的,而最常见RPC框架就是Dubbo。

        RPC 的主要功能目标是让构建分布式计算(应用)更容易,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议规范,简单的来说就是像调用本地服务一样调用远程服务,对开发者而言是透明的。

RPC的优势
    RPC框架一般使用长链接,不必每次通信都要3次握手,减少网络开销。RPC框架一般都有注册中心,有丰富的监控管理。发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。协议私密,安全性较高。rpc 能做到协议更简单内容更小,效率更高。rpc是面向服务的更高级的抽象,支持服务注册发现,负载均衡,超时重试,熔断降级等高级特性。
RPC架构设计

        一幅图片解释整个rpc的架构设计


        上述简单描述一下rpc的一些简单概念,那么首先来编写服务端的代码,因为服务端的编写难度要小于客户端。从上面图片可以看出服务端主要是:服务注册(这里用zookeeper作为注册中心)、监听端口接收连接(包括请求解码、响应编码、请求处理,而编码和解码又有一次编、解码和二次编、解码)。当然这是简单的基本功能,比如限流、健康监测之类的后续再说,先迈出第一步很重要。

 zookeeper

        在代码开始之前,还需要简单的了解zookeeper的安装和使用,安装就是在zookeeper的官网下载一下最新的稳定版本,然后解压,打开bin目录,运行zkServer.cmd即可。具体的详细下篇文章再说,本篇文章重点是rpc框架的基本编写。至于Java中使用zookeeper,可以类比Redis的使用,Redis为Java提供了两个客户端 Jedis 和 Redisson,下面代码我们也是使用zookeeper了一个客户端 zkclient。


代码逻辑示例

        上述内容结合之前文章的相关的网络编程内容,可以先写一个服务端,代码如下。 首先结合上面的流程图,大致说一下逻辑。1、需要一个引导类,用于引导整个rpc服务的启动;2、有一个启动器,在启动器中完成服务注册和基于netty的监听;3、需要一个服务注册的server,基于zookeeper实现服务注册,需要封装zookeeper的连接和调用;4、需要一个netty的server,实现一次、二次编解码,并且实现请求处理调用具体的业务逻辑。

引导类

        这里单独说明一下@PostConstruct注解,这个具体作用是当这个类被注册bean的时候,会运行一次被修饰的方法,但是在jdk9之后就去除了。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RpcServerBootstrap {

    @Autowired
    private RpcServerRnner rpcServerRnner;

    @PostConstruct
    public void initRpcServer (){
        // 运行启动器
        rpcServerRnner.run();
    }
}
启动器
import com.rpc.server.registry.RpcRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class RpcServerRnner {
    @Autowired
    private RpcRegistry rpcRegistry;

    @Resource
    private RpcServer rpcServer;

    
    public void run () {
        // 服务注册
        rpcRegistry.serviceRegistry();

        // 启动服务,监听端口,接收连接请求
        rpcServer.start();
    }
}
配置信息对象
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Data
@Component
public class RpcServerConfiguration {

    
    @Value("${rpc.server.zk.root}")
    private String zkRoot;

    
    @Value("${rpc.server.zk.addr}")
    private String zkAddr;


    
    @Value("${rpc.network.port}")
    private int rpcPort;

    
    @Value("${server.port}")
    private int serverPort;

    
    @Value("${rpc.server.zk.timeout:10000}")
    private int connectTimeout;
}

zookeeper客户端连接
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ServerZkClientConfig {

    
    @Autowired
    private RpcServerConfiguration rpcServerConfiguration;

    
    @Bean
    public ZkClient zkClient() {
        return new ZkClient(rpcServerConfiguration.getZkAddr(), rpcServerConfiguration.getConnectTimeout());
    }
}
zookeeper连接操作接口
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class ServerZKit {

    @Autowired
    private ZkClient zkClient;

    @Autowired
    private RpcServerConfiguration rpcServerConfiguration;

    
    public void createRootNode() {
        boolean exists = zkClient.exists(rpcServerConfiguration.getZkRoot());
        if (!exists) {
            zkClient.createPersistent(rpcServerConfiguration.getZkRoot());
        }
    }

    
    public void createPersistentNode(String path) {
        String pathName = rpcServerConfiguration.getZkRoot() + "/" + path;
        boolean exists = zkClient.exists(pathName);
        if (!exists) {
            zkClient.createPersistent(pathName);
        }
    }

    
    public void createNode(String path) {
        String pathName = rpcServerConfiguration.getZkRoot() + "/" + path;
        boolean exists = zkClient.exists(pathName);
        if (!exists) {
            zkClient.createEphemeral(pathName);
        }
    }
}
用于服务请求连接的注解
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;

@Component
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@documented
public @interface HrpcService {

    
    @AliasFor(annotation = Component.class)
    String value() default "";

    
    Class interfaceClass() default void.class;

    
    String interfaceName() default "";

    
    String version() default "";

    
    String group() default "";
}
创建一个Spring的Bean工厂,用于封装获取IOC容器中的bean信息
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.annotation.Annotation;
import java.util.Map;

@Component
public class SpringBeanFactory implements ApplicationContextAware {

    
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    

    
    public static    T getBean(Class cls) {
        return context.getBean(cls);
    }

    
    public static Object getBean(String beanName) {
        return context.getBean(beanName);
    }

    
    public static Map getBeanListByAnnotationClass(Class annotationClass) {
         return context.getBeansWithAnnotation(annotationClass);
    }

    
    public static void registerSingleton(Object bean) {
        DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
        // 让bean完成Spring初始化过程中所有增强器检验,只是不重新创建bean
        beanFactory.applyBeanPostProcessorsAfterInitialization(bean,bean.getClass().getName());
        //将bean以单例的形式入驻到容器中,此时通过bean.getClass().getName()或bean.getClass()都可以拿到放入Spring容器的Bean
        beanFactory.registerSingleton(bean.getClass().getName(),bean);
    }
}
服务注册

        上面的准备工作基本上就做完了,下面开始正式的逻辑代码。这里提一点,当spring boot整合这个自定义框架的时候,可以有很多方式,这个不再细说,可以参考文章spring boot的自动配置,这里可以直接粗暴一点在spring boot项目的启动类上@SpringBootApplication(scanbasePackages ={"包路径","包路径"})。

        再说一下@component 注解,这是由spring 提供,被其修饰的类被声明为spring 的组件,简单来说就是创建bean并放置IOC容器中。

public interface RpcRegistry {
    
    void serviceRegistry();
}

import com.rpc.annotation.HrpcService;
import com.rpc.server.config.RpcServerConfiguration;
import com.rpc.server.registry.RpcRegistry;
import com.rpc.spring.SpringBeanFactory;
import com.rpc.util.IpUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
public class ZkRegistry implements RpcRegistry {
    
    @Autowired
    private SpringBeanFactory springBeanFactory;

    
    @Autowired
    private ServerZKit zKitClient;

    
    @Autowired
    private RpcServerConfiguration rpcServerConfiguration;

    @Override
    public void serviceRegistry() {
        
        // 获取被HrpcService 注解修饰的,IOC中的所有的bean信息
        Map annotationClass = springBeanFactory.getBeanListByAnnotationClass(HrpcService.class);

        // 没被注册信息,直接结束
        if (annotationClass == null || annotationClass.size() < 0){
            return;
        }

        // 迭代所有的bean
        for (Object bean : annotationClass.values()) {
            // 获取HrpcService 注解信息
            HrpcService hrpcService = bean.getClass().getAnnotation(HrpcService.class);

            // 获取HrpcService 注解的interfaceClass属性,也就是接口对象
            Class interfaceClass = hrpcService.interfaceClass();

            // 获取接口的名称
            String name = interfaceClass.getName();

            
            // 根节点
            zKitClient.createRootNode();

            // 子节点,用于接口名称
            zKitClient.createPersistentNode(name);

            // 获取ip
            String ip = IpUtil.getRealIp();
            // ip + 端口
            String node = ip + rpcServerConfiguration.getZkAddr();
            // 子节点对应下级节点
            zKitClient.createNode(name + "/" + node);

            // 打印日志
            log.info("服务{}-{}注册成功", name, node);
        }
    }
}

序列化工具
import io.protostuff.linkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;


@Slf4j
public class ProtostuffUtil {
	
	//存储因为无法直接序列化/反序列化 而需要被包装的类型Class
	private static final Set> WRAPPER_SET = new HashSet>();
	
	static {
		WRAPPER_SET.add(List.class);
		WRAPPER_SET.add(ArrayList.class);
		WRAPPER_SET.add(CopyOnWriteArrayList.class);
		WRAPPER_SET.add(linkedList.class);
		WRAPPER_SET.add(Stack.class);
		WRAPPER_SET.add(Vector.class);
		WRAPPER_SET.add(Map.class);
		WRAPPER_SET.add(HashMap.class);
		WRAPPER_SET.add(TreeMap.class);
		WRAPPER_SET.add(linkedHashMap.class);
		WRAPPER_SET.add(Hashtable.class);
		WRAPPER_SET.add(SortedMap.class);
		WRAPPER_SET.add(Object.class);
	}
	
	//注册需要使用包装类进行序列化的Class对象
	public static void registerWrapperClass(Class clazz) {
		WRAPPER_SET.add(clazz);
	}
	
	
	public static  byte[] serialize(T t,boolean useWrapper) {
		Object serializerObj = t;
		if (useWrapper) {
			serializerObj = SerializeDeserializeWrapper.build(t);
		}
		return serialize(serializerObj);
	}
	
	
	public static  byte[] serialize(T t) {
		//获取序列化对象的class
		Class clazz = (Class) t.getClass();
		Object serializerObj = t;
		if (WRAPPER_SET.contains(clazz)) {
			serializerObj = SerializeDeserializeWrapper.build(t);//将原始序列化对象进行包装
		}
		return doSerialize(serializerObj);
	}
	
	
	
	public static  byte[] doSerialize(T t) {
		//获取序列化对象的class
		Class clazz = (Class) t.getClass();
		//获取Schema
		// RuntimeSchema schema = RuntimeSchema.createFrom(clazz);//根据给定的class创建schema
		
		Schema schema = RuntimeSchema.getSchema(clazz);//内部有缓存机制
		
		linkedBuffer buffer = linkedBuffer.allocate(linkedBuffer.DEFAULT_BUFFER_SIZE);
		byte[] protostuff = null;
		try {
			protostuff = ProtostuffIOUtil.toByteArray(t, schema, buffer);
		} catch (Exception e){
			log.error("protostuff serialize error,{}",e.getMessage());
		}finally {
			buffer.clear();
		}
		return protostuff;
	}
	
	
	
	public static  T deserialize(byte[] data,Class clazz) {
		//判断是否经过包装
		if (WRAPPER_SET.contains(clazz)) {
			SerializeDeserializeWrapper wrapper = new SerializeDeserializeWrapper();
			ProtostuffIOUtil.mergeFrom(data,wrapper,RuntimeSchema.getSchema(SerializeDeserializeWrapper.class));
			return wrapper.getData();
		}else {
			Schema schema = RuntimeSchema.getSchema(clazz);
			T newMessage = schema.newMessage();
			ProtostuffIOUtil.mergeFrom(data,newMessage,schema);
			return newMessage;
		}
	}
	
	
	private static class SerializeDeserializeWrapper {
		//被包装的数据
		T data;
		
		public static  SerializeDeserializeWrapper build(T data){
			SerializeDeserializeWrapper wrapper = new SerializeDeserializeWrapper();
			wrapper.setData(data);
			return wrapper;
		}
		
		public T getData() {
			return data;
		}
		
		public void setData(T data) {
			this.data = data;
		}
	}
}
一次编码
import io.netty.handler.codec.LengthFieldbasedframeDecoder;


public class frameDecoder extends LengthFieldbasedframeDecoder {
    public frameDecoder() {
        super(Integer.MAX_VALUE, 0, 4, 0, 4);
    }
}
二次编码
import com.rpc.util.ProtostuffUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;


@Slf4j
public class RpcResponseEncoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
        
        try {
            // 使用序列化工具,将msg序列化
            byte[] bytes = ProtostuffUtil.serialize(msg);

            // 由ctx分配构建一个buffer对象
            ByteBuf buffer = ctx.alloc().buffer(bytes.length);
            // 将数据交给buffer
            buffer.writeBytes(bytes);

            // 添加写出
            out.add(buffer);
        } catch (Exception e) {
            // 异常
            log.error("RpcResponseEncoder exception ,msg={}",e.getMessage());
        }
    }
} 
一次解码 
import io.netty.handler.codec.LengthFieldPrepender;


public class frameEncoder extends LengthFieldPrepender {
    public frameEncoder() {
        super(4);
    }
}
二次解码
import com.itheima.rpc.util.ProtostuffUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;


@Slf4j
public class RpcResponseEncoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
        
        try {
            // 使用序列化工具,将msg序列化
            byte[] bytes = ProtostuffUtil.serialize(msg);

            // 由ctx分配构建一个buffer对象
            ByteBuf buffer = ctx.alloc().buffer(bytes.length);
            // 将数据交给buffer
            buffer.writeBytes(bytes);

            // 添加写出
            out.add(buffer);
        } catch (Exception e) {
            // 异常
            log.error("RpcResponseEncoder exception ,msg={}",e.getMessage());
        }
    }
} 
响应对象 
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcResponse {
    private String requestId;
    private Object result;
    private Throwable cause;

    public boolean isError() {
        return cause != null;
    }
}
请求对象
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest {
    private String requestId;
    private String className;
    private String methodName;
    private Class[] parameterTypes;
    private Object[] parameters;
}
业务逻辑调用Handler
import com.itheima.rpc.data.RpcRequest;
import com.itheima.rpc.data.RpcResponse;
import com.itheima.rpc.spring.SpringBeanFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


@Slf4j
public class RpcRequestHandler extends SimpleChannelInboundHandler {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
        
        log.info("服务端收到的请求是:{}",rpcRequest);
        // 构建响应对象
        RpcResponse rpcResponse = new RpcResponse();
        // 于请求对象关联
        rpcResponse.setRequestId(rpcRequest.getRequestId());

        try {
            // 接口名称
            String interfaceName = rpcRequest.getClassName();
            // 方法名称
            String methodName = rpcRequest.getMethodName();
            // 参数类型
            Class[] parameterTypes = rpcRequest.getParameterTypes();
            // 实际参数
            Object[] parameters = rpcRequest.getParameters();

            // 从容器中获取bean实例
            Object bean = SpringBeanFactory.getBean(interfaceName);
            // 反射获取method 对象
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            // 执行对应方法,拿到返回值
            Object result = method.invoke(bean, parameters);

            // 添加到响应对象
            rpcResponse.setResult(result);
        } catch (Exception e) {
            log.error("RpcRequestHandler exception,msg={}",e.getMessage());
            rpcResponse.setCause(e);
        } finally {
            // 将结果写回
            log.info("向客户端发送响应,{}",rpcResponse);
            ctx.writeAndFlush(rpcResponse);
        }
    }
}
netty代码实现
import com.rpc.netty.codec.frameDecoder;
import com.rpc.netty.codec.frameEncoder;
import com.rpc.netty.codec.RpcRequestDecoder;
import com.rpc.netty.codec.RpcResponseEncoder;
import com.rpc.netty.handler.RpcRequestHandler;
import com.rpc.server.boot.RpcServer;
import com.rpc.server.config.RpcServerConfiguration;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class NettServer implements RpcServer {
    @Autowired
    private RpcServerConfiguration rpcServerConfiguration;


    @Override
    public void start() {
        
        // 构建注册serverSocketChannel 的线程池
        NioEventLoopGroup boss = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
        // 构建注册socketChannel 的线程池
        NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
        // 构建业务调用的线程池
        NioEventLoopGroup rpcRequestHandler = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2, new DefaultThreadFactory("reqRequestHandler"));

        // 业务逻辑调用
        RpcRequestHandler requestHandler = new RpcRequestHandler();

        try {
            // 构建引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 配置引导类
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 获取 pipeline
                            ChannelPipeline pipeline = socketChannel.pipeline();

                            
                            // 一级编码
                            pipeline.addLast("frameEncoder", new frameEncoder());
                            // 二级编码
                            pipeline.addLast("RpcResponseEncoder", new RpcResponseEncoder());

                            // 一级解码
                            pipeline.addLast("frameDecoder", new frameDecoder());
                            // 二级解码
                            pipeline.addLast("RpcRequestDecoder", new RpcRequestDecoder());

                            // 业务线程池调用
                            pipeline.addLast(rpcRequestHandler, "requestHandler", requestHandler);
                        }
                    });

            // 启动引导类,监听端口,设置同步
            ChannelFuture future = serverBootstrap.bind(rpcServerConfiguration.getRpcPort()).sync();
            // 监控等待关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            // 异常
            e.printStackTrace();
        } finally {
            
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            rpcRequestHandler.shutdownGracefully();
        }
    }
}

        上述内容就已经完成了一个服务端的创建,后续文章在说客户端,本次结束。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号