栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty学习笔记(2) Netty入门 - 概述和组件

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty学习笔记(2) Netty入门 - 概述和组件

文章目录

前言1. 概述

1. Netty是什么2. Netty的作者和地位3. Netty的优势 2. Hello World

2.1 目标2.2 流程分析2.3 加深理解代码 3. 组件

3.1 EventLoop

1. 事件循环组:EventLoopGroup2. 一个简单的案例:处理普通事件3. 一个简单的案例:处理io事件4. 一个简单的案例:处理io事件改进5. handler执行过程中如何进行换人 3.2 Channel

1. 一些方法2. ChannelFuture-连接问题3. ChannelFuture-处理结果4. ChannelFuture-关闭问题5. ChannelFuture-处理关闭 3.3 Future & Promise

1. JDK中的Future2. netty-future3. netty-promise 3.4 Handler 和 Pipeline

1. InboundHandler 和 OutboundHandler2.入站出站的一些细节问题 3.5 一个调试工具类 EmbeddedChannel3.6 ByteBuf

1. 测试创建2. 直接内存 VS 堆内存3. 池化 VS 非池化4. 组成(4部分)5. 写入6. 扩容7. 读取8. retain 和 release9. slice10. duplicate11. copy12. CompositeByteBuf13. Unpooled14. ByteBuf 的优势


前言

笔记基于黑马的Netty教学讲义加上自己的一些理解,感觉这是看过的视频中挺不错的,基本没有什么废话,视频地址:黑马Netty


1. 概述 1. Netty是什么
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。

注意:这里的异步不要和异步IO混淆,这里的异步指Netty使用多线程来完成结果传送,让发送请求的线程不阻塞。



2. Netty的作者和地位




3. Netty的优势



2. Hello World 2.1 目标

并发一个简单的服务器和客户端

客户端向服务器发送Hello World服务器仅接受,不返回

加入依赖

 
    
      io.netty
      netty-all
      4.1.51.Final
    

服务器:

public class HelloServer {
    public static void main(String[] args) {
        //1.服务器端的启动器,负责组装Netty组件,启动服务器
        new ServerBootstrap()
                //2. BossEvenLoop,WorkEventLoop(selector,thread:一个线程一个selector)
                //group:组
                .group(new NioEventLoopGroup())
                //3. 选择服务器的serverSocketChannel实现,netty还是支持很多时间的,OIO:BIO
                .channel(NioServerSocketChannel.class)
                //4. boss:负责建立连接    worker(child):负责处理读写
                //下面这段代码决定了worker能做什么事(handler)
                .childHandler(
                        //ChannelInitializer:跟客户端连接后进行数据读写的通道,负责添加别的handler
                        new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //6. 添加具体的handler
                        ch.pipeline().addLast(new StringDecoder()); //将传输过来的ByteBuf转换为字符串
                        //自定义handler
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override   //channelRead:读时间
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //打印传输过来的消息,这个消息是上一步转换好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                })
                //7. 绑定监听端口
                .bind(8080);
    }
}

客户端:

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建启动器
        new Bootstrap()
                //2. 添加选择器EventLoop
                .group(new NioEventLoopGroup())
                //3. 选择一个客户端的channel实现
                .channel(NioSocketChannel.class)
                //4. 添加处理器
                .handler(new ChannelInitializer() {
                    @Override   //连接建立后会调用这个方法
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加一个编码器
                        ch.pipeline().addLast(new StringEncoder());

                    }
                })
                //连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel()
                .writeAndFlush("hello world");
    }
}



2.2 流程分析

    首先服务器 创建启动器ServerBootstrap在启动器中添加一些EventLoop,这些EventLoop就是不断循环监听不同类型事件的选择服务器的serverSocketChannel实现添加处理器,这里添加一个初始化通道的处理器:ChannelInitializer,负责添加其他的处理器(handler)绑定端口是多少客户端开始建立创建启动器Bootstrap进行连接在启动器中添加一些EventLoop,这些EventLoop就是不断循环监听不同类型事件的,客户端也有这种东西选择一个客户端的channel实现,用于给服务器发数据的添加处理器,同样添加一个连接建立后的处理器在处理器中添加一个StringEncoder(),把发送的消息转化成字节码客户端连接端口.sync()中客户端的sync在保证了在连接之后才可以向下运行.channel()中获取到发送数据的通道发送数据Hello World服务器其中某个EventLoop接收到read事件由StringDecoder解码器进行对发送过来的数据解码最后服务器调用channelRead方法输出发送过来的数据



2.3 加深理解代码

一开始就要树立正确的通道

把channel理解为数据的通道,可读可写把msg理解为流动的数据,最开始是ByteBuf,但经过pipeline(流水线)的加工。会变成其他类型的对象,最后输出又变成ByteBuf把handler理解为数据的处理工序
1、工序有多道,合成在一起就是pipeline负责发布事件(读、读取完成…)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应的事件处理方法)
2、handler分 Inbound(入栈) 和 Outbound(出栈) 两类把eventLoop(内部包含线程和选择器理解为处理数据的工人
1、工人可以管理多个channel的io操作(内部有selector),并且一旦工人负责了某个channel,就要负责到底(绑定),类似accept,read、writeAble…目的是可**以保证线程安全。**一个工人可以管多个channel,但是多个channel对应一个工人。
2、工人既可以执行IO操作,也可以进行任务处理,每位工人有任务队列,队列里面可以堆放很多个channel的待处理任务,任务分为普通任务和定时任务。(底层是单线程的线程池)
2、工人按照pipeline顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人



3. 组件 3.1 EventLoop

直接翻译为事件循环对象:
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理Channel上源源不断的 io 事件。

它的继承关系比较复杂:

一条线继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法另一条线继承自 netty 自己的 OrderedEventExecutor(有顺序的)
1、提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
2、提供了parent() 方法来看看自己属于哪个 EventLoopGroup
1. 事件循环组:EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全),注意这里是IO对象,其他事件是可以换EventLoop的。

继承自 netty 自己的 EventExecutorGroup
1、实现了 Iterable 接口提供遍历 EventLoop 的能力
2、另有 next 方法获取集合中下一个 EventLoop
2. 一个简单的案例:处理普通事件

@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        //1. 创建事件循环组
        
        EventLoopGroup group = new NioEventLoopGroup(2);//io事件,普通任务,定时任务
        //EventLoopGroup group = new DefaultEventLoopGroup(); 这种能处理普通任务,定时任务

        //2. 获取下一个事件循环对象,有一个轮循的效果
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());
        //io.netty.channel.nio.NioEventLoop@5c0369c4
        //io.netty.channel.nio.NioEventLoop@2be94b0f
        //io.netty.channel.nio.NioEventLoop@5c0369c4

        //3. 执行普通方法
        group.next().submit(()->{
            //提交这个事件给group里面的其他线程去处理
            log.debug("ok");
        });
        log.debug("main");

        //4. 定时任务
        group.next().scheduleAtFixedRate(()->{
            log.debug("okok");
        }, 1, 1, TimeUnit.SECONDS);

        //System.out.println(NettyRuntime.availableProcessors() );    //获取CPU核数8
    }
}

3. 一个简单的案例:处理io事件

服务端:

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override                       //ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });

                    }
                })
                .bind(8080);
    }
}

客户端:

public class EventGroupClint {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建启动器
        Channel localhost = new Bootstrap()
                //2. 添加选择器EventLoop
                .group(new NioEventLoopGroup())
                //3. 选择一个客户端的channel实现
                .channel(NioSocketChannel.class)
                //4. 添加处理器
                .handler(new ChannelInitializer() {
                    @Override   //连接建立后会调用这个方法
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加一个编码器
                        ch.pipeline().addLast(new StringEncoder());

                    }
                })
                //连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();

        System.out.println(localhost);
        System.out.println("");
    }
}

结论:可以发现,对于同一个channel上的发送请求,是由EventLoop中的指定的线程来执行的,也就是绑定起来了。channel和一个ExentLoop绑定起来了。但是注意一个线程可以管多个channel,这两不是双向的。


4. 一个简单的案例:处理io事件改进

细分1:boss 和 worker细分2:创建一个新的EventLoopGroup专门处理那些耗时较长的操作,不让NioEventLoopGroup去执行,避免处理一个导致后面全部消息都被阻塞住

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        //细分2:创建一个新的EventLoopGroup专门处理那些耗时较长的操作
        EventLoopGroup group = new DefaultEventLoopGroup();

        new ServerBootstrap()
                //细分1:boss 和 worker
                //参数1:boss,只负责处理accept事件,不需要特意把线程数设置成1,因为对于accept这一种类型的,只会创建一个线程来处理
                //参数2:worker,只负责socketChannel上的读写,线程数根据自己的情况来写
                .group(new NioEventLoopGroup(), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
                            @Override                       //ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                                //将消息传递给下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        }).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
                            @Override                       //ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

可以看到在ChannelInboundHandlerAdapter那里,第一次是使用nioEventLoopGroup里面的线程来执行的,而第二个是给定义了一个DefaultEventLoopGroup,这么说第二次执行的线程就是DefaultEventLoopGroup里面的线程,而不是nioEventLoopGroup里面的线程,从结果图也可以看出来:

下面这个图说明了,一开始NioGroup里面的粉色和紫色的EventGroup和对于的Channel绑定了,但是在中途添加handler方法的时候,使用了一个DefaultEventGroup(绿色)来进行操作,这时候Channel同时由Nio的EventGroup和DefaultGroup同时管理。



5. handler执行过程中如何进行换人

我们进行源码的分析:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    //下一个handler的事件循环是否与当前事件循环是同一个线程
    //返回的executor是下一个handler的eventLoop,因为它是继承EventExecutor 的,所以名字不同不用在意
    EventExecutor executor = next.executor();
    
   //所以如果两个handler绑定的是同一个线程,那么就可以直接调用
   //如果绑定的是不同的线程,那就要把要调用的代码放在一个Runnable对象中传给下一个线程调用
   
    //当前handler中的线程是否和executor(下一个)的是同一个线程
    if (executor.inEventLoop()) {
    	//让下一个handler也在同一个线程内执行
        next.invokeChannelRead(m);
    } else {
        //否则,将要执行的代码作为任务提交给下一个事件循环处理(换人)
        //executor是下一个handler的线程,这个线程去执行当前的方法
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}



3.2 Channel 1. 一些方法

Channel 的主要作用

close() 可以用来关闭Channel

closeFuture() 用来处理 Channel 的关闭
1、sync 方法作用是同步等待 Channel关闭
2、而 addListener 方法是异步等待 Channel 关闭

pipeline() 方法添加处理器

write() 方法将数据写入,不会立刻就发出,而是会写入缓冲区,当缓冲区的数据满了或者调用了flush() 方法的时候就会立刻发出。

writeAndFlush() 方法将数据写入并立刻刷出



2. ChannelFuture-连接问题

channelFuture.sync() 方法的问题

public class EventGroupClint {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建启动器
        ChannelFuture channelFuture = new Bootstrap()
                //2. 添加选择器EventLoop
                .group(new NioEventLoopGroup())
                //3. 选择一个客户端的channel实现
                .channel(NioSocketChannel.class)
                //4. 添加处理器
                .handler(new ChannelInitializer() {
                    @Override   //连接建立后会调用这个方法
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加一个编码器
                        ch.pipeline().addLast(new StringEncoder());

                    }
                })
                //连接到服务器
                //异步非阻塞:main线程发起调用,真正执行 connect 是 nio 线程
                .connect(new InetSocketAddress("localhost", 8080));
        //sync如果不加上,那么writeAndFlush不起作用,因为上面的connect是异步执行的
        //所以 channelFuture.channel()是拿不到的,因为此时connect还没执行完成
        //channel拿不到,不知道发到哪去
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello flash");
    }
}



3. ChannelFuture-处理结果

channelFuture,类似于带有这种Future,promise 的类型的都是和异步方法配合使用,用来处理结果

方法1. 使用sync方法同步处理结果: channelFuture.sync()
运行到这个方法的时候就会阻塞住等待运行,等到connect方法运行结束,nio线程连接建立完毕,这时候就可以继续向下运行拿到channel了。注意这里发起连接请求和等待连接请求的都是主线程,只有中间连接部分是nio线程去执行的。


方法2. 使用addListener方法异步处理结果
使用这个方法之后,等待连接建立和连接建立成功都交给其他线程,使用channelFuture.addListener方法,等nio连接执行完成之后,就会调用里面的operationComplete方法,再写入就行了。main线程就可以当作是一个甩手掌柜,只是发起连接请求,获取连接和调用写出方法发送数据都是其他线程来干的。

   channelFuture.addListener(new ChannelFutureListener() {
            //在nio线程那边连接建立好了以后会调用这个方法
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = future.channel();
                channel.writeAndFlush("hello flash");
            }
        });



4. ChannelFuture-关闭问题

入下面代码:如何在channel调用close() 方法后会执行 log.debug(“处理关闭后的操作”); 这行代码,下面注释给了两种错误的方法:

在主线程最后执行,显然不可以,因为主线程一连接上就立刻执行了在其他线程内执行,也不行,因为close是异步的不是阻塞的,是由nio线程执行的,所以close还没结束就已经执行这行代码了

@Slf4j
public class CloseFutureClint {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override   //连接建立后会调用这个方法
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加一个编码器
                        ch.pipeline().addLast(new StringEncoder());
                        
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while(true){
                String line = scanner.nextLine();
                if("q".equals(line)){
                    channel.close();
                    //这里也不行,因为close方法也是异步的,有可能还没close,就已经执行了
                    //log.debug("处理关闭后的操作");
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        //处理关闭后的,这里操作不行,因为主线程一上来就运行这行代码了,达不到关闭的效果
        //log.debug("处理关闭后的操作");
    }
}



5. ChannelFuture-处理关闭

第一种:解决方法

调用ChannelFuture closeFuture = channel.closeFuture();获取closeFuturecloseFuture.sync(), 这种方法就能让主线程阻塞住等到nio线程执行关闭了之后才继续运行,而关闭后的处理工作是主线程完成的。closeFuture.addListener,这种方法是异步执行的,由nio线程关闭之后立刻去执行里面的方法,此时关闭和执行里面的代码的线程是同一个。

@Slf4j
public class CloseFutureClint {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override   //连接建立后会调用这个方法
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加一个编码器
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while(true){
                String line = scanner.nextLine();
                if("q".equals(line)){
                    channel.close();
                    //这里也不行,因为close方法也是异步的,有可能还没close,就已经执行了
                    //log.debug("处理关闭后的操作");
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        //处理关闭后的,这里操作不行,因为主线程一上来就运行这行代码了,达不到关闭的效果
        //log.debug("处理关闭后的操作");

        //获取CloseFuture对象
        //1、同步处理关闭
        //2、异步处理关闭
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close......");
        

        
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("处理关闭后的操作");
            }
        });

    }
}

问题:close关闭之后程序没有结束,而是继续运行,怎么处理?
因为 NioEventLoopGroup 中还有一些其他的线程没有结束,所以程序还在运行。那么我们在close之后就要手动关闭了。

解决:

NioEventLoopGroup group = new NioEventLoopGroup()单独建立在关闭之后调用 group.shutdownGracefully()进行关闭其他nio线程,程序结束

关闭的流程:此时NioEventLoopGroup中拒绝再接收新的任务,并把剩下的任务执行完之后线程就停止。

@Slf4j
public class CloseFutureClint {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override   //连接建立后会调用这个方法
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加一个编码器
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while(true){
                String line = scanner.nextLine();
                if("q".equals(line)){
                    channel.close();
                    //这里也不行,因为close方法也是异步的,有可能还没close,就已经执行了
                    //log.debug("处理关闭后的操作");
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        //处理关闭后的,这里操作不行,因为主线程一上来就运行这行代码了,达不到关闭的效果
        //log.debug("处理关闭后的操作");

        //获取CloseFuture对象
        //1、同步处理关闭
        //2、异步处理关闭
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close......");
        

        
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("处理关闭后的操作");
                //优雅地关闭
                group.shutdownGracefully();
            }
        });

    }
}

为什么netty内部要用这么多的异步,其实有点像CPU指令流水线内味,每个线程都执行自己的工作,效率会快很多。


3.3 Future & Promise

在异步调用的时候,经常会用到这两个接口

首先要说明 netty 中的 Future 接口和 jdk 中的 Future 接口同名,但是是两个接口,netty中的Future接口继承自JDK中的Future,而Promise 又对 netty Future 进行了扩展。

promise -> netty Future -> JDK Future

jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消-
isDone任务是否完成,不能区分成功失败-
get获取任务结果,阻塞等待--
getNow获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果



1. JDK中的Future
@Slf4j
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //1. 线程池
        ExecutorService service = Executors.newFixedThreadPool(2);

        //2. 提交任务,返回值是一个Future对象
        Future submit = service.submit(new Callable() {

            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });

        //2. 主线程通过Future获取结果, 同步等待
        log.debug("等待结果");
        log.debug("结果是{}", submit.get());
        

结果:Future执行结果填入是由那个执行任务的线程去填的,我们没有机会去填



2. netty-future

一个特点是io线程执行计算的可以用io线程来获取结果。

@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        EventLoop eventLoop = group.next();

        Future future = eventLoop.submit(new Callable() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 70;
            }
        });

        //log.debug("等待结果");
        //log.debug("结果是{}", future.get());
        //DEBUG [main] (22:16:31,658) (TestNettyFuture.java:35) - 等待结果
        //DEBUG [nioEventLoopGroup-2-1] (22:16:31,659) (TestNettyFuture.java:29) - 执行计算
        //DEBUG [main] (22:16:32,669) (TestNettyFuture.java:36) - 结果是70

        future.addListener(new GenericFutureListener>() {
            @Override
            public void operationComplete(Future future) throws Exception {
                log.debug("接收结果:{}", future.getNow());
                //io线程执行io线程拿结果
                //DEBUG [nioEventLoopGroup-2-1] (22:20:37,192) (TestNettyFuture.java:30) - 执行计算
                //DEBUG [nioEventLoopGroup-2-1] (22:20:38,196) (TestNettyFuture.java:45) - 接收结果:70
            }
        });


    }
}



3. netty-promise

我们可以主动设置一个 promise 对象,然后往 promise 里面进行写入线程执行的结果,我们就可以在外部获取到这个结果。

@Slf4j
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 准备一个EventLoop对象
        EventLoop eventLoop = new NioEventLoopGroup().next();
        //2. 可以主动创建promise对象, promise就是一个结果的容器
        DefaultPromise promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            //3. 任意一个线程执行计算完毕,计算完毕之后向promise 填充结果
            log.debug("开始计算");
            try {
                int i = 1/0;
                Thread.sleep(1000);
            } catch (Exception e) {
                //设置失败的返回结果
                promise.setFailure(e);
                e.printStackTrace();
            }
            //设置成功的返回结果
            promise.setSuccess(80);
        }).start();

        //4. 接收结果的线程
        log.debug("等待结果...");
        log.debug("结果是:{}", promise.get());

        //DEBUG [main] (23:19:10,786) (TestNettyPromise.java:37) - 等待结果...
        //DEBUG [Thread-0] (23:19:10,786) (TestNettyPromise.java:27) - 开始计算
        //DEBUG [main] (23:19:11,799) (TestNettyPromise.java:38) - 结果是:80
    }
}



3.4 Handler 和 Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站,出站两种,所有 ChannelHandler 被连成一串,就是Pipeline

入站处理器通常是 ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回数据出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要用于对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线, ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。



1. InboundHandler 和 OutboundHandler

有几个要说明的点

在调用 pipeline.addLast 的时候可以在第一个参数给这个处理器起名字在获取到 pipeline 的时候,pipeline 中已经有两个处理器了,一个是head,一个是tail,顺序是 head -> tail调用 pipeline.addLast 方法的时候,往 pipeline 的 tail前面去加,比如加入了3个名为h1, h2, h3的处理器,那么这时候的链子就是 head -> h1 -> h2 -> h3 ->tail对于入站处理器 ChannelInboundHandlerAdapter,调用 super.channelRead(ctx, name) 这个方法意思是往下一个处理器传送数据,比如从h1传到h2.如果中途某一个处理器没有调用super方法,那么处理器链就断了要调用出站的处理器,就要在入栈处理器里面调用 write 相关的方法对于入站处理器打印的顺序是h1 -> h2 -> h3,顺序处理对于出站处理器调用 ch.writeAndFlush() 打印的顺序是 h3->h2->h1, 逆序处理

@Slf4j
public class TestPipeline {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //1. 通过channel 拿到 Pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //2. 添加处理器
                        //注意:netty 会帮我们自动添加两个handler
                        //head -> tail
                        //所以pipeline.addLast()是把handler加入到tail的前一个
                        //第一个参数可以起名字
                        //head ->h1 ->h2 ->h3 ->h4 ->h5-> tail
                        //注意:这里打印是 1-2-3-6-5-4,出站顺序是从tail往前的
                        pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //入站处理器处理channelRead方法
                                log.debug("1");
                                ByteBuf buf = (ByteBuf)msg;
                                String name = buf.toString(Charset.defaultCharset());
                                //super.channelRead(ctx, msg) 意思是调用下一个handler处理,参数可以放在msg的位置传给下一个handler,这里是传给2
                                super.channelRead(ctx, name);
                            }
                        });

                        pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("2");
                                String student = new String(msg.toString());
                                //传给3
                                //这里要么调用 ctx.fireChannelRead(msg);,要么调用super.channelRead(ctx, student);
                                //不调用链子就断了,数据就传不到下一个handler了
                                super.channelRead(ctx, student);

                            }
                        });

                        pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("3,结果:{}, class:{}", msg, msg.getClass());
                                //这里就没必要调用channelRead了,因为都没有下一个入站处理器了
                                //super.channelRead(ctx, msg);
                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes(StandardCharsets.UTF_8)));
                            }
                        });
                        //注意:出站处理器只有等到入站处理器调用了write方法才执行
                        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                            //出站处理器重写write方法
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                            //出站处理器重写write方法
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }

    //结果输出:
    //DEBUG [nioEventLoopGroup-2-2] (23:54:54,839) (TestPipeline.java:46) - 1
    //DEBUG [nioEventLoopGroup-2-2] (23:54:54,841) (TestPipeline.java:57) - 2
    //DEBUG [nioEventLoopGroup-2-2] (23:54:54,841) (TestPipeline.java:67) - 3,结果:张三, class:class java.lang.String
    //DEBUG [nioEventLoopGroup-2-2] (23:54:54,842) (TestPipeline.java:87) - 5
    //DEBUG [nioEventLoopGroup-2-2] (23:54:54,842) (TestPipeline.java:78) - 4

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class Student{
        private String name;
    }
}



2.入站出站的一些细节问题

ctx.channel().write(msg) vs ctx.write(msg)

都是出栈处理器调用 ctx.channel().write(msg) 的时候会从tail往前找出站处理器来处理调用 ctx.write(msg) 会从当前处理器往前找出站处理器来处理处理器连接的方式是双向链表

比如:有这么一张图,里面有 1, 2, 3, 4, 5 ,6

    如果从3开始调用 ctx.channel().write(msg),那么就打印 1-2-3-6-5-4如果从3开始调用 ctx.write(msg) 会打印 1-2-3,因为从3往前找已经没有出站处理器了



3.5 一个调试工具类 EmbeddedChannel
@Slf4j
public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        //模拟入站操作
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        //1 - 2
        //模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
        //4 - 3
    }
}



3.6 ByteBuf

是对字节数据的封装,是 Nio 中 ByteBuffer 的加强



1. 测试创建

ByteBuf 初始化容量是 256, 会自动扩容成原来的两倍我们可以自定义初始容量和最大容量

public class TestByteBuf {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16);
        log(buf);
        //PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 17; i++) {
            sb.append("a");
        }
        buf.writeBytes(sb.toString().getBytes());
        log(buf);
        //PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
    }

    private static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(newline);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}



2. 直接内存 VS 堆内存

可以使用下面的代码来创建池化基于堆的 ByteBuf(Java内存)

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf(系统内存)

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);

直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用,详情看第一章的Nio直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放



3. 池化 VS 非池化

ByteBuf 中支持池化的管理,池化的最大意义在于可以重用 ByteBuf,对于那些创建比较慢的资源我们可以用池的资源进行优化,优点:

我们预先创建好多个 ByteBuf 实例, 在创建的时候就直接调用就行,这样的好处就是节省内存,因为 Java 中创建 ByteBuf 默认是直接内存,但是要知道直接内存是存在于操作系统中的,创建的代价是很高的,就算是堆内存,不断地创建 ByteBuf 实例也只会增添 GC 回收对象的压力。有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率高并发的时候采用池化技术更能节省内存,减少内存溢出的风险。

我们可以通过以下方式来决定是否开启池化功能:在 VM 参数中设置

-Dio.netty.allocator.type={unpooled|pooled}

4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现4.1 之前,池化功能还不成熟,默认是非池化实现

证明创建的ByteBuf默认是直接内存:当然也可以测试其他的内存

ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf.getClass());
//class io.netty.buffer.PooledUnsafeDirectByteBuf



4. 组成(4部分)


最开始读写指针都在 0 位置,然后写指针开始写入,比如写指针写到了 index 的位置,那么此时 0 -index 位置的字节叫做可读字节,读指针开始从 0 读取,到index停止,假设这时候读到了 read 的位置,此时读指针在 read 的位置, 那么从0 - read 这部分被称为废弃字节。

好处:

分为了读写指针,这样就不用像 byteBuffer 那样不停切换读写模式了自动扩容,不用我们手动去扩容了



5. 写入

方法列表,省略一些不重要的方法

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian(大端写入),即 0x250,写入后 00 00 02 50 ,低位靠后
writeIntLE(int value)写入 int 值Little Endian(低位写入),即 0x250,写入后 50 02 00 00 ,高位靠后
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串

注意

这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用网络传输,默认习惯是 Big Endian,大端写入,就好比 0x1 -> 0000 0000 0000 0000 0000 0000 0000 0001

测试其中一些方法:可以看到写入 int 之后从高位开始为00 00 00 05,大端输入,不要误以为里面的 0 是低位,这只是调试工具。

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置



6. 扩容

比如我们开始设置了一个6容量的数据,此时写入之后发生了扩容

扩容的机制和HashMap的扩容长度一样,扩容成比写入后的数据长度大的 2的n次方 倍比如写入之后大小是12,就扩容成16,写入之后的大小是512,就扩容成1024注意扩容长度不能超过扩容最大容量(可以由自己一开始就设定),超过了就会报错

我们拿上面 5 举例子,添加一个 6后发生扩容,扩容后的打印:

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06             |............    |
+--------+-------------------------------------------------+----------------+



7. 读取

在上面的基础上,已经有12个字节了,下面就测试读取4次:

System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);




这时候读指针来到了4,0-3的内容就是废弃字节了,读指针不再关注这一部分,这时候我们如果想要重复读取5,也可以像Nio里面的ByteBuffer那样设置一个标记位:

buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);




这时候调用 buffer.resetReaderIndex() 方法可以让指针从8回到4,那么此时就可以再次读取4-7的内容了,读出来的还是5.




注意:还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index,因为这类方法通常都是按照索引去获取的,所以不会改变读指针的位置



8. retain 和 release

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收,等不及。针对不同的实现回收方法不一样

UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可UnpooledDirectByteBuf 使用的就是直接内存了,需要我们主动调用特殊的方法来回收内存,因为等到垃圾回收机制去主动回收的时候其实是不及时的PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

回收内存的源码实现,请关注下面方法的不同实现
protected abstract void deallocate()



当然,对于回收实现 Netty 采用了引用计数法来控制回收内存,主要是每个ByteBuf 都实现了一个通用的接口 ReferenceCounted。redis 里面用的也是引用计数法

每个 ByteBuf 对象的初始计数为 1,表示有人在使用,不能回收调用 release 方法计数减 1,如果计数为 0,证明没有人用了,ByteBuf 内存被回收调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收。避免了别人(handler )误调用了 release 方法导致 ByteBuf 为 0,被回收掉当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

public interface ReferenceCounted {
    int refCnt();

    ReferenceCounted retain();

    ReferenceCounted retain(int var1);

    ReferenceCounted touch();

    ReferenceCounted touch(Object var1);

    boolean release();

    boolean release(int var1);
}




那么是谁来负责调用 release 呢?我们通常情况对于使用一些流方法,到最后需要关闭流的时候,我们都会选择在一个 try - catch - finally 代码块中去关闭,但是这不是 Netty 中的方法,里面的方法比我们想的要复杂得多

ByteBuf buf = ...
try {
    ...
} finally {
    buf.release();
}

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

在前面的 handler 在 pineline 中的双向存储结构中,head 和 tail 还有一个作用就是用来收尾的,如果一个 ByteBuf 从head开始入站,到了 tail 还没有被释放掉,那么此时 tail 就会帮我们释放 ByteBuf 。同理,ByteBuf 从 tail 开始出站,到了 head 还没有被释放掉,此时 head 也会帮我们去释放。

基本规则是,谁是最后使用者,谁负责 release,详细分析如下

起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))入站 ByteBuf 处理原则
1、 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
2、将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
3、如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
4、注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
5、假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)出站 ByteBuf 处理原则:出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release异常处理原则: 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true



**从源码角度理解realse:**这里的释放感觉不同实现类的实现释放的手段也不同,有的直接realse掉,而有的是将 ByteBuf 送回 Buf 池中循环利用

尾部释放:
DefaultChannelPipeline.class -> final class TailContext
我们直接截取 TailContext 中的释放方法

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
public static boolean release(Object msg) {
	//判断类有没有实现ReferenceCounted接口,如果实现了就证明是一个ByteBuf类
    if (msg instanceof ReferenceCounted) {
    	//调用接口的release方法
        return ((ReferenceCounted) msg).release();
    }
    //不是就返回false
    return false;
}

头部释放:
DefaultChannelPipeline.class -> final class HeadContext
我们直接截取 HeadContext 中的释放方法

  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
     	this.unsafe.write(msg, promise);
 }
//截取一小段
public final void write(Object msg, ChannelPromise promise) {
            this.assertEventLoop();
            //获取出站缓冲区
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            //缓冲区为空,此时证明从尾到头没有处理器对ByteBuf进行了释放,那么此时head
            //就调用 ReferenceCountUtil.release(msg) 方法释放,又回到了上面的源码
            //如果有出站缓冲区,那么就交给出站缓冲区来进行释放操作
            if (outboundBuffer == null) {
                this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.this.initialCloseCause));
                ReferenceCountUtil.release(msg);
            } else {
                int size;



9. slice

首先我们假设有一个 ByteBuf 要处理,比如第一部分要进行读写处理,第二部分要进行其他处理,我们当然可以选择初始化两个 ByteBuf 来装这两部分。但是有一个问题,原来只有一个 ByteBuf ,我们再弄多 2 个,就成了 3 个了,而且ByteBuf 使用的还是直接内存,是很浪费的选择,针对这种情况,我们可以采用
slice来解决。

slice: 【零拷贝】 的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针 。


一个案例:测试切片并且证明使用的是同一片空间

@Slf4j
public class TestSlice {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
        buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});

        Util.log(buf);
        

        //调用slic, 下面从0 - 5切片
        //参数1:从哪开始, 参数2:切片长度
        ByteBuf s1 = buf.slice(0, 5);
        ByteBuf s2 = buf.slice(5, 5);
        Util.log(s1);
        Util.log(s2);
        

        //证明切片后使用的还是原来的内存,只是内存一分为2
        //我们往s1中把第一个字母改成b
        s1.setByte(0, 'b');
        //查看后来的
        Util.log(s1);
        
        
        //查看原始的, 可以看到我们对s1进行改变,原始的也变了,证明原始的和s1使用的是一块空间
        Util.log(buf);
        
    }
}




一些要注意的地方:

    我们在进行切片之后,不能再往新的 ByteBuf 去追加内容,会直接报错。但是我们可以往原来的 ByteBuf
    去继续添加内容。可以这么想,比如 ByteBuf1 切片切成了 ByteBuf2 和 ByteBuf3 两部分,这时候我们往
    ByteBuf1 去添加当然没问题,可是我们往 ByteBuf2 或者 ByteBuf3
    去添加的时候就有问题了,相当于,切片之后的 ByteBuf 的长度是固定的了,已经被限制了。注意切片后的 ByteBuf 的读写指针是和原来的 ByteBuf 的读写指针独立的,互不影响原有的 ByteBuf1 进行了一次 realease() 方法释放掉之后,就有可能影响切片之后的 ByteBuf2 和 ByteBuf3 。因为原来的内存都释放掉了,切片的当然不能用了。我们可以使用前面说的 ByteBuf.retain() 方法让计数器+1,这时候再掉用 realease() 就不会释放了。当然有可能有人有疑问,这样一加一减有啥用?用处大了,你总不想你在和别人合作的时候自己调用的 ByteBuf 无缘无故被别人释放吧,这种写法其实更多是在保证自己的代码不被破坏。所以,正确的写法是:
ByteBuf s1 = buf.slice(0, 5);
ByteBuf s2 = buf.slice(5, 5);
s1.retain();
s2.retain();
...
...
业务执行中
...
...
//最后经过我们自己的业务后
s1.release();
s2.release();



10. duplicate

减少了一次内存复制,一次性全部复制过来,使用的还是用一片内存



11. copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关,相当于又申请了一块空间。



12. CompositeByteBuf

【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝


我们先定义出两个 ByteBuf 接下来需要合并在一起

 ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
        buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
        buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

首先我们可以通过下面这种方法进行写入:但是有一个缺点就是 writeBytes 这个方法里面是发生了复制的,没有达到"零拷贝"的效果
 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
 ByteBuf byteBuf = buffer.writeBytes(buf1).writeBytes(buf2);
 Util.log(byteBuf);

下面我们用 CompositeByteBuf 进行组合:但是发现有一个问题,就是 byteBufs 并没有创建出来,其实是因为 addComponents 这个方法 或者 addComponent 都不会自动去调整写入指针的位置,就导致了并没有真正写入
CompositeByteBuf byteBufs = ByteBufAllocator.DEFAULT.compositeBuffer();
byteBufs.addComponents(buf1, buf2);
Util.log(byteBufs);
//read index:0 write index:0 capacity:10

解决方法就是在addComponents 这个方法添加一个参数 true,可以让指针自动增长,目的是不直接拷贝,而是使用类似记录地址的方式,提高效率,但是缺点就是带来了更复杂的维护
byteBufs.addComponents(true, buf1, buf2);

read index:0 write index:10 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+


同样的,使用这种方法当 release 的时候,我们也要手动进行 retain, 然后自己最后再手动释放掉,避免被别人破坏



13. Unpooled

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作,这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

也可以用来包装普通字节数组,底层也不会有拷贝操作

ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));



14. ByteBuf 的优势

池化计数使得 ByteBuf 的回收利用率高,也减少了内存泄露的风险,减少内存的消耗。对于直接内存,较少了操作系统内存的占用;对于堆内存,则减轻了 GC 的负担读写指针分离,不需要像 ByteBuffer 一样切换读写模式。用起来更方便可以自动扩容支持链式调用,使用更流畅内置的很多方法 slice、duplicate、CompositeByteBuf 都可以体现零拷贝





如有错误,欢迎指出!!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/715731.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号