栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

netty4核心源码分析第七篇一核心篇服务端NioByteUnsafe处理read事件

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

netty4核心源码分析第七篇一核心篇服务端NioByteUnsafe处理read事件

文章目录

NioByteUnsafe.read读事件处理

原理图源码分析一NioByteUnsafe.read 读事件消息接收缓冲区分配与自适应

原理图源码分析一内存分配与窗口调整

alloc.ioBuffer直接内存分配 javaChannel数据读取至接收缓冲区

原理图源码分析一doreadBytes 总结

NioByteUnsafe.read读事件处理 原理图

源码分析一NioByteUnsafe.read

自旋不超过16次读取消息通过allocHandle和allocator分配自适应大小的缓冲区[缓冲区大小的调整在当前read事件处理完成后,自旋过程中不会调整]doReadBytes完成socket数据拷贝至堆外内存fireChannelRead触发netty事件处理本次读事件自旋超过16次则中断allocHandle.record(totalReadAmount)调整缓冲区大小

public final void read() {
    ...... 删除校验代码
    final ChannelPipeline pipeline = pipeline();
    获取消息缓冲区分配器
    final ByteBufAllocator allocator = config.getAllocator();
    每次读事件发生时最多读取多少次
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    allocHandle负责每次指定缓冲区的大小 然后交给ByteBufAllocator分配相应大小的缓冲区
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }
    ByteBuf byteBuf = null;
    已经读取多少次
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            分配接收缓冲区大小
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            实际读取缓冲区大小
            int localReadAmount = doReadBytes(byteBuf);
            如果读取的字节数为0 则说明本次事件处理完毕
            if (localReadAmount <= 0) {
                byteBuf.release();
                byteBuf = null;
                如果读到-1,说明客户端关闭
                close = localReadAmount < 0;
                if (close) {
                    setReadPending(false);
                }
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            触发fire事件[不一定是完整的数据包 在handler协议栈中处理分包黏包问题]
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
            读指针totalReadAmount超过java int表示范围,则中断本次读取
            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                避免int溢出
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }
            记录本次读事件一共读取多少字节
            totalReadAmount += localReadAmount;
            默认配置为true
            if (!config.isAutoRead()) {
                break;
            }
            当前窗口没读满 说明读取完毕
            if (localReadAmount < writable) {
                break;
            }
        } while (++ messages < maxMessagesPerRead);本次读事件自旋未超过16次
        触发pipeline读完成机制
        pipeline.fireChannelReadComplete();
        自适应: 调整消息接收缓冲区大小
        FixedRecvByteBufAllocator 窗口固定;AdaptiveRecvByteBufAllocator则窗口自适应(默认)
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        触发pipeline异常机制
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        移除SelectionKey读事件的监听
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}       
读事件消息接收缓冲区分配与自适应 原理图

allocate分配接收缓冲区record根据本次读事件消息总大小调整接收缓冲区大小默认接收缓冲区为非池化堆外内存
源码分析一内存分配与窗口调整

SIZE_TABLE窗口字节大小
64,80,96,112,128,144,160,176,192,208,224,240,256,272,288,304,320,336,352,368,384,400,416,432,448,464,480,496,512,1024,2048,4096,8192,16384,32768,65536

初始窗口字节数为1024record方法窗口调整时,如果窗口要增大则直接增大4个单位(小于512每次变化为16个字节,大于512每次增加一倍)如果要减小则读事件总字节数要小于当前窗口两个单位,并且连续两次发生才进行减小,并且只减小一个单位

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {
    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;
    共53个窗口大小值: 16,32,48,64,80,96,112,128,144,160,176,192,208,224,240,256,272,288,304,320,336,352,368,384,400,416,432,448,464,480,496,512,1024,2048,4096,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,16777216,33554432,67108864,134217728,268435456,536870912,1073741824

    默认窗口初始化为1024字节,外部使用限制了窗口最小64字节 最大65536字节,更大和更小的窗口都不会被使用
    private static final int[] SIZE_TABLE;

    private static final class HandleImpl implements Handle {
        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;
            二分法求数组值为1024的索引
            index = getSizeTableIndex(initial);
            默认1024
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            通过内存分配器分配直接内存
            return alloc.ioBuffer(nextReceiveBufferSize);
        }
        @Override
        public void record(int actualReadBytes) {
	        读事件总字节数要小于当前窗口两个单位
            if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index-- SIZE_TABLE[index]表示当前窗口的大小
                    index = Math.max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    第一次actualReadBytes小于窗口则设置下一次增大
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                如果窗口比actualReadBytes实际读的数据小,则增大窗口
                index = Math.min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }
    }
}



alloc.ioBuffer直接内存分配

默认分配堆外内存

    public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            一般都是堆外内存
            return directBuffer(initialCapacity);
        }
        安卓平台等特殊情况采用堆内存
        return heapBuffer(initialCapacity);
    }
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);

    this.alloc = alloc;
    调用JNI分配堆外内存
    setByteBuffer(allocateDirect(initialCapacity), false);
}

通过jdk的unsafe完成内存分配以及内存假值0填充

DirectByteBuffer(int cap) {
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        分配堆外内存
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    初始化堆外内存假值0[笔者大胆猜测下是为了完成缺页处理]
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

初始化内存地址设置直接内存

  final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    donotFree = false;
                } else {
                	通过策略如unsafe.freeMemory(address)释放内存
                    freeDirect(oldBuffer);
                }
            }
        }
        设置直接内存的相关元信息到java的封装对象上
        this.buffer = buffer;
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }
javaChannel数据读取至接收缓冲区 原理图

源码分析一doreadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    读取核心
    return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}

写入堆外内存,返回具体写入多少字节

public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);// UnpooledByteBufAllocatorInstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
    UnpooledUnsafeDirectByteBuf.setBytes完成javachannel数据读取
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    返回写入多少字节数
    return writtenBytes;
}

javachannel读取数据到堆外内存

类名作用
FileDescriptorjavachannel对应的文件描述符
NativeDispatcher调用NativeDispatcher.read将fd上的数据copy到直接内存

写入堆外内存: 指定首地址和大小,通过 native方法完成数据写入
var9 = NativeDispatcher.read(fd, ((DirectBuffer)buf).address() , size);

public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        javachannel将数据写入buffer
        底层调用 IOUtil.read(this.fd, var1, -1L, nd);
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        发生异常返回-1触发netty的关闭流程
        return -1;
    }
}
总结

读事件发生由堆外内存分配器创建自适应大小的堆外内存消息接收缓冲区读事件自旋16次读取后则触发fireChannelRead事件javachannel底层的数据copy通过socket文件描述符,堆外内存的首地址和大小以及NativeDispatcher底层的native完成堆外内存数据copy

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/785382.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号