From 847cc2fb5d3944689d3084347749a07c3a970732 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Thu, 5 Mar 2026 10:16:52 -0800 Subject: [PATCH 1/2] encode monotonic time in wall clock (#1439) * encode monotonic time in wall clock * Create mighty-beans-taste.md --- .changeset/mighty-beans-taste.md | 5 +++ utils/mono/mono.go | 45 ++++++++------------- utils/mono/mono_test.go | 67 ++++++++++++++++++++++++++++++-- 3 files changed, 85 insertions(+), 32 deletions(-) create mode 100644 .changeset/mighty-beans-taste.md diff --git a/.changeset/mighty-beans-taste.md b/.changeset/mighty-beans-taste.md new file mode 100644 index 000000000..d01e63a50 --- /dev/null +++ b/.changeset/mighty-beans-taste.md @@ -0,0 +1,5 @@ +--- +"@livekit/protocol": patch +--- + +encode monotonic time in wall clock diff --git a/utils/mono/mono.go b/utils/mono/mono.go index cfba51650..1526501ec 100644 --- a/utils/mono/mono.go +++ b/utils/mono/mono.go @@ -12,14 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package mono enforces use of monotonic time when creating/parsing time.Time from external sources. -// -// Using time.Now produces monotonic time values that correctly measure time difference in the presence of clock resets. -// -// On the other hand, time produce by time.Unix or time.Parse doesn't have this property. Clock reset may lead to incorrect -// durations computed from these timestamps. To fix this, prefer using Unix and Parse provided by this package. -// -// Monotonic time could also be erased when using functions like Truncate, Round, In, UTC. Be careful when using these. +// These APIs encode monotonic time into time.Time wall-clock fields. Returned +// values intentionally do not carry Go's internal monotonic payload and are +// meant to be compared with other mono timestamps. // // More details: https://go.googlesource.com/proposal/+/master/design/12914-monotonic.md package mono @@ -38,37 +33,31 @@ func resetClock() { epochNano = epoch.UnixNano() } -// jumpClock adjusts reference timestamp by a given duration emulating a clock reset/jump. -// Used in tests only. +// jumpClock adjusts reference timestamp by a given duration emulating a clock +// reset/jump. Used in tests only. func jumpClock(dt time.Duration) { epoch = epoch.Add(-dt) // we pretend time.Now() jumps, not the reference epochNano = epoch.UnixNano() } -// FromTime ensures that time.Time value uses monotonic clock. -// -// Deprecated: You should probably use Unix or Parse instead. +// FromTime creates a Time from the monotonic part of t. Note that the monotonic +// part of t could have been erased when using functions like Truncate, Round, +// In, UTC, etc... Be careful when using this func FromTime(t time.Time) time.Time { - return fromTime(t) -} - -func fromTime(t time.Time) time.Time { if t.IsZero() { return time.Time{} } - return epoch.Add(t.Sub(epoch)) + return time.Unix(0, epochNano+int64(t.Sub(epoch))) } -// Now is a wrapper for time.Time. -// -// Deprecated: time.Now always uses monotonic clock. +// Now creates a monotonic time without reading the system wall clock func Now() time.Time { - return time.Now() + return time.Unix(0, epochNano+int64(time.Since(epoch))) } // Unix is an analog of time.Unix that produces monotonic time. func Unix(sec, nsec int64) time.Time { - return fromTime(time.Unix(sec, nsec)) + return FromTime(time.Unix(sec, nsec)) } // Parse is an analog of time.Parse that produces monotonic time. @@ -77,17 +66,17 @@ func Parse(layout, value string) (time.Time, error) { if err != nil { return time.Time{}, err } - return fromTime(t), nil + return FromTime(t), nil } -// UnixNano returns the number of nanoseconds elapsed, based on the application start time. -// This value may be different from time.Now().UnixNano() in the presence of time resets. +// UnixNano returns the number of nanoseconds elapsed, based on the application +// start time. func UnixNano() int64 { return epochNano + int64(time.Since(epoch)) } -// UnixMicro returns the number of microseconds elapsed, based on the application start time. -// This value may be different from time.Now().UnixMicro() in the presence of time resets. +// UnixMicro returns the number of microseconds elapsed, based on the +// application start time. func UnixMicro() int64 { return UnixNano() / 1000 } diff --git a/utils/mono/mono_test.go b/utils/mono/mono_test.go index 2e89ce94a..efe7d58db 100644 --- a/utils/mono/mono_test.go +++ b/utils/mono/mono_test.go @@ -9,7 +9,7 @@ import ( func TestMonoZero(t *testing.T) { ts := time.Time{} - ts2 := fromTime(ts) + ts2 := FromTime(ts) require.True(t, ts.IsZero()) require.True(t, ts2.IsZero()) require.True(t, ts.Equal(ts2)) @@ -17,16 +17,75 @@ func TestMonoZero(t *testing.T) { } func TestMono(t *testing.T) { - t.Cleanup(resetClock) // restore + t.Cleanup(resetClock) ts1 := time.Now() ts2 := ts1.Add(time.Second) - ts1m := fromTime(ts1) + ts1m := FromTime(ts1) // emulate a clock reset, +1h jump // TODO: use synctest when we switch to Go 1.25 jumpClock(time.Hour) - ts2m := fromTime(ts2) + ts2m := FromTime(ts2) require.Equal(t, ts2.Sub(ts1), ts2m.Sub(ts1m)) } + +func TestNoGoMonotonicPayload(t *testing.T) { + t.Cleanup(resetClock) + + now := Now() + fromTime := FromTime(time.Now()) + fromUnix := Unix(123, 456) + fromParse, err := Parse(time.RFC3339Nano, "2026-03-05T12:34:56.789123456Z") + require.NoError(t, err) + + require.Equal(t, now, now.Round(0)) + require.Equal(t, fromTime, fromTime.Round(0)) + require.Equal(t, fromUnix, fromUnix.Round(0)) + require.Equal(t, fromParse, fromParse.Round(0)) +} + +func TestSerializationRoundTripComparableAcrossClockJump(t *testing.T) { + t.Cleanup(resetClock) + + t1 := Now() + t2 := t1.Add(10 * time.Second) + + t1Encoded := t1.UnixNano() + t2Encoded := t2.UnixNano() + + // emulate a clock reset, +1h jump + jumpClock(time.Hour) + + t1Decoded := Unix(0, t1Encoded) + t2Decoded := Unix(0, t2Encoded) + + require.Equal(t, t2.Sub(t1), t2Decoded.Sub(t1Decoded)) +} + +func TestNowProgressesAcrossClockJump(t *testing.T) { + t.Cleanup(resetClock) + + t1 := Now() + time.Sleep(time.Millisecond) + + // emulate a clock reset, +1h jump + jumpClock(time.Hour) + t2 := Now() + + require.Greater(t, t2.Sub(t1), time.Duration(0)) +} + +func BenchmarkTime(b *testing.B) { + b.Run("Now()", func(b *testing.B) { + for b.Loop() { + _ = Now() + } + }) + b.Run("time.Now()", func(b *testing.B) { + for b.Loop() { + _ = time.Now() + } + }) +} From 1ab9b313414668f73894be318448063a56d33153 Mon Sep 17 00:00:00 2001 From: Tyler Barrus Date: Thu, 5 Mar 2026 16:25:49 -0500 Subject: [PATCH 2/2] update gateway reporter (#1442) --- observability/gatewayobs/gen_reporter.go | 37 ++++++++++- observability/gatewayobs/gen_reporter_noop.go | 65 +++++++++++++++++-- 2 files changed, 94 insertions(+), 8 deletions(-) diff --git a/observability/gatewayobs/gen_reporter.go b/observability/gatewayobs/gen_reporter.go index 006a38efa..68084852e 100644 --- a/observability/gatewayobs/gen_reporter.go +++ b/observability/gatewayobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_OL9B4A8 = true +const Version_NI039G8 = true type KeyResolver interface { Resolve(string) @@ -24,9 +24,42 @@ type ProjectReporter interface { RegisterFunc(func(ts time.Time, tx ProjectTx) bool) Tx(func(tx ProjectTx)) TxAt(time.Time, func(tx ProjectTx)) + WithRequestedPriority(priority string) RequestedPriorityReporter + WithDeferredRequestedPriority() (RequestedPriorityReporter, KeyResolver) + ProjectTx +} + +type RequestedPriorityTx interface{} + +type RequestedPriorityReporter interface { + RegisterFunc(func(ts time.Time, tx RequestedPriorityTx) bool) + Tx(func(tx RequestedPriorityTx)) + TxAt(time.Time, func(tx RequestedPriorityTx)) + WithGrantedPriority(priority string) GrantedPriorityReporter + WithDeferredGrantedPriority() (GrantedPriorityReporter, KeyResolver) + RequestedPriorityTx +} + +type GrantedPriorityTx interface{} + +type GrantedPriorityReporter interface { + RegisterFunc(func(ts time.Time, tx GrantedPriorityTx) bool) + Tx(func(tx GrantedPriorityTx)) + TxAt(time.Time, func(tx GrantedPriorityTx)) + WithBillablePriority(priority string) BillablePriorityReporter + WithDeferredBillablePriority() (BillablePriorityReporter, KeyResolver) + GrantedPriorityTx +} + +type BillablePriorityTx interface{} + +type BillablePriorityReporter interface { + RegisterFunc(func(ts time.Time, tx BillablePriorityTx) bool) + Tx(func(tx BillablePriorityTx)) + TxAt(time.Time, func(tx BillablePriorityTx)) WithProvider(name string) ProviderReporter WithDeferredProvider() (ProviderReporter, KeyResolver) - ProjectTx + BillablePriorityTx } type ProviderTx interface{} diff --git a/observability/gatewayobs/gen_reporter_noop.go b/observability/gatewayobs/gen_reporter_noop.go index 6c7d9070b..328b69560 100644 --- a/observability/gatewayobs/gen_reporter_noop.go +++ b/observability/gatewayobs/gen_reporter_noop.go @@ -7,10 +7,13 @@ import ( ) var ( - _ Reporter = (*noopReporter)(nil) - _ ProjectReporter = (*noopProjectReporter)(nil) - _ ProviderReporter = (*noopProviderReporter)(nil) - _ ModelReporter = (*noopModelReporter)(nil) + _ Reporter = (*noopReporter)(nil) + _ ProjectReporter = (*noopProjectReporter)(nil) + _ RequestedPriorityReporter = (*noopRequestedPriorityReporter)(nil) + _ GrantedPriorityReporter = (*noopGrantedPriorityReporter)(nil) + _ BillablePriorityReporter = (*noopBillablePriorityReporter)(nil) + _ ProviderReporter = (*noopProviderReporter)(nil) + _ ModelReporter = (*noopModelReporter)(nil) ) type noopKeyResolver struct{} @@ -41,10 +44,60 @@ func NewNoopProjectReporter() ProjectReporter { func (r *noopProjectReporter) RegisterFunc(f func(ts time.Time, tx ProjectTx) bool) {} func (r *noopProjectReporter) Tx(f func(ProjectTx)) {} func (r *noopProjectReporter) TxAt(ts time.Time, f func(ProjectTx)) {} -func (r *noopProjectReporter) WithProvider(name string) ProviderReporter { +func (r *noopProjectReporter) WithRequestedPriority(priority string) RequestedPriorityReporter { + return &noopRequestedPriorityReporter{} +} +func (r *noopProjectReporter) WithDeferredRequestedPriority() (RequestedPriorityReporter, KeyResolver) { + return &noopRequestedPriorityReporter{}, noopKeyResolver{} +} + +type noopRequestedPriorityReporter struct{} + +func NewNoopRequestedPriorityReporter() RequestedPriorityReporter { + return &noopRequestedPriorityReporter{} +} + +func (r *noopRequestedPriorityReporter) RegisterFunc(f func(ts time.Time, tx RequestedPriorityTx) bool) { +} +func (r *noopRequestedPriorityReporter) Tx(f func(RequestedPriorityTx)) {} +func (r *noopRequestedPriorityReporter) TxAt(ts time.Time, f func(RequestedPriorityTx)) {} +func (r *noopRequestedPriorityReporter) WithGrantedPriority(priority string) GrantedPriorityReporter { + return &noopGrantedPriorityReporter{} +} +func (r *noopRequestedPriorityReporter) WithDeferredGrantedPriority() (GrantedPriorityReporter, KeyResolver) { + return &noopGrantedPriorityReporter{}, noopKeyResolver{} +} + +type noopGrantedPriorityReporter struct{} + +func NewNoopGrantedPriorityReporter() GrantedPriorityReporter { + return &noopGrantedPriorityReporter{} +} + +func (r *noopGrantedPriorityReporter) RegisterFunc(f func(ts time.Time, tx GrantedPriorityTx) bool) {} +func (r *noopGrantedPriorityReporter) Tx(f func(GrantedPriorityTx)) {} +func (r *noopGrantedPriorityReporter) TxAt(ts time.Time, f func(GrantedPriorityTx)) {} +func (r *noopGrantedPriorityReporter) WithBillablePriority(priority string) BillablePriorityReporter { + return &noopBillablePriorityReporter{} +} +func (r *noopGrantedPriorityReporter) WithDeferredBillablePriority() (BillablePriorityReporter, KeyResolver) { + return &noopBillablePriorityReporter{}, noopKeyResolver{} +} + +type noopBillablePriorityReporter struct{} + +func NewNoopBillablePriorityReporter() BillablePriorityReporter { + return &noopBillablePriorityReporter{} +} + +func (r *noopBillablePriorityReporter) RegisterFunc(f func(ts time.Time, tx BillablePriorityTx) bool) { +} +func (r *noopBillablePriorityReporter) Tx(f func(BillablePriorityTx)) {} +func (r *noopBillablePriorityReporter) TxAt(ts time.Time, f func(BillablePriorityTx)) {} +func (r *noopBillablePriorityReporter) WithProvider(name string) ProviderReporter { return &noopProviderReporter{} } -func (r *noopProjectReporter) WithDeferredProvider() (ProviderReporter, KeyResolver) { +func (r *noopBillablePriorityReporter) WithDeferredProvider() (ProviderReporter, KeyResolver) { return &noopProviderReporter{}, noopKeyResolver{} }