Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 134 additions & 72 deletions runtime/metricsview/executor/executor_rewrite_rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
// Rollup skip reasons: early disqualification
const (
skipRawRows = "raw_rows"
skipComparisonTimeRange = "comparison_time_range"
skipNonPrimaryTimeDimension = "non_primary_time_dimension"
)

Expand All @@ -49,21 +48,23 @@ const (
//
// Routing decision:
//
// 1. Quick disqualification: raw-row queries and comparison time range queries are skipped.
// 1. Quick disqualification: raw-row queries and queries referencing a non-primary time dimension are skipped.
//
// 2. Eligibility (per rollup): a rollup is eligible only if all of these hold:
// a. Query time grain is derivable from the rollup grain (e.g. month from day).
// b. For day+ grains, the query timezone matches the rollup timezone.
// c. Query time range start is aligned to the rollup grain.
// c. Both the base and (when present) comparison time range starts are aligned to the rollup grain.
// d. All queried dimensions are present in the rollup.
// e. The time range's time dimension is available in the rollup.
// f. All queried measures are present; computed measures (count, count_distinct) are rejected.
// e. The base and comparison time range time dimensions are available in the rollup.
// f. All queried measures are present. Computed measures are rejected except for comparison-derived
// computes (ComparisonValue / ComparisonDelta / ComparisonRatio / ComparisonTime), which are
// allowed as long as their referenced base measure is in the rollup.
// g. All WHERE filter dimensions are present in the rollup.
//
// 3. Time coverage: an eligible rollup must cover the requested time range.
// 3. Time coverage: an eligible rollup must cover both the base and (when present) comparison time ranges.
// For explicit time ranges, the query range is clamped to the base table's actual data range first.
// For no-time-range queries ("all data"), the rollup must cover the base table's full min/max range.
// Additionally, if the base table has data beyond the query end, the query end must be aligned
// Additionally, if the base table has data beyond a range's end, that end must be aligned
// to the rollup grain (to prevent the last bucket from pulling in extra data).
//
// 4. Selection: among eligible rollups, prefer the coarsest grain (fewer rows to scan).
Expand Down Expand Up @@ -100,18 +101,6 @@ func (e *Executor) rewriteQueryForRollup(ctx context.Context, qry *metricsview.Q
return nil, nil
}

// Disqualify: queries with comparison time ranges
if qry.ComparisonTimeRange != nil {
_, span := tracer.Start(ctx, "rollup.selection")
span.SetAttributes(
attribute.Int("rollup.candidate_count", len(e.metricsView.Rollups)),
attribute.String("rollup.result", "skipped"),
attribute.String("rollup.skip_reason", skipComparisonTimeRange),
)
span.End()
return nil, nil
}

// Disqualify: queries using a non-primary time dimension (rollups are built on the primary)
if qry.TimeRange != nil && qry.TimeRange.TimeDimension != "" && qry.TimeRange.TimeDimension != e.metricsView.TimeDimension {
_, span := tracer.Start(ctx, "rollup.selection")
Expand All @@ -130,9 +119,10 @@ func (e *Executor) rewriteQueryForRollup(ctx context.Context, qry *metricsview.Q
// Extract dimension names from the WHERE clause
whereDims := collectWhereDimensions(qry.Where)

// Determine whether the query has a non-zero time range using start and end to make sure they are resolved. At this point
// e.RewriteQueryTimeRanges is called earlier, and thus other fields would be unset, and only start and end will be set.
// Determine whether the query has a non-zero time range / comparison time range using start and end to make sure they are resolved.
// At this point e.RewriteQueryTimeRanges is called earlier, and thus other fields would be unset, and only start and end will be set.
hasTimeRange := qry.TimeRange != nil && (!qry.TimeRange.Start.IsZero() || !qry.TimeRange.End.IsZero())
hasComparisonTimeRange := qry.ComparisonTimeRange != nil && (!qry.ComparisonTimeRange.Start.IsZero() || !qry.ComparisonTimeRange.End.IsZero())

// Parent span for rollup selection
selectionCtx, selectionSpan := tracer.Start(ctx, "rollup.selection")
Expand Down Expand Up @@ -216,32 +206,13 @@ func (e *Executor) rewriteQueryForRollup(ctx context.Context, qry *metricsview.Q
}
rollupEffEnd := timeutil.OffsetTime(rollupMax, timeutil.TimeGrainFromAPI(rollup.TimeGrain), 1, rollupLoc)

// Check coverage for the base time range (or full base table range when no time range is set),
// and additionally for the comparison time range when present.
rangeRejected := false
if hasTimeRange {
// Clamp query range to the base table's actual data range.
// This ensures a rollup isn't rejected when the query extends beyond both the base table and rollup.
effectiveStart := qry.TimeRange.Start
if !effectiveStart.IsZero() && baseMin.After(effectiveStart) {
effectiveStart = baseMin
}
effectiveEnd := qry.TimeRange.End
if !effectiveEnd.IsZero() && baseMax.Before(effectiveEnd) {
effectiveEnd = baseMax
}

// Check coverage: rollup must cover the effective (clamped) range
if !effectiveStart.IsZero() && rollupMin.After(effectiveStart) {
rejectCandidate(rejectStartNotCovered,
attribute.String("rollup.effective_start", effectiveStart.Format(time.RFC3339)),
attribute.String("rollup.rollup_min", rollupMin.Format(time.RFC3339)),
)
continue
}
if !effectiveEnd.IsZero() && rollupEffEnd.Before(effectiveEnd) {
rejectCandidate(rejectEndNotCovered,
attribute.String("rollup.effective_end", effectiveEnd.Format(time.RFC3339)),
attribute.String("rollup.rollup_eff_end", rollupEffEnd.Format(time.RFC3339)),
)
continue
if reason, attrs := checkRangeCoverage(qry.TimeRange, baseMin, baseMax, rollupMin, rollupEffEnd); reason != "" {
rejectCandidate(reason, attrs...)
rangeRejected = true
}
} else {
// No time range: rollup must cover the base table's full range
Expand All @@ -250,28 +221,39 @@ func (e *Executor) rewriteQueryForRollup(ctx context.Context, qry *metricsview.Q
attribute.String("rollup.base_min", baseMin.Format(time.RFC3339)),
attribute.String("rollup.rollup_min", rollupMin.Format(time.RFC3339)),
)
continue
}
if rollupEffEnd.Before(baseMax) {
rangeRejected = true
} else if rollupEffEnd.Before(baseMax) {
rejectCandidate(rejectEndNotCovered,
attribute.String("rollup.base_max", baseMax.Format(time.RFC3339)),
attribute.String("rollup.rollup_eff_end", rollupEffEnd.Format(time.RFC3339)),
)
continue
rangeRejected = true
}
}
if !rangeRejected && hasComparisonTimeRange {
if reason, attrs := checkRangeCoverage(qry.ComparisonTimeRange, baseMin, baseMax, rollupMin, rollupEffEnd); reason != "" {
rejectCandidate(reason, attrs...)
rangeRejected = true
}
}
if rangeRejected {
continue
}

// Check end alignment now: if data extends beyond the query end and the end is not aligned to the rollup grain,
// the last rollup bucket would include data beyond the requested range. rollupEligible only checks start alignment.
// Essentially it just check if base has data >= query end time, then makes sure the query end time is rollup grain aligned
if hasTimeRange && !qry.TimeRange.End.IsZero() && !baseMax.Before(qry.TimeRange.End) &&
!timeAligned(qry.TimeRange.End, rollup.TimeGrain, rollupLoc, e.metricsView.FirstDayOfWeek) {
rejectCandidate(rejectEndNotAligned,
attribute.String("rollup.query_end", qry.TimeRange.End.Format(time.RFC3339)),
attribute.String("rollup.base_max", baseMax.Format(time.RFC3339)),
attribute.String("rollup.rollup_grain", rollup.TimeGrain.String()),
)
continue
// Applies to both the base and comparison time ranges.
if hasTimeRange {
if reason, attrs := checkEndAlignment(qry.TimeRange.End, baseMax, rollup.TimeGrain, rollupLoc, e.metricsView.FirstDayOfWeek); reason != "" {
rejectCandidate(reason, attrs...)
continue
}
}
if hasComparisonTimeRange {
if reason, attrs := checkEndAlignment(qry.ComparisonTimeRange.End, baseMax, rollup.TimeGrain, rollupLoc, e.metricsView.FirstDayOfWeek); reason != "" {
rejectCandidate(reason, attrs...)
continue
}
}

candidateSpan.SetAttributes(attribute.String("rollup.eligible", "true"))
Expand Down Expand Up @@ -332,22 +314,27 @@ func rollupEligible(rollup *runtimev1.MetricsViewSpec_Rollup, qry *metricsview.Q
}
}

// 3. Start time aligned to rollup grain (use rollup timezone for alignment).
// End alignment is checked conditionally in the coverage phase: only when the base table
// has data beyond the query end (to prevent the last rollup bucket from pulling in extra data).
if qry.TimeRange != nil && !qry.TimeRange.Start.IsZero() {
rollupLoc := time.UTC
if rollup.TimeZone != "" {
loc, err := time.LoadLocation(rollup.TimeZone)
if err != nil {
return false, "", fmt.Errorf("invalid timezone %q for rollup %q: %w", rollup.TimeZone, rollup.Table, err)
}
rollupLoc = loc
// 3. Start time aligned to rollup grain (use rollup timezone for alignment) for both the base
// and comparison ranges. End alignment is checked conditionally in the coverage phase: only when
// the base table has data beyond the query end (to prevent the last rollup bucket from pulling in extra data).
rollupLoc := time.UTC
if rollup.TimeZone != "" {
loc, err := time.LoadLocation(rollup.TimeZone)
if err != nil {
return false, "", fmt.Errorf("invalid timezone %q for rollup %q: %w", rollup.TimeZone, rollup.Table, err)
}
rollupLoc = loc
}
if qry.TimeRange != nil && !qry.TimeRange.Start.IsZero() {
if !timeAligned(qry.TimeRange.Start, rollup.TimeGrain, rollupLoc, firstDayOfWeek) {
return false, rejectStartNotAligned, nil
}
}
if qry.ComparisonTimeRange != nil && !qry.ComparisonTimeRange.Start.IsZero() {
if !timeAligned(qry.ComparisonTimeRange.Start, rollup.TimeGrain, rollupLoc, firstDayOfWeek) {
return false, rejectStartNotAligned, nil
}
}

// 4. All query dimensions present in rollup
rollupDims := make(map[string]bool, len(rollup.Dimensions))
Expand Down Expand Up @@ -386,14 +373,26 @@ func rollupEligible(rollup *runtimev1.MetricsViewSpec_Rollup, qry *metricsview.Q
}

// 6. All queried measures present in rollup; reject computed measures (count, count_distinct, etc.)
// since they produce incorrect results on pre-aggregated rollup tables.
// since they produce incorrect results on pre-aggregated rollup tables. Comparison-derived
// computed measures (ComparisonValue / ComparisonDelta / ComparisonRatio / ComparisonTime) are
// allowed because they are pure SQL wrappers over a referenced base measure: the actual
// aggregation is done by the referenced measure, which itself must be in the rollup.
rollupMeasures := make(map[string]bool, len(rollup.Measures))
for _, m := range rollup.Measures {
rollupMeasures[strings.ToLower(m)] = true
}
for _, m := range qry.Measures {
if m.Compute != nil {
return false, rejectComputedMeasure, nil
refName, ok := comparisonReferencedMeasure(m.Compute)
if !ok {
// Non-comparison compute (count, count_distinct, URI, percent_of_total, etc.)
return false, rejectComputedMeasure, nil
}
// refName == "" for ComparisonTime: pure date arithmetic, no measure dependency.
if refName != "" && !rollupMeasures[strings.ToLower(refName)] {
return false, rejectMeasureMissing, nil
}
continue
}
if !rollupMeasures[strings.ToLower(m.Name)] {
return false, rejectMeasureMissing, nil
Expand Down Expand Up @@ -482,6 +481,69 @@ func collectWhereDimensionsRec(expr *metricsview.Expression, dims map[string]boo
}
}

// checkRangeCoverage verifies that the rollup covers the given time range. The query range is first
// clamped to the base table's actual data range so a rollup isn't rejected when the query extends
// beyond the base table. Returns the reject reason and trace attributes, or "" if covered.
func checkRangeCoverage(tr *metricsview.TimeRange, baseMin, baseMax, rollupMin, rollupEffEnd time.Time) (string, []attribute.KeyValue) {
effectiveStart := tr.Start
if !effectiveStart.IsZero() && baseMin.After(effectiveStart) {
effectiveStart = baseMin
}
effectiveEnd := tr.End
if !effectiveEnd.IsZero() && baseMax.Before(effectiveEnd) {
effectiveEnd = baseMax
}
if !effectiveStart.IsZero() && rollupMin.After(effectiveStart) {
return rejectStartNotCovered, []attribute.KeyValue{
attribute.String("rollup.effective_start", effectiveStart.Format(time.RFC3339)),
attribute.String("rollup.rollup_min", rollupMin.Format(time.RFC3339)),
}
}
if !effectiveEnd.IsZero() && rollupEffEnd.Before(effectiveEnd) {
return rejectEndNotCovered, []attribute.KeyValue{
attribute.String("rollup.effective_end", effectiveEnd.Format(time.RFC3339)),
attribute.String("rollup.rollup_eff_end", rollupEffEnd.Format(time.RFC3339)),
}
}
return "", nil
}

// checkEndAlignment enforces grain alignment on a range's end when the base table has data at or
// beyond that end (otherwise the last rollup bucket would pull in data past the requested range).
// Returns the reject reason and trace attributes, or "" if no enforcement is needed or it passes.
func checkEndAlignment(end, baseMax time.Time, grain runtimev1.TimeGrain, loc *time.Location, firstDayOfWeek uint32) (string, []attribute.KeyValue) {
if end.IsZero() || baseMax.Before(end) {
return "", nil
}
if timeAligned(end, grain, loc, firstDayOfWeek) {
return "", nil
}
return rejectEndNotAligned, []attribute.KeyValue{
attribute.String("rollup.query_end", end.Format(time.RFC3339)),
attribute.String("rollup.base_max", baseMax.Format(time.RFC3339)),
attribute.String("rollup.rollup_grain", grain.String()),
}
}

// comparisonReferencedMeasure returns the base measure referenced by a comparison-derived compute,
// along with ok=true if the compute is a comparison type that's safe to evaluate against a rollup.
// Returns ("", true) for ComparisonTime since it is pure date arithmetic with no measure dependency.
// Returns ("", false) for non-comparison computes (count, count_distinct, percent_of_total, uri),
// which cannot be correctly re-aggregated from a pre-aggregated rollup table.
func comparisonReferencedMeasure(c *metricsview.MeasureCompute) (string, bool) {
switch {
case c.ComparisonValue != nil:
return c.ComparisonValue.Measure, true
case c.ComparisonDelta != nil:
return c.ComparisonDelta.Measure, true
case c.ComparisonRatio != nil:
return c.ComparisonRatio.Measure, true
case c.ComparisonTime != nil:
return "", true
}
return "", false
}

// normalizeTimezone validates and normalizes a timezone string for comparison.
// It normalizes UTC variants (empty, "UTC", "Etc/UTC") to "UTC".
// Note: Go's time.LoadLocation preserves the input name, so aliases like "US/Eastern"
Expand Down
Loading