栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

通过Netty完成自定义消息协议设计

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

通过Netty完成自定义消息协议设计

  • 消息协议
  • 自定义消息协议
    • 协议定义
    • 通过Netty实现自定义消息协议
      • 1. 工程目录
      • 2. netty-msg-agreement
      • 3. netty-msg-server
      • 4. netty-msg-client

消息协议

消息协议的概念听起来非常的高大上,但是消息协议到底指代的是什么?

消息协议是指通讯双方传输的数据(消息)是如何表达描述的。

如 HTTP 协议,浏览器在打开一个网页是,首先和服务端建立连接,然后发送请求(请求中主要包括一些请求头、请求类型、请求URL、请求报文等),服务端接收到请求后,首先会对当前请求通过既定规则进行解析,然后响应返回响应的数据流给客户端,这些既定规则也就是消息协议。

自定义消息协议

那么自定义消息一般包括哪些内容呢?如:

  • 版本号,
  • 消息类型, 请求/响应, GET、POST、DELETE
  • 消息长度
  • 消息的正文
  • 序列化算法

如何自定义一个消息协议???

协议定义

statusCode | sessionId | reqType | contentLength | content

上面我们自定义了一个协议,其中 statusCode 表示状态代码,sessionId、reqType、contentLength就是请求头信息,content为消息内容。
下面通过Netty框架来完成一个自定义消息协议的定义,以及客户端、服务端使用当前协议进行数据交换的过程。

通过Netty实现自定义消息协议 1. 工程目录


首先创建一个Maven工程,其中 netty-msg-agreement 表示自定义消息协议,netty-msg-client 表示客户端,netty-msg-server 表示服务端,client、server模块依赖与agreement模块。

netty-msg-protocol pom.xml 依赖:添加netty-all依赖以及lombox依赖。


    io.netty
    >netty-all
    4.1.69.Final


    org.projectlombok
    >lombok
    1.18.14
    provided

2. netty-msg-agreement


其中MessageRecord代表消息记录,客户端服务端通过该对象进行消息传递。消息传递过程中,客户端服务端通过MessageRecordDecoder(解码器)、MessageRecordEncoder(编码器)进行消息编码和解码。

主要代码如下:

2.1 MessageRecord

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageRecord {
    // 状态代码:4个字节
    private int statusCode;
    // 消息请求头
    private Header header;
    // 消息内容
    private Object body;
}

2.1 Header

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Header {
    // 会话id:8个字节
    private long sessionId;
    // 请求方式:1个字节
    private byte reqType;
    // 请求体长度:4个字节
    private int contentLength;
}

2.3 MessageRecordEncoder-编码器

public class MessageRecordEncoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord messageRecord, ByteBuf byteBuf) throws Exception {
        System.out.println(">>>>>>>>>>>消息编码 start>>>>>>>>>>>");
        // 状态行
        byteBuf.writeInt(messageRecord.getStatusCode());

        // 请求头
        Header header = messageRecord.getHeader();
        byteBuf.writeLong(header.getSessionId());
        byteBuf.writeByte(header.getReqType());

        Object body = messageRecord.getBody();
        if (body != null){// 消息内容不为空
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(outputStream);
            out.writeObject(body);
            byte[] bytes = outputStream.toByteArray();
            // 消息长度
            byteBuf.writeInt(bytes.length);
            // 消息内容
            byteBuf.writeBytes(bytes);
        }else {// 消息内容为空
            byteBuf.writeInt(0);
        }
        // 写入并且刷新
        channelHandlerContext.writeAndFlush(messageRecord);
        System.out.println(">>>>>>>>>>>消息编码 end>>>>>>>>>>>");
    }
}

2.4 MessageRecordDecoder-解码器

public class MessageRecordDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List out) throws Exception {
        System.out.println(">>>>>>>>>>>消息解码 start>>>>>>>>>>>");
        // 通过byteBuf获取数据
        int statusCode = byteBuf.readInt();// 获取4个字节
        Header header = new Header();
        header.setSessionId(byteBuf.readLong());// 获取8个字节
        header.setReqType(byteBuf.readByte());
        header.setContentLength(byteBuf.readInt());// 获取4个字节
        if (header.getContentLength() > 0){// 消息长度大于0
            MessageRecord messageRecord = new MessageRecord();
            messageRecord.setStatusCode(statusCode);
            messageRecord.setHeader(header);
            // 获取消息体
            byte[] bytes = new byte[header.getContentLength()];
            byteBuf.readBytes(bytes);// 读取消息体内容到 bytes
            ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);// java自带反序列化工具
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
            messageRecord.setBody(objectInputStream.readObject());
            System.out.println("收到消息内容为:" + messageRecord);

            // 注意:需要将消息传输对象添加到 `List out` 中,如果不添加,服务端接收处理不到消息内容
            out.add(messageRecord);
        }else {
            System.out.println("消息内容为空,不解析");
        }
        System.out.println(">>>>>>>>>>>消息解码 end>>>>>>>>>>>");
    }
}
 

2.5 枚举

public enum RequestTypeEnums {
    GET((byte) 1),
    POST((byte) 2),
    DELETE((byte) 3),
    ;

    private byte reqType;

    RequestTypeEnums(byte reqType) {
        this.reqType = reqType;
    }

    public byte getReqType() {
        return this.reqType;
    }
}


public enum StatusCodeEnums {
    SUCCESS(0, "成功"),
    FAIL(-1, "失败"),
    EXCEPTION(-2, "异常"),
    ;

    private int statusCode;
    private String desc;

    StatusCodeEnums(int statusCode, String desc) {
        this.statusCode = statusCode;
        this.desc = desc;
    }

    public int getStatusCode() {
        return statusCode;
    }

    public String getDesc() {
        return this.desc;
    }
}
3. netty-msg-server


其中,ProtocolServer 为服务启动类,等待客户端连接。ServerFinalHeaders 为服务端消息处理类。

在netty-msg-server pom.xml 中添加 netty-msg-agreement 依赖。


   org.example
    >netty-msg-agreement
    1.0-SNAPSHOT

3.1 ProtocolServer

public class ProtocolServer {

    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        ServerBootstrap bootstrap = new ServerBootstrap();

        bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new LengthFieldbasedframeDecoder(1024 * 1024,
                                        13, // statusCode + sessionId + reqType
                                        4, // 请求体长度
                                        0,
                                        0))
                                .addLast(new MessageRecordDecoder())
                                .addLast(new MessageRecordEncoder())
                                .addLast(new ServerFinalHeaders());
                    }
                });

        try {
            int port = 8080;
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println(">>>>>>>>>>ProtocolServer start success>>>>>>>>>>" + port);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

3.2 ServerFinalHeaders

public class ServerFinalHeaders extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRecord messageRecord = (MessageRecord) msg;
        System.out.println("Server 收到消息内容为:" + messageRecord);

        // 把消息写回客户端
        messageRecord.setBody("server data:" + messageRecord.getBody());
        ctx.channel().writeAndFlush(messageRecord);
        super.channelRead(ctx, msg);
    }
}
4. netty-msg-client


其中,ProtocolClient 为客户端启动类,通过该类与服务端ProtocolServer建立连接,并传递消息。ClientFinalHeaders 为客户端消息处理类。

在netty-msg-server pom.xml 中添加 netty-msg-agreement 依赖。


   org.example
    >netty-msg-agreement
    1.0-SNAPSHOT

4.1 ProtocolClient

public class ProtocolClient {

    public static void main(String[] args) {
        EventLoopGroup work = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        Bootstrap bootstrap = new Bootstrap();

        bootstrap.group(work).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new LengthFieldbasedframeDecoder(1024 * 1024,
                                        13, // statusCode + sessionId + reqType
                                        4, // 请求体长度
                                        0,
                                        0))
                                .addLast(new MessageRecordDecoder())
                                .addLast(new MessageRecordEncoder())
                                .addLast(new ClientFinalHeaders());
                    }
                });

        try {
            ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
            Channel channel = future.channel();
            for (int i = 0; i < 5; i++) {
                MessageRecord msg = new MessageRecord();
                msg.setStatusCode(StatusCodeEnums.SUCCESS.getStatusCode());

                Header header = new Header();
                header.setSessionId(System.currentTimeMillis());
                header.setReqType(RequestTypeEnums.POST.getReqType());
                msg.setHeader(header);

                String body = String.format("第%s条请求数据:%s", i + 1, UUID.randomUUID().toString());
                msg.setBody(body);

                channel.writeAndFlush(msg);
            }
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
    }
}

4.2 ClientFinalHeaders

public class ClientFinalHeaders extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRecord messageRecord = (MessageRecord) msg;
        System.out.println("Client 收到消息内容为:" + messageRecord);
        super.channelRead(ctx, msg);
    }
}
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号