栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RPC框架设计-4-Netty高级应用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

文章目录
  • 前言
  • 一、Netty编解码器
    • 1.1 Java的编解码
    • 1.2 Netty编解码器
    • 1.3 编码器(Encoder)
    • 1.4 编码解码器Codec
  • 二、Netty案例-群聊天室
    • 2.1 聊天室服务端编写
    • 2.2 聊天室客户端代码
  • 三、基于Netty的Http服务器开发
    • 3.1 介绍
    • 3.2 功能需求
    • 3.3 服务端代码实现
  • 四、基于Netty的WebSocket开发网页版聊天室
    • 4.1 WebSocket简介
    • 4.2 WebSocket和HTTP的区别
    • 4.3 代码实现
  • 五、Netty中粘包和拆包的解决方案
    • 5.1 粘包和拆包简介
    • 5.2 用代码来演示一下粘包和拆包的效果
    • 5.3 粘包和拆包的解决方法


前言 一、Netty编解码器 1.1 Java的编解码
  1. 编码(Encode)称为序列化, 它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
  2. 解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

    java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过 java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。

Java序列化目的:

  1. 网络传输。
  2. 对象持久化。
    Java序列化缺点:
  3. 无法跨语言。
  4. 序列化后码流太大。
  5. 序列化性能太低。
    Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架,这些编解码框架实现消息的高效序列化。
1.2 Netty编解码器
  1. 概念
    在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码。
    对于Netty而言,编解码器由两部分组成:编码器、解码器。
    解码器:负责将消息从字节或其他序列形式转成指定的消息对象。
    编码器:将消息对象转成字节或其他序列形式在网络上传输。

Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
Netty里面的编解码: 解码器:负责处理“入站 InboundHandler”数据。 编码器:负责“出站OutboundHandler” 数据。

  1. 解码器(Decoder)
    解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象 ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。

对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder

抽象解码器
1)ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
2)ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。 项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
3)MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)

核心方法:

decode(ChannelHandlerContext ctx, ByteBuf msg, List out)
 

代码实现-解码器:

package com.lagou.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.util.List;


public class MessageDecoder extends MessageToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception {
        System.out.println("正在进行消息解码....");
        ByteBuf byteBuf = (ByteBuf) msg;
        // 传递到下一个handler
        list.add(byteBuf.toString(CharsetUtil.UTF_8));
    }
}

添加了解码器并且将之传递到下一个handler后我们需要将他添加到我们的pipeline中,加到我们自定义处理器之前:


我们在服务端和客户端都加好了解码器之后那么我们自定义处理器中就不需要再进行转换了

1.3 编码器(Encoder)

与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现 ChannelOutboundHandler接口。

抽象编码器

  1. MessageToByteEncoder: 将消息转化成字节
    2.MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)
    核心方法:
encode(ChannelHandlerContext ctx, String msg, List out)
 

代码实现-编码器:

package com.lagou.codec;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;

import java.util.List;


public class MessageEncoder extends MessageToMessageEncoder {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String msg, List list) throws Exception {
        System.out.println("正在进行消息编码....");
        // 传递到下一个handler
        list.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
}
 

然后操作和上面解码器一样,将编码器添加到pipeline中,修改自定义handler中的输出



1.4 编码解码器Codec

编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。

Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类 ByteToMessageCodec ,MessageToMessageCodec都继承与此类.

代码实现-编码解码器:

package com.lagou.codec;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.CharsetUtil;

import java.util.List;


public class MessageCodec extends MessageToMessageCodec {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception {
        System.out.println("正在进行消息编码....");
        // 传递到下一个handler
        list.add(Unpooled.copiedBuffer((String) msg, CharsetUtil.UTF_8));
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception {
        System.out.println("正在进行消息解码....");
        ByteBuf byteBuf = (ByteBuf) msg;
        // 传递到下一个handler
        list.add(byteBuf.toString(CharsetUtil.UTF_8));
    }
}

然后服务端和客户端就可以只添加这一个了:

二、Netty案例-群聊天室

案例要求:

  1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息

代码编写:

2.1 聊天室服务端编写

NettyChatServer

package com.lagou.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


public class NettyChatServer {

    private int port;

    public NettyChatServer(int port) {
        this.port = port;
    }


    public void run() throws InterruptedException {
        NioEventLoopGroup bossGroup = null;
        NioEventLoopGroup workerGroup = null;
        try {
        // 1. 创建bossGroup线程组: 处理网络事件--连接事件
        bossGroup = new NioEventLoopGroup(1);// BoseGroup一般只需要一个线程就够了,因为只需要处理连接事件
        // 2. 创建workerGroup线程组: 处理网络事件--读写事件
        workerGroup = new NioEventLoopGroup(); //不写线程数,默认是2*处理器线程数
        // 3. 创建服务端启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 4. 设置bossGroup线程组和workerGroup线程组
        serverBootstrap.group(bossGroup,workerGroup)
                // 5. 设置服务端通道实现为NIO
                .channel(NioServerSocketChannel.class)
                // 6. 参数设置
                // option主要针对bossGroup,设置连接等待的个数:初始化服务器可连接队列大小
                .option(ChannelOption.SO_BACKLOG,128)
                // childOption主要针对workerGroup,设置通道的活跃状态,
                .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
                // 7. 创建一个通道初始化对象(作用就是向pipeline中添加自定义业务处理handler)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        // 添加编解码器
                        channel.pipeline().addLast(new StringDecoder());
                        channel.pipeline().addLast(new StringEncoder());
                        // 8. 向pipeline中添加自定义业务处理handler
                        channel.pipeline().addLast(new NettyChatServerHandle());
                    }
                });

        // 9. 启动服务端并绑定端口,同时将异步改为同步(如果启动绑定端口失败的话就没意义了,所以需要同步)
        // ChannelFuture future = serverBootstrap.bind(9999).sync();
        ChannelFuture future = serverBootstrap.bind(port);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("端口绑定成功!");
                } else {
                    System.out.println("端口绑定失败!");
                }
            }
        });
        System.out.println("聊天室服务端启动成功...");
        // 10. 关闭通道和关闭连接池,同样也将异步改为同步(并不是真正意义上的关闭,而是监听通道关闭的状态(事件),监听到以后才继续执行,所以必须得是同步的动作)
        future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyChatServer(9998).run();
    }
}

自定义服务端处理handler

package com.lagou.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;


public class NettyChatServerHandle extends SimpleChannelInboundHandler {

    public static List channelList = new ArrayList<>();

    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 当有新的客户端连接的时候,将通道放入集合
        Channel channel = ctx.channel();
        channelList.add(channel);
        System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "上线了");
    }

    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 当有客户端断开连接的时候就移除对应的通道
        Channel channel = ctx.channel();
        channelList.remove(channel);
        System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "下线了");
    }

    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        Channel channel = ctx.channel();
        channelList.remove(channel);
        System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "发生了异常,强制下线");
    }

    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 当前发送消息的通道--也就是当前发送消息的客户端
        Channel channel = ctx.channel();
        for (Channel channel1 : channelList) {
            // 排除自身通道(不能给自身发消息)
            if (channel != channel1) {
                channel1.writeAndFlush("[" + channel.remoteAddress().toString().substring(1) + "]说:" + msg);
            }
        }
    }
}

2.2 聊天室客户端代码

NettyChatClient

package com.lagou.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;


public class NettyChatClient {

    // 服务端IP
    private String ip;

    // 服务端端口号
    private int port;

    public NettyChatClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void run() throws InterruptedException {
        NioEventLoopGroup group = null;
        try {
            // 1. 创建线程组
            group = new NioEventLoopGroup();
            // 2. 创建客户端启动助手
            Bootstrap bootstrap = new Bootstrap();
            // 3. 设置线程组
            bootstrap.group(group)
                    // 4. 设置客户端通道实现为NIO
                    .channel(NioSocketChannel.class)
                    // 5. 创建一个通道初始化对象
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 添加编解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new StringEncoder());
                            // 6. 向pipeline中添加自定义业务处理handler
                            socketChannel.pipeline().addLast(new NettyChatClientHandle());
                        }
                    });
            // 7. 启动客户端,等待连接服务端,同时将异步改为同步
            ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("-----"+channel.localAddress().toString().substring(1)+"-----");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                // 向聊天室的服务端发消息
                channel.writeAndFlush(msg);
            }
            // 8. 关闭通道和关闭连接池(同样的这里也不是直接关闭通道,也是监听关闭通道的事件)
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyChatClient("127.0.0.1", 9998).run();
    }
}

自定义客户端处理handler

package com.lagou.chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class NettyChatClientHandle extends SimpleChannelInboundHandler {

    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

三、基于Netty的Http服务器开发 3.1 介绍

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。

3.2 功能需求
  1. Netty 服务器在 8080 端口监听
  2. 浏览器发出请求 "http://localhost:8080/ "
  3. 服务器可以回复消息给客户端 "Hello! 我是Netty服务器 " ,并对特定请求资源进行过滤(比如icon).
3.3 服务端代码实现
  1. NettyHttpServer
package com.lagou.http;

import com.lagou.chat.NettyChatServerHandle;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;


public class NettyHttpServer {

    private int port;

    public NettyHttpServer(int port) {
        this.port = port;
    }


    public void run() throws InterruptedException {
        NioEventLoopGroup bossGroup = null;
        NioEventLoopGroup workerGroup = null;
        try {
        // 1. 创建bossGroup线程组: 处理网络事件--连接事件
        bossGroup = new NioEventLoopGroup(1);// BoseGroup一般只需要一个线程就够了,因为只需要处理连接事件
        // 2. 创建workerGroup线程组: 处理网络事件--读写事件
        workerGroup = new NioEventLoopGroup(); //不写线程数,默认是2*处理器线程数
        // 3. 创建服务端启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 4. 设置bossGroup线程组和workerGroup线程组
        serverBootstrap.group(bossGroup,workerGroup)
                // 5. 设置服务端通道实现为NIO
                .channel(NioServerSocketChannel.class)
                // 6. 参数设置
                // option主要针对bossGroup,设置连接等待的个数:初始化服务器可连接队列大小
                .option(ChannelOption.SO_BACKLOG,128)
                // childOption主要针对workerGroup,设置通道的活跃状态,
                .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
                // 7. 创建一个通道初始化对象(作用就是向pipeline中添加自定义业务处理handler)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // 添加Http编解码器
                        pipeline.addLast(new HttpServerCodec());
                        // 8. 向pipeline中添加自定义业务处理handler
                        pipeline.addLast(new NettyHttpServerHandler());
                    }
                });

        // 9. 启动服务端并绑定端口,同时将异步改为同步(如果启动绑定端口失败的话就没意义了,所以需要同步)
        // ChannelFuture future = serverBootstrap.bind(9999).sync();
        ChannelFuture future = serverBootstrap.bind(port);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("端口绑定成功!");
                } else {
                    System.out.println("端口绑定失败!");
                }
            }
        });
        System.out.println("HTTP服务端启动成功...");
        // 10. 关闭通道和关闭连接池,同样也将异步改为同步(并不是真正意义上的关闭,而是监听通道关闭的状态(事件),监听到以后才继续执行,所以必须得是同步的动作)
        future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyHttpServer(8080).run();
    }
}

注意,编解码器和之前的不一样了哈:HttpServerCodec

NettyHttpServerHandler

package com.lagou.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;


public class NettyHttpServerHandler extends SimpleChannelInboundHandler {

    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        // 1、判断请求是否是Http请求
        if (msg instanceof HttpRequest) {
            DefaultHttpRequest request = (DefaultHttpRequest) msg;
            System.out.println("浏览器请求路径:" + request.uri());
            if ("/favicon.ico".equalsIgnoreCase(request.uri())) {
                System.out.println("图标不响应");
                return;
            }
            // 2、给浏览器进行响应
            ByteBuf byteBuf = Unpooled.copiedBuffer("Hello! 我是Netty服务器", CharsetUtil.UTF_8);
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                    byteBuf);
            // 设置响应头
            response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());
            // 消息出站
            ctx.writeAndFlush(response);
        }
    }
}
四、基于Netty的WebSocket开发网页版聊天室 4.1 WebSocket简介

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
应用场景十分广泛:

  1. 社交订阅
  2. 协同编辑/编程
  3. 股票基金报价
  4. 体育实况更新
  5. 多媒体聊天
  6. 在线教育
4.2 WebSocket和HTTP的区别

http协议是用在应用层的协议,他是基于tcp协议的,http协议建立连接也必须要有三次握手才能发送信息。 http连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一 个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端 要想实时获取服务端消息就得不断发送长连接到服务端.
WebSocket实现了多路复用,他是全双工通信。在webSocket协议下服务端和客户端可以同时发送信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端。而且信息当中不必在带有head 的部分信息了与http的长链接通信来说,这种方式,不仅能降低服务器的压力。而且信息当中也减少了部分多余的信息。

4.3 代码实现

代码:记得放代码链接
整体和上面的聊天差不多,前端部分是在网上找的资源,下面看一些比较重要的以及和前面不一样的地方

  1. 首先是配置文件中需要引入web模块和模板引擎,以及netty
    什么是模板引擎?
    模板引擎的作用就是我们来写一个页面模板,比如有些值呢,是动态的,我们写一些表达式。而这些值,从哪来呢,就是我们在后台封装一些数据。然后把这个模板和这个数据交给我们模板引擎,模板引擎按照我们这个数据帮你把这表达式解析、填充到我们指定的位置,然后把这个数据最终生成一个我们想要的内容给我们写出去,这就是我们这个模板引擎,不管是jsp还是其他模板引擎,都是这个思想。
    引入模板引擎的目的是什么?
    我们之前开发,我们需要将前端转成jsp页面,jsp好处就是当我们查出一些数据转发到JSP页面以后,我们可以用jsp轻松实现数据的显示,及交互等。
    jsp支持非常强大的功能,包括能写Java代码,但是呢,我们现在的这种情况,SpringBoot这个项目首先是以jar的方式,不是war,其二,我们用的还是嵌入式的Tomcat,所以呢,他现在默认是不支持jsp的。
    那该怎么办呢?:SpringBoot推荐我们可以来使用模板引擎。
    SpringBoot给我们推荐的模板引擎就是Thymeleaf,这模板引擎是一个高级语言的模板引擎,他的这个语法更简单,而且功能更强大
    关于Thymeleaf模板引擎大家可以看下这篇:Spring Boot——Thymeleaf
  2. 然后就是配置文件和配置文件读取的config,这个大家用过SpringBoot就好理解了

  3. NettyServer和之前的NettyServer差不多,没什么大变化,除了实现了一个Runable接口,另外就是自定义了一个通道初始化类


    通道初始化类中就和之前的不一样了

    首先要开启对大数据流的支持
    然后需要将pose请求信息包装秤同一个request和response
    还需要进行协议的升级,http升级为ws协议,支持websocket
  4. 自定义处理类中有几个比较重要的点
    首先是泛型需要是:TextWebSocketFrame(websocket数据是帧的形式处理)
    很重要
    然后需要设置通道共享添加@ChannelHandler.Sharable,因为@Component修饰对象是单例的,如果不设置通道共享的话,就没法开通多个客户端了

    其他事件的操作和之前没什么区别
  5. 再对SpringBoot的启动类进行改造,Spring容器启动完成后执行Netty服务的启动
  6. 至于客户端的动作就放到js中操作即可
$(function () {
    //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
    var username = "";
    while (true) {
        //弹出一个输入框,输入一段文字,可以提交
        username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
        if (username.trim() === "")//如果返回的有内容
        {
            alert("名称不能输入空")
        } else {
            $("#username").text(username);
            break;
        }
    }

    var ws = new WebSocket("ws://localhost:8081/chat");
    ws.onopen = function () {
        console.log("连接成功.")
    }
    ws.onmessage = function (evt) {
        showMessage(evt.data);
    }
    ws.onclose = function (){
        console.log("连接关闭")
    }

    ws.onerror = function (){
        console.log("连接异常")
    }

    function showMessage(message) {
        // 张三:你好
        var str = message.split(":");
        $("#msg_list").append(`

  • RPC框架设计-4-Netty高级应用
    ${str[0]}
    ${str[1]}
  • `); // 置底 setBottom(); } $('#my_test').bind({ focus: function (event) { event.stopPropagation() $('#my_test').val(''); $('.arrow_box').hide() }, keydown: function (event) { event.stopPropagation() if (event.keyCode === 13) { if ($('#my_test').val().trim() === '') { this.blur() $('.arrow_box').show() setTimeout(() => { this.focus() }, 1000) } else { $('.arrow_box').hide() //发送消息 sendMsg(); this.blur() setTimeout(() => { this.focus() }) } } } }); $('#send').on('click', function (event) { event.stopPropagation() if ($('#my_test').val().trim() === '') { $('.arrow_box').show() } else { sendMsg(); } }) function sendMsg() { var message = $("#my_test").val(); $("#msg_list").append(`
  • ` + message + `
  • `); $("#my_test").val(''); //发送消息 message = username + ":" + message; ws.send(message); // 置底 setBottom(); } // 置底 function setBottom() { // 发送消息后滚动到底部 const container = $('.m-message') const scroll = $('#msg_list') container.animate({ scrollTop: scroll[0].scrollHeight - container[0].clientHeight + container.scrollTop() + 100 }); } });

    好啦,开发完成,我们只需要多打开几个页面换几个不同的名字就可以开始聊天啦

    五、Netty中粘包和拆包的解决方案 5.1 粘包和拆包简介

    粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。
    TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

    1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有发生粘包和拆包
    2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包
    3. 如果D2的数据包比较大, 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包 的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
    4. 如果D1, D2的数据包都很大, 服务端分多次才能将D1和D2包接收完全,期间发生多次拆包

      TCP粘包和拆包产生的原因:
      数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。
    5.2 用代码来演示一下粘包和拆包的效果

    上代码:

    1. 粘包-客户端
    package com.lagou.unpacking;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    
    public class NettyClient {
        public static void main(String[] args) throws InterruptedException {
            //1. 创建线程组
            EventLoopGroup group = new NioEventLoopGroup();
            //2. 创建客户端启动助手
            Bootstrap bootstrap = new Bootstrap();
            //3. 设置线程组
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO
                    .handler(new ChannelInitializer() { //5. 创建一个通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //6. 向pipeline中添加自定义业务处理handler
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            //7. 启动客户端,等待连接服务端,同时将异步改为同步
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
            //8. 关闭通道和关闭连接池
            channelFuture.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    

    客户端处理通道就绪事件

    @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int i = 0; i < 10; i++) {
                ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端"+i+"$",
                        CharsetUtil.UTF_8));
            }
        }
    

    粘包-服务端

    package com.lagou.unpacking;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    
    public class NettyServer {
        public static void main(String[] args) throws InterruptedException {
            //1. 创建bossGroup线程组: 处理网络事件--连接事件
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            //3. 创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //4. 设置bossGroup线程组和workerGroup线程组
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
                    .option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置
                    .childHandler(new ChannelInitializer() { //7. 创建一个通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //8. 向pipeline中添加自定义业务处理handler
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            //9. 启动服务端并绑定端口,同时将异步改为同步
            ChannelFuture future = serverBootstrap.bind(9999);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("端口绑定成功!");
                    } else {
                        System.out.println("端口绑定失败!");
                    }
                }
            });
            System.out.println("服务端启动成功.");
            //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
            future.channel().closeFuture().sync();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    

    服务端处理通道就绪事件

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("读取次数:"+(++count));
        }
    

    启动后效果:

    结论:客户端连续发送了10次,服务端一次读取了客户端发送过来的消息,应该读取10次. 因此发生粘包.

    1. 拆包-客户端
    package com.lagou.stickingbag;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    
    public class NettyClient {
        public static void main(String[] args) throws InterruptedException {
            //1. 创建线程组
            EventLoopGroup group = new NioEventLoopGroup();
            //2. 创建客户端启动助手
            Bootstrap bootstrap = new Bootstrap();
            //3. 设置线程组
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO
                    .handler(new ChannelInitializer() { //5. 创建一个通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //6. 向pipeline中添加自定义业务处理handler
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            //7. 启动客户端,等待连接服务端,同时将异步改为同步
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
            //8. 关闭通道和关闭连接池
            channelFuture.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    

    客户端处理通道就绪事件

    @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //一次发送102400
            char[] chars = new char[102400];
            Arrays.fill(chars, 0, 102398, 'a');
            chars[102399] = 'n';
            for (int i = 0; i < 10; i++) {
                ctx.writeAndFlush(Unpooled.copiedBuffer(chars, CharsetUtil.UTF_8));
            }
        }
    

    拆包-服务端

    package com.lagou.stickingbag;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    
    public class NettyServer {
        public static void main(String[] args) throws InterruptedException {
            //1. 创建bossGroup线程组: 处理网络事件--连接事件
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            //3. 创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //4. 设置bossGroup线程组和workerGroup线程组
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
                    .option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置
                    .childHandler(new ChannelInitializer() { //7. 创建一个通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //8. 向pipeline中添加自定义业务处理handler
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            //9. 启动服务端并绑定端口,同时将异步改为同步
            ChannelFuture future = serverBootstrap.bind(9999);
            System.out.println("服务端启动成功.");
            //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
            future.channel().closeFuture().sync();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    

    服务端处理通道就绪事件

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("长度是:" + byteBuf.readableBytes());
            System.out.println("读取次数 = " + (++count));
        }
    

    启动看下效果:

    结论:当客户端发送的数据包比较大的时候, 应该读取10次, 读取了18次, 所以很明显发生了拆包事件.

    5.3 粘包和拆包的解决方法
    1. 业内解决方案
      由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。
      1)消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
      2)将换行符作为消息结束符
      3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
      4)通过在消息头中定义长度字段来标识消息的总长度

    2. Netty中的粘包和拆包解决方案
      Netty提供了4种解码器来解决,分别如下:
      1)固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
      2)行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
      3)分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔 符,进行分割拆分
      4)基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

    3. 代码实现
      1)LineBasedFrameDecoder解码器(行拆包器)
      添加解码器,并指定接收的消息的长度最大是2048

    // 添加解码器
    ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
    

    代码中添加换行符

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端"+i+"n",
                    CharsetUtil.UTF_8));
        }
    }
    

    2)DelimiterBasedFrameDecoder解码器(分隔符拆包器)
    添加解码器,并指定接收的消息的长度最大是2048

    // 添加解码器
    ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
    

    代码中添加’$‘作为分隔符

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端"+i+"",
                    CharsetUtil.UTF_8));
        }
    }
    

    粘包和拆包的改造都差不多,但是都是需要对消息进行改造的

    转载请注明:文章转载自 www.mshxw.com
    我们一直用心在做
    关于我们 文章归档 网站地图 联系我们

    版权所有 (c)2021-2022 MSHXW.COM

    ICP备案号:晋ICP备2021003244-6号