Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/asyncapi/command_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content
return nil, err
}

err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "value", serdes.SchemaRegistryAuth{Token: token}, nil)
err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "", "value", serdes.SchemaRegistryAuth{Token: token}, nil)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions internal/kafka/command_topic_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,15 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {
subject = schemaRegistryContext
}

log.CliLogger.Tracef("consumeCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic)

groupHandler := &GroupHandler{
SrClient: srClient,
SrApiKey: srApiKey,
SrApiSecret: srApiSecret,
SrClusterId: srClusterId,
SrClusterEndpoint: srEndpoint,
KafkaClusterId: cluster.ID,
Token: token,
KeyFormat: keyFormat,
ValueFormat: valueFormat,
Expand Down
97 changes: 50 additions & 47 deletions internal/kafka/command_topic_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
cmd.Flags().String("delimiter", ":", "The delimiter separating each key and value.")
cmd.Flags().StringSlice("config", nil, `A comma-separated list of configuration overrides ("key=value") for the producer client. For a full list, see https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html`)
pcmd.AddProducerConfigFileFlag(cmd)
cmd.Flags().String("schema-registry-endpoint", "", "Endpoint for Schema Registry cluster.")

Check failure on line 65 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "schema-registry-endpoint" 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=7da7000c-6f47-48eb-8769-f1c334535903&open=7da7000c-6f47-48eb-8769-f1c334535903
cmd.Flags().StringSlice("headers", nil, `A comma-separated list of headers formatted as "key:value".`)
cmd.Flags().Bool("schema-id-header", false, "Serialize schema ID in the header instead of the message prefix.")

Expand Down Expand Up @@ -147,12 +147,14 @@
return err
}

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key")
log.CliLogger.Tracef("produceCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic)

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key", cluster.ID)
if err != nil {
return err
}

valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value")
valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value", cluster.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,12 +209,12 @@
func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error {
topic := args[0]

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key")
keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key", "")
if err != nil {
return err
}

valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value")
valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value", "")
if err != nil {
return err
}
Expand Down Expand Up @@ -461,14 +463,12 @@
return "", "", errors.New(missingOrMalformedKeyErrorMsg)
}

func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) {
func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) {

Check failure on line 466 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 49 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=b03a5a4d-4f31-4d44-bacd-1b3b8fdf1814&open=b03a5a4d-4f31-4d44-bacd-1b3b8fdf1814
schemaDir, err := createTempDir()
if err != nil {
return nil, nil, err
}

subject := topicNameStrategy(topic, mode)

// Deprecated
var schemaId optional.Int32
if mode == "value" && cmd.Flags().Changed("schema-id") {
Expand All @@ -491,6 +491,46 @@
schemaId = optional.NewInt32(int32(id))
}

srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return nil, nil, err
}
srApiKey, err := cmd.Flags().GetString("schema-registry-api-key")
if err != nil {
return nil, nil, err
}
srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret")
if err != nil {
return nil, nil, err
}
var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
}
}
var srClusterId string
if (schemaId.IsSet() || schema != "") && srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd)
if err != nil {
return nil, nil, err
}
}
srAuth := serdes.SchemaRegistryAuth{
ApiKey: srApiKey,
ApiSecret: srApiSecret,
Token: token,
}

// Resolve subject via SR associations, fall back to TopicNameStrategy on miss.
subject := topicNameStrategy(topic, mode)
if kafkaClusterId != "" && srEndpoint != "" {
if client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil {
subject = resolveSubject(client, kafkaClusterId, topic, mode)
}
Comment on lines +526 to +531
Comment on lines +526 to +531
}

var format string
referencePathMap := map[string]string{}
metaInfo := []byte{}
Expand Down Expand Up @@ -559,49 +599,12 @@
}
}

// Fetch the SR client endpoint during schema registration
srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return nil, nil, err
}

var srClusterId string
if (schemaId.IsSet() || schema != "") && srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd)
if err != nil {
return nil, nil, err
}
}

// Initialize the serializer with the same SR endpoint during registration
// The associated schema ID is also required to initialize the serializer
srApiKey, err := cmd.Flags().GetString("schema-registry-api-key")
if err != nil {
return nil, nil, err
}
srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret")
if err != nil {
return nil, nil, err
}
var parsedSchemaId = -1
if len(metaInfo) >= 5 {
parsedSchemaId = int(binary.BigEndian.Uint32(metaInfo[1:5]))
}

var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
}
}
srAuth := serdes.SchemaRegistryAuth{
ApiKey: srApiKey,
ApiSecret: srApiSecret,
Token: token,
}
err = serializationProvider.InitSerializer(srEndpoint, srClusterId, mode, parsedSchemaId, srAuth)
if err != nil {
if err := serializationProvider.InitSerializer(srEndpoint, srClusterId, kafkaClusterId, mode, parsedSchemaId, srAuth); err != nil {
return nil, nil, err
}

Expand All @@ -613,7 +616,7 @@
return serializationProvider, metaInfo, nil
}

func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) {
func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) {

Check failure on line 619 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 34 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=ce826e1e-1f91-4615-8b4a-a57080543e9b&open=ce826e1e-1f91-4615-8b4a-a57080543e9b
schemaDir, err := createTempDir()
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -732,7 +735,7 @@
ClientKeyPath: clientKeyPath,
Token: token,
}
err = serializationProvider.InitSerializer(srEndpoint, "", mode, parsedSchemaId, srAuth)
err = serializationProvider.InitSerializer(srEndpoint, "", kafkaClusterId, mode, parsedSchemaId, srAuth)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/kafka/confluent_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type GroupHandler struct {
SrApiSecret string
SrClusterId string
SrClusterEndpoint string
KafkaClusterId string
Token string
CertificateAuthorityPath string
ClientCertPath string
Expand Down Expand Up @@ -241,7 +242,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error {
return err
}

err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "key", srAuth, nil)
err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "key", srAuth, nil)
if err != nil {
return err
}
Expand All @@ -268,7 +269,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error {
return err
}

err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "value", srAuth, nil)
err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "value", srAuth, nil)
if err != nil {
return err
}
Expand Down
40 changes: 40 additions & 0 deletions internal/kafka/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (

cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2"
cckafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
cpkafkarestv3 "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"

"github.com/confluentinc/cli/v4/pkg/ccloudv2"
"github.com/confluentinc/cli/v4/pkg/ccstructs"
"github.com/confluentinc/cli/v4/pkg/kafkarest"
"github.com/confluentinc/cli/v4/pkg/kafkausagelimits"
"github.com/confluentinc/cli/v4/pkg/log"
"github.com/confluentinc/cli/v4/pkg/serdes"
)

func toCreateTopicConfigs(topicConfigsMap map[string]string) []cckafkarestv3.ConfigData {
Expand Down Expand Up @@ -210,6 +213,43 @@ func topicNameStrategy(topic, mode string) string {
return fmt.Sprintf("%s-%s", topic, mode)
}

func newSchemaRegistryClient(srClientUrl, srClusterId string, srAuth serdes.SchemaRegistryAuth) (schemaregistry.Client, error) {
var cfg *schemaregistry.Config
switch {
case srAuth.ApiKey != "" && srAuth.ApiSecret != "":
cfg = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret)
case srAuth.Token != "":
cfg = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "")
default:
cfg = schemaregistry.NewConfig(srClientUrl)
log.CliLogger.Info("initializing schema registry client with no authentication")
}
cfg.SslCaLocation = srAuth.CertificateAuthorityPath
cfg.SslCertificateLocation = srAuth.ClientCertPath
cfg.SslKeyLocation = srAuth.ClientKeyPath
return schemaregistry.NewClient(cfg)
}

// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id
// as resource namespace. Falls backt o default TopicNameStrategy (<topic>-<mode>) if unmatched.
func resolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string {
fallback := topic + "-" + mode
if kafkaClusterId == "" || client == nil {
return fallback
}
associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1)
if err != nil {
log.CliLogger.Tracef("subject resolution: associations lookup failed (topic=%q mode=%q clusterId=%q): %v; using %q", topic, mode, kafkaClusterId, err, fallback)
return fallback
}
if len(associations) == 0 {
log.CliLogger.Tracef("subject resolution: no association for topic=%q mode=%q clusterId=%q; using %q", topic, mode, kafkaClusterId, fallback)
return fallback
}
log.CliLogger.Tracef("subject resolution: resolved associated subject %q (topic=%q mode=%q clusterId=%q)", associations[0].Subject, topic, mode, kafkaClusterId)
return associations[0].Subject
}

func getLimitsForSku(cluster *cmkv2.CmkV2Cluster, usageLimits *kafkausagelimits.UsageLimits) *kafkausagelimits.Limits {
if isDedicated(cluster) {
return usageLimits.GetCkuLimit(cluster.Status.GetCku())
Expand Down
4 changes: 3 additions & 1 deletion pkg/serdes/avro_deserialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type AvroDeserializationProvider struct {
deser *avrov2.Deserializer
}

func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error {
func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error {
// Note: Now Serializer/Deserializer are tightly coupled with Schema Registry
// If existingClient is not nil, we should share this client between ser and deser.
// As the shared client is referred as mock client to store the same set of schemas in cache
Expand All @@ -34,6 +34,8 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId,
}
}

serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId)

var serdeType serde.Type
switch mode {
case "key":
Expand Down
13 changes: 10 additions & 3 deletions pkg/serdes/avro_serialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type AvroSerializationProvider struct {
mode string
}

func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error {
func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error {
serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil)
if err != nil {
return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err)
Expand Down Expand Up @@ -65,6 +65,8 @@ func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod
serdeConfig.UseLatestVersion = false
}

serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId)

var serdeType serde.Type
if mode == "key" {
serdeType = serde.KeySerde
Expand Down Expand Up @@ -98,8 +100,13 @@ func (a *AvroSerializationProvider) GetSchemaName() string {
}

func (a *AvroSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) {
// Step#1: Fetch the schemaInfo based on subject and schema ID
schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(topic+"-"+a.mode, a.schemaId)
// Step#1: Ask the configured ckgo strategy for the subject (AssociatedNameStrategy
// on cloud, TopicNameStrategy on on-prem), then fetch the schema by subject + id.
subject, err := a.ser.SubjectNameStrategy(topic, a.ser.SerdeType, schemaregistry.SchemaInfo{})
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve subject: %w", err)
}
schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(subject, a.schemaId)
if err != nil {
return nil, nil, fmt.Errorf("failed to serialize message: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/serdes/double_deserialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type DoubleDeserializationProvider struct{}

func (DoubleDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error {
func (DoubleDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/serdes/double_serialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type DoubleSerializationProvider struct{}

func (DoubleSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error {
func (DoubleSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/serdes/integer_deserialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type IntegerDeserializationProvider struct{}

func (IntegerDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error {
func (IntegerDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/serdes/integer_serialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type IntegerSerializationProvider struct{}

func (IntegerSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error {
func (IntegerSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error {
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/serdes/json_deserialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type JsonDeserializationProvider struct {
deser *jsonschema.Deserializer
}

func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error {
func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error {
// Note: Now Serializer/Deserializer are tightly coupled with Schema Registry
// If existingClient is not nil, we should share this client between ser and deser.
// As the shared client is referred as mock client to store the same set of schemas in cache
Expand All @@ -36,6 +36,8 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId,
}
}

serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId)

var serdeType serde.Type
switch mode {
case "key":
Expand Down
4 changes: 3 additions & 1 deletion pkg/serdes/json_serialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type JsonSerializationProvider struct {
ser *jsonschema.Serializer
}

func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error {
func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error {
serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil)
if err != nil {
return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err)
Expand Down Expand Up @@ -63,6 +63,8 @@ func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod
serdeConfig.UseLatestVersion = false
}

serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId)

var serdeType serde.Type
if mode == "key" {
serdeType = serde.KeySerde
Expand Down
Loading