本次改进主要针对序列化的方式进行改进。之前一直在使用java自带的序列化方式,或者是netty的序列化方式,性能不高。
这个版本我们定义自己的序列化方式。
实现 项目创建创建一个名为simpleRPC-05的 module
创建名为com.rpc的包
依赖配置我们需要新加入一个fastjson依赖:
pom.xml
commonSimpleRPC org.example 1.0-SNAPSHOT 4.0.0 simpleRPC-058 8 org.projectlombok lombok1.18.12 provided io.netty netty-all4.1.51.Final com.alibaba fastjson1.2.67
我们需要修改 RPCResponse.java 来适应新的序列化方式:
RPCResponse.java
package com.rpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
@Data
@Builder
@AllArgsConstructor
public class RPCResponse implements Serializable {
private int code;
private String message;
// 更新,这里我们需要加入这个,不然用其它序列化方式(除了java Serialize)得不到data的type
private Class> dataType;
private Object data;
public static RPCResponse success(Object data) {
return RPCResponse.builder().code(200).data(data).dataType(data.getClass()).build();
}
public static RPCResponse fail() {
return RPCResponse.builder().code(500).message("服务器发生错误").build();
}
}
RPCRequest.java 跟原来一样
package com.rpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
@Data
@Builder
@AllArgsConstructor
public class RPCRequest implements Serializable {
// 服务类名,客户端只知道接口名,在服务端中用接口名指向实现类
private String interfaceName;
// 方法名
private String methodName;
// 参数列表
private Object[] params;
// 参数类型
private Class>[] paramsTypes;
}
Blog.java
package com.rpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
private Integer id;
private Integer useId;
private String title;
}
User.java
package com.rpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的,模拟RPC中传输的信息
private Integer id;
private String userName;
private Boolean sex;
}
service
ServiceProvider.java
package com.rpc.server;
import java.util.HashMap;
import java.util.Map;
public class ServiceProvider {
private Map interfaceProvider;
// 构造函数,初始化一个空的 hashmap 赋给 Map interfaceProvider
public ServiceProvider() {
this.interfaceProvider = new HashMap<>();
}
public void provideServiceInterface(Object service) {
// 反射,.getClass().getInterfaces()得到class的interface,按照interfaces name(key)和object(value)存入map
Class>[] interfaces = service.getClass().getInterfaces();
for (Class clazz : interfaces) {
interfaceProvider.put(clazz.getName(), service);
}
}
public Object getService(String interfaceName) {
return interfaceProvider.get(interfaceName); // 通过interface name得到object
}
}
UserService.java
package com.rpc.server;
import com.rpc.common.User;
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
// 给这个服务增加一个功能
Integer insertUserId(User user);
}
UserServiceImpl.java
package com.rpc.server;
import com.rpc.common.User;
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
// 模拟从数据库中取用户的行为
User user = User.builder()
.id(id)
.userName("he2121")
.sex(true).build();
System.out.println("客户端查询了" + id + "的用户");
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功: " + user);
return 1;
}
}
BlogService.java
package com.rpc.server;
import com.rpc.common.Blog;
public interface BlogService {
Blog getBlogById(Integer id);
}
BlogServiceImpl.java
package com.rpc.server;
import com.rpc.common.Blog;
public class BlogServiceImpl implements BlogService {
@Override
public Blog getBlogById(Integer id) {
Blog blog = Blog.builder()
.id(id)
.title("我的博客")
.useId(22).build();
System.out.println("客户端查询了" + id + "博客");
return blog;
}
}
codec
我们定义序列化接口:
Serializer.java
package com.rpc.codec;
public interface Serializer {
// 把对象序列化成字节数组
byte[] serialize(Object obj);
// 从字节数组反序列化成消息, 使用java自带序列化方式不用messageType也能得到相应的对象(序列化字节数组里包含类信息)
// 其它方式需指定消息格式,再根据message转化成相应的对象
Object deserialize(byte[] bytes, int messageType);
// 返回使用的序列器,是哪个
// 0:java自带序列化方式, 1: json序列化方式
int getType();
// 根据序号取出序列化器,暂时有两种实现方式,需要其它方式,实现这个接口即可
static Serializer getSerializerByCode(int code){
switch (code){
case 0:
return new ObjectSerializer();
case 1:
return new JsonSerializer();
default:
return null;
}
}
}
定义序列化类
ObjectSerializer.java
package com.rpc.codec;
import java.io.*;
public class ObjectSerializer implements Serializer {
// 利用java IO 对象 -> 字节数组
@Override
public byte[] serialize(Object obj) {
byte[] bytes = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.flush();
bytes = bos.toByteArray();
oos.close();
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
return bytes;
}
// 字节数组 -> 对象
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
try {
ObjectInputStream ois = new ObjectInputStream(bis);
obj = ois.readObject();
ois.close();
bis.close();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
return obj;
}
// 0 代表java原生序列化器
@Override
public int getType() {
return 0;
}
}
JsonSerializer.java
package com.rpc.codec;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
public class JsonSerializer implements Serializer{
// 序列化底层是FASTJSON把对象转化为字节流(byte数组)
@Override
public byte[] serialize(Object obj) {
byte[] bytes = JSONObject.toJSonBytes(obj);
return bytes;
}
// 反序列化
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
// 传输的消息分为request与response
switch (messageType){
case 0: // 如果是request
RPCRequest request = JSON.parseObject(bytes, RPCRequest.class);
// 修bug 参数为空 直接返回
if(request.getParams() == null) return request;
Object[] objects = new Object[request.getParams().length];
// 把json字串转化成对应的对象, fastjson可以读出基本数据类型,不用转化
for(int i = 0; i < objects.length; i++){
Class> paramsType = request.getParamsTypes()[i];
if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())){
objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i],request.getParamsTypes()[i]);
}else{
objects[i] = request.getParams()[i];
}
}
request.setParams(objects);
obj = request;
break;
case 1: // 如果是response
RPCResponse response = JSON.parseObject(bytes, RPCResponse.class);
Class> dataType = response.getDataType();
if(! dataType.isAssignableFrom(response.getData().getClass())){
response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
}
obj = response;
break;
default:
System.out.println("暂时不支持此种消息");
throw new RuntimeException();
}
return obj;
}
// 1 代表着json序列化方式
@Override
public int getType() {
return 1;
}
}
MessageType.java
package com.rpc.codec;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public enum MessageType {
REQUEST(0),RESPonSE(1);
private int code;
public int getCode() {
return code;
}
}
MyEncode.java
package com.rpc.codec;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public class MyEncode extends MessageToByteEncoder {
private Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 写入消息类型
if(msg instanceof RPCRequest){
out.writeShort(MessageType.REQUEST.getCode());
}
else if(msg instanceof RPCResponse){
out.writeShort(MessageType.RESPONSE.getCode());
}
// 写入序列化方式
out.writeShort(serializer.getType());
// 得到序列化数组
byte[] serialize = serializer.serialize(msg);
// 写入长度
out.writeInt(serialize.length);
// 写入序列化字节数组
out.writeBytes(serialize);
}
}
MyDecode.java
package com.rpc.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.AllArgsConstructor;
import java.util.List;
@AllArgsConstructor
public class MyDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
client
客户端部分和simpleRPC-04基本一样,但是NettyClientHandler.java 需要修改一下:
NettyClientInitializer.java
package com.rpc.client; import com.rpc.codec.JsonSerializer; import com.rpc.codec.MyDecode; import com.rpc.codec.MyEncode; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldbasedframeDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolver; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class NettyClientInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 使用自定义的编解码器 pipeline.addLast(new MyDecode()); // 编码需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的 pipeline.addLast(new MyEncode(new JsonSerializer())); pipeline.addLast(new NettyClientHandler()); } }
其他部分都一样。
serverserver部分和simpleRPC-04基本一样,同样的,NettyServerInitializer.java需要修改一下:
NettyServerInitializer.java
package com.rpc.server; import com.rpc.codec.JsonSerializer; import com.rpc.codec.MyDecode; import com.rpc.codec.MyEncode; import com.rpc.service.ServiceProvider; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldbasedframeDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolver; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import lombok.AllArgsConstructor; @AllArgsConstructor public class NettyServerInitializer extends ChannelInitializer{ private ServiceProvider serviceProvider; @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 使用自定义的解码器 pipeline.addLast(new MyDecode()); // 使用自定义的编码器,而且解码器需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的 pipeline.addLast(new MyEncode(new JsonSerializer())); pipeline.addLast(new NettyRPCServerHandler(serviceProvider)); } }
其他部分一样。
文件结构simpleRPC-05文件结构如下:
运行TestServer.java
再运行TestClient.java



