基础结构
实现
存储数据基本机构 eface
// 存储元素的结构体,类型指针和值指针
type eface struct {
typ, val unsafe.Pointer
}
Pool 底层用 eface 来存储单个 Object, 包括 typ 指针: Object 的类型,val 指针: Object 的值
poolDequeue
type poolDequeue struct {
headTail uint64
vals []eface
}
headTail:
[hhhhhhhh hhhhhhhh hhhhhhhh hhhhhhhh tttttttt tttttttt tttttttt tttttttt]
1. headTail表示下标,高32位表示头下标,低32位表示尾下标,poolDequeue定义了,head tail的pack和unpack函数方便转化,
实际用的时候都会mod ( len(vals) - 1 ) 来防止溢出
2. head和tail永远只用32位表示,溢出后会从0开始,这也满足循环队列的设计
3. 队列为空的条件 tail == head
4. 队列满的条件 (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head tail加上队列长度和head相等(实际上就是队列已有的空间都有值了,满了)
对应计算头部和尾部值
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
// 定义32位1值
const mask = 1<<dequeueBits - 1
// 头部为高32位,右移32为得到高32位值,与mask取与
head = uint32((ptrs >> dequeueBits) & mask)
// 64位直接与mask取与,高32位被截取
tail = uint32(ptrs & mask)
return
}
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
vals:
1 poolDequeue是被poolChain使用,poolChain使用poolDequeue时
a) 初始化vals长度为8,vals长度必须是2的幂
b) 当队列满时,vals长度*2,最大扩展到 dequeueLimit = (1 << 32) / 4 = (1 << 30),之后就不会扩展了
2 为什么vals长度必须是2的幂 ?
这是因为go的内存管理策略是将内存分为2的幂大小的链表,申请2的幂大小的内存可以有效减小分配内存的开销
3 为什么dequeueLimit是(1 << 32) / 4 = 1 << 30 ?
a) dequeueLimit 必须是2的幂(上边解释过)
b) head和tail都是32位,最大是1 << 31,如果都用的话,head和tail就是无符号整型,无符号整型使用的时候会有很多上溢的错误,这类错误是不容易检测的,所以相比之下还不如用31位有符号整型,有错就报出来,结论参考https://stackoverrun.com/cn/q/10770747
poolDequeue 是一个无锁、固定大小的单生产端多消费端的环形队列,单一 producer 可以在头部 push 和 pop(可能和传统队列头部只能 push 的定义不同),多 consumer 可以在尾部 pop
poolChain
poolChain 是动态版的 poolDequeue head(poolDequeue)[prev] –> <— next[prev] —> <—[next] tail(poolDequeue) 动态的队列,队列每个节点又是一个环形队列 (poolDequeue)
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
type poolChain struct {
// 头指针,只能单一producer操作(push, pop)
head *poolChainElt
// 尾指针,可以被多个consumer pop,必须是原子操作
tail *poolChainElt
}
// poolChain成员函数
func (c *poolChain) pushHead(val interface{})
1. 如果head为nil,说明队列现在是空的,那么新建一个节点,将head和tail都指向这个节点
2. 将val push到head的环形队列中,如果push成功了,可以返回了
3. 如果没push成功,则说明head的环形队列满了,就再创建一个两倍head大小的节点[最大(1 << 32) / 4],
将新节点作为head,并且处理好新head和旧head的next,prev关系
4. 将val push到head的环形队列中
func (c *poolChain) popHead()
1. 先在head环形队列中popHead试试,如果空了,当前节点就没用了,就删掉当前节点,去prev节点并且把prev节点作为新head再取一值递归下去,
能取到就返回,取不到说明队列空了
func (c *poolChain) popTail()
1. 如果tail为nil,说明队列是空的,直接返回
2. 如果tail非nil,就取取试试,有东西就返回
3. 如果没取出来东西,那么说明tail节点没存东西了,递归去prev节点环形队列中popTail,并且把prev节点作为tail,能取到就返回,取不到就是空了
poolLocal
- poolLocal 是每个调度器 (P) 存 Object 的结构体
- private 是每个调度器私有的,shared 是所有调度器公有的,每个调度器 pop 时的逻辑是: 先看 private,没有在看自己的 shared,再没有就去其他调度器的 shared 偷,再没有才是空
- pad 是防止伪共享,参考https://www.cnblogs.com/cyfonly/p/5800758.html
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
// 当前调度器的内部资源
type poolLocalInternal struct {
// 当前调度器的私有资源
private interface{} // Can be used only by the respective P.
// 所有调度器的公有资源
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
两个重要的方法
- PUT
- 首先关闭竞争检测,然后会将当前 goroutine 固定到一个调度器 (P) 上,且不允许抢占
- 从 Pool 的 local 中取出来当前 goroutine 固定到那个调度器 (P) 对应的 poolLocal, 没有就新建
- 先判断这个当前调度器 (P) 专属 poolLocal,私有空间是不是空的,如果是把 x 放到私有空间,并把 x 置 nil
- 判断 x 是否为 nil,如果不为空说明私有空间满了,就 push 到该调度器专属 poolLocal 的 shared head
- 允许抢占,开启竞争检测
==参考 (https://gocn.vip/topics/10396)==