diff --git a/internal/asyncapi/command_export.go b/internal/asyncapi/command_export.go index 1b31206afe..20e2904de7 100644 --- a/internal/asyncapi/command_export.go +++ b/internal/asyncapi/command_export.go @@ -307,7 +307,7 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content return nil, err } - err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "value", serdes.SchemaRegistryAuth{Token: token}, nil) + err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "", "value", serdes.SchemaRegistryAuth{Token: token}, nil) if err != nil { return nil, err } diff --git a/internal/kafka/command_topic_consume.go b/internal/kafka/command_topic_consume.go index b80faa9b59..b4c576fe17 100644 --- a/internal/kafka/command_topic_consume.go +++ b/internal/kafka/command_topic_consume.go @@ -286,12 +286,15 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error { subject = schemaRegistryContext } + log.CliLogger.Tracef("consumeCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic) + groupHandler := &GroupHandler{ SrClient: srClient, SrApiKey: srApiKey, SrApiSecret: srApiSecret, SrClusterId: srClusterId, SrClusterEndpoint: srEndpoint, + KafkaClusterId: cluster.ID, Token: token, KeyFormat: keyFormat, ValueFormat: valueFormat, diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index ec9c15ea98..f79aa9f3b8 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -147,12 +147,14 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { return err } - keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key") + log.CliLogger.Tracef("produceCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic) + + keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key", cluster.ID) if err != nil { return err } - valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value") + valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value", cluster.ID) if err != nil { return err } @@ -207,12 +209,12 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error { topic := args[0] - keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key") + keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key", "") if err != nil { return err } - valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value") + valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value", "") if err != nil { return err } @@ -461,14 +463,12 @@ func getKeyAndValue(schemaBased bool, data, delimiter string) (string, string, e return "", "", errors.New(missingOrMalformedKeyErrorMsg) } -func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) { +func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) { schemaDir, err := createTempDir() if err != nil { return nil, nil, err } - subject := topicNameStrategy(topic, mode) - // Deprecated var schemaId optional.Int32 if mode == "value" && cmd.Flags().Changed("schema-id") { @@ -491,6 +491,46 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( schemaId = optional.NewInt32(int32(id)) } + srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint") + if err != nil { + return nil, nil, err + } + srApiKey, err := cmd.Flags().GetString("schema-registry-api-key") + if err != nil { + return nil, nil, err + } + srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret") + if err != nil { + return nil, nil, err + } + var token string + if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out + token, err = auth.GetDataplaneToken(c.Context) + if err != nil { + return nil, nil, err + } + } + var srClusterId string + if (schemaId.IsSet() || schema != "") && srEndpoint == "" { + srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd) + if err != nil { + return nil, nil, err + } + } + srAuth := serdes.SchemaRegistryAuth{ + ApiKey: srApiKey, + ApiSecret: srApiSecret, + Token: token, + } + + // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. + subject := topicNameStrategy(topic, mode) + if kafkaClusterId != "" && srEndpoint != "" { + if client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { + subject = resolveSubject(client, kafkaClusterId, topic, mode) + } + } + var format string referencePathMap := map[string]string{} metaInfo := []byte{} @@ -559,49 +599,12 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( } } - // Fetch the SR client endpoint during schema registration - srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint") - if err != nil { - return nil, nil, err - } - - var srClusterId string - if (schemaId.IsSet() || schema != "") && srEndpoint == "" { - srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd) - if err != nil { - return nil, nil, err - } - } - - // Initialize the serializer with the same SR endpoint during registration - // The associated schema ID is also required to initialize the serializer - srApiKey, err := cmd.Flags().GetString("schema-registry-api-key") - if err != nil { - return nil, nil, err - } - srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret") - if err != nil { - return nil, nil, err - } var parsedSchemaId = -1 if len(metaInfo) >= 5 { parsedSchemaId = int(binary.BigEndian.Uint32(metaInfo[1:5])) } - var token string - if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out - token, err = auth.GetDataplaneToken(c.Context) - if err != nil { - return nil, nil, err - } - } - srAuth := serdes.SchemaRegistryAuth{ - ApiKey: srApiKey, - ApiSecret: srApiSecret, - Token: token, - } - err = serializationProvider.InitSerializer(srEndpoint, srClusterId, mode, parsedSchemaId, srAuth) - if err != nil { + if err := serializationProvider.InitSerializer(srEndpoint, srClusterId, kafkaClusterId, mode, parsedSchemaId, srAuth); err != nil { return nil, nil, err } @@ -613,7 +616,7 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( return serializationProvider, metaInfo, nil } -func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) { +func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) { schemaDir, err := createTempDir() if err != nil { return nil, nil, err @@ -732,7 +735,7 @@ func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode str ClientKeyPath: clientKeyPath, Token: token, } - err = serializationProvider.InitSerializer(srEndpoint, "", mode, parsedSchemaId, srAuth) + err = serializationProvider.InitSerializer(srEndpoint, "", kafkaClusterId, mode, parsedSchemaId, srAuth) if err != nil { return nil, nil, err } diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index 2ebe6cfd6a..6c6f91e390 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -58,6 +58,7 @@ type GroupHandler struct { SrApiSecret string SrClusterId string SrClusterEndpoint string + KafkaClusterId string Token string CertificateAuthorityPath string ClientCertPath string @@ -241,7 +242,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "key", srAuth, nil) + err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "key", srAuth, nil) if err != nil { return err } @@ -268,7 +269,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "value", srAuth, nil) + err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "value", srAuth, nil) if err != nil { return err } diff --git a/internal/kafka/utils.go b/internal/kafka/utils.go index 8df2dcdd23..550bca23b8 100644 --- a/internal/kafka/utils.go +++ b/internal/kafka/utils.go @@ -6,12 +6,15 @@ import ( cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2" cckafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" cpkafkarestv3 "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" "github.com/confluentinc/cli/v4/pkg/ccloudv2" "github.com/confluentinc/cli/v4/pkg/ccstructs" "github.com/confluentinc/cli/v4/pkg/kafkarest" "github.com/confluentinc/cli/v4/pkg/kafkausagelimits" + "github.com/confluentinc/cli/v4/pkg/log" + "github.com/confluentinc/cli/v4/pkg/serdes" ) func toCreateTopicConfigs(topicConfigsMap map[string]string) []cckafkarestv3.ConfigData { @@ -210,6 +213,43 @@ func topicNameStrategy(topic, mode string) string { return fmt.Sprintf("%s-%s", topic, mode) } +func newSchemaRegistryClient(srClientUrl, srClusterId string, srAuth serdes.SchemaRegistryAuth) (schemaregistry.Client, error) { + var cfg *schemaregistry.Config + switch { + case srAuth.ApiKey != "" && srAuth.ApiSecret != "": + cfg = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) + case srAuth.Token != "": + cfg = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") + default: + cfg = schemaregistry.NewConfig(srClientUrl) + log.CliLogger.Info("initializing schema registry client with no authentication") + } + cfg.SslCaLocation = srAuth.CertificateAuthorityPath + cfg.SslCertificateLocation = srAuth.ClientCertPath + cfg.SslKeyLocation = srAuth.ClientKeyPath + return schemaregistry.NewClient(cfg) +} + +// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id +// as resource namespace. Falls backt o default TopicNameStrategy (-) if unmatched. +func resolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { + fallback := topic + "-" + mode + if kafkaClusterId == "" || client == nil { + return fallback + } + associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) + if err != nil { + log.CliLogger.Tracef("subject resolution: associations lookup failed (topic=%q mode=%q clusterId=%q): %v; using %q", topic, mode, kafkaClusterId, err, fallback) + return fallback + } + if len(associations) == 0 { + log.CliLogger.Tracef("subject resolution: no association for topic=%q mode=%q clusterId=%q; using %q", topic, mode, kafkaClusterId, fallback) + return fallback + } + log.CliLogger.Tracef("subject resolution: resolved associated subject %q (topic=%q mode=%q clusterId=%q)", associations[0].Subject, topic, mode, kafkaClusterId) + return associations[0].Subject +} + func getLimitsForSku(cluster *cmkv2.CmkV2Cluster, usageLimits *kafkausagelimits.UsageLimits) *kafkausagelimits.Limits { if isDedicated(cluster) { return usageLimits.GetCkuLimit(cluster.Status.GetCku()) diff --git a/pkg/serdes/avro_deserialization_provider.go b/pkg/serdes/avro_deserialization_provider.go index 0b2e6c8221..f9387e4dd5 100644 --- a/pkg/serdes/avro_deserialization_provider.go +++ b/pkg/serdes/avro_deserialization_provider.go @@ -15,7 +15,7 @@ type AvroDeserializationProvider struct { deser *avrov2.Deserializer } -func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -34,6 +34,8 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index c71e68b516..2ab52e1aad 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -26,7 +26,7 @@ type AvroSerializationProvider struct { mode string } -func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -65,6 +65,8 @@ func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod serdeConfig.UseLatestVersion = false } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde @@ -98,8 +100,13 @@ func (a *AvroSerializationProvider) GetSchemaName() string { } func (a *AvroSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) { - // Step#1: Fetch the schemaInfo based on subject and schema ID - schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(topic+"-"+a.mode, a.schemaId) + // Step#1: Ask the configured ckgo strategy for the subject (AssociatedNameStrategy + // on cloud, TopicNameStrategy on on-prem), then fetch the schema by subject + id. + subject, err := a.ser.SubjectNameStrategy(topic, a.ser.SerdeType, schemaregistry.SchemaInfo{}) + if err != nil { + return nil, nil, fmt.Errorf("failed to resolve subject: %w", err) + } + schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(subject, a.schemaId) if err != nil { return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } diff --git a/pkg/serdes/double_deserialization_provider.go b/pkg/serdes/double_deserialization_provider.go index d24f2d3e67..b1a3993bfd 100644 --- a/pkg/serdes/double_deserialization_provider.go +++ b/pkg/serdes/double_deserialization_provider.go @@ -12,7 +12,7 @@ import ( type DoubleDeserializationProvider struct{} -func (DoubleDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (DoubleDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/double_serialization_provider.go b/pkg/serdes/double_serialization_provider.go index e17c17da25..897d676519 100644 --- a/pkg/serdes/double_serialization_provider.go +++ b/pkg/serdes/double_serialization_provider.go @@ -12,7 +12,7 @@ import ( type DoubleSerializationProvider struct{} -func (DoubleSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (DoubleSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil } diff --git a/pkg/serdes/integer_deserialization_provider.go b/pkg/serdes/integer_deserialization_provider.go index 564b3ae724..2a26d2b550 100644 --- a/pkg/serdes/integer_deserialization_provider.go +++ b/pkg/serdes/integer_deserialization_provider.go @@ -11,7 +11,7 @@ import ( type IntegerDeserializationProvider struct{} -func (IntegerDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (IntegerDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/integer_serialization_provider.go b/pkg/serdes/integer_serialization_provider.go index 12b8de571b..0770292d29 100644 --- a/pkg/serdes/integer_serialization_provider.go +++ b/pkg/serdes/integer_serialization_provider.go @@ -11,7 +11,7 @@ import ( type IntegerSerializationProvider struct{} -func (IntegerSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (IntegerSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil } diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index b8ebe2d33c..88c117b4c5 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -15,7 +15,7 @@ type JsonDeserializationProvider struct { deser *jsonschema.Deserializer } -func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -36,6 +36,8 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/json_serialization_provider.go b/pkg/serdes/json_serialization_provider.go index 600fa6d810..fa86b67f35 100644 --- a/pkg/serdes/json_serialization_provider.go +++ b/pkg/serdes/json_serialization_provider.go @@ -23,7 +23,7 @@ type JsonSerializationProvider struct { ser *jsonschema.Serializer } -func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -63,6 +63,8 @@ func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod serdeConfig.UseLatestVersion = false } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index 695f176465..61560f43de 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -25,7 +25,7 @@ type ProtobufDeserializationProvider struct { message gproto.Message } -func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -44,6 +44,8 @@ func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srCluste } } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index 2f8e2ce45b..8272e9e09b 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -43,7 +43,7 @@ type ProtobufSerializationProvider struct { message gproto.Message } -func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -82,6 +82,8 @@ func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, serdeConfig.UseLatestVersion = false } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index be1e0a4b3b..d2b7c4532b 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -68,7 +68,7 @@ type SchemaRegistryAuth struct { } type SerializationProvider interface { - InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error + InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error LoadSchema(string, map[string]string) error Serialize(string, string) ([]kafka.Header, []byte, error) GetSchemaName() string @@ -77,7 +77,7 @@ type SerializationProvider interface { } type DeserializationProvider interface { - InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error + InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error LoadSchema(string, string, serde.Type, *kafka.Message) error Deserialize(string, []kafka.Header, []byte) (string, error) GetSchemaRegistryClient() schemaregistry.Client @@ -140,6 +140,17 @@ func IsProtobufSchema(valueFormat string) bool { return valueFormat == protobufSchemaName } +// returns the SerDes subject name strategy + its config for a given Kafka cluster id. +// on-prem callers which pass "" stay on TopicNameStrategy. +func subjectStrategy(kafkaClusterId string) (serde.SubjectNameStrategyType, map[string]string) { + if kafkaClusterId != "" { + log.CliLogger.Tracef("subjectStrategy: AssociatedNameStrategy (kafkaClusterId=%q)", kafkaClusterId) + return serde.AssociatedNameStrategyType, map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } + log.CliLogger.Tracef("subjectStrategy: TopicNameStrategy (no kafkaClusterId)") + return serde.TopicNameStrategyType, nil +} + func initSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) (schemaregistry.Client, error) { if existingClient != nil { return existingClient, nil diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 049f72c725..5c92aa986c 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -46,7 +46,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { // Basic Auth provider, err := GetDeserializationProvider(avroSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + err = provider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{ ApiKey: "key", ApiSecret: "secret", }, nil) @@ -60,7 +60,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { serde.GlobalRuleRegistry().Clear() provider, err = GetDeserializationProvider(jsonSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "value", SchemaRegistryAuth{Token: "token"}, nil) + err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "", "value", SchemaRegistryAuth{Token: "token"}, nil) req.Nil(err) config = provider.GetSchemaRegistryClient().Config() req.Equal(config.SchemaRegistryURL, mockClientUrl) @@ -72,7 +72,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { serde.GlobalRuleRegistry().Clear() provider, err = GetDeserializationProvider(protobufSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + err = provider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{ CertificateAuthorityPath: "ca.cert", ClientCertPath: "client.crt", ClientKeyPath: "client.key", @@ -146,7 +146,7 @@ func TestAvroSerdesValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -168,7 +168,7 @@ func TestAvroSerdesValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -189,7 +189,7 @@ func TestAvroSerdesValidWithHeaders(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) @@ -210,7 +210,7 @@ func TestAvroSerdesValidWithHeaders(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", headers, data) @@ -228,7 +228,7 @@ func TestAvroSerdesInvalid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -246,7 +246,7 @@ func TestAvroSerdesInvalid(t *testing.T) { deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) req.Nil(err) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) brokenString := `{"f1"` @@ -276,7 +276,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -298,7 +298,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) @@ -320,7 +320,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -358,7 +358,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -380,7 +380,7 @@ func TestAvroSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -418,7 +418,7 @@ func TestAvroSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -439,7 +439,7 @@ func TestJsonSerdesValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -461,7 +461,7 @@ func TestJsonSerdesValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) @@ -483,7 +483,7 @@ func TestJsonSerdesValidWithHeaders(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) @@ -504,7 +504,7 @@ func TestJsonSerdesValidWithHeaders(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", headers, expectedBytes) @@ -536,7 +536,7 @@ func TestJsonSerdesReference(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -566,7 +566,7 @@ func TestJsonSerdesReference(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) @@ -585,7 +585,7 @@ func TestJsonSerdesInvalid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -601,7 +601,7 @@ func TestJsonSerdesInvalid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) brokenString := `{"f1":` @@ -638,7 +638,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -660,7 +660,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) @@ -681,7 +681,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -719,7 +719,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -741,7 +741,7 @@ func TestJsonSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -779,7 +779,7 @@ func TestJsonSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -808,7 +808,7 @@ func TestProtobufSerdesValid(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -826,7 +826,7 @@ func TestProtobufSerdesValid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -855,7 +855,7 @@ func TestProtobufSerdesValidWithHeaders(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) @@ -874,7 +874,7 @@ func TestProtobufSerdesValidWithHeaders(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{ Value: data, @@ -925,7 +925,7 @@ message Person { expectedString := `{"name":"abc","address":{"city":"LA"},"result":2}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) req.Nil(err) @@ -957,7 +957,7 @@ message Person { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -993,7 +993,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1012,7 +1012,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1064,7 +1064,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { expectedString := `{"name":"abc","id":2,"add":{"zip":"123","street":"def"},"phones":{"number":"234"}}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1082,7 +1082,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1119,7 +1119,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1156,7 +1156,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1190,7 +1190,7 @@ func TestProtobufSerdesValidWithCSPERuleSet(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1227,7 +1227,7 @@ func TestProtobufSerdesValidWithCSPERuleSet(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1259,3 +1259,64 @@ func readSchemaReferences(references string) ([]schemaregistry.Reference, error) return refs, nil } + +// TestAvroSerdesUsesAssociatedSubject verifies the cloud path end-to-end: when +// InitSerializer receives a non-empty kafkaClusterId, ckgo's strategy resolves +// the subject via the associations API, and Serialize fetches the schema under +// the associated subject (not under -). +func TestAvroSerdesUsesAssociatedSubject(t *testing.T) { + req := require.New(t) + + schemaString := `{"type":"record","name":"myRecord","fields":[{"name":"f1","type":"int"}]}` + schemaPath := filepath.Join(tempDir, "avro-schema-associated.txt") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + const ( + kafkaClusterId = "lkc-assoc-test" + topic = "associated-topic" + associated = "custom-associated-value" + ) + + serializationProvider, _ := GetSerializationProvider(avroSchemaName) + err := serializationProvider.InitSerializer(mockClientUrl, "", kafkaClusterId, "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) + req.Nil(err) + + client := serializationProvider.GetSchemaRegistryClient() + _, err = client.Register(associated, schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "AVRO"}, false) + req.Nil(err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: associated, + AssociationType: "value", + }}, + }) + req.Nil(err) + + _, _, err = serializationProvider.Serialize(topic, `{"f1":123}`) + req.Nil(err, "serialize should resolve to the associated subject") + + // Confirm no schema lives under TopicNameStrategy: if Serialize had used the literal -, + // the test wouldn't be exercising the associated path. + _, err = client.GetLatestSchemaMetadata(topic + "-value") + req.Error(err) +} + +func TestSubjectStrategy(t *testing.T) { + t.Run("non-empty cluster id selects AssociatedNameStrategy", func(t *testing.T) { + typ, cfg := subjectStrategy("lkc-test") + require.Equal(t, serde.AssociatedNameStrategyType, typ) + require.Equal(t, map[string]string{serde.KafkaClusterIDConfig: "lkc-test"}, cfg) + }) + + t.Run("empty cluster id selects TopicNameStrategy", func(t *testing.T) { + typ, cfg := subjectStrategy("") + require.Equal(t, serde.TopicNameStrategyType, typ) + require.Nil(t, cfg) + }) +} diff --git a/pkg/serdes/string_deserialization_provider.go b/pkg/serdes/string_deserialization_provider.go index 0d1891a424..b040183a1e 100644 --- a/pkg/serdes/string_deserialization_provider.go +++ b/pkg/serdes/string_deserialization_provider.go @@ -8,7 +8,7 @@ import ( type StringDeserializationProvider struct{} -func (s *StringDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (s *StringDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/string_serialization_provider.go b/pkg/serdes/string_serialization_provider.go index a88000ed6a..8352937be4 100644 --- a/pkg/serdes/string_serialization_provider.go +++ b/pkg/serdes/string_serialization_provider.go @@ -8,7 +8,7 @@ import ( type StringSerializationProvider struct{} -func (s *StringSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (s *StringSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil }