栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Netty(四)

Netty(四)

Netty 四

RPC基本介绍

RPC基本介绍
    RPC(Remote Procedure Call 远程过程调用)是一个计算机通信协议. 用于在一个程序调用执行另一个程序的子程序(如方法)两个或多个程序部署在不同的服务器上, 只要都实现了 RPC, 互相之间调用方法是和本地调用一样RPC中, Client叫服务消费者, Server叫服务提供者常见的 RPC框架有: 阿里的 Dubbo, Google的 gRPC, Go语言的 rpcx, Apache的 thrift, Spring Cloud

RPC调用流程

用 Netty自己实现 Dubbo RPC:

    创建一个接口, 定义抽象方法(用于消费者和提供者之间的约定创建一个提供者, 该类需要监听消费者的请求, 并按照约定返回数据创建一个消费者, 该类调用自己不存在的方法, 通过 Netty请求提供者返回数据
// 用于消费者和提供者之间的约定的接口
public interface HelloService {
    String hello(String mes);
}

public class HelloServiceImpl implements HelloService {
    private static int count = 0; // 调用累计数

    @Override
    public String hello(String mes) {
        System.out.println("收到客户端消息=" + mes);
        return "你好客户端, 我已经收到你的消息 (" + mes + ") 第" + (++count) + "次";
    }

}

public class NettyServer {
    public static void startServer(String hostName, int port) {
        startServer0(hostName, port);
    }

    private static void startServer0(String hostname, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler());

                                      }
                                  }
                    );
            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("客户端调用了服务, 协议 + 消息=" + msg);
        // 客户端调用 RPC时, 需要自定义一个协议. 例如: 指定某字符串开头 "RPC://hello.你好"
        String message = msg.toString();
        if (message.startsWith(ClientBootstrap.providerName)) {
            String result = new HelloServiceImpl().hello(
                    message.split(ClientBootstrap.providerName)[1]
            );
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

// 服务提供者(NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

public class NettyClient {
    // 创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static NettyClientHandler client;

    // 编写方法使用代理模式, 获取一个代理对象
    public Object getBean(final Class serivceClass, final String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serivceClass}, (proxy, method, args) -> {
                    if (client == null) {
                        initClient();
                    }
                    // 设置要发给服务器端的信息: providerName协议头 args[0]
                    client.setParams(providerName + args[0]);

                    return executor.submit(client).get();
                });
    }

    // 初始化 Netty客户端
    private static void initClient() {
        client = new NettyClientHandler();
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );
        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext context;
    private String result;
    private String params;

    // 与服务器的连接创建后调用 (1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive()被调用 ");
        context = ctx;
    }

    // 收到服务器的数据后调用 (4)
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead()被调用 ");
        result = msg.toString();
        notify(); // 唤醒等待的线程
    }

    // 被代理对象调用: 发送数据给服务器 -> wait() -> 等待被唤醒(channelRead) -> 返回结果(3) -> (5)
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call()被调用 1 ");
        context.writeAndFlush(params);
        // 进行 wait(), 等待直到 channelRead方法获取到服务器的结果后, 唤醒
        wait();
        System.out.println(" call()被调用 2 ");
        // 服务方返回的结果
        return result;
    }

    // (2)
    void setParams(String params) {
        System.out.println(" setParams()被调用 ");
        this.params = params;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

public class ClientBootstrap {
    public static final String providerName = "RPC://hello."; // 定义协议头

    public static void main(String[] args) throws Exception {
        // 创建一个消费者
        NettyClient customer = new NettyClient();
        // 创建代理对象
        HelloService helloService = (HelloService) customer.getBean(HelloService.class, providerName);
        for (; ; ) {
            Thread.sleep(2 * 1000);
            // 通过代理对象, 调用服务提供者的方法
            String res = helloService.hello("hello!!!");
            System.out.println("调用的结果=" + res);
        }
    }
}

如果您觉得有帮助,欢迎点赞哦 ~ 谢谢!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701644.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号