// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package trace

import (
	
	
	
	
	

	
	
	
)

// Reader reads a byte stream, validates it, and produces trace events.
type Reader struct {
	r           *bufio.Reader
	lastTs      Time
	gen         *generation
	spill       *spilledBatch
	spillErr    error // error from reading spill
	frontier    []*batchCursor
	cpuSamples  []cpuSample
	order       ordering
	emittedSync bool

	go121Events *oldTraceConverter
}

// NewReader creates a new trace reader.
func ( io.Reader) (*Reader, error) {
	 := bufio.NewReader()
	,  := version.ReadHeader()
	if  != nil {
		return nil, 
	}
	switch  {
	case version.Go111, version.Go119, version.Go121:
		,  := oldtrace.Parse(, )
		if  != nil {
			return nil, 
		}
		return &Reader{
			go121Events: convertOldFormat(),
		}, nil
	case version.Go122, version.Go123:
		return &Reader{
			r: ,
			order: ordering{
				mStates:     make(map[ThreadID]*mState),
				pStates:     make(map[ProcID]*pState),
				gStates:     make(map[GoID]*gState),
				activeTasks: make(map[TaskID]taskState),
			},
			// Don't emit a sync event when we first go to emit events.
			emittedSync: true,
		}, nil
	default:
		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", )
	}
}

// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid
// event and io.EOF.
func ( *Reader) () ( Event,  error) {
	if .go121Events != nil {
		,  := .go121Events.next()
		if  != nil {
			// XXX do we have to emit an EventSync when the trace is done?
			return Event{}, 
		}
		return , nil
	}

	// Go 1.22+ trace parsing algorithm.
	//
	// (1) Read in all the batches for the next generation from the stream.
	//   (a) Use the size field in the header to quickly find all batches.
	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
	// (5) Try to advance the next event for the M at the top of the min-heap.
	//   (a) On success, select that M.
	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
	//   (c) If there's nothing left to advance, goto (1).
	// (6) Select the latest event for the selected M and get it ready to be returned.
	// (7) Read the next event for the selected M and update the min-heap.
	// (8) Return the selected event, goto (5) on the next call.

	// Set us up to track the last timestamp and fix up
	// the timestamp of any event that comes through.
	defer func() {
		if  != nil {
			return
		}
		if  = .validateTableIDs();  != nil {
			return
		}
		if .base.time <= .lastTs {
			.base.time = .lastTs + 1
		}
		.lastTs = .base.time
	}()

	// Consume any events in the ordering first.
	if ,  := .order.Next();  {
		return , nil
	}

	// Check if we need to refresh the generation.
	if len(.frontier) == 0 && len(.cpuSamples) == 0 {
		if !.emittedSync {
			.emittedSync = true
			return syncEvent(.gen.evTable, .lastTs), nil
		}
		if .spillErr != nil {
			return Event{}, .spillErr
		}
		if .gen != nil && .spill == nil {
			// If we have a generation from the last read,
			// and there's nothing left in the frontier, and
			// there's no spilled batch, indicating that there's
			// no further generation, it means we're done.
			// Return io.EOF.
			return Event{}, io.EOF
		}
		// Read the next generation.
		var  error
		.gen, .spill,  = readGeneration(.r, .spill)
		if .gen == nil {
			return Event{}, 
		}
		.spillErr = 

		// Reset CPU samples cursor.
		.cpuSamples = .gen.cpuSamples

		// Reset frontier.
		for ,  := range .gen.batches {
			 := &batchCursor{m: }
			,  := .nextEvent(, .gen.freq)
			if  != nil {
				return Event{}, 
			}
			if ! {
				// Turns out there aren't actually any events in these batches.
				continue
			}
			.frontier = heapInsert(.frontier, )
		}

		// Reset emittedSync.
		.emittedSync = false
	}
	 := func( int) (bool, error) {
		 := .frontier[]

		if ,  := .order.Advance(&.ev, .gen.evTable, .m, .gen.gen); ! ||  != nil {
			return , 
		}

		// Refresh the cursor's event.
		,  := .nextEvent(.gen.batches[.m], .gen.freq)
		if  != nil {
			return false, 
		}
		if  {
			// If we successfully refreshed, update the heap.
			heapUpdate(.frontier, )
		} else {
			// There's nothing else to read. Delete this cursor from the frontier.
			.frontier = heapRemove(.frontier, )
		}
		return true, nil
	}
	// Inject a CPU sample if it comes next.
	if len(.cpuSamples) != 0 {
		if len(.frontier) == 0 || .cpuSamples[0].time < .frontier[0].ev.time {
			 := .cpuSamples[0].asEvent(.gen.evTable)
			.cpuSamples = .cpuSamples[1:]
			return , nil
		}
	}
	// Try to advance the head of the frontier, which should have the minimum timestamp.
	// This should be by far the most common case
	if len(.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", .gen.gen, dumpFrontier(.frontier), dumpOrdering(&.order))
	}
	if ,  := (0);  != nil {
		return Event{}, 
	} else if ! {
		// Try to advance the rest of the frontier, in timestamp order.
		//
		// To do this, sort the min-heap. A sorted min-heap is still a
		// min-heap, but now we can iterate over the rest and try to
		// advance in order. This path should be rare.
		slices.SortFunc(.frontier, (*batchCursor).compare)
		 := false
		for  := 1;  < len(.frontier); ++ {
			if ,  = ();  != nil {
				return Event{}, 
			} else if  {
				 = true
				break
			}
		}
		if ! {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", .gen.gen, dumpFrontier(.frontier), dumpOrdering(&.order))
		}
	}

	// Pick off the next event on the queue. At this point, one must exist.
	,  := .order.Next()
	if ! {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return , nil
}

func dumpFrontier( []*batchCursor) string {
	var  strings.Builder
	for ,  := range  {
		 := go122.Specs()[.ev.typ]
		fmt.Fprintf(&, "M %d [%s time=%d", .m, .Name, .ev.time)
		for ,  := range .Args[1:] {
			fmt.Fprintf(&, " %s=%d", , .ev.args[])
		}
		fmt.Fprintf(&, "]\n")
	}
	return .String()
}