From 99e574835e377871be3bb732533a62a4f42bc49a Mon Sep 17 00:00:00 2001 From: Ha Van Date: Tue, 26 May 2026 13:12:59 -0500 Subject: [PATCH 1/4] feature: PVC role labels --- api/v1beta1/kafkacluster_types.go | 13 ++++ pkg/resources/kafka/kafka.go | 2 +- pkg/resources/kafka/pvc.go | 7 +- pkg/resources/kafka/pvc_test.go | 104 ++++++++++++++++++++++++++++-- 4 files changed, 118 insertions(+), 8 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 0e03045fe..d6f916f85 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -46,6 +46,9 @@ const ( // ProcessRolesKey is used to identify which process roles the Kafka pod has ProcessRolesKey = "processRoles" + // PvcRolesKey is used to identify which process roles a PVC serves (broker, controller, or broker_controller) + PvcRolesKey = "pvcRoles" + // IsBrokerNodeKey is used to identify if the kafka pod is either a broker or a broker_controller IsBrokerNodeKey = "isBrokerNode" @@ -1214,6 +1217,16 @@ func (bConfig *BrokerConfig) IsCombinedNode() bool { return bConfig.IsBrokerNode() && bConfig.IsControllerNode() } +// GetPvcRolesLabelValue returns the value for the pvcRoles label on PVCs. +// In KRaft mode the value mirrors processRoles (e.g. "broker", "controller", "broker_controller"). +// In ZooKeeper mode all nodes are brokers, so the value is always "broker". +func (bConfig *BrokerConfig) GetPvcRolesLabelValue(kRaftMode bool) string { + if kRaftMode && len(bConfig.Roles) > 0 { + return strings.Join(bConfig.Roles, "_") + } + return BrokerNodeProcessRole +} + // GetResources returns the broker specific Kubernetes resource func (bConfig *BrokerConfig) GetResources() *corev1.ResourceRequirements { if bConfig.Resources != nil { diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 9aa91808f..e10ec186a 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -312,7 +312,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if storage.PvcSpec == nil { continue } - o, err := r.pvc(broker.Id, index, storage) + o, err := r.pvc(broker.Id, index, storage, brokerConfig, r.KafkaCluster.Spec.KRaftMode) if err != nil { return errors.WrapIfWithDetails(err, "failed to generate resource", "resources", "PersistentVolumeClaim") } diff --git a/pkg/resources/kafka/pvc.go b/pkg/resources/kafka/pvc.go index 29516f397..6487d8d0a 100644 --- a/pkg/resources/kafka/pvc.go +++ b/pkg/resources/kafka/pvc.go @@ -31,7 +31,7 @@ import ( "github.com/Masterminds/sprig/v3" ) -func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.StorageConfig) (*corev1.PersistentVolumeClaim, error) { +func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.StorageConfig, brokerConfig *v1beta1.BrokerConfig, kRaftMode bool) (*corev1.PersistentVolumeClaim, error) { errCtx := []interface{}{v1beta1.BrokerIdLabelKey, brokerId, "mountPath", storage.MountPath} pvcSpecYaml, err := yaml.Marshal(storage.PvcSpec) @@ -67,7 +67,10 @@ func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.Stora fmt.Sprintf(brokerStorageTemplate, r.KafkaCluster.Name, brokerId, storageIndex), apiutil.MergeLabels( apiutil.LabelsForKafka(r.KafkaCluster.Name), - map[string]string{v1beta1.BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)}, + map[string]string{ + v1beta1.BrokerIdLabelKey: fmt.Sprintf("%d", brokerId), + v1beta1.PvcRolesKey: brokerConfig.GetPvcRolesLabelValue(kRaftMode), + }, ), map[string]string{"mountPath": storage.MountPath}, r.KafkaCluster), Spec: pvcSpec, diff --git a/pkg/resources/kafka/pvc_test.go b/pkg/resources/kafka/pvc_test.go index 95520342c..796453f1c 100644 --- a/pkg/resources/kafka/pvc_test.go +++ b/pkg/resources/kafka/pvc_test.go @@ -47,13 +47,19 @@ func TestReconciler_pvc(t *testing.T) { }, } + brokerConfig := &v1beta1.BrokerConfig{} + testCases := []struct { testName string + brokerConfig *v1beta1.BrokerConfig + kRaftMode bool storageConfig v1beta1.StorageConfig expectedPersistentVolumeClaim *corev1.PersistentVolumeClaim }{ { - testName: "storage config with no template", + testName: "storage config with no template", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/kafka-logs-1", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -71,6 +77,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/kafka-logs-1", @@ -84,7 +91,9 @@ func TestReconciler_pvc(t *testing.T) { }, }, { - testName: "storage config with template", + testName: "storage config with template", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/kafka-logs-1", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -108,6 +117,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/kafka-logs-1", @@ -125,7 +135,9 @@ func TestReconciler_pvc(t *testing.T) { }, }, { - testName: "storage config with template and very long mount path", + testName: "storage config with template and very long mount path", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/mountpath/that/exceeds63characters/kafka-logs-123456789123456789", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -147,6 +159,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/mountpath/that/exceeds63characters/kafka-logs-123456789123456789", @@ -164,7 +177,9 @@ func TestReconciler_pvc(t *testing.T) { }, }, { - testName: "storage config with volume name template", + testName: "storage config with volume name template", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/kafka-logs-1", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -180,6 +195,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/kafka-logs-1", @@ -190,6 +206,84 @@ func TestReconciler_pvc(t *testing.T) { }, }, }, + { + testName: "kraft controller-only node", + brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.ControllerNodeProcessRole}}, + kRaftMode: true, + storageConfig: v1beta1.StorageConfig{ + MountPath: "/kafka-logs-1", + PvcSpec: &corev1.PersistentVolumeClaimSpec{}, + }, + expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kafkaCluster.GetNamespace(), + Name: "", + GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()), + Labels: map[string]string{ + v1beta1.AppLabelKey: "kafka", + v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), + v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "controller", + }, + Annotations: map[string]string{ + "mountPath": "/kafka-logs-1", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{}, + }, + }, + { + testName: "kraft broker-only node", + brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.BrokerNodeProcessRole}}, + kRaftMode: true, + storageConfig: v1beta1.StorageConfig{ + MountPath: "/kafka-logs-1", + PvcSpec: &corev1.PersistentVolumeClaimSpec{}, + }, + expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kafkaCluster.GetNamespace(), + Name: "", + GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()), + Labels: map[string]string{ + v1beta1.AppLabelKey: "kafka", + v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), + v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", + }, + Annotations: map[string]string{ + "mountPath": "/kafka-logs-1", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{}, + }, + }, + { + testName: "kraft combined broker+controller node", + brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.BrokerNodeProcessRole, v1beta1.ControllerNodeProcessRole}}, + kRaftMode: true, + storageConfig: v1beta1.StorageConfig{ + MountPath: "/kafka-logs-1", + PvcSpec: &corev1.PersistentVolumeClaimSpec{}, + }, + expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kafkaCluster.GetNamespace(), + Name: "", + GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()), + Labels: map[string]string{ + v1beta1.AppLabelKey: "kafka", + v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), + v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker_controller", + }, + Annotations: map[string]string{ + "mountPath": "/kafka-logs-1", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{}, + }, + }, } t.Parallel() @@ -198,7 +292,7 @@ func TestReconciler_pvc(t *testing.T) { test := test t.Run(test.testName, func(t *testing.T) { - pvc, err := r.pvc(2, 1, test.storageConfig) + pvc, err := r.pvc(2, 1, test.storageConfig, test.brokerConfig, test.kRaftMode) assert.NilError(t, err, "PVC creation should succeed") From 8ca0bcf96e069297fce22b56033687712b59c81a Mon Sep 17 00:00:00 2001 From: Ha Van Date: Tue, 26 May 2026 13:58:18 -0500 Subject: [PATCH 2/4] refactor: extract processRolesValue helper and sort roles for deterministic labels - Extract private processRolesValue() on BrokerConfig as a single source of truth for the role join logic used by both GetBrokerLabels (processRoles pod label) and GetPvcRolesLabelValue (pvcRoles PVC label) - Sort roles before joining so both labels are alphabetically ordered regardless of CR authoring order ([controller, broker] and [broker, controller] both produce broker_controller) - Add backfill log line in PVC reconcile path so operators can observe the one-time label backfill on existing PVCs after upgrade Co-Authored-By: Claude Sonnet 4.6 --- api/v1beta1/kafkacluster_types.go | 19 ++++++++++++++----- pkg/resources/kafka/kafka.go | 3 +++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index d6f916f85..b5f5fb498 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -17,6 +17,7 @@ package v1beta1 import ( "fmt" + "sort" "strconv" "strings" @@ -1105,12 +1106,23 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string { return util.CloneMap(bConfig.BrokerAnnotations) } +// processRolesValue returns the joined role string used for both the processRoles pod label +// and the pvcRoles PVC label. In KRaft mode the roles are joined with "_"; in ZK mode always "broker". +func (bConfig *BrokerConfig) processRolesValue(kRaftMode bool) string { + if kRaftMode { + roles := append([]string(nil), bConfig.Roles...) + sort.Strings(roles) + return strings.Join(roles, "_") + } + return BrokerNodeProcessRole +} + // GetBrokerLabels returns the labels that are applied to broker pods func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string { var kraftLabels map[string]string if kRaftMode { kraftLabels = map[string]string{ - ProcessRolesKey: strings.Join(bConfig.Roles, "_"), + ProcessRolesKey: bConfig.processRolesValue(kRaftMode), IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()), IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()), } @@ -1221,10 +1233,7 @@ func (bConfig *BrokerConfig) IsCombinedNode() bool { // In KRaft mode the value mirrors processRoles (e.g. "broker", "controller", "broker_controller"). // In ZooKeeper mode all nodes are brokers, so the value is always "broker". func (bConfig *BrokerConfig) GetPvcRolesLabelValue(kRaftMode bool) string { - if kRaftMode && len(bConfig.Roles) > 0 { - return strings.Join(bConfig.Roles, "_") - } - return BrokerNodeProcessRole + return bConfig.processRolesValue(kRaftMode) } // GetResources returns the broker specific Kubernetes resource diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index e10ec186a..3abf7cc6d 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -1257,6 +1257,9 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro if err := r.Update(ctx, desiredPvc); err != nil { return errorfactory.New(errorfactory.APIFailure{}, err, "updating resource failed", "kind", desiredType) } + if _, hadLabel := currentPvc.Labels[banzaiv1beta1.PvcRolesKey]; !hadLabel { + log.Info("backfilling pvcRoles label on existing PVC", "pvc", desiredPvc.Name, banzaiv1beta1.PvcRolesKey, desiredPvc.Labels[banzaiv1beta1.PvcRolesKey]) + } log.Info("resource updated") } } From 39f01a8273b9bc8abcee2ed59752304782b2586a Mon Sep 17 00:00:00 2001 From: Ha Van Date: Tue, 26 May 2026 14:04:24 -0500 Subject: [PATCH 3/4] Remove the log message --- pkg/resources/kafka/kafka.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 3abf7cc6d..e10ec186a 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -1257,9 +1257,6 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro if err := r.Update(ctx, desiredPvc); err != nil { return errorfactory.New(errorfactory.APIFailure{}, err, "updating resource failed", "kind", desiredType) } - if _, hadLabel := currentPvc.Labels[banzaiv1beta1.PvcRolesKey]; !hadLabel { - log.Info("backfilling pvcRoles label on existing PVC", "pvc", desiredPvc.Name, banzaiv1beta1.PvcRolesKey, desiredPvc.Labels[banzaiv1beta1.PvcRolesKey]) - } log.Info("resource updated") } } From d6f5c3144380cc9d745d1e9b675e6d3185436aca Mon Sep 17 00:00:00 2001 From: Ha Van Date: Tue, 26 May 2026 14:23:55 -0500 Subject: [PATCH 4/4] fix: update test expectation for sorted processRoles label Co-Authored-By: Claude Sonnet 4.6 --- api/v1beta1/kafkacluster_types_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/v1beta1/kafkacluster_types_test.go b/api/v1beta1/kafkacluster_types_test.go index 38a7be38e..996adc629 100644 --- a/api/v1beta1/kafkacluster_types_test.go +++ b/api/v1beta1/kafkacluster_types_test.go @@ -515,7 +515,7 @@ func TestGetBrokerLabels(t *testing.T) { BrokerIdLabelKey: strconv.Itoa(expectedBrokerId), KafkaCRLabelKey: expectedKafkaCRName, "test_label_key": "test_label_value", - ProcessRolesKey: "controller_broker", + ProcessRolesKey: "broker_controller", IsBrokerNodeKey: "true", IsControllerNodeKey: "true", },