Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/flink/command_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (c *command) newApplicationCommand() *cobra.Command {
cmd.AddCommand(c.newApplicationCreateCommand())
cmd.AddCommand(c.newApplicationDeleteCommand())
cmd.AddCommand(c.newApplicationDescribeCommand())
cmd.AddCommand(c.newApplicationInstanceCommand())
cmd.AddCommand(c.newApplicationListCommand())
cmd.AddCommand(c.newApplicationUpdateCommand())
cmd.AddCommand(c.newApplicationWebUiForwardCommand())
Expand Down
58 changes: 58 additions & 0 deletions internal/flink/command_application_instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package flink

import (
"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

type flinkApplicationInstanceSummaryOut struct {
Name string `human:"Name" serialized:"name"`
CreationTime string `human:"Creation Time" serialized:"creation_time"`
JobId string `human:"Job ID" serialized:"job_id"`
JobState string `human:"Job State" serialized:"job_state"`
}

func (c *command) newApplicationInstanceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "instance",
Short: "Manage Flink application instances.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

cmd.AddCommand(c.newApplicationInstanceDescribeCommand())
cmd.AddCommand(c.newApplicationInstanceListCommand())

return cmd
}

func convertSdkApplicationInstanceToLocalApplicationInstance(sdkInstance cmfsdk.FlinkApplicationInstance) LocalFlinkApplicationInstance {
localInstance := LocalFlinkApplicationInstance{
ApiVersion: sdkInstance.ApiVersion,
Kind: sdkInstance.Kind,
}
if sdkInstance.Metadata != nil {
localInstance.Metadata = &LocalApplicationInstanceMetadata{
Name: sdkInstance.Metadata.Name,
Uid: sdkInstance.Metadata.Uid,
CreationTimestamp: sdkInstance.Metadata.CreationTimestamp,
UpdateTimestamp: sdkInstance.Metadata.UpdateTimestamp,
Labels: sdkInstance.Metadata.Labels,
Annotations: sdkInstance.Metadata.Annotations,
}
}
if sdkInstance.Status != nil {
localInstance.Status = &LocalApplicationInstanceStatus{
Spec: sdkInstance.Status.Spec,
}
if sdkInstance.Status.JobStatus != nil {
localInstance.Status.JobStatus = &LocalApplicationInstanceJobStatus{
JobId: sdkInstance.Status.JobStatus.JobId,
State: sdkInstance.Status.JobStatus.State,
}
}
}
return localInstance
}
54 changes: 54 additions & 0 deletions internal/flink/command_application_instance_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newApplicationInstanceDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <name>",
Short: "Describe a Flink application instance.",
Args: cobra.ExactArgs(1),
RunE: c.applicationInstanceDescribe,
}

cmd.Flags().String("environment", "", "Name of the Flink environment.")
cmd.Flags().String("application", "", "Name of the Flink application.")
addCmfFlagSet(cmd)
pcmd.AddOutputFlagWithHumanRestricted(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("environment"))
cobra.CheckErr(cmd.MarkFlagRequired("application"))

return cmd
}

func (c *command) applicationInstanceDescribe(cmd *cobra.Command, args []string) error {
environment, err := cmd.Flags().GetString("environment")
if err != nil {
return err
}

application, err := cmd.Flags().GetString("application")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

instanceName := args[0]
instance, err := client.DescribeApplicationInstance(c.createContext(), environment, application, instanceName)
if err != nil {
return err
}

localInstance := convertSdkApplicationInstanceToLocalApplicationInstance(instance)

return output.SerializedOutput(cmd, localInstance)
}
85 changes: 85 additions & 0 deletions internal/flink/command_application_instance_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newApplicationInstanceListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Flink application instances.",
Args: cobra.NoArgs,
RunE: c.applicationInstanceList,
}

cmd.Flags().String("environment", "", "Name of the Flink environment.")
cmd.Flags().String("application", "", "Name of the Flink application.")
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("environment"))
cobra.CheckErr(cmd.MarkFlagRequired("application"))

return cmd
}

func (c *command) applicationInstanceList(cmd *cobra.Command, _ []string) error {

Check failure on line 29 in internal/flink/command_application_instance_list.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 23 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3307&issues=17ba9d82-bf93-4aa5-9ba3-0ae37d80f64d&open=17ba9d82-bf93-4aa5-9ba3-0ae37d80f64d
environment, err := cmd.Flags().GetString("environment")
if err != nil {
return err
}

application, err := cmd.Flags().GetString("application")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

instances, err := client.ListApplicationInstances(c.createContext(), environment, application)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, instance := range instances {
metadata := instance.GetMetadata()
var creationTime string
if metadata.CreationTimestamp != nil {
creationTime = *metadata.CreationTimestamp
}

var jobId, jobState string
if instance.Status != nil && instance.Status.JobStatus != nil {
if instance.Status.JobStatus.JobId != nil {
jobId = *instance.Status.JobStatus.JobId
}
if instance.Status.JobStatus.State != nil {
jobState = *instance.Status.JobStatus.State
}
}

list.Add(&flinkApplicationInstanceSummaryOut{
Name: metadata.GetName(),
CreationTime: creationTime,
JobId: jobId,
JobState: jobState,
})
}
return list.Print()
}

localInstances := make([]LocalFlinkApplicationInstance, 0, len(instances))
for _, sdkInstance := range instances {
localInstances = append(localInstances, convertSdkApplicationInstanceToLocalApplicationInstance(sdkInstance))
}

return output.SerializedOutput(cmd, localInstances)
}
26 changes: 26 additions & 0 deletions internal/flink/local_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,32 @@ type LocalEnvironment struct {
StatementDefaults *LocalAllStatementDefaults1 `json:"statementDefaults,omitempty" yaml:"statementDefaults,omitempty"`
}

type LocalFlinkApplicationInstance struct {
ApiVersion string `json:"apiVersion" yaml:"apiVersion"`
Kind string `json:"kind" yaml:"kind"`
Metadata *LocalApplicationInstanceMetadata `json:"metadata,omitempty" yaml:"metadata,omitempty"`
Status *LocalApplicationInstanceStatus `json:"status,omitempty" yaml:"status,omitempty"`
}

type LocalApplicationInstanceMetadata struct {
Name *string `json:"name,omitempty" yaml:"name,omitempty"`
Uid *string `json:"uid,omitempty" yaml:"uid,omitempty"`
CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"`
UpdateTimestamp *string `json:"updateTimestamp,omitempty" yaml:"updateTimestamp,omitempty"`
Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"`
}

type LocalApplicationInstanceStatus struct {
Spec *map[string]interface{} `json:"spec,omitempty" yaml:"spec,omitempty"`
JobStatus *LocalApplicationInstanceJobStatus `json:"jobStatus,omitempty" yaml:"jobStatus,omitempty"`
}

type LocalApplicationInstanceJobStatus struct {
JobId *string `json:"jobId,omitempty" yaml:"jobId,omitempty"`
State *string `json:"state,omitempty" yaml:"state,omitempty"`
}

type LocalFlinkApplication struct {
ApiVersion string `json:"apiVersion" yaml:"apiVersion"`
Kind string `json:"kind" yaml:"kind"`
Expand Down
27 changes: 27 additions & 0 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,33 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s
return parseSdkError(httpResp, err)
}

func (cmfClient *CmfRestClient) DescribeApplicationInstance(ctx context.Context, environment, application, instance string) (cmfsdk.FlinkApplicationInstance, error) {
cmfInstance, httpResponse, err := cmfClient.FlinkApplicationsApi.GetApplicationInstance(ctx, environment, application, instance).Execute()
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return cmfsdk.FlinkApplicationInstance{}, fmt.Errorf(`failed to describe instance "%s" of application "%s" in the environment "%s": %s`, instance, application, environment, parsedErr)
}
return cmfInstance, nil
}

func (cmfClient *CmfRestClient) ListApplicationInstances(ctx context.Context, environment, application string) ([]cmfsdk.FlinkApplicationInstance, error) {
instances := make([]cmfsdk.FlinkApplicationInstance, 0)
var currentPageNumber int32 = 0
// 100 is an arbitrary page size we've chosen.
const pageSize = 100
done := false

for !done {
instancesPage, httpResponse, err := cmfClient.FlinkApplicationsApi.GetApplicationInstances(ctx, environment, application).Page(currentPageNumber).Size(pageSize).Execute()
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return nil, fmt.Errorf(`failed to list instances of application "%s" in the environment "%s": %s`, application, environment, parsedErr)
}
instances = append(instances, instancesPage.GetItems()...)
currentPageNumber, done = extractPageOptions(len(instancesPage.GetItems()), currentPageNumber)
}

return instances, nil
}

// Returns the next page number and whether we need to fetch more pages or not.
func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) {
if receivedItemsLength == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Available Commands:
create Create a Flink application.
delete Delete one or more Flink applications.
describe Describe a Flink application.
instance Manage Flink application instances.
list List Flink applications.
update Update a Flink application.
web-ui-forward Forward the web UI of a Flink application.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Error: required flag(s) "application" not set
Usage:
confluent flink application instance describe <name> [flags]

Flags:
--environment string REQUIRED: Name of the Flink environment.
--application string REQUIRED: Name of the Flink application.
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
-o, --output string Specify the output format as "json" or "yaml". (default "json")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Error: required flag(s) "environment" not set
Usage:
confluent flink application instance describe <name> [flags]

Flags:
--environment string REQUIRED: Name of the Flink environment.
--application string REQUIRED: Name of the Flink application.
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
-o, --output string Specify the output format as "json" or "yaml". (default "json")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Error: accepts 1 arg(s), received 0
Usage:
confluent flink application instance describe <name> [flags]

Flags:
--environment string Name of the Flink environment.
--application string Name of the Flink application.
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
-o, --output string Specify the output format as "json" or "yaml". (default "json")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: failed to describe instance "inst-001" of application "non-existent" in the environment "default": Application not found
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: failed to describe instance "inst-001" of application "default-application-1" in the environment "non-existent": Environment not found
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: failed to describe instance "non-existent" of application "default-application-1" in the environment "default": Instance not found
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: cmf.confluent.io/v1
kind: FlinkApplicationInstance
metadata:
name: inst-001
uid: inst-001
creationTimestamp: "2025-09-18T10:00:00Z"
status:
jobStatus:
jobId: job-abc123
state: RUNNING
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "FlinkApplicationInstance",
"metadata": {
"name": "inst-001",
"uid": "inst-001",
"creationTimestamp": "2025-09-18T10:00:00Z"
},
"status": {
"jobStatus": {
"jobId": "job-abc123",
"state": "RUNNING"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "FlinkApplicationInstance",
"metadata": {
"name": "inst-001",
"uid": "inst-001",
"creationTimestamp": "2025-09-18T10:00:00Z"
},
"status": {
"jobStatus": {
"jobId": "job-abc123",
"state": "RUNNING"
}
}
}
Loading