Source File
chan.go
Belonging Package
runtime
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
// This file contains the implementation of Go channels.
// Invariants:
// At least one of c.sendq and c.recvq is empty,
// except for the case of an unbuffered channel with a single goroutine
// blocked on it for both sending and receiving using a select statement,
// in which case the length of c.sendq and c.recvq is limited only by the
// size of the select statement.
//
// For buffered channels, also:
// c.qcount > 0 implies that c.recvq is empty.
// c.qcount < c.dataqsiz implies that c.sendq is empty.
import (
)
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
debugChan = false
)
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
//go:linkname reflect_makechan reflect.makechan
func reflect_makechan( *chantype, int) *hchan {
return makechan(, )
}
func makechan64( *chantype, int64) *hchan {
if int64(int()) != {
panic(plainError("makechan: size out of range"))
}
return makechan(, int())
}
func makechan( *chantype, int) *hchan {
:= .Elem
// compiler checks this but be safe.
if .Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || .Align_ > maxAlign {
throw("makechan: bad alignment")
}
, := math.MulUintptr(.Size_, uintptr())
if || > maxAlloc-hchanSize || < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var *hchan
switch {
case == 0:
// Queue or element size is zero.
= (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
.buf = .raceaddr()
case !.Pointers():
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
= (*hchan)(mallocgc(hchanSize+, nil, true))
.buf = add(unsafe.Pointer(), hchanSize)
default:
// Elements contain pointers.
= new(hchan)
.buf = mallocgc(, , true)
}
.elemsize = uint16(.Size_)
.elemtype =
.dataqsiz = uint()
lockInit(&.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", , "; elemsize=", .Size_, "; dataqsiz=", , "\n")
}
return
}
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
//
// chanbuf should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
// - github.com/fjl/memsize
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname chanbuf
func chanbuf( *hchan, uint) unsafe.Pointer {
return add(.buf, uintptr()*uintptr(.elemsize))
}
// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full( *hchan) bool {
// c.dataqsiz is immutable (never written after the channel is created)
// so it is safe to read at any time during channel operation.
if .dataqsiz == 0 {
// Assumes that a pointer read is relaxed-atomic.
return .recvq.first == nil
}
// Assumes that a uint read is relaxed-atomic.
return .qcount == .dataqsiz
}
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1( *hchan, unsafe.Pointer) {
chansend(, , true, getcallerpc())
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend( *hchan, unsafe.Pointer, bool, uintptr) bool {
if == nil {
if ! {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", , "\n")
}
if raceenabled {
racereadpc(.raceaddr(), , abi.FuncPCABIInternal())
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if ! && .closed == 0 && full() {
return false
}
var int64
if blockprofilerate > 0 {
= cputicks()
}
lock(&.lock)
if .closed != 0 {
unlock(&.lock)
panic(plainError("send on closed channel"))
}
if := .recvq.dequeue(); != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(, , , func() { unlock(&.lock) }, 3)
return true
}
if .qcount < .dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
:= chanbuf(, .sendx)
if raceenabled {
racenotify(, .sendx, nil)
}
typedmemmove(.elemtype, , )
.sendx++
if .sendx == .dataqsiz {
.sendx = 0
}
.qcount++
unlock(&.lock)
return true
}
if ! {
unlock(&.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
:= getg()
:= acquireSudog()
.releasetime = 0
if != 0 {
.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
.elem =
.waitlink = nil
.g =
.isSelect = false
.c =
.waiting =
.param = nil
.sendq.enqueue()
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&.lock), waitReasonChanSend, traceBlockChanSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive()
// someone woke us up.
if != .waiting {
throw("G waiting list is corrupted")
}
.waiting = nil
.activeStackChans = false
:= !.success
.param = nil
if .releasetime > 0 {
blockevent(.releasetime-, 2)
}
.c = nil
releaseSudog()
if {
if .closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send( *hchan, *sudog, unsafe.Pointer, func(), int) {
if raceenabled {
if .dataqsiz == 0 {
racesync(, )
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(, .recvx, nil)
racenotify(, .recvx, )
.recvx++
if .recvx == .dataqsiz {
.recvx = 0
}
.sendx = .recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if .elem != nil {
sendDirect(.elemtype, , )
.elem = nil
}
:= .g
()
.param = unsafe.Pointer()
.success = true
if .releasetime != 0 {
.releasetime = cputicks()
}
goready(, +1)
}
// timerchandrain removes all elements in channel c's buffer.
// It reports whether any elements were removed.
// Because it is only intended for timers, it does not
// handle waiting senders at all (all timer channels
// use non-blocking sends to fill the buffer).
func timerchandrain( *hchan) bool {
// Note: Cannot use empty(c) because we are called
// while holding c.timer.sendLock, and empty(c) will
// call c.timer.maybeRunChan, which will deadlock.
// We are emptying the channel, so we only care about
// the count, not about potentially filling it up.
if atomic.Loaduint(&.qcount) == 0 {
return false
}
lock(&.lock)
:= false
for .qcount > 0 {
= true
typedmemclr(.elemtype, chanbuf(, .recvx))
.recvx++
if .recvx == .dataqsiz {
.recvx = 0
}
.qcount--
}
unlock(&.lock)
return
}
// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.
func sendDirect( *_type, *sudog, unsafe.Pointer) {
// src is on our stack, dst is a slot on another stack.
// Once we read sg.elem out of sg, it will no longer
// be updated if the destination's stack gets copied (shrunk).
// So make sure that no preemption points can happen between read & use.
:= .elem
typeBitsBulkBarrier(, uintptr(), uintptr(), .Size_)
// No need for cgo write barrier checks because dst is always
// Go memory.
memmove(, , .Size_)
}
func recvDirect( *_type, *sudog, unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
// The channel is locked, so src will not move during this
// operation.
:= .elem
typeBitsBulkBarrier(, uintptr(), uintptr(), .Size_)
memmove(, , .Size_)
}
func closechan( *hchan) {
if == nil {
panic(plainError("close of nil channel"))
}
lock(&.lock)
if .closed != 0 {
unlock(&.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
:= getcallerpc()
racewritepc(.raceaddr(), , abi.FuncPCABIInternal())
racerelease(.raceaddr())
}
.closed = 1
var gList
// release all readers
for {
:= .recvq.dequeue()
if == nil {
break
}
if .elem != nil {
typedmemclr(.elemtype, .elem)
.elem = nil
}
if .releasetime != 0 {
.releasetime = cputicks()
}
:= .g
.param = unsafe.Pointer()
.success = false
if raceenabled {
raceacquireg(, .raceaddr())
}
.push()
}
// release all writers (they will panic)
for {
:= .sendq.dequeue()
if == nil {
break
}
.elem = nil
if .releasetime != 0 {
.releasetime = cputicks()
}
:= .g
.param = unsafe.Pointer()
.success = false
if raceenabled {
raceacquireg(, .raceaddr())
}
.push()
}
unlock(&.lock)
// Ready all Gs now that we've dropped the channel lock.
for !.empty() {
:= .pop()
.schedlink = 0
goready(, 3)
}
}
// empty reports whether a read from c would block (that is, the channel is
// empty). It is atomically correct and sequentially consistent at the moment
// it returns, but since the channel is unlocked, the channel may become
// non-empty immediately afterward.
func empty( *hchan) bool {
// c.dataqsiz is immutable.
if .dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&.sendq.first)) == nil
}
// c.timer is also immutable (it is set after make(chan) but before any channel operations).
// All timer channels have dataqsiz > 0.
if .timer != nil {
.timer.maybeRunChan()
}
return atomic.Loaduint(&.qcount) == 0
}
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1( *hchan, unsafe.Pointer) {
chanrecv(, , true)
}
//go:nosplit
func chanrecv2( *hchan, unsafe.Pointer) ( bool) {
_, = chanrecv(, , true)
return
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv( *hchan, unsafe.Pointer, bool) (, bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", , "\n")
}
if == nil {
if ! {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
if .timer != nil {
.timer.maybeRunChan()
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if ! && empty() {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty() {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(.raceaddr())
}
if != nil {
typedmemclr(.elemtype, )
}
return true, false
}
}
var int64
if blockprofilerate > 0 {
= cputicks()
}
lock(&.lock)
if .closed != 0 {
if .qcount == 0 {
if raceenabled {
raceacquire(.raceaddr())
}
unlock(&.lock)
if != nil {
typedmemclr(.elemtype, )
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// Just found waiting sender with not closed.
if := .sendq.dequeue(); != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(, , , func() { unlock(&.lock) }, 3)
return true, true
}
}
if .qcount > 0 {
// Receive directly from queue
:= chanbuf(, .recvx)
if raceenabled {
racenotify(, .recvx, nil)
}
if != nil {
typedmemmove(.elemtype, , )
}
typedmemclr(.elemtype, )
.recvx++
if .recvx == .dataqsiz {
.recvx = 0
}
.qcount--
unlock(&.lock)
return true, true
}
if ! {
unlock(&.lock)
return false, false
}
// no sender available: block on this channel.
:= getg()
:= acquireSudog()
.releasetime = 0
if != 0 {
.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
.elem =
.waitlink = nil
.waiting =
.g =
.isSelect = false
.c =
.param = nil
.recvq.enqueue()
if .timer != nil {
blockTimerChan()
}
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// someone woke us up
if != .waiting {
throw("G waiting list is corrupted")
}
if .timer != nil {
unblockTimerChan()
}
.waiting = nil
.activeStackChans = false
if .releasetime > 0 {
blockevent(.releasetime-, 2)
}
:= .success
.param = nil
.c = nil
releaseSudog()
return true,
}
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1. The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2. The value received by the receiver (the current G) is
// written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv( *hchan, *sudog, unsafe.Pointer, func(), int) {
if .dataqsiz == 0 {
if raceenabled {
racesync(, )
}
if != nil {
// copy data from sender
recvDirect(.elemtype, , )
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
:= chanbuf(, .recvx)
if raceenabled {
racenotify(, .recvx, nil)
racenotify(, .recvx, )
}
// copy data from queue to receiver
if != nil {
typedmemmove(.elemtype, , )
}
// copy data from sender to queue
typedmemmove(.elemtype, , .elem)
.recvx++
if .recvx == .dataqsiz {
.recvx = 0
}
.sendx = .recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
.elem = nil
:= .g
()
.param = unsafe.Pointer()
.success = true
if .releasetime != 0 {
.releasetime = cputicks()
}
goready(, +1)
}
func chanparkcommit( *g, unsafe.Pointer) bool {
// There are unlocked sudogs that point into gp's stack. Stack
// copying must lock the channels of those sudogs.
// Set activeStackChans here instead of before we try parking
// because we could self-deadlock in stack growth on the
// channel lock.
.activeStackChans = true
// Mark that it's safe for stack shrinking to occur now,
// because any thread acquiring this G's stack for shrinking
// is guaranteed to observe activeStackChans after this store.
.parkingOnChan.Store(false)
// Make sure we unlock after setting activeStackChans and
// unsetting parkingOnChan. The moment we unlock chanLock
// we risk gp getting readied by a channel operation and
// so gp could continue running before everything before
// the unlock is visible (even to gp itself).
unlock((*mutex)())
return true
}
// compiler implements
//
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
func selectnbsend( *hchan, unsafe.Pointer) ( bool) {
return chansend(, , false, getcallerpc())
}
// compiler implements
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
func selectnbrecv( unsafe.Pointer, *hchan) (, bool) {
return chanrecv(, , false)
}
//go:linkname reflect_chansend reflect.chansend0
func reflect_chansend( *hchan, unsafe.Pointer, bool) ( bool) {
return chansend(, , !, getcallerpc())
}
//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv( *hchan, bool, unsafe.Pointer) ( bool, bool) {
return chanrecv(, , !)
}
func chanlen( *hchan) int {
if == nil {
return 0
}
:= debug.asynctimerchan.Load() != 0
if .timer != nil && {
.timer.maybeRunChan()
}
if .timer != nil && ! {
// timer channels have a buffered implementation
// but present to users as unbuffered, so that we can
// undo sends without users noticing.
return 0
}
return int(.qcount)
}
func chancap( *hchan) int {
if == nil {
return 0
}
if .timer != nil {
:= debug.asynctimerchan.Load() != 0
if {
return int(.dataqsiz)
}
// timer channels have a buffered implementation
// but present to users as unbuffered, so that we can
// undo sends without users noticing.
return 0
}
return int(.dataqsiz)
}
//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen( *hchan) int {
return chanlen()
}
//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen( *hchan) int {
return chanlen()
}
//go:linkname reflect_chancap reflect.chancap
func reflect_chancap( *hchan) int {
return chancap()
}
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose( *hchan) {
closechan()
}
func ( *waitq) ( *sudog) {
.next = nil
:= .last
if == nil {
.prev = nil
.first =
.last =
return
}
.prev =
.next =
.last =
}
func ( *waitq) () *sudog {
for {
:= .first
if == nil {
return nil
}
:= .next
if == nil {
.first = nil
.last = nil
} else {
.prev = nil
.first =
.next = nil // mark as removed (see dequeueSudoG)
}
// if a goroutine was put on this queue because of a
// select, there is a small window between the goroutine
// being woken up by a different case and it grabbing the
// channel locks. Once it has the lock
// it removes itself from the queue, so we won't see it after that.
// We use a flag in the G struct to tell us when someone
// else has won the race to signal this goroutine but the goroutine
// hasn't removed itself from the queue yet.
if .isSelect && !.g.selectDone.CompareAndSwap(0, 1) {
continue
}
return
}
}
func ( *hchan) () unsafe.Pointer {
// Treat read-like and write-like operations on the channel to
// happen at this address. Avoid using the address of qcount
// or dataqsiz, because the len() and cap() builtins read
// those addresses, and we don't want them racing with
// operations like close().
return unsafe.Pointer(&.buf)
}
func racesync( *hchan, *sudog) {
racerelease(chanbuf(, 0))
raceacquireg(.g, chanbuf(, 0))
racereleaseg(.g, chanbuf(, 0))
raceacquire(chanbuf(, 0))
}
// Notify the race detector of a send or receive involving buffer entry idx
// and a channel c or its communicating partner sg.
// This function handles the special case of c.elemsize==0.
func racenotify( *hchan, uint, *sudog) {
// We could have passed the unsafe.Pointer corresponding to entry idx
// instead of idx itself. However, in a future version of this function,
// we can use idx to better handle the case of elemsize==0.
// A future improvement to the detector is to call TSan with c and idx:
// this way, Go will continue to not allocating buffer entries for channels
// of elemsize==0, yet the race detector can be made to handle multiple
// sync objects underneath the hood (one sync object per idx)
:= chanbuf(, )
// When elemsize==0, we don't allocate a full buffer for the channel.
// Instead of individual buffer entries, the race detector uses the
// c.buf as the only buffer entry. This simplification prevents us from
// following the memory model's happens-before rules (rules that are
// implemented in racereleaseacquire). Instead, we accumulate happens-before
// information in the synchronization object associated with c.buf.
if .elemsize == 0 {
if == nil {
raceacquire()
racerelease()
} else {
raceacquireg(.g, )
racereleaseg(.g, )
}
} else {
if == nil {
racereleaseacquire()
} else {
racereleaseacquireg(.g, )
}
}
}
The pages are generated with Golds v0.7.0-preview. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |