// Copyright 2023 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package traceimport ()// Reader reads a byte stream, validates it, and produces trace events.typeReaderstruct { r *bufio.Reader lastTs Time gen *generation spill *spilledBatch spillErr error// error from reading spill frontier []*batchCursor cpuSamples []cpuSample order ordering emittedSync bool go121Events *oldTraceConverter}// NewReader creates a new trace reader.func ( io.Reader) (*Reader, error) { := bufio.NewReader() , := version.ReadHeader()if != nil {returnnil, }switch {caseversion.Go111, version.Go119, version.Go121: , := oldtrace.Parse(, )if != nil {returnnil, }return &Reader{go121Events: convertOldFormat(), }, nilcaseversion.Go122, version.Go123:return &Reader{r: ,order: ordering{mStates: make(map[ThreadID]*mState),pStates: make(map[ProcID]*pState),gStates: make(map[GoID]*gState),activeTasks: make(map[TaskID]taskState), },// Don't emit a sync event when we first go to emit events.emittedSync: true, }, nildefault:returnnil, fmt.Errorf("unknown or unsupported version go 1.%d", ) }}// ReadEvent reads a single event from the stream.//// If the stream has been exhausted, it returns an invalid// event and io.EOF.func ( *Reader) () ( Event, error) {if .go121Events != nil { , := .go121Events.next()if != nil {// XXX do we have to emit an EventSync when the trace is done?returnEvent{}, }return , nil }// Go 1.22+ trace parsing algorithm. // // (1) Read in all the batches for the next generation from the stream. // (a) Use the size field in the header to quickly find all batches. // (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data. // (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.) // (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M. // (5) Try to advance the next event for the M at the top of the min-heap. // (a) On success, select that M. // (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances. // (c) If there's nothing left to advance, goto (1). // (6) Select the latest event for the selected M and get it ready to be returned. // (7) Read the next event for the selected M and update the min-heap. // (8) Return the selected event, goto (5) on the next call.// Set us up to track the last timestamp and fix up // the timestamp of any event that comes through.deferfunc() {if != nil {return }if = .validateTableIDs(); != nil {return }if .base.time <= .lastTs { .base.time = .lastTs + 1 } .lastTs = .base.time }()// Consume any events in the ordering first.if , := .order.Next(); {return , nil }// Check if we need to refresh the generation.iflen(.frontier) == 0 && len(.cpuSamples) == 0 {if !.emittedSync { .emittedSync = truereturnsyncEvent(.gen.evTable, .lastTs), nil }if .spillErr != nil {returnEvent{}, .spillErr }if .gen != nil && .spill == nil {// If we have a generation from the last read, // and there's nothing left in the frontier, and // there's no spilled batch, indicating that there's // no further generation, it means we're done. // Return io.EOF.returnEvent{}, io.EOF }// Read the next generation.varerror .gen, .spill, = readGeneration(.r, .spill)if .gen == nil {returnEvent{}, } .spillErr = // Reset CPU samples cursor. .cpuSamples = .gen.cpuSamples// Reset frontier.for , := range .gen.batches { := &batchCursor{m: } , := .nextEvent(, .gen.freq)if != nil {returnEvent{}, }if ! {// Turns out there aren't actually any events in these batches.continue } .frontier = heapInsert(.frontier, ) }// Reset emittedSync. .emittedSync = false } := func( int) (bool, error) { := .frontier[]if , := .order.Advance(&.ev, .gen.evTable, .m, .gen.gen); ! || != nil {return , }// Refresh the cursor's event. , := .nextEvent(.gen.batches[.m], .gen.freq)if != nil {returnfalse, }if {// If we successfully refreshed, update the heap.heapUpdate(.frontier, ) } else {// There's nothing else to read. Delete this cursor from the frontier. .frontier = heapRemove(.frontier, ) }returntrue, nil }// Inject a CPU sample if it comes next.iflen(.cpuSamples) != 0 {iflen(.frontier) == 0 || .cpuSamples[0].time < .frontier[0].ev.time { := .cpuSamples[0].asEvent(.gen.evTable) .cpuSamples = .cpuSamples[1:]return , nil } }// Try to advance the head of the frontier, which should have the minimum timestamp. // This should be by far the most common caseiflen(.frontier) == 0 {returnEvent{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", .gen.gen, dumpFrontier(.frontier), dumpOrdering(&.order)) }if , := (0); != nil {returnEvent{}, } elseif ! {// Try to advance the rest of the frontier, in timestamp order. // // To do this, sort the min-heap. A sorted min-heap is still a // min-heap, but now we can iterate over the rest and try to // advance in order. This path should be rare.slices.SortFunc(.frontier, (*batchCursor).compare) := falsefor := 1; < len(.frontier); ++ {if , = (); != nil {returnEvent{}, } elseif { = truebreak } }if ! {returnEvent{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", .gen.gen, dumpFrontier(.frontier), dumpOrdering(&.order)) } }// Pick off the next event on the queue. At this point, one must exist. , := .order.Next()if ! {panic("invariant violation: advance successful, but queue is empty") }return , nil}func dumpFrontier( []*batchCursor) string {varstrings.Builderfor , := range { := go122.Specs()[.ev.typ]fmt.Fprintf(&, "M %d [%s time=%d", .m, .Name, .ev.time)for , := range .Args[1:] {fmt.Fprintf(&, " %s=%d", , .ev.args[]) }fmt.Fprintf(&, "]\n") }return .String()}
The pages are generated with Goldsv0.7.0-preview. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.