Source File
	poolqueue.go
Belonging Package
	sync
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// This file is part of package sync.

import (
	"sync/atomic"
	"unsafe"
)

// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail atomic.Uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}

// eface is a two-word slot that is read and written both as an
// interface value (through an unsafe cast to *any) and word by word
// (typ and val individually).
type eface struct {
	typ, val unsafe.Pointer
}

// dequeueBits is the width, in bits, of each of the two indexes
// packed into poolDequeue.headTail.
const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4

// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}

// unpack splits a packed headTail word into its head index (the upper
// dequeueBits bits) and its tail index (the lower dequeueBits bits).
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

// pack is the inverse of unpack: it combines head and tail into a
// single headTail word with head in the most-significant bits.
func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val any) bool {
	ptrs := d.headTail.Load()
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*any)(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	d.headTail.Add(1 << dequeueBits)
	return true
}

// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (any, bool) {
	var slot *eface
	for {
		ptrs := d.headTail.Load()
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if d.headTail.CompareAndSwap(ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (any, bool) {
	var slot *eface
	for {
		ptrs := d.headTail.Load()
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if d.headTail.CompareAndSwap(ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}

// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail atomic.Pointer[poolChainElt]
}

type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev atomic.Pointer[poolChainElt]
}

// pushHead adds val at the head of the chain. The first call
// allocates an initial dequeue; when the current head dequeue is
// full, a new one of twice the size (capped at dequeueLimit) is
// linked in and becomes the head. It reads and writes c.head without
// synchronization, so it must only be called by the producer.
func (c *poolChain) pushHead(val any) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		c.tail.Store(d)
	}

	if d.pushHead(val) {
		return
	}

	// The current dequeue is full. Allocate a new one of twice
	// the size.
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{}
	d2.prev.Store(d)
	d2.vals = make([]eface, newSize)
	c.head = d2
	d.next.Store(d2)
	d2.pushHead(val)
}

// popHead removes and returns the most recently pushed element that
// is still in the chain, walking back through prev links when the
// head dequeue is empty. It returns false if no element was found.
// Like pushHead, it reads c.head without synchronization.
func (c *poolChain) popHead() (any, bool) {
	d := c.head
	for d != nil {
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		// There may still be unconsumed elements in the
		// previous dequeue, so try backing up.
		d = d.prev.Load()
	}
	return nil, false
}

// popTail removes and returns the oldest element in the chain. It
// returns false if the chain is empty. Dequeues that are found to be
// permanently empty are unlinked from the tail along the way.
func (c *poolChain) popTail() (any, bool) {
	d := c.tail.Load()
	if d == nil {
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := d.next.Load()

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if c.tail.CompareAndSwap(d, d2) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			d2.prev.Store(nil)
		}
		d = d2
	}
}
|  | The pages are generated with Golds v0.7.9-preview. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |