Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions internal/schema-registry/command_cluster_describe.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package schemaregistry

import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"

"github.com/spf13/cobra"

metricsv2 "github.com/confluentinc/ccloud-sdk-go-v2/metrics/v2"

"github.com/confluentinc/cli/v4/pkg/ccloudv2"
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/log"
Expand Down Expand Up @@ -58,7 +57,7 @@
return cmd
}

func (c *command) clusterDescribe(cmd *cobra.Command, _ []string) error {

Check failure on line 60 in internal/schema-registry/command_cluster_describe.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 24 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3374&issues=4d41b8d6-f0d6-4186-aff8-aaa8f6f4efaa&open=4d41b8d6-f0d6-4186-aff8-aaa8f6f4efaa
var numSchemas string
var availableSchemas string

Expand Down Expand Up @@ -96,7 +95,11 @@
return err
}

metricsResponse, httpResp, err := metricsClient.MetricsDatasetQuery("cloud", schemaCountQueryFor(cluster.GetId()))
queryBody, err := schemaCountQueryBodyFor(cluster.GetId())
if err != nil {
return err
}
metricsResponse, httpResp, err := metricsClient.MetricsDatasetQueryRaw("cloud", queryBody)
if err := ccloudv2.UnmarshalFlatQueryResponseIfDataSchemaMatchError(err, metricsResponse, httpResp); err != nil {
return err
}
Expand Down Expand Up @@ -158,18 +161,30 @@
return table.Print()
}

func schemaCountQueryFor(schemaRegistryId string) metricsv2.QueryRequest {
aggregations := []metricsv2.Aggregation{{Metric: "io.confluent.kafka.schema_registry/schema_count"}}
filter := metricsv2.Filter{
FieldFilter: &metricsv2.FieldFilter{
Field: metricsv2.PtrString("resource.schema_registry.id"),
Op: "EQ",
Value: metricsv2.StringAsFieldFilterValue(metricsv2.PtrString(schemaRegistryId)),
// schemaCountQueryBodyFor builds the Metrics-API request body for schema_count.
// schema_count is a GAUGE. For legacy LSRCs whose schemas span two PSRCs, each
// PSRC emits its own data point against the same LSRC ID, and the API's default
// MEAN time aggregation under-counts. We mirror cc-billing-worker's query
// (metrics/configurable/cloud_metrics_plugin.go) which uses the undocumented
// "time_agg" field to force MAX per series before "agg":"SUM" combines them.
// The v2 SDK doesn't expose "time_agg", so the body is built and sent raw.
func schemaCountQueryBodyFor(schemaRegistryId string) ([]byte, error) {
return json.Marshal(map[string]any{
"aggregations": []map[string]any{{
"time_agg": "MAX",
"agg": "SUM",
"metric": "io.confluent.kafka.schema_registry/schema_count",
}},
"filter": map[string]any{
"field": "resource.schema_registry.id",
"op": "EQ",
"value": schemaRegistryId,
},
}
req := metricsv2.NewQueryRequest(aggregations, "ALL", []string{"PT1M/now-2m|m"})
req.SetFilter(filter)
return *req
"format": "FLAT",
"granularity": "PT1H",
"intervals": []string{"PT1H/now-2m|m"},
"limit": 1000,
})
}

func getMaxSchemaLimitPriceKey(serviceProvider, serviceProviderRegion, streamGovernancePackage string) string {
Expand Down
48 changes: 48 additions & 0 deletions pkg/ccloudv2/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package ccloudv2

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
Expand Down Expand Up @@ -49,6 +51,52 @@ func (c *MetricsClient) MetricsDatasetQuery(dataset string, query metricsv2.Quer
return c.Version2Api.V2MetricsDatasetQueryPost(c.context(), dataset).QueryRequest(query).Execute()
}

// MetricsDatasetQueryRaw posts a hand-built JSON body to /v2/metrics/{dataset}/query.
// Use this when the request needs a field the typed SDK doesn't expose (e.g. the
// undocumented "time_agg" knob that the Metrics API requires to override the
// gauge MEAN time aggregation; see schema-registry cluster describe).
func (c *MetricsClient) MetricsDatasetQueryRaw(dataset string, body []byte) (*metricsv2.QueryResponse, *http.Response, error) {
cfg := c.GetConfig()
if len(cfg.Servers) == 0 {
return nil, nil, fmt.Errorf("metrics client has no configured server")
}
url := fmt.Sprintf("%s/v2/metrics/%s/query", cfg.Servers[0].URL, dataset)

req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return nil, nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
if token, err := auth.GetDataplaneToken(c.cfg.Context()); err == nil {
req.Header.Set("Authorization", "Bearer "+token)
}

httpResp, err := cfg.HTTPClient.Do(req)
if err != nil {
return nil, httpResp, err
}
defer httpResp.Body.Close()

respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, httpResp, err
}
if httpResp.StatusCode >= 400 {
return nil, httpResp, fmt.Errorf("metrics API returned %d: %s", httpResp.StatusCode, string(respBody))
}

var flat flatQueryResponse
if err := json.Unmarshal(respBody, &flat); err != nil {
return nil, httpResp, err
}
points := make([]metricsv2.Point, len(flat.Data))
for i, p := range flat.Data {
points[i] = metricsv2.Point{Value: p.Value, Timestamp: p.Timestamp}
}
return &metricsv2.QueryResponse{FlatQueryResponse: metricsv2.NewFlatQueryResponse(points)}, httpResp, nil
}

func UnmarshalFlatQueryResponseIfDataSchemaMatchError(err error, metricsResponse *metricsv2.QueryResponse, httpResp *http.Response) error {
if !IsDataMatchesMoreThanOneSchemaError(err) {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
| Private Regional Endpoint URLs | us-east-1=https://lsrc-stk1d.us-east-1.aws.private.stag.cpdev.cloud |
| | us-west-2=https://lsrc-stgvk1d.us-west-2.aws.private.stag.cpdev.cloud |
| Catalog Endpoint URL | http://127.0.0.1:1030 |
| Used Schemas | 0 |
| Available Schemas | 1000 |
| Used Schemas | 7 |
| Available Schemas | 993 |
| Free Schemas Limit | 1000 |
| Global Compatibility | FULL |
| Mode | READWRITE |
Expand Down
37 changes: 35 additions & 2 deletions test/test-server/metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package testserver

import (
"encoding/json"
"io"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -13,16 +15,47 @@ import (

var queryTime = time.Date(2019, 12, 19, 16, 1, 0, 0, time.UTC)

// schemaCountTwoPsrcValue simulates an LSRC whose schemas span two PSRCs (e.g.
// PSRC1=6, PSRC2=1 schemas). With the gauge's default MEAN time aggregation,
// the API would return a fractional value (~3.5). With the billing-shaped query
// (time_agg=MAX, agg=SUM), the per-PSRC counts collapse correctly.
const schemaCountTwoPsrcValue float64 = 7

func handleMetricsQuery(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)

// schema_count is a GAUGE; for legacy LSRCs spanning two PSRCs, the
// API's default MEAN under-counts. The CLI must send the undocumented
// "time_agg":"MAX" alongside "agg":"SUM". See
// internal/schema-registry/command_cluster_describe.go:schemaCountQueryBodyFor.
// We parse as raw JSON because the v2 SDK type can't represent time_agg.
var raw map[string]any
require.NoError(t, json.Unmarshal(body, &raw))

value := 0.0
if aggs, ok := raw["aggregations"].([]any); ok {
for _, a := range aggs {
agg, _ := a.(map[string]any)
metric, _ := agg["metric"].(string)
if !strings.HasSuffix(metric, "/schema_count") {
continue
}
require.Equal(t, "MAX", agg["time_agg"], "schema_count query must set time_agg=MAX to override the gauge MEAN default")
require.Equal(t, "SUM", agg["agg"], "schema_count query must set agg=SUM")
value = schemaCountTwoPsrcValue
}
}

resp := &metricsv2.QueryResponse{
FlatQueryResponse: &metricsv2.FlatQueryResponse{
Data: []metricsv2.Point{
{Value: 0.0, Timestamp: queryTime},
{Value: float32(value), Timestamp: queryTime},
},
},
}
err := json.NewEncoder(w).Encode(resp)
err = json.NewEncoder(w).Encode(resp)
require.NoError(t, err)
}
}