From 45b98575b8bfbf85a3f3584aa131c3b47b114cbd Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Fri, 29 May 2026 17:40:12 -0400 Subject: [PATCH] DGS-24388 - Update schema counts to match UI and billing --- .../command_cluster_describe.go | 43 +++++++++++------ pkg/ccloudv2/metrics.go | 48 +++++++++++++++++++ .../schema-registry/cluster/describe.golden | 4 +- test/test-server/metrics_handlers.go | 37 +++++++++++++- 4 files changed, 114 insertions(+), 18 deletions(-) diff --git a/internal/schema-registry/command_cluster_describe.go b/internal/schema-registry/command_cluster_describe.go index a8b93cec29..f9971ed874 100644 --- a/internal/schema-registry/command_cluster_describe.go +++ b/internal/schema-registry/command_cluster_describe.go @@ -1,6 +1,7 @@ package schemaregistry import ( + "encoding/json" "fmt" "math" "strconv" @@ -8,8 +9,6 @@ import ( "github.com/spf13/cobra" - metricsv2 "github.com/confluentinc/ccloud-sdk-go-v2/metrics/v2" - "github.com/confluentinc/cli/v4/pkg/ccloudv2" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/log" @@ -96,7 +95,11 @@ func (c *command) clusterDescribe(cmd *cobra.Command, _ []string) error { return err } - metricsResponse, httpResp, err := metricsClient.MetricsDatasetQuery("cloud", schemaCountQueryFor(cluster.GetId())) + queryBody, err := schemaCountQueryBodyFor(cluster.GetId()) + if err != nil { + return err + } + metricsResponse, httpResp, err := metricsClient.MetricsDatasetQueryRaw("cloud", queryBody) if err := ccloudv2.UnmarshalFlatQueryResponseIfDataSchemaMatchError(err, metricsResponse, httpResp); err != nil { return err } @@ -158,18 +161,30 @@ func (c *command) clusterDescribe(cmd *cobra.Command, _ []string) error { return table.Print() } -func schemaCountQueryFor(schemaRegistryId string) metricsv2.QueryRequest { - aggregations := []metricsv2.Aggregation{{Metric: "io.confluent.kafka.schema_registry/schema_count"}} - filter := metricsv2.Filter{ - FieldFilter: &metricsv2.FieldFilter{ - Field: metricsv2.PtrString("resource.schema_registry.id"), - Op: "EQ", - Value: metricsv2.StringAsFieldFilterValue(metricsv2.PtrString(schemaRegistryId)), +// schemaCountQueryBodyFor builds the Metrics-API request body for schema_count. +// schema_count is a GAUGE. For legacy LSRCs whose schemas span two PSRCs, each +// PSRC emits its own data point against the same LSRC ID, and the API's default +// MEAN time aggregation under-counts. We mirror cc-billing-worker's query +// (metrics/configurable/cloud_metrics_plugin.go) which uses the undocumented +// "time_agg" field to force MAX per series before "agg":"SUM" combines them. +// The v2 SDK doesn't expose "time_agg", so the body is built and sent raw. +func schemaCountQueryBodyFor(schemaRegistryId string) ([]byte, error) { + return json.Marshal(map[string]any{ + "aggregations": []map[string]any{{ + "time_agg": "MAX", + "agg": "SUM", + "metric": "io.confluent.kafka.schema_registry/schema_count", + }}, + "filter": map[string]any{ + "field": "resource.schema_registry.id", + "op": "EQ", + "value": schemaRegistryId, }, - } - req := metricsv2.NewQueryRequest(aggregations, "ALL", []string{"PT1M/now-2m|m"}) - req.SetFilter(filter) - return *req + "format": "FLAT", + "granularity": "PT1H", + "intervals": []string{"PT1H/now-2m|m"}, + "limit": 1000, + }) } func getMaxSchemaLimitPriceKey(serviceProvider, serviceProviderRegion, streamGovernancePackage string) string { diff --git a/pkg/ccloudv2/metrics.go b/pkg/ccloudv2/metrics.go index 7809cb1d14..a8f8a07785 100644 --- a/pkg/ccloudv2/metrics.go +++ b/pkg/ccloudv2/metrics.go @@ -1,8 +1,10 @@ package ccloudv2 import ( + "bytes" "context" "encoding/json" + "fmt" "io" "net/http" "time" @@ -49,6 +51,52 @@ func (c *MetricsClient) MetricsDatasetQuery(dataset string, query metricsv2.Quer return c.Version2Api.V2MetricsDatasetQueryPost(c.context(), dataset).QueryRequest(query).Execute() } +// MetricsDatasetQueryRaw posts a hand-built JSON body to /v2/metrics/{dataset}/query. +// Use this when the request needs a field the typed SDK doesn't expose (e.g. the +// undocumented "time_agg" knob that the Metrics API requires to override the +// gauge MEAN time aggregation; see schema-registry cluster describe). +func (c *MetricsClient) MetricsDatasetQueryRaw(dataset string, body []byte) (*metricsv2.QueryResponse, *http.Response, error) { + cfg := c.GetConfig() + if len(cfg.Servers) == 0 { + return nil, nil, fmt.Errorf("metrics client has no configured server") + } + url := fmt.Sprintf("%s/v2/metrics/%s/query", cfg.Servers[0].URL, dataset) + + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return nil, nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + if token, err := auth.GetDataplaneToken(c.cfg.Context()); err == nil { + req.Header.Set("Authorization", "Bearer "+token) + } + + httpResp, err := cfg.HTTPClient.Do(req) + if err != nil { + return nil, httpResp, err + } + defer httpResp.Body.Close() + + respBody, err := io.ReadAll(httpResp.Body) + if err != nil { + return nil, httpResp, err + } + if httpResp.StatusCode >= 400 { + return nil, httpResp, fmt.Errorf("metrics API returned %d: %s", httpResp.StatusCode, string(respBody)) + } + + var flat flatQueryResponse + if err := json.Unmarshal(respBody, &flat); err != nil { + return nil, httpResp, err + } + points := make([]metricsv2.Point, len(flat.Data)) + for i, p := range flat.Data { + points[i] = metricsv2.Point{Value: p.Value, Timestamp: p.Timestamp} + } + return &metricsv2.QueryResponse{FlatQueryResponse: metricsv2.NewFlatQueryResponse(points)}, httpResp, nil +} + func UnmarshalFlatQueryResponseIfDataSchemaMatchError(err error, metricsResponse *metricsv2.QueryResponse, httpResp *http.Response) error { if !IsDataMatchesMoreThanOneSchemaError(err) { return nil diff --git a/test/fixtures/output/schema-registry/cluster/describe.golden b/test/fixtures/output/schema-registry/cluster/describe.golden index b62e7ab1d5..9a5b513499 100644 --- a/test/fixtures/output/schema-registry/cluster/describe.golden +++ b/test/fixtures/output/schema-registry/cluster/describe.golden @@ -6,8 +6,8 @@ | Private Regional Endpoint URLs | us-east-1=https://lsrc-stk1d.us-east-1.aws.private.stag.cpdev.cloud | | | us-west-2=https://lsrc-stgvk1d.us-west-2.aws.private.stag.cpdev.cloud | | Catalog Endpoint URL | http://127.0.0.1:1030 | -| Used Schemas | 0 | -| Available Schemas | 1000 | +| Used Schemas | 7 | +| Available Schemas | 993 | | Free Schemas Limit | 1000 | | Global Compatibility | FULL | | Mode | READWRITE | diff --git a/test/test-server/metrics_handlers.go b/test/test-server/metrics_handlers.go index 324373be49..904363669d 100644 --- a/test/test-server/metrics_handlers.go +++ b/test/test-server/metrics_handlers.go @@ -2,7 +2,9 @@ package testserver import ( "encoding/json" + "io" "net/http" + "strings" "testing" "time" @@ -13,16 +15,47 @@ import ( var queryTime = time.Date(2019, 12, 19, 16, 1, 0, 0, time.UTC) +// schemaCountTwoPsrcValue simulates an LSRC whose schemas span two PSRCs (e.g. +// PSRC1=6, PSRC2=1 schemas). With the gauge's default MEAN time aggregation, +// the API would return a fractional value (~3.5). With the billing-shaped query +// (time_agg=MAX, agg=SUM), the per-PSRC counts collapse correctly. +const schemaCountTwoPsrcValue float64 = 7 + func handleMetricsQuery(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + + // schema_count is a GAUGE; for legacy LSRCs spanning two PSRCs, the + // API's default MEAN under-counts. The CLI must send the undocumented + // "time_agg":"MAX" alongside "agg":"SUM". See + // internal/schema-registry/command_cluster_describe.go:schemaCountQueryBodyFor. + // We parse as raw JSON because the v2 SDK type can't represent time_agg. + var raw map[string]any + require.NoError(t, json.Unmarshal(body, &raw)) + + value := 0.0 + if aggs, ok := raw["aggregations"].([]any); ok { + for _, a := range aggs { + agg, _ := a.(map[string]any) + metric, _ := agg["metric"].(string) + if !strings.HasSuffix(metric, "/schema_count") { + continue + } + require.Equal(t, "MAX", agg["time_agg"], "schema_count query must set time_agg=MAX to override the gauge MEAN default") + require.Equal(t, "SUM", agg["agg"], "schema_count query must set agg=SUM") + value = schemaCountTwoPsrcValue + } + } + resp := &metricsv2.QueryResponse{ FlatQueryResponse: &metricsv2.FlatQueryResponse{ Data: []metricsv2.Point{ - {Value: 0.0, Timestamp: queryTime}, + {Value: float32(value), Timestamp: queryTime}, }, }, } - err := json.NewEncoder(w).Encode(resp) + err = json.NewEncoder(w).Encode(resp) require.NoError(t, err) } }