-
Notifications
You must be signed in to change notification settings - Fork 24
APIE-844 - Confluent CLI support for kstreams #3260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8f92b80
1603eeb
649039a
07fca9a
e9fac8e
42db63f
06c14b8
e9b2d66
0e942b6
0eaefca
4ea10ce
6b366b8
5b1d3de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ type consumerGroupOut struct { | |
| IsSimple bool `human:"Simple" serialized:"is_simple"` | ||
| PartitionAssignor string `human:"Partition Assignor" serialized:"partition_assignor"` | ||
| State string `human:"State" serialized:"state"` | ||
| ProtocolType string `human:"Type,omitempty" serialized:"type,omitempty"` | ||
| } | ||
|
Comment on lines
14
to
18
|
||
|
|
||
| func (c *consumerCommand) newGroupCommand(cfg *config.Config) *cobra.Command { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| ) | ||
|
|
||
| type streamGroupCommand struct { | ||
| *pcmd.AuthenticatedCLICommand | ||
| } | ||
|
|
||
| type streamGroupOut struct { | ||
| ClusterId string `human:"Cluster Id" serialized:"cluster_id"` | ||
| GroupId string `human:"Group Id" serialized:"group_id"` | ||
| State string `human:"State" serialized:"state"` | ||
| MemberCount int32 `human:"Member Count" serialized:"member_count"` | ||
| SubtopologyCount int32 `human:"Subtopology Count" serialized:"subtopology_count"` | ||
| GroupEpoch int32 `human:"Group Epoch" serialized:"group_epoch"` | ||
| TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"` | ||
| TargetAssignmentEpoch int32 `human:"Target Assignment Epoch" serialized:"target_assignment_epoch"` | ||
| Members string `human:"Members" serialized:"members"` | ||
| Subtopologies string `human:"Subtopologies" serialized:"subtopologies"` | ||
| } | ||
|
|
||
| func newStreamGroupCommand(prerunner pcmd.PreRunner) *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "streams-group", | ||
| Short: "Manage Kafka stream groups.", | ||
| Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin}, | ||
| } | ||
|
|
||
| c := &streamGroupCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)} | ||
|
|
||
| cmd.AddCommand(c.newStreamGroupDescribeCommand()) | ||
| cmd.AddCommand(c.newStreamGroupListCommand()) | ||
| cmd.AddCommand(c.newStreamGroupMemberCommand()) | ||
| cmd.AddCommand(c.newStreamGroupMemberAssignmentCommand()) | ||
| cmd.AddCommand(c.newStreamGroupMemberTaskPartitionsCommand()) | ||
| cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentCommand()) | ||
| cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentTaskPartitionsCommand()) | ||
| cmd.AddCommand(c.newStreamGroupSubtopologyCommand()) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *streamGroupCommand) validStreamGroupArgs(cmd *cobra.Command, args []string) []string { | ||
| if len(args) > 0 { | ||
| return nil | ||
| } | ||
|
|
||
| if err := c.PersistentPreRunE(cmd, args); err != nil { | ||
| return nil | ||
| } | ||
|
|
||
| return pcmd.AutocompleteStreamGroups(cmd, c.AuthenticatedCLICommand) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| ) | ||
|
|
||
| func (c *streamGroupCommand) newStreamGroupDescribeCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "describe <group>", | ||
| Short: "Describe stream group", | ||
| Args: cobra.ExactArgs(1), | ||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), | ||
|
Comment on lines
+11
to
+15
|
||
| RunE: c.streamGroupDescribe, | ||
| } | ||
|
|
||
| pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *streamGroupCommand) streamGroupDescribe(cmd *cobra.Command, args []string) error { | ||
| groupId := args[0] | ||
|
|
||
| kafkaREST, err := c.GetKafkaREST(cmd) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| streamGroup, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroup(groupId) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| table := output.NewTable(cmd) | ||
| table.Add(&streamGroupOut{ | ||
| ClusterId: streamGroup.GetClusterId(), | ||
| GroupId: streamGroup.GetGroupId(), | ||
| State: streamGroup.GetState(), | ||
| MemberCount: streamGroup.GetMemberCount(), | ||
| SubtopologyCount: streamGroup.GetSubtopologyCount(), | ||
| GroupEpoch: streamGroup.GetGroupEpoch(), | ||
| TopologyEpoch: streamGroup.GetTopologyEpoch(), | ||
| TargetAssignmentEpoch: streamGroup.GetTargetAssignmentEpoch(), | ||
| Members: streamGroup.Members.GetRelated(), | ||
| Subtopologies: streamGroup.Subtopologies.GetRelated(), | ||
| }) | ||
|
|
||
| return table.Print() | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,65 @@ | ||||||||||||||||||||||||||
| package kafka | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||
| kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3" | ||||||||||||||||||||||||||
| "github.com/spf13/cobra" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||||||||||||||||||||||||||
| "github.com/confluentinc/cli/v4/pkg/output" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| func (c *streamGroupCommand) newStreamGroupListCommand() *cobra.Command { | ||||||||||||||||||||||||||
| cmd := &cobra.Command{ | ||||||||||||||||||||||||||
| Use: "list", | ||||||||||||||||||||||||||
| Short: "List kafka stream groups.", | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
| Short: "List kafka stream groups.", | |
| Short: "List Kafka stream groups.", |
Copilot
AI
Apr 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variable name topics is misleading here since this function is listing streams groups, not topics. Renaming it to something like groupsResp/streamsGroups would improve readability.
| topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup() | |
| if err != nil { | |
| return nil, err | |
| } | |
| return topics.Data, nil | |
| groupsResp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup() | |
| if err != nil { | |
| return nil, err | |
| } | |
| return groupsResp.Data, nil |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,32 @@ | ||||||
| package kafka | ||||||
|
|
||||||
| import ( | ||||||
| "github.com/spf13/cobra" | ||||||
| ) | ||||||
|
|
||||||
| type streamGroupMemberOut struct { | ||||||
| Kind string `human:"Kind" serialized:"kind"` | ||||||
| ClusterId string `human:"Cluster Id" serialized:"cluster_id"` | ||||||
| GroupId string `human:"Group Id" serialized:"group_id"` | ||||||
| MemberId string `human:"Member Id" serialized:"member_id"` | ||||||
| ProcessId string `human:"Process Id" serialized:"process_id"` | ||||||
| ClientId string `human:"Client Id" serialized:"client_id"` | ||||||
| InstanceId string `human:"Instance Id" serialized:"instance_id"` | ||||||
| MemberEpoch int32 `human:"Member Epoch" serialized:"member_epoch"` | ||||||
| TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"` | ||||||
| IsClassic bool `human:"Is Classic" serialized:"is_classic"` | ||||||
| Assignments string `human:"Assignments" serialized:"assignments"` | ||||||
| TargetAssign string `human:"Target Assignment" serialized:"target_assignment"` | ||||||
| } | ||||||
|
|
||||||
| func (c *streamGroupCommand) newStreamGroupMemberCommand() *cobra.Command { | ||||||
| cmd := &cobra.Command{ | ||||||
| Use: "member", | ||||||
| Short: "Manage Kafka stream groups members.", | ||||||
|
||||||
| Short: "Manage Kafka stream groups members.", | |
| Short: "Manage Kafka stream group members.", |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
| ) | ||
|
|
||
| type streamGroupMemberAssignmentOut struct { | ||
| Kind string `human:"Kind" serialized:"kind"` | ||
| ClusterId string `human:"Cluster Id" serialized:"cluster_id"` | ||
| GroupId string `human:"Group Id" serialized:"group_id"` | ||
| MemberId string `human:"Member Id" serialized:"member_id"` | ||
| ActiveTasks string `human:"Active Tasks" serialized:"active_tasks"` | ||
| StandbyTasks string `human:"Standby Tasks" serialized:"standby_tasks"` | ||
| WarmupTasks string `human:"Warmup Tasks" serialized:"warmup_tasks"` | ||
| } | ||
|
|
||
| func (c *streamGroupCommand) newStreamGroupMemberAssignmentCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "member-assignment", | ||
| Short: "Manage Kafka stream group member assignments.", | ||
| } | ||
|
|
||
| cmd.AddCommand(c.newStreamGroupMemberAssignmentDescribeCommand()) | ||
| cmd.AddCommand(c.newStreamGroupMemberAssignmentListCommand()) | ||
|
|
||
| return cmd | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,62 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| package kafka | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/spf13/cobra" | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/confluentinc/cli/v4/pkg/output" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| func (c *streamGroupCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| cmd := &cobra.Command{ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Use: "describe <member>", | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Short: "Describe stream group member assignment", | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Args: cobra.ExactArgs(1), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+10
to
+15
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| func (c *streamGroupCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command { | |
| cmd := &cobra.Command{ | |
| Use: "describe <member>", | |
| Short: "Describe stream group member assignment", | |
| Args: cobra.ExactArgs(1), | |
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), | |
| func (c *streamGroupCommand) validStreamGroupMemberArgs(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { | |
| groupId, err := cmd.Flags().GetString("group") | |
| if err != nil || groupId == "" { | |
| return nil, cobra.ShellCompDirectiveNoFileComp | |
| } | |
| return nil, cobra.ShellCompDirectiveNoFileComp | |
| } | |
| func (c *streamGroupCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command { | |
| cmd := &cobra.Command{ | |
| Use: "describe <member>", | |
| Short: "Describe stream group member assignment", | |
| Args: cobra.ExactArgs(1), | |
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupMemberArgs), |
Copilot
AI
Apr 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This internal-endpoint command is missing the RequireNonAPIKeyCloudLogin run requirement used by the corresponding list commands in this command tree. If API-key login isn’t supported, add the run requirement here as well.
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), | |
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), | |
| RunRequirements: []pcmd.RunRequirement{pcmd.RequireNonAPIKeyCloudLogin}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR description/release notes mention the new commands under
confluent kafka consumer stream-group, but the implementation addsstreams-groupdirectly underconfluent kafka(sibling ofconsumer,topic, etc.). Please align the docs/release notes (or the command placement) so users can find the feature.