NioByteUnsafe.read读事件处理
原理图源码分析一NioByteUnsafe.read 读事件消息接收缓冲区分配与自适应
原理图源码分析一内存分配与窗口调整
alloc.ioBuffer直接内存分配 javaChannel数据读取至接收缓冲区
原理图源码分析一doreadBytes 总结
NioByteUnsafe.read读事件处理 原理图 源码分析一NioByteUnsafe.read自旋不超过16次读取消息通过allocHandle和allocator分配自适应大小的缓冲区[缓冲区大小的调整在当前read事件处理完成后,自旋过程中不会调整]doReadBytes完成socket数据拷贝至堆外内存fireChannelRead触发netty事件处理本次读事件自旋超过16次则中断allocHandle.record(totalReadAmount)调整缓冲区大小
public final void read() {
...... 删除校验代码
final ChannelPipeline pipeline = pipeline();
获取消息缓冲区分配器
final ByteBufAllocator allocator = config.getAllocator();
每次读事件发生时最多读取多少次
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
allocHandle负责每次指定缓冲区的大小 然后交给ByteBufAllocator分配相应大小的缓冲区
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
已经读取多少次
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
分配接收缓冲区大小
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
实际读取缓冲区大小
int localReadAmount = doReadBytes(byteBuf);
如果读取的字节数为0 则说明本次事件处理完毕
if (localReadAmount <= 0) {
byteBuf.release();
byteBuf = null;
如果读到-1,说明客户端关闭
close = localReadAmount < 0;
if (close) {
setReadPending(false);
}
break;
}
if (!readPendingReset) {
readPendingReset = true;
setReadPending(false);
}
触发fire事件[不一定是完整的数据包 在handler协议栈中处理分包黏包问题]
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
读指针totalReadAmount超过java int表示范围,则中断本次读取
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
避免int溢出
totalReadAmount = Integer.MAX_VALUE;
break;
}
记录本次读事件一共读取多少字节
totalReadAmount += localReadAmount;
默认配置为true
if (!config.isAutoRead()) {
break;
}
当前窗口没读满 说明读取完毕
if (localReadAmount < writable) {
break;
}
} while (++ messages < maxMessagesPerRead);本次读事件自旋未超过16次
触发pipeline读完成机制
pipeline.fireChannelReadComplete();
自适应: 调整消息接收缓冲区大小
FixedRecvByteBufAllocator 窗口固定;AdaptiveRecvByteBufAllocator则窗口自适应(默认)
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
触发pipeline异常机制
handleReadException(pipeline, byteBuf, t, close);
} finally {
移除SelectionKey读事件的监听
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}
读事件消息接收缓冲区分配与自适应
原理图
allocate分配接收缓冲区record根据本次读事件消息总大小调整接收缓冲区大小默认接收缓冲区为非池化堆外内存
源码分析一内存分配与窗口调整
| SIZE_TABLE窗口字节大小 |
|---|
| 64,80,96,112,128,144,160,176,192,208,224,240,256,272,288,304,320,336,352,368,384,400,416,432,448,464,480,496,512,1024,2048,4096,8192,16384,32768,65536 |
初始窗口字节数为1024record方法窗口调整时,如果窗口要增大则直接增大4个单位(小于512每次变化为16个字节,大于512每次增加一倍)如果要减小则读事件总字节数要小于当前窗口两个单位,并且连续两次发生才进行减小,并且只减小一个单位
public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;
private static final int INDEX_INCREMENT = 4;
private static final int INDEX_DECREMENT = 1;
共53个窗口大小值: 16,32,48,64,80,96,112,128,144,160,176,192,208,224,240,256,272,288,304,320,336,352,368,384,400,416,432,448,464,480,496,512,1024,2048,4096,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,16777216,33554432,67108864,134217728,268435456,536870912,1073741824
默认窗口初始化为1024字节,外部使用限制了窗口最小64字节 最大65536字节,更大和更小的窗口都不会被使用
private static final int[] SIZE_TABLE;
private static final class HandleImpl implements Handle {
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
二分法求数组值为1024的索引
index = getSizeTableIndex(initial);
默认1024
nextReceiveBufferSize = SIZE_TABLE[index];
}
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
通过内存分配器分配直接内存
return alloc.ioBuffer(nextReceiveBufferSize);
}
@Override
public void record(int actualReadBytes) {
读事件总字节数要小于当前窗口两个单位
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index-- SIZE_TABLE[index]表示当前窗口的大小
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
第一次actualReadBytes小于窗口则设置下一次增大
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
如果窗口比actualReadBytes实际读的数据小,则增大窗口
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
}
}
alloc.ioBuffer直接内存分配
默认分配堆外内存
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe()) {
一般都是堆外内存
return directBuffer(initialCapacity);
}
安卓平台等特殊情况采用堆内存
return heapBuffer(initialCapacity);
}
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
调用JNI分配堆外内存
setByteBuffer(allocateDirect(initialCapacity), false);
}
通过jdk的unsafe完成内存分配以及内存假值0填充
DirectByteBuffer(int cap) {
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
分配堆外内存
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
初始化堆外内存假值0[笔者大胆猜测下是为了完成缺页处理]
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
初始化内存地址设置直接内存
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
donotFree = false;
} else {
通过策略如unsafe.freeMemory(address)释放内存
freeDirect(oldBuffer);
}
}
}
设置直接内存的相关元信息到java的封装对象上
this.buffer = buffer;
memoryAddress = PlatformDependent.directBufferAddress(buffer);
tmpNioBuf = null;
capacity = buffer.remaining();
}
javaChannel数据读取至接收缓冲区
原理图
源码分析一doreadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
读取核心
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
写入堆外内存,返回具体写入多少字节
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
ensureWritable(length);// UnpooledByteBufAllocatorInstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
UnpooledUnsafeDirectByteBuf.setBytes完成javachannel数据读取
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
writerIndex += writtenBytes;
}
返回写入多少字节数
return writtenBytes;
}
javachannel读取数据到堆外内存
| 类名 | 作用 |
|---|---|
| FileDescriptor | javachannel对应的文件描述符 |
| NativeDispatcher | 调用NativeDispatcher.read将fd上的数据copy到直接内存 |
写入堆外内存: 指定首地址和大小,通过 native方法完成数据写入
var9 = NativeDispatcher.read(fd, ((DirectBuffer)buf).address() , size);
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
ensureAccessible();
ByteBuffer tmpBuf = internalNioBuffer();
tmpBuf.clear().position(index).limit(index + length);
try {
javachannel将数据写入buffer
底层调用 IOUtil.read(this.fd, var1, -1L, nd);
return in.read(tmpBuf);
} catch (ClosedChannelException ignored) {
发生异常返回-1触发netty的关闭流程
return -1;
}
}
总结
读事件发生由堆外内存分配器创建自适应大小的堆外内存消息接收缓冲区读事件自旋16次读取后则触发fireChannelRead事件javachannel底层的数据copy通过socket文件描述符,堆外内存的首地址和大小以及NativeDispatcher底层的native完成堆外内存数据copy



