Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/charmbracelet/lipgloss v0.11.0
github.com/client9/gospell v0.0.0-20160306015952-90dfc71015df
github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52
github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8
github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0
github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0
github.com/confluentinc/ccloud-sdk-go-v2/billing v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ github.com/compose-spec/compose-go/v2 v2.1.3 h1:bD67uqLuL/XgkAK6ir3xZvNLFPxPScEi
github.com/compose-spec/compose-go/v2 v2.1.3/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52 h1:19qEGhkbZa5fopKCe0VPIV+Sasby4Pv10z9ZaktwWso=
github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52/go.mod h1:62EMf+5uFEt1BJ2q8WMrUoI9VUSxAbDnmZCGRt/MbA0=
github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8 h1:0t56uO8mzCIT7PLg/G7yKZc6U0ToR3Pqmi3aN1uPpiM=
github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8/go.mod h1:R2nAnRzw0ug4oUywRLO7AiyciE1dFS1Rf3TIfxj9Znk=
github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0 h1:zSF4OQUJXWH2JeAo9rsq13ibk+JFdzITGR8S7cFMpzw=
github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0/go.mod h1:DoxqzzF3JzvJr3fWkvCiOHFlE0GoYpozWxFZ1Ud9ntA=
github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0 h1:8fWyLwMuy8ec0MVF5Avd54UvbIxhDFhZzanHBVwgxdw=
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd.AddCommand(newRegionCommand(prerunner))
cmd.AddCommand(newReplicaCommand(prerunner))
cmd.AddCommand(newShareGroupCommand(prerunner))
cmd.AddCommand(newStreamGroupCommand(prerunner))
cmd.AddCommand(newTopicCommand(cfg, prerunner))
Comment on lines 27 to 29
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description/release notes mention the new commands under confluent kafka consumer stream-group, but the implementation adds streams-group directly under confluent kafka (sibling of consumer, topic, etc.). Please align the docs/release notes (or the command placement) so users can find the feature.

Copilot uses AI. Check for mistakes.

return cmd
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type consumerGroupOut struct {
IsSimple bool `human:"Simple" serialized:"is_simple"`
PartitionAssignor string `human:"Partition Assignor" serialized:"partition_assignor"`
State string `human:"State" serialized:"state"`
ProtocolType string `human:"Type,omitempty" serialized:"type,omitempty"`
}
Comment on lines 14 to 18
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omitempty in the human tag is only honored for single-object tables, not list headers/columns (see pkg/output/table.go list header generation). This means the “Type” column will always appear in consumer group list even when empty (as reflected in the updated golden output). If the intent is to omit the column when the API doesn’t return types, consider conditionally filtering the list columns (e.g., list.Filter(...)) when all ProtocolType values are empty.

Copilot uses AI. Check for mistakes.

func (c *consumerCommand) newGroupCommand(cfg *config.Config) *cobra.Command {
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (c *consumerCommand) groupDescribe(cmd *cobra.Command, args []string) error
IsSimple: group.GetIsSimple(),
PartitionAssignor: group.GetPartitionAssignor(),
State: group.GetState(),
ProtocolType: group.GetType(),
})
return table.Print()
}
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (c *consumerCommand) groupList(cmd *cobra.Command, _ []string) error {
IsSimple: group.GetIsSimple(),
PartitionAssignor: group.GetPartitionAssignor(),
State: group.GetState(),
ProtocolType: group.GetType(),
})
}
return list.Print()
Expand Down
57 changes: 57 additions & 0 deletions internal/kafka/command_kafka_stream_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

type streamGroupCommand struct {
*pcmd.AuthenticatedCLICommand
}

type streamGroupOut struct {
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
State string `human:"State" serialized:"state"`
MemberCount int32 `human:"Member Count" serialized:"member_count"`
SubtopologyCount int32 `human:"Subtopology Count" serialized:"subtopology_count"`
GroupEpoch int32 `human:"Group Epoch" serialized:"group_epoch"`
TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"`
TargetAssignmentEpoch int32 `human:"Target Assignment Epoch" serialized:"target_assignment_epoch"`
Members string `human:"Members" serialized:"members"`
Subtopologies string `human:"Subtopologies" serialized:"subtopologies"`
}

func newStreamGroupCommand(prerunner pcmd.PreRunner) *cobra.Command {
cmd := &cobra.Command{
Use: "streams-group",
Short: "Manage Kafka stream groups.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin},
}

c := &streamGroupCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}

cmd.AddCommand(c.newStreamGroupDescribeCommand())
cmd.AddCommand(c.newStreamGroupListCommand())
cmd.AddCommand(c.newStreamGroupMemberCommand())
cmd.AddCommand(c.newStreamGroupMemberAssignmentCommand())
cmd.AddCommand(c.newStreamGroupMemberTaskPartitionsCommand())
cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentCommand())
cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentTaskPartitionsCommand())
cmd.AddCommand(c.newStreamGroupSubtopologyCommand())

return cmd
}

func (c *streamGroupCommand) validStreamGroupArgs(cmd *cobra.Command, args []string) []string {
if len(args) > 0 {
return nil
}

if err := c.PersistentPreRunE(cmd, args); err != nil {
return nil
}

return pcmd.AutocompleteStreamGroups(cmd, c.AuthenticatedCLICommand)
}
56 changes: 56 additions & 0 deletions internal/kafka/command_kafka_stream_group_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *streamGroupCommand) newStreamGroupDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <group>",
Short: "Describe stream group",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
Comment on lines +11 to +15
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This command hits internal Kafka REST endpoints via CloudClientInternal, but unlike the corresponding list commands it has no RequireNonAPIKeyCloudLogin run requirement. If API-key login isn’t supported for these endpoints, this should add the same run requirement to prevent confusing runtime failures.

Copilot uses AI. Check for mistakes.
RunE: c.streamGroupDescribe,
}

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *streamGroupCommand) streamGroupDescribe(cmd *cobra.Command, args []string) error {
groupId := args[0]

kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return err
}

streamGroup, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroup(groupId)
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&streamGroupOut{
ClusterId: streamGroup.GetClusterId(),
GroupId: streamGroup.GetGroupId(),
State: streamGroup.GetState(),
MemberCount: streamGroup.GetMemberCount(),
SubtopologyCount: streamGroup.GetSubtopologyCount(),
GroupEpoch: streamGroup.GetGroupEpoch(),
TopologyEpoch: streamGroup.GetTopologyEpoch(),
TargetAssignmentEpoch: streamGroup.GetTargetAssignmentEpoch(),
Members: streamGroup.Members.GetRelated(),
Subtopologies: streamGroup.Subtopologies.GetRelated(),
})

return table.Print()
}
65 changes: 65 additions & 0 deletions internal/kafka/command_kafka_stream_group_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package kafka

import (
kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3"
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *streamGroupCommand) newStreamGroupListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List kafka stream groups.",
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Short help text uses lowercase “kafka” here, but other commands use “Kafka”. Consider changing to “List Kafka stream groups.” (and updating the golden help fixtures) for consistency.

Suggested change
Short: "List kafka stream groups.",
Short: "List Kafka stream groups.",

Copilot uses AI. Check for mistakes.
Args: cobra.NoArgs,
RunE: c.listStreamGroup,
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
}

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *streamGroupCommand) listStreamGroup(cmd *cobra.Command, _ []string) error {
groups, err := c.getStreamGroups(cmd)
if err != nil {
return err
}

list := output.NewList(cmd)
for _, streamGroup := range groups {
list.Add(&streamGroupOut{
ClusterId: streamGroup.GetClusterId(),
GroupId: streamGroup.GetGroupId(),
State: streamGroup.GetState(),
MemberCount: streamGroup.GetMemberCount(),
SubtopologyCount: streamGroup.GetSubtopologyCount(),
GroupEpoch: streamGroup.GetGroupEpoch(),
TopologyEpoch: streamGroup.GetTopologyEpoch(),
TargetAssignmentEpoch: streamGroup.GetTargetAssignmentEpoch(),
Members: streamGroup.Members.GetRelated(),
Subtopologies: streamGroup.Subtopologies.GetRelated(),
})
}
return list.Print()
}

func (c *streamGroupCommand) getStreamGroups(cmd *cobra.Command) ([]kafkarestv3Internal.StreamsGroupData, error) {
kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return nil, err
}

topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup()
if err != nil {
return nil, err
}

return topics.Data, nil
Comment on lines +59 to +64
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name topics is misleading here since this function is listing streams groups, not topics. Renaming it to something like groupsResp/streamsGroups would improve readability.

Suggested change
topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup()
if err != nil {
return nil, err
}
return topics.Data, nil
groupsResp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup()
if err != nil {
return nil, err
}
return groupsResp.Data, nil

Copilot uses AI. Check for mistakes.
}
32 changes: 32 additions & 0 deletions internal/kafka/command_kafka_stream_group_member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kafka

import (
"github.com/spf13/cobra"
)

type streamGroupMemberOut struct {
Kind string `human:"Kind" serialized:"kind"`
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
MemberId string `human:"Member Id" serialized:"member_id"`
ProcessId string `human:"Process Id" serialized:"process_id"`
ClientId string `human:"Client Id" serialized:"client_id"`
InstanceId string `human:"Instance Id" serialized:"instance_id"`
MemberEpoch int32 `human:"Member Epoch" serialized:"member_epoch"`
TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"`
IsClassic bool `human:"Is Classic" serialized:"is_classic"`
Assignments string `human:"Assignments" serialized:"assignments"`
TargetAssign string `human:"Target Assignment" serialized:"target_assignment"`
}

func (c *streamGroupCommand) newStreamGroupMemberCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "member",
Short: "Manage Kafka stream groups members.",
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Help text is grammatically incorrect: “Manage Kafka stream groups members.” should be “Manage Kafka stream group members.” (or similar). Fixing this string will also require updating the related golden help fixture.

Suggested change
Short: "Manage Kafka stream groups members.",
Short: "Manage Kafka stream group members.",

Copilot uses AI. Check for mistakes.
}

cmd.AddCommand(c.newStreamGroupMemberDescribeCommand())
cmd.AddCommand(c.newStreamGroupMemberListCommand())

return cmd
}
27 changes: 27 additions & 0 deletions internal/kafka/command_kafka_stream_group_member_assignment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package kafka

import (
"github.com/spf13/cobra"
)

type streamGroupMemberAssignmentOut struct {
Kind string `human:"Kind" serialized:"kind"`
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
MemberId string `human:"Member Id" serialized:"member_id"`
ActiveTasks string `human:"Active Tasks" serialized:"active_tasks"`
StandbyTasks string `human:"Standby Tasks" serialized:"standby_tasks"`
WarmupTasks string `human:"Warmup Tasks" serialized:"warmup_tasks"`
}

func (c *streamGroupCommand) newStreamGroupMemberAssignmentCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "member-assignment",
Short: "Manage Kafka stream group member assignments.",
}

cmd.AddCommand(c.newStreamGroupMemberAssignmentDescribeCommand())
cmd.AddCommand(c.newStreamGroupMemberAssignmentListCommand())

return cmd
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *streamGroupCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <member>",
Short: "Describe stream group member assignment",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
Comment on lines +10 to +15
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ValidArgsFunction here uses validStreamGroupArgs, which autocompletes stream group IDs. For describe <member>, completion should suggest member IDs (based on --group). Consider a dedicated valid-args function for member IDs and reuse it across the member-related describe commands.

Suggested change
func (c *streamGroupCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <member>",
Short: "Describe stream group member assignment",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
func (c *streamGroupCommand) validStreamGroupMemberArgs(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
groupId, err := cmd.Flags().GetString("group")
if err != nil || groupId == "" {
return nil, cobra.ShellCompDirectiveNoFileComp
}
return nil, cobra.ShellCompDirectiveNoFileComp
}
func (c *streamGroupCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <member>",
Short: "Describe stream group member assignment",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupMemberArgs),

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This internal-endpoint command is missing the RequireNonAPIKeyCloudLogin run requirement used by the corresponding list commands in this command tree. If API-key login isn’t supported, add the run requirement here as well.

Suggested change
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
RunRequirements: []pcmd.RunRequirement{pcmd.RequireNonAPIKeyCloudLogin},

Copilot uses AI. Check for mistakes.
RunE: c.streamGroupMemberAssignmentDescribe,
}

cmd.Flags().String("group", "", "Group Id.")

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("group"))

return cmd
}

func (c *streamGroupCommand) streamGroupMemberAssignmentDescribe(cmd *cobra.Command, args []string) error {
groupId, err := cmd.Flags().GetString("group")
if err != nil {
return err
}

memberId := args[0]

kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return err
}

assignment, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroupMemberAssignment(groupId, memberId)
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&streamGroupMemberAssignmentOut{
Kind: assignment.GetKind(),
ClusterId: assignment.GetClusterId(),
GroupId: assignment.GetGroupId(),
MemberId: assignment.GetMemberId(),
ActiveTasks: assignment.ActiveTasks.GetRelated(),
StandbyTasks: assignment.StandbyTasks.GetRelated(),
WarmupTasks: assignment.WarmupTasks.GetRelated(),
})

return table.Print()
}
Loading