Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/flink/command_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command {
cmd.AddCommand(c.newCatalogDeleteCommand())
cmd.AddCommand(c.newCatalogDescribeCommand())
cmd.AddCommand(c.newCatalogListCommand())
cmd.AddCommand(c.newCatalogDatabaseCommand())

return cmd
}
Expand Down
113 changes: 113 additions & 0 deletions internal/flink/command_catalog_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package flink

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/output"
)

type databaseOut struct {
CreationTime string `human:"Creation Time" serialized:"creation_time"`
Name string `human:"Name" serialized:"name"`
Catalog string `human:"Catalog" serialized:"catalog"`
}

func (c *command) newCatalogDatabaseCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "database",
Short: "Manage Flink databases in Confluent Platform.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

cmd.AddCommand(c.newCatalogDatabaseCreateCommand())
cmd.AddCommand(c.newCatalogDatabaseDeleteCommand())
cmd.AddCommand(c.newCatalogDatabaseDescribeCommand())
cmd.AddCommand(c.newCatalogDatabaseListCommand())
cmd.AddCommand(c.newCatalogDatabaseUpdateCommand())

return cmd
}

func printDatabaseOutput(cmd *cobra.Command, sdkDatabase cmfsdk.KafkaDatabase, catalogName string) error {
if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
var creationTime string
if sdkDatabase.GetMetadata().CreationTimestamp != nil {
creationTime = *sdkDatabase.GetMetadata().CreationTimestamp
}
table.Add(&databaseOut{
CreationTime: creationTime,
Name: sdkDatabase.GetMetadata().Name,
Catalog: catalogName,
})
return table.Print()
}

localDatabase := convertSdkDatabaseToLocalDatabase(sdkDatabase)
return output.SerializedOutput(cmd, localDatabase)
}

func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) {
data, err := os.ReadFile(resourceFilePath)
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %w", err)
}

var genericData map[string]interface{}
ext := filepath.Ext(resourceFilePath)
switch ext {
case ".json":
err = json.Unmarshal(data, &genericData)
case ".yaml", ".yml":
err = yaml.Unmarshal(data, &genericData)
default:
return cmfsdk.KafkaDatabase{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.")
}
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to parse input file: %w", err)
}

jsonBytes, err := json.Marshal(genericData)
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to marshal intermediate data: %w", err)
}

var sdkDatabase cmfsdk.KafkaDatabase
if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err)
}

return sdkDatabase, nil
}

func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase {
return LocalKafkaDatabase{
ApiVersion: sdkDatabase.ApiVersion,
Kind: sdkDatabase.Kind,
Metadata: LocalDatabaseMetadata{
Name: sdkDatabase.Metadata.Name,
CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp,
UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp,
Uid: sdkDatabase.Metadata.Uid,
Labels: sdkDatabase.Metadata.Labels,
Annotations: sdkDatabase.Metadata.Annotations,
},
Spec: LocalKafkaDatabaseSpec{
KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{
ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig,
ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId,
},
AlterEnvironments: sdkDatabase.Spec.AlterEnvironments,
},
}
}
50 changes: 50 additions & 0 deletions internal/flink/command_catalog_database_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create <resourceFilePath>",
Short: "Create a Flink database.",
Long: "Create a Flink database in a catalog in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseCreate,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error {
resourceFilePath := args[0]

catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkDatabase, err := readDatabaseResourceFile(resourceFilePath)
if err != nil {
return err
}

sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase)
if err != nil {
return err
}

return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName)
}
58 changes: 58 additions & 0 deletions internal/flink/command_catalog_database_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/deletion"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/resource"
)

func (c *command) newCatalogDatabaseDeleteCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "delete <name>",
Short: "Delete a Flink database in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseDelete,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddForceFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error {
catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

existenceFunc := func(name string) bool {
_, err := client.DescribeDatabase(c.createContext(), catalogName, name)
return err == nil
}

if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil {
// We are validating only the existence of the resources (there is no prefix validation).
// Thus, we can add some extra context for the error.
suggestions := "List available Flink databases with `confluent flink catalog database list`."
suggestions += "\nCheck that CMF is running and accessible."
return errors.NewErrorWithSuggestions(err.Error(), suggestions)
}

deleteFunc := func(name string) error {
return client.DeleteDatabase(c.createContext(), catalogName, name)
}

_, err = deletion.Delete(cmd, args, deleteFunc, resource.FlinkDatabase)
return err
}
44 changes: 44 additions & 0 deletions internal/flink/command_catalog_database_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <name>",
Short: "Describe a Flink database in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseDescribe,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) error {
name := args[0]

catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, name)
if err != nil {
return err
}

return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName)
}
64 changes: 64 additions & 0 deletions internal/flink/command_catalog_database_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newCatalogDatabaseListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Flink databases in a catalog in Confluent Platform.",
Args: cobra.NoArgs,
RunE: c.catalogDatabaseList,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error {
catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, db := range sdkDatabases {
var creationTime string
if db.GetMetadata().CreationTimestamp != nil {
creationTime = *db.GetMetadata().CreationTimestamp
}
list.Add(&databaseOut{
CreationTime: creationTime,
Name: db.GetMetadata().Name,
Catalog: catalogName,
})
}
return list.Print()
}

localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases))
for _, sdkDatabase := range sdkDatabases {
localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase))
}

return output.SerializedOutput(cmd, localDatabases)
}
Loading