Source file: sema.go
Belonging package: runtime
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Semaphore implementation exposed to Go.
// Intended use is to provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// https://swtch.com/semaphore.pdf

package runtime

import (
	"internal/cpu"
	"internal/runtime/atomic"
	"unsafe"
)

// Asynchronous semaphore for sync.Mutex.

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and
// BenchmarkSemTable/OneAddrCollision/* for a benchmark that exercises this.
type semaRoot struct {
	lock  mutex
	treap *sudog        // root of balanced tree of unique waiters.
	nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}

var semtable semTable

// Prime to not correlate with any user patterns.
const semTabSize = 251

type semTable [semTabSize]struct {
	root semaRoot
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

func (t *semTable) rootFor(addr *uint32) *semaRoot {
	return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
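// An illustrative, hypothetical sketch (not part of sema.go): the semTable
// idea rebuilt with only the standard library, assuming a 64-byte cache
// line. A semaphore address is hashed by shifting away its three alignment
// bits and reducing modulo a prime, so unrelated addresses land in
// separately locked, separately padded buckets. All names are invented.
//
//	package main
//
//	import (
//		"fmt"
//		"sync"
//		"unsafe"
//	)
//
//	const tabSize = 251 // prime, mirroring semTabSize
//
//	type bucket struct {
//		mu    sync.Mutex
//		nwait uint32
//	}
//
//	// Pad each bucket out to its own (assumed 64-byte) cache line so
//	// that contended buckets do not false-share.
//	type table [tabSize]struct {
//		b   bucket
//		pad [64 - unsafe.Sizeof(bucket{})]byte
//	}
//
//	func (t *table) bucketFor(addr *uint32) *bucket {
//		return &t[(uintptr(unsafe.Pointer(addr))>>3)%tabSize].b
//	}
//
//	func main() {
//		var t table
//		var sema uint32
//		fmt.Printf("%p hashes to bucket %p\n", &sema, t.bucketFor(&sema))
//	}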
// sync_runtime_Semacquire should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
//   - gvisor.dev/gvisor
//   - github.com/sagernet/gvisor
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}

//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}

// sync_runtime_Semrelease should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
//   - gvisor.dev/gvisor
//   - github.com/sagernet/gvisor
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

//go:linkname internal_sync_runtime_SemacquireMutex internal/sync.runtime_SemacquireMutex
func internal_sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncMutexLock)
}

//go:linkname sync_runtime_SemacquireRWMutexR sync.runtime_SemacquireRWMutexR
func sync_runtime_SemacquireRWMutexR(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncRWMutexRLock)
}

//go:linkname sync_runtime_SemacquireRWMutex sync.runtime_SemacquireRWMutex
func sync_runtime_SemacquireRWMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncRWMutexLock)
}

//go:linkname sync_runtime_SemacquireWaitGroup sync.runtime_SemacquireWaitGroup
func sync_runtime_SemacquireWaitGroup(addr *uint32, synctest bool) {
	reason := waitReasonSyncWaitGroupWait
	if synctest {
		reason = waitReasonSynctestWaitGroupWait
	}
	semacquire1(addr, false, semaBlockProfile, 0, reason)
}

//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
func poll_runtime_Semrelease(addr *uint32) {
	semrelease(addr)
}

//go:linkname internal_sync_runtime_Semrelease internal/sync.runtime_Semrelease
func internal_sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

func readyWithTime(s *sudog, traceskip int) {
	if s.releasetime != 0 {
		s.releasetime = cputicks()
	}
	goready(s.g, traceskip)
}

type semaProfileFlags int

const (
	semaBlockProfile semaProfileFlags = 1 << iota
	semaMutexProfile
)

// Called from runtime.
func semacquire(addr *uint32) {
	semacquire1(addr, false, 0, 0, waitReasonSemacquire)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Easy case.
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
	s := acquireSudog()
	root := semtable.rootFor(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lockWithRank(&root.lock, lockRankRoot)
		// Add ourselves to nwait to disable "easy case" in semrelease.
		root.nwait.Add(1)
		// Check cansemacquire to avoid missed wakeup.
		if cansemacquire(addr) {
			root.nwait.Add(-1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		root.queue(addr, s, lifo)
		goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}
	releaseSudog(s)
}

func semrelease(addr *uint32) {
	semrelease1(addr, false, 0)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semtable.rootFor(addr)
	atomic.Xadd(addr, 1)

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if root.nwait.Load() == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
	lockWithRank(&root.lock, lockRankRoot)
	if root.nwait.Load() == 0 {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}
	s, t0, tailtime := root.dequeue(addr)
	if s != nil {
		root.nwait.Add(-1)
	}
	unlock(&root.lock)
	if s != nil { // May be slow or even yield, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			// Charge contention that this (delayed) unlock caused.
			// If there are N more goroutines waiting beyond the
			// one that's waking up, charge their delay as well, so that
			// contention holding up many goroutines shows up as
			// more costly than contention holding up a single goroutine.
			// It would take O(N) time to calculate how long each goroutine
			// has been waiting, so instead we charge avg(head-wait, tail-wait)*N.
			// head-wait is the longest wait and tail-wait is the shortest.
			// (When we do a lifo insertion, we preserve this property by
			// copying the old head's acquiretime into the inserted new head.
			// In that case the overall average may be slightly high, but that's fine:
			// the average of the ends is only an approximation to the actual
			// average anyway.)
			// The root.dequeue above changed the head and tail acquiretime
			// to the current time, so the next unlock will not re-count this contention.
			dt0 := t0 - acquiretime
			dt := dt0
			if s.waiters != 0 {
				dtail := t0 - tailtime
				dt += (dtail + dt0) / 2 * int64(s.waiters)
			}
			mutexevent(dt, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		readyWithTime(s, 5+skipframes)
		if s.ticket == 1 && getg().m.locks == 0 && getg() != getg().m.g0 {
			// Direct G handoff
			//
			// readyWithTime has added the waiter G as runnext in the
			// current P; we now call the scheduler so that we start running
			// the waiter G immediately.
			//
			// Note that waiter inherits our time slice: this is desirable
			// to avoid having a highly contended semaphore hog the P
			// indefinitely. goyield is like Gosched, but it emits a
			// "preempted" trace event instead and, more importantly, puts
			// the current G on the local runq instead of the global one.
			// We only do this in the starving regime (handoff=true), as in
			// the non-starving case it is possible for a different waiter
			// to acquire the semaphore while we are yielding/scheduling,
			// and this would be wasteful. We wait instead to enter starving
			// regime, and then we start to do direct handoffs of ticket and P.
			//
			// See issue 33747 for discussion.
			//
			// We don't handoff directly if we're holding locks or on the
			// system stack, since it's not safe to enter the scheduler.
			goyield()
		}
	}
}
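// An illustrative, hypothetical sketch (not part of sema.go): the
// acquire/release protocol above, rebuilt with portable primitives. The
// load-bearing detail is the ordering: acquire publishes itself in nwait
// before re-checking the count, and release bumps the count before
// checking nwait, so at least one side always observes the other and a
// wakeup cannot be missed. Signaling under mu also matters: a waiter holds
// mu from its re-check until Wait parks it, so a release can never signal
// inside that window. All names below are invented for the sketch.
//
//	package main
//
//	import (
//		"fmt"
//		"sync"
//		"sync/atomic"
//	)
//
//	type sema struct {
//		count atomic.Uint32 // the semaphore value itself
//		nwait atomic.Uint32 // number of (possibly) parked waiters
//		mu    sync.Mutex
//		cond  *sync.Cond
//	}
//
//	func newSema() *sema {
//		s := &sema{}
//		s.cond = sync.NewCond(&s.mu)
//		return s
//	}
//
//	// tryAcquire is the analogue of cansemacquire: CAS the count
//	// down if it is nonzero.
//	func (s *sema) tryAcquire() bool {
//		for {
//			v := s.count.Load()
//			if v == 0 {
//				return false
//			}
//			if s.count.CompareAndSwap(v, v-1) {
//				return true
//			}
//		}
//	}
//
//	func (s *sema) acquire() {
//		if s.tryAcquire() { // easy case
//			return
//		}
//		s.mu.Lock()
//		for {
//			s.nwait.Add(1)      // publish: disables release's easy case
//			if s.tryAcquire() { // re-check to avoid a missed wakeup
//				s.nwait.Add(^uint32(0)) // i.e. nwait--
//				s.mu.Unlock()
//				return
//			}
//			s.cond.Wait() // any release after the re-check sees nwait != 0
//			s.nwait.Add(^uint32(0))
//		}
//	}
//
//	func (s *sema) release() {
//		s.count.Add(1)
//		if s.nwait.Load() == 0 { // easy case: provably no waiters
//			return
//		}
//		s.mu.Lock()
//		s.cond.Signal()
//		s.mu.Unlock()
//	}
//
//	func main() {
//		s := newSema()
//		done := make(chan bool)
//		go func() { s.acquire(); done <- true }()
//		s.release()
//		fmt.Println(<-done)
//	}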
func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false
		}
		if atomic.Cas(addr, v, v-1) {
			return true
		}
	}
}

// queue adds s to the blocked goroutines in semaRoot.
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
	s.g = getg()
	s.elem = unsafe.Pointer(addr)
	s.next = nil
	s.prev = nil
	s.waiters = 0

	var last *sudog
	pt := &root.treap
	for t := *pt; t != nil; t = *pt {
		if t.elem == unsafe.Pointer(addr) {
			// Already have addr in list.
			if lifo {
				// Substitute s in t's place in treap.
				*pt = s
				s.ticket = t.ticket
				s.acquiretime = t.acquiretime // preserve head acquiretime as oldest time
				s.parent = t.parent
				s.prev = t.prev
				s.next = t.next
				if s.prev != nil {
					s.prev.parent = s
				}
				if s.next != nil {
					s.next.parent = s
				}
				// Add t first in s's wait list.
				s.waitlink = t
				s.waittail = t.waittail
				if s.waittail == nil {
					s.waittail = t
				}
				s.waiters = t.waiters
				if s.waiters+1 != 0 {
					s.waiters++
				}
				t.parent = nil
				t.prev = nil
				t.next = nil
				t.waittail = nil
			} else {
				// Add s to end of t's wait list.
				if t.waittail == nil {
					t.waitlink = s
				} else {
					t.waittail.waitlink = s
				}
				t.waittail = s
				s.waitlink = nil
				if t.waiters+1 != 0 {
					t.waiters++
				}
			}
			return
		}
		last = t
		if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
			pt = &t.prev
		} else {
			pt = &t.next
		}
	}

	// Add s as new leaf in tree of unique addrs.
	// The balanced tree is a treap using ticket as the random heap priority.
	// That is, it is a binary tree ordered according to the elem addresses,
	// but then among the space of possible binary trees respecting those
	// addresses, it is kept balanced on average by maintaining a heap ordering
	// on the ticket: s.ticket <= both s.prev.ticket and s.next.ticket.
	// https://en.wikipedia.org/wiki/Treap
	// https://faculty.washington.edu/aragon/pubs/rst89.pdf
	//
	// s.ticket is compared with zero in a couple of places, therefore set
	// the lowest bit. It will not affect the treap's quality noticeably.
	s.ticket = cheaprand() | 1
	s.parent = last
	*pt = s

	// Rotate up into tree according to ticket (priority).
	for s.parent != nil && s.parent.ticket > s.ticket {
		if s.parent.prev == s {
			root.rotateRight(s.parent)
		} else {
			if s.parent.next != s {
				panic("semaRoot queue")
			}
			root.rotateLeft(s.parent)
		}
	}
}
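// An illustrative, hypothetical sketch (not part of sema.go): the same
// treap discipline as queue above, in its textbook recursive form. Keys
// stand in for the elem addresses and randomly drawn tickets act as heap
// priorities; one rotation per level on the way back up restores the heap
// order, which keeps the tree balanced in expectation.
//
//	package main
//
//	import (
//		"fmt"
//		"math/rand"
//	)
//
//	type node struct {
//		key         uintptr
//		ticket      uint32 // heap priority: t.ticket <= children's tickets
//		left, right *node
//	}
//
//	func insert(t *node, key uintptr) *node {
//		if t == nil {
//			return &node{key: key, ticket: rand.Uint32() | 1} // low bit set, as in queue
//		}
//		if key < t.key {
//			t.left = insert(t.left, key)
//			if t.left.ticket < t.ticket { // heap order violated: rotate right
//				l := t.left
//				t.left = l.right
//				l.right = t
//				return l
//			}
//		} else {
//			t.right = insert(t.right, key)
//			if t.right.ticket < t.ticket { // heap order violated: rotate left
//				r := t.right
//				t.right = r.left
//				r.left = t
//				return r
//			}
//		}
//		return t
//	}
//
//	// heapOK reports whether every parent's ticket is <= its children's.
//	func heapOK(t *node) bool {
//		if t == nil {
//			return true
//		}
//		if t.left != nil && t.left.ticket < t.ticket {
//			return false
//		}
//		if t.right != nil && t.right.ticket < t.ticket {
//			return false
//		}
//		return heapOK(t.left) && heapOK(t.right)
//	}
//
//	func main() {
//		var root *node
//		for i := 0; i < 1000; i++ {
//			root = insert(root, uintptr(rand.Uint64()))
//		}
//		fmt.Println(heapOK(root)) // true
//	}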
// dequeue searches for and finds the first goroutine
// in semaRoot blocked on addr.
// If the sudog was being profiled, dequeue returns the time
// at which it was woken up as now. Otherwise now is 0.
// If there are additional entries in the wait list, dequeue
// returns tailtime set to the last entry's acquiretime.
// Otherwise tailtime is found.acquiretime.
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
	ps := &root.treap
	s := *ps
	for ; s != nil; s = *ps {
		if s.elem == unsafe.Pointer(addr) {
			goto Found
		}
		if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
			ps = &s.prev
		} else {
			ps = &s.next
		}
	}
	return nil, 0, 0

Found:
	now = int64(0)
	if s.acquiretime != 0 {
		now = cputicks()
	}
	if t := s.waitlink; t != nil {
		// Substitute t, also waiting on addr, for s in root tree of unique addrs.
		*ps = t
		t.ticket = s.ticket
		t.parent = s.parent
		t.prev = s.prev
		if t.prev != nil {
			t.prev.parent = t
		}
		t.next = s.next
		if t.next != nil {
			t.next.parent = t
		}
		if t.waitlink != nil {
			t.waittail = s.waittail
		} else {
			t.waittail = nil
		}
		t.waiters = s.waiters
		if t.waiters > 1 {
			t.waiters--
		}
		// Set head and tail acquire time to 'now',
		// because the caller will take care of charging
		// the delays before now for all entries in the list.
		t.acquiretime = now
		tailtime = s.waittail.acquiretime
		s.waittail.acquiretime = now
		s.waitlink = nil
		s.waittail = nil
	} else {
		// Rotate s down to be leaf of tree for removal, respecting priorities.
		for s.next != nil || s.prev != nil {
			if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
				root.rotateRight(s)
			} else {
				root.rotateLeft(s)
			}
		}
		// Remove s, now a leaf.
		if s.parent != nil {
			if s.parent.prev == s {
				s.parent.prev = nil
			} else {
				s.parent.next = nil
			}
		} else {
			root.treap = nil
		}
		tailtime = s.acquiretime
	}
	s.parent = nil
	s.elem = nil
	s.next = nil
	s.prev = nil
	s.ticket = 0
	return s, now, tailtime
}

// rotateLeft rotates the tree rooted at node x,
// turning (x a (y b c)) into (y (x a b) c).
func (root *semaRoot) rotateLeft(x *sudog) {
	// p -> (x a (y b c))
	p := x.parent
	y := x.next
	b := y.prev

	y.prev = x
	x.parent = y
	x.next = b
	if b != nil {
		b.parent = x
	}

	y.parent = p
	if p == nil {
		root.treap = y
	} else if p.prev == x {
		p.prev = y
	} else {
		if p.next != x {
			throw("semaRoot rotateLeft")
		}
		p.next = y
	}
}

// rotateRight rotates the tree rooted at node y,
// turning (y (x a b) c) into (x a (y b c)).
func (root *semaRoot) rotateRight(y *sudog) {
	// p -> (y (x a b) c)
	p := y.parent
	x := y.prev
	b := x.next

	x.next = y
	y.parent = x
	y.prev = b
	if b != nil {
		b.parent = y
	}

	x.parent = p
	if p == nil {
		root.treap = x
	} else if p.prev == y {
		p.prev = x
	} else {
		if p.next != y {
			throw("semaRoot rotateRight")
		}
		p.next = x
	}
}

// notifyList is a ticket-based notification list used to implement sync.Cond.
//
// It must be kept in sync with the sync package.
type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait atomic.Uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

// less checks if a < b, considering a & b running counts that may overflow the
// 32-bit range, and that their "unwrapped" difference is always less than 2^31.
func less(a, b uint32) bool {
	return int32(a-b) < 0
}

// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return l.wait.Add(1) - 1
}
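// An illustrative sketch (not part of sema.go) of why less is safe under
// wraparound: subtraction of uint32 counters is exact modulo 2^32, so as
// long as the true distance between the counters is under 2^31, the sign
// of the 32-bit difference recovers their order.
//
//	package main
//
//	import "fmt"
//
//	func less(a, b uint32) bool { return int32(a-b) < 0 }
//
//	func main() {
//		fmt.Println(less(5, 10))         // true: the ordinary case
//		fmt.Println(less(0xfffffffe, 3)) // true: b has wrapped past a
//		fmt.Println(less(3, 0xfffffffe)) // false: a is 5 tickets ahead of b
//	}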
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Return right away if this ticket has already been notified.
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	releaseSudog(s)
}

// notifyListNotifyAll notifies all entries in the list.
//
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock.
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	// Pull the list out into a local variable; waiters will be readied
	// outside the lock.
	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	atomic.Store(&l.notify, l.wait.Load())
	unlock(&l.lock)

	// Go through the local list and ready all waiters.
	for s != nil {
		next := s.next
		s.next = nil
		if s.g.bubble != nil && getg().bubble != s.g.bubble {
			println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble")
			fatal("semaphore wake of synctest goroutine from outside bubble")
		}
		readyWithTime(s, 4)
		s = next
	}
}
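// An illustrative, hypothetical sketch (not part of sema.go): the ticket
// protocol above rebuilt with a map of channels. Unlike the runtime
// version, tickets are handed out under the lock and waiters are keyed by
// ticket rather than kept in an intrusive sudog list, but the essential
// property carries over: a waiter whose ticket is already below notify
// returns without parking, so a notification that wins the race against
// the park is never lost.
//
//	package main
//
//	import (
//		"fmt"
//		"sync"
//	)
//
//	type miniNotifyList struct {
//		mu     sync.Mutex
//		wait   uint32                   // next ticket to hand out
//		notify uint32                   // next ticket to be notified
//		parked map[uint32]chan struct{} // parked waiters, by ticket
//	}
//
//	func newMiniNotifyList() *miniNotifyList {
//		return &miniNotifyList{parked: make(map[uint32]chan struct{})}
//	}
//
//	func (l *miniNotifyList) add() uint32 {
//		l.mu.Lock()
//		defer l.mu.Unlock()
//		t := l.wait
//		l.wait++
//		return t
//	}
//
//	func (l *miniNotifyList) waitFor(t uint32) {
//		l.mu.Lock()
//		if int32(t-l.notify) < 0 { // already notified: return immediately
//			l.mu.Unlock()
//			return
//		}
//		ch := make(chan struct{})
//		l.parked[t] = ch
//		l.mu.Unlock()
//		<-ch
//	}
//
//	func (l *miniNotifyList) notifyAll() {
//		l.mu.Lock()
//		l.notify = l.wait // every ticket issued so far counts as notified
//		for t, ch := range l.parked {
//			close(ch)
//			delete(l.parked, t)
//		}
//		l.mu.Unlock()
//	}
//
//	func main() {
//		l := newMiniNotifyList()
//		t := l.add()
//		l.notifyAll() // the notification races ahead of the wait...
//		l.waitFor(t)  // ...and waitFor still returns immediately
//		fmt.Println("no deadlock")
//	}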
// notifyListNotifyOne notifies one entry in the list.
//
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock if we need to do anything.
	t := l.notify
	if t == l.wait.Load() {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number.
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because goroutines queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			if s.g.bubble != nil && getg().bubble != s.g.bubble {
				println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble")
				fatal("semaphore wake of synctest goroutine from outside bubble")
			}
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

//go:linkname notifyListCheck sync.runtime_notifyListCheck
func notifyListCheck(sz uintptr) {
	if sz != unsafe.Sizeof(notifyList{}) {
		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
		throw("bad notifyList size")
	}
}

//go:linkname internal_sync_nanotime internal/sync.runtime_nanotime
func internal_sync_nanotime() int64 {
	return nanotime()
}
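// A closing usage sketch (not part of sema.go): the notifyList entry
// points above are reached through sync.Cond. Cond.Wait calls
// notifyListAdd and then notifyListWait, Cond.Signal calls
// notifyListNotifyOne, and Cond.Broadcast calls notifyListNotifyAll.
//
//	package main
//
//	import (
//		"fmt"
//		"sync"
//	)
//
//	func main() {
//		var mu sync.Mutex
//		cond := sync.NewCond(&mu)
//		done := false
//
//		go func() {
//			mu.Lock()
//			done = true
//			mu.Unlock()
//			cond.Signal() // -> notifyListNotifyOne
//		}()
//
//		mu.Lock()
//		for !done {
//			cond.Wait() // -> notifyListAdd, then park in notifyListWait
//		}
//		mu.Unlock()
//		fmt.Println("done:", done)
//	}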
This page was generated with Golds v0.7.9-preview (GOOS=linux GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu; PRs and bug reports are welcome on the project's issue list.