From 6571fb1addb71f63384c9aea319a299d18358e71 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Fri, 3 Apr 2026 14:18:03 +0530 Subject: [PATCH] Add Catalog Update Command --- internal/flink/command_catalog.go | 65 +++++++++++++++++++ internal/flink/command_catalog_update.go | 54 +++++++++++++++ pkg/flink/cmf_rest_client.go | 8 +++ .../flink/catalog/update-invalid-failure.json | 17 +++++ .../flink/catalog/update-invalid-failure.yaml | 8 +++ .../flink/catalog/update-successful.json | 17 +++++ .../flink/catalog/update-successful.yaml | 8 +++ .../output/flink/catalog/help-onprem.golden | 1 + .../flink/catalog/update-help-onprem.golden | 16 +++++ .../catalog/update-invalid-failure.golden | 1 + .../flink/catalog/update-success-json.golden | 23 +++++++ .../flink/catalog/update-success-yaml.golden | 13 ++++ .../flink/catalog/update-success.golden | 5 ++ test/flink_onprem_test.go | 32 +++++++++ test/test-server/flink_onprem_handler.go | 14 +++- 15 files changed, 281 insertions(+), 1 deletion(-) create mode 100644 internal/flink/command_catalog_update.go create mode 100644 test/fixtures/input/flink/catalog/update-invalid-failure.json create mode 100644 test/fixtures/input/flink/catalog/update-invalid-failure.yaml create mode 100644 test/fixtures/input/flink/catalog/update-successful.json create mode 100644 test/fixtures/input/flink/catalog/update-successful.yaml create mode 100644 test/fixtures/output/flink/catalog/update-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/update-invalid-failure.golden create mode 100644 test/fixtures/output/flink/catalog/update-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/update-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/update-success.golden diff --git a/internal/flink/command_catalog.go b/internal/flink/command_catalog.go index 77b540e2e5..2af4794436 100644 --- a/internal/flink/command_catalog.go +++ b/internal/flink/command_catalog.go @@ -1,11 +1,19 @@ package 
flink import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" ) type catalogOut struct { @@ -25,10 +33,67 @@ func (c *command) newCatalogCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDeleteCommand()) cmd.AddCommand(c.newCatalogDescribeCommand()) cmd.AddCommand(c.newCatalogListCommand()) + cmd.AddCommand(c.newCatalogUpdateCommand()) return cmd } +func printCatalogOutput(cmd *cobra.Command, sdkOutputCatalog cmfsdk.KafkaCatalog) error { + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters())) + for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() { + databases = append(databases, kafkaCluster.DatabaseName) + } + var creationTime string + if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp + } + table.Add(&catalogOut{ + CreationTime: creationTime, + Name: sdkOutputCatalog.GetMetadata().Name, + Databases: databases, + }) + return table.Print() + } + + localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog) + return output.SerializedOutput(cmd, localCatalog) +} + +func readCatalogResourceFile(resourceFilePath string) (cmfsdk.KafkaCatalog, error) { + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to read file: %w", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return cmfsdk.KafkaCatalog{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), 
"Supported file formats are .json, .yaml, and .yml.") + } + if err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to parse input file: %w", err) + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkCatalog cmfsdk.KafkaCatalog + if err = json.Unmarshal(jsonBytes, &sdkCatalog); err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to bind data to KafkaCatalog model: %w", err) + } + + return sdkCatalog, nil +} + func convertSdkCatalogToLocalCatalog(sdkOutputCatalog cmfsdk.KafkaCatalog) LocalKafkaCatalog { localClusters := make([]LocalKafkaCatalogSpecKafkaClusters, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters())) for _, sdkCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() { diff --git a/internal/flink/command_catalog_update.go b/internal/flink/command_catalog_update.go new file mode 100644 index 0000000000..c0e85c51f4 --- /dev/null +++ b/internal/flink/command_catalog_update.go @@ -0,0 +1,54 @@ +package flink + +import ( + "fmt" + + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a Flink catalog.", + Long: "Update an existing Kafka Catalog in Confluent Platform from a resource file.", + Args: cobra.ExactArgs(1), + RunE: c.catalogUpdate, + } + + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogUpdate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkCatalog, err := readCatalogResourceFile(resourceFilePath) + if err != nil { + return err + } + + catalogName := sdkCatalog.Metadata.Name + if catalogName == "" { + return fmt.Errorf("catalog name is required: ensure the resource file contains a non-empty \"metadata.name\" 
field") + } + + if err := client.UpdateCatalog(c.createContext(), catalogName, sdkCatalog); err != nil { + return err + } + + sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), catalogName) + if err != nil { + return fmt.Errorf("catalog %q was updated successfully, but failed to retrieve updated details: %w", catalogName, err) + } + + return printCatalogOutput(cmd, sdkOutputCatalog) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..158d587a9a 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -565,6 +565,14 @@ func (cmfClient *CmfRestClient) ListCatalog(ctx context.Context) ([]cmfsdk.Kafka return catalogs, nil } +func (cmfClient *CmfRestClient) UpdateCatalog(ctx context.Context, catalogName string, kafkaCatalog cmfsdk.KafkaCatalog) error { + httpResponse, err := cmfClient.SQLApi.UpdateKafkaCatalog(ctx, catalogName).KafkaCatalog(kafkaCatalog).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return fmt.Errorf(`failed to update Kafka Catalog "%s": %w`, catalogName, parsedErr) + } + return nil +} + func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName string) error { httpResp, err := cmfClient.SQLApi.DeleteKafkaCatalog(ctx, catalogName).Execute() return parseSdkError(httpResp, err) diff --git a/test/fixtures/input/flink/catalog/update-invalid-failure.json b/test/fixtures/input/flink/catalog/update-invalid-failure.json new file mode 100644 index 0000000000..761b3c784d --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-invalid-failure.json @@ -0,0 +1,17 @@ +{ + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", + "metadata": { + "name": "invalid-catalog" + }, + "spec": { + "kafkaClusters": [ + { + "databaseName": "dev" + }, + { + "databaseName": "prod" + } + ] + } +} diff --git a/test/fixtures/input/flink/catalog/update-invalid-failure.yaml b/test/fixtures/input/flink/catalog/update-invalid-failure.yaml 
new file mode 100644 index 0000000000..0f763ec3aa --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-invalid-failure.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/catalog +kind: KafkaCatalog +metadata: + name: invalid-catalog +spec: + kafkaClusters: + - databaseName: dev + - databaseName: prod diff --git a/test/fixtures/input/flink/catalog/update-successful.json b/test/fixtures/input/flink/catalog/update-successful.json new file mode 100644 index 0000000000..651a5de54c --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-successful.json @@ -0,0 +1,17 @@ +{ + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", + "metadata": { + "name": "test-catalog" + }, + "spec": { + "kafkaClusters": [ + { + "databaseName": "dev" + }, + { + "databaseName": "prod" + } + ] + } +} diff --git a/test/fixtures/input/flink/catalog/update-successful.yaml b/test/fixtures/input/flink/catalog/update-successful.yaml new file mode 100644 index 0000000000..d0de02e1ab --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/catalog +kind: KafkaCatalog +metadata: + name: test-catalog +spec: + kafkaClusters: + - databaseName: dev + - databaseName: prod diff --git a/test/fixtures/output/flink/catalog/help-onprem.golden b/test/fixtures/output/flink/catalog/help-onprem.golden index cbc8042d2d..0e936a93e9 100644 --- a/test/fixtures/output/flink/catalog/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/help-onprem.golden @@ -8,6 +8,7 @@ Available Commands: delete Delete one or more Flink catalogs in Confluent Platform. describe Describe a Flink catalog in Confluent Platform. list List Flink catalogs in Confluent Platform. + update Update a Flink catalog. Global Flags: -h, --help Show help for this command. 
diff --git a/test/fixtures/output/flink/catalog/update-help-onprem.golden b/test/fixtures/output/flink/catalog/update-help-onprem.golden new file mode 100644 index 0000000000..325c3ef05c --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-help-onprem.golden @@ -0,0 +1,16 @@ +Update an existing Kafka Catalog in Confluent Platform from a resource file. + +Usage: + confluent flink catalog update [flags] + +Flags: + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). 
diff --git a/test/fixtures/output/flink/catalog/update-invalid-failure.golden b/test/fixtures/output/flink/catalog/update-invalid-failure.golden new file mode 100644 index 0000000000..235404bf44 --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to update Kafka Catalog "invalid-catalog": The catalog name is invalid diff --git a/test/fixtures/output/flink/catalog/update-success-json.golden b/test/fixtures/output/flink/catalog/update-success-json.golden new file mode 100644 index 0000000000..a024bdbe73 --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-success-json.golden @@ -0,0 +1,23 @@ +{ + "apiVersion": "", + "kind": "", + "metadata": { + "name": "test-catalog", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "srInstance": { + "connectionConfig": null + }, + "kafkaClusters": [ + { + "databaseName": "test-database", + "connectionConfig": null + }, + { + "databaseName": "test-database-2", + "connectionConfig": null + } + ] + } +} diff --git a/test/fixtures/output/flink/catalog/update-success-yaml.golden b/test/fixtures/output/flink/catalog/update-success-yaml.golden new file mode 100644 index 0000000000..f3071e18d2 --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-success-yaml.golden @@ -0,0 +1,13 @@ +apiVersion: "" +kind: "" +metadata: + name: test-catalog + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC +spec: + srInstance: + connectionConfig: {} + kafkaClusters: + - databaseName: test-database + connectionConfig: {} + - databaseName: test-database-2 + connectionConfig: {} diff --git a/test/fixtures/output/flink/catalog/update-success.golden b/test/fixtures/output/flink/catalog/update-success.golden new file mode 100644 index 0000000000..7bd034f7ed --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-success.golden @@ -0,0 +1,5 @@ ++---------------+--------------------------------+ +| Creation Time | 2025-08-05 12:00:00 +0000 UTC | +| Name | 
test-catalog | +| Databases | test-database, test-database-2 | ++---------------+--------------------------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 5536f0a14c..ca33505b61 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -410,6 +410,19 @@ func (s *CLITestSuite) TestFlinkCatalogListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogUpdateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json", fixture: "flink/catalog/update-success.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output json", fixture: "flink/catalog/update-success-json.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, + // failure + {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.json", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success @@ -607,6 +620,25 @@ func (s *CLITestSuite) TestFlinkCatalogCreateWithYAML() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogUpdateWithYAML() { + tests := []CLITest{ + // success scenarios with JSON files + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json", fixture: "flink/catalog/update-success.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output json", fixture: "flink/catalog/update-success-json.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, + // failure 
scenarios with JSON files + {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.json", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, + // YAML file tests + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml", fixture: "flink/catalog/update-success.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml --output json", fixture: "flink/catalog/update-success-json.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, + // YAML file failure scenarios + {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.yaml", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkShellOnPrem() { tests := []flinkShellTest{ { diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..e10237e1f3 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -918,7 +918,7 @@ func handleCmfCatalogs(t *testing.T) http.HandlerFunc { } // Handler for "cmf/api/v1/catalogs/kafka/{catName}" -// Used by describe, delete catalog, no update catalog. +// Used by describe, update, delete catalog. 
func handleCmfCatalog(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) @@ -937,6 +937,18 @@ func handleCmfCatalog(t *testing.T) http.HandlerFunc { err := json.NewEncoder(w).Encode(catalog) require.NoError(t, err) return + case http.MethodPut: + if catalogName == "invalid-catalog" { + http.Error(w, "The catalog name is invalid", http.StatusNotFound) + return + } + + req := new(cmfsdk.KafkaCatalog) + err := json.NewDecoder(r.Body).Decode(req) + require.NoError(t, err) + + w.WriteHeader(http.StatusOK) + return case http.MethodDelete: if catalogName == "non-exist-catalog" { http.Error(w, "", http.StatusNotFound)