Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
66 changes: 51 additions & 15 deletions test/chaos/chaos_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var _ = Describe("Chaos Under Load (PurgeKeysOnRebalance=true)", Label("chaos",
Expect(framework.CreateRedkeyCluster(ctx, dynamicClient, namespace.Name, clusterName, defaultPrimaries, true)).To(Succeed())

By("waiting for cluster to be ready")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
})

AfterEach(func() {
Expand Down Expand Up @@ -148,7 +148,7 @@ var _ = Describe("Chaos Under Load (PurgeKeysOnRebalance=false)", Label("chaos",
Expect(framework.CreateRedkeyCluster(ctx, dynamicClient, namespace.Name, clusterName, defaultPrimaries, false)).To(Succeed())

By("waiting for cluster to be ready")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
})

AfterEach(func() {
Expand Down Expand Up @@ -222,7 +222,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun
Expect(framework.CreateRedkeyCluster(ctx, dynamicClient, namespace.Name, clusterName, defaultPrimaries, true)).To(Succeed())

By("waiting for cluster to be ready")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
})

AfterEach(func() {
Expand All @@ -246,7 +246,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun
// ==================================================================================
It("heals slot ownership conflicts when operator and robin restart", func() {
By("verifying cluster is ready")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())

By("scaling operator to 0")
Expect(framework.ScaleOperatorDown(ctx, k8sClientset, namespace.Name)).To(Succeed())
Expand All @@ -264,17 +264,15 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun
Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed())

By("waiting for cluster to heal")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.AssertAllSlotsAssigned(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())
Expect(framework.AssertNoNodesInFailState(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
})

// ==================================================================================
// Scenario 6: Mid-Migration Slot Recovery
// ==================================================================================
It("recovers from mid-migration slots when operator and robin restart", func() {
By("verifying cluster is ready")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())

By("scaling operator to 0")
Expect(framework.ScaleOperatorDown(ctx, k8sClientset, namespace.Name)).To(Succeed())
Expand All @@ -292,17 +290,15 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun
Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed())

By("waiting for cluster to heal")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.AssertAllSlotsAssigned(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())
Expect(framework.AssertNoNodesInFailState(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
})

// ==================================================================================
// Scenario 7: Primary to Replica Demotion Recovery
// ==================================================================================
It("recovers from forced primary to replica demotion", func() {
By("verifying cluster is ready")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())

targetPod := clusterName + "-0"

Expand All @@ -322,9 +318,49 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun
Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed())

By("waiting for cluster to heal")
Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
Expect(framework.AssertAllSlotsAssigned(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())
Expect(framework.AssertNoNodesInFailState(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
})

// ==================================================================================
// Scenario 8: Robin ConfigMap stale primaries after scale-down (issue #48)
//
// Reproduces the bug where PersistRobinReplicas silently fails during a
// purgeKeysOnRebalance scale-down, leaving the Robin ConfigMap with the
// old (higher) primaries count. Robin then tries to manage ghost nodes
// that no longer exist.
// ==================================================================================
It("recovers when Robin ConfigMap has stale primaries from failed scale-down", func() {
// Establish a healthy baseline before injecting any corruption.
By("verifying cluster is ready with 5 primaries")
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())

// Scale up then down so the cluster has genuinely gone through a
// rebalance cycle before the stale-state corruption is applied.
By("scaling cluster up to 8 primaries")
Expect(framework.ScaleCluster(ctx, dynamicClient, namespace.Name, clusterName, 8)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())

By("scaling cluster down to 3 primaries")
Expect(framework.ScaleCluster(ctx, dynamicClient, namespace.Name, clusterName, 3)).To(Succeed())
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())

// Both controllers must be stopped before corrupting state, otherwise
// they would immediately reconcile the corruption away.
By("scaling operator to 0")
Expect(framework.ScaleOperatorDown(ctx, k8sClientset, namespace.Name)).To(Succeed())

By("scaling robin to 0")
Expect(framework.ScaleRobinDown(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())

By("corrupting Robin ConfigMap: primaries=8 status=ScalingUp (simulates issue #48)")
Expect(framework.CorruptRobinConfigMapPrimaries(ctx, k8sClientset, namespace.Name, clusterName, 8, "ScalingUp")).To(Succeed())

By("corrupting CR status: status=ScalingDown substatus=EndingFastScaling (simulates stuck doFastScaling)")
Expect(framework.CorruptCRStatus(ctx, dynamicClient, namespace.Name, clusterName, "ScalingDown", "EndingFastScaling")).To(Succeed())

// Restart both controllers and observe whether they can self-heal
// from the mismatched ConfigMap/CR state.
By("scaling robin to 1")
Expect(framework.ScaleRobinUp(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed())

By("scaling operator to 1")
Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed())

// NOTE(review): the By text says this is "expected to timeout due to
// issue #48", yet the assertion requires Succeed(). Presumably the
// spec is meant to stay red until #48 is fixed and green afterwards —
// confirm that this red-until-fixed semantics is intended.
By("waiting for cluster to heal — expected to timeout due to issue #48")
Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed())
})
})

Expand Down
66 changes: 7 additions & 59 deletions test/chaos/framework/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ import (
)

const (
defaultChaosReadyTimeout = 10 * time.Minute
pollInterval = 2 * time.Second
defaultReadyTimeout = 10 * time.Minute
pollInterval = 2 * time.Second
)

// WaitForChaosReady waits for the Redis cluster to be fully healthy.
// Checks: CR status == Ready, redis-cli --cluster check passes, no fail/migrating states.
func WaitForChaosReady(ctx context.Context, dc dynamic.Interface, clientset kubernetes.Interface, namespace, clusterName string, timeout time.Duration) error {
// WaitForRedkeyClusterReady waits for the Redis cluster to be fully healthy.
// Checks: CR status == Ready, pod count matches spec, redis-cli --cluster check passes, no fail/migrating states.
func WaitForRedkeyClusterReady(ctx context.Context, dc dynamic.Interface, clientset kubernetes.Interface, namespace, clusterName string, timeout time.Duration) error {
if timeout == 0 {
timeout = defaultChaosReadyTimeout
timeout = defaultReadyTimeout
}

var lastReason string
Expand Down Expand Up @@ -108,7 +108,7 @@ func WaitForChaosReady(ctx context.Context, dc dynamic.Interface, clientset kube
return true, nil
})
if err != nil && lastReason != "" {
return fmt.Errorf("WaitForChaosReady(%s/%s): last check: %s: %w", namespace, clusterName, lastReason, err)
return fmt.Errorf("WaitForRedkeyClusterReady(%s/%s): last check: %s: %w", namespace, clusterName, lastReason, err)
}
return err
}
Expand All @@ -130,55 +130,3 @@ func clusterNodesHasFailure(ctx context.Context, namespace, podName string) bool
}
return strings.Contains(stdout, "fail") || strings.Contains(stdout, "->") || strings.Contains(stdout, "<-")
}

// AssertAllSlotsAssigned checks, via `redis-cli cluster info` on the first
// Redis pod of the cluster, that the full slot range (all 16384 slots) is
// reported as OK. It returns an error when no pods exist, when the remote
// command fails, or when the slot count is not complete.
func AssertAllSlotsAssigned(ctx context.Context, clientset kubernetes.Interface, namespace, clusterName string) error {
	podList, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: RedisPodsSelector(clusterName),
	})
	if err != nil {
		return err
	}
	if len(podList.Items) == 0 {
		return fmt.Errorf("no redis pods found")
	}

	// Any cluster member reports the cluster-wide slot status, so asking
	// the first pod is sufficient.
	stdout, _, err := RemoteCommand(ctx, namespace, podList.Items[0].Name, "redis-cli cluster info")
	if err != nil {
		return fmt.Errorf("failed to get cluster info: %w", err)
	}
	if strings.Contains(stdout, "cluster_slots_ok:16384") {
		return nil
	}
	return fmt.Errorf("not all slots assigned: %s", stdout)
}

// AssertNoNodesInFailState runs `redis-cli cluster nodes` on every Redis pod
// of the cluster and returns an error as soon as any pod's output contains
// the substring "fail". It also errors when no pods are found or a remote
// command cannot be executed.
func AssertNoNodesInFailState(ctx context.Context, clientset kubernetes.Interface, namespace, clusterName string) error {
	podList, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: RedisPodsSelector(clusterName),
	})
	if err != nil {
		return err
	}
	if len(podList.Items) == 0 {
		return fmt.Errorf("no redis pods found")
	}

	// Each pod may have a slightly different view of the topology, so the
	// check is repeated on every member rather than a single one.
	for i := range podList.Items {
		podName := podList.Items[i].Name
		stdout, _, err := RemoteCommand(ctx, namespace, podName, "redis-cli cluster nodes")
		if err != nil {
			return fmt.Errorf("failed to get cluster nodes from %s: %w", podName, err)
		}
		if strings.Contains(stdout, "fail") {
			return fmt.Errorf("node in fail state detected in pod %s: %s", podName, stdout)
		}
	}
	return nil
}
78 changes: 78 additions & 0 deletions test/chaos/framework/redis_chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ import (
"time"

"github.com/onsi/ginkgo/v2"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"

"github.com/inditextech/redkeyoperator/internal/robin"
)

const (
Expand Down Expand Up @@ -312,9 +316,83 @@ func ForcePrimaryToReplica(ctx context.Context, clientset kubernetes.Interface,
return nil
}

// CorruptCRStatus overwrites status.status and status.substatus.status on the
// RedkeyCluster CR via the status subresource, to simulate an operator stuck
// mid-reconcile. Used to reproduce issue #48, where the operator loops in
// doFastScaling → SubstatusEndingFastScaling while the Robin ConfigMap holds
// stale primaries.
//
// The operator must be scaled down BEFORE calling this function, or it will
// reconcile the injected status straight back.
func CorruptCRStatus(ctx context.Context, dc dynamic.Interface, namespace, clusterName, status, substatus string) error {
	cr, err := dc.Resource(RedkeyClusterGVR).Namespace(namespace).Get(ctx, clusterName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("get RedkeyCluster %s/%s: %w", namespace, clusterName, err)
	}

	// Log the transition (old -> new) so chaos-test output shows exactly
	// what was injected.
	prevStatus, _, _ := unstructured.NestedString(cr.Object, "status", "status")
	prevSubstatus, _, _ := unstructured.NestedString(cr.Object, "status", "substatus", "status")
	ginkgo.GinkgoWriter.Printf("CorruptCRStatus: %s/%s: status %q -> %q, substatus %q -> %q\n",
		namespace, clusterName, prevStatus, status, prevSubstatus, substatus)

	if err = unstructured.SetNestedField(cr.Object, status, "status", "status"); err != nil {
		return fmt.Errorf("set status.status on %s/%s: %w", namespace, clusterName, err)
	}
	if err = unstructured.SetNestedField(cr.Object, substatus, "status", "substatus", "status"); err != nil {
		return fmt.Errorf("set status.substatus.status on %s/%s: %w", namespace, clusterName, err)
	}

	// Write through the status subresource; a plain Update would not touch
	// status on CRDs with the subresource enabled.
	_, err = dc.Resource(RedkeyClusterGVR).Namespace(namespace).UpdateStatus(ctx, cr, metav1.UpdateOptions{})
	if err != nil {
		return fmt.Errorf("update status subresource for %s/%s: %w", namespace, clusterName, err)
	}
	return nil
}

// trimNewline returns s with any run of trailing '\n' and '\r' characters
// removed; interior newlines are left untouched.
func trimNewline(s string) string {
	end := len(s)
	for end > 0 {
		c := s[end-1]
		if c != '\n' && c != '\r' {
			break
		}
		end--
	}
	return s[:end]
}

// CorruptRobinConfigMapPrimaries rewrites the primaries and status fields
// inside the Robin ConfigMap (key "application-configmap.yml") for the given
// cluster. It simulates the state left behind when PersistRobinReplicas
// silently fails during a purgeKeysOnRebalance scale-down (issue #48): the
// ConfigMap keeps the old, higher primaries count even though the cluster
// already scaled down.
func CorruptRobinConfigMapPrimaries(ctx context.Context, clientset kubernetes.Interface, namespace, clusterName string, primaries int, status string) error {
	cmName := clusterName + "-robin"
	cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("get ConfigMap %s/%s: %w", namespace, cmName, err)
	}

	raw, found := cm.Data["application-configmap.yml"]
	if !found {
		return fmt.Errorf("ConfigMap %s/%s has no key 'application-configmap.yml'", namespace, cmName)
	}

	// Round-trip through the typed robin.Configuration so only the two
	// targeted fields change and the rest of the YAML stays intact.
	var cfg robin.Configuration
	if err = yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		return fmt.Errorf("unmarshal robin config from %s/%s: %w", namespace, cmName, err)
	}

	ginkgo.GinkgoWriter.Printf("CorruptRobinConfigMapPrimaries: %s/%s: primaries %d -> %d, status %q -> %q\n",
		namespace, cmName, cfg.Redis.Cluster.Primaries, primaries, cfg.Redis.Cluster.Status, status)

	cfg.Redis.Cluster.Primaries = primaries
	cfg.Redis.Cluster.Status = status

	out, err := yaml.Marshal(cfg)
	if err != nil {
		return fmt.Errorf("marshal robin config: %w", err)
	}
	cm.Data["application-configmap.yml"] = string(out)

	_, err = clientset.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{})
	if err != nil {
		return fmt.Errorf("update ConfigMap %s/%s: %w", namespace, cmName, err)
	}
	return nil
}
Loading
Loading