
[Writing an RPC Framework by Hand] simpleRPC-04


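Preface

The previous versions used blocking I/O (BIO) for network transport. In simpleRPC-04 we switch to non-blocking I/O (NIO) by introducing Netty together with its built-in encoders and decoders.

Specifically, we use Netty to rework both the client and the server.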

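Implementation

Project setup

Create a module: simpleRPC-04

Under the java directory, create the package: com.rpc

Configuring dependencies

Configure the dependencies in pom.xml as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SimpleRPC</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>simpleRPC-04</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.51.Final</version>
        </dependency>
    </dependencies>
</project>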

common

The common package is identical to the one in simpleRPC-03:

Blog.java

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
    private Integer id;
    private Integer useId;
    private String title;
}

RPCRequest.java

package com.rpc.common;


import lombok.Builder;
import lombok.Data;

import java.io.Serializable;



@Data
@Builder
public class RPCRequest implements Serializable {
    // Service name: the client only knows the interface name; the server maps it to the implementation
    private String interfaceName;
    // Method name
    private String methodName;
    // Argument list
    private Object[] params;
    // Parameter types
    private Class<?>[] paramsTypes;
}

RPCResponse.java

package com.rpc.common;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;


@Data
@Builder
public class RPCResponse implements Serializable {

    // Status code
    private int code;

    private String message;
    // Payload returned by the call
    private Object data;

    public static RPCResponse success(Object data) {
        return RPCResponse.builder().code(200).data(data).build();
    }

    public static RPCResponse fail() {
        return RPCResponse.builder().code(500).message("Internal server error").build();
    }
}

User.java

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;




@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    // Shared by the client and the server
    private Integer id;
    private String userName;
    private Boolean sex;
}
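
All four classes in common implement Serializable because the Netty pipeline used later in this article still relies on Java serialization. As a rough illustration of what that means, here is a minimal JDK-only sketch that round-trips an object through a byte array. DemoUser and the helper names are made up for this example (a Lombok-free stand-in for User), and this is not how Netty's ObjectEncoder is implemented internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {
    // Hypothetical stand-in for User, written without Lombok so the sketch compiles on its own
    static class DemoUser implements Serializable {
        private static final long serialVersionUID = 1L;
        final Integer id;
        final String userName;

        DemoUser(Integer id, String userName) {
            this.id = id;
            this.userName = userName;
        }
    }

    // Serialize an object into a byte array using Java serialization
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuild the object from the bytes produced by toBytes
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class missing on the receiving side", e);
        }
    }

    public static void main(String[] args) {
        DemoUser copy = (DemoUser) fromBytes(toBytes(new DemoUser(10, "he2121")));
        System.out.println(copy.id + " " + copy.userName);
    }
}
```

The receiving side needs the same class on its classpath, which is why the common package is shared by the client and the server.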

service

Likewise, the service package is the same as in simpleRPC-03:

ServiceProvider.java

package com.rpc.service;


import java.util.HashMap;
import java.util.Map;


public class ServiceProvider {
    
    // Maps an interface name to the object that implements it
    private Map<String, Object> interfaceProvider;

    // Constructor: start with an empty HashMap
    public ServiceProvider() {
        this.interfaceProvider = new HashMap<>();
    }

    public void provideServiceInterface(Object service) {
        // Use reflection to find the interfaces the service implements and
        // register the service under each interface name (key = name, value = object)
        Class<?>[] interfaces = service.getClass().getInterfaces();
        for (Class<?> clazz : interfaces) {
            interfaceProvider.put(clazz.getName(), service);
        }
    }

    public Object getService(String interfaceName) {
        return interfaceProvider.get(interfaceName);  // look up the object by interface name
    }
}
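
To make the registration step concrete, the following self-contained sketch mirrors the same idea with a made-up Greeter interface (Greeter, GreeterImpl, register and lookup are illustrative names, not part of the project). It shows which key ends up in the map and what a lookup for an unknown interface returns.

```java
import java.util.HashMap;
import java.util.Map;

class ServiceRegistrySketch {
    // Hypothetical service interface and implementation, used only for this example
    interface Greeter {
        String greet(String name);
    }

    static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    private final Map<String, Object> providers = new HashMap<>();

    // Register the service under the name of every interface its class directly implements
    void register(Object service) {
        for (Class<?> iface : service.getClass().getInterfaces()) {
            providers.put(iface.getName(), service);
        }
    }

    // Returns null when nothing was registered under that interface name
    Object lookup(String interfaceName) {
        return providers.get(interfaceName);
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        registry.register(new GreeterImpl());
        Greeter greeter = (Greeter) registry.lookup(Greeter.class.getName());
        System.out.println(greeter.greet("rpc"));
    }
}
```

Note that getInterfaces() only returns interfaces declared directly on the class, so an interface inherited from a superclass would not be registered by this approach.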

BlogService.java

package com.rpc.service;

import com.rpc.common.Blog;

public interface BlogService {

    Blog getBlogById(Integer id);
}

BlogServiceImpl.java

package com.rpc.service;

import com.rpc.common.Blog;

public class BlogServiceImpl implements BlogService {

    @Override
    public Blog getBlogById(Integer id) {
        Blog blog = Blog.builder()
                        .id(id)
                        .title("My blog")
                        .useId(22).build();
        System.out.println("Client queried blog " + id);
        return blog;
    }
}

UserService.java

package com.rpc.service;

import com.rpc.common.User;


public interface UserService {

    // The client calls the server-side implementation through this interface
    User getUserByUserId(Integer id);

    // A second method added to this service
    Integer insertUserId(User user);
}

UserServiceImpl.java

package com.rpc.service;

import com.rpc.common.User;



public class UserServiceImpl implements UserService {

    @Override
    public User getUserByUserId(Integer id) {
        // Simulate fetching a user from a database
        User user = User.builder()
                        .id(id)
                        .userName("he2121")
                        .sex(true).build();
        System.out.println("Client queried user " + id);
        return user;
    }

    @Override
    public Integer insertUserId(User user) {
        System.out.println("Insert succeeded: " + user);
        return 1;
    }
}

server

In simpleRPC-04 both the client and the server are reworked with Netty.

The RPCServer.java interface is the same as in simpleRPC-03:

RPCServer.java

package com.rpc.server;


public interface RPCServer {
    void start(int port);
    void stop();
}

Create NettyRPCServer.java to implement the RPCServer interface.

The code, with comments, is as follows:

package com.rpc.server;

import com.rpc.service.ServiceProvider;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;



@AllArgsConstructor
public class NettyRPCServer implements RPCServer {
    private ServiceProvider serviceProvider;

    @Override
    public void start(int port) {
        // Netty event-loop groups: bossGroup accepts incoming TCP connections,
        // workGroup handles the I/O of accepted connections
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        System.out.println("Netty server started");

        try {
            // Bootstrap the Netty server
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Configure the event-loop groups, the channel type, and the per-connection pipeline
            serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                           .childHandler(new NettyServerInitializer(serviceProvider));
            // Bind the port and block until the bind completes
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    @Override
    public void stop() {

    }
}

NettyServerInitializer.java

package com.rpc.server;

import com.rpc.service.ServiceProvider;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;


@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    private ServiceProvider serviceProvider;
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Decoder: frames have the layout [length][body], which solves the sticky-packet problem
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,
                        0, 4));
        // Encoder: computes the length of the outgoing message and writes it into the first 4 bytes
        pipeline.addLast(new LengthFieldPrepender(4));

        // Still Java serialization here; Netty's built-in object encoder and decoder can carry these objects
        pipeline.addLast(new ObjectEncoder());
        pipeline.addLast(new ObjectDecoder(new ClassResolver() {
            @Override
            public Class<?> resolve(String className) throws ClassNotFoundException {
                return Class.forName(className);
            }
        }));

        pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
    }
}
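
The [length][body] layout mentioned in the comments can be illustrated without Netty. The sketch below is a simplified JDK-only model, not Netty's implementation: frame() plays the role of a 4-byte length prepender, and decode() splits a byte stream back into messages, leaving an incomplete trailing frame untouched until more bytes arrive. The class and method names are made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFramingSketch {
    // Encode: write the body length as a 4-byte big-endian int, followed by the body
    static byte[] frame(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Decode: pull out every complete frame; stop at the first incomplete one
    static List<String> decode(ByteBuffer stream) {
        List<String> messages = new ArrayList<>();
        while (stream.remaining() >= 4) {
            stream.mark();
            int length = stream.getInt();
            if (stream.remaining() < length) {
                stream.reset(); // body not fully received yet: rewind to the length field
                break;
            }
            byte[] payload = new byte[length];
            stream.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        // Two messages arrive glued together in a single read ("sticky packets")
        byte[] first = frame("hello");
        byte[] second = frame("rpc");
        ByteBuffer stream = ByteBuffer.allocate(first.length + second.length);
        stream.put(first).put(second);
        stream.flip();
        System.out.println(decode(stream));
    }
}
```

In the pipeline above, the constructor arguments (Integer.MAX_VALUE, 0, 4, 0, 4) describe the same layout: a length field at offset 0 that is 4 bytes long, no length adjustment, and the 4 length bytes stripped before the frame is passed on.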

NettyRPCServerHandler.java

package com.rpc.server;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import com.rpc.service.ServiceProvider;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
    private ServiceProvider serviceProvider;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCRequest msg) throws Exception {
        // System.out.println(msg);
        RPCResponse response = getResponse(msg);
        ctx.writeAndFlush(response);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    // Much the same as getResponse in WorkThread
    RPCResponse getResponse(RPCRequest request) {
        // Get the service (interface) name
        String interfaceName = request.getInterfaceName();
        // Look up the implementation registered on the server
        Object service = serviceProvider.getService(interfaceName);
        // Invoke the method via reflection
        Method method = null;
        try {
            method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
            Object invoke = method.invoke(service, request.getParams());
            return RPCResponse.success(invoke);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
            System.out.println("Method invocation failed");
            return RPCResponse.fail();
        }
    }
}
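
The reflective call at the heart of getResponse can be tried in isolation. The sketch below is a standalone approximation with a made-up Calculator service (Calculator, CalculatorImpl and dispatch are illustrative names). Unlike the handler above, it rejects a missing service explicitly and reports failures as exceptions rather than an RPCResponse; that is a choice made for this example only.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ReflectiveDispatchSketch {
    // Hypothetical service used only for this example
    interface Calculator {
        Integer add(Integer a, Integer b);
    }

    static class CalculatorImpl implements Calculator {
        @Override
        public Integer add(Integer a, Integer b) {
            return a + b;
        }
    }

    // Find the public method by name and parameter types, then invoke it on the service object
    static Object dispatch(Object service, String methodName, Class<?>[] paramTypes, Object[] params) {
        if (service == null) {
            throw new IllegalArgumentException("no service registered for this interface");
        }
        try {
            Method method = service.getClass().getMethod(methodName, paramTypes);
            return method.invoke(service, params);
        } catch (NoSuchMethodException | IllegalAccessException e) {
            throw new IllegalStateException("cannot call " + methodName, e);
        } catch (InvocationTargetException e) {
            // The service method itself threw; surface its original cause
            throw new IllegalStateException("service method failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        Object result = dispatch(new CalculatorImpl(), "add",
                new Class<?>[]{Integer.class, Integer.class}, new Object[]{2, 3});
        System.out.println(result);
    }
}
```

The parameter types matter because getMethod resolves overloads by exact signature, which is why RPCRequest carries paramsTypes alongside params.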

Define the server entry point: TestServer.java

package com.rpc.server;

import com.rpc.service.*;

public class TestServer {
    public static void main(String[] args) {
        UserService userService = new UserServiceImpl();
        BlogService blogService = new BlogServiceImpl();

//        Map serviceProvide = new HashMap<>();
//        serviceProvide.put("com.ganghuan.myRPCVersion2.service.UserService",userService);
//        serviceProvide.put("com.ganghuan.myRPCVersion2.service.BlogService",blogService);
        ServiceProvider serviceProvider = new ServiceProvider();
        serviceProvider.provideServiceInterface(userService);  // register userService with serviceProvider
        serviceProvider.provideServiceInterface(blogService);  // register blogService with serviceProvider

//        RPCServer RPCServer = new ThreadPoolRPCRPCServer(serviceProvider);
        RPCServer RPCServer = new NettyRPCServer(serviceProvider);
        RPCServer.start(8899);
    }
}

client

Next, we rework the client.

RPCClient.java is the same as in simpleRPC-03:

package com.rpc.client;


import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;


public interface RPCClient {
    RPCResponse sendRequest(RPCRequest request);
}

The proxy class RPCClientProxy.java needs a small change:

package com.rpc.client;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import lombok.AllArgsConstructor;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;


@AllArgsConstructor
public class RPCClientProxy implements InvocationHandler {
    private RPCClient client;

    // JDK dynamic proxy: every method call on the proxy object passes through this method,
    // which builds the request via reflection and sends it to the server through the client
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Build the request with Lombok's builder, which keeps the code concise
        RPCRequest request = RPCRequest.builder().interfaceName(method.getDeclaringClass().getName())
                                       .methodName(method.getName())
                                       .params(args)
                                       .paramsTypes(method.getParameterTypes())
                                       .build();
        // Send the request and wait for the response
        RPCResponse response = client.sendRequest(request);
//        System.out.println(response);
        return response.getData();
    }

    @SuppressWarnings("unchecked")
    <T> T getProxy(Class<T> clazz) {
        Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
        return (T)o;
    }
}
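
To see what the dynamic proxy does without any networking, here is a small standalone sketch. EchoService and DescribingHandler are made up for this example: instead of building an RPCRequest and sending it, the handler simply returns a string describing the intercepted call, which is the same information the real proxy packs into the request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class DynamicProxySketch {
    // Hypothetical service interface; there is deliberately no implementation on the "client" side
    public interface EchoService {
        String echo(String text);
    }

    // Stand-in for the RPC proxy: describes the call instead of sending it over the network
    static class DescribingHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            return method.getDeclaringClass().getSimpleName()
                    + "#" + method.getName() + Arrays.toString(args);
        }
    }

    // Create a proxy object that implements the given interface and routes calls to the handler
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> clazz, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, handler);
    }

    public static void main(String[] args) {
        EchoService service = getProxy(EchoService.class, new DescribingHandler());
        System.out.println(service.echo("hi")); // prints EchoService#echo[hi]
    }
}
```

The caller works against the interface only; everything that happens behind a method call is decided by the handler, which is what lets the RPC client hide the network round trip.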

NettyRPCClient.java

package com.rpc.client;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;


public class NettyRPCClient implements RPCClient{
    private static final Bootstrap bootstrap;
    private static final EventLoopGroup evenLoopGroup;
    private String host;
    private int port;

    // 构造函数
    public NettyRPCClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Initialize the Netty client once; the bootstrap is shared and reused
    static {
        evenLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(evenLoopGroup).channel(NioSocketChannel.class).handler(new NettyClientInitializer());
    }

    // Netty I/O is asynchronous: writing the request returns immediately with a future,
    // not with the response, so some extra work is needed to obtain the response
    @Override
    public RPCResponse sendRequest(RPCRequest request) {
        try {
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            // Send the request
            channel.writeAndFlush(request);
            channel.closeFuture().sync();
            // The line above blocks until the channel is closed. The result is then read from a
            // channel attribute stored under a fixed key (the handler sets it).
            // Attributes are stored per channel, so requests on different channels do not interfere.
            // Blocking is not ideal; a callback could be used instead.
            AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
            RPCResponse response = channel.attr(key).get();

            System.out.println(response);
            return response;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}
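
The comment in sendRequest notes that a callback would be preferable to blocking. One possible shape for that, sketched here with plain JDK classes and no Netty, is to hand the caller a CompletableFuture that the I/O side completes when the response arrives. Everything in this sketch is hypothetical: send() fakes the network by completing the future on another thread, where a real client would complete it from the channel handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncResponseSketch {
    // Fake transport: "replies" on the I/O thread by completing the pending future
    static CompletableFuture<String> send(String request, ExecutorService ioThread) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        ioThread.execute(() -> pending.complete("response to " + request));
        return pending;
    }

    // Attach a callback to the future, then wait with a timeout instead of blocking indefinitely
    static String sendAndWait(String request) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            return send(request, ioThread)
                    .thenApply(response -> "handled: " + response) // callback runs once the response exists
                    .get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no response", e);
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("getUserByUserId"));
    }
}
```

A caller that does not need the result immediately could keep the future and register further callbacks on it, rather than calling get at all.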

NettyClientInitializer.java

package com.rpc.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;


public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Frame layout: [length][body]
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,
                        0, 4));
        // Computes the length of the outgoing message and writes it into the first 4 bytes
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new ObjectEncoder());

        pipeline.addLast(new ObjectDecoder(new ClassResolver() {
            @Override
            public Class<?> resolve(String className) throws ClassNotFoundException {
                return Class.forName(className);
            }
        }));
        pipeline.addLast(new NettyClientHandler());
    }
}

NettyClientHandler.java

package com.rpc.client;

import com.rpc.common.RPCResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;


public class NettyClientHandler extends SimpleChannelInboundHandler<RPCResponse> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCResponse msg) throws Exception {
        // On receiving the response, store it in a channel attribute under a fixed key
        // so that sendRequest can read it
        AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
        ctx.channel().attr(key).set(msg);
        ctx.channel().close();
    }

    // Same as in NettyRPCServerHandler
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Finally, define the client entry point:

TestClient.java

package com.rpc.client;

import com.rpc.common.Blog;
import com.rpc.common.User;
import com.rpc.service.BlogService;
import com.rpc.service.UserService;



public class TestClient {
    public static void main(String[] args) {
        // Build a client; it could be backed by Java sockets or by Netty
        RPCClient rpcClient = new NettyRPCClient("127.0.0.1", 8899);
        // Hand the client to the proxy factory
        RPCClientProxy rpcClientProxy = new RPCClientProxy(rpcClient);
        // For each service, the proxy factory returns a proxy whose methods are already enhanced
        // (they wrap the call data and send the request)
        UserService userService = rpcClientProxy.getProxy(UserService.class);

        // Service method 1
        User userByUserId = userService.getUserByUserId(10);
        System.out.println("User received from the server: " + userByUserId);

        // Service method 2
        User user = User.builder().userName("张三").id(100).sex(true).build();
        Integer integer = userService.insertUserId(user);
        System.out.println("Result of inserting data on the server: " + integer);

        // Service method 3
        BlogService blogService = rpcClientProxy.getProxy(BlogService.class);
        Blog blogById = blogService.getBlogById(10000);
        System.out.println("Blog received from the server: " + blogById);
    }
}


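File structure

The file structure of simpleRPC-04, by package, is as follows:

com.rpc.client: NettyClientHandler, NettyClientInitializer, NettyRPCClient, RPCClient, RPCClientProxy, TestClient
com.rpc.common: Blog, RPCRequest, RPCResponse, User
com.rpc.server: NettyRPCServer, NettyRPCServerHandler, NettyServerInitializer, RPCServer, TestServer
com.rpc.service: BlogService, BlogServiceImpl, ServiceProvider, UserService, UserServiceImpl

Running

First run TestServer.java.

Then run TestClient.java.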
