栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rpc方式(rpc api)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rpc方式(rpc api)

一起实现RPC,超详细!!! 第一篇 一个简单的RPC通信逻辑如下:

服务端暴露其要提供服务的接口客户端通过动态代理调用服务端的接口方法,通过代理将要传输的信息发送到服务端(代理是为了让客户端不去关注远程的细节,就像调用本地方法一样)服务端通过Socket进行不间断监听,如接收到数据后,就创建一个线程去执行服务端通过反射从客户端传来的信息中去找到对应的方法服务端将执行得到的数据结果封装到response中返回给客户端客户端收到数据并进行输出


项目的整体框架如下:

通过maven进行管理,父工程为My-RPC

My-RPC

RPC-API:接口相关的类RPC-COMMON:通用类RPC-CORE:RPC核心框架Client:客户端Server:服务端


实现过程如下: 一、定义接口 1.定义服务端要提供的接口

该接口位于RPC-API下,其提供一个方法hello,参数为一个HelloObject对象(后续会创建)。

package com.t598.api;


public interface HelloService {
    
    String hello(HelloObject helloObject);
}
2.创建hello方法中需要的HelloObject
package com.t598.api;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;


@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
    private Integer id;
    private String message;
}

二、创建传输对象 1.定义一个用于客户端与服务端之间传输信息的对象

该对象可以让服务端来唯一的确定客户端要调用的方法(通过接口名、方法名、参数列表、参数类型来唯一确定)。该对象位于RPC-COMMON中。

package com.t598.common.entity;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;


@Data
@Builder
public class RpcRequest implements Serializable {
    
    private String interfaceName;

    
    private String methodName;

    
    private Object[] parameters;

    
    private Class[] paramTypes;
}

2.创建返回消息的对象

服务端通过RpcRequest对象找到对应的方法并执行之后,需要给客户端返还一个消息(成功或者失败),我们将执行结果封装到RpcResponse中。该对象位于RPC-COMMON中。

package com.t598.common.entity;

import com.t598.common.enumeration.ResponseCode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@Data
@NoArgsConstructor
@AllArgsConstructor
public class RpcResponse implements Serializable {
    
    private Integer statusCode;

    
    private String message;

    
    private T data;


    
    public static  RpcResponse success(T data){
        //声明响应对象
        RpcResponse response = new RpcResponse<>();
        //从响应代码的枚举类ResponseCode中获取成功的代码,并将其写入当前的response响应状态
        response.setStatusCode(ResponseCode.SUCCESS.getCode());
        //将响应数据写入当前的response响应数据
        response.setData(data);
        //返回响应对象response
        return response;
    }

    public static  RpcResponse fail(ResponseCode code){
        //声明响应对象
        RpcResponse response = new RpcResponse<>();
        //写入当前的response响应状态
        response.setStatusCode(code.getCode());
        //写入当前的response补充信息
        response.setMessage(code.getMessage());
        //返回响应对象response
        return response;
    }
}

三、动态代理 1.创建客户端的动态代理对象

客户端通过动态代理来与服务端通信,客户端只需要调用,不用管具体的怎么实现,就像调用本地方法一样。位于RPC-COMMON中。

package com.t598.core.transport;
import com.t598.common.entity.RpcRequest;
import com.t598.common.entity.RpcResponse;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;


public class RpcClientProxy implements InvocationHandler {
    
    private String host;
    
    private int port;

    public RpcClientProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    
    @SuppressWarnings("unchecked")
    public  T getProxy(Class clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
    }

    
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //声明传输的rpcRequest对象
        RpcRequest rpcRequest = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
        RpcClient rpcClient = new RpcClient();
        return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
    }
}

2.实现具体的通信逻辑

创建一个RpcClient类来实现具体的通信逻辑。位于RPC-COMMON中。

package com.t598.core.transport;

import com.t598.common.entity.RpcRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

    
    public Object sendRequest(RpcRequest rpcRequest, String host, int port){
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("调用时有错误发生:" + e);
            return null;
        }
    }
}

四、反射调用 1.服务端通过反射找到客户端要调用的方法

服务端一直监听客户端与之通信的端口(这里设置为8888)端口,当有请求连接时通过线程池创建线程让其执行通信的逻辑。位于RPC-COMMON中。

package com.t598.core.transport;

import com.t598.common.entity.RpcRequest;
import com.t598.common.entity.RpcResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;


public class RpcServer {
    private final ExecutorService threadPool;
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    
    public RpcServer(){
        // 核心线程数
        int corePoolSize = 5;
        // 最大线程数
        int maximumPoolSize = 50;
        // 空闲线程的等待时间
        long keepAliveTime = 60;
        // 阻塞队列
        BlockingQueue workingQueue = new ArrayBlockingQueue<>(100);
        // 线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();

        threadPool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                workingQueue,
                threadFactory
        );
    }

    public void register(Object service, int port){
        try(ServerSocket serverSocket = new ServerSocket(port)){
            logger.info("服务器正在启动...");
            Socket socket;
            while ((socket = serverSocket.accept()) != null) {
                logger.info("客户端连接!Ip为:" + socket.getInetAddress());
                threadPool.execute(new WorkerThread(socket, service));
            }
        } catch (IOException e) {
            logger.error("连接时有错误发生:", e);
        }
    }

    private class WorkerThread implements Runnable {
        private final Socket socket;
        private final Object service;

        public WorkerThread(Socket socket, Object service) {
            this.socket = socket;
            this.service = service;
        }

        @Override
        public void run() {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
                RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
                Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
                Object returnObject = method.invoke(service, rpcRequest.getParameters());
                objectOutputStream.writeObject(RpcResponse.success(returnObject));
                objectOutputStream.flush();
            } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
                logger.error("调用或发送时有错误发生:", e);
            }
        }

    }
}

五、简单测试 1.客户端测试

位于Client中。

package com.t598.client;

import com.t598.api.HelloObject;
import com.t598.api.HelloService;
import com.t598.core.transport.RpcClientProxy;

public class TestClient {
    public static void main(String[] args) {
        //创建客户端的动态代理,由代理来负责与服务端通信
        RpcClientProxy rpcClientProxy = new RpcClientProxy("127.0.0.1", 8888);
        //返回代理对象,需要通信的接口是HelloService
        HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
        //创建需要传输的对象,id为12,message为"this is a message"
        HelloObject helloObject = new HelloObject(12, "this is a message");
        //与服务端的HelloService接口通信,调用hello方法,并传入传输对象helloObject,返回字符串hello
        String hello = helloService.hello(helloObject);
        System.out.println(hello);
    }
}
2.服务端测试

位于Server中。

package com.t598.server;

import com.t598.core.transport.RpcServer;

public class TestServer {
    public static void main(String[] args) {
        HelloServiceImpl helloService = new HelloServiceImpl();
        RpcServer rpcServer = new RpcServer();
        rpcServer.register(helloService, 8888);
    }
}

PS:需要实现服务端提供的HelloService接口

package com.t598.server;

import com.t598.api.HelloObject;
import com.t598.api.HelloService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloServiceImpl implements HelloService {
    private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
    @Override
    public String hello(HelloObject helloObject) {
        logger.info("接收到:{}", helloObject.getMessage());
        return "这是调用的返回值,id=" + helloObject.getId();
    }
}

六、执行结果 服务端启动,等待连接:

客户端启动:

客户端接收消息:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773400.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号