From 884641fb6fd68868995fe6afd74c76a7905ab43b Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 00:24:59 -0700 Subject: [PATCH 1/8] Update produce and consume w/ association support --- internal/kafka/command_topic_consume.go | 1 + internal/kafka/command_topic_produce.go | 95 +++++++++++++------------ 2 files changed, 49 insertions(+), 47 deletions(-) diff --git a/internal/kafka/command_topic_consume.go b/internal/kafka/command_topic_consume.go index b80faa9b59..5a4ba9c029 100644 --- a/internal/kafka/command_topic_consume.go +++ b/internal/kafka/command_topic_consume.go @@ -292,6 +292,7 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error { SrApiSecret: srApiSecret, SrClusterId: srClusterId, SrClusterEndpoint: srEndpoint, + KafkaClusterId: cluster.ID, Token: token, KeyFormat: keyFormat, ValueFormat: valueFormat, diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index ec9c15ea98..2bc81e4686 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -147,12 +147,12 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { return err } - keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key") + keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key", cluster.ID) if err != nil { return err } - valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value") + valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value", cluster.ID) if err != nil { return err } @@ -207,12 +207,12 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error { topic := args[0] - keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key") + keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key", "") if err != nil { return err } - valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value") + valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value", "") if err != nil { return err } @@ -461,14 +461,12 @@ func getKeyAndValue(schemaBased bool, data, delimiter string) (string, string, e return "", "", errors.New(missingOrMalformedKeyErrorMsg) } -func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) { +func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) { schemaDir, err := createTempDir() if err != nil { return nil, nil, err } - subject := topicNameStrategy(topic, mode) - // Deprecated var schemaId optional.Int32 if mode == "value" && cmd.Flags().Changed("schema-id") { @@ -491,6 +489,46 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( schemaId = optional.NewInt32(int32(id)) } + srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint") + if err != nil { + return nil, nil, err + } + srApiKey, err := cmd.Flags().GetString("schema-registry-api-key") + if err != nil { + return nil, nil, err + } + srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret") + if err != nil { + return nil, nil, err + } + var token string + if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out + token, err = auth.GetDataplaneToken(c.Context) + if err != nil { + return nil, nil, err + } + } + var srClusterId string + if (schemaId.IsSet() || schema != "") && srEndpoint == "" { + srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd) + if err != nil { + return nil, nil, err + } + } + srAuth := serdes.SchemaRegistryAuth{ + ApiKey: srApiKey, + ApiSecret: srApiSecret, + Token: token, + } + + // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. + subject := topicNameStrategy(topic, mode) + if kafkaClusterId != "" && srEndpoint != "" { + if client, err := serdes.NewSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { + subject = serdes.ResolveSubject(client, kafkaClusterId, topic, mode) + } + } + var format string referencePathMap := map[string]string{} metaInfo := []byte{} @@ -559,49 +597,12 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( } } - // Fetch the SR client endpoint during schema registration - srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint") - if err != nil { - return nil, nil, err - } - - var srClusterId string - if (schemaId.IsSet() || schema != "") && srEndpoint == "" { - srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd) - if err != nil { - return nil, nil, err - } - } - - // Initialize the serializer with the same SR endpoint during registration - // The associated schema ID is also required to initialize the serializer - srApiKey, err := cmd.Flags().GetString("schema-registry-api-key") - if err != nil { - return nil, nil, err - } - srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret") - if err != nil { - return nil, nil, err - } var parsedSchemaId = -1 if len(metaInfo) >= 5 { parsedSchemaId = int(binary.BigEndian.Uint32(metaInfo[1:5])) } - var token string - if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out - token, err = auth.GetDataplaneToken(c.Context) - if err != nil { - return nil, nil, err - } - } - srAuth := serdes.SchemaRegistryAuth{ - ApiKey: srApiKey, - ApiSecret: srApiSecret, - Token: token, - } - err = serializationProvider.InitSerializer(srEndpoint, srClusterId, mode, parsedSchemaId, srAuth) - if err != nil { + if err := serializationProvider.InitSerializer(srEndpoint, srClusterId, kafkaClusterId, mode, parsedSchemaId, srAuth); err != nil { return nil, nil, err } @@ -613,7 +614,7 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( return serializationProvider, metaInfo, nil } -func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) { +func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) { schemaDir, err := createTempDir() if err != nil { return nil, nil, err @@ -732,7 +733,7 @@ func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode str ClientKeyPath: clientKeyPath, Token: token, } - err = serializationProvider.InitSerializer(srEndpoint, "", mode, parsedSchemaId, srAuth) + err = serializationProvider.InitSerializer(srEndpoint, "", kafkaClusterId, mode, parsedSchemaId, srAuth) if err != nil { return nil, nil, err } From 784420446b05b200f55aca564853cebbdca2684a Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 00:25:57 -0700 Subject: [PATCH 2/8] Update InitDeserializer --- internal/asyncapi/command_export.go | 2 +- internal/kafka/confluent_kafka.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/asyncapi/command_export.go b/internal/asyncapi/command_export.go index 1b31206afe..20e2904de7 100644 --- a/internal/asyncapi/command_export.go +++ b/internal/asyncapi/command_export.go @@ -307,7 +307,7 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content return nil, err } - err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "value", serdes.SchemaRegistryAuth{Token: token}, nil) + err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "", "value", serdes.SchemaRegistryAuth{Token: token}, nil) if err != nil { return nil, err } diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index 2ebe6cfd6a..6c6f91e390 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -58,6 +58,7 @@ type GroupHandler struct { SrApiSecret string SrClusterId string SrClusterEndpoint string + KafkaClusterId string Token string CertificateAuthorityPath string ClientCertPath string @@ -241,7 +242,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "key", srAuth, nil) + err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "key", srAuth, nil) if err != nil { return err } @@ -268,7 +269,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "value", srAuth, nil) + err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "value", srAuth, nil) if err != nil { return err } From a17aa874805054d4be76eb7f90df2e6aae16fb11 Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 05:57:22 -0700 Subject: [PATCH 3/8] Add NewSchemaRegistryClient and more associate logic --- pkg/serdes/double_deserialization_provider.go | 2 +- pkg/serdes/double_serialization_provider.go | 2 +- .../integer_deserialization_provider.go | 2 +- pkg/serdes/integer_serialization_provider.go | 2 +- pkg/serdes/json_deserialization_provider.go | 9 +++++++- pkg/serdes/json_serialization_provider.go | 9 +++++++- .../protobuf_deserialization_provider.go | 9 +++++++- pkg/serdes/protobuf_serialization_provider.go | 9 +++++++- pkg/serdes/serdes.go | 22 +++++++++++++++++-- 9 files changed, 56 insertions(+), 10 deletions(-) diff --git a/pkg/serdes/double_deserialization_provider.go b/pkg/serdes/double_deserialization_provider.go index d24f2d3e67..b1a3993bfd 100644 --- a/pkg/serdes/double_deserialization_provider.go +++ b/pkg/serdes/double_deserialization_provider.go @@ -12,7 +12,7 @@ import ( type DoubleDeserializationProvider struct{} -func (DoubleDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (DoubleDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/double_serialization_provider.go b/pkg/serdes/double_serialization_provider.go index e17c17da25..897d676519 100644 --- a/pkg/serdes/double_serialization_provider.go +++ b/pkg/serdes/double_serialization_provider.go @@ -12,7 +12,7 @@ import ( type DoubleSerializationProvider struct{} -func (DoubleSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (DoubleSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil } diff --git a/pkg/serdes/integer_deserialization_provider.go b/pkg/serdes/integer_deserialization_provider.go index 564b3ae724..2a26d2b550 100644 --- a/pkg/serdes/integer_deserialization_provider.go +++ b/pkg/serdes/integer_deserialization_provider.go @@ -11,7 +11,7 @@ import ( type IntegerDeserializationProvider struct{} -func (IntegerDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (IntegerDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/integer_serialization_provider.go b/pkg/serdes/integer_serialization_provider.go index 12b8de571b..0770292d29 100644 --- a/pkg/serdes/integer_serialization_provider.go +++ b/pkg/serdes/integer_serialization_provider.go @@ -11,7 +11,7 @@ import ( type IntegerSerializationProvider struct{} -func (IntegerSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (IntegerSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil } diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index b8ebe2d33c..840ab81936 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -15,7 +15,7 @@ type JsonDeserializationProvider struct { deser *jsonschema.Deserializer } -func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -36,6 +36,13 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } + if kafkaClusterId != "" { + serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType + serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } else { + serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType + } + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/json_serialization_provider.go b/pkg/serdes/json_serialization_provider.go index 600fa6d810..f350b3b87f 100644 --- a/pkg/serdes/json_serialization_provider.go +++ b/pkg/serdes/json_serialization_provider.go @@ -23,7 +23,7 @@ type JsonSerializationProvider struct { ser *jsonschema.Serializer } -func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -63,6 +63,13 @@ func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod serdeConfig.UseLatestVersion = false } + if kafkaClusterId != "" { + serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType + serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } else { + serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType + } + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index 695f176465..ea84164cac 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -25,7 +25,7 @@ type ProtobufDeserializationProvider struct { message gproto.Message } -func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -44,6 +44,13 @@ func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srCluste } } + if kafkaClusterId != "" { + serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType + serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } else { + serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType + } + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index 2f8e2ce45b..8588cea045 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -43,7 +43,7 @@ type ProtobufSerializationProvider struct { message gproto.Message } -func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -82,6 +82,13 @@ func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, serdeConfig.UseLatestVersion = false } + if kafkaClusterId != "" { + serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType + serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } else { + serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType + } + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index be1e0a4b3b..3f81bb0afc 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -68,7 +68,7 @@ type SchemaRegistryAuth struct { } type SerializationProvider interface { - InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error + InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error LoadSchema(string, map[string]string) error Serialize(string, string) ([]kafka.Header, []byte, error) GetSchemaName() string @@ -77,7 +77,7 @@ type SerializationProvider interface { } type DeserializationProvider interface { - InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error + InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error LoadSchema(string, string, serde.Type, *kafka.Message) error Deserialize(string, []kafka.Header, []byte) (string, error) GetSchemaRegistryClient() schemaregistry.Client @@ -160,3 +160,21 @@ func initSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegi return schemaregistry.NewClient(serdeClientConfig) } + +func NewSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegistryAuth) (schemaregistry.Client, error) { + return initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) +} + +// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id +// as resource namespace. Falls backt o default TopicNameStrategy (-) if unmatched. +func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { + fallback := topic + "-" + mode + if kafkaClusterId == "" || client == nil { + return fallback + } + associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) + if err != nil || len(associations) == 0 { + return fallback + } + return associations[0].Subject +} From e9011c82699c0628c22e7b5bb47dde8b8fd33926 Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 10:27:44 -0700 Subject: [PATCH 4/8] More initDeserializer and initSerializer --- pkg/serdes/serdes_test.go | 250 +++++++++++++++--- pkg/serdes/string_deserialization_provider.go | 2 +- pkg/serdes/string_serialization_provider.go | 2 +- 3 files changed, 209 insertions(+), 45 deletions(-) diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 049f72c725..2a5c1a7cf9 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -46,7 +46,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { // Basic Auth provider, err := GetDeserializationProvider(avroSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + err = provider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{ ApiKey: "key", ApiSecret: "secret", }, nil) @@ -60,7 +60,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { serde.GlobalRuleRegistry().Clear() provider, err = GetDeserializationProvider(jsonSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "value", SchemaRegistryAuth{Token: "token"}, nil) + err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "", "value", SchemaRegistryAuth{Token: "token"}, nil) req.Nil(err) config = provider.GetSchemaRegistryClient().Config() req.Equal(config.SchemaRegistryURL, mockClientUrl) @@ -72,7 +72,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { serde.GlobalRuleRegistry().Clear() provider, err = GetDeserializationProvider(protobufSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + err = provider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{ CertificateAuthorityPath: "ca.cert", ClientCertPath: "client.crt", ClientKeyPath: "client.key", @@ -146,7 +146,7 @@ func TestAvroSerdesValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -168,7 +168,7 @@ func TestAvroSerdesValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -189,7 +189,7 @@ func TestAvroSerdesValidWithHeaders(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) @@ -210,7 +210,7 @@ func TestAvroSerdesValidWithHeaders(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", headers, data) @@ -228,7 +228,7 @@ func TestAvroSerdesInvalid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -246,7 +246,7 @@ func TestAvroSerdesInvalid(t *testing.T) { deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) req.Nil(err) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) brokenString := `{"f1"` @@ -276,7 +276,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -298,7 +298,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) @@ -320,7 +320,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -358,7 +358,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -380,7 +380,7 @@ func TestAvroSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -418,7 +418,7 @@ func TestAvroSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -439,7 +439,7 @@ func TestJsonSerdesValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -461,7 +461,7 @@ func TestJsonSerdesValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) @@ -483,7 +483,7 @@ func TestJsonSerdesValidWithHeaders(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) @@ -504,7 +504,7 @@ func TestJsonSerdesValidWithHeaders(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", headers, expectedBytes) @@ -536,7 +536,7 @@ func TestJsonSerdesReference(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -566,7 +566,7 @@ func TestJsonSerdesReference(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) @@ -585,7 +585,7 @@ func TestJsonSerdesInvalid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -601,7 +601,7 @@ func TestJsonSerdesInvalid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) brokenString := `{"f1":` @@ -638,7 +638,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -660,7 +660,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) @@ -681,7 +681,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -719,7 +719,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -741,7 +741,7 @@ func TestJsonSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -779,7 +779,7 @@ func TestJsonSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -808,7 +808,7 @@ func TestProtobufSerdesValid(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -826,7 +826,7 @@ func TestProtobufSerdesValid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -855,7 +855,7 @@ func TestProtobufSerdesValidWithHeaders(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) @@ -874,7 +874,7 @@ func TestProtobufSerdesValidWithHeaders(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{ Value: data, @@ -925,7 +925,7 @@ message Person { expectedString := `{"name":"abc","address":{"city":"LA"},"result":2}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) req.Nil(err) @@ -957,7 +957,7 @@ message Person { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -993,7 +993,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1012,7 +1012,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1064,7 +1064,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { expectedString := `{"name":"abc","id":2,"add":{"zip":"123","street":"def"},"phones":{"number":"234"}}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1082,7 +1082,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1119,7 +1119,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1156,7 +1156,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1190,7 +1190,7 @@ func TestProtobufSerdesValidWithCSPERuleSet(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1227,7 +1227,7 @@ func TestProtobufSerdesValidWithCSPERuleSet(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1259,3 +1259,167 @@ func readSchemaReferences(references string) ([]schemaregistry.Reference, error) return refs, nil } + +func newMockClient(t *testing.T) schemaregistry.Client { + t.Helper() + client, err := schemaregistry.NewClient(schemaregistry.NewConfig(mockClientUrl)) + require.NoError(t, err) + return client +} + +func seedAssociation(t *testing.T, client schemaregistry.Client, topic, kafkaClusterId, mode, subject string) { + t.Helper() + // The mock requires the subject to have a registered schema before it + // will accept an association referencing it. + _, err := client.Register(subject, schemaregistry.SchemaInfo{ + Schema: `{"type":"record","name":"R","fields":[{"name":"f","type":"int"}]}`, + SchemaType: "AVRO", + }, false) + require.NoError(t, err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: subject, + AssociationType: mode, + }}, + }) + require.NoError(t, err) +} + +func TestResolveSubject(t *testing.T) { + t.Run("nil client falls back to TopicNameStrategy", func(t *testing.T) { + require.Equal(t, "topic1-value", ResolveSubject(nil, "lkc-123", "topic1", "value")) + }) + + t.Run("empty kafkaClusterId falls back to TopicNameStrategy", func(t *testing.T) { + client := newMockClient(t) + require.Equal(t, "topic1-value", ResolveSubject(client, "", "topic1", "value")) + }) + + t.Run("no association falls back to TopicNameStrategy", func(t *testing.T) { + client := newMockClient(t) + require.Equal(t, "topic1-value", ResolveSubject(client, "lkc-123", "topic1", "value")) + }) + + t.Run("matching association returns its subject", func(t *testing.T) { + client := newMockClient(t) + seedAssociation(t, client, "topic1", "lkc-123", "value", "custom-value-subject") + require.Equal(t, "custom-value-subject", ResolveSubject(client, "lkc-123", "topic1", "value")) + }) + + t.Run("association for other mode falls back", func(t *testing.T) { + client := newMockClient(t) + seedAssociation(t, client, "topic1", "lkc-123", "key", "custom-key-subject") + require.Equal(t, "topic1-value", ResolveSubject(client, "lkc-123", "topic1", "value")) + }) + + t.Run("association under different cluster id falls back", func(t *testing.T) { + client := newMockClient(t) + seedAssociation(t, client, "topic1", "lkc-other", "value", "should-not-be-used") + require.Equal(t, "topic1-value", ResolveSubject(client, "lkc-123", "topic1", "value")) + }) +} + +// TestAvroSerdesUsesAssociatedSubject confirms that, end-to-end, configuring an +// Avro serializer with a non-empty kafkaClusterId routes serialization to the +// associated subject. The on-prem variant (empty kafkaClusterId) is exercised +// by the existing TestAvroSerdesValid which seeds under "topic1-value". +func TestAvroSerdesUsesAssociatedSubject(t *testing.T) { + req := require.New(t) + + schemaString := `{"type":"record","name":"myRecord","fields":[{"name":"f1","type":"int"}]}` + schemaPath := filepath.Join(tempDir, "avro-schema-associated.txt") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + const ( + kafkaClusterId = "lkc-assoc-test" + topic = "associated-topic" + associated = "custom-associated-value" + ) + + serializationProvider, _ := GetSerializationProvider(avroSchemaName) + err := serializationProvider.InitSerializer(mockClientUrl, "", kafkaClusterId, "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) + req.Nil(err) + + // Register the real schema under the associated subject first, then + // attach an association without re-registering (so the associated + // subject's only version is the test's schema at id=1). + client := serializationProvider.GetSchemaRegistryClient() + info := schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "AVRO"} + _, err = client.Register(associated, info, false) + req.Nil(err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: associated, + AssociationType: "value", + }}, + }) + req.Nil(err) + + _, _, err = serializationProvider.Serialize(topic, `{"f1":123}`) + req.Nil(err, "serialize should succeed using the associated subject") + + // Verify the serializer did not fall back to TopicNameStrategy by + // confirming no schema is registered under "-value". + _, err = client.GetLatestSchemaMetadata(topic + "-value") + req.Error(err, "schema should not be resolvable under TopicNameStrategy when an association exists") +} + +// TestAvroSerdesAssociatedRoundTrip verifies the consume side: when an +// association exists, the deserializer (configured with AssociatedNameStrategy) +// looks up the schema under the associated subject and decodes correctly. +func TestAvroSerdesAssociatedRoundTrip(t *testing.T) { + req := require.New(t) + + schemaString := `{"type":"record","name":"myRecord","fields":[{"name":"f1","type":"int"}]}` + schemaPath := filepath.Join(tempDir, "avro-schema-rt.txt") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + const ( + kafkaClusterId = "lkc-rt-test" + topic = "rt-topic" + associated = "rt-associated-value" + message = `{"f1":7}` + ) + + serializationProvider, _ := GetSerializationProvider(avroSchemaName) + err := serializationProvider.InitSerializer(mockClientUrl, "", kafkaClusterId, "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) + req.Nil(err) + + client := serializationProvider.GetSchemaRegistryClient() + _, err = client.Register(associated, schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "AVRO"}, false) + req.Nil(err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: associated, + AssociationType: "value", + }}, + }) + req.Nil(err) + + _, data, err := serializationProvider.Serialize(topic, message) + req.Nil(err) + + deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", kafkaClusterId, "value", SchemaRegistryAuth{}, client) + req.Nil(err) + + decoded, err := deserializationProvider.Deserialize(topic, nil, data) + req.Nil(err, "deserialize should succeed using the associated subject") + req.JSONEq(message, decoded) +} diff --git a/pkg/serdes/string_deserialization_provider.go b/pkg/serdes/string_deserialization_provider.go index 0d1891a424..b040183a1e 100644 --- a/pkg/serdes/string_deserialization_provider.go +++ b/pkg/serdes/string_deserialization_provider.go @@ -8,7 +8,7 @@ import ( type StringDeserializationProvider struct{} -func (s *StringDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (s *StringDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/string_serialization_provider.go b/pkg/serdes/string_serialization_provider.go index a88000ed6a..8352937be4 100644 --- a/pkg/serdes/string_serialization_provider.go +++ b/pkg/serdes/string_serialization_provider.go @@ -8,7 +8,7 @@ import ( type StringSerializationProvider struct{} -func (s *StringSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (s *StringSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil } From 7a72450649d9be885a918078d94a25530345ac40 Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 10:30:31 -0700 Subject: [PATCH 5/8] Update avro --- pkg/serdes/avro_deserialization_provider.go | 9 ++++++- pkg/serdes/avro_serialization_provider.go | 29 ++++++++++++++++----- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/pkg/serdes/avro_deserialization_provider.go b/pkg/serdes/avro_deserialization_provider.go index 0b2e6c8221..7f04ba0de0 100644 --- a/pkg/serdes/avro_deserialization_provider.go +++ b/pkg/serdes/avro_deserialization_provider.go @@ -15,7 +15,7 @@ type AvroDeserializationProvider struct { deser *avrov2.Deserializer } -func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -34,6 +34,13 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } + if kafkaClusterId != "" { + serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType + serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } else { + serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType + } + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index c71e68b516..976fd4923b 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -21,12 +21,14 @@ import ( ) type AvroSerializationProvider struct { - ser *avrov2.Serializer - schemaId int - mode string + ser *avrov2.Serializer + schemaId int + mode string + kafkaClusterId string + subjectCache map[string]string } -func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -65,6 +67,13 @@ func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod serdeConfig.UseLatestVersion = false } + if kafkaClusterId != "" { + serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType + serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } else { + serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType + } + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde @@ -86,6 +95,8 @@ func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod a.schemaId = 1 } a.mode = mode + a.kafkaClusterId = kafkaClusterId + a.subjectCache = map[string]string{} return nil } @@ -98,8 +109,14 @@ func (a *AvroSerializationProvider) GetSchemaName() string { } func (a *AvroSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) { - // Step#1: Fetch the schemaInfo based on subject and schema ID - schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(topic+"-"+a.mode, a.schemaId) + // Step#1: Fetch the schemaInfo based on subject and schema ID. + // Cache the per-topic subject so we don't re-hit the associations API on every message. + subject, ok := a.subjectCache[topic] + if !ok { + subject = ResolveSubject(a.GetSchemaRegistryClient(), a.kafkaClusterId, topic, a.mode) + a.subjectCache[topic] = subject + } + schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(subject, a.schemaId) if err != nil { return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } From 356c25b0c6d9a5240b16b68f4800656bd1357920 Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 11:28:42 -0700 Subject: [PATCH 6/8] Refactor to use helper instead --- pkg/serdes/avro_deserialization_provider.go | 7 +------ pkg/serdes/avro_serialization_provider.go | 7 +------ pkg/serdes/json_deserialization_provider.go | 7 +------ pkg/serdes/json_serialization_provider.go | 7 +------ pkg/serdes/protobuf_deserialization_provider.go | 7 +------ pkg/serdes/protobuf_serialization_provider.go | 7 +------ pkg/serdes/serdes.go | 9 +++++++++ 7 files changed, 15 insertions(+), 36 deletions(-) diff --git a/pkg/serdes/avro_deserialization_provider.go b/pkg/serdes/avro_deserialization_provider.go index 7f04ba0de0..f9387e4dd5 100644 --- a/pkg/serdes/avro_deserialization_provider.go +++ b/pkg/serdes/avro_deserialization_provider.go @@ -34,12 +34,7 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } - if kafkaClusterId != "" { - serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType - serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} - } else { - serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType - } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) var serdeType serde.Type switch mode { diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index 976fd4923b..98c8d3b70a 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -67,12 +67,7 @@ func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, kaf serdeConfig.UseLatestVersion = false } - if kafkaClusterId != "" { - serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType - serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} - } else { - serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType - } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) var serdeType serde.Type if mode == "key" { diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index 840ab81936..88c117b4c5 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -36,12 +36,7 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } - if kafkaClusterId != "" { - serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType - serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} - } else { - serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType - } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) var serdeType serde.Type switch mode { diff --git a/pkg/serdes/json_serialization_provider.go b/pkg/serdes/json_serialization_provider.go index f350b3b87f..fa86b67f35 100644 --- a/pkg/serdes/json_serialization_provider.go +++ b/pkg/serdes/json_serialization_provider.go @@ -63,12 +63,7 @@ func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, kaf serdeConfig.UseLatestVersion = false } - if kafkaClusterId != "" { - serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType - serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} - } else { - serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType - } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) var serdeType serde.Type if mode == "key" { diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index ea84164cac..61560f43de 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -44,12 +44,7 @@ func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srCluste } } - if kafkaClusterId != "" { - serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType - serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} - } else { - serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType - } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) var serdeType serde.Type switch mode { diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index 8588cea045..8272e9e09b 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -82,12 +82,7 @@ func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, serdeConfig.UseLatestVersion = false } - if kafkaClusterId != "" { - serdeConfig.SubjectNameStrategyType = serde.AssociatedNameStrategyType - serdeConfig.SubjectNameStrategyConfig = map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} - } else { - serdeConfig.SubjectNameStrategyType = serde.TopicNameStrategyType - } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) var serdeType serde.Type if mode == "key" { diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index 3f81bb0afc..e26177b2a9 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -165,6 +165,15 @@ func NewSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegis return initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) } +// returns the SerDes subject name strategy + its config for a given Kafka cluster id. +// on-prem callers which pass "" as default will stay on TopicNameStrategy. +func subjectStrategy(kafkaClusterId string) (serde.SubjectNameStrategyType, map[string]string) { + if kafkaClusterId != "" { + return serde.AssociatedNameStrategyType, map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } + return serde.TopicNameStrategyType, nil +} + // returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id // as resource namespace. Falls backt o default TopicNameStrategy (-) if unmatched. func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { From 3b49e15aafe45e28177bd2215cdfc6b7acf2c898 Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 16:47:03 -0700 Subject: [PATCH 7/8] Clean up serdes and more SR client to internal/kafka --- internal/kafka/command_topic_produce.go | 4 +- internal/kafka/utils.go | 34 ++++++ pkg/serdes/avro_serialization_provider.go | 21 ++-- pkg/serdes/serdes.go | 36 ++---- pkg/serdes/serdes_test.go | 141 +++------------------- 5 files changed, 72 insertions(+), 164 deletions(-) diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index 2bc81e4686..354743d358 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -524,8 +524,8 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode, kafkaClu // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. subject := topicNameStrategy(topic, mode) if kafkaClusterId != "" && srEndpoint != "" { - if client, err := serdes.NewSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { - subject = serdes.ResolveSubject(client, kafkaClusterId, topic, mode) + if client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { + subject = resolveSubject(client, kafkaClusterId, topic, mode) } } diff --git a/internal/kafka/utils.go b/internal/kafka/utils.go index 8df2dcdd23..1d0deee1e5 100644 --- a/internal/kafka/utils.go +++ b/internal/kafka/utils.go @@ -6,12 +6,15 @@ import ( cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2" cckafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" cpkafkarestv3 "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" "github.com/confluentinc/cli/v4/pkg/ccloudv2" "github.com/confluentinc/cli/v4/pkg/ccstructs" "github.com/confluentinc/cli/v4/pkg/kafkarest" "github.com/confluentinc/cli/v4/pkg/kafkausagelimits" + "github.com/confluentinc/cli/v4/pkg/log" + "github.com/confluentinc/cli/v4/pkg/serdes" ) func toCreateTopicConfigs(topicConfigsMap map[string]string) []cckafkarestv3.ConfigData { @@ -210,6 +213,37 @@ func topicNameStrategy(topic, mode string) string { return fmt.Sprintf("%s-%s", topic, mode) } +func newSchemaRegistryClient(srClientUrl, srClusterId string, srAuth serdes.SchemaRegistryAuth) (schemaregistry.Client, error) { + var cfg *schemaregistry.Config + switch { + case srAuth.ApiKey != "" && srAuth.ApiSecret != "": + cfg = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) + case srAuth.Token != "": + cfg = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") + default: + cfg = schemaregistry.NewConfig(srClientUrl) + log.CliLogger.Info("initializing schema registry client with no authentication") + } + cfg.SslCaLocation = srAuth.CertificateAuthorityPath + cfg.SslCertificateLocation = srAuth.ClientCertPath + cfg.SslKeyLocation = srAuth.ClientKeyPath + return schemaregistry.NewClient(cfg) +} + +// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id +// as resource namespace. Falls backt o default TopicNameStrategy (-) if unmatched. +func resolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { + fallback := topic + "-" + mode + if kafkaClusterId == "" || client == nil { + return fallback + } + associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) + if err != nil || len(associations) == 0 { + return fallback + } + return associations[0].Subject +} + func getLimitsForSku(cluster *cmkv2.CmkV2Cluster, usageLimits *kafkausagelimits.UsageLimits) *kafkausagelimits.Limits { if isDedicated(cluster) { return usageLimits.GetCkuLimit(cluster.Status.GetCku()) diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index 98c8d3b70a..2ab52e1aad 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -21,11 +21,9 @@ import ( ) type AvroSerializationProvider struct { - ser *avrov2.Serializer - schemaId int - mode string - kafkaClusterId string - subjectCache map[string]string + ser *avrov2.Serializer + schemaId int + mode string } func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { @@ -90,8 +88,6 @@ func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, kaf a.schemaId = 1 } a.mode = mode - a.kafkaClusterId = kafkaClusterId - a.subjectCache = map[string]string{} return nil } @@ -104,12 +100,11 @@ func (a *AvroSerializationProvider) GetSchemaName() string { } func (a *AvroSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) { - // Step#1: Fetch the schemaInfo based on subject and schema ID. - // Cache the per-topic subject so we don't re-hit the associations API on every message. - subject, ok := a.subjectCache[topic] - if !ok { - subject = ResolveSubject(a.GetSchemaRegistryClient(), a.kafkaClusterId, topic, a.mode) - a.subjectCache[topic] = subject + // Step#1: Ask the configured ckgo strategy for the subject (AssociatedNameStrategy + // on cloud, TopicNameStrategy on on-prem), then fetch the schema by subject + id. + subject, err := a.ser.SubjectNameStrategy(topic, a.ser.SerdeType, schemaregistry.SchemaInfo{}) + if err != nil { + return nil, nil, fmt.Errorf("failed to resolve subject: %w", err) } schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(subject, a.schemaId) if err != nil { diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index e26177b2a9..60c48d9ac5 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -140,6 +140,15 @@ func IsProtobufSchema(valueFormat string) bool { return valueFormat == protobufSchemaName } +// returns the SerDes subject name strategy + its config for a given Kafka cluster id. +// on-prem callers which pass "" stay on TopicNameStrategy. +func subjectStrategy(kafkaClusterId string) (serde.SubjectNameStrategyType, map[string]string) { + if kafkaClusterId != "" { + return serde.AssociatedNameStrategyType, map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } + return serde.TopicNameStrategyType, nil +} + func initSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) (schemaregistry.Client, error) { if existingClient != nil { return existingClient, nil @@ -160,30 +169,3 @@ func initSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegi return schemaregistry.NewClient(serdeClientConfig) } - -func NewSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegistryAuth) (schemaregistry.Client, error) { - return initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) -} - -// returns the SerDes subject name strategy + its config for a given Kafka cluster id. -// on-prem callers which pass "" as default will stay on TopicNameStrategy. -func subjectStrategy(kafkaClusterId string) (serde.SubjectNameStrategyType, map[string]string) { - if kafkaClusterId != "" { - return serde.AssociatedNameStrategyType, map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} - } - return serde.TopicNameStrategyType, nil -} - -// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id -// as resource namespace. Falls backt o default TopicNameStrategy (-) if unmatched. -func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { - fallback := topic + "-" + mode - if kafkaClusterId == "" || client == nil { - return fallback - } - associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) - if err != nil || len(associations) == 0 { - return fallback - } - return associations[0].Subject -} diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 2a5c1a7cf9..5c92aa986c 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -1260,73 +1260,10 @@ func readSchemaReferences(references string) ([]schemaregistry.Reference, error) return refs, nil } -func newMockClient(t *testing.T) schemaregistry.Client { - t.Helper() - client, err := schemaregistry.NewClient(schemaregistry.NewConfig(mockClientUrl)) - require.NoError(t, err) - return client -} - -func seedAssociation(t *testing.T, client schemaregistry.Client, topic, kafkaClusterId, mode, subject string) { - t.Helper() - // The mock requires the subject to have a registered schema before it - // will accept an association referencing it. - _, err := client.Register(subject, schemaregistry.SchemaInfo{ - Schema: `{"type":"record","name":"R","fields":[{"name":"f","type":"int"}]}`, - SchemaType: "AVRO", - }, false) - require.NoError(t, err) - _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ - ResourceName: topic, - ResourceNamespace: kafkaClusterId, - ResourceID: topic + ":" + kafkaClusterId, - ResourceType: "topic", - Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ - Subject: subject, - AssociationType: mode, - }}, - }) - require.NoError(t, err) -} - -func TestResolveSubject(t *testing.T) { - t.Run("nil client falls back to TopicNameStrategy", func(t *testing.T) { - require.Equal(t, "topic1-value", ResolveSubject(nil, "lkc-123", "topic1", "value")) - }) - - t.Run("empty kafkaClusterId falls back to TopicNameStrategy", func(t *testing.T) { - client := newMockClient(t) - require.Equal(t, "topic1-value", ResolveSubject(client, "", "topic1", "value")) - }) - - t.Run("no association falls back to TopicNameStrategy", func(t *testing.T) { - client := newMockClient(t) - require.Equal(t, "topic1-value", ResolveSubject(client, "lkc-123", "topic1", "value")) - }) - - t.Run("matching association returns its subject", func(t *testing.T) { - client := newMockClient(t) - seedAssociation(t, client, "topic1", "lkc-123", "value", "custom-value-subject") - require.Equal(t, "custom-value-subject", ResolveSubject(client, "lkc-123", "topic1", "value")) - }) - - t.Run("association for other mode falls back", func(t *testing.T) { - client := newMockClient(t) - seedAssociation(t, client, "topic1", "lkc-123", "key", "custom-key-subject") - require.Equal(t, "topic1-value", ResolveSubject(client, "lkc-123", "topic1", "value")) - }) - - t.Run("association under different cluster id falls back", func(t *testing.T) { - client := newMockClient(t) - seedAssociation(t, client, "topic1", "lkc-other", "value", "should-not-be-used") - require.Equal(t, "topic1-value", ResolveSubject(client, "lkc-123", "topic1", "value")) - }) -} - -// TestAvroSerdesUsesAssociatedSubject confirms that, end-to-end, configuring an -// Avro serializer with a non-empty kafkaClusterId routes serialization to the -// associated subject. The on-prem variant (empty kafkaClusterId) is exercised -// by the existing TestAvroSerdesValid which seeds under "topic1-value". +// TestAvroSerdesUsesAssociatedSubject verifies the cloud path end-to-end: when +// InitSerializer receives a non-empty kafkaClusterId, ckgo's strategy resolves +// the subject via the associations API, and Serialize fetches the schema under +// the associated subject (not under -). func TestAvroSerdesUsesAssociatedSubject(t *testing.T) { req := require.New(t) @@ -1346,12 +1283,8 @@ func TestAvroSerdesUsesAssociatedSubject(t *testing.T) { err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) - // Register the real schema under the associated subject first, then - // attach an association without re-registering (so the associated - // subject's only version is the test's schema at id=1). client := serializationProvider.GetSchemaRegistryClient() - info := schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "AVRO"} - _, err = client.Register(associated, info, false) + _, err = client.Register(associated, schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "AVRO"}, false) req.Nil(err) _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ ResourceName: topic, @@ -1366,60 +1299,24 @@ func TestAvroSerdesUsesAssociatedSubject(t *testing.T) { req.Nil(err) _, _, err = serializationProvider.Serialize(topic, `{"f1":123}`) - req.Nil(err, "serialize should succeed using the associated subject") + req.Nil(err, "serialize should resolve to the associated subject") - // Verify the serializer did not fall back to TopicNameStrategy by - // confirming no schema is registered under "-value". + // Confirm no schema lives under TopicNameStrategy: if Serialize had used the literal -, + // the test wouldn't be exercising the associated path. _, err = client.GetLatestSchemaMetadata(topic + "-value") - req.Error(err, "schema should not be resolvable under TopicNameStrategy when an association exists") + req.Error(err) } -// TestAvroSerdesAssociatedRoundTrip verifies the consume side: when an -// association exists, the deserializer (configured with AssociatedNameStrategy) -// looks up the schema under the associated subject and decodes correctly. -func TestAvroSerdesAssociatedRoundTrip(t *testing.T) { - req := require.New(t) - - schemaString := `{"type":"record","name":"myRecord","fields":[{"name":"f1","type":"int"}]}` - schemaPath := filepath.Join(tempDir, "avro-schema-rt.txt") - req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) - - const ( - kafkaClusterId = "lkc-rt-test" - topic = "rt-topic" - associated = "rt-associated-value" - message = `{"f1":7}` - ) - - serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", kafkaClusterId, "value", -1, SchemaRegistryAuth{}) - req.Nil(err) - err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) - req.Nil(err) - - client := serializationProvider.GetSchemaRegistryClient() - _, err = client.Register(associated, schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "AVRO"}, false) - req.Nil(err) - _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ - ResourceName: topic, - ResourceNamespace: kafkaClusterId, - ResourceID: topic + ":" + kafkaClusterId, - ResourceType: "topic", - Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ - Subject: associated, - AssociationType: "value", - }}, +func TestSubjectStrategy(t *testing.T) { + t.Run("non-empty cluster id selects AssociatedNameStrategy", func(t *testing.T) { + typ, cfg := subjectStrategy("lkc-test") + require.Equal(t, serde.AssociatedNameStrategyType, typ) + require.Equal(t, map[string]string{serde.KafkaClusterIDConfig: "lkc-test"}, cfg) }) - req.Nil(err) - - _, data, err := serializationProvider.Serialize(topic, message) - req.Nil(err) - deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", kafkaClusterId, "value", SchemaRegistryAuth{}, client) - req.Nil(err) - - decoded, err := deserializationProvider.Deserialize(topic, nil, data) - req.Nil(err, "deserialize should succeed using the associated subject") - req.JSONEq(message, decoded) + t.Run("empty cluster id selects TopicNameStrategy", func(t *testing.T) { + typ, cfg := subjectStrategy("") + require.Equal(t, serde.TopicNameStrategyType, typ) + require.Nil(t, cfg) + }) } From b187cb3d73e9b673329cf3ecc1e0c169ca02171c Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Fri, 29 May 2026 17:06:20 -0700 Subject: [PATCH 8/8] Add debug logs --- internal/kafka/command_topic_consume.go | 2 ++ internal/kafka/command_topic_produce.go | 2 ++ internal/kafka/utils.go | 8 +++++++- pkg/serdes/serdes.go | 2 ++ 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/kafka/command_topic_consume.go b/internal/kafka/command_topic_consume.go index 5a4ba9c029..b4c576fe17 100644 --- a/internal/kafka/command_topic_consume.go +++ b/internal/kafka/command_topic_consume.go @@ -286,6 +286,8 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error { subject = schemaRegistryContext } + log.CliLogger.Tracef("consumeCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic) + groupHandler := &GroupHandler{ SrClient: srClient, SrApiKey: srApiKey, diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index 354743d358..f79aa9f3b8 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -147,6 +147,8 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { return err } + log.CliLogger.Tracef("produceCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic) + keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key", cluster.ID) if err != nil { return err diff --git a/internal/kafka/utils.go b/internal/kafka/utils.go index 1d0deee1e5..550bca23b8 100644 --- a/internal/kafka/utils.go +++ b/internal/kafka/utils.go @@ -238,9 +238,15 @@ func resolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode st return fallback } associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) - if err != nil || len(associations) == 0 { + if err != nil { + log.CliLogger.Tracef("subject resolution: associations lookup failed (topic=%q mode=%q clusterId=%q): %v; using %q", topic, mode, kafkaClusterId, err, fallback) return fallback } + if len(associations) == 0 { + log.CliLogger.Tracef("subject resolution: no association for topic=%q mode=%q clusterId=%q; using %q", topic, mode, kafkaClusterId, fallback) + return fallback + } + log.CliLogger.Tracef("subject resolution: resolved associated subject %q (topic=%q mode=%q clusterId=%q)", associations[0].Subject, topic, mode, kafkaClusterId) return associations[0].Subject } diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index 60c48d9ac5..d2b7c4532b 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -144,8 +144,10 @@ func IsProtobufSchema(valueFormat string) bool { // on-prem callers which pass "" stay on TopicNameStrategy. func subjectStrategy(kafkaClusterId string) (serde.SubjectNameStrategyType, map[string]string) { if kafkaClusterId != "" { + log.CliLogger.Tracef("subjectStrategy: AssociatedNameStrategy (kafkaClusterId=%q)", kafkaClusterId) return serde.AssociatedNameStrategyType, map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} } + log.CliLogger.Tracef("subjectStrategy: TopicNameStrategy (no kafkaClusterId)") return serde.TopicNameStrategyType, nil }