package trace
import (
"bufio"
"fmt"
"io"
"slices"
"strings"
"internal/trace/internal/tracev1"
"internal/trace/tracev2"
"internal/trace/version"
)
type Reader struct {
version version .Version
r *bufio .Reader
lastTs Time
gen *generation
spill *spilledBatch
spillErr error
spillErrSync bool
frontier []*batchCursor
cpuSamples []cpuSample
order ordering
syncs int
done bool
v1Events *traceV1Converter
}
func NewReader (r io .Reader ) (*Reader , error ) {
br := bufio .NewReader (r )
v , err := version .ReadHeader (br )
if err != nil {
return nil , err
}
switch v {
case version .Go111 , version .Go119 , version .Go121 :
tr , err := tracev1 .Parse (br , v )
if err != nil {
return nil , err
}
return &Reader {
v1Events : convertV1Trace (tr ),
}, nil
case version .Go122 , version .Go123 , version .Go125 :
return &Reader {
version : v ,
r : br ,
order : ordering {
traceVer : v ,
mStates : make (map [ThreadID ]*mState ),
pStates : make (map [ProcID ]*pState ),
gStates : make (map [GoID ]*gState ),
activeTasks : make (map [TaskID ]taskState ),
},
}, nil
default :
return nil , fmt .Errorf ("unknown or unsupported version go 1.%d" , v )
}
}
func (r *Reader ) ReadEvent () (e Event , err error ) {
if r .done {
return Event {}, io .EOF
}
if r .v1Events != nil {
if r .syncs == 0 {
ev , ok := r .v1Events .events .Peek ()
if ok {
r .syncs ++
return syncEvent (r .v1Events .evt , Time (ev .Ts -1 ), r .syncs ), nil
}
}
ev , err := r .v1Events .next ()
if err == io .EOF {
r .done = true
r .syncs ++
return syncEvent (nil , r .v1Events .lastTs +1 , r .syncs ), nil
} else if err != nil {
return Event {}, err
}
return ev , nil
}
defer func () {
if err != nil {
return
}
if err = e .validateTableIDs (); err != nil {
return
}
if e .base .time <= r .lastTs {
e .base .time = r .lastTs + 1
}
r .lastTs = e .base .time
}()
if ev , ok := r .order .Next (); ok {
return ev , nil
}
if len (r .frontier ) == 0 && len (r .cpuSamples ) == 0 {
if r .spillErr != nil {
if r .spillErrSync {
return Event {}, r .spillErr
}
r .spillErrSync = true
r .syncs ++
return syncEvent (nil , r .lastTs , r .syncs ), nil
}
if r .gen != nil && r .spill == nil {
r .done = true
r .syncs ++
return syncEvent (nil , r .lastTs , r .syncs ), nil
}
r .gen , r .spill , r .spillErr = readGeneration (r .r , r .spill , r .version )
if r .gen == nil {
r .spillErrSync = true
r .syncs ++
return syncEvent (nil , r .lastTs , r .syncs ), nil
}
r .cpuSamples = r .gen .cpuSamples
for _ , m := range r .gen .batchMs {
batches := r .gen .batches [m ]
bc := &batchCursor {m : m }
ok , err := bc .nextEvent (batches , r .gen .freq )
if err != nil {
return Event {}, err
}
if !ok {
continue
}
r .frontier = heapInsert (r .frontier , bc )
}
r .syncs ++
return syncEvent (r .gen .evTable , r .gen .freq .mul (r .gen .minTs ), r .syncs ), nil
}
tryAdvance := func (i int ) (bool , error ) {
bc := r .frontier [i ]
if ok , err := r .order .Advance (&bc .ev , r .gen .evTable , bc .m , r .gen .gen ); !ok || err != nil {
return ok , err
}
ok , err := bc .nextEvent (r .gen .batches [bc .m ], r .gen .freq )
if err != nil {
return false , err
}
if ok {
heapUpdate (r .frontier , i )
} else {
r .frontier = heapRemove (r .frontier , i )
}
return true , nil
}
if len (r .cpuSamples ) != 0 {
if len (r .frontier ) == 0 || r .cpuSamples [0 ].time < r .frontier [0 ].ev .time {
e := r .cpuSamples [0 ].asEvent (r .gen .evTable )
r .cpuSamples = r .cpuSamples [1 :]
return e , nil
}
}
if len (r .frontier ) == 0 {
return Event {}, fmt .Errorf ("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n" , r .gen .gen , dumpFrontier (r .frontier ), dumpOrdering (&r .order ))
}
if ok , err := tryAdvance (0 ); err != nil {
return Event {}, err
} else if !ok {
slices .SortFunc (r .frontier , (*batchCursor ).compare )
success := false
for i := 1 ; i < len (r .frontier ); i ++ {
if ok , err = tryAdvance (i ); err != nil {
return Event {}, err
} else if ok {
success = true
break
}
}
if !success {
return Event {}, fmt .Errorf ("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n" , r .gen .gen , dumpFrontier (r .frontier ), dumpOrdering (&r .order ))
}
}
ev , ok := r .order .Next ()
if !ok {
panic ("invariant violation: advance successful, but queue is empty" )
}
return ev , nil
}
func dumpFrontier(frontier []*batchCursor ) string {
var sb strings .Builder
for _ , bc := range frontier {
spec := tracev2 .Specs ()[bc .ev .typ ]
fmt .Fprintf (&sb , "M %d [%s time=%d" , bc .m , spec .Name , bc .ev .time )
for i , arg := range spec .Args [1 :] {
fmt .Fprintf (&sb , " %s=%d" , arg , bc .ev .args [i ])
}
fmt .Fprintf (&sb , "]\n" )
}
return sb .String ()
}
The pages are generated with Golds v0.7.7-preview . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .