PoolArena 是 Netty 申请内存的主要入口,Netty 借鉴 jemalloc 中 Arena 的设计思想,采用固定数量的多个 Arena 进行内存分配,默认数量通常为 CPU 核数 * 2
。线程在首次申请分配内存时,会通过 round-robin
的方式轮询 PoolArena 数组,选择一个固定的 PoolArena ,该线程在整个生命周期内都只会与该 PoolArena 打交道,所以每个线程都会保存对应的 PoolArena 信息,从而提高访问效率。
本篇文章深入分析 PoolArena 的源码及核心原理。
PoolArena 的结构
PoolArena 是一个抽象类,它有两个子类:DirectArena 和 HeapArena,其类图如下:
PoolArena 继承 SizeClass,实现 PoolArenaMetric 接口。PoolArenaMetric 提供了一些方法来获取 PoolArena的指标信息,它可以让我们更好地了解内存池的使用情况,以便优化和调优应用程序。
PoolArena 重要的属性如下:
abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric { enum SizeClass { Small, Normal } private final PoolSubpage<T>[] smallSubpagePools; private final PoolChunkList<T> q050; private final PoolChunkList<T> q025; private final PoolChunkList<T> q000; private final PoolChunkList<T> qInit; private final PoolChunkList<T> q075; private final PoolChunkList<T> q100; // .... }
从这里可以看出,PoolArena 只有 Small 和 Norma 两种内存规格,两种内存规格,就有两种内存分配的方式:
PoolSubpage 类型的数组:smallSubpagePools,用于分配小于 28K 的内存。
由 6 个 PoolChunkList 组成的双向链表:用于分配小于 4MB 的内存。
结构如下:
PoolArena 的构造函数
构造函数如下:
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize, int cacheAlignment) { super(pageSize, pageShifts, chunkSize, cacheAlignment); // 所属分配器 this.parent = parent; directMemoryCacheAlignment = cacheAlignment; // 39 numSmallSubpagePools = nSubpages; smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); for (int i = 0; i < smallSubpagePools.length; i ++) { // 初始化 Subpage 首节点 smallSubpagePools[i] = newSubpagePoolHead(); } q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize); q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize); q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize); q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize); q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize); qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize); q100.prevList(q075); q075.prevList(q050); q050.prevList(q025); q025.prevList(q000); q000.prevList(null); qInit.prevList(qInit); List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6); metrics.add(qInit); metrics.add(q000); metrics.add(q025); metrics.add(q050); metrics.add(q075); metrics.add(q100); chunkListMetrics = Collections.unmodifiableList(metrics); }
构造函数主要是初始化 smallSubpagePools 数组和 PoolChunkList 双向链表 。这里重点讲下 PoolChunkList 双向链表。从构造函数中我们可以看到该双向链表由 6 个节点组成,每个节点代表不同的内存使用率,如下:
qInit,内存使用率为 0% ~ 25% 的 Chunk。
q000,内存使用率为 1% ~ 50% 的 Chunk。
q025,内存使用率为 25% ~ 75% 的 Chunk。
q050,内存使用率为 50% ~ 100% 的 Chunk。
q075,内存使用率为 75% ~ 100% 的 Chunk。
q100,内存使用率为 100% 的 Chunk。
构建的双向链表结构如下
针对这个结构,有两个问题需要解答:
qInit 和 q000 有什么区别?这样相似的两个节点为什么不设计成一个?
节点与节点之间的内存使用率重叠很大,为什么要这么设计?
第一个问题:qInit 和 q000 有什么区别?这样相似的两个节点为什么不设计成一个?
仔细观察这个 PoolChunkList 的双向链表,你会发现它并不是一个完全的双向链表,它与完全的双向链表有两个区别:
qInit 的 前驱节点是自己。这就意味着在 qInit 节点中的 PoolChunk 使用率到达 0% 后,它并不会被回收。
q000 则没有前驱节点,这样就导致一个问题,随着 PoolChunk 的内存使用率降低,直到小于 1% 后,它并不会退回到 qInit 节点,而是等待完全释放后被回收。
所以如果某个 PoolChunk 的内存使用率一直都在 0 ~ 25% 之间波动,那么它就可以一直停留在 qInit 中,这样就避免了重复的初始化工作,故而 qInit 的作用主要在于避免某 PoolChunk 的内存使用变化率不大的情况下的频繁初始化和释放,提高内存分配的效率。而 q000 则用于 PoolChunk 内存使用变化率较大,待完全释放后进行内存回收,防止永远驻留在内存中。
qInit 和 q000 的配合使用,使得 Netty 的内存分配和回收效率更高效了。
第二个问题:节点与节点之间的内存使用率重叠很大,为什么要这么设计?
我们先看下图:
从上图可以看出,这些节点几乎有一半空间是重叠的,为什么要这么设计呢?我们假定,q025 的范围为 [25%,50%),q050 的范围为 [50%,75%),如果有一个 PoolChunk 它的内存使用率变化情况为 40%、55%、45%、60%、48%,66%,这样就会导致这个 PoolChunk 会在 q025 、q050 这两个 PoolChunkList 不断移动,势必会造成性能损耗。如果范围是 [25%,75%) 和 [50%,100%),这样的内存使用率变化情况只会在 q025 中,只要当内存使用率超过了 75% 才会移动到 q050,而随着该 PoolChunk 的内存使用率降低,它也不是降到 75% 就回到 q025,而是要到 50%,这样可以调整的范围就大的多了。
内存分配
PoolArena 提供了 allocate()
用于内存分配,该方法根据申请内存的大小规格来分配不同规格的内存:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { PooledByteBuf<T> buf = newByteBuf(maxCapacity); allocate(cache, buf, reqCapacity); return buf; } private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { // 根据 size 计算 sizeIdex final int sizeIdx = size2SizeIdx(reqCapacity); if (sizeIdx <= smallMaxSizeIdx) { // Small 规格,在 PoolSubpage 中分配 tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); } else if (sizeIdx < nSizes) { // Normal 规则,在 PoolChunk 中分配 tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx); } else { // Huge 规格,直接分配 int normCapacity = directMemoryCacheAlignment > 0 ? normalizeSize(reqCapacity) : reqCapacity; allocateHuge(buf, normCapacity); } }
首先根据申请的内存大小 reqCapacity 计算 sizeIdex,sizeIdex 是在 SizeClass 中计算的,如下:
public int size2SizeIdx(int size) { if (size == 0) { return 0; } if (size > chunkSize) { return nSizes; } size = alignSizeIfNeeded(size, directMemoryCacheAlignment); // 对于小于 lookupMaxSize 这段,可以直接在 size2idxTab 表中取 if (size <= lookupMaxSize) { return size2idxTab[size - 1 >> LOG2_QUANTUM]; } // 这里要跟计算 size 的个公式来倒推,大明哥数学都还给老师就不推到了 int x = log2((size << 1) - 1); int shift = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1 ? 0 : x - (LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM); int group = shift << LOG2_SIZE_CLASS_GROUP; int log2Delta = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1 ? LOG2_QUANTUM : x - LOG2_SIZE_CLASS_GROUP - 1; int deltaInverseMask = -1 << log2Delta; int mod = (size - 1 & deltaInverseMask) >> log2Delta & (1 << LOG2_SIZE_CLASS_GROUP) - 1; return group + mod; }
得到 sizeIdex 后我们就可以确认使用哪种方式来进行内存分配:
Small:[0,38]
Normal:[39,68]
Huge:(68,)
tcacheAllocateSmall:Small 规格
tcacheAllocateSmall()
用于分配 Small 规格的内存:
private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity, final int sizeIdx) { // 使用缓存 if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { // was able to allocate out of the cache so move on return; } // 确定是哪个 PoolSubpage 块 final PoolSubpage<T> head = smallSubpagePools[sizeIdx]; final boolean needsNormalAllocation; // 锁定整个链表 synchronized (head) { final PoolSubpage<T> s = head.next; needsNormalAllocation = s == head; // 这里表示该链表中有空闲的内存可供分配 if (!needsNormalAllocation) { assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx) : "doNotDestroy=" + s.doNotDestroy + ", elemSize=" + s.elemSize + ", sizeIdx=" + sizeIdx; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); } } // needsNormalAllocation == true,说明该 PoolSubpage 中没有对应的内存,需要从 PoolChunk 中分配 PoolSubpage if (needsNormalAllocation) { synchronized (this) { allocateNormal(buf, reqCapacity, sizeIdx, cache); } } // allocationsSmall count + 1 incSmallAllocation(); }
PoolThreadCache 缓存中是否存在,有就直接分配即可
如果在 PoolThreadCache 缓存中没有,则从 smallSubpagePools 数组中取,这里需要注意,因为并发的关系,这里使用了
synchronized (head)
来保证线程安全,锁定 head 就是锁定整个链表。如果
head.next == head
说明当前链表中没有空闲的内存可分配,需要从 PoolChunk 中分配 PoolSubpage。
tcacheAllocateNormal:Normal 规格
tcacheAllocateNormal()
用于分配 Normal 规格的内存。
private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity, final int sizeIdx) { if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) { // was able to allocate out of the cache so move on return; } // 注意这里是对整个 PoolArena 加锁 synchronized (this) { allocateNormal(buf, reqCapacity, sizeIdx, cache); ++allocationsNormal; } }
因为 Normal 规格的内存需要从 PoolChunk 中分配,其主要是利用 5种不同类型的 PoolChunkList 来进行分配,而一个 PoolArena 中只有一个 PoolChunkList 链表,所以需要对整个 PoolArena 加锁。
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) || q025.allocate(buf, reqCapacity, sizeIdx, threadCache) || q000.allocate(buf, reqCapacity, sizeIdx, threadCache) || qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) || q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { return; } // 生成一个新的 PoolChunk PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); assert success; // 加入到 qInit qInit.add(c); }
从这个方法我们可以看出,在 PoolChunkList 双向链表中它并不是从 qInit 到 q100 按照顺序来分配的,而是按照q050 —> q025 —> q000 —> qInit —> q075
这样的顺序,这样做的目的是这样的顺序内存分配效率相对更高些。
allocateHuge:Huge 规格
allocateHuge()
用于分配 Huge 规格的内存,其分配方式是不进行池化处理,直接从堆或者堆外内存分配。
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) { PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); activeBytesHuge.add(chunk.chunkSize()); buf.initUnpooled(chunk, reqCapacity); allocationsHuge.increment(); }
内存释放
PoolArena 提供了 free() 用于对内存进行释放:
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { // 非池化,直接释放即可 int size = chunk.chunkSize(); destroyChunk(chunk); activeBytesHuge.add(-size); deallocationsHuge.increment(); } else { SizeClass sizeClass = sizeClass(handle); // 加入到缓存中 if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) { // cached so not free it. return; } // 释放内存 freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false); } }
对于 Huge 这类没有池化的内存,则直接释放 PoolChunk 即可。
对于池化的内存,优先加入到 PoolThreadCache 缓存中,如果添加失败的话,则调用
freeChunk()
释放内存
void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,boolean finalizer) { final boolean destroyChunk; // 加锁 synchronized (this) { // 在 PoolChunkList 中进行释放,并调整其对应的数据结构 destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer); } if (destroyChunk) { destroyChunk(chunk); } }
由于 PoolArena 只是内存的分配和释放的入口,真正执行内存分配的是在 PoolChunk 和 PoolSubpage 中,所以这篇文章在内存分配和释放地方并没深入到这两个类当中,在后面讲解 PoolChunk 和 PoolSubpage 时再详细深入分析。
转自:大明哥_