0
1
mirror of https://github.com/golang/go synced 2025-05-27 15:10:40 +00:00

internal/trace: refactor how experimental batches are exposed

This change modifies how per-generation experimental batches are
exposed. Rather than expose them on the ExperimentalEvent, it exposes it
as part of the Sync event, so it's clear to the caller when the
information becomes relevant and when it should be parsed.

This change also adds a field to each ExperimentalEvent indicating which
experiment the event is a part of.

Because this information needs to appear *before* a generation is
observed, we now ensure there is a sync event both before and after each
generation. This means the final sync event is now a special case;
previously we would only emit a sync event after each generation.

This change is based on feedback from Austin Clements on the
experimental events functionality.

For .

Change-Id: I48b0fe12b22abb7ac8820a9e73447bfed8419856
Reviewed-on: https://go-review.googlesource.com/c/go/+/644215
Auto-Submit: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
Michael Anthony Knyszek
2025-01-09 18:17:49 +00:00
committed by Gopher Robot
parent d7f6f6fd54
commit ca1cfea371
10 changed files with 141 additions and 82 deletions

@ -58,9 +58,8 @@ type evTable struct {
extraStringIDs map[string]extraStringID
nextExtra extraStringID
// expData contains extra unparsed data that is accessible
// only to ExperimentEvent via an EventExperimental event.
expData map[event.Experiment]*ExperimentalData
// expBatches contains extra unparsed data relevant to a specific experiment.
expBatches map[event.Experiment][]ExperimentalBatch
}
// addExtraString adds an extra string to the evTable and returns

@ -313,26 +313,15 @@ type ExperimentalEvent struct {
// Name is the name of the event.
Name string
// Experiment is the name of the experiment this event is a part of.
Experiment string
// ArgNames is the names of the event's arguments in order.
// This may refer to a globally shared slice. Copy before mutating.
ArgNames []string
// Args contains the event's arguments.
Args []uint64
// Data is additional unparsed data that is associated with the experimental event.
// Data is likely to be shared across many ExperimentalEvents, so callers that parse
// Data are encouraged to cache the parse result and look it up by the value of Data.
Data *ExperimentalData
}
// ExperimentalData represents some raw and unparsed sidecar data present in the trace that is
// associated with certain kinds of experimental events. For example, this data may contain
// tables needed to interpret ExperimentalEvent arguments, or the ExperimentEvent could just be
// a placeholder for a differently encoded event that's actually present in the experimental data.
type ExperimentalData struct {
// Batches contain the actual experimental data, along with metadata about each batch.
Batches []ExperimentalBatch
}
// ExperimentalBatch represents a packet of unparsed data along with metadata about that packet.
@ -658,6 +647,35 @@ func (e Event) StateTransition() StateTransition {
return s
}
// Sync returns details that are relevant for the following events, up to but excluding the
// next EventSync event.
func (e Event) Sync() Sync {
if e.Kind() != EventSync {
panic("Sync called on non-Sync event")
}
var expBatches map[string][]ExperimentalBatch
if e.table != nil {
expBatches = make(map[string][]ExperimentalBatch)
for exp, batches := range e.table.expBatches {
expBatches[go122.Experiments()[exp]] = batches
}
}
return Sync{
N: int(e.base.args[0]),
ExperimentalBatches: expBatches,
}
}
// Sync contains details potentially relevant to all the following events, up to but excluding
// the next EventSync event.
type Sync struct {
// N indicates that this is the Nth sync event in the trace.
N int
// ExperimentalBatches contain all the unparsed batches of data for a given experiment.
ExperimentalBatches map[string][]ExperimentalBatch
}
// Experimental returns a view of the raw event for an experimental event.
//
// Panics if Kind != EventExperimental.
@ -668,10 +686,10 @@ func (e Event) Experimental() ExperimentalEvent {
spec := go122.Specs()[e.base.typ]
argNames := spec.Args[1:] // Skip timestamp; already handled.
return ExperimentalEvent{
Name: spec.Name,
ArgNames: argNames,
Args: e.base.args[:len(argNames)],
Data: e.table.expData[spec.Experiment],
Name: spec.Name,
Experiment: go122.Experiments()[spec.Experiment],
ArgNames: argNames,
Args: e.base.args[:len(argNames)],
}
}
@ -848,8 +866,8 @@ func (e Event) validateTableIDs() error {
return nil
}
func syncEvent(table *evTable, ts Time) Event {
return Event{
func syncEvent(table *evTable, ts Time, n int) Event {
ev := Event{
table: table,
ctx: schedCtx{
G: NoGoroutine,
@ -861,4 +879,6 @@ func syncEvent(table *evTable, ts Time) Event {
time: ts,
},
}
ev.base.args[0] = uint64(n)
return ev
}

@ -86,6 +86,14 @@ const (
AllocFree event.Experiment = 1 + iota
)
func Experiments() []string {
return experiments[:]
}
var experiments = [...]string{
AllocFree: "AllocFree",
}
// Experimental events.
const (
_ event.Type = 127 + iota

@ -8,7 +8,7 @@ import "testing"
func TestPanicEvent(t *testing.T) {
// Use a sync event for this because it doesn't have any extra metadata.
ev := syncEvent(nil, 0)
ev := syncEvent(nil, 0, 0)
mustPanic(t, func() {
_ = ev.Range()

@ -75,7 +75,7 @@ func MutatorUtilizationV2(events []Event, flags UtilFlags) [][]MutatorUtil {
states := make(map[GoID]GoState)
bgMark := make(map[GoID]bool)
procs := []procsCount{}
seenSync := false
nSync := 0
// Helpers.
handleSTW := func(r Range) bool {
@ -97,7 +97,7 @@ func MutatorUtilizationV2(events []Event, flags UtilFlags) [][]MutatorUtil {
// Process the event.
switch ev.Kind() {
case EventSync:
seenSync = true
nSync = ev.Sync().N
case EventMetric:
m := ev.Metric()
if m.Name != "/sched/gomaxprocs:threads" {
@ -135,9 +135,9 @@ func MutatorUtilizationV2(events []Event, flags UtilFlags) [][]MutatorUtil {
switch ev.Kind() {
case EventRangeActive:
if seenSync {
// If we've seen a sync, then we can be sure we're not finding out about
// something late; we have complete information after that point, and these
if nSync > 1 {
// If we've seen a full generation, then we can be sure we're not finding out
// about something late; we have complete information after that point, and these
// active events will just be redundant.
break
}

@ -27,6 +27,7 @@ type generation struct {
batches map[ThreadID][]batch
batchMs []ThreadID
cpuSamples []cpuSample
minTs timestamp
*evTable
}
@ -100,6 +101,9 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilled
// problem as soon as we see it.
return nil, nil, fmt.Errorf("generations out of order")
}
if g.minTs == 0 || b.time < g.minTs {
g.minTs = b.time
}
if err := processBatch(g, b); err != nil {
return nil, nil, err
}
@ -163,10 +167,10 @@ func processBatch(g *generation, b batch) error {
}
g.freq = freq
case b.exp != event.NoExperiment:
if g.expData == nil {
g.expData = make(map[event.Experiment]*ExperimentalData)
if g.expBatches == nil {
g.expBatches = make(map[event.Experiment][]ExperimentalBatch)
}
if err := addExperimentalData(g.expData, b); err != nil {
if err := addExperimentalBatch(g.expBatches, b); err != nil {
return err
}
default:
@ -435,18 +439,13 @@ func parseFreq(b batch) (frequency, error) {
return frequency(1.0 / (float64(f) / 1e9)), nil
}
// addExperimentalData takes an experimental batch and adds it to the ExperimentalData
// for the experiment its a part of.
func addExperimentalData(expData map[event.Experiment]*ExperimentalData, b batch) error {
// addExperimentalBatch takes an experimental batch and adds it to the list of experimental
// batches for the experiment its a part of.
func addExperimentalBatch(expBatches map[event.Experiment][]ExperimentalBatch, b batch) error {
if b.exp == event.NoExperiment {
return fmt.Errorf("internal error: addExperimentalData called on non-experimental batch")
return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
}
ed, ok := expData[b.exp]
if !ok {
ed = new(ExperimentalData)
expData[b.exp] = ed
}
ed.Batches = append(ed.Batches, ExperimentalBatch{
expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
Thread: b.m,
Data: b.data,
})

@ -343,6 +343,14 @@ func (l *Events) Pop() (*Event, bool) {
return ptr, true
}
func (l *Events) Peek() (*Event, bool) {
if l.off == l.n {
return nil, false
}
a, b := l.index(l.off)
return &l.buckets[a][b], true
}
func (l *Events) All() func(yield func(ev *Event) bool) {
return func(yield func(ev *Event) bool) {
for i := 0; i < l.Len(); i++ {

@ -17,16 +17,21 @@ import (
)
// Reader reads a byte stream, validates it, and produces trace events.
//
// Provided the trace is non-empty the Reader always produces a Sync
// event as the first event, and a Sync event as the last event.
// (There may also be any number of Sync events in the middle, too.)
type Reader struct {
r *bufio.Reader
lastTs Time
gen *generation
spill *spilledBatch
spillErr error // error from reading spill
frontier []*batchCursor
cpuSamples []cpuSample
order ordering
emittedSync bool
r *bufio.Reader
lastTs Time
gen *generation
spill *spilledBatch
spillErr error // error from reading spill
frontier []*batchCursor
cpuSamples []cpuSample
order ordering
syncs int
done bool
go121Events *oldTraceConverter
}
@ -56,8 +61,6 @@ func NewReader(r io.Reader) (*Reader, error) {
gStates: make(map[GoID]*gState),
activeTasks: make(map[TaskID]taskState),
},
// Don't emit a sync event when we first go to emit events.
emittedSync: true,
}, nil
default:
return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
@ -66,13 +69,30 @@ func NewReader(r io.Reader) (*Reader, error) {
// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid
// event and io.EOF.
// If the stream has been exhausted, it returns an invalid event and io.EOF.
func (r *Reader) ReadEvent() (e Event, err error) {
// Return only io.EOF if we're done.
if r.done {
return Event{}, io.EOF
}
// Handle old execution traces.
if r.go121Events != nil {
if r.syncs == 0 {
// Always emit a sync event first, if we have any events at all.
ev, ok := r.go121Events.events.Peek()
if ok {
r.syncs++
return syncEvent(r.go121Events.evt, Time(ev.Ts-1), r.syncs), nil
}
}
ev, err := r.go121Events.next()
if err != nil {
// XXX do we have to emit an EventSync when the trace is done?
if err == io.EOF {
// Always emit a sync event at the end.
r.done = true
r.syncs++
return syncEvent(nil, r.go121Events.lastTs+1, r.syncs), nil
} else if err != nil {
return Event{}, err
}
return ev, nil
@ -115,10 +135,6 @@ func (r *Reader) ReadEvent() (e Event, err error) {
// Check if we need to refresh the generation.
if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
if !r.emittedSync {
r.emittedSync = true
return syncEvent(r.gen.evTable, r.lastTs), nil
}
if r.spillErr != nil {
return Event{}, r.spillErr
}
@ -127,8 +143,10 @@ func (r *Reader) ReadEvent() (e Event, err error) {
// and there's nothing left in the frontier, and
// there's no spilled batch, indicating that there's
// no further generation, it means we're done.
// Return io.EOF.
return Event{}, io.EOF
// Emit the final sync event.
r.done = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
// Read the next generation.
var err error
@ -155,9 +173,12 @@ func (r *Reader) ReadEvent() (e Event, err error) {
}
r.frontier = heapInsert(r.frontier, bc)
}
// Reset emittedSync.
r.emittedSync = false
r.syncs++
if r.lastTs == 0 {
r.lastTs = r.gen.freq.mul(r.gen.minTs)
}
// Always emit a sync event at the beginning of the generation.
return syncEvent(r.gen.evTable, r.lastTs, r.syncs), nil
}
tryAdvance := func(i int) (bool, error) {
bc := r.frontier[i]

@ -14,14 +14,14 @@ import (
// Validator is a type used for validating a stream of trace.Events.
type Validator struct {
lastTs trace.Time
gs map[trace.GoID]*goState
ps map[trace.ProcID]*procState
ms map[trace.ThreadID]*schedContext
ranges map[trace.ResourceID][]string
tasks map[trace.TaskID]string
seenSync bool
Go121 bool
lastTs trace.Time
gs map[trace.GoID]*goState
ps map[trace.ProcID]*procState
ms map[trace.ThreadID]*schedContext
ranges map[trace.ResourceID][]string
tasks map[trace.TaskID]string
nSync int
Go121 bool
}
type schedContext struct {
@ -60,7 +60,7 @@ func (v *Validator) Event(ev trace.Event) error {
// Validate timestamp order.
if v.lastTs != 0 {
if ev.Time() <= v.lastTs {
e.Errorf("timestamp out-of-order for %+v", ev)
e.Errorf("timestamp out-of-order (want > %v) for %+v", v.lastTs, ev)
} else {
v.lastTs = ev.Time()
}
@ -73,8 +73,11 @@ func (v *Validator) Event(ev trace.Event) error {
switch ev.Kind() {
case trace.EventSync:
// Just record that we've seen a Sync at some point.
v.seenSync = true
s := ev.Sync()
if s.N != v.nSync+1 {
e.Errorf("sync count is not sequential: expected %d, got %d", v.nSync+1, s.N)
}
v.nSync = s.N
case trace.EventMetric:
m := ev.Metric()
if !strings.Contains(m.Name, ":") {
@ -140,7 +143,7 @@ func (v *Validator) Event(ev trace.Event) error {
if new == trace.GoUndetermined {
e.Errorf("transition to undetermined state for goroutine %d", id)
}
if v.seenSync && old == trace.GoUndetermined {
if v.nSync > 1 && old == trace.GoUndetermined {
e.Errorf("undetermined goroutine %d after first global sync", id)
}
if new == trace.GoNotExist && v.hasAnyRange(trace.MakeResourceID(id)) {
@ -193,7 +196,7 @@ func (v *Validator) Event(ev trace.Event) error {
if new == trace.ProcUndetermined {
e.Errorf("transition to undetermined state for proc %d", id)
}
if v.seenSync && old == trace.ProcUndetermined {
if v.nSync > 1 && old == trace.ProcUndetermined {
e.Errorf("undetermined proc %d after first global sync", id)
}
if new == trace.ProcNotExist && v.hasAnyRange(trace.MakeResourceID(id)) {

@ -990,7 +990,8 @@ func TestCrashWhileTracing(t *testing.T) {
if err != nil {
t.Fatalf("could not create trace.NewReader: %v", err)
}
var seen, seenSync bool
var seen bool
nSync := 0
i := 1
loop:
for ; ; i++ {
@ -1005,7 +1006,7 @@ loop:
}
switch ev.Kind() {
case traceparse.EventSync:
seenSync = true
nSync = ev.Sync().N
case traceparse.EventLog:
v := ev.Log()
if v.Category == "xyzzy-cat" && v.Message == "xyzzy-msg" {
@ -1019,7 +1020,7 @@ loop:
if err := cmd.Wait(); err == nil {
t.Error("the process should have panicked")
}
if !seenSync {
if nSync <= 1 {
t.Errorf("expected at least one full generation to have been emitted before the trace was considered broken")
}
if !seen {