// Copyright 2013 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package pollimport// fdMutex is a specialized synchronization primitive that manages// lifetime of an fd and serializes access to Read, Write and Close// methods on FD.type fdMutex struct { state uint64 rsema uint32 wsema uint32}// fdMutex.state is organized as follows:// 1 bit - whether FD is closed, if set all subsequent lock operations will fail.// 1 bit - lock for read operations.// 1 bit - lock for write operations.// 20 bits - total number of references (read+write+misc).// 20 bits - number of outstanding read waiters.// 20 bits - number of outstanding write waiters.const ( mutexClosed = 1 << 0 mutexRLock = 1 << 1 mutexWLock = 1 << 2 mutexRef = 1 << 3 mutexRefMask = (1<<20 - 1) << 3 mutexRWait = 1 << 23 mutexRMask = (1<<20 - 1) << 23 mutexWWait = 1 << 43 mutexWMask = (1<<20 - 1) << 43)const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"// Read operations must do rwlock(true)/rwunlock(true).//// Write operations must do rwlock(false)/rwunlock(false).//// Misc operations must do incref/decref.// Misc operations include functions like setsockopt and setDeadline.// They need to use incref/decref to ensure that they operate on the// correct fd in presence of a concurrent close call (otherwise fd can// be closed under their feet).//// Close operations must do increfAndClose/decref.// incref adds a reference to mu.// It reports whether mu is available for reading or writing.func ( *fdMutex) () bool {for { := atomic.LoadUint64(&.state)if &mutexClosed != 0 {returnfalse } := + mutexRefif &mutexRefMask == 0 {panic(overflowMsg) }ifatomic.CompareAndSwapUint64(&.state, , ) {returntrue } }}// increfAndClose sets the state of mu to closed.// It returns false if the file was already closed.func ( *fdMutex) () bool {for { := atomic.LoadUint64(&.state)if &mutexClosed != 0 {returnfalse 
}// Mark as closed and acquire a reference. := ( | mutexClosed) + mutexRefif &mutexRefMask == 0 {panic(overflowMsg) }// Remove all read and write waiters. &^= mutexRMask | mutexWMaskifatomic.CompareAndSwapUint64(&.state, , ) {// Wake all read and write waiters, // they will observe closed flag after wakeup.for &mutexRMask != 0 { -= mutexRWaitruntime_Semrelease(&.rsema) }for &mutexWMask != 0 { -= mutexWWaitruntime_Semrelease(&.wsema) }returntrue } }}// decref removes a reference from mu.// It reports whether there is no remaining reference.func ( *fdMutex) () bool {for { := atomic.LoadUint64(&.state)if &mutexRefMask == 0 {panic("inconsistent poll.fdMutex") } := - mutexRefifatomic.CompareAndSwapUint64(&.state, , ) {return &(mutexClosed|mutexRefMask) == mutexClosed } }}// lock adds a reference to mu and locks mu.// It reports whether mu is available for reading or writing.func ( *fdMutex) ( bool) bool {var , , uint64var *uint32if { = mutexRLock = mutexRWait = mutexRMask = &.rsema } else { = mutexWLock = mutexWWait = mutexWMask = &.wsema }for { := atomic.LoadUint64(&.state)if &mutexClosed != 0 {returnfalse }varuint64if & == 0 {// Lock is free, acquire it. = ( | ) + mutexRefif &mutexRefMask == 0 {panic(overflowMsg) } } else {// Wait for lock. = + if & == 0 {panic(overflowMsg) } }ifatomic.CompareAndSwapUint64(&.state, , ) {if & == 0 {returntrue }runtime_Semacquire()// The signaller has subtracted mutexWait. } }}// unlock removes a reference from mu and unlocks mu.// It reports whether there is no remaining reference.func ( *fdMutex) ( bool) bool {var , , uint64var *uint32if { = mutexRLock = mutexRWait = mutexRMask = &.rsema } else { = mutexWLock = mutexWWait = mutexWMask = &.wsema }for { := atomic.LoadUint64(&.state)if & == 0 || &mutexRefMask == 0 {panic("inconsistent poll.fdMutex") }// Drop lock, drop reference and wake read waiter if present. 
:= ( &^ ) - mutexRefif & != 0 { -= }ifatomic.CompareAndSwapUint64(&.state, , ) {if & != 0 {runtime_Semrelease() }return &(mutexClosed|mutexRefMask) == mutexClosed } }}// Implemented in runtime package.func runtime_Semacquire( *uint32)func runtime_Semrelease( *uint32)// incref adds a reference to fd.// It returns an error when fd cannot be used.func ( *FD) () error {if !.fdmu.incref() {returnerrClosing(.isFile) }returnnil}// decref removes a reference from fd.// It also closes fd when the state of fd is set to closed and there// is no remaining reference.func ( *FD) () error {if .fdmu.decref() {return .destroy() }returnnil}// readLock adds a reference to fd and locks fd for reading.// It returns an error when fd cannot be used for reading.func ( *FD) () error {if !.fdmu.rwlock(true) {returnerrClosing(.isFile) }returnnil}// readUnlock removes a reference from fd and unlocks fd for reading.// It also closes fd when the state of fd is set to closed and there// is no remaining reference.func ( *FD) () {if .fdmu.rwunlock(true) { .destroy() }}// writeLock adds a reference to fd and locks fd for writing.// It returns an error when fd cannot be used for writing.func ( *FD) () error {if !.fdmu.rwlock(false) {returnerrClosing(.isFile) }returnnil}// writeUnlock removes a reference from fd and unlocks fd for writing.// It also closes fd when the state of fd is set to closed and there// is no remaining reference.func ( *FD) () {if .fdmu.rwunlock(false) { .destroy() }}
The pages are generated with Golds v0.6.9-preview. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.