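It has been a while since I wrote this code, so this post is only a brief summary.
I implemented a simple Netty-based RPC framework and registered its services with Nacos. The post covers serialization, the custom protocol, load-balancing algorithms, and the Nacos-related services.
A link to the source code is at the end.
The well-known distributed service framework Dubbo uses the Dubbo protocol for communication between nodes, and the Dubbo protocol uses Netty as its default underlying communication component. RocketMQ also builds its low-level communication on Netty, and ZooKeeper can optionally do so. Learning Netty is therefore essential for understanding how these frameworks work.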
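Serialization:
Serialization is essential for transferring data and communicating between nodes, and choosing a suitable algorithm matters: in general you want one that is fast and produces few bytes. I prepared the following implementations: the JDK built-in serialization, JSON, and Protobuf, which apparently was never implemented back then because of time constraints; I will fill it in when I have time.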
Java {
    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        // try-with-resources closes the stream even if readObject fails
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            log.error("Java deserialization failed", e);
        }
        return null;
    }
    @Override
    public <T> byte[] serialize(T object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // closing the ObjectOutputStream flushes everything into bos
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(object);
        } catch (IOException e) {
            log.error("Java serialization failed", e);
            return null;
        }
        return bos.toByteArray();
    }
},
Json {
    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        // ClassCodec lets Gson read fields of type java.lang.Class
        Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializeType.ClassCodec()).create();
        String json = new String(bytes, StandardCharsets.UTF_8);
        log.debug("deserialize--{}", json);
        return gson.fromJson(json, clazz);
    }
    @Override
    public <T> byte[] serialize(T object) {
        // Note: a plain Gson is used here; the ClassCodec variant is left disabled as in the original
        //Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializeType.ClassCodec()).create();
        Gson gson = new Gson();
        // let Gson exceptions propagate instead of swallowing them and failing later on a null string
        String json = gson.toJson(object);
        log.debug("serialize--{}", json);
        return json.getBytes(StandardCharsets.UTF_8);
    }
},
Protobuf {
    // Placeholder: Protobuf support is not implemented yet
    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return null;
    }
    @Override
    public <T> byte[] serialize(T object) {
        return new byte[0];
    }
};
class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
    @Override
    public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
        try {
            // json -> class
            String str = json.getAsString();
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new JsonParseException(e);
        }
    }
    @Override // String.class
    public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
        // class -> json
        return new JsonPrimitive(src.getName());
    }
}
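To make the "fewer bytes" criterion concrete, here is a minimal sketch that uses only the JDK. It round-trips a small object through JDK serialization and compares the byte count with a hand-written JSON-style string. `PingRequest` and `SerializationSizeDemo` are hypothetical names invented for this illustration and are not part of the project; the hand-built string merely stands in for a JSON library such as Gson.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical message type, used only for this illustration.
class PingRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final int sequenceId;
    final String text;

    PingRequest(int sequenceId, String text) {
        this.sequenceId = sequenceId;
        this.text = text;
    }
}

class SerializationSizeDemo {
    // JDK serialization: the stream carries class metadata in addition to field values.
    static byte[] jdkSerialize(Serializable object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(object);
        } catch (IOException e) {
            throw new IllegalStateException("JDK serialization failed", e);
        }
        return bos.toByteArray();
    }

    static Object jdkDeserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("JDK deserialization failed", e);
        }
    }

    // Hand-written JSON-style text, standing in for a real JSON library (no escaping is done).
    static byte[] textEncode(PingRequest r) {
        String json = "{\"sequenceId\":" + r.sequenceId + ",\"text\":\"" + r.text + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        PingRequest request = new PingRequest(1, "hello");
        byte[] jdkBytes = jdkSerialize(request);
        byte[] textBytes = textEncode(request);
        PingRequest copy = (PingRequest) jdkDeserialize(jdkBytes);

        System.out.println("JDK bytes:  " + jdkBytes.length);
        System.out.println("text bytes: " + textBytes.length);
        System.out.println("round trip: " + copy.sequenceId + " / " + copy.text);
    }
}
```

For a message this small the JDK stream is noticeably larger than the text form, because it also encodes the class name and field descriptors. That overhead is one reason RPC frameworks often prefer more compact formats.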
Custom protocol:
@ChannelHandler.Sharable
public class MessageCodec extends MessageToMessageCodec{
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, List
Load-balancing algorithms: two are implemented, random and round-robin.
public interface LoadBalancer {
    // Picks one instance from the candidate list.
    Instance getInstance(List<Instance> instances);
}
public class RandomLoadBalance implements LoadBalancer {
    private static final Random random = new Random();
    @Override
    public Instance getInstance(List<Instance> instances) {
        // Uniformly random index in [0, size).
        return instances.get(random.nextInt(instances.size()));
    }
}
public class RoundRobinLoadBalance implements LoadBalancer {
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    @Override
    public Instance getInstance(List<Instance> instances) {
        int current;
        int next;
        // CAS loop: advance the shared counter, wrapping to 0 before it overflows.
        do {
            current = atomicInteger.get();
            next = current >= Integer.MAX_VALUE ? 0 : current + 1;
        } while (!atomicInteger.compareAndSet(current, next));
        return instances.get(next % instances.size());
    }
}
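For comparison, here is a slightly shorter way to write round-robin selection, sketched with only the JDK. It is an alternative and not the project's implementation: `RoundRobinSketch` is a hypothetical generic class, and it lets the counter overflow freely while `Math.floorMod` keeps the index non-negative. It also rejects an empty candidate list explicitly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical generic round-robin selector, for illustration only.
class RoundRobinSketch<T> {
    private final AtomicInteger counter = new AtomicInteger(0);

    T select(List<T> candidates) {
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalArgumentException("no candidates to select from");
        }
        // getAndIncrement wraps to negative values on overflow;
        // floorMod still maps the counter into [0, size).
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> balancer = new RoundRobinSketch<>();
        List<String> hosts = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        // Four picks over three hosts: the fourth wraps around to the first host.
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.select(hosts));
        }
    }
}
```

The trade-off is mostly stylistic: the CAS loop makes the wrap-around explicit, while `floorMod` avoids the loop at the cost of a slightly uneven step when the counter overflows and the list size does not divide 2^32.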
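Handling sticky and split packets:
The main idea is to add a length field to every message. It records how many bytes of content follow, so the receiver can tell where one frame ends and the next begins.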
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    public ProcotolFrameDecoder() {
        // max frame 1024 bytes; 4-byte length field at offset 16; no adjustment; strip nothing
        this(1024, 16, 4, 0, 0);
    }
    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}
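The effect of the parameters `(1024, 16, 4, 0, 0)` can be illustrated without Netty. The sketch below uses `java.nio.ByteBuffer` and assumes the frame layout those parameters imply: 16 header bytes, a 4-byte big-endian length, then the body. The header contents are not shown in this post, so they are treated as opaque zero bytes here; `LengthFieldFramingSketch` and its methods are hypothetical names. Unlike the real decoder, which with `initialBytesToStrip = 0` passes the whole frame on, this sketch returns only the bodies.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration of length-field framing.
class LengthFieldFramingSketch {
    static final int HEADER_BYTES = 16; // corresponds to lengthFieldOffset = 16
    static final int LENGTH_BYTES = 4;  // corresponds to lengthFieldLength = 4

    // Builds one frame: opaque header, body length, body.
    static byte[] encode(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + LENGTH_BYTES + body.length);
        buf.put(new byte[HEADER_BYTES]); // placeholder header (real fields unknown here)
        buf.putInt(body.length);         // big-endian by default
        buf.put(body);
        return buf.array();
    }

    // Reads every complete frame body; an incomplete trailing frame stays unread in the buffer.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> bodies = new ArrayList<>();
        while (in.remaining() >= HEADER_BYTES + LENGTH_BYTES) {
            // Absolute read: peeks at the length without moving the position.
            int bodyLength = in.getInt(in.position() + HEADER_BYTES);
            if (bodyLength < 0) {
                throw new IllegalStateException("negative body length: " + bodyLength);
            }
            if (in.remaining() < HEADER_BYTES + LENGTH_BYTES + bodyLength) {
                break; // split packet: wait for more bytes
            }
            in.position(in.position() + HEADER_BYTES + LENGTH_BYTES);
            byte[] body = new byte[bodyLength];
            in.get(body);
            bodies.add(body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("rpc".getBytes(StandardCharsets.UTF_8));
        byte[] stream = new byte[first.length + second.length];
        System.arraycopy(first, 0, stream, 0, first.length);
        System.arraycopy(second, 0, stream, first.length, second.length);

        // Sticky packets: both frames arrive in one read and are separated again.
        for (byte[] body : decode(ByteBuffer.wrap(stream))) {
            System.out.println(new String(body, StandardCharsets.UTF_8));
        }

        // Split packet: only 10 bytes of the second frame have arrived so far.
        ByteBuffer partial = ByteBuffer.wrap(stream, 0, first.length + 10);
        System.out.println(decode(partial).size() + " complete frame, "
                + partial.remaining() + " bytes pending");
    }
}
```

Netty's decoder additionally enforces `maxFrameLength` and accumulates partial reads across calls, which this sketch leaves out.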
Registering services with Nacos and looking them up:
Automatic scan and registration:
public static void registerTogNacos() {
    // Every service class known to ServicesFactory is registered under its fully qualified name.
    Set<Class<?>> classes = ServicesFactory.map.keySet();
    for (Class<?> clazz : classes) {
        try {
            NacosRegistryCentre.namingService.registerInstance(clazz.getName(), Config.IP, Config.PORT);
        } catch (NacosException e) {
            log.error("Failed to register {} with Nacos", clazz.getName(), e);
        }
    }
}
public class NacosRegistryCentre implements RegistryCentre{
private static ConcurrentHashMap>> notified = new ConcurrentHashMap<>();
private static ConcurrentHashMap>> instancesCache = new ConcurrentHashMap<>();
private final RoundRobinLoadBalance roundRobinLoadBalance=new RoundRobinLoadBalance();
public static NamingService namingService;
static {
try {
namingService = NamingFactory.createNamingService(Config.NACOS_SERVER_ADDR);
} catch (NacosException e) {
log.error("Failed to connect to nacos");
e.printStackTrace();
throw new RuntimeException(e);
}
}
@Override
public Instance getServices(String serviceName, String groupName) {
    try {
        List<Instance> allInstances = namingService.getAllInstances(serviceName, groupName);
        // An empty list would make the round-robin modulo fail, so fall through to the error below.
        if (!allInstances.isEmpty()) {
            return roundRobinLoadBalance.getInstance(allInstances);
        }
    } catch (NacosException e) {
        log.error("Failed to query instances of {} from Nacos", serviceName, e);
    }
    throw new RuntimeException("no service instance available");
}
@Override
public Instance getServices(String serviceName) {
    try {
        List<Instance> allInstances = namingService.getAllInstances(serviceName);
        if (!allInstances.isEmpty()) {
            return roundRobinLoadBalance.getInstance(allInstances);
        }
    } catch (NacosException e) {
        log.error("Failed to query instances of {} from Nacos", serviceName, e);
    }
    throw new RuntimeException("no service instance available");
}
}
Source code: https://gitee.com/alice-175/netty-rpc



