栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RPC系列之Netty实现自定义RPC框架

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RPC系列之Netty实现自定义RPC框架

进行这个章节之前,需要去看一下RMI的实现哈,如果了解过的童鞋可以直接跳过,如果没有活着不知道RMI的童鞋,移驾到下面的链接看完之后再回来继续看这篇

RPC系列之入门_阿小冰的博客-CSDN博客RPC系列之入门https://blog.csdn.net/qq_38377525/article/details/123507599?spm=1001.2014.3001.5502


介绍

说到RPC,应该能想的到Dubbo吧, Dubbo的底层是使用了Netty作为网络通讯框架,Netty赋予了Dubbo使用RPC远程调用服务,那接下来我们体验一下用Netty实现一个简单的RPC框架,消费者和提供者约定接口和协议,消费者远程调用提供者的服务

1、创建一个接口,定义一个抽象方法,用于约定消费者和提供者之间的调用

2、创建一个提供者,需要监听消费者的请求,并按照1中的约束返回数据

3、创建一个消费者,需要透明的调用自己不存在的方法,内部需要使用Netty实现数据通信

4、提供者与消费者数据传输使用json字符串数据格式

5、提供者使用netty集成SpringBoot环境来实现需求

需求案例 

客户端远程调用服务端提供一个根据id查询订单的方法

代码实现 1、服务端代码

编写Rpc注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rpc {
}

 编写接口类
public interface IOrderService {
    Order getById(int id);
}
 编写实现类
@Service
@Rpc
public class OrderServiceImpl implements IOrderService {
    Map orderMap = new HashMap();

    @Override
    public Order getById(int id) {
        Order order1 = new Order();
        order1.setId(1);
        order1.setTitle("铅笔盒");

        Order order2 = new Order();
        order2.setId(1);
        order2.setTitle("A4");

        orderMap.put(order1.getId(), order1);
        orderMap.put(order2.getId(), order2);

        return orderMap.get(id);
    }
}
服务业务处理类RpcServerHandler
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler implements ApplicationContextAware {
    //本地缓存
    private static final Map SERVICE_INSTANCE_MAP = new ConcurrentHashMap();

    
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map serviceMap = applicationContext.getBeansWithAnnotation(Rpc.class);
        if (serviceMap != null && serviceMap.size() > 0) {
            Set> entries = serviceMap.entrySet();
            for (Map.Entry entry : entries) {
                Object entryValue = entry.getValue();
                if (entryValue.getClass().getInterfaces().length == 0) {
                    throw new RuntimeException("服务必须实现接口");
                }
                //默认获取第一个接口作为缓存bean的名称
                String name = entryValue.getClass().getInterfaces()[0].getName();
                SERVICE_INSTANCE_MAP.put(name, entryValue);
            }
        }
    }

    
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        //接收客户端请求,将msg装换为RpcRequest对象
        RpcRequest request = JSON.parseObject(s, RpcRequest.class);
        RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId());
        try {
            //业务处理
            response.setResult(handler(request));
        } catch (Exception e) {
            e.printStackTrace();
            response.setError(e.getMessage());
        }
        //返回客户端
        channelHandlerContext.write(JSON.toJSonString(response));
    }

    
    public Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
        // 3.根据传递过来的beanName从缓存中查找到对应的bean
        Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
        if (serviceBean == null) {
            throw new RuntimeException("根据beanName找不到服务,beanName:" + rpcRequest.getClassName());
        }
        //4.解析请求中的方法名称. 参数类型 参数信息
        Class serviceBeanClass = serviceBean.getClass();
        String methodName = rpcRequest.getMethodName();
        Class[] parameterTypes = rpcRequest.getParameterTypes();
        Object[] parameters = rpcRequest.getParameters();
        //5.反射调用bean的方法- CGLIB反射调用
        FastClass fastClass = FastClass.create(serviceBeanClass);
        FastMethod method = fastClass.getMethod(methodName, parameterTypes);
        return method.invoke(serviceBean, parameters);
    }


}
 编写Netty启动类RpcServer
@Service
public class RpcServer implements DisposableBean {
    private NioEventLoopGroup bossGroup;

    private NioEventLoopGroup workerGroup;

    @Autowired
    RpcServerHandler rpcServerHandler;

    public void startServer(String ip, int port) {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        //绑定端口
        try {
            //2.创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //3.设置参数
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            //业务处理
                            pipeline.addLast(rpcServerHandler);
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("====================服务端启动成功!=====================");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            if (bossGroup != null) {
                bossGroup.shutdownGracefully();
            }
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
            }
        }

    }

    @Override
    public void destroy() throws Exception {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
编写启动类ServerBootstrapApplication 
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
    @Autowired
    RpcServer rpcServer;

    public static void main(String[] args) {
        SpringApplication.run(ServerBootstrapApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                rpcServer.startServer("127.0.0.1",8099);
            }
        }).start();
    }
}
 编写客户端业务处理类RpcClientHandler
public class RpcClientHandler extends SimpleChannelInboundHandler implements Callable {
    ChannelHandlerContext context;
    //客户端的消息
    String requestMsg;
    //服务端的消息
    String responseMsg;

    public void setRequestMsg(String requestMsg) {
        this.requestMsg = requestMsg;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }

    
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        requestMsg = s;
        notify();
    }

    @Override
    public Object call() throws Exception {
        //消息发送
        context.writeAndFlush(requestMsg);
        //线程等待
        wait();
        return responseMsg;
    }
}
编写客户端Netty启动类 
public class RpcClient {
    private EventLoopGroup group;

    private Channel channel;

    private String ip;

    private int port;

    private RpcClientHandler rpcClientHandler = new RpcClientHandler();
    //线程池
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public RpcClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
        initClient();
    }

    public void initClient() {
        //创建线程组
        group = new NioEventLoopGroup();
        //创建启动助手
        Bootstrap bootstrap = new Bootstrap();
        //设置启动参数
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //String类型编解码器
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //添加客户端处理类
                        pipeline.addLast(rpcClientHandler);
                    }
                });
        //4.连接Netty服务端
        try {
            channel = bootstrap.connect(ip, port).sync().channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
            if (channel != null) {
                channel.close();
            }
            if (group != null) {
                group.shutdownGracefully();
            }
        }
    }
    
    public void close() {
        if (channel != null) {
            channel.close();
        }
        if (group != null) {
            group.shutdownGracefully();
        }
    }
    
    public Object send(String msg) throws ExecutionException, InterruptedException {
        rpcClientHandler.setRequestMsg(msg);
        Future submit = executorService.submit(rpcClientHandler);
        return submit.get();
    }
}

编写Rpc代理类
public class RpcClientProxy {
    public static Object createProxy(Class cla) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{cla}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                //封装request请求对象
                RpcRequest request = new RpcRequest();
                request.setRequestId(UUID.randomUUID().toString());
                request.setClassName(method.getDeclaringClass().getName());
                request.setMethodName(method.getName());
                request.setParameterTypes(method.getParameterTypes());
                request.setParameters(args);
                //创建RpcClient对象
                RpcClient rpcClient = new RpcClient("127.0.0.1", 8099);
                try {
                    //发送消息
                    Object requestMsg = rpcClient.send(JSON.toJSonString(request));
                    RpcResponse response = JSON.parseObject(requestMsg.toString(), RpcResponse.class);
                    if (response.getError() != null) {
                        throw new RuntimeException(response.getError());
                    }
                    //返回结果
                    Object result = response.getResult();
                    return JSON.parseObject(result.toString(), method.getReturnType());
                } catch (Exception e) {
                    throw e;
                } finally {
                    rpcClient.close();
                }

            }
        });
    }
}
 编写客户端启动类ClinetBootStrapApplication 
public class ClinetBootStrapApplication {
    public static void main(String[] args) {
        IOrderService orderService=(IOrderService)RpcClientProxy.createProxy(IOrderService.class);
        Order order = orderService.getById(1);
        System.out.println(order);
    }
}
启动服务端,在启动客户端,观察日志 

 到此就完成了使用Netty自定义RPC框架

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/769718.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号