栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

  • 请求序号:基于双工协议,提供异步能力,也就是收到的异步消息需要找到前面的通信请求进行响应处理

  • 消息长度

  • 消息正文

协议定义

====

sessionId | reqType | Content-Length | Content |

其中Version,Content-Length,SessionId就是Header信息,Content就是交互的主体。

定义项目结构以及引入包


io.netty

netty-all

org.slf4j

slf4j-log4j12

org.projectlombok

lombok

项目结构如图4-1所示:

  • netty-message-mic : 表示协议模块。

  • netty-message-server :表示nettyserver。

图4-1

  • 引入log4j.properties

在nettyMessage-mic中,包的结构如下。

定义Header


表示消息头

@Data

public class Header{

private long sessionId; //会话id : 占8个字节

private byte type; //消息类型:占1个字节

private int length; //消息长度 : 占4个字节

}

定义MessageRecord


表示消息体

@Data

public class MessageRecord{

private Header header;

private Object body;

}

OpCode


定义操作类型

public enum OpCode {

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】
BUSI_REQ((byte)0),

BUSI_RESP((byte)1),

PING((byte)3),

PONG((byte)4);

private byte code;

private OpCode(byte code) {

this.code=code;

}

public byte code(){

return this.code;

}

}

定义编解码器

======

分别定义对该消息协议的编解码器

MessageRecordEncoder


@Slf4j

public class MessageRecordEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord record, ByteBuf byteBuf) throws Exception {

log.info(“=开始编码Header部分=”);

Header header=record.getHeader();

byteBuf.writeLong(header.getSessionId()); //保存8个字节的sessionId

byteBuf.writeByte(header.getType()); //写入1个字节的请求类型

log.info(“=开始编码Body部分=”);

Object body=record.getBody();

if(body!=null){

ByteArrayOutputStream bos=new ByteArrayOutputStream();

ObjectOutputStream oos=new ObjectOutputStream(bos);

oos.writeObject(body);

byte[] bytes=bos.toByteArray();

byteBuf.writeInt(bytes.length); //写入消息体长度:占4个字节

byteBuf.writeBytes(bytes); //写入消息体内容

}else{

byteBuf.writeInt(0); //写入消息长度占4个字节,长度为0

}

}

}

MessageRecordDecode


@Slf4j

public class MessageRecordDecode extends ByteToMessageDecoder {

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {

MessageRecord record=new MessageRecord();

Header header=new Header();

header.setSessionId(byteBuf.readLong()); //读取8个字节的sessionid

header.setType(byteBuf.readByte()); //读取一个字节的操作类型

record.setHeader(header);

//如果byteBuf剩下的长度还有大于4个字节,说明body不为空

if(byteBuf.readableBytes()>4){

int length=byteBuf.readInt(); //读取四个字节的长度

header.setLength(length);

byte[] contents=new byte[length];

byteBuf.readBytes(contents,0,length);

ByteArrayInputStream bis=new ByteArrayInputStream(contents);

ObjectInputStream ois=new ObjectInputStream(bis);

record.setBody(ois.readObject());

list.add(record);

log.info(“序列化出来的结果:”+record);

}else{

log.error(“消息内容为空”);

}

}

}

测试协议的解析和编码


EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的

public class CodesMainTest {

public static void main( String[] args ) throws Exception {

EmbeddedChannel channel=new EmbeddedChannel(

new LoggingHandler(),

new MessageRecordEncoder(),

new MessageRecordDecode());

Header header=new Header();

header.setSessionId(123456);

header.setType(OpCode.PING.code());

MessageRecord record=new MessageRecord();

record.setBody(“Hello World”);

record.setHeader(header);

channel.writeOutbound(record);

ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();

new MessageRecordEncoder().encode(null,record,buf);

channel.writeInbound(buf);

}

}

编码包分析


运行上述代码后,会得到下面的一个信息

±------------------------------------------------+

| 0 1 2 3 4 5 6 7 8 9 a b c d e f |

±-------±------------------------------------------------±---------------+

|00000000| 00 00 00 00 00 01 e2 40 03 00 00 00 12 ac ed 00 |…@…|

|00000010| 05 74 00 0b 48 65 6c 6c 6f 20 57 6f 72 6c 64 |.t…Hello World |

±-------±------------------------------------------------±---------------+

按照协议规范:

  • 前面8个字节表示sessionId

  • 一个字节表示请求类型

  • 4个字节表示长度

  • 后面部分内容表示消息体

测试粘包和半包问题

=========

通过slice方法进行拆分,得到两个包。

ByteBuf中提供了一个slice方法,这个方法可以在不做数据拷贝的情况下对原始ByteBuf进行拆分。

public class CodesMainTest {

public static void main( String[] args ) throws Exception {

//EmbeddedChannel是netty专门针对ChannelHandler的单元测试而提供的类。可以通过这个类来测试channel输入入站和出站的实现

EmbeddedChannel channel=new EmbeddedChannel(

//解决粘包和半包问题

// new LengthFieldBasedFrameDecoder(2048,10,4,0,0),

new LoggingHandler(),

new MessageRecordEncoder(),

new MessageRecordDecode());

Header header=new Header();

header.setSessionId(123456);

header.setType(OpCode.PING.code());

MessageRecord record=new MessageRecord();

record.setBody(“Hello World”);

record.setHeader(header);

channel.writeOutbound(record);

ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();

new MessageRecordEncoder().encode(null,record,buf);

//模拟半包和粘包问题***//

//把一个包通过slice拆分成两个部分

ByteBuf bb1=buf.slice(0,7); //获取前面7个字节

ByteBuf bb2=buf.slice(7,buf.readableBytes()-7); //获取后面的字节

bb1.retain();

channel.writeInbound(bb1);

channel.writeInbound(bb2);

}

}

运行上述代码会得到如下异常, readerIndex(0) +length(8)表示要读取8个字节,但是只收到7个字节,所以直接报错。

2021-08-31 15:53:01,385 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ: 7B

±------------------------------------------------+

| 0 1 2 3 4 5 6 7 8 9 a b c d e f |

±-------±------------------------------------------------±---------------+

|00000000| 00 00 00 00 00 01 e2 |… |

±-------±------------------------------------------------±---------------+

2021-08-31 15:53:01,397 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE

Exception in thread “main” io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(7): UnpooledSlicedByteBuf(ridx: 0, widx: 7, cap: 7/7, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 31, cap: 256))

解决拆包问题

======

LengthFieldBasedFrameDecoder是长度域解码器,它是解决拆包粘包最常用的解码器,基本上能覆盖大部分基于长度拆包的场景。其中开源的消息中间件RocketMQ就是使用该解码器进行解码的。

首先来说明一下该解码器的核心参数

  • lengthFieldOffset,长度字段的偏移量,也就是存放长度数据的起始位置

  • lengthFieldLength,长度字段锁占用的字节数

  • lengthAdjustment,在一些较为复杂的协议设计中,长度域不仅仅包含消息的长度,还包含其他数据比如版本号、数据类型、数据状态等,这个时候我们可以使用lengthAdjustment进行修正,它的值=包体的长度值-长度域的值

  • initialBytesToStrip,解码后需要跳过的初始字节数,也就是消息内容字段的起始位置

  • lengthFieldEndOffset,长度字段结束的偏移量, 该属性的值=lengthFieldOffset+lengthFieldLength

public class CodesMainTest {

public static void main( String[] args ) throws Exception {

EmbeddedChannel channel=new EmbeddedChannel(

//解决粘包和半包问题

new LengthFieldBasedFrameDecoder(1024,

9,4,0,0),

new LoggingHandler(),

new MessageRecordEncoder(),

new MessageRecordDecode());

Header header=new Header();

header.setSessionId(123456);

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/845682.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号