From b44263cee279e8070ecc7af11e0d8e19ac49559e Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 6 Apr 2026 10:48:12 +0530 Subject: [PATCH 1/3] Add support for list instances cli --- internal/flink/command_application.go | 1 + .../flink/command_application_instance.go | 57 +++++++++++++ .../command_application_instance_list.go | 85 +++++++++++++++++++ internal/flink/local_types.go | 26 ++++++ pkg/flink/cmf_rest_client.go | 19 +++++ .../flink/application/help-onprem.golden | 1 + .../instance-list-app-missing.golden | 18 ++++ .../application/instance-list-empty.golden | 1 + .../instance-list-env-missing.golden | 18 ++++ .../application/instance-list-human.golden | 4 + .../application/instance-list-json.golden | 32 +++++++ .../instance-list-non-existent-app.golden | 1 + .../instance-list-non-existent-env.golden | 1 + .../application/instance-list-yaml.golden | 20 +++++ .../application/instance/help-onprem.golden | 14 +++ .../instance/list-help-onprem.golden | 18 ++++ test/flink_onprem_test.go | 17 ++++ test/test-server/flink_onprem_handler.go | 73 ++++++++++++++++ test/test-server/flink_onprem_router.go | 1 + 19 files changed, 407 insertions(+) create mode 100644 internal/flink/command_application_instance.go create mode 100644 internal/flink/command_application_instance_list.go create mode 100644 test/fixtures/output/flink/application/instance-list-app-missing.golden create mode 100644 test/fixtures/output/flink/application/instance-list-empty.golden create mode 100644 test/fixtures/output/flink/application/instance-list-env-missing.golden create mode 100644 test/fixtures/output/flink/application/instance-list-human.golden create mode 100644 test/fixtures/output/flink/application/instance-list-json.golden create mode 100644 test/fixtures/output/flink/application/instance-list-non-existent-app.golden create mode 100644 test/fixtures/output/flink/application/instance-list-non-existent-env.golden create mode 100644 test/fixtures/output/flink/application/instance-list-yaml.golden create mode 100644 test/fixtures/output/flink/application/instance/help-onprem.golden create mode 100644 test/fixtures/output/flink/application/instance/list-help-onprem.golden diff --git a/internal/flink/command_application.go b/internal/flink/command_application.go index 3cc1e9cf58..9d51f4b2b0 100644 --- a/internal/flink/command_application.go +++ b/internal/flink/command_application.go @@ -33,6 +33,7 @@ func (c *command) newApplicationCommand() *cobra.Command { cmd.AddCommand(c.newApplicationCreateCommand()) cmd.AddCommand(c.newApplicationDeleteCommand()) cmd.AddCommand(c.newApplicationDescribeCommand()) + cmd.AddCommand(c.newApplicationInstanceCommand()) cmd.AddCommand(c.newApplicationListCommand()) cmd.AddCommand(c.newApplicationUpdateCommand()) cmd.AddCommand(c.newApplicationWebUiForwardCommand()) diff --git a/internal/flink/command_application_instance.go b/internal/flink/command_application_instance.go new file mode 100644 index 0000000000..6713243b79 --- /dev/null +++ b/internal/flink/command_application_instance.go @@ -0,0 +1,57 @@ +package flink + +import ( + "github.com/spf13/cobra" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +type flinkApplicationInstanceSummaryOut struct { + Name string `human:"Name" serialized:"name"` + CreationTime string `human:"Creation Time" serialized:"creation_time"` + JobId string `human:"Job ID" serialized:"job_id"` + JobState string `human:"Job State" serialized:"job_state"` +} + +func (c *command) newApplicationInstanceCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "instance", + Short: "Manage Flink application instances.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}, + } + + cmd.AddCommand(c.newApplicationInstanceListCommand()) + + return cmd +} + +func convertSdkApplicationInstanceToLocalApplicationInstance(sdkInstance cmfsdk.FlinkApplicationInstance) LocalFlinkApplicationInstance { + localInstance := LocalFlinkApplicationInstance{ + ApiVersion: sdkInstance.ApiVersion, + Kind: sdkInstance.Kind, + } + if sdkInstance.Metadata != nil { + localInstance.Metadata = &LocalApplicationInstanceMetadata{ + Name: sdkInstance.Metadata.Name, + Uid: sdkInstance.Metadata.Uid, + CreationTimestamp: sdkInstance.Metadata.CreationTimestamp, + UpdateTimestamp: sdkInstance.Metadata.UpdateTimestamp, + Labels: sdkInstance.Metadata.Labels, + Annotations: sdkInstance.Metadata.Annotations, + } + } + if sdkInstance.Status != nil { + localInstance.Status = &LocalApplicationInstanceStatus{ + Spec: sdkInstance.Status.Spec, + } + if sdkInstance.Status.JobStatus != nil { + localInstance.Status.JobStatus = &LocalApplicationInstanceJobStatus{ + JobId: sdkInstance.Status.JobStatus.JobId, + State: sdkInstance.Status.JobStatus.State, + } + } + } + return localInstance +} diff --git a/internal/flink/command_application_instance_list.go b/internal/flink/command_application_instance_list.go new file mode 100644 index 0000000000..c3be00d527 --- /dev/null +++ b/internal/flink/command_application_instance_list.go @@ -0,0 +1,85 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newApplicationInstanceListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List Flink application instances.", + Args: cobra.NoArgs, + RunE: c.applicationInstanceList, + } + + cmd.Flags().String("environment", "", "Name of the Flink environment.") + cmd.Flags().String("application", "", "Name of the Flink application.") + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("environment")) + cobra.CheckErr(cmd.MarkFlagRequired("application")) + + return cmd +} + +func (c *command) applicationInstanceList(cmd *cobra.Command, _ []string) error { + environment, err := cmd.Flags().GetString("environment") + if err != nil { + return err + } + + application, err := cmd.Flags().GetString("application") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + instances, err := client.ListApplicationInstances(c.createContext(), environment, application) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + list := output.NewList(cmd) + for _, instance := range instances { + metadata := instance.GetMetadata() + var creationTime string + if metadata.CreationTimestamp != nil { + creationTime = *metadata.CreationTimestamp + } + + var jobId, jobState string + if instance.Status != nil && instance.Status.JobStatus != nil { + if instance.Status.JobStatus.JobId != nil { + jobId = *instance.Status.JobStatus.JobId + } + if instance.Status.JobStatus.State != nil { + jobState = *instance.Status.JobStatus.State + } + } + + list.Add(&flinkApplicationInstanceSummaryOut{ + Name: metadata.GetName(), + CreationTime: creationTime, + JobId: jobId, + JobState: jobState, + }) + } + return list.Print() + } + + localInstances := make([]LocalFlinkApplicationInstance, 0, len(instances)) + for _, sdkInstance := range instances { + localInstances = append(localInstances, convertSdkApplicationInstanceToLocalApplicationInstance(sdkInstance)) + } + + return output.SerializedOutput(cmd, localInstances) +} diff --git a/internal/flink/local_types.go b/internal/flink/local_types.go index 19e0b756d9..bf0abd3521 100644 --- a/internal/flink/local_types.go +++ b/internal/flink/local_types.go @@ -111,6 +111,32 @@ type LocalEnvironment struct { StatementDefaults *LocalAllStatementDefaults1 `json:"statementDefaults,omitempty" yaml:"statementDefaults,omitempty"` } +type LocalFlinkApplicationInstance struct { + ApiVersion string `json:"apiVersion" yaml:"apiVersion"` + Kind string `json:"kind" yaml:"kind"` + Metadata *LocalApplicationInstanceMetadata `json:"metadata,omitempty" yaml:"metadata,omitempty"` + Status *LocalApplicationInstanceStatus `json:"status,omitempty" yaml:"status,omitempty"` +} + +type LocalApplicationInstanceMetadata struct { + Name *string `json:"name,omitempty" yaml:"name,omitempty"` + Uid *string `json:"uid,omitempty" yaml:"uid,omitempty"` + CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` + UpdateTimestamp *string `json:"updateTimestamp,omitempty" yaml:"updateTimestamp,omitempty"` + Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"` + Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"` +} + +type LocalApplicationInstanceStatus struct { + Spec *map[string]interface{} `json:"spec,omitempty" yaml:"spec,omitempty"` + JobStatus *LocalApplicationInstanceJobStatus `json:"jobStatus,omitempty" yaml:"jobStatus,omitempty"` +} + +type LocalApplicationInstanceJobStatus struct { + JobId *string `json:"jobId,omitempty" yaml:"jobId,omitempty"` + State *string `json:"state,omitempty" yaml:"state,omitempty"` +} + type LocalFlinkApplication struct { ApiVersion string `json:"apiVersion" yaml:"apiVersion"` Kind string `json:"kind" yaml:"kind"` diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..f551c90d5c 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,25 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) ListApplicationInstances(ctx context.Context, environment, application string) ([]cmfsdk.FlinkApplicationInstance, error) { + instances := make([]cmfsdk.FlinkApplicationInstance, 0) + var currentPageNumber int32 = 0 + // 100 is an arbitrary page size we've chosen. + const pageSize = 100 + done := false + + for !done { + instancesPage, httpResponse, err := cmfClient.FlinkApplicationsApi.GetApplicationInstances(ctx, environment, application).Page(currentPageNumber).Size(pageSize).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return nil, fmt.Errorf(`failed to list instances of application "%s" in the environment "%s": %s`, application, environment, parsedErr) + } + instances = append(instances, instancesPage.GetItems()...) + currentPageNumber, done = extractPageOptions(len(instancesPage.GetItems()), currentPageNumber) + } + + return instances, nil +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { diff --git a/test/fixtures/output/flink/application/help-onprem.golden b/test/fixtures/output/flink/application/help-onprem.golden index fd5615fe45..9a8bd1f073 100644 --- a/test/fixtures/output/flink/application/help-onprem.golden +++ b/test/fixtures/output/flink/application/help-onprem.golden @@ -10,6 +10,7 @@ Available Commands: create Create a Flink application. delete Delete one or more Flink applications. describe Describe a Flink application. + instance Manage Flink application instances. list List Flink applications. update Update a Flink application. web-ui-forward Forward the web UI of a Flink application. diff --git a/test/fixtures/output/flink/application/instance-list-app-missing.golden b/test/fixtures/output/flink/application/instance-list-app-missing.golden new file mode 100644 index 0000000000..589416e105 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-app-missing.golden @@ -0,0 +1,18 @@ +Error: required flag(s) "application" not set +Usage: + confluent flink application instance list [flags] + +Flags: + --environment string REQUIRED: Name of the Flink environment. + --application string REQUIRED: Name of the Flink application. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + diff --git a/test/fixtures/output/flink/application/instance-list-empty.golden b/test/fixtures/output/flink/application/instance-list-empty.golden new file mode 100644 index 0000000000..167bb27ed9 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-empty.golden @@ -0,0 +1 @@ +None found. diff --git a/test/fixtures/output/flink/application/instance-list-env-missing.golden b/test/fixtures/output/flink/application/instance-list-env-missing.golden new file mode 100644 index 0000000000..876706e597 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-env-missing.golden @@ -0,0 +1,18 @@ +Error: required flag(s) "environment" not set +Usage: + confluent flink application instance list [flags] + +Flags: + --environment string REQUIRED: Name of the Flink environment. + --application string REQUIRED: Name of the Flink application. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + diff --git a/test/fixtures/output/flink/application/instance-list-human.golden b/test/fixtures/output/flink/application/instance-list-human.golden new file mode 100644 index 0000000000..74609f7489 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-human.golden @@ -0,0 +1,4 @@ + Name | Creation Time | Job ID | Job State +-----------+----------------------+------------+------------ + inst-001 | 2025-09-18T10:00:00Z | job-abc123 | RUNNING + inst-002 | 2025-09-17T08:30:00Z | job-def456 | FINISHED diff --git a/test/fixtures/output/flink/application/instance-list-json.golden b/test/fixtures/output/flink/application/instance-list-json.golden new file mode 100644 index 0000000000..ac5c7fc488 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-json.golden @@ -0,0 +1,32 @@ +[ + { + "apiVersion": "cmf.confluent.io/v1", + "kind": "FlinkApplicationInstance", + "metadata": { + "name": "inst-001", + "uid": "inst-001", + "creationTimestamp": "2025-09-18T10:00:00Z" + }, + "status": { + "jobStatus": { + "jobId": "job-abc123", + "state": "RUNNING" + } + } + }, + { + "apiVersion": "cmf.confluent.io/v1", + "kind": "FlinkApplicationInstance", + "metadata": { + "name": "inst-002", + "uid": "inst-002", + "creationTimestamp": "2025-09-17T08:30:00Z" + }, + "status": { + "jobStatus": { + "jobId": "job-def456", + "state": "FINISHED" + } + } + } +] diff --git a/test/fixtures/output/flink/application/instance-list-non-existent-app.golden b/test/fixtures/output/flink/application/instance-list-non-existent-app.golden new file mode 100644 index 0000000000..e1c7584729 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-non-existent-app.golden @@ -0,0 +1 @@ +Error: failed to list instances of application "non-existent" in the environment "default": Application not found diff --git a/test/fixtures/output/flink/application/instance-list-non-existent-env.golden b/test/fixtures/output/flink/application/instance-list-non-existent-env.golden new file mode 100644 index 0000000000..f7f2cb578a --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-non-existent-env.golden @@ -0,0 +1 @@ +Error: failed to list instances of application "default-application-1" in the environment "non-existent": Environment not found diff --git a/test/fixtures/output/flink/application/instance-list-yaml.golden b/test/fixtures/output/flink/application/instance-list-yaml.golden new file mode 100644 index 0000000000..2455e1839b --- /dev/null +++ b/test/fixtures/output/flink/application/instance-list-yaml.golden @@ -0,0 +1,20 @@ +- apiVersion: cmf.confluent.io/v1 + kind: FlinkApplicationInstance + metadata: + name: inst-001 + uid: inst-001 + creationTimestamp: "2025-09-18T10:00:00Z" + status: + jobStatus: + jobId: job-abc123 + state: RUNNING +- apiVersion: cmf.confluent.io/v1 + kind: FlinkApplicationInstance + metadata: + name: inst-002 + uid: inst-002 + creationTimestamp: "2025-09-17T08:30:00Z" + status: + jobStatus: + jobId: job-def456 + state: FINISHED diff --git a/test/fixtures/output/flink/application/instance/help-onprem.golden b/test/fixtures/output/flink/application/instance/help-onprem.golden new file mode 100644 index 0000000000..0510038802 --- /dev/null +++ b/test/fixtures/output/flink/application/instance/help-onprem.golden @@ -0,0 +1,14 @@ +Manage Flink application instances. + +Usage: + confluent flink application instance [command] + +Available Commands: + list List Flink application instances. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent flink application instance [command] --help" for more information about a command. diff --git a/test/fixtures/output/flink/application/instance/list-help-onprem.golden b/test/fixtures/output/flink/application/instance/list-help-onprem.golden new file mode 100644 index 0000000000..e3476bb621 --- /dev/null +++ b/test/fixtures/output/flink/application/instance/list-help-onprem.golden @@ -0,0 +1,18 @@ +List Flink application instances. + +Usage: + confluent flink application instance list [flags] + +Flags: + --environment string REQUIRED: Name of the Flink environment. + --application string REQUIRED: Name of the Flink application. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 5536f0a14c..4c3d3c5a8c 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -39,6 +39,23 @@ func (s *CLITestSuite) TestFlinkApplicationList() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkApplicationInstanceList() { + tests := []CLITest{ + // failure scenarios + {args: "flink application instance list --application default-application-1", fixture: "flink/application/instance-list-env-missing.golden", exitCode: 1}, + {args: "flink application instance list --environment default", fixture: "flink/application/instance-list-app-missing.golden", exitCode: 1}, + {args: "flink application instance list --environment non-existent --application default-application-1", fixture: "flink/application/instance-list-non-existent-env.golden", exitCode: 1}, + {args: "flink application instance list --environment default --application non-existent", fixture: "flink/application/instance-list-non-existent-app.golden", exitCode: 1}, + // success scenarios + {args: "flink application instance list --environment default --application default-application-2", fixture: "flink/application/instance-list-empty.golden"}, + {args: "flink application instance list --environment default --application default-application-1 --output human", fixture: "flink/application/instance-list-human.golden"}, + {args: "flink application instance list --environment default --application default-application-1 --output json", fixture: "flink/application/instance-list-json.golden"}, + {args: "flink application instance list --environment default --application default-application-1 --output yaml", fixture: "flink/application/instance-list-yaml.golden"}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkApplicationDelete() { tests := []CLITest{ // failure scenarios diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..117f86e82e 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -594,6 +594,79 @@ func handleCmfApplication(t *testing.T) http.HandlerFunc { } } +// Helper function to create a Flink application instance. +func createApplicationInstance(name, jobId, state, creationTimestamp string) cmfsdk.FlinkApplicationInstance { + instance := cmfsdk.FlinkApplicationInstance{ + ApiVersion: "cmf.confluent.io/v1", + Kind: "FlinkApplicationInstance", + } + + metadata := cmfsdk.ApplicationInstanceMetadata{ + Name: &name, + Uid: &name, + CreationTimestamp: &creationTimestamp, + } + instance.Metadata = &metadata + + jobStatus := cmfsdk.ApplicationInstanceStatusJobStatus{ + JobId: &jobId, + State: &state, + } + status := cmfsdk.ApplicationInstanceStatus{ + JobStatus: &jobStatus, + } + instance.Status = &status + + return instance +} + +// Handler for "cmf/api/v1/environments/{environment}/applications/{application}/instances" +// Used to list application instances. +func handleCmfApplicationInstances(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + + vars := mux.Vars(r) + environment := vars["environment"] + application := vars["application"] + + switch r.Method { + case http.MethodGet: + if environment != "default" && environment != "test" { + http.Error(w, "Environment not found", http.StatusNotFound) + return + } + + if application != "default-application-1" && application != "default-application-2" { + http.Error(w, "Application not found", http.StatusNotFound) + return + } + + instancesPage := map[string]interface{}{ + "items": []cmfsdk.FlinkApplicationInstance{}, + } + + page := r.URL.Query().Get("page") + + if application == "default-application-1" && page == "0" { + items := []cmfsdk.FlinkApplicationInstance{ + createApplicationInstance("inst-001", "job-abc123", "RUNNING", "2025-09-18T10:00:00Z"), + createApplicationInstance("inst-002", "job-def456", "FINISHED", "2025-09-17T08:30:00Z"), + } + instancesPage = map[string]interface{}{ + "items": items, + } + } + + err := json.NewEncoder(w).Encode(instancesPage) + require.NoError(t, err) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + // Handler for "/cmf/api/v1/environments/{envName}/applications/{appName}/savepoints" // Used by list, create savepoints. func handleCmfSavepoints(t *testing.T) http.HandlerFunc { diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index 2267f01d5a..fd1353e846 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -11,6 +11,7 @@ var flinkRoutes = []route{ {"/cmf/api/v1/catalogs/kafka/{catName}", handleCmfCatalog}, {"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications}, {"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication}, + {"/cmf/api/v1/environments/{environment}/applications/{application}/instances", handleCmfApplicationInstances}, {"/cmf/api/v1/environments", handleCmfEnvironments}, {"/cmf/api/v1/environments/{environment}", handleCmfEnvironment}, {"/cmf/api/v1/environments/{environment}/compute-pools", handleCmfComputePools}, From d282316dda9eef6ce9697ca1a5b8879616783ec1 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 6 Apr 2026 11:07:13 +0530 Subject: [PATCH 2/3] Add support for describe instances cli --- .../flink/command_application_instance.go | 1 + .../command_application_instance_describe.go | 54 +++++++++++++++++++ pkg/flink/cmf_rest_client.go | 8 +++ .../instance-describe-app-missing.golden | 18 +++++++ .../instance-describe-env-missing.golden | 18 +++++++ .../instance-describe-name-missing.golden | 18 +++++++ .../instance-describe-non-existent-app.golden | 1 + .../instance-describe-non-existent-env.golden | 1 + ...ance-describe-non-existent-instance.golden | 1 + .../instance-describe-success-yaml.golden | 10 ++++ .../instance-describe-success.golden | 15 ++++++ .../instance-describe-with-human.golden | 15 ++++++ .../instance/describe-help-onprem.golden | 18 +++++++ .../application/instance/help-onprem.golden | 1 + test/flink_onprem_test.go | 18 +++++++ test/test-server/flink_onprem_handler.go | 38 +++++++++++++ test/test-server/flink_onprem_router.go | 1 + 17 files changed, 236 insertions(+) create mode 100644 internal/flink/command_application_instance_describe.go create mode 100644 test/fixtures/output/flink/application/instance-describe-app-missing.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-env-missing.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-name-missing.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-non-existent-app.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-non-existent-env.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-non-existent-instance.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-success-yaml.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-success.golden create mode 100644 test/fixtures/output/flink/application/instance-describe-with-human.golden create mode 100644 test/fixtures/output/flink/application/instance/describe-help-onprem.golden diff --git a/internal/flink/command_application_instance.go b/internal/flink/command_application_instance.go index 6713243b79..ed54026190 100644 --- a/internal/flink/command_application_instance.go +++ b/internal/flink/command_application_instance.go @@ -22,6 +22,7 @@ func (c *command) newApplicationInstanceCommand() *cobra.Command { Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}, } + cmd.AddCommand(c.newApplicationInstanceDescribeCommand()) cmd.AddCommand(c.newApplicationInstanceListCommand()) return cmd diff --git a/internal/flink/command_application_instance_describe.go b/internal/flink/command_application_instance_describe.go new file mode 100644 index 0000000000..2c06de157c --- /dev/null +++ b/internal/flink/command_application_instance_describe.go @@ -0,0 +1,54 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newApplicationInstanceDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a Flink application instance.", + Args: cobra.ExactArgs(1), + RunE: c.applicationInstanceDescribe, + } + + cmd.Flags().String("environment", "", "Name of the Flink environment.") + cmd.Flags().String("application", "", "Name of the Flink application.") + addCmfFlagSet(cmd) + pcmd.AddOutputFlagWithHumanRestricted(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("environment")) + cobra.CheckErr(cmd.MarkFlagRequired("application")) + + return cmd +} + +func (c *command) applicationInstanceDescribe(cmd *cobra.Command, args []string) error { + environment, err := cmd.Flags().GetString("environment") + if err != nil { + return err + } + + application, err := cmd.Flags().GetString("application") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + instanceName := args[0] + instance, err := client.DescribeApplicationInstance(c.createContext(), environment, application, instanceName) + if err != nil { + return err + } + + localInstance := convertSdkApplicationInstanceToLocalApplicationInstance(instance) + + return output.SerializedOutput(cmd, localInstance) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index f551c90d5c..7221e4395d 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,14 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) DescribeApplicationInstance(ctx context.Context, environment, application, instance string) (cmfsdk.FlinkApplicationInstance, error) { + cmfInstance, httpResponse, err := cmfClient.FlinkApplicationsApi.GetApplicationInstance(ctx, environment, application, instance).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.FlinkApplicationInstance{}, fmt.Errorf(`failed to describe instance "%s" of application "%s" in the environment "%s": %s`, instance, application, environment, parsedErr) + } + return cmfInstance, nil +} + func (cmfClient *CmfRestClient) ListApplicationInstances(ctx context.Context, environment, application string) ([]cmfsdk.FlinkApplicationInstance, error) { instances := make([]cmfsdk.FlinkApplicationInstance, 0) var currentPageNumber int32 = 0 diff --git a/test/fixtures/output/flink/application/instance-describe-app-missing.golden b/test/fixtures/output/flink/application/instance-describe-app-missing.golden new file mode 100644 index 0000000000..6c13fb7f3a --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-app-missing.golden @@ -0,0 +1,18 @@ +Error: required flag(s) "application" not set +Usage: + confluent flink application instance describe [flags] + +Flags: + --environment string REQUIRED: Name of the Flink environment. + --application string REQUIRED: Name of the Flink application. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "json" or "yaml". (default "json") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + diff --git a/test/fixtures/output/flink/application/instance-describe-env-missing.golden b/test/fixtures/output/flink/application/instance-describe-env-missing.golden new file mode 100644 index 0000000000..dc81a1f388 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-env-missing.golden @@ -0,0 +1,18 @@ +Error: required flag(s) "environment" not set +Usage: + confluent flink application instance describe [flags] + +Flags: + --environment string REQUIRED: Name of the Flink environment. + --application string REQUIRED: Name of the Flink application. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "json" or "yaml". (default "json") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + diff --git a/test/fixtures/output/flink/application/instance-describe-name-missing.golden b/test/fixtures/output/flink/application/instance-describe-name-missing.golden new file mode 100644 index 0000000000..020690199e --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-name-missing.golden @@ -0,0 +1,18 @@ +Error: accepts 1 arg(s), received 0 +Usage: + confluent flink application instance describe [flags] + +Flags: + --environment string Name of the Flink environment. + --application string Name of the Flink application. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "json" or "yaml". (default "json") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + diff --git a/test/fixtures/output/flink/application/instance-describe-non-existent-app.golden b/test/fixtures/output/flink/application/instance-describe-non-existent-app.golden new file mode 100644 index 0000000000..4a5d9feec7 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-non-existent-app.golden @@ -0,0 +1 @@ +Error: failed to describe instance "inst-001" of application "non-existent" in the environment "default": Application not found diff --git a/test/fixtures/output/flink/application/instance-describe-non-existent-env.golden b/test/fixtures/output/flink/application/instance-describe-non-existent-env.golden new file mode 100644 index 0000000000..b4164c3303 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-non-existent-env.golden @@ -0,0 +1 @@ +Error: failed to describe instance "inst-001" of application "default-application-1" in the environment "non-existent": Environment not found diff --git a/test/fixtures/output/flink/application/instance-describe-non-existent-instance.golden b/test/fixtures/output/flink/application/instance-describe-non-existent-instance.golden new file mode 100644 index 0000000000..e2ba1b2d43 --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-non-existent-instance.golden @@ -0,0 +1 @@ +Error: failed to describe instance "non-existent" of application "default-application-1" in the environment "default": Instance not found diff --git a/test/fixtures/output/flink/application/instance-describe-success-yaml.golden b/test/fixtures/output/flink/application/instance-describe-success-yaml.golden new file mode 100644 index 0000000000..5a90cf254b --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-success-yaml.golden @@ -0,0 +1,10 @@ +apiVersion: cmf.confluent.io/v1 +kind: FlinkApplicationInstance +metadata: + name: inst-001 + uid: inst-001 + creationTimestamp: "2025-09-18T10:00:00Z" +status: + jobStatus: + jobId: job-abc123 + state: RUNNING diff --git a/test/fixtures/output/flink/application/instance-describe-success.golden b/test/fixtures/output/flink/application/instance-describe-success.golden new file mode 100644 index 0000000000..ec8c76e96b --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-success.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf.confluent.io/v1", + "kind": "FlinkApplicationInstance", + "metadata": { + "name": "inst-001", + "uid": "inst-001", + "creationTimestamp": "2025-09-18T10:00:00Z" + }, + "status": { + "jobStatus": { + "jobId": "job-abc123", + "state": "RUNNING" + } + } +} diff --git a/test/fixtures/output/flink/application/instance-describe-with-human.golden b/test/fixtures/output/flink/application/instance-describe-with-human.golden new file mode 100644 index 0000000000..ec8c76e96b --- /dev/null +++ b/test/fixtures/output/flink/application/instance-describe-with-human.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf.confluent.io/v1", + "kind": "FlinkApplicationInstance", + "metadata": { + "name": "inst-001", + "uid": "inst-001", + "creationTimestamp": "2025-09-18T10:00:00Z" + }, + "status": { + "jobStatus": { + "jobId": "job-abc123", + "state": "RUNNING" + } + } +} diff --git a/test/fixtures/output/flink/application/instance/describe-help-onprem.golden b/test/fixtures/output/flink/application/instance/describe-help-onprem.golden new file mode 100644 index 0000000000..460bd1963a --- /dev/null +++ b/test/fixtures/output/flink/application/instance/describe-help-onprem.golden @@ -0,0 +1,18 @@ +Describe a Flink application instance. + +Usage: + confluent flink application instance describe [flags] + +Flags: + --environment string REQUIRED: Name of the Flink environment. + --application string REQUIRED: Name of the Flink application. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "json" or "yaml". (default "json") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/application/instance/help-onprem.golden b/test/fixtures/output/flink/application/instance/help-onprem.golden index 0510038802..fbbaa00047 100644 --- a/test/fixtures/output/flink/application/instance/help-onprem.golden +++ b/test/fixtures/output/flink/application/instance/help-onprem.golden @@ -4,6 +4,7 @@ Usage: confluent flink application instance [command] Available Commands: + describe Describe a Flink application instance. list List Flink application instances. Global Flags: diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 4c3d3c5a8c..ce473b8574 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -56,6 +56,24 @@ func (s *CLITestSuite) TestFlinkApplicationInstanceList() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkApplicationInstanceDescribe() { + tests := []CLITest{ + // failure scenarios + {args: "flink application instance describe inst-001 --application default-application-1", fixture: "flink/application/instance-describe-env-missing.golden", exitCode: 1}, + {args: "flink application instance describe inst-001 --environment default", fixture: "flink/application/instance-describe-app-missing.golden", exitCode: 1}, + {args: "flink application instance describe --environment default --application default-application-1", fixture: "flink/application/instance-describe-name-missing.golden", exitCode: 1}, + {args: "flink application instance describe inst-001 --environment non-existent --application default-application-1", fixture: "flink/application/instance-describe-non-existent-env.golden", exitCode: 1}, + {args: "flink application instance describe inst-001 --environment default --application non-existent", fixture: "flink/application/instance-describe-non-existent-app.golden", exitCode: 1}, + {args: "flink application instance describe non-existent --environment default --application default-application-1", fixture: "flink/application/instance-describe-non-existent-instance.golden", exitCode: 1}, + // success scenarios + {args: "flink application instance describe inst-001 --environment default --application default-application-1", fixture: "flink/application/instance-describe-success.golden"}, + {args: "flink application instance describe inst-001 --environment default --application default-application-1 --output yaml", fixture: "flink/application/instance-describe-success-yaml.golden"}, + {args: "flink application instance describe inst-001 --environment default --application default-application-1 --output human", fixture: "flink/application/instance-describe-with-human.golden"}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkApplicationDelete() { tests := []CLITest{ // failure scenarios diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 117f86e82e..fd3ae6bb20 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -667,6 +667,44 @@ func handleCmfApplicationInstances(t *testing.T) http.HandlerFunc { } } +// Handler for "cmf/api/v1/environments/{environment}/applications/{application}/instances/{instName}" +// Used to describe a specific application instance. +func handleCmfApplicationInstance(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + + vars := mux.Vars(r) + environment := vars["environment"] + application := vars["application"] + instName := vars["instName"] + + switch r.Method { + case http.MethodGet: + if environment != "default" && environment != "test" { + http.Error(w, "Environment not found", http.StatusNotFound) + return + } + + if application != "default-application-1" && application != "default-application-2" { + http.Error(w, "Application not found", http.StatusNotFound) + return + } + + if application == "default-application-1" && instName == "inst-001" { + instance := createApplicationInstance("inst-001", "job-abc123", "RUNNING", "2025-09-18T10:00:00Z") + err := json.NewEncoder(w).Encode(instance) + require.NoError(t, err) + return + } + + http.Error(w, "Instance not found", http.StatusNotFound) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + // Handler for "/cmf/api/v1/environments/{envName}/applications/{appName}/savepoints" // Used by list, create savepoints. func handleCmfSavepoints(t *testing.T) http.HandlerFunc { diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index fd1353e846..c4e2269103 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -12,6 +12,7 @@ var flinkRoutes = []route{ {"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications}, {"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication}, {"/cmf/api/v1/environments/{environment}/applications/{application}/instances", handleCmfApplicationInstances}, + {"/cmf/api/v1/environments/{environment}/applications/{application}/instances/{instName}", handleCmfApplicationInstance}, {"/cmf/api/v1/environments", handleCmfEnvironments}, {"/cmf/api/v1/environments/{environment}", handleCmfEnvironment}, {"/cmf/api/v1/environments/{environment}/compute-pools", handleCmfComputePools}, From 35f9681b413714c4da9b1bd991cb319aaddf0435 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 7 Apr 2026 09:28:39 +0530 Subject: [PATCH 3/3] Fix gci lint error --- internal/flink/command_application_instance.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/flink/command_application_instance.go b/internal/flink/command_application_instance.go index ed54026190..617050cf85 100644 --- a/internal/flink/command_application_instance.go +++ b/internal/flink/command_application_instance.go @@ -9,10 +9,10 @@ import ( ) type flinkApplicationInstanceSummaryOut struct { - Name string `human:"Name" serialized:"name"` - CreationTime string `human:"Creation Time" serialized:"creation_time"` - JobId string `human:"Job ID" serialized:"job_id"` - JobState string `human:"Job State" serialized:"job_state"` + Name string `human:"Name" serialized:"name"` + CreationTime string `human:"Creation Time" serialized:"creation_time"` + JobId string `human:"Job ID" serialized:"job_id"` + JobState string `human:"Job State" serialized:"job_state"` } func (c *command) newApplicationInstanceCommand() *cobra.Command {