From 6493987614b8ac9618c952df972e662b23f453ce Mon Sep 17 00:00:00 2001 From: Adrian Coman <1664229+azun@users.noreply.github.com> Date: Thu, 16 Apr 2026 16:33:35 +0300 Subject: [PATCH 1/7] feat/Broker version extraction improvements --- pkg/resources/kafka/kafka.go | 51 +++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 9aa91808f..fbbc0eab0 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -409,6 +409,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log) allBrokerDynamicConfigSucceeded := true + brokerStatus := make(map[int32]*banzaiv1beta1.BrokerConfig) for _, broker := range reorderedBrokers { brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec) if err != nil { @@ -449,9 +450,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - if err = r.updateStatusWithDockerImageAndVersion(broker.Id, brokerConfig, log); err != nil { - return err - } + brokerStatus[broker.Id] = brokerConfig + // If dynamic configs can not be set then let the loop continue to the next broker, // after the loop we return error. This solves that case when other brokers could get healthy, // but the loop exits too soon because dynamic configs can not be set. @@ -474,6 +474,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return err } + if err := r.updateStatusWithDockerImageAndVersion(brokerStatus, log); err != nil { + return err + } + // in case HeadlessServiceEnabled is changed, delete the service that was created by the previous // reconcile flow. The services must be deleted at the end of the reconcile flow after the new services // were created and broker configurations reflecting the new services otherwise the Kafka brokers @@ -917,21 +921,38 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod, return nil } -func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, brokerConfig *banzaiv1beta1.BrokerConfig, - log logr.Logger) error { - jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), - r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) +type brokerVersionResult struct { + brokerID int32 + kafkaVersion *banzaiv1beta1.KafkaVersion + err error +} - kafkaVersion, err := jmxExp.ExtractDockerImageAndVersion(brokerId, brokerConfig, - r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) - if err != nil { - return err +func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*banzaiv1beta1.BrokerConfig, log logr.Logger) error { + ch := make(chan brokerVersionResult, len(brokers)) + for brokerID, brokerConfig := range brokers { + go func(id int32, cfg *banzaiv1beta1.BrokerConfig) { + jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), + r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) + kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, + r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) + if err != nil { + ch <- brokerVersionResult{brokerID: id, err: err} + return + } + ch <- brokerVersionResult{brokerID: id, kafkaVersion: kv} + }(brokerID, brokerConfig) } - err = k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(brokerId))}, r.KafkaCluster, - *kafkaVersion, log) - if err != nil { - return err + + for range brokers { + result := <-ch + if result.err != nil { + return result.err + } + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(result.brokerID))}, r.KafkaCluster, *result.kafkaVersion, log); err != nil { + return err + } } + return nil } From 8f25bec40179b16ed529067612d3794bf529aba0 Mon Sep 17 00:00:00 2001 From: Adrian Coman <1664229+azun@users.noreply.github.com> Date: Mon, 20 Apr 2026 15:50:22 +0300 Subject: [PATCH 2/7] Added PodMonitor for Prometheus metrics --- .../operator-deployment-with-webhook.yaml | 6 ++-- .../kafka-operator/templates/podmonitor.yaml | 32 +++++++++++++++++++ charts/kafka-operator/values.yaml | 4 +++ 3 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 charts/kafka-operator/templates/podmonitor.yaml diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index 0de92a3ed..e99612438 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -117,6 +117,8 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator + app: kafka-operator + component: operator replicas: {{ .Values.replicaCount }} template: metadata: @@ -133,8 +135,8 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: prometheus - component: alertmanager + app: kafka-operator + component: operator spec: {{- with .Values.imagePullSecrets }} imagePullSecrets: diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml new file mode 100644 index 000000000..164ba5577 --- /dev/null +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -0,0 +1,32 @@ +{{- if .Values.prometheusMetrics.podMonitor.enabled }} +kind: PodMonitor +apiVersion: monitoring.coreos.com/v1 +metadata: + name: {{ include "kafka-operator.fullname" . }} + namespace: {{ .Release.Namespace | quote }} + labels: + helm.sh/chart: {{ include "kafka-operator.chart" . }} + app.kubernetes.io/name: {{ include "kafka-operator.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/version: {{ .Chart.AppVersion }} + app.kubernetes.io/component: operator + app: kafka-operator + component: operator + {{- with .Values.operator.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + selector: + matchLabels: + app: kafka-operator + component: operator + endpoints: + - interval: {{ .Values.prometheusMetrics.interval }} + port: metrics + path: /metrics +{{- end }} diff --git a/charts/kafka-operator/values.yaml b/charts/kafka-operator/values.yaml index 1073280f8..a7966a9e8 100644 --- a/charts/kafka-operator/values.yaml +++ b/charts/kafka-operator/values.yaml @@ -103,6 +103,10 @@ prometheusMetrics: create: true # -- ServiceAccount used by prometheus auth proxy name: kafka-operator-authproxy + podMonitor: + # -- If true, create a PodMonitor for Prometheus metrics + enabled: false + interval: 30s # -- Health probes configuration healthProbes: {} From 8b3ad25278d361086ce6ea64c5b2e93854b01fdc Mon Sep 17 00:00:00 2001 From: Adrian <1664229+azun@users.noreply.github.com> Date: Wed, 6 May 2026 16:20:49 +0300 Subject: [PATCH 3/7] Update charts/kafka-operator/templates/podmonitor.yaml Co-authored-by: Razvan Dobre --- charts/kafka-operator/templates/podmonitor.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml index 164ba5577..1faef3af2 100644 --- a/charts/kafka-operator/templates/podmonitor.yaml +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -26,7 +26,7 @@ spec: app: kafka-operator component: operator endpoints: - - interval: {{ .Values.prometheusMetrics.interval }} + - interval: {{ .Values.prometheusMetrics.podMonitor.interval }} port: metrics path: /metrics {{- end }} From 32c6bd2c14337fac0d9852a70456284de0bc2acd Mon Sep 17 00:00:00 2001 From: Adrian <1664229+azun@users.noreply.github.com> Date: Wed, 6 May 2026 16:24:21 +0300 Subject: [PATCH 4/7] Update charts/kafka-operator/templates/podmonitor.yaml Co-authored-by: Razvan Dobre --- charts/kafka-operator/templates/podmonitor.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml index 1faef3af2..481265240 100644 --- a/charts/kafka-operator/templates/podmonitor.yaml +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -23,8 +23,9 @@ spec: - {{ .Release.Namespace }} selector: matchLabels: - app: kafka-operator - component: operator + app.kubernetes.io/name: {{ include "kafka-operator.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/component: operator endpoints: - interval: {{ .Values.prometheusMetrics.podMonitor.interval }} port: metrics From 3317ccc37c92839eb64b65b69fa5b9e05db77a4e Mon Sep 17 00:00:00 2001 From: Adrian Coman Date: Wed, 6 May 2026 16:37:32 +0300 Subject: [PATCH 5/7] label cleanup --- .../templates/operator-deployment-with-webhook.yaml | 4 ---- charts/kafka-operator/templates/podmonitor.yaml | 2 -- 2 files changed, 6 deletions(-) diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index e99612438..c4801b225 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -117,8 +117,6 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: kafka-operator - component: operator replicas: {{ .Values.replicaCount }} template: metadata: @@ -135,8 +133,6 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: kafka-operator - component: operator spec: {{- with .Values.imagePullSecrets }} imagePullSecrets: diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml index 481265240..06b8b87a7 100644 --- a/charts/kafka-operator/templates/podmonitor.yaml +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -11,8 +11,6 @@ metadata: app.kubernetes.io/managed-by: {{ .Release.Service }} app.kubernetes.io/version: {{ .Chart.AppVersion }} app.kubernetes.io/component: operator - app: kafka-operator - component: operator {{- with .Values.operator.annotations }} annotations: {{- toYaml . | nindent 4 }} From 7cac66ea4fd102f093aceb6766440e6a18c5c94f Mon Sep 17 00:00:00 2001 From: Adrian Coman Date: Wed, 6 May 2026 16:37:56 +0300 Subject: [PATCH 6/7] Added 30s timeout to JMX extractor --- pkg/jmxextractor/extractor.go | 4 +++- pkg/resources/kafka/kafka.go | 6 ++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/jmxextractor/extractor.go b/pkg/jmxextractor/extractor.go index 583e06dd7..d06af1491 100644 --- a/pkg/jmxextractor/extractor.go +++ b/pkg/jmxextractor/extractor.go @@ -20,6 +20,7 @@ import ( "io" "net/http" "regexp" + "time" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" @@ -90,7 +91,8 @@ func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConf requestURL = fmt.Sprintf(serviceJMXTemplate, exp.clusterName, brokerId, exp.clusterNamespace, exp.kubernetesClusterDomain, 9020) } - rsp, err := http.Get(requestURL) + client := &http.Client{Timeout: 30 * time.Second} + rsp, err := client.Get(requestURL) if err != nil { exp.log.Error(err, fmt.Sprintf("error during talking to broker-%d", brokerId)) return nil, errorfactory.New(errorfactory.BrokersNotReady{}, err, "unable to talk to ...") diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index fbbc0eab0..934d5b312 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -931,10 +931,8 @@ func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*ba ch := make(chan brokerVersionResult, len(brokers)) for brokerID, brokerConfig := range brokers { go func(id int32, cfg *banzaiv1beta1.BrokerConfig) { - jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), - r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) - kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, - r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) + jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) + kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) if err != nil { ch <- brokerVersionResult{brokerID: id, err: err} return From 7e8b879ad18f26d08be10b1da95a4d496b1e0c66 Mon Sep 17 00:00:00 2001 From: hvan Date: Wed, 27 May 2026 09:01:16 -0500 Subject: [PATCH 7/7] Skip JMX version fetch when broker status is already current (#249) Adds brokerNeedsVersionUpdate to guard the BrokersState write so that JMX fetches are skipped for brokers whose recorded image and version already match the desired image, reducing unnecessary reconcile work. Co-authored-by: Ha Van Co-authored-by: Claude Sonnet 4.6 --- pkg/resources/kafka/kafka.go | 12 +++- pkg/resources/kafka/kafka_test.go | 104 ++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 934d5b312..0b9be60c3 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -450,7 +450,9 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - brokerStatus[broker.Id] = brokerConfig + if r.brokerNeedsVersionUpdate(broker.Id, brokerConfig) { + brokerStatus[broker.Id] = brokerConfig + } // If dynamic configs can not be set then let the loop continue to the next broker, // after the loop we return error. This solves that case when other brokers could get healthy, @@ -921,6 +923,14 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod, return nil } +// brokerNeedsVersionUpdate returns true when the broker's image/version status is absent, +// incomplete, or stale relative to the desired image — i.e. a JMX fetch is warranted. +func (r *Reconciler) brokerNeedsVersionUpdate(brokerID int32, brokerConfig *banzaiv1beta1.BrokerConfig) bool { + desiredImage := util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage()) + state, ok := r.KafkaCluster.Status.BrokersState[strconv.Itoa(int(brokerID))] + return !ok || state.Version == "" || state.Image != desiredImage +} + type brokerVersionResult struct { brokerID int32 kafkaVersion *banzaiv1beta1.KafkaVersion diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 4ca517e6f..42c17c9ab 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1986,3 +1986,107 @@ func TestGetBrokerAzMap(t *testing.T) { }) } } + +func TestBrokerNeedsVersionUpdate(t *testing.T) { + t.Parallel() + const clusterImage = "apache/kafka:3.4.0" + const updatedImage = "apache/kafka:3.5.0" + + testCases := []struct { + testName string + brokerID int32 + brokerConfig v1beta1.BrokerConfig + clusterImage string + brokersState map[string]v1beta1.BrokerState + expected bool + }{ + { + testName: "no existing status entry triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{}, + expected: true, + }, + { + testName: "status entry with empty version triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: ""}, + }, + expected: true, + }, + { + testName: "status entry with different image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: updatedImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + { + testName: "status up to date with cluster image skips update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: false, + }, + { + testName: "broker-level image override used instead of cluster image", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.4.1"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: false, + }, + { + testName: "broker-level image override differs from recorded image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.5.0"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: true, + }, + { + testName: "correct state for one broker does not suppress update for another", + brokerID: 1, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.testName, func(t *testing.T) { + t.Parallel() + r := Reconciler{ + Reconciler: resources.Reconciler{ + KafkaCluster: &v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + ClusterImage: test.clusterImage, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: test.brokersState, + }, + }, + }, + } + assert.Equal(t, test.expected, r.brokerNeedsVersionUpdate(test.brokerID, &test.brokerConfig)) + }) + } +}