之前谈到,网络传输使用BIO的方式,我们在simpleRPC-04改为NIO来传输,引入netty的编解码方式。
我们在simpleRPC-04中将使用netty来优化我们的客户端client和服务端server,
实现 项目创建创建module:simpleRPC-04
在java下创建package:com.rpc
我们配置pom.xml依赖如下:
commonSimpleRPC org.example 1.0-SNAPSHOT 4.0.0 simpleRPC-048 8 org.projectlombok lombok1.18.12 provided io.netty netty-all4.1.51.Final
我们的common和simpleRPC-03是一样的:
Blog.java
package com.rpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
private Integer id;
private Integer useId;
private String title;
}
RPCRequest.java
package com.rpc.common;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
@Data
@Builder
public class RPCRequest implements Serializable {
// 服务类名,客户端只知道接口名,在服务端中用接口名指向实现类
private String interfaceName;
// 方法名
private String methodName;
// 参数列表
private Object[] params;
// 参数类型
private Class>[] paramsTypes;
}
RPCResponse.java
package com.rpc.common;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
@Data
@Builder
public class RPCResponse implements Serializable {
// 状态信息
private int code;
private String message;
// 具体数据
private Object data;
public static RPCResponse success(Object data) {
return RPCResponse.builder().code(200).data(data).build();
}
public static RPCResponse fail() {
return RPCResponse.builder().code(500).message("服务器发生错误").build();
}
}
User.java
package com.rpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}
service
同样的,service和simpleRPC-04是一样的:
ServiceProvider.java
package com.rpc.service;
import java.util.HashMap;
import java.util.Map;
public class ServiceProvider {
private Map interfaceProvider;
// 构造函数,初始化一个空的 hashmap 赋给 Map interfaceProvider
public ServiceProvider() {
this.interfaceProvider = new HashMap<>();
}
public void provideServiceInterface(Object service) {
// 反射,.getClass().getInterfaces()得到class的interface,按照interfaces name(key)和object(value)存入map
Class>[] interfaces = service.getClass().getInterfaces();
for (Class clazz : interfaces) {
interfaceProvider.put(clazz.getName(), service);
}
}
public Object getService(String interfaceName) {
return interfaceProvider.get(interfaceName); // 通过interface name得到object
}
}
BlogService.java
package com.rpc.service;
import com.rpc.common.Blog;
public interface BlogService {
Blog getBlogById(Integer id);
}
BlogServiceImpl.java
package com.rpc.service;
import com.rpc.common.Blog;
public class BlogServiceImpl implements BlogService {
@Override
public Blog getBlogById(Integer id) {
Blog blog = Blog.builder()
.id(id)
.title("我的博客")
.useId(22).build();
System.out.println("客户端查询了" + id + "博客");
return blog;
}
}
UserService.java
package com.rpc.service;
import com.rpc.common.User;
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
// 给这个服务增加一个功能
Integer insertUserId(User user);
}
UserServiceImpl.java
package com.rpc.service;
import com.rpc.common.User;
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
// 模拟从数据库中取用户的行为
User user = User.builder()
.id(id)
.userName("he2121")
.sex(true).build();
System.out.println("客户端查询了" + id + "的用户");
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功: " + user);
return 1;
}
}
server
我们的simpleRPC-04客户端和服务端用netty进行改进,
我们的PRCServer.java接口和simpleRPC-03一样:
RPCServer.java
package com.rpc.server;
public interface RPCServer {
void start(int port);
void stop();
}
我们创建一个NettyRPCServer.java来实现RPCServer接口:
代码和注释如下:
package com.rpc.server;
import com.rpc.service.ServiceProvider;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public class NettyRPCServer implements RPCServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty服务线程组负责建立连接(TCP/IP连接),work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("Netty服务端启动了");
try {
// 启动Netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 初始化
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new NettyServerInitializer(serviceProvider));
// 同步阻塞
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 死循环监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}
NettyServerInitializer.java
package com.rpc.server; import com.rpc.service.ServiceProvider; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldbasedframeDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolver; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import lombok.AllArgsConstructor; @AllArgsConstructor public class NettyServerInitializer extends ChannelInitializer{ private ServiceProvider serviceProvider; @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 解码器:消息格式 [长度][消息体],解决粘包问题 pipeline.addLast(new LengthFieldbasedframeDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); // 编码器:计算当前待大宋消息的长度,写入到前4个字节中 pipeline.addLast(new LengthFieldPrepender(4)); // 这里使用的还是java 序列化方式, netty的自带的解码编码支持传输这种结构 pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectDecoder(new ClassResolver() { @Override public Class> resolve(String className) throws ClassNotFoundException { return Class.forName(className); } })); pipeline.addLast(new NettyRPCServerHandler(serviceProvider)); } }
NettyRPCServerHandler.java
package com.rpc.server; import com.rpc.common.RPCRequest; import com.rpc.common.RPCResponse; import com.rpc.service.ServiceProvider; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.AllArgsConstructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @AllArgsConstructor public class NettyRPCServerHandler extends SimpleChannelInboundHandler{ private ServiceProvider serviceProvider; @Override protected void channelRead0(ChannelHandlerContext ctx, RPCRequest msg) throws Exception { // System.out.println(msg); RPCResponse response = getResponse(msg); ctx.writeAndFlush(response); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } // 这里和WorkThread里的getResponse差不多 RPCResponse getResponse(RPCRequest request) { // 得到服务名 String interfaceName = request.getInterfaceName(); // 得到服务器相应类 Object service = serviceProvider.getService(interfaceName); // 反射调用方法 Method method = null; try { method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes()); Object invoke = method.invoke(service, request.getParams()); return RPCResponse.success(invoke); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { e.printStackTrace(); System.out.println("方法执行错误"); return RPCResponse.fail(); } } }
定义服务端:TestServer.java
package com.rpc.server;
import com.rpc.service.*;
public class TestServer {
public static void main(String[] args) {
UserService userService = new UserServiceImpl();
BlogService blogService = new BlogServiceImpl();
// Map serviceProvide = new HashMap<>();
// serviceProvide.put("com.ganghuan.myRPCVersion2.service.UserService",userService);
// serviceProvide.put("com.ganghuan.myRPCVersion2.service.BlogService",blogService);
ServiceProvider serviceProvider = new ServiceProvider();
serviceProvider.provideServiceInterface(userService); // 把userService存入 serviceProvider
serviceProvider.provideServiceInterface(blogService); // 把blogService存入 serviceProvider
// RPCServer RPCServer = new ThreadPoolRPCRPCServer(serviceProvider);
RPCServer RPCServer = new NettyRPCServer(serviceProvider);
RPCServer.start(8899);
}
}
client
接下来是客户端的改造
RPCClient.java和simpleRPC-03一样:
package com.rpc.client;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
public interface RPCClient {
RPCResponse sendRequest(RPCRequest request);
}
RPCClientProxy.java 代理类需要稍微修改:
package com.rpc.client;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import lombok.AllArgsConstructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
@AllArgsConstructor
public class RPCClientProxy implements InvocationHandler {
private RPCClient client;
// jdk动态代理,每一次代理对象调用方法,会经过此方法增强(反射获取request对象,socket发送至客户端)
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// request的构建,使用了lombok中的builder,更加简洁
RPCRequest request = RPCRequest.builder().interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args)
.paramsTypes(method.getParameterTypes())
.build();
// 数据传输
RPCResponse response = client.sendRequest(request);
// System.out.println(response);
return response.getData();
}
T getProxy(Class clazz) {
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}
NettyRPCClient.java
package com.rpc.client;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
public class NettyRPCClient implements RPCClient{
private static final Bootstrap bootstrap;
private static final EventLoopGroup evenLoopGroup;
private String host;
private int port;
// 构造函数
public NettyRPCClient(String host, int port) {
this.host = host;
this.port = port;
}
// netty客户端初始化,重复使用
static {
evenLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(evenLoopGroup).channel(NioSocketChannel.class).handler(new NettyClientInitializer());
}
// 这里需要操作一下,因为netty的传输都是异步的,你发送request,会立刻返回一个值, 而不是想要的相应的response
@Override
public RPCResponse sendRequest(RPCRequest request) {
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
channel.closeFuture().sync();
// 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
// AttributeKey是,线程隔离的,不会由线程安全问题。
// 实际上不应通过阻塞,可通过回调函数
AttributeKey key = AttributeKey.valueOf("RPCResponse");
RPCResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
NettyClientInitializer.java
package com.rpc.client; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldbasedframeDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolver; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class NettyClientInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 消息格式 [长度][消息体] pipeline.addLast(new LengthFieldbasedframeDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); // 计算当前待大宋消息的长度,写入到前4个字节中 pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectDecoder(new ClassResolver() { @Override public Class> resolve(String className) throws ClassNotFoundException { return Class.forName(className); } })); pipeline.addLast(new NettyClientHandler()); } }
NettyClientHandler.java
package com.rpc.client; import com.rpc.common.RPCResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.AttributeKey; public class NettyClientHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, RPCResponse msg) throws Exception { // 接收到response, 给channel设计别名,让sendRequest里读取response AttributeKey key = AttributeKey.valueOf("RPCResponse"); ctx.channel().attr(key).set(msg); ctx.channel().close(); } // 跟NettyRPCServerHandler一样 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
最后定义客户端:
TestClient.java
package com.rpc.client;
import com.rpc.common.Blog;
import com.rpc.common.User;
import com.rpc.service.BlogService;
import com.rpc.service.UserService;
public class TestClient {
public static void main(String[] args) {
// 构建一个使用java socket或者netty的客户端
RPCClient rpcClient = new NettyRPCClient("127.0.0.1", 8899);
// 把这个客户端传入代理客户端
RPCClientProxy rpcClientProxy = new RPCClientProxy(rpcClient);
// 代理客户端根据不同的服务,获得一个代理类, 并且这个代理类的方法以或者增强(封装数据,发送请求)
UserService userService = rpcClientProxy.getProxy(UserService.class);
// 服务的方法1
User userByUserId = userService.getUserByUserId(10);
System.out.println("从服务器端得到的user为:" + userByUserId);
// 服务的方法2
User user = User.builder().userName("张三").id(100).sex(true).build();
Integer integer = userService.insertUserId(user);
System.out.println("向服务器端插入数据" + integer);
// 服务的方法3
BlogService blogService = rpcClientProxy.getProxy(BlogService.class);
Blog blogById = blogService.getBlogById(10000);
System.out.println("从服务端得到的blog为:" + blogById);
}
}
文件结构
simpleRPC-04的文件结构如下
运行我们先运行 TestServer.java
然后运行 TestClient.java



