diff --git a/internal/flink/command_catalog.go b/internal/flink/command_catalog.go index 77b540e2e5..8bfe75434f 100644 --- a/internal/flink/command_catalog.go +++ b/internal/flink/command_catalog.go @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDeleteCommand()) cmd.AddCommand(c.newCatalogDescribeCommand()) cmd.AddCommand(c.newCatalogListCommand()) + cmd.AddCommand(c.newCatalogDatabaseCommand()) return cmd } diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go new file mode 100644 index 0000000000..11a77fb6a2 --- /dev/null +++ b/internal/flink/command_catalog_database.go @@ -0,0 +1,113 @@ +package flink + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" +) + +type databaseOut struct { + CreationTime string `human:"Creation Time" serialized:"creation_time"` + Name string `human:"Name" serialized:"name"` + Catalog string `human:"Catalog" serialized:"catalog"` +} + +func (c *command) newCatalogDatabaseCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "database", + Short: "Manage Flink databases in Confluent Platform.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}, + } + + cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseDeleteCommand()) + cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) + cmd.AddCommand(c.newCatalogDatabaseListCommand()) + cmd.AddCommand(c.newCatalogDatabaseUpdateCommand()) + + return cmd +} + +func printDatabaseOutput(cmd *cobra.Command, sdkDatabase cmfsdk.KafkaDatabase, catalogName string) error { + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if 
sdkDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkDatabase) + return output.SerializedOutput(cmd, localDatabase) +} + +func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) { + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %w", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return cmfsdk.KafkaDatabase{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.") + } + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to parse input file: %w", err) + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + return sdkDatabase, nil +} + +func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase { + return LocalKafkaDatabase{ + ApiVersion: sdkDatabase.ApiVersion, + Kind: sdkDatabase.Kind, + Metadata: LocalDatabaseMetadata{ + Name: sdkDatabase.Metadata.Name, + CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp, + UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp, + Uid: sdkDatabase.Metadata.Uid, + Labels: sdkDatabase.Metadata.Labels, + 
Annotations: sdkDatabase.Metadata.Annotations, + }, + Spec: LocalKafkaDatabaseSpec{ + KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig, + ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId, + }, + AlterEnvironments: sdkDatabase.Spec.AlterEnvironments, + }, + } +} diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go new file mode 100644 index 0000000000..dbae647b6b --- /dev/null +++ b/internal/flink/command_catalog_database_create.go @@ -0,0 +1,50 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create ", + Short: "Create a Flink database.", + Long: "Create a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseCreate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) + if err != nil { + return err + } + + sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase) + if err != nil { + return err + } + + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) +} diff --git a/internal/flink/command_catalog_database_delete.go b/internal/flink/command_catalog_database_delete.go new file mode 100644 index 0000000000..a836b38e46 --- /dev/null +++ b/internal/flink/command_catalog_database_delete.go @@ 
-0,0 +1,58 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/deletion" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/resource" +) + +func (c *command) newCatalogDatabaseDeleteCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "delete ", + Short: "Delete a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDelete, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddForceFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + existenceFunc := func(name string) bool { + _, err := client.DescribeDatabase(c.createContext(), catalogName, name) + return err == nil + } + + if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil { + // We are validating only the existence of the resources (there is no prefix validation). + // Thus, we can add some extra context for the error. + suggestions := "List available Flink databases with `confluent flink catalog database list`." + suggestions += "\nCheck that CMF is running and accessible." 
+ return errors.NewErrorWithSuggestions(err.Error(), suggestions) + } + + deleteFunc := func(name string) error { + return client.DeleteDatabase(c.createContext(), catalogName, name) + } + + _, err = deletion.Delete(cmd, args, deleteFunc, resource.FlinkDatabase) + return err +} diff --git a/internal/flink/command_catalog_database_describe.go b/internal/flink/command_catalog_database_describe.go new file mode 100644 index 0000000000..24b9952f8a --- /dev/null +++ b/internal/flink/command_catalog_database_describe.go @@ -0,0 +1,44 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDescribe, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) error { + name := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, name) + if err != nil { + return err + } + + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) +} diff --git a/internal/flink/command_catalog_database_list.go b/internal/flink/command_catalog_database_list.go new file mode 100644 index 0000000000..2e0ff82f6e --- /dev/null +++ b/internal/flink/command_catalog_database_list.go @@ -0,0 +1,64 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseListCommand() 
*cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List Flink databases in a catalog in Confluent Platform.", + Args: cobra.NoArgs, + RunE: c.catalogDatabaseList, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + list := output.NewList(cmd) + for _, db := range sdkDatabases { + var creationTime string + if db.GetMetadata().CreationTimestamp != nil { + creationTime = *db.GetMetadata().CreationTimestamp + } + list.Add(&databaseOut{ + CreationTime: creationTime, + Name: db.GetMetadata().Name, + Catalog: catalogName, + }) + } + return list.Print() + } + + localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases)) + for _, sdkDatabase := range sdkDatabases { + localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase)) + } + + return output.SerializedOutput(cmd, localDatabases) +} diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go new file mode 100644 index 0000000000..8411a8b5cd --- /dev/null +++ b/internal/flink/command_catalog_database_update.go @@ -0,0 +1,58 @@ +package flink + +import ( + "fmt" + + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogDatabaseUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a Flink database.", + Long: "Update a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + 
RunE: c.catalogDatabaseUpdate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) + if err != nil { + return err + } + + databaseName := sdkDatabase.Metadata.Name + + if err := client.UpdateDatabase(c.createContext(), catalogName, databaseName, sdkDatabase); err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) + if err != nil { + return fmt.Errorf("database %q was updated successfully, but failed to retrieve updated details: %w", databaseName, err) + } + + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) +} diff --git a/internal/flink/local_types.go b/internal/flink/local_types.go index 19e0b756d9..6342966129 100644 --- a/internal/flink/local_types.go +++ b/internal/flink/local_types.go @@ -142,6 +142,32 @@ type LocalKafkaCatalogSpecSrInstance struct { ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` } +type LocalKafkaDatabase struct { + ApiVersion string `json:"apiVersion" yaml:"apiVersion"` + Kind string `json:"kind" yaml:"kind"` + Metadata LocalDatabaseMetadata `json:"metadata" yaml:"metadata"` + Spec LocalKafkaDatabaseSpec `json:"spec" yaml:"spec"` +} + +type LocalDatabaseMetadata struct { + Name string `json:"name" yaml:"name"` + CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` + UpdateTimestamp *string `json:"updateTimestamp,omitempty" yaml:"updateTimestamp,omitempty"` + Uid *string 
`json:"uid,omitempty" yaml:"uid,omitempty"` + Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"` + Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"` +} + +type LocalKafkaDatabaseSpec struct { + KafkaCluster LocalKafkaDatabaseSpecKafkaCluster `json:"kafkaCluster" yaml:"kafkaCluster"` + AlterEnvironments *[]string `json:"alterEnvironments,omitempty" yaml:"alterEnvironments,omitempty"` +} + +type LocalKafkaDatabaseSpecKafkaCluster struct { + ConnectionConfig map[string]string `json:"connectionConfig" yaml:"connectionConfig"` + ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` +} + type LocalResultSchema struct { Columns []LocalResultSchemaColumn `json:"columns" yaml:"columns"` } diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..05b3ce809b 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,57 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) DeleteDatabase(ctx context.Context, catalogName, databaseName string) error { + httpResp, err := cmfClient.SQLApi.DeleteKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResp, err); parsedErr != nil { + return fmt.Errorf(`failed to delete database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + +func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName string, kafkaDatabase cmfsdk.KafkaDatabase) (cmfsdk.KafkaDatabase, error) { + databaseName := kafkaDatabase.Metadata.Name + outputDatabase, httpResponse, err := cmfClient.SQLApi.CreateKafkaDatabase(ctx, catalogName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to create 
database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + +func (cmfClient *CmfRestClient) UpdateDatabase(ctx context.Context, catalogName, databaseName string, kafkaDatabase cmfsdk.KafkaDatabase) error { + httpResponse, err := cmfClient.SQLApi.UpdateKafkaDatabase(ctx, catalogName, databaseName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return fmt.Errorf(`failed to update database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + +func (cmfClient *CmfRestClient) DescribeDatabase(ctx context.Context, catalogName, databaseName string) (cmfsdk.KafkaDatabase, error) { + outputDatabase, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to get database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + +func (cmfClient *CmfRestClient) ListDatabases(ctx context.Context, catalogName string) ([]cmfsdk.KafkaDatabase, error) { + databases := make([]cmfsdk.KafkaDatabase, 0) + done := false + const pageSize = 100 + var currentPageNumber int32 = 0 + + for !done { + databasePage, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabases(ctx, catalogName).Page(currentPageNumber).Size(pageSize).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return nil, fmt.Errorf(`failed to list databases in catalog "%s": %s`, catalogName, parsedErr) + } + databases = append(databases, databasePage.GetItems()...) + currentPageNumber, done = extractPageOptions(len(databasePage.GetItems()), currentPageNumber) + } + + return databases, nil +} + // Returns the next page number and whether we need to fetch more pages or not. 
func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { diff --git a/pkg/resource/resource.go b/pkg/resource/resource.go index a56592ddd9..bd50ce6853 100644 --- a/pkg/resource/resource.go +++ b/pkg/resource/resource.go @@ -45,6 +45,7 @@ const ( FlinkDetachedSavepoint = "Flink detached savepoint" FlinkApplication = "Flink application" FlinkCatalog = "Flink catalog" + FlinkDatabase = "Flink database" FlinkEnvironment = "Flink environment" FlinkRegion = "Flink region" FlinkEndpoint = "Flink endpoint" diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.json b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json new file mode 100644 index 0000000000..e45d244060 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json @@ -0,0 +1,12 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": {} + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml new file mode 100644 index 0000000000..ea5bb88064 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml @@ -0,0 +1,7 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: {} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.json b/test/fixtures/input/flink/catalog/database/create-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + 
"bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.yaml b/test/fixtures/input/flink/catalog/database/create-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.json b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json new file mode 100644 index 0000000000..62292591c4 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml new file mode 100644 index 0000000000..93e62665e9 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-successful.json b/test/fixtures/input/flink/catalog/database/update-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + 
"connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/update-successful.yaml b/test/fixtures/input/flink/catalog/database/update-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/create-help-onprem.golden b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden new file mode 100644 index 0000000000..4b0f03902e --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden @@ -0,0 +1,17 @@ +Create a Flink database in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database create [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. 
+ --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden new file mode 100644 index 0000000000..d2b4965a8a --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to create database "invalid-database" in catalog "test-catalog": The Kafka database object from resource file is invalid diff --git a/test/fixtures/output/flink/catalog/database/create-success-json.golden b/test/fixtures/output/flink/catalog/database/create-success-json.golden new file mode 100644 index 0000000000..065e5eba7c --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-03-12 23:42:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/create-success-yaml.golden b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden new file mode 100644 index 0000000000..0d9dc39dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-03-12 23:42:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/create-success.golden b/test/fixtures/output/flink/catalog/database/create-success.golden new file mode 100644 index 
0000000000..b1b1a2a1ff --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-03-12 23:42:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden new file mode 100644 index 0000000000..91228009b3 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden @@ -0,0 +1,17 @@ +Delete a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database delete [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + --force Skip the deletion confirmation prompt. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). 
diff --git a/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden new file mode 100644 index 0000000000..b6cedfb005 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "non-exist-database"? (y/n): Error: failed to delete non-exist-database: failed to delete database "non-exist-database" in catalog "test-catalog": 404 Not Found diff --git a/test/fixtures/output/flink/catalog/database/delete-single-force.golden b/test/fixtures/output/flink/catalog/database/delete-single-force.golden new file mode 100644 index 0000000000..3517fbbf14 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-force.golden @@ -0,0 +1 @@ +Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/delete-single-successful.golden b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden new file mode 100644 index 0000000000..ecc2f170b7 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "test-database-1"? (y/n): Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden new file mode 100644 index 0000000000..23cffb3b6d --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden @@ -0,0 +1,17 @@ +Describe a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database describe [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. 
+ --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). 
diff --git a/test/fixtures/output/flink/catalog/database/describe-not-found.golden b/test/fixtures/output/flink/catalog/database/describe-not-found.golden
new file mode 100644
index 0000000000..0d2858c098
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/describe-not-found.golden
@@ -0,0 +1 @@
+Error: failed to get database "invalid-database" in catalog "test-catalog": The database name is invalid
diff --git a/test/fixtures/output/flink/catalog/database/describe-success-json.golden b/test/fixtures/output/flink/catalog/database/describe-success-json.golden
new file mode 100644
index 0000000000..fb8c835dc6
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/describe-success-json.golden
@@ -0,0 +1,15 @@
+{
+  "apiVersion": "cmf/api/v1/database",
+  "kind": "KafkaDatabase",
+  "metadata": {
+    "name": "test-database",
+    "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC"
+  },
+  "spec": {
+    "kafkaCluster": {
+      "connectionConfig": {
+        "bootstrap.servers": "localhost:9092"
+      }
+    }
+  }
+}
diff --git a/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden
new file mode 100644
index 0000000000..a45817dbbb
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden
@@ -0,0 +1,9 @@
+apiVersion: cmf/api/v1/database
+kind: KafkaDatabase
+metadata:
+  name: test-database
+  creationTimestamp: 2025-08-05 12:00:00 +0000 UTC
+spec:
+  kafkaCluster:
+    connectionConfig:
+      bootstrap.servers: localhost:9092
diff --git a/test/fixtures/output/flink/catalog/database/describe-success.golden b/test/fixtures/output/flink/catalog/database/describe-success.golden
new file mode 100644
index 0000000000..96b1543c77
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/describe-success.golden
@@ -0,0 +1,5 @@
++---------------+-------------------------------+
+| Creation Time | 2025-08-05 12:00:00 +0000 UTC |
+| Name          | test-database                 |
+| Catalog       | test-catalog                  |
++---------------+-------------------------------+
diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden
new file mode 100644
index 0000000000..49da534a01
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden
@@ -0,0 +1,18 @@
+Manage Flink databases in Confluent Platform.
+
+Usage:
+  confluent flink catalog database [command]
+
+Available Commands:
+  create      Create a Flink database.
+  delete      Delete a Flink database in Confluent Platform.
+  describe    Describe a Flink database in Confluent Platform.
+  list        List Flink databases in a catalog in Confluent Platform.
+  update      Update a Flink database.
+
+Global Flags:
+  -h, --help            Show help for this command.
+      --unsafe-trace    Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
+  -v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
+
+Use "confluent flink catalog database [command] --help" for more information about a command.
diff --git a/test/fixtures/output/flink/catalog/database/list-help-onprem.golden b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden
new file mode 100644
index 0000000000..4dbf71ed16
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden
@@ -0,0 +1,17 @@
+List Flink databases in a catalog in Confluent Platform.
+
+Usage:
+  confluent flink catalog database list [flags]
+
+Flags:
+      --catalog string                      REQUIRED: Name of the catalog.
+      --url string                          Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
+      --client-key-path string              Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
+      --client-cert-path string             Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
+      --certificate-authority-path string   Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
+  -o, --output string                       Specify the output format as "human", "json", or "yaml". (default "human")
+
+Global Flags:
+  -h, --help            Show help for this command.
+      --unsafe-trace    Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
+  -v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
diff --git a/test/fixtures/output/flink/catalog/database/list-success-json.golden b/test/fixtures/output/flink/catalog/database/list-success-json.golden
new file mode 100644
index 0000000000..42e40f7379
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/list-success-json.golden
@@ -0,0 +1,32 @@
+[
+  {
+    "apiVersion": "cmf/api/v1/database",
+    "kind": "KafkaDatabase",
+    "metadata": {
+      "name": "test-database-1",
+      "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC"
+    },
+    "spec": {
+      "kafkaCluster": {
+        "connectionConfig": {
+          "bootstrap.servers": "localhost:9092"
+        }
+      }
+    }
+  },
+  {
+    "apiVersion": "cmf/api/v1/database",
+    "kind": "KafkaDatabase",
+    "metadata": {
+      "name": "test-database-2",
+      "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC"
+    },
+    "spec": {
+      "kafkaCluster": {
+        "connectionConfig": {
+          "bootstrap.servers": "localhost:9092"
+        }
+      }
+    }
+  }
+]
diff --git a/test/fixtures/output/flink/catalog/database/list-success-yaml.golden b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden
new file mode 100644
index 0000000000..186a556114
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden
@@ -0,0 +1,18 @@
+- apiVersion: cmf/api/v1/database
+  kind: KafkaDatabase
+  metadata:
+    name: test-database-1
+    creationTimestamp: 2025-08-05 12:00:00 +0000 UTC
+  spec:
+    kafkaCluster:
+      connectionConfig:
+        bootstrap.servers: localhost:9092
+- apiVersion: cmf/api/v1/database
+  kind: KafkaDatabase
+  metadata:
+    name: test-database-2
+    creationTimestamp: 2025-08-05 12:00:00 +0000 UTC
+  spec:
+    kafkaCluster:
+      connectionConfig:
+        bootstrap.servers: localhost:9092
diff --git a/test/fixtures/output/flink/catalog/database/list-success.golden b/test/fixtures/output/flink/catalog/database/list-success.golden
new file mode 100644
index 0000000000..983bc1d5df
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/list-success.golden
@@ -0,0 +1,4 @@
+         Creation Time          |      Name       |   Catalog
+--------------------------------+-----------------+---------------
+  2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog
+  2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog
diff --git a/test/fixtures/output/flink/catalog/database/update-help-onprem.golden b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden
new file mode 100644
index 0000000000..f5c10ad850
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden
@@ -0,0 +1,17 @@
+Update a Flink database in a catalog in Confluent Platform.
+
+Usage:
+  confluent flink catalog database update [flags]
+
+Flags:
+      --catalog string                      REQUIRED: Name of the catalog.
+      --url string                          Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
+      --client-key-path string              Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
+      --client-cert-path string             Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
+      --certificate-authority-path string   Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
+  -o, --output string                       Specify the output format as "human", "json", or "yaml". (default "human")
+
+Global Flags:
+  -h, --help            Show help for this command.
+      --unsafe-trace    Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
+  -v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
diff --git a/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden
new file mode 100644
index 0000000000..16726790eb
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden
@@ -0,0 +1 @@
+Error: failed to update database "invalid-database" in catalog "test-catalog": The database name is invalid
diff --git a/test/fixtures/output/flink/catalog/database/update-success-json.golden b/test/fixtures/output/flink/catalog/database/update-success-json.golden
new file mode 100644
index 0000000000..fb8c835dc6
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/update-success-json.golden
@@ -0,0 +1,15 @@
+{
+  "apiVersion": "cmf/api/v1/database",
+  "kind": "KafkaDatabase",
+  "metadata": {
+    "name": "test-database",
+    "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC"
+  },
+  "spec": {
+    "kafkaCluster": {
+      "connectionConfig": {
+        "bootstrap.servers": "localhost:9092"
+      }
+    }
+  }
+}
diff --git a/test/fixtures/output/flink/catalog/database/update-success-yaml.golden b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden
new file mode 100644
index 0000000000..a45817dbbb
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden
@@ -0,0 +1,9 @@
+apiVersion: cmf/api/v1/database
+kind: KafkaDatabase
+metadata:
+  name: test-database
+  creationTimestamp: 2025-08-05 12:00:00 +0000 UTC
+spec:
+  kafkaCluster:
+    connectionConfig:
+      bootstrap.servers: localhost:9092
diff --git a/test/fixtures/output/flink/catalog/database/update-success.golden b/test/fixtures/output/flink/catalog/database/update-success.golden
new file mode 100644
index 0000000000..96b1543c77
--- /dev/null
+++ b/test/fixtures/output/flink/catalog/database/update-success.golden
@@ -0,0 +1,5 @@
++---------------+-------------------------------+
+| Creation Time | 2025-08-05 12:00:00 +0000 UTC |
+| Name          | test-database                 |
+| Catalog       | test-catalog                  |
++---------------+-------------------------------+
diff --git a/test/fixtures/output/flink/catalog/help-onprem.golden b/test/fixtures/output/flink/catalog/help-onprem.golden
index cbc8042d2d..1ce122027c 100644
--- a/test/fixtures/output/flink/catalog/help-onprem.golden
+++ b/test/fixtures/output/flink/catalog/help-onprem.golden
@@ -5,6 +5,7 @@ Usage:
 
 Available Commands:
   create      Create a Flink catalog.
+  database    Manage Flink databases in Confluent Platform.
   delete      Delete one or more Flink catalogs in Confluent Platform.
   describe    Describe a Flink catalog in Confluent Platform.
   list        List Flink catalogs in Confluent Platform.
diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go
index 5536f0a14c..632bcf7e82 100644
--- a/test/flink_onprem_test.go
+++ b/test/flink_onprem_test.go
@@ -410,6 +410,106 @@ func (s *CLITestSuite) TestFlinkCatalogListOnPrem() {
 	runIntegrationTestsWithMultipleAuth(s, tests)
 }
 
+func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() {
+	tests := []CLITest{
+		// success
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"},
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"},
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"},
+		// failure
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1},
+	}
+
+	runIntegrationTestsWithMultipleAuth(s, tests)
+}
+
+func (s *CLITestSuite) TestFlinkCatalogDatabaseDeleteOnPrem() {
+	tests := []CLITest{
+		// success scenarios
+		{args: "flink catalog database delete test-database-1 --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-single-successful.golden"},
+		{args: "flink catalog database delete test-database-1 --catalog test-catalog --force", fixture: "flink/catalog/database/delete-single-force.golden"},
+		// failure scenarios
+		{args: "flink catalog database delete non-exist-database --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-non-exist-failure.golden", exitCode: 1},
+	}
+
+	runIntegrationTestsWithMultipleAuth(s, tests)
+}
+
+func (s *CLITestSuite) TestFlinkCatalogDatabaseDescribeOnPrem() {
+	tests := []CLITest{
+		// success
+		{args: "flink catalog database describe test-database --catalog test-catalog", fixture: "flink/catalog/database/describe-success.golden"},
+		{args: "flink catalog database describe test-database --catalog test-catalog --output json", fixture: "flink/catalog/database/describe-success-json.golden"},
+		{args: "flink catalog database describe test-database --catalog test-catalog --output yaml", fixture: "flink/catalog/database/describe-success-yaml.golden"},
+		// failure
+		{args: "flink catalog database describe invalid-database --catalog test-catalog", fixture: "flink/catalog/database/describe-not-found.golden", exitCode: 1},
+	}
+
+	runIntegrationTestsWithMultipleAuth(s, tests)
+}
+
+func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() {
+	tests := []CLITest{
+		// success
+		{args: "flink catalog database list --catalog test-catalog", fixture: "flink/catalog/database/list-success.golden"},
+		{args: "flink catalog database list --catalog test-catalog --output json", fixture: "flink/catalog/database/list-success-json.golden"},
+		{args: "flink catalog database list --catalog test-catalog --output yaml", fixture: "flink/catalog/database/list-success-yaml.golden"},
+	}
+
+	runIntegrationTestsWithMultipleAuth(s, tests)
+}
+
+func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPrem() {
+	tests := []CLITest{
+		// success
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"},
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"},
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"},
+		// failure
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1},
+	}
+
+	runIntegrationTestsWithMultipleAuth(s, tests)
+}
+
+func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateWithYAML() {
+	tests := []CLITest{
+		// success scenarios with JSON files
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"},
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"},
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"},
+		// failure scenarios with JSON files
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1},
+		// YAML file tests
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"},
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"},
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"},
+		// YAML file failure scenarios
+		{args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1},
+	}
+
+	runIntegrationTestsWithMultipleAuth(s, tests)
+}
+
+func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateWithYAML() {
+	tests := []CLITest{
+		// success scenarios with JSON files
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"},
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"},
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"},
+		// failure scenarios with JSON files
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1},
+		// YAML file tests
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"},
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"},
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"},
+		// YAML file failure scenarios
+		{args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1},
+	}
+
+	runIntegrationTestsWithMultipleAuth(s, tests)
+}
+
 func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() {
 	tests := []CLITest{
 		// success
diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go
index 3089876db8..6a43e51125 100644
--- a/test/test-server/flink_onprem_handler.go
+++ b/test/test-server/flink_onprem_handler.go
@@ -16,6 +16,8 @@ import (
 	cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"
 )
 
+const invalidDatabaseName = "invalid-database"
+
 // Helper function to create a Flink application.
 func createApplication(name string) cmfsdk.FlinkApplication {
 	status := map[string]interface{}{
@@ -224,6 +226,112 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog {
 	}
 }
 
+// createKafkaDatabase builds a canonical KafkaDatabase test object with a
+// fixed creation timestamp so golden-file output is deterministic.
+func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase {
+	timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String()
+	return cmfsdk.KafkaDatabase{
+		ApiVersion: "cmf/api/v1/database",
+		Kind:       "KafkaDatabase",
+		Metadata: cmfsdk.DatabaseMetadata{
+			Name:              dbName,
+			CreationTimestamp: &timeStamp,
+		},
+		Spec: cmfsdk.KafkaDatabaseSpec{
+			KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{
+				ConnectionConfig: map[string]string{
+					"bootstrap.servers": "localhost:9092",
+				},
+			},
+		},
+	}
+}
+
+// handleCmfCatalogDatabases serves the database collection endpoint:
+// GET lists the databases of a catalog (first page only), POST creates one.
+func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		handleLoginType(t, r)
+		switch r.Method {
+		case http.MethodGet:
+			databases := []cmfsdk.KafkaDatabase{
+				createKafkaDatabase("test-database-1"),
+				createKafkaDatabase("test-database-2"),
+			}
+			databasesPage := cmfsdk.KafkaDatabasesPage{}
+			page := r.URL.Query().Get("page")
+
+			// Only the first page carries items; later pages are empty so
+			// the client's pagination loop terminates.
+			if page == "0" {
+				databasesPage.SetItems(databases)
+			}
+
+			err := json.NewEncoder(w).Encode(databasesPage)
+			require.NoError(t, err)
+		case http.MethodPost:
+			reqBody, err := io.ReadAll(r.Body)
+			require.NoError(t, err)
+			var database cmfsdk.KafkaDatabase
+			err = json.Unmarshal(reqBody, &database)
+			require.NoError(t, err)
+
+			dbName := database.GetMetadata().Name
+
+			if dbName == invalidDatabaseName {
+				http.Error(w, "The Kafka database object from resource file is invalid", http.StatusUnprocessableEntity)
+				return
+			}
+
+			// Echo the request back with a server-assigned creation timestamp.
+			timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String()
+			database.Metadata.CreationTimestamp = &timeStamp
+			err = json.NewEncoder(w).Encode(database)
+			require.NoError(t, err)
+			return
+		default:
+			require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method))
+		}
+	}
+}
+
+// handleCmfCatalogDatabase serves the single-database endpoint:
+// GET describes, PUT updates, DELETE removes. Known-bad names trigger 404s.
+func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		handleLoginType(t, r)
+
+		vars := mux.Vars(r)
+		dbName := vars["dbName"]
+
+		switch r.Method {
+		case http.MethodGet:
+			if dbName == invalidDatabaseName {
+				http.Error(w, "The database name is invalid", http.StatusNotFound)
+				return
+			}
+
+			database := createKafkaDatabase(dbName)
+			err := json.NewEncoder(w).Encode(database)
+			require.NoError(t, err)
+			return
+		case http.MethodPut:
+			if dbName == invalidDatabaseName {
+				http.Error(w, "The database name is invalid", http.StatusNotFound)
+				return
+			}
+
+			// Read and validate the request body.
+			req := new(cmfsdk.KafkaDatabase)
+			err := json.NewDecoder(r.Body).Decode(req)
+			require.NoError(t, err)
+
+			w.WriteHeader(http.StatusOK)
+			return
+		case http.MethodDelete:
+			if dbName == "non-exist-database" {
+				http.Error(w, "", http.StatusNotFound)
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+			return
+		default:
+			require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method))
+		}
+	}
+}
+
 func createFlinkStatement(stmtName string, stopped bool, parallelism int32) cmfsdk.Statement {
 	timeStamp := time.Date(2025, time.August, 5, 12, 00, 0, 0, time.UTC).String()
 	status := cmfsdk.StatementStatus{
diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go
index 2267f01d5a..8d597732f6 100644
--- a/test/test-server/flink_onprem_router.go
+++ b/test/test-server/flink_onprem_router.go
@@ -9,6 +9,8 @@ import (
 
 var flinkRoutes = []route{
 	{"/cmf/api/v1/catalogs/kafka", handleCmfCatalogs},
 	{"/cmf/api/v1/catalogs/kafka/{catName}", handleCmfCatalog},
+	{"/cmf/api/v1/catalogs/kafka/{catName}/databases", handleCmfCatalogDatabases},
+	{"/cmf/api/v1/catalogs/kafka/{catName}/databases/{dbName}", handleCmfCatalogDatabase},
 	{"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications},
 	{"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication},
 	{"/cmf/api/v1/environments", handleCmfEnvironments},