Source File
coro.go
Belonging Package
runtime
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
import
// A coro represents extra concurrency without extra parallelism,
// as would be needed for a coroutine implementation.
// The coro does not represent a specific coroutine, only the ability
// to do coroutine-style control transfers.
// It can be thought of as like a special channel that always has
// a goroutine blocked on it. If another goroutine calls coroswitch(c),
// the caller becomes the goroutine blocked in c, and the goroutine
// formerly blocked in c starts running.
// These switches continue until a call to coroexit(c),
// which ends the use of the coro by releasing the blocked
// goroutine in c and exiting the current goroutine.
//
// Coros are heap allocated and garbage collected, so that user code
// can hold a pointer to a coro without causing potential dangling
// pointer errors.
type coro struct {
gp guintptr
f func(*coro)
// State for validating thread-lock interactions.
mp *m
lockedExt uint32 // mp's external LockOSThread counter at coro creation time.
lockedInt uint32 // mp's internal lockOSThread counter at coro creation time.
}
//go:linkname newcoro
// newcoro creates a new coro containing a
// goroutine blocked waiting to run f
// and returns that coro.
func newcoro( func(*coro)) *coro {
:= new(coro)
.f =
:= getcallerpc()
:= getg()
systemstack(func() {
:= .m
:= corostart
:= *(**funcval)(unsafe.Pointer(&))
= newproc1(, , , true, waitReasonCoroutine)
// Scribble down locked thread state if needed and/or donate
// thread-lock state to the new goroutine.
if .lockedExt+.lockedInt != 0 {
.mp =
.lockedExt = .lockedExt
.lockedInt = .lockedInt
}
})
.coroarg =
.gp.set()
return
}
// corostart is the entry func for a new coroutine.
// It runs the coroutine user function f passed to corostart
// and then calls coroexit to remove the extra concurrency.
func corostart() {
:= getg()
:= .coroarg
.coroarg = nil
defer coroexit()
.f()
}
// coroexit is like coroswitch but closes the coro
// and exits the current goroutine
func coroexit( *coro) {
:= getg()
.coroarg =
.coroexit = true
mcall(coroswitch_m)
}
//go:linkname coroswitch
// coroswitch switches to the goroutine blocked on c
// and then blocks the current goroutine on c.
func coroswitch( *coro) {
:= getg()
.coroarg =
mcall(coroswitch_m)
}
// coroswitch_m is the implementation of coroswitch
// that runs on the m stack.
//
// Note: Coroutine switches are expected to happen at
// an order of magnitude (or more) higher frequency
// than regular goroutine switches, so this path is heavily
// optimized to remove unnecessary work.
// The fast path here is three CAS: the one at the top on gp.atomicstatus,
// the one in the middle to choose the next g,
// and the one at the bottom on gnext.atomicstatus.
// It is important not to add more atomic operations or other
// expensive operations to the fast path.
func coroswitch_m( *g) {
:= .coroarg
.coroarg = nil
:= .coroexit
.coroexit = false
:= .m
// Track and validate thread-lock interactions.
//
// The rules with thread-lock interactions are simple. When a coro goroutine is switched to,
// the same thread must be used, and the locked state must match with the thread-lock state of
// the goroutine which called newcoro. Thread-lock state consists of the thread and the number
// of internal (cgo callback, etc.) and external (LockOSThread) thread locks.
:= .lockedm != 0
if .mp != nil || {
if != .mp || .lockedInt != .lockedInt || .lockedExt != .lockedExt {
print("coro: got thread ", unsafe.Pointer(), ", want ", unsafe.Pointer(.mp), "\n")
print("coro: got lock internal ", .lockedInt, ", want ", .lockedInt, "\n")
print("coro: got lock external ", .lockedExt, ", want ", .lockedExt, "\n")
throw("coro: OS thread locking must match locking at coroutine creation")
}
}
// Acquire tracer for writing for the duration of this call.
//
// There's a lot of state manipulation performed with shortcuts
// but we need to make sure the tracer can only observe the
// start and end states to maintain a coherent model and avoid
// emitting an event for every single transition.
:= traceAcquire()
if {
// Detach the goroutine from the thread; we'll attach to the goroutine we're
// switching to before returning.
.lockedm.set(nil)
}
if {
// The M might have a non-zero OS thread lock count when we get here, gdestroy
// will avoid destroying the M if the G isn't explicitly locked to it via lockedm,
// which we cleared above. It's fine to gdestroy here also, even when locked to
// the thread, because we'll be switching back to another goroutine anyway, which
// will take back its thread-lock state before returning.
gdestroy()
= nil
} else {
// If we can CAS ourselves directly from running to waiting, so do,
// keeping the control transfer as lightweight as possible.
.waitreason = waitReasonCoroutine
if !.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) {
// The CAS failed: use casgstatus, which will take care of
// coordinating with the garbage collector about the state change.
casgstatus(, _Grunning, _Gwaiting)
}
// Clear gp.m.
setMNoWB(&.m, nil)
}
// The goroutine stored in c is the one to run next.
// Swap it with ourselves.
var *g
for {
// Note: this is a racy load, but it will eventually
// get the right value, and if it gets the wrong value,
// the c.gp.cas will fail, so no harm done other than
// a wasted loop iteration.
// The cas will also sync c.gp's
// memory enough that the next iteration of the racy load
// should see the correct value.
// We are avoiding the atomic load to keep this path
// as lightweight as absolutely possible.
// (The atomic load is free on x86 but not free elsewhere.)
:= .gp
if .ptr() == nil {
throw("coroswitch on exited coro")
}
var guintptr
.set()
if .gp.cas(, ) {
= .ptr()
break
}
}
// Check if we're switching to ourselves. This case is able to break our
// thread-lock invariants and an unbuffered channel implementation of
// coroswitch would deadlock. It's clear that this case should just not
// work.
if == {
throw("coroswitch of a goroutine to itself")
}
// Emit the trace event after getting gnext but before changing curg.
// GoSwitch expects that the current G is running and that we haven't
// switched yet for correct status emission.
if .ok() {
.GoSwitch(, )
}
// Start running next, without heavy scheduling machinery.
// Set mp.curg and gnext.m and then update scheduling state
// directly if possible.
setGNoWB(&.curg, )
setMNoWB(&.m, )
if !.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) {
// The CAS failed: use casgstatus, which will take care of
// coordinating with the garbage collector about the state change.
casgstatus(, _Gwaiting, _Grunnable)
casgstatus(, _Grunnable, _Grunning)
}
// Donate locked state.
if {
.lockedg.set()
.lockedm.set()
}
// Release the trace locker. We've completed all the necessary transitions..
if .ok() {
traceRelease()
}
// Switch to gnext. Does not return.
gogo(&.sched)
}
The pages are generated with Golds v0.7.0-preview. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |