进行这个章节之前,需要先了解一下RMI的实现。如果已经了解过的童鞋可以直接跳过;如果没有了解过或者不知道RMI的童鞋,请移驾到下面的链接,看完之后再回来继续看这篇
RPC系列之入门_阿小冰的博客-CSDN博客:https://blog.csdn.net/qq_38377525/article/details/123507599?spm=1001.2014.3001.5502
介绍
说到RPC,应该能想得到Dubbo吧。Dubbo的底层使用了Netty作为网络通讯框架,正是Netty为Dubbo提供了RPC远程调用所需的网络通信能力。接下来我们体验一下用Netty实现一个简单的RPC框架:消费者和提供者约定接口和协议,消费者远程调用提供者的服务
需求案例
1、创建一个接口,定义一个抽象方法,用于约定消费者和提供者之间的调用
2、创建一个提供者,需要监听消费者的请求,并按照1中的约束返回数据
3、创建一个消费者,需要透明的调用自己不存在的方法,内部需要使用Netty实现数据通信
4、提供者与消费者数据传输使用json字符串数据格式
5、提供者使用netty集成SpringBoot环境来实现需求
客户端远程调用服务端提供的一个根据id查询订单的方法
代码实现
1、服务端代码
编写Rpc注解
/**
 * Marker annotation for service implementation classes that should be exposed over RPC.
 * It can only be placed on types and is retained at runtime, so it is visible to
 * annotation-based bean lookups such as {@code getBeansWithAnnotation(Rpc.class)}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rpc {
} 编写接口类 public interface IOrderService {
/**
 * Looks up an order by its id.
 *
 * @param id the order id
 * @return the matching order; NOTE(review): behaviour for an unknown id depends on the
 *         implementation, which is not visible here
 */
Order getById(int id);
} 编写实现类 @Service
@Rpc
public class OrderServiceImpl implements IOrderService {
Map 服务业务处理类RpcServerHandler
/**
 * Server-side business handler.
 *
 * <p>On startup it collects every Spring bean annotated with {@code @Rpc} and caches it under
 * the fully qualified name of the first interface the bean implements. For every inbound JSON
 * string it decodes an {@code RpcRequest}, invokes the requested method on the cached bean and
 * writes an {@code RpcResponse} back as a JSON string.
 *
 * <p>The handler keeps no per-channel state, which is why it is marked {@code @Sharable}.
 */
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {

    /** Local cache: interface name -> service bean. */
    private static final Map<String, Object> SERVICE_INSTANCE_MAP = new ConcurrentHashMap<String, Object>();

    /**
     * Caches all beans annotated with {@code @Rpc}.
     *
     * @throws RuntimeException if an annotated bean implements no interface
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(Rpc.class);
        if (serviceMap == null || serviceMap.isEmpty()) {
            return;
        }
        for (Object serviceBean : serviceMap.values()) {
            Class<?>[] interfaces = serviceBean.getClass().getInterfaces();
            if (interfaces.length == 0) {
                throw new RuntimeException("服务必须实现接口");
            }
            // By convention the first implemented interface is the cache key.
            SERVICE_INSTANCE_MAP.put(interfaces[0].getName(), serviceBean);
        }
    }

    /**
     * Handles one request: decode, invoke, and send the response.
     *
     * @param channelHandlerContext context of the channel the request arrived on
     * @param s                     the request as a JSON string
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        // Convert the incoming message into an RpcRequest.
        RpcRequest request = JSON.parseObject(s, RpcRequest.class);
        RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId());
        try {
            // Business processing.
            response.setResult(handler(request));
        } catch (Exception e) {
            e.printStackTrace();
            response.setError(describe(e));
        }
        // write() alone only queues the message; it must be flushed or the client never
        // receives the response.
        channelHandlerContext.writeAndFlush(JSON.toJSONString(response));
    }

    /**
     * Invokes the method described by the request on the cached service bean.
     *
     * @param rpcRequest the decoded request (class name, method name, parameter types, arguments)
     * @return whatever the target method returns
     * @throws RuntimeException          if no bean is cached under the requested class name
     * @throws InvocationTargetException if the target method itself throws
     */
    public Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
        // Look up the bean by the name passed in the request.
        Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
        if (serviceBean == null) {
            throw new RuntimeException("根据beanName找不到服务,beanName:" + rpcRequest.getClassName());
        }
        // Resolve method name, parameter types and arguments from the request.
        Class<?> serviceBeanClass = serviceBean.getClass();
        String methodName = rpcRequest.getMethodName();
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Object[] parameters = rpcRequest.getParameters();
        // Invoke through CGLIB's FastClass.
        FastClass fastClass = FastClass.create(serviceBeanClass);
        FastMethod method = fastClass.getMethod(methodName, parameterTypes);
        return method.invoke(serviceBean, parameters);
    }

    /**
     * Builds a non-null error text for the response. An InvocationTargetException usually has
     * no message of its own, so the wrapped cause is reported instead; otherwise the client
     * would see a null error and mistake the failure for a success.
     */
    private static String describe(Exception e) {
        Throwable cause = e;
        if (e instanceof InvocationTargetException && e.getCause() != null) {
            cause = e.getCause();
        }
        return cause.getMessage() != null ? cause.getMessage() : cause.toString();
    }
}
编写Netty启动类RpcServer
@Service
public class RpcServer implements DisposableBean {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
@Autowired
RpcServerHandler rpcServerHandler;
/**
 * Starts the Netty server and blocks the calling thread until the server channel is closed.
 * Both event loop groups are shut down when this method returns.
 *
 * @param ip   local address to bind to; when {@code null} or empty the server binds to the
 *             port on the wildcard address
 * @param port TCP port to listen on
 */
public void startServer(String ip, int port) {
    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup();
    try {
        // Create and configure the server bootstrap.
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        // Business processing.
                        pipeline.addLast(rpcServerHandler);
                    }
                });
        // Bind to the requested address; previously the ip argument was silently ignored.
        boolean wildcard = ip == null || ip.isEmpty();
        ChannelFuture channelFuture = wildcard
                ? serverBootstrap.bind(port).sync()
                : serverBootstrap.bind(ip, port).sync();
        System.out.println("====================服务端启动成功!=====================");
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Bean destruction callback: gracefully shuts down whichever event loop groups
 * have been created so far.
 */
@Override
public void destroy() throws Exception {
    NioEventLoopGroup[] eventLoopGroups = {bossGroup, workerGroup};
    for (NioEventLoopGroup eventLoopGroup : eventLoopGroups) {
        if (eventLoopGroup == null) {
            continue;
        }
        eventLoopGroup.shutdownGracefully();
    }
}
} 编写启动类ServerBootstrapApplication @SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
RpcServer rpcServer;
/**
 * Application entry point: boots the Spring context for the RPC server.
 *
 * @param args command-line arguments, passed through to Spring Boot
 */
public static void main(String[] args) {
    Class<?> primarySource = ServerBootstrapApplication.class;
    SpringApplication.run(primarySource, args);
}
/**
 * Runs after the Spring context is ready and launches the Netty server on its own thread,
 * because {@code startServer} blocks until the server channel is closed.
 *
 * @param args command-line arguments (unused)
 */
@Override
public void run(String... args) throws Exception {
    Runnable serverTask = new Runnable() {
        @Override
        public void run() {
            rpcServer.startServer("127.0.0.1",8099);
        }
    };
    Thread serverThread = new Thread(serverTask);
    serverThread.start();
}
} 编写客户端业务处理类RpcClientHandler
/**
 * Client-side handler that sends one request and blocks the calling worker thread until the
 * server's reply arrives.
 *
 * <p>{@link #call()} runs on an executor thread while {@code channelActive}/{@code channelRead0}
 * run on the Netty event loop; the two sides hand data over through this object's monitor.
 * An instance supports one in-flight request at a time.
 */
public class RpcClientHandler extends SimpleChannelInboundHandler<String> implements Callable<Object> {

    ChannelHandlerContext context;
    /** Message to send to the server. */
    String requestMsg;
    /** Message received from the server. */
    String responseMsg;

    public void setRequestMsg(String requestMsg) {
        this.requestMsg = requestMsg;
    }

    /** Remembers the channel context so that {@link #call()} can write to it. */
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
        // Wake a call() that started before the channel became active.
        notifyAll();
    }

    /**
     * Stores the server's reply and wakes the waiting caller.
     *
     * @param s the response as a JSON string
     */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        // The reply belongs in responseMsg; it used to overwrite requestMsg, so call() returned null.
        responseMsg = s;
        notifyAll();
    }

    /**
     * Sends {@code requestMsg} and waits for the reply.
     *
     * <p>wait()/notify() require the monitor to be held, hence the synchronized methods;
     * without it they throw IllegalMonitorStateException.
     *
     * @return the raw response string
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public synchronized Object call() throws Exception {
        // Guard against running before channelActive has recorded the context.
        while (context == null) {
            wait();
        }
        responseMsg = null;
        // Send the message.
        context.writeAndFlush(requestMsg);
        // Loop to tolerate spurious wakeups.
        while (responseMsg == null) {
            wait();
        }
        return responseMsg;
    }
}
编写客户端Netty启动类
public class RpcClient {
private EventLoopGroup group;
private Channel channel;
private String ip;
private int port;
private RpcClientHandler rpcClientHandler = new RpcClientHandler();
//线程池
private ExecutorService executorService = Executors.newCachedThreadPool();
/**
 * Creates a client and immediately connects it to the given server.
 *
 * @param ip   server host
 * @param port server port
 */
public RpcClient(String ip, int port) {
    this.port = port;
    this.ip = ip;
    // Connect right away so the instance is ready for send().
    initClient();
}
/**
 * Builds the Netty client pipeline (string codec plus the client handler) and connects to
 * {@code ip:port}, waiting until the connection attempt completes.
 *
 * <p>If the connection cannot be established for any reason, the channel and event loop
 * group are released before this method returns or propagates the failure, so a failed
 * attempt does not leave event loop threads running.
 */
public void initClient() {
    // Create the event loop group.
    group = new NioEventLoopGroup();
    // Create and configure the bootstrap.
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    // String encoder/decoder.
                    pipeline.addLast(new StringDecoder());
                    pipeline.addLast(new StringEncoder());
                    // Client business handler.
                    pipeline.addLast(rpcClientHandler);
                }
            });
    // Connect to the Netty server.
    boolean connected = false;
    try {
        channel = bootstrap.connect(ip, port).sync().channel();
        connected = true;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        // Covers interruption as well as connect failures rethrown by sync().
        if (!connected) {
            if (channel != null) {
                channel.close();
            }
            if (group != null) {
                group.shutdownGracefully();
            }
        }
    }
}
/**
 * Releases everything this client owns: the channel, the event loop group and the
 * worker thread pool used by {@code send}.
 */
public void close() {
    if (channel != null) {
        channel.close();
    }
    if (group != null) {
        group.shutdownGracefully();
    }
    // Without this the cached pool's idle worker threads stay alive after close()
    // and can delay JVM exit.
    executorService.shutdown();
}
/**
 * Sends a message to the server and blocks until the handler returns the reply.
 *
 * @param msg the request payload
 * @return the value produced by the client handler's {@code call()}
 * @throws ExecutionException   if the handler task fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public Object send(String msg) throws ExecutionException, InterruptedException {
    rpcClientHandler.setRequestMsg(msg);
    // Run the handler on the pool and wait for its result.
    return executorService.submit(rpcClientHandler).get();
}
} 编写Rpc代理类
public class RpcClientProxy {
/**
 * Creates a JDK dynamic proxy for the given service interface. Every method call on the
 * proxy is packed into an {@code RpcRequest}, sent to the server at 127.0.0.1:8099 over a
 * fresh {@code RpcClient}, and the JSON response is converted to the method's return type.
 *
 * @param cla the service interface to proxy
 * @return a proxy implementing {@code cla}
 * @throws RuntimeException (from the proxy's methods) when the server reports an error
 */
public static Object createProxy(Class<?> cla) {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{cla}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Build the request object.
            RpcRequest request = new RpcRequest();
            request.setRequestId(UUID.randomUUID().toString());
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameters(args);
            // One connection per call; always released in the finally block.
            RpcClient rpcClient = new RpcClient("127.0.0.1", 8099);
            try {
                // Send the request and decode the reply.
                Object responseMsg = rpcClient.send(JSON.toJSONString(request));
                RpcResponse response = JSON.parseObject(responseMsg.toString(), RpcResponse.class);
                if (response.getError() != null) {
                    throw new RuntimeException(response.getError());
                }
                Object result = response.getResult();
                if (result == null) {
                    // A null result previously caused a NullPointerException on result.toString().
                    return null;
                }
                return JSON.parseObject(result.toString(), method.getReturnType());
            } finally {
                rpcClient.close();
            }
        }
    });
}
} 编写客户端启动类ClinetBootStrapApplication public class ClinetBootStrapApplication {
/**
 * Client entry point: obtains a proxy for the order service, queries order 1 remotely
 * and prints the result.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Object proxy = RpcClientProxy.createProxy(IOrderService.class);
    IOrderService orderService = (IOrderService) proxy;
    Order order = orderService.getById(1);
    System.out.println(order);
}
} 启动服务端,再启动客户端,观察日志
到此就完成了使用Netty自定义RPC框架



