- 消息协议
- 自定义消息协议
- 协议定义
- 通过Netty实现自定义消息协议
- 1. 工程目录
- 2. netty-msg-agreement
- 3. netty-msg-server
- 4. netty-msg-client
消息协议的概念听起来非常的高大上,但是消息协议到底指代的是什么?
消息协议是指通讯双方传输的数据(消息)是如何表达描述的。
如 HTTP 协议,浏览器在打开一个网页是,首先和服务端建立连接,然后发送请求(请求中主要包括一些请求头、请求类型、请求URL、请求报文等),服务端接收到请求后,首先会对当前请求通过既定规则进行解析,然后响应返回响应的数据流给客户端,这些既定规则也就是消息协议。
自定义消息协议那么自定义消息一般包括哪些内容呢?如:
- 版本号,
- 消息类型, 请求/响应, GET、POST、DELETE
- 消息长度
- 消息的正文
- 序列化算法
- …
如何自定义一个消息协议???
协议定义statusCode | sessionId | reqType | contentLength | content
上面我们自定义了一个协议,其中 statusCode 表示状态代码,sessionId、reqType、contentLength就是请求头信息,content为消息内容。
下面通过Netty框架来完成一个自定义消息协议的定义,以及客户端、服务端使用当前协议进行数据交换的过程。
首先创建一个Maven工程,其中 netty-msg-agreement 表示自定义消息协议,netty-msg-client 表示客户端,netty-msg-server 表示服务端,client、server模块依赖与agreement模块。
netty-msg-protocol pom.xml 依赖:添加netty-all依赖以及lombox依赖。
2. netty-msg-agreementio.netty >netty-all4.1.69.Final org.projectlombok >lombok1.18.14 provided
其中MessageRecord代表消息记录,客户端服务端通过该对象进行消息传递。消息传递过程中,客户端服务端通过MessageRecordDecoder(解码器)、MessageRecordEncoder(编码器)进行消息编码和解码。
主要代码如下:
2.1 MessageRecord
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageRecord {
// 状态代码:4个字节
private int statusCode;
// 消息请求头
private Header header;
// 消息内容
private Object body;
}
2.1 Header
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Header {
// 会话id:8个字节
private long sessionId;
// 请求方式:1个字节
private byte reqType;
// 请求体长度:4个字节
private int contentLength;
}
2.3 MessageRecordEncoder-编码器
public class MessageRecordEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord messageRecord, ByteBuf byteBuf) throws Exception { System.out.println(">>>>>>>>>>>消息编码 start>>>>>>>>>>>"); // 状态行 byteBuf.writeInt(messageRecord.getStatusCode()); // 请求头 Header header = messageRecord.getHeader(); byteBuf.writeLong(header.getSessionId()); byteBuf.writeByte(header.getReqType()); Object body = messageRecord.getBody(); if (body != null){// 消息内容不为空 ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(outputStream); out.writeObject(body); byte[] bytes = outputStream.toByteArray(); // 消息长度 byteBuf.writeInt(bytes.length); // 消息内容 byteBuf.writeBytes(bytes); }else {// 消息内容为空 byteBuf.writeInt(0); } // 写入并且刷新 channelHandlerContext.writeAndFlush(messageRecord); System.out.println(">>>>>>>>>>>消息编码 end>>>>>>>>>>>"); } }
2.4 MessageRecordDecoder-解码器
public class MessageRecordDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
2.5 枚举
public enum RequestTypeEnums {
GET((byte) 1),
POST((byte) 2),
DELETE((byte) 3),
;
private byte reqType;
RequestTypeEnums(byte reqType) {
this.reqType = reqType;
}
public byte getReqType() {
return this.reqType;
}
}
public enum StatusCodeEnums {
SUCCESS(0, "成功"),
FAIL(-1, "失败"),
EXCEPTION(-2, "异常"),
;
private int statusCode;
private String desc;
StatusCodeEnums(int statusCode, String desc) {
this.statusCode = statusCode;
this.desc = desc;
}
public int getStatusCode() {
return statusCode;
}
public String getDesc() {
return this.desc;
}
}
3. netty-msg-server
其中,ProtocolServer 为服务启动类,等待客户端连接。ServerFinalHeaders 为服务端消息处理类。
在netty-msg-server pom.xml 中添加 netty-msg-agreement 依赖。
org.example >netty-msg-agreement1.0-SNAPSHOT
3.1 ProtocolServer
public class ProtocolServer {
public static void main(String[] args) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new LengthFieldbasedframeDecoder(1024 * 1024,
13, // statusCode + sessionId + reqType
4, // 请求体长度
0,
0))
.addLast(new MessageRecordDecoder())
.addLast(new MessageRecordEncoder())
.addLast(new ServerFinalHeaders());
}
});
try {
int port = 8080;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println(">>>>>>>>>>ProtocolServer start success>>>>>>>>>>" + port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
3.2 ServerFinalHeaders
public class ServerFinalHeaders extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRecord messageRecord = (MessageRecord) msg;
System.out.println("Server 收到消息内容为:" + messageRecord);
// 把消息写回客户端
messageRecord.setBody("server data:" + messageRecord.getBody());
ctx.channel().writeAndFlush(messageRecord);
super.channelRead(ctx, msg);
}
}
4. netty-msg-client
其中,ProtocolClient 为客户端启动类,通过该类与服务端ProtocolServer建立连接,并传递消息。ClientFinalHeaders 为客户端消息处理类。
在netty-msg-server pom.xml 中添加 netty-msg-agreement 依赖。
org.example >netty-msg-agreement1.0-SNAPSHOT
4.1 ProtocolClient
public class ProtocolClient {
public static void main(String[] args) {
EventLoopGroup work = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new LengthFieldbasedframeDecoder(1024 * 1024,
13, // statusCode + sessionId + reqType
4, // 请求体长度
0,
0))
.addLast(new MessageRecordDecoder())
.addLast(new MessageRecordEncoder())
.addLast(new ClientFinalHeaders());
}
});
try {
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
Channel channel = future.channel();
for (int i = 0; i < 5; i++) {
MessageRecord msg = new MessageRecord();
msg.setStatusCode(StatusCodeEnums.SUCCESS.getStatusCode());
Header header = new Header();
header.setSessionId(System.currentTimeMillis());
header.setReqType(RequestTypeEnums.POST.getReqType());
msg.setHeader(header);
String body = String.format("第%s条请求数据:%s", i + 1, UUID.randomUUID().toString());
msg.setBody(body);
channel.writeAndFlush(msg);
}
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
}
}
4.2 ClientFinalHeaders
public class ClientFinalHeaders extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRecord messageRecord = (MessageRecord) msg;
System.out.println("Client 收到消息内容为:" + messageRecord);
super.channelRead(ctx, msg);
}
}



