Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ spec:
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: operator
app: prometheus
component: alertmanager
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
Expand Down
31 changes: 31 additions & 0 deletions charts/kafka-operator/templates/podmonitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{{- if .Values.prometheusMetrics.podMonitor.enabled }}
kind: PodMonitor
apiVersion: monitoring.coreos.com/v1
metadata:
name: {{ include "kafka-operator.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
helm.sh/chart: {{ include "kafka-operator.chart" . }}
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/version: {{ .Chart.AppVersion }}
app.kubernetes.io/component: operator
{{- with .Values.operator.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
selector:
matchLabels:
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: operator
endpoints:
- interval: {{ .Values.prometheusMetrics.podMonitor.interval }}
port: metrics
path: /metrics
{{- end }}
4 changes: 4 additions & 0 deletions charts/kafka-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ prometheusMetrics:
create: true
# -- ServiceAccount used by prometheus auth proxy
name: kafka-operator-authproxy
podMonitor:
# -- If true, create a PodMonitor for Prometheus metrics
enabled: false
interval: 30s

# -- Health probes configuration
healthProbes: {}
Expand Down
4 changes: 3 additions & 1 deletion pkg/jmxextractor/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"net/http"
"regexp"
"time"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
Expand Down Expand Up @@ -90,7 +91,8 @@ func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConf
requestURL = fmt.Sprintf(serviceJMXTemplate, exp.clusterName, brokerId,
exp.clusterNamespace, exp.kubernetesClusterDomain, 9020)
}
rsp, err := http.Get(requestURL)
client := &http.Client{Timeout: 30 * time.Second}
rsp, err := client.Get(requestURL)
if err != nil {
exp.log.Error(err, fmt.Sprintf("error during talking to broker-%d", brokerId))
return nil, errorfactory.New(errorfactory.BrokersNotReady{}, err, "unable to talk to ...")
Expand Down
57 changes: 43 additions & 14 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log)

allBrokerDynamicConfigSucceeded := true
brokerStatus := make(map[int32]*banzaiv1beta1.BrokerConfig)
for _, broker := range reorderedBrokers {
brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec)
if err != nil {
Expand Down Expand Up @@ -449,9 +450,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return err
}
if err = r.updateStatusWithDockerImageAndVersion(broker.Id, brokerConfig, log); err != nil {
return err
if r.brokerNeedsVersionUpdate(broker.Id, brokerConfig) {
brokerStatus[broker.Id] = brokerConfig
}

// If dynamic configs can not be set then let the loop continue to the next broker,
// after the loop we return error. This solves that case when other brokers could get healthy,
// but the loop exits too soon because dynamic configs can not be set.
Expand All @@ -474,6 +476,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
return err
}

if err := r.updateStatusWithDockerImageAndVersion(brokerStatus, log); err != nil {
return err
}

// in case HeadlessServiceEnabled is changed, delete the service that was created by the previous
// reconcile flow. The services must be deleted at the end of the reconcile flow after the new services
// were created and broker configurations reflecting the new services otherwise the Kafka brokers
Expand Down Expand Up @@ -917,21 +923,44 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return nil
}

func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, brokerConfig *banzaiv1beta1.BrokerConfig,
log logr.Logger) error {
jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(),
r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log)
// brokerNeedsVersionUpdate returns true when the broker's image/version status is absent,
// incomplete, or stale relative to the desired image — i.e. a JMX fetch is warranted.
func (r *Reconciler) brokerNeedsVersionUpdate(brokerID int32, brokerConfig *banzaiv1beta1.BrokerConfig) bool {
desiredImage := util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage())
state, ok := r.KafkaCluster.Status.BrokersState[strconv.Itoa(int(brokerID))]
return !ok || state.Version == "" || state.Image != desiredImage
}

kafkaVersion, err := jmxExp.ExtractDockerImageAndVersion(brokerId, brokerConfig,
r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled)
if err != nil {
return err
type brokerVersionResult struct {
brokerID int32
kafkaVersion *banzaiv1beta1.KafkaVersion
err error
}

func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*banzaiv1beta1.BrokerConfig, log logr.Logger) error {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goroutines have no cancellation path. If the controller shuts down mid-reconcile, these goroutines run to completion (or until JMX times out). Worth passing a context.Context from the reconcile call if JMX extraction can be long-running.

ch := make(chan brokerVersionResult, len(brokers))
for brokerID, brokerConfig := range brokers {
go func(id int32, cfg *banzaiv1beta1.BrokerConfig) {
jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log)
kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled)
if err != nil {
ch <- brokerVersionResult{brokerID: id, err: err}
return
}
ch <- brokerVersionResult{brokerID: id, kafkaVersion: kv}
}(brokerID, brokerConfig)
}
err = k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(brokerId))}, r.KafkaCluster,
*kafkaVersion, log)
if err != nil {
return err

for range brokers {
result := <-ch
if result.err != nil {
return result.err
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drain the hole channel first then update the broker status so all or none are updated.

if err := k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(result.brokerID))}, r.KafkaCluster, *result.kafkaVersion, log); err != nil {
return err
}
}

return nil
}

Expand Down
104 changes: 104 additions & 0 deletions pkg/resources/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,3 +1986,107 @@ func TestGetBrokerAzMap(t *testing.T) {
})
}
}

func TestBrokerNeedsVersionUpdate(t *testing.T) {
t.Parallel()
const clusterImage = "apache/kafka:3.4.0"
const updatedImage = "apache/kafka:3.5.0"

testCases := []struct {
testName string
brokerID int32
brokerConfig v1beta1.BrokerConfig
clusterImage string
brokersState map[string]v1beta1.BrokerState
expected bool
}{
{
testName: "no existing status entry triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{},
expected: true,
},
{
testName: "status entry with empty version triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: ""},
},
expected: true,
},
{
testName: "status entry with different image triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: updatedImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: "3.4.0"},
},
expected: true,
},
{
testName: "status up to date with cluster image skips update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: "3.4.0"},
},
expected: false,
},
{
testName: "broker-level image override used instead of cluster image",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.4.1"},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"},
},
expected: false,
},
{
testName: "broker-level image override differs from recorded image triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.5.0"},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"},
},
expected: true,
},
{
testName: "correct state for one broker does not suppress update for another",
brokerID: 1,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: "3.4.0"},
},
expected: true,
},
}

for _, test := range testCases {
test := test
t.Run(test.testName, func(t *testing.T) {
t.Parallel()
r := Reconciler{
Reconciler: resources.Reconciler{
KafkaCluster: &v1beta1.KafkaCluster{
Spec: v1beta1.KafkaClusterSpec{
ClusterImage: test.clusterImage,
},
Status: v1beta1.KafkaClusterStatus{
BrokersState: test.brokersState,
},
},
},
}
assert.Equal(t, test.expected, r.brokerNeedsVersionUpdate(test.brokerID, &test.brokerConfig))
})
}
}
Loading