diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index 0de92a3ed..c4801b225 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -133,8 +133,6 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: prometheus - component: alertmanager spec: {{- with .Values.imagePullSecrets }} imagePullSecrets: diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml new file mode 100644 index 000000000..06b8b87a7 --- /dev/null +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -0,0 +1,31 @@ +{{- if .Values.prometheusMetrics.podMonitor.enabled }} +kind: PodMonitor +apiVersion: monitoring.coreos.com/v1 +metadata: + name: {{ include "kafka-operator.fullname" . }} + namespace: {{ .Release.Namespace | quote }} + labels: + helm.sh/chart: {{ include "kafka-operator.chart" . }} + app.kubernetes.io/name: {{ include "kafka-operator.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/version: {{ .Chart.AppVersion }} + app.kubernetes.io/component: operator + {{- with .Values.operator.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + selector: + matchLabels: + app.kubernetes.io/name: {{ include "kafka-operator.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/component: operator + endpoints: + - interval: {{ .Values.prometheusMetrics.podMonitor.interval }} + port: metrics + path: /metrics +{{- end }} diff --git a/charts/kafka-operator/values.yaml b/charts/kafka-operator/values.yaml index 1073280f8..a7966a9e8 100644 --- a/charts/kafka-operator/values.yaml +++ b/charts/kafka-operator/values.yaml @@ -103,6 +103,10 @@ prometheusMetrics: create: true # -- ServiceAccount used by prometheus auth proxy name: kafka-operator-authproxy + podMonitor: + # -- If true, create a PodMonitor for Prometheus metrics + enabled: false + interval: 30s # -- Health probes configuration healthProbes: {} diff --git a/pkg/jmxextractor/extractor.go b/pkg/jmxextractor/extractor.go index 583e06dd7..d06af1491 100644 --- a/pkg/jmxextractor/extractor.go +++ b/pkg/jmxextractor/extractor.go @@ -20,6 +20,7 @@ import ( "io" "net/http" "regexp" + "time" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" @@ -90,7 +91,8 @@ func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConf requestURL = fmt.Sprintf(serviceJMXTemplate, exp.clusterName, brokerId, exp.clusterNamespace, exp.kubernetesClusterDomain, 9020) } - rsp, err := http.Get(requestURL) + client := &http.Client{Timeout: 30 * time.Second} + rsp, err := client.Get(requestURL) if err != nil { exp.log.Error(err, fmt.Sprintf("error during talking to broker-%d", brokerId)) return nil, errorfactory.New(errorfactory.BrokersNotReady{}, err, "unable to talk to ...") diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 9aa91808f..0b9be60c3 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -409,6 +409,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log) allBrokerDynamicConfigSucceeded := true + brokerStatus := make(map[int32]*banzaiv1beta1.BrokerConfig) for _, broker := range reorderedBrokers { brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec) if err != nil { @@ -449,9 +450,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - if err = r.updateStatusWithDockerImageAndVersion(broker.Id, brokerConfig, log); err != nil { - return err + if r.brokerNeedsVersionUpdate(broker.Id, brokerConfig) { + brokerStatus[broker.Id] = brokerConfig } + // If dynamic configs can not be set then let the loop continue to the next broker, // after the loop we return error. This solves that case when other brokers could get healthy, // but the loop exits too soon because dynamic configs can not be set. @@ -474,6 +476,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return err } + if err := r.updateStatusWithDockerImageAndVersion(brokerStatus, log); err != nil { + return err + } + // in case HeadlessServiceEnabled is changed, delete the service that was created by the previous // reconcile flow. The services must be deleted at the end of the reconcile flow after the new services // were created and broker configurations reflecting the new services otherwise the Kafka brokers @@ -917,21 +923,44 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod, return nil } -func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, brokerConfig *banzaiv1beta1.BrokerConfig, - log logr.Logger) error { - jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), - r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) +// brokerNeedsVersionUpdate returns true when the broker's image/version status is absent, +// incomplete, or stale relative to the desired image — i.e. a JMX fetch is warranted. +func (r *Reconciler) brokerNeedsVersionUpdate(brokerID int32, brokerConfig *banzaiv1beta1.BrokerConfig) bool { + desiredImage := util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage()) + state, ok := r.KafkaCluster.Status.BrokersState[strconv.Itoa(int(brokerID))] + return !ok || state.Version == "" || state.Image != desiredImage +} - kafkaVersion, err := jmxExp.ExtractDockerImageAndVersion(brokerId, brokerConfig, - r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) - if err != nil { - return err +type brokerVersionResult struct { + brokerID int32 + kafkaVersion *banzaiv1beta1.KafkaVersion + err error +} + +func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*banzaiv1beta1.BrokerConfig, log logr.Logger) error { + ch := make(chan brokerVersionResult, len(brokers)) + for brokerID, brokerConfig := range brokers { + go func(id int32, cfg *banzaiv1beta1.BrokerConfig) { + jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) + kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) + if err != nil { + ch <- brokerVersionResult{brokerID: id, err: err} + return + } + ch <- brokerVersionResult{brokerID: id, kafkaVersion: kv} + }(brokerID, brokerConfig) } - err = k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(brokerId))}, r.KafkaCluster, - *kafkaVersion, log) - if err != nil { - return err + + for range brokers { + result := <-ch + if result.err != nil { + return result.err + } + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(result.brokerID))}, r.KafkaCluster, *result.kafkaVersion, log); err != nil { + return err + } } + return nil } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 4ca517e6f..42c17c9ab 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1986,3 +1986,107 @@ func TestGetBrokerAzMap(t *testing.T) { }) } } + +func TestBrokerNeedsVersionUpdate(t *testing.T) { + t.Parallel() + const clusterImage = "apache/kafka:3.4.0" + const updatedImage = "apache/kafka:3.5.0" + + testCases := []struct { + testName string + brokerID int32 + brokerConfig v1beta1.BrokerConfig + clusterImage string + brokersState map[string]v1beta1.BrokerState + expected bool + }{ + { + testName: "no existing status entry triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{}, + expected: true, + }, + { + testName: "status entry with empty version triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: ""}, + }, + expected: true, + }, + { + testName: "status entry with different image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: updatedImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + { + testName: "status up to date with cluster image skips update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: false, + }, + { + testName: "broker-level image override used instead of cluster image", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.4.1"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: false, + }, + { + testName: "broker-level image override differs from recorded image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.5.0"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: true, + }, + { + testName: "correct state for one broker does not suppress update for another", + brokerID: 1, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.testName, func(t *testing.T) { + t.Parallel() + r := Reconciler{ + Reconciler: resources.Reconciler{ + KafkaCluster: &v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + ClusterImage: test.clusterImage, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: test.brokersState, + }, + }, + }, + } + assert.Equal(t, test.expected, r.brokerNeedsVersionUpdate(test.brokerID, &test.brokerConfig)) + }) + } +}