// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

// This file contains the implementation of Go channels.

// Invariants:
//  At least one of c.sendq and c.recvq is empty,
//  except for the case of an unbuffered channel with a single goroutine
//  blocked on it for both sending and receiving using a select statement,
//  in which case the length of c.sendq and c.recvq is limited only by the
//  size of the select statement.
//
// For buffered channels, also:
//  c.qcount > 0 implies that c.recvq is empty.
//  c.qcount < c.dataqsiz implies that c.sendq is empty.

import (
	"runtime/internal/atomic"
	"runtime/internal/math"
	"unsafe"
)

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan( *chantype,  int) *hchan {
	return makechan(, )
}

func makechan64( *chantype,  int64) *hchan {
	if int64(int()) !=  {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(, int())
}

func makechan( *chantype,  int) *hchan {
	 := .elem

	// compiler checks this but be safe.
	if .size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || .align > maxAlign {
		throw("makechan: bad alignment")
	}

	,  := math.MulUintptr(.size, uintptr())
	if  ||  > maxAlloc-hchanSize ||  < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var  *hchan
	switch {
	case  == 0:
		// Queue or element size is zero.
		 = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		.buf = .raceaddr()
	case .ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		 = (*hchan)(mallocgc(hchanSize+, nil, true))
		.buf = add(unsafe.Pointer(), hchanSize)
	default:
		// Elements contain pointers.
		 = new(hchan)
		.buf = mallocgc(, , true)
	}

	.elemsize = uint16(.size)
	.elemtype = 
	.dataqsiz = uint()
	lockInit(&.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", , "; elemsize=", .size, "; dataqsiz=", , "\n")
	}
	return 
}

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf( *hchan,  uint) unsafe.Pointer {
	return add(.buf, uintptr()*uintptr(.elemsize))
}

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full( *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if .dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return .recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return .qcount == .dataqsiz
}

// entry point for c <- x from compiled code
//go:nosplit
func chansend1( *hchan,  unsafe.Pointer) {
	chansend(, , true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend( *hchan,  unsafe.Pointer,  bool,  uintptr) bool {
	if  == nil {
		if ! {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", , "\n")
	}

	if raceenabled {
		racereadpc(.raceaddr(), , funcPC())
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if ! && .closed == 0 && full() {
		return false
	}

	var  int64
	if blockprofilerate > 0 {
		 = cputicks()
	}

	lock(&.lock)

	if .closed != 0 {
		unlock(&.lock)
		panic(plainError("send on closed channel"))
	}

	if  := .recvq.dequeue();  != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(, , , func() { unlock(&.lock) }, 3)
		return true
	}

	if .qcount < .dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		 := chanbuf(, .sendx)
		if raceenabled {
			racenotify(, .sendx, nil)
		}
		typedmemmove(.elemtype, , )
		.sendx++
		if .sendx == .dataqsiz {
			.sendx = 0
		}
		.qcount++
		unlock(&.lock)
		return true
	}

	if ! {
		unlock(&.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	 := getg()
	 := acquireSudog()
	.releasetime = 0
	if  != 0 {
		.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	.elem = 
	.waitlink = nil
	.g = 
	.isSelect = false
	.c = 
	.waiting = 
	.param = nil
	.sendq.enqueue()
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive()

	// someone woke us up.
	if  != .waiting {
		throw("G waiting list is corrupted")
	}
	.waiting = nil
	.activeStackChans = false
	 := !.success
	.param = nil
	if .releasetime > 0 {
		blockevent(.releasetime-, 2)
	}
	.c = nil
	releaseSudog()
	if  {
		if .closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send( *hchan,  *sudog,  unsafe.Pointer,  func(),  int) {
	if raceenabled {
		if .dataqsiz == 0 {
			racesync(, )
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(, .recvx, nil)
			racenotify(, .recvx, )
			.recvx++
			if .recvx == .dataqsiz {
				.recvx = 0
			}
			.sendx = .recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if .elem != nil {
		sendDirect(.elemtype, , )
		.elem = nil
	}
	 := .g
	()
	.param = unsafe.Pointer()
	.success = true
	if .releasetime != 0 {
		.releasetime = cputicks()
	}
	goready(, +1)
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect( *_type,  *sudog,  unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	 := .elem
	typeBitsBulkBarrier(, uintptr(), uintptr(), .size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(, , .size)
}

func recvDirect( *_type,  *sudog,  unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	 := .elem
	typeBitsBulkBarrier(, uintptr(), uintptr(), .size)
	memmove(, , .size)
}

func closechan( *hchan) {
	if  == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&.lock)
	if .closed != 0 {
		unlock(&.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		 := getcallerpc()
		racewritepc(.raceaddr(), , funcPC())
		racerelease(.raceaddr())
	}

	.closed = 1

	var  gList

	// release all readers
	for {
		 := .recvq.dequeue()
		if  == nil {
			break
		}
		if .elem != nil {
			typedmemclr(.elemtype, .elem)
			.elem = nil
		}
		if .releasetime != 0 {
			.releasetime = cputicks()
		}
		 := .g
		.param = unsafe.Pointer()
		.success = false
		if raceenabled {
			raceacquireg(, .raceaddr())
		}
		.push()
	}

	// release all writers (they will panic)
	for {
		 := .sendq.dequeue()
		if  == nil {
			break
		}
		.elem = nil
		if .releasetime != 0 {
			.releasetime = cputicks()
		}
		 := .g
		.param = unsafe.Pointer()
		.success = false
		if raceenabled {
			raceacquireg(, .raceaddr())
		}
		.push()
	}
	unlock(&.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !.empty() {
		 := .pop()
		.schedlink = 0
		goready(, 3)
	}
}

// empty reports whether a read from c would block (that is, the channel is
// empty).  It uses a single atomic read of mutable state.
func empty( *hchan) bool {
	// c.dataqsiz is immutable.
	if .dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&.sendq.first)) == nil
	}
	return atomic.Loaduint(&.qcount) == 0
}

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1( *hchan,  unsafe.Pointer) {
	chanrecv(, , true)
}

//go:nosplit
func chanrecv2( *hchan,  unsafe.Pointer) ( bool) {
	_,  = chanrecv(, , true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv( *hchan,  unsafe.Pointer,  bool) (,  bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", , "\n")
	}

	if  == nil {
		if ! {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if ! && empty() {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty() {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(.raceaddr())
			}
			if  != nil {
				typedmemclr(.elemtype, )
			}
			return true, false
		}
	}

	var  int64
	if blockprofilerate > 0 {
		 = cputicks()
	}

	lock(&.lock)

	if .closed != 0 && .qcount == 0 {
		if raceenabled {
			raceacquire(.raceaddr())
		}
		unlock(&.lock)
		if  != nil {
			typedmemclr(.elemtype, )
		}
		return true, false
	}

	if  := .sendq.dequeue();  != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(, , , func() { unlock(&.lock) }, 3)
		return true, true
	}

	if .qcount > 0 {
		// Receive directly from queue
		 := chanbuf(, .recvx)
		if raceenabled {
			racenotify(, .recvx, nil)
		}
		if  != nil {
			typedmemmove(.elemtype, , )
		}
		typedmemclr(.elemtype, )
		.recvx++
		if .recvx == .dataqsiz {
			.recvx = 0
		}
		.qcount--
		unlock(&.lock)
		return true, true
	}

	if ! {
		unlock(&.lock)
		return false, false
	}

	// no sender available: block on this channel.
	 := getg()
	 := acquireSudog()
	.releasetime = 0
	if  != 0 {
		.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	.elem = 
	.waitlink = nil
	.waiting = 
	.g = 
	.isSelect = false
	.c = 
	.param = nil
	.recvq.enqueue()
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if  != .waiting {
		throw("G waiting list is corrupted")
	}
	.waiting = nil
	.activeStackChans = false
	if .releasetime > 0 {
		blockevent(.releasetime-, 2)
	}
	 := .success
	.param = nil
	.c = nil
	releaseSudog()
	return true, 
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv( *hchan,  *sudog,  unsafe.Pointer,  func(),  int) {
	if .dataqsiz == 0 {
		if raceenabled {
			racesync(, )
		}
		if  != nil {
			// copy data from sender
			recvDirect(.elemtype, , )
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		 := chanbuf(, .recvx)
		if raceenabled {
			racenotify(, .recvx, nil)
			racenotify(, .recvx, )
		}
		// copy data from queue to receiver
		if  != nil {
			typedmemmove(.elemtype, , )
		}
		// copy data from sender to queue
		typedmemmove(.elemtype, , .elem)
		.recvx++
		if .recvx == .dataqsiz {
			.recvx = 0
		}
		.sendx = .recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	.elem = nil
	 := .g
	()
	.param = unsafe.Pointer()
	.success = true
	if .releasetime != 0 {
		.releasetime = cputicks()
	}
	goready(, +1)
}

func chanparkcommit( *g,  unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	atomic.Store8(&.parkingOnChan, 0)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)())
	return true
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend( *hchan,  unsafe.Pointer) ( bool) {
	return chansend(, , false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv( unsafe.Pointer,  *hchan) (,  bool) {
	return chanrecv(, , false)
}

//go:linkname reflect_chansend reflect.chansend
func reflect_chansend( *hchan,  unsafe.Pointer,  bool) ( bool) {
	return chansend(, , !, getcallerpc())
}

//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv( *hchan,  bool,  unsafe.Pointer) ( bool,  bool) {
	return chanrecv(, , !)
}

//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen( *hchan) int {
	if  == nil {
		return 0
	}
	return int(.qcount)
}

//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen( *hchan) int {
	if  == nil {
		return 0
	}
	return int(.qcount)
}

//go:linkname reflect_chancap reflect.chancap
func reflect_chancap( *hchan) int {
	if  == nil {
		return 0
	}
	return int(.dataqsiz)
}

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose( *hchan) {
	closechan()
}

func ( *waitq) ( *sudog) {
	.next = nil
	 := .last
	if  == nil {
		.prev = nil
		.first = 
		.last = 
		return
	}
	.prev = 
	.next = 
	.last = 
}

func ( *waitq) () *sudog {
	for {
		 := .first
		if  == nil {
			return nil
		}
		 := .next
		if  == nil {
			.first = nil
			.last = nil
		} else {
			.prev = nil
			.first = 
			.next = nil // mark as removed (see dequeueSudog)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if .isSelect && !atomic.Cas(&.g.selectDone, 0, 1) {
			continue
		}

		return 
	}
}

func ( *hchan) () unsafe.Pointer {
	// Treat read-like and write-like operations on the channel to
	// happen at this address. Avoid using the address of qcount
	// or dataqsiz, because the len() and cap() builtins read
	// those addresses, and we don't want them racing with
	// operations like close().
	return unsafe.Pointer(&.buf)
}

func racesync( *hchan,  *sudog) {
	racerelease(chanbuf(, 0))
	raceacquireg(.g, chanbuf(, 0))
	racereleaseg(.g, chanbuf(, 0))
	raceacquire(chanbuf(, 0))
}

// Notify the race detector of a send or receive involving buffer entry idx
// and a channel c or its communicating partner sg.
// This function handles the special case of c.elemsize==0.
func racenotify( *hchan,  uint,  *sudog) {
	// We could have passed the unsafe.Pointer corresponding to entry idx
	// instead of idx itself.  However, in a future version of this function,
	// we can use idx to better handle the case of elemsize==0.
	// A future improvement to the detector is to call TSan with c and idx:
	// this way, Go will continue to not allocating buffer entries for channels
	// of elemsize==0, yet the race detector can be made to handle multiple
	// sync objects underneath the hood (one sync object per idx)
	 := chanbuf(, )
	// When elemsize==0, we don't allocate a full buffer for the channel.
	// Instead of individual buffer entries, the race detector uses the
	// c.buf as the only buffer entry.  This simplification prevents us from
	// following the memory model's happens-before rules (rules that are
	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
	// information in the synchronization object associated with c.buf.
	if .elemsize == 0 {
		if  == nil {
			raceacquire()
			racerelease()
		} else {
			raceacquireg(.g, )
			racereleaseg(.g, )
		}
	} else {
		if  == nil {
			racereleaseacquire()
		} else {
			racereleaseacquireg(.g, )
		}
	}
}