From 0e30b7866b96a378bb0fdf20d3c2993088bbad9a Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Sat, 21 Mar 2026 12:52:16 +0530 Subject: [PATCH 01/14] Add flink catalog database create command --- internal/flink/command_catalog.go | 1 + internal/flink/command_catalog_database.go | 49 +++++++++ .../flink/command_catalog_database_create.go | 102 ++++++++++++++++++ internal/flink/local_types.go | 26 +++++ pkg/flink/cmf_rest_client.go | 9 ++ 5 files changed, 187 insertions(+) create mode 100644 internal/flink/command_catalog_database.go create mode 100644 internal/flink/command_catalog_database_create.go diff --git a/internal/flink/command_catalog.go b/internal/flink/command_catalog.go index 77b540e2e5..8bfe75434f 100644 --- a/internal/flink/command_catalog.go +++ b/internal/flink/command_catalog.go @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDeleteCommand()) cmd.AddCommand(c.newCatalogDescribeCommand()) cmd.AddCommand(c.newCatalogListCommand()) + cmd.AddCommand(c.newCatalogDatabaseCommand()) return cmd } diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go new file mode 100644 index 0000000000..8bd27e0c49 --- /dev/null +++ b/internal/flink/command_catalog_database.go @@ -0,0 +1,49 @@ +package flink + +import ( + "github.com/spf13/cobra" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +type databaseOut struct { + CreationTime string `human:"Creation Time" serialized:"creation_time"` + Name string `human:"Name" serialized:"name"` + Catalog string `human:"Catalog" serialized:"catalog"` +} + +func (c *command) newCatalogDatabaseCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "database", + Short: "Manage Flink databases in Confluent Platform.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}, + } + + cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + + return cmd +} + +func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase { + return LocalKafkaDatabase{ + ApiVersion: sdkDatabase.ApiVersion, + Kind: sdkDatabase.Kind, + Metadata: LocalDatabaseMetadata{ + Name: sdkDatabase.Metadata.Name, + CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp, + UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp, + Uid: sdkDatabase.Metadata.Uid, + Labels: sdkDatabase.Metadata.Labels, + Annotations: sdkDatabase.Metadata.Annotations, + }, + Spec: LocalKafkaDatabaseSpec{ + KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig, + ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId, + }, + AlterEnvironments: sdkDatabase.Spec.AlterEnvironments, + }, + } +} diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go new file mode 100644 index 0000000000..4bd6dd3972 --- /dev/null +++ b/internal/flink/command_catalog_database_create.go @@ -0,0 +1,102 @@ +package flink + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create ", + Short: "Create a Flink database.", + Long: "Create a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseCreate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return fmt.Errorf("failed to read file: %v", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return errors.NewErrorWithSuggestions( + fmt.Sprintf("unsupported file format: %s", ext), + "Supported file formats are .json, .yaml, and .yml.", + ) + } + if err != nil { + return err + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkOutputDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) + return output.SerializedOutput(cmd, localDatabase) +} diff --git a/internal/flink/local_types.go b/internal/flink/local_types.go index 19e0b756d9..6342966129 100644 --- a/internal/flink/local_types.go +++ b/internal/flink/local_types.go @@ -142,6 +142,32 @@ type LocalKafkaCatalogSpecSrInstance struct { ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` } +type LocalKafkaDatabase struct { + ApiVersion string `json:"apiVersion" yaml:"apiVersion"` + Kind string `json:"kind" yaml:"kind"` + Metadata LocalDatabaseMetadata `json:"metadata" yaml:"metadata"` + Spec LocalKafkaDatabaseSpec `json:"spec" yaml:"spec"` +} + +type LocalDatabaseMetadata struct { + Name string `json:"name" yaml:"name"` + CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` + UpdateTimestamp *string `json:"updateTimestamp,omitempty" yaml:"updateTimestamp,omitempty"` + Uid *string `json:"uid,omitempty" yaml:"uid,omitempty"` + Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"` + Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"` +} + +type LocalKafkaDatabaseSpec struct { + KafkaCluster LocalKafkaDatabaseSpecKafkaCluster `json:"kafkaCluster" yaml:"kafkaCluster"` + AlterEnvironments *[]string `json:"alterEnvironments,omitempty" yaml:"alterEnvironments,omitempty"` +} + +type LocalKafkaDatabaseSpecKafkaCluster struct { + ConnectionConfig map[string]string `json:"connectionConfig" yaml:"connectionConfig"` + ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` +} + type LocalResultSchema struct { Columns []LocalResultSchemaColumn `json:"columns" yaml:"columns"` } diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..0ca0282a47 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,15 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName string, kafkaDatabase cmfsdk.KafkaDatabase) (cmfsdk.KafkaDatabase, error) { + databaseName := kafkaDatabase.Metadata.Name + outputDatabase, httpResponse, err := cmfClient.SQLApi.CreateKafkaDatabase(ctx, catalogName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to create database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { From 96a12c9676d499ca3576bbed8eb4b5362a0f392d Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 23 Mar 2026 17:37:28 +0530 Subject: [PATCH 02/14] Add golden test files for flink catalog database command --- .../catalog/database/create-help-onprem.golden | 17 +++++++++++++++++ .../flink/catalog/database/help-onprem.golden | 14 ++++++++++++++ .../output/flink/catalog/help-onprem.golden | 1 + 3 files changed, 32 insertions(+) create mode 100644 test/fixtures/output/flink/catalog/database/create-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/help-onprem.golden diff --git a/test/fixtures/output/flink/catalog/database/create-help-onprem.golden b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden new file mode 100644 index 0000000000..4b0f03902e --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden @@ -0,0 +1,17 @@ +Create a Flink database in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database create [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden new file mode 100644 index 0000000000..2c152416ae --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -0,0 +1,14 @@ +Manage Flink databases in Confluent Platform. + +Usage: + confluent flink catalog database [command] + +Available Commands: + create Create a Flink database. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent flink catalog database [command] --help" for more information about a command. diff --git a/test/fixtures/output/flink/catalog/help-onprem.golden b/test/fixtures/output/flink/catalog/help-onprem.golden index cbc8042d2d..1ce122027c 100644 --- a/test/fixtures/output/flink/catalog/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink catalog. + database Manage Flink databases in Confluent Platform. delete Delete one or more Flink catalogs in Confluent Platform. describe Describe a Flink catalog in Confluent Platform. list List Flink catalogs in Confluent Platform. From 33e3b892c84bf99eb9e60680b0b002818cfea0e3 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 24 Mar 2026 15:34:33 +0530 Subject: [PATCH 03/14] Add integration tests and fix golden files for flink catalog database command --- .../database/create-invalid-failure.json | 12 +++++ .../catalog/database/create-successful.json | 14 ++++++ .../database/create-invalid-failure.golden | 1 + .../database/create-success-json.golden | 15 ++++++ .../catalog/database/create-success.golden | 5 ++ test/flink_onprem_test.go | 12 +++++ test/test-server/flink_onprem_handler.go | 48 +++++++++++++++++++ test/test-server/flink_onprem_router.go | 1 + 8 files changed, 108 insertions(+) create mode 100644 test/fixtures/input/flink/catalog/database/create-invalid-failure.json create mode 100644 test/fixtures/input/flink/catalog/database/create-successful.json create mode 100644 test/fixtures/output/flink/catalog/database/create-invalid-failure.golden create mode 100644 test/fixtures/output/flink/catalog/database/create-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/create-success.golden diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.json b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json new file mode 100644 index 0000000000..e45d244060 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json @@ -0,0 +1,12 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": {} + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.json b/test/fixtures/input/flink/catalog/database/create-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden new file mode 100644 index 0000000000..d2b4965a8a --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to create database "invalid-database" in catalog "test-catalog": The Kafka database object from resource file is invalid diff --git a/test/fixtures/output/flink/catalog/database/create-success-json.golden b/test/fixtures/output/flink/catalog/database/create-success-json.golden new file mode 100644 index 0000000000..065e5eba7c --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-03-12 23:42:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/create-success.golden b/test/fixtures/output/flink/catalog/database/create-success.golden new file mode 100644 index 0000000000..b1b1a2a1ff --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-03-12 23:42:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 5536f0a14c..0f7d956714 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -410,6 +410,18 @@ func (s *CLITestSuite) TestFlinkCatalogListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + // failure + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..f2c5f031bc 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -224,6 +224,54 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } } +func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() + return cmfsdk.KafkaDatabase{ + ApiVersion: "cmf/api/v1/database", + Kind: "KafkaDatabase", + Metadata: cmfsdk.DatabaseMetadata{ + Name: dbName, + CreationTimestamp: &timeStamp, + }, + Spec: cmfsdk.KafkaDatabaseSpec{ + KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: map[string]string{ + "bootstrap.servers": "localhost:9092", + }, + }, + }, + } +} + +func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + switch r.Method { + case http.MethodPost: + reqBody, err := io.ReadAll(r.Body) + require.NoError(t, err) + var database cmfsdk.KafkaDatabase + err = json.Unmarshal(reqBody, &database) + require.NoError(t, err) + + dbName := database.GetMetadata().Name + + if dbName == "invalid-database" { + http.Error(w, "The Kafka database object from resource file is invalid", http.StatusUnprocessableEntity) + return + } + + timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String() + database.Metadata.CreationTimestamp = &timeStamp + err = json.NewEncoder(w).Encode(database) + require.NoError(t, err) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + func createFlinkStatement(stmtName string, stopped bool, parallelism int32) cmfsdk.Statement { timeStamp := time.Date(2025, time.August, 5, 12, 00, 0, 0, time.UTC).String() status := cmfsdk.StatementStatus{ diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index 2267f01d5a..a565b7d3f5 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -9,6 +9,7 @@ import ( var flinkRoutes = []route{ {"/cmf/api/v1/catalogs/kafka", handleCmfCatalogs}, {"/cmf/api/v1/catalogs/kafka/{catName}", handleCmfCatalog}, + {"/cmf/api/v1/catalogs/kafka/{catName}/databases", handleCmfCatalogDatabases}, {"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications}, {"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication}, {"/cmf/api/v1/environments", handleCmfEnvironments}, From 36c006d225785d404cb19be2525a2230dd90afbf Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 24 Mar 2026 15:58:19 +0530 Subject: [PATCH 04/14] Fix flink_onprem_handler.go --- test/test-server/flink_onprem_handler.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index f2c5f031bc..1aec24fa3a 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -224,25 +224,6 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } } -func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { - timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() - return cmfsdk.KafkaDatabase{ - ApiVersion: "cmf/api/v1/database", - Kind: "KafkaDatabase", - Metadata: cmfsdk.DatabaseMetadata{ - Name: dbName, - CreationTimestamp: &timeStamp, - }, - Spec: cmfsdk.KafkaDatabaseSpec{ - KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{ - ConnectionConfig: map[string]string{ - "bootstrap.servers": "localhost:9092", - }, - }, - }, - } -} - func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) From d86fb7ef9fb47712494b9b3c6f7af6fc79ac4ab8 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 30 Mar 2026 09:26:11 +0530 Subject: [PATCH 05/14] Add flink catalog database list command --- internal/flink/command_catalog_database.go | 1 + .../flink/command_catalog_database_list.go | 64 +++++++++++++++++++ pkg/flink/cmf_rest_client.go | 18 ++++++ .../database/create-success-yaml.golden | 9 +++ .../flink/catalog/database/help-onprem.golden | 1 + .../catalog/database/list-help-onprem.golden | 17 +++++ .../catalog/database/list-success-json.golden | 32 ++++++++++ .../catalog/database/list-success-yaml.golden | 18 ++++++ .../catalog/database/list-success.golden | 4 ++ test/flink_onprem_test.go | 12 ++++ test/test-server/flink_onprem_handler.go | 10 +++ 11 files changed, 186 insertions(+) create mode 100644 internal/flink/command_catalog_database_list.go create mode 100644 test/fixtures/output/flink/catalog/database/create-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-success.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 8bd27e0c49..f0aa00182a 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -22,6 +22,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { } cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseListCommand()) return cmd } diff --git a/internal/flink/command_catalog_database_list.go b/internal/flink/command_catalog_database_list.go new file mode 100644 index 0000000000..2e0ff82f6e --- /dev/null +++ b/internal/flink/command_catalog_database_list.go @@ -0,0 +1,64 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List Flink databases in a catalog in Confluent Platform.", + Args: cobra.NoArgs, + RunE: c.catalogDatabaseList, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + list := output.NewList(cmd) + for _, db := range sdkDatabases { + var creationTime string + if db.GetMetadata().CreationTimestamp != nil { + creationTime = *db.GetMetadata().CreationTimestamp + } + list.Add(&databaseOut{ + CreationTime: creationTime, + Name: db.GetMetadata().Name, + Catalog: catalogName, + }) + } + return list.Print() + } + + localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases)) + for _, sdkDatabase := range sdkDatabases { + localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase)) + } + + return output.SerializedOutput(cmd, localDatabases) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 0ca0282a47..d1c86800e5 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -579,6 +579,24 @@ func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName return outputDatabase, nil } +func (cmfClient *CmfRestClient) ListDatabases(ctx context.Context, catalogName string) ([]cmfsdk.KafkaDatabase, error) { + databases := make([]cmfsdk.KafkaDatabase, 0) + done := false + const pageSize = 100 + var currentPageNumber int32 = 0 + + for !done { + databasePage, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabases(ctx, catalogName).Page(currentPageNumber).Size(pageSize).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return nil, fmt.Errorf(`failed to list databases in catalog "%s": %s`, catalogName, parsedErr) + } + databases = append(databases, databasePage.GetItems()...) + currentPageNumber, done = extractPageOptions(len(databasePage.GetItems()), currentPageNumber) + } + + return databases, nil +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { diff --git a/test/fixtures/output/flink/catalog/database/create-success-yaml.golden b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden new file mode 100644 index 0000000000..0d9dc39dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-03-12 23:42:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index 2c152416ae..f6edf74b3f 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink database. + list List Flink databases in a catalog in Confluent Platform. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/catalog/database/list-help-onprem.golden b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden new file mode 100644 index 0000000000..4dbf71ed16 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden @@ -0,0 +1,17 @@ +List Flink databases in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database list [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/list-success-json.golden b/test/fixtures/output/flink/catalog/database/list-success-json.golden new file mode 100644 index 0000000000..42e40f7379 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success-json.golden @@ -0,0 +1,32 @@ +[ + { + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database-1", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } + }, + { + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database-2", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } + } +] diff --git a/test/fixtures/output/flink/catalog/database/list-success-yaml.golden b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden new file mode 100644 index 0000000000..186a556114 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden @@ -0,0 +1,18 @@ +- apiVersion: cmf/api/v1/database + kind: KafkaDatabase + metadata: + name: test-database-1 + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 +- apiVersion: cmf/api/v1/database + kind: KafkaDatabase + metadata: + name: test-database-2 + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/list-success.golden b/test/fixtures/output/flink/catalog/database/list-success.golden new file mode 100644 index 0000000000..a3a7ea5545 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success.golden @@ -0,0 +1,4 @@ + Creation Time | Name | Catalog +--------------------------------+------------------+-------------- + 2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog + 2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 0f7d956714..55be5662b1 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -415,6 +415,7 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { // success {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, // failure {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, } @@ -422,6 +423,17 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database list --catalog test-catalog", fixture: "flink/catalog/database/list-success.golden"}, + {args: "flink catalog database list --catalog test-catalog --output json", fixture: "flink/catalog/database/list-success-json.golden"}, + {args: "flink catalog database list --catalog test-catalog --output yaml", fixture: "flink/catalog/database/list-success-yaml.golden"}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 1aec24fa3a..44d302f478 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -228,6 +228,16 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) switch r.Method { + case http.MethodGet: + databases := []cmfsdk.KafkaDatabase{ + createKafkaDatabase("test-database-1"), + createKafkaDatabase("test-database-2"), + } + page := cmfsdk.KafkaDatabasesPage{ + Items: &databases, + } + err := json.NewEncoder(w).Encode(page) + require.NoError(t, err) case http.MethodPost: reqBody, err := io.ReadAll(r.Body) require.NoError(t, err) From 2c09071aec6900756f02e4aefbb6393f2fe1b1b8 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 30 Mar 2026 09:35:02 +0530 Subject: [PATCH 06/14] Fix build --- test/test-server/flink_onprem_handler.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 44d302f478..d9f8f67fe4 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -224,6 +224,25 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } } +func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() + return cmfsdk.KafkaDatabase{ + ApiVersion: "cmf/api/v1/database", + Kind: "KafkaDatabase", + Metadata: cmfsdk.DatabaseMetadata{ + Name: dbName, + CreationTimestamp: &timeStamp, + }, + Spec: cmfsdk.KafkaDatabaseSpec{ + KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: map[string]string{ + "bootstrap.servers": "localhost:9092", + }, + }, + }, + } +} + func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) From f85825b252cd7dd8da4fed8c86b6f7cbb6f12e34 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 10:46:01 +0530 Subject: [PATCH 07/14] Fix ListDatabases() --- test/test-server/flink_onprem_handler.go | 34 +++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index d9f8f67fe4..63fc4b3889 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -252,10 +252,14 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { createKafkaDatabase("test-database-1"), createKafkaDatabase("test-database-2"), } - page := cmfsdk.KafkaDatabasesPage{ - Items: &databases, + databasesPage := cmfsdk.KafkaDatabasesPage{} + page := r.URL.Query().Get("page") + + if page == "0" { + databasesPage.SetItems(databases) } - err := json.NewEncoder(w).Encode(page) + + err := json.NewEncoder(w).Encode(databasesPage) require.NoError(t, err) case http.MethodPost: reqBody, err := io.ReadAll(r.Body) @@ -282,6 +286,30 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { } } +func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + + vars := mux.Vars(r) + dbName := vars["dbName"] + + switch r.Method { + case http.MethodGet: + if dbName == "invalid-database" { + http.Error(w, "The database name is invalid", http.StatusNotFound) + return + } + + database := createKafkaDatabase(dbName) + err := json.NewEncoder(w).Encode(database) + require.NoError(t, err) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + func createFlinkStatement(stmtName string, stopped bool, parallelism int32) cmfsdk.Statement { timeStamp := time.Date(2025, time.August, 5, 12, 00, 0, 0, time.UTC).String() status := cmfsdk.StatementStatus{ From 02dfaf2bfc6d81bdee205d121eda14468f637cd5 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 11:01:52 +0530 Subject: [PATCH 08/14] Added Describe/GET database cli command --- internal/flink/command_catalog_database.go | 1 + .../command_catalog_database_describe.go | 60 +++++++++++++++++++ pkg/flink/cmf_rest_client.go | 8 +++ .../database/describe-help-onprem.golden | 17 ++++++ .../database/describe-not-found.golden | 1 + .../database/describe-success-json.golden | 15 +++++ .../database/describe-success-yaml.golden | 9 +++ .../catalog/database/describe-success.golden | 5 ++ .../flink/catalog/database/help-onprem.golden | 1 + test/flink_onprem_test.go | 13 ++++ test/test-server/flink_onprem_router.go | 1 + 11 files changed, 131 insertions(+) create mode 100644 internal/flink/command_catalog_database_describe.go create mode 100644 test/fixtures/output/flink/catalog/database/describe-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-not-found.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-success.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index f0aa00182a..356132f443 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -22,6 +22,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { } cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) cmd.AddCommand(c.newCatalogDatabaseListCommand()) return cmd diff --git a/internal/flink/command_catalog_database_describe.go b/internal/flink/command_catalog_database_describe.go new file mode 100644 index 0000000000..f6598d5b22 --- /dev/null +++ b/internal/flink/command_catalog_database_describe.go @@ -0,0 +1,60 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDescribe, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) error { + name := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, name) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkOutputDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) + return output.SerializedOutput(cmd, localDatabase) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index d1c86800e5..71686af98b 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -579,6 +579,14 @@ func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName return outputDatabase, nil } +func (cmfClient *CmfRestClient) DescribeDatabase(ctx context.Context, catalogName, databaseName string) (cmfsdk.KafkaDatabase, error) { + outputDatabase, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to get database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + func (cmfClient *CmfRestClient) ListDatabases(ctx context.Context, catalogName string) ([]cmfsdk.KafkaDatabase, error) { databases := make([]cmfsdk.KafkaDatabase, 0) done := false diff --git a/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden new file mode 100644 index 0000000000..23cffb3b6d --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden @@ -0,0 +1,17 @@ +Describe a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database describe [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/describe-not-found.golden b/test/fixtures/output/flink/catalog/database/describe-not-found.golden new file mode 100644 index 0000000000..0d2858c098 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-not-found.golden @@ -0,0 +1 @@ +Error: failed to get database "invalid-database" in catalog "test-catalog": The database name is invalid diff --git a/test/fixtures/output/flink/catalog/database/describe-success-json.golden b/test/fixtures/output/flink/catalog/database/describe-success-json.golden new file mode 100644 index 0000000000..fb8c835dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden new file mode 100644 index 0000000000..a45817dbbb --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/describe-success.golden b/test/fixtures/output/flink/catalog/database/describe-success.golden new file mode 100644 index 0000000000..96b1543c77 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-08-05 12:00:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index f6edf74b3f..a566cc318d 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink database. + describe Describe a Flink database in Confluent Platform. list List Flink databases in a catalog in Confluent Platform. Global Flags: diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 55be5662b1..9072b9b8b8 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -423,6 +423,19 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseDescribeOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database describe test-database --catalog test-catalog", fixture: "flink/catalog/database/describe-success.golden"}, + {args: "flink catalog database describe test-database --catalog test-catalog --output json", fixture: "flink/catalog/database/describe-success-json.golden"}, + {args: "flink catalog database describe test-database --catalog test-catalog --output yaml", fixture: "flink/catalog/database/describe-success-yaml.golden"}, + // failure + {args: "flink catalog database describe invalid-database --catalog test-catalog", fixture: "flink/catalog/database/describe-not-found.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index a565b7d3f5..8d597732f6 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -10,6 +10,7 @@ var flinkRoutes = []route{ {"/cmf/api/v1/catalogs/kafka", handleCmfCatalogs}, {"/cmf/api/v1/catalogs/kafka/{catName}", handleCmfCatalog}, {"/cmf/api/v1/catalogs/kafka/{catName}/databases", handleCmfCatalogDatabases}, + {"/cmf/api/v1/catalogs/kafka/{catName}/databases/{dbName}", handleCmfCatalogDatabase}, {"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications}, {"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication}, {"/cmf/api/v1/environments", handleCmfEnvironments}, From 3271eec05066eda01c41dcde0f0f6f22e6217814 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 16:39:58 +0530 Subject: [PATCH 09/14] Fix the golden files --- .../output/flink/catalog/database/list-success.golden | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/fixtures/output/flink/catalog/database/list-success.golden b/test/fixtures/output/flink/catalog/database/list-success.golden index a3a7ea5545..983bc1d5df 100644 --- a/test/fixtures/output/flink/catalog/database/list-success.golden +++ b/test/fixtures/output/flink/catalog/database/list-success.golden @@ -1,4 +1,4 @@ - Creation Time | Name | Catalog ---------------------------------+------------------+-------------- - 2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog - 2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog + Creation Time | Name | Catalog +--------------------------------+-----------------+--------------- + 2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog + 2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog From 8f3a1e13850ba60754d32e4f9bf36e1945ce2bd6 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 17:50:47 +0530 Subject: [PATCH 10/14] Added the update database cli command --- internal/flink/command_catalog_database.go | 1 + .../flink/command_catalog_database_update.go | 108 ++++++++++++++++++ pkg/flink/cmf_rest_client.go | 8 ++ .../database/update-invalid-failure.json | 14 +++ .../catalog/database/update-successful.json | 14 +++ .../flink/catalog/database/help-onprem.golden | 1 + .../database/update-help-onprem.golden | 17 +++ .../database/update-invalid-failure.golden | 1 + .../database/update-success-json.golden | 15 +++ .../database/update-success-yaml.golden | 9 ++ .../catalog/database/update-success.golden | 5 + test/flink_onprem_test.go | 13 +++ test/test-server/flink_onprem_handler.go | 8 ++ 13 files changed, 214 insertions(+) create mode 100644 internal/flink/command_catalog_database_update.go create mode 100644 test/fixtures/input/flink/catalog/database/update-invalid-failure.json create mode 100644 test/fixtures/input/flink/catalog/database/update-successful.json create mode 100644 test/fixtures/output/flink/catalog/database/update-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-invalid-failure.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-success.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 356132f443..80dfd0bd62 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -24,6 +24,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) cmd.AddCommand(c.newCatalogDatabaseListCommand()) + cmd.AddCommand(c.newCatalogDatabaseUpdateCommand()) return cmd } diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go new file mode 100644 index 0000000000..64a7202ac1 --- /dev/null +++ b/internal/flink/command_catalog_database_update.go @@ -0,0 +1,108 @@ +package flink + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a Flink database.", + Long: "Update a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseUpdate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return fmt.Errorf("failed to read file: %v", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return errors.NewErrorWithSuggestions( + fmt.Sprintf("unsupported file format: %s", ext), + "Supported file formats are .json, .yaml, and .yml.", + ) + } + if err != nil { + return err + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + databaseName := sdkDatabase.Metadata.Name + + if err := client.UpdateDatabase(c.createContext(), catalogName, databaseName, sdkDatabase); err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkOutputDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) + return output.SerializedOutput(cmd, localDatabase) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 71686af98b..f632682266 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -579,6 +579,14 @@ func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName return outputDatabase, nil } +func (cmfClient *CmfRestClient) UpdateDatabase(ctx context.Context, catalogName, databaseName string, kafkaDatabase cmfsdk.KafkaDatabase) error { + httpResponse, err := cmfClient.SQLApi.UpdateKafkaDatabase(ctx, catalogName, databaseName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return fmt.Errorf(`failed to update database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + func (cmfClient *CmfRestClient) DescribeDatabase(ctx context.Context, catalogName, databaseName string) (cmfsdk.KafkaDatabase, error) { outputDatabase, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabase(ctx, catalogName, databaseName).Execute() if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.json b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json new file mode 100644 index 0000000000..62292591c4 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/update-successful.json b/test/fixtures/input/flink/catalog/database/update-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index a566cc318d..1020cc1a21 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -7,6 +7,7 @@ Available Commands: create Create a Flink database. describe Describe a Flink database in Confluent Platform. list List Flink databases in a catalog in Confluent Platform. + update Update a Flink database. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/catalog/database/update-help-onprem.golden b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden new file mode 100644 index 0000000000..f5c10ad850 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden @@ -0,0 +1,17 @@ +Update a Flink database in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database update [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden new file mode 100644 index 0000000000..16726790eb --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to update database "invalid-database" in catalog "test-catalog": The database name is invalid diff --git a/test/fixtures/output/flink/catalog/database/update-success-json.golden b/test/fixtures/output/flink/catalog/database/update-success-json.golden new file mode 100644 index 0000000000..fb8c835dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/update-success-yaml.golden b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden new file mode 100644 index 0000000000..a45817dbbb --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/update-success.golden b/test/fixtures/output/flink/catalog/database/update-success.golden new file mode 100644 index 0000000000..96b1543c77 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-08-05 12:00:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 9072b9b8b8..f6f1c60415 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -447,6 +447,19 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // failure + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 63fc4b3889..27195bb74d 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -304,6 +304,14 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { err := json.NewEncoder(w).Encode(database) require.NoError(t, err) return + case http.MethodPut: + if dbName == "invalid-database" { + http.Error(w, "The database name is invalid", http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusOK) + return default: require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) } From 25d8693b82695fdad5fba70872291c8ee9f0dc22 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Wed, 1 Apr 2026 20:13:39 +0530 Subject: [PATCH 11/14] Add flink catalog database update command --- internal/flink/command_catalog_database.go | 1 + .../flink/command_catalog_database_delete.go | 53 +++++++++++++++++++ pkg/flink/cmf_rest_client.go | 8 +++ pkg/resource/resource.go | 1 + .../database/delete-help-onprem.golden | 17 ++++++ .../database/delete-non-exist-failure.golden | 1 + .../database/delete-single-force.golden | 1 + .../database/delete-single-successful.golden | 1 + .../flink/catalog/database/help-onprem.golden | 1 + test/flink_onprem_test.go | 12 +++++ test/test-server/flink_onprem_handler.go | 15 ++++-- 11 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 internal/flink/command_catalog_database_delete.go create mode 100644 test/fixtures/output/flink/catalog/database/delete-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden create mode 100644 test/fixtures/output/flink/catalog/database/delete-single-force.golden create mode 100644 test/fixtures/output/flink/catalog/database/delete-single-successful.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 80dfd0bd62..01327f68b8 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -22,6 +22,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { } cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseDeleteCommand()) cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) cmd.AddCommand(c.newCatalogDatabaseListCommand()) cmd.AddCommand(c.newCatalogDatabaseUpdateCommand()) diff --git a/internal/flink/command_catalog_database_delete.go b/internal/flink/command_catalog_database_delete.go new file mode 100644 index 0000000000..8c161f6040 --- /dev/null +++ b/internal/flink/command_catalog_database_delete.go @@ -0,0 +1,53 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/deletion" + "github.com/confluentinc/cli/v4/pkg/resource" +) + +func (c *command) newCatalogDatabaseDeleteCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "delete ", + Short: "Delete a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDelete, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddForceFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + existenceFunc := func(name string) bool { + _, err := client.DescribeDatabase(c.createContext(), catalogName, name) + return err == nil + } + + if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil { + return err + } + + deleteFunc := func(name string) error { + return client.DeleteDatabase(c.createContext(), catalogName, name) + } + + _, err = deletion.Delete(cmd, args, deleteFunc, resource.FlinkDatabase) + return err +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index f632682266..05b3ce809b 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,14 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) DeleteDatabase(ctx context.Context, catalogName, databaseName string) error { + httpResp, err := cmfClient.SQLApi.DeleteKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResp, err); parsedErr != nil { + return fmt.Errorf(`failed to delete database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName string, kafkaDatabase cmfsdk.KafkaDatabase) (cmfsdk.KafkaDatabase, error) { databaseName := kafkaDatabase.Metadata.Name outputDatabase, httpResponse, err := cmfClient.SQLApi.CreateKafkaDatabase(ctx, catalogName).KafkaDatabase(kafkaDatabase).Execute() diff --git a/pkg/resource/resource.go b/pkg/resource/resource.go index a56592ddd9..bd50ce6853 100644 --- a/pkg/resource/resource.go +++ b/pkg/resource/resource.go @@ -45,6 +45,7 @@ const ( FlinkDetachedSavepoint = "Flink detached savepoint" FlinkApplication = "Flink application" FlinkCatalog = "Flink catalog" + FlinkDatabase = "Flink database" FlinkEnvironment = "Flink environment" FlinkRegion = "Flink region" FlinkEndpoint = "Flink endpoint" diff --git a/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden new file mode 100644 index 0000000000..91228009b3 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden @@ -0,0 +1,17 @@ +Delete a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database delete [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + --force Skip the deletion confirmation prompt. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden new file mode 100644 index 0000000000..b6cedfb005 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "non-exist-database"? (y/n): Error: failed to delete non-exist-database: failed to delete database "non-exist-database" in catalog "test-catalog": 404 Not Found diff --git a/test/fixtures/output/flink/catalog/database/delete-single-force.golden b/test/fixtures/output/flink/catalog/database/delete-single-force.golden new file mode 100644 index 0000000000..3517fbbf14 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-force.golden @@ -0,0 +1 @@ +Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/delete-single-successful.golden b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden new file mode 100644 index 0000000000..ecc2f170b7 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "test-database-1"? (y/n): Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index 1020cc1a21..49da534a01 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink database. + delete Delete a Flink database in Confluent Platform. describe Describe a Flink database in Confluent Platform. list List Flink databases in a catalog in Confluent Platform. update Update a Flink database. diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index f6f1c60415..b1655b19d2 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -423,6 +423,18 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseDeleteOnPrem() { + tests := []CLITest{ + // success scenarios + {args: "flink catalog database delete test-database-1 --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-single-successful.golden"}, + {args: "flink catalog database delete test-database-1 --catalog test-catalog --force", fixture: "flink/catalog/database/delete-single-force.golden"}, + // failure scenarios + {args: "flink catalog database delete non-exist-database --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-non-exist-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkCatalogDatabaseDescribeOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 27195bb74d..ebc7f3da92 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -16,6 +16,8 @@ import ( cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" ) +const invalidDatabaseName = "invalid-database" + // Helper function to create a Flink application. func createApplication(name string) cmfsdk.FlinkApplication { status := map[string]interface{}{ @@ -270,7 +272,7 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { dbName := database.GetMetadata().Name - if dbName == "invalid-database" { + if dbName == invalidDatabaseName { http.Error(w, "The Kafka database object from resource file is invalid", http.StatusUnprocessableEntity) return } @@ -295,7 +297,7 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { switch r.Method { case http.MethodGet: - if dbName == "invalid-database" { + if dbName == invalidDatabaseName { http.Error(w, "The database name is invalid", http.StatusNotFound) return } @@ -305,11 +307,18 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { require.NoError(t, err) return case http.MethodPut: - if dbName == "invalid-database" { + if dbName == invalidDatabaseName { http.Error(w, "The database name is invalid", http.StatusNotFound) return } + w.WriteHeader(http.StatusOK) + return + case http.MethodDelete: + if dbName == "non-exist-database" { + http.Error(w, "", http.StatusNotFound) + return + } w.WriteHeader(http.StatusOK) return default: From c95ac55c7d54de55fb5f3013f58b5f8f92771b8a Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Thu, 2 Apr 2026 11:24:26 +0530 Subject: [PATCH 12/14] Copilot suggested fixes --- internal/flink/command_catalog_database.go | 40 +++++++++++++++++++ .../flink/command_catalog_database_create.go | 38 +----------------- .../flink/command_catalog_database_delete.go | 7 +++- .../flink/command_catalog_database_update.go | 38 +----------------- 4 files changed, 48 insertions(+), 75 deletions(-) diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 01327f68b8..aa54e69a3c 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -1,11 +1,18 @@ package flink import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" ) type databaseOut struct { @@ -30,6 +37,39 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { return cmd } +func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) { + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %v", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return cmfsdk.KafkaDatabase{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.") + } + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to parse input file: %w", err) + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + return sdkDatabase, nil +} + func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase { return LocalKafkaDatabase{ ApiVersion: sdkDatabase.ApiVersion, diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go index 4bd6dd3972..212606f2fa 100644 --- a/internal/flink/command_catalog_database_create.go +++ b/internal/flink/command_catalog_database_create.go @@ -1,18 +1,9 @@ package flink import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "github.com/spf13/cobra" - "gopkg.in/yaml.v3" - - cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/output" ) @@ -46,38 +37,11 @@ func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error return err } - data, err := os.ReadFile(resourceFilePath) - if err != nil { - return fmt.Errorf("failed to read file: %v", err) - } - - var genericData map[string]interface{} - ext := filepath.Ext(resourceFilePath) - switch ext { - case ".json": - err = json.Unmarshal(data, &genericData) - case ".yaml", ".yml": - err = yaml.Unmarshal(data, &genericData) - default: - return errors.NewErrorWithSuggestions( - fmt.Sprintf("unsupported file format: %s", ext), - "Supported file formats are .json, .yaml, and .yml.", - ) - } + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) if err != nil { return err } - jsonBytes, err := json.Marshal(genericData) - if err != nil { - return fmt.Errorf("failed to marshal intermediate data: %w", err) - } - - var sdkDatabase cmfsdk.KafkaDatabase - if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { - return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) - } - sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase) if err != nil { return err diff --git a/internal/flink/command_catalog_database_delete.go b/internal/flink/command_catalog_database_delete.go index 8c161f6040..a836b38e46 100644 --- a/internal/flink/command_catalog_database_delete.go +++ b/internal/flink/command_catalog_database_delete.go @@ -5,6 +5,7 @@ import ( pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/deletion" + "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/resource" ) @@ -41,7 +42,11 @@ func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error } if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil { - return err + // We are validating only the existence of the resources (there is no prefix validation). + // Thus, we can add some extra context for the error. + suggestions := "List available Flink databases with `confluent flink catalog database list`." + suggestions += "\nCheck that CMF is running and accessible." + return errors.NewErrorWithSuggestions(err.Error(), suggestions) } deleteFunc := func(name string) error { diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go index 64a7202ac1..293420b881 100644 --- a/internal/flink/command_catalog_database_update.go +++ b/internal/flink/command_catalog_database_update.go @@ -1,18 +1,9 @@ package flink import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "github.com/spf13/cobra" - "gopkg.in/yaml.v3" - - cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/output" ) @@ -46,38 +37,11 @@ func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error return err } - data, err := os.ReadFile(resourceFilePath) - if err != nil { - return fmt.Errorf("failed to read file: %v", err) - } - - var genericData map[string]interface{} - ext := filepath.Ext(resourceFilePath) - switch ext { - case ".json": - err = json.Unmarshal(data, &genericData) - case ".yaml", ".yml": - err = yaml.Unmarshal(data, &genericData) - default: - return errors.NewErrorWithSuggestions( - fmt.Sprintf("unsupported file format: %s", ext), - "Supported file formats are .json, .yaml, and .yml.", - ) - } + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) if err != nil { return err } - jsonBytes, err := json.Marshal(genericData) - if err != nil { - return fmt.Errorf("failed to marshal intermediate data: %w", err) - } - - var sdkDatabase cmfsdk.KafkaDatabase - if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { - return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) - } - databaseName := sdkDatabase.Metadata.Name if err := client.UpdateDatabase(c.createContext(), catalogName, databaseName, sdkDatabase); err != nil { From 021cf0170c72eac90ede7ce3f0950c06aa1913ae Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Fri, 3 Apr 2026 08:53:52 +0530 Subject: [PATCH 13/14] Add claude suggested fixes --- internal/flink/command_catalog_database.go | 22 ++++++++++- .../flink/command_catalog_database_create.go | 18 +-------- .../command_catalog_database_describe.go | 18 +-------- .../flink/command_catalog_database_update.go | 18 +-------- .../database/create-invalid-failure.yaml | 7 ++++ .../catalog/database/create-successful.yaml | 8 ++++ .../database/update-invalid-failure.yaml | 8 ++++ .../catalog/database/update-successful.yaml | 8 ++++ test/flink_onprem_test.go | 38 +++++++++++++++++++ test/test-server/flink_onprem_handler.go | 5 +++ 10 files changed, 99 insertions(+), 51 deletions(-) create mode 100644 test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml create mode 100644 test/fixtures/input/flink/catalog/database/create-successful.yaml create mode 100644 test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml create mode 100644 test/fixtures/input/flink/catalog/database/update-successful.yaml diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index aa54e69a3c..11a77fb6a2 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -13,6 +13,7 @@ import ( pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" ) type databaseOut struct { @@ -37,10 +38,29 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { return cmd } +func printDatabaseOutput(cmd *cobra.Command, sdkDatabase cmfsdk.KafkaDatabase, catalogName string) error { + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkDatabase) + return output.SerializedOutput(cmd, localDatabase) +} + func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) { data, err := os.ReadFile(resourceFilePath) if err != nil { - return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %v", err) + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %w", err) } var genericData map[string]interface{} diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go index 212606f2fa..dbae647b6b 100644 --- a/internal/flink/command_catalog_database_create.go +++ b/internal/flink/command_catalog_database_create.go @@ -4,7 +4,6 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command { @@ -47,20 +46,5 @@ func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - var creationTime string - if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp - } - table.Add(&databaseOut{ - CreationTime: creationTime, - Name: sdkOutputDatabase.GetMetadata().Name, - Catalog: catalogName, - }) - return table.Print() - } - - localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) - return output.SerializedOutput(cmd, localDatabase) + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) } diff --git a/internal/flink/command_catalog_database_describe.go b/internal/flink/command_catalog_database_describe.go index f6598d5b22..24b9952f8a 100644 --- a/internal/flink/command_catalog_database_describe.go +++ b/internal/flink/command_catalog_database_describe.go @@ -4,7 +4,6 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command { @@ -41,20 +40,5 @@ func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) err return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - var creationTime string - if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp - } - table.Add(&databaseOut{ - CreationTime: creationTime, - Name: sdkOutputDatabase.GetMetadata().Name, - Catalog: catalogName, - }) - return table.Print() - } - - localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) - return output.SerializedOutput(cmd, localDatabase) + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) } diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go index 293420b881..12646a9b66 100644 --- a/internal/flink/command_catalog_database_update.go +++ b/internal/flink/command_catalog_database_update.go @@ -50,23 +50,9 @@ func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) if err != nil { + output.ErrPrintf(c.Config.EnableColor, "Update request for database \"%s\" in catalog \"%s\" succeeded, but failed to retrieve updated details: %v\n", databaseName, catalogName, err) return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - var creationTime string - if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp - } - table.Add(&databaseOut{ - CreationTime: creationTime, - Name: sdkOutputDatabase.GetMetadata().Name, - Catalog: catalogName, - }) - return table.Print() - } - - localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) - return output.SerializedOutput(cmd, localDatabase) + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) } diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml new file mode 100644 index 0000000000..ea5bb88064 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml @@ -0,0 +1,7 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: {} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.yaml b/test/fixtures/input/flink/catalog/database/create-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml new file mode 100644 index 0000000000..93e62665e9 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-successful.yaml b/test/fixtures/input/flink/catalog/database/update-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index b1655b19d2..632bcf7e82 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -472,6 +472,44 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateWithYAML() { + tests := []CLITest{ + // success scenarios with JSON files + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, + // failure scenarios with JSON files + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + // YAML file tests + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, + // YAML file failure scenarios + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateWithYAML() { + tests := []CLITest{ + // success scenarios with JSON files + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // failure scenarios with JSON files + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + // YAML file tests + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // YAML file failure scenarios + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index ebc7f3da92..6a43e51125 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -312,6 +312,11 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { return } + // Read and validate the request body. + req := new(cmfsdk.KafkaDatabase) + err := json.NewDecoder(r.Body).Decode(req) + require.NoError(t, err) + w.WriteHeader(http.StatusOK) return case http.MethodDelete: From 977c3c9610b7270cc5ea5845a078c74e5aef12ee Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Fri, 3 Apr 2026 12:38:35 +0530 Subject: [PATCH 14/14] Fix the update method --- internal/flink/command_catalog_database_update.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go index 12646a9b66..8411a8b5cd 100644 --- a/internal/flink/command_catalog_database_update.go +++ b/internal/flink/command_catalog_database_update.go @@ -1,10 +1,11 @@ package flink import ( + "fmt" + "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogDatabaseUpdateCommand() *cobra.Command { @@ -50,8 +51,7 @@ func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) if err != nil { - output.ErrPrintf(c.Config.EnableColor, "Update request for database \"%s\" in catalog \"%s\" succeeded, but failed to retrieve updated details: %v\n", databaseName, catalogName, err) - return err + return fmt.Errorf("database %q was updated successfully, but failed to retrieve updated details: %w", databaseName, err) } return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName)