diff --git a/AGENTS.md b/AGENTS.md new file mode 120000 index 0000000..cc0327f --- /dev/null +++ b/AGENTS.md @@ -0,0 +1 @@ +/home/dani/org/srcs/redkeyoperator-chaostest/AGENTS.md \ No newline at end of file diff --git a/test/chaos/chaos_suite_test.go b/test/chaos/chaos_suite_test.go index 08f403d..c0c6de6 100644 --- a/test/chaos/chaos_suite_test.go +++ b/test/chaos/chaos_suite_test.go @@ -64,7 +64,7 @@ var _ = Describe("Chaos Under Load (PurgeKeysOnRebalance=true)", Label("chaos", Expect(framework.CreateRedkeyCluster(ctx, dynamicClient, namespace.Name, clusterName, defaultPrimaries, true)).To(Succeed()) By("waiting for cluster to be ready") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) }) AfterEach(func() { @@ -148,7 +148,7 @@ var _ = Describe("Chaos Under Load (PurgeKeysOnRebalance=false)", Label("chaos", Expect(framework.CreateRedkeyCluster(ctx, dynamicClient, namespace.Name, clusterName, defaultPrimaries, false)).To(Succeed()) By("waiting for cluster to be ready") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) }) AfterEach(func() { @@ -222,7 +222,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun Expect(framework.CreateRedkeyCluster(ctx, dynamicClient, namespace.Name, clusterName, defaultPrimaries, true)).To(Succeed()) By("waiting for cluster to be ready") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, 
namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) }) AfterEach(func() { @@ -246,7 +246,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun // ================================================================================== It("heals slot ownership conflicts when operator and robin restart", func() { By("verifying cluster is ready") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) By("scaling operator to 0") Expect(framework.ScaleOperatorDown(ctx, k8sClientset, namespace.Name)).To(Succeed()) @@ -264,9 +264,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed()) By("waiting for cluster to heal") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) - Expect(framework.AssertAllSlotsAssigned(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed()) - Expect(framework.AssertNoNodesInFailState(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) }) // ================================================================================== @@ -274,7 +272,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun // ================================================================================== It("recovers from mid-migration slots when operator and robin restart", func() { By("verifying cluster is ready") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, 
chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) By("scaling operator to 0") Expect(framework.ScaleOperatorDown(ctx, k8sClientset, namespace.Name)).To(Succeed()) @@ -292,9 +290,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed()) By("waiting for cluster to heal") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) - Expect(framework.AssertAllSlotsAssigned(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed()) - Expect(framework.AssertNoNodesInFailState(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) }) // ================================================================================== @@ -302,7 +298,7 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun // ================================================================================== It("recovers from forced primary to replica demotion", func() { By("verifying cluster is ready") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) targetPod := clusterName + "-0" @@ -322,9 +318,49 @@ var _ = Describe("Topology Corruption Recovery", Label("chaos", "topology"), fun Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed()) By("waiting for cluster to heal") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, 
chaosReadyTimeout)).To(Succeed()) - Expect(framework.AssertAllSlotsAssigned(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed()) - Expect(framework.AssertNoNodesInFailState(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + }) + + // ================================================================================== + // Scenario 8: Robin ConfigMap stale primaries after scale-down (issue #48) + // + // Reproduces the bug where PersistRobinReplicas silently fails during a + // purgeKeysOnRebalance scale-down, leaving the Robin ConfigMap with the + // old (higher) primaries count. Robin then tries to manage ghost nodes + // that no longer exist. + // ================================================================================== + It("recovers when Robin ConfigMap has stale primaries from failed scale-down", func() { + By("verifying cluster is ready with 5 primaries") + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + + By("scaling cluster up to 8 primaries") + Expect(framework.ScaleCluster(ctx, dynamicClient, namespace.Name, clusterName, 8)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + + By("scaling cluster down to 3 primaries") + Expect(framework.ScaleCluster(ctx, dynamicClient, namespace.Name, clusterName, 3)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) + + By("scaling operator to 0") + Expect(framework.ScaleOperatorDown(ctx, k8sClientset, namespace.Name)).To(Succeed()) + + By("scaling robin to 0") + Expect(framework.ScaleRobinDown(ctx, k8sClientset, namespace.Name, 
clusterName)).To(Succeed()) + + By("corrupting Robin ConfigMap: primaries=8 status=ScalingUp (simulates issue #48)") + Expect(framework.CorruptRobinConfigMapPrimaries(ctx, k8sClientset, namespace.Name, clusterName, 8, "ScalingUp")).To(Succeed()) + + By("corrupting CR status: status=ScalingDown substatus=EndingFastScaling (simulates stuck doFastScaling)") + Expect(framework.CorruptCRStatus(ctx, dynamicClient, namespace.Name, clusterName, "ScalingDown", "EndingFastScaling")).To(Succeed()) + + By("scaling robin to 1") + Expect(framework.ScaleRobinUp(ctx, k8sClientset, namespace.Name, clusterName)).To(Succeed()) + + By("scaling operator to 1") + Expect(framework.ScaleOperatorUp(ctx, k8sClientset, namespace.Name)).To(Succeed()) + + By("waiting for cluster to heal (must succeed; currently times out until issue #48 is fixed)") + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace.Name, clusterName, chaosReadyTimeout)).To(Succeed()) }) }) diff --git a/test/chaos/framework/readiness.go b/test/chaos/framework/readiness.go index 757a4a4..e347e15 100644 --- a/test/chaos/framework/readiness.go +++ b/test/chaos/framework/readiness.go @@ -20,15 +20,15 @@ import ( ) const ( - defaultChaosReadyTimeout = 10 * time.Minute - pollInterval = 2 * time.Second + defaultReadyTimeout = 10 * time.Minute + pollInterval = 2 * time.Second ) -// WaitForChaosReady waits for the Redis cluster to be fully healthy. -// Checks: CR status == Ready, redis-cli --cluster check passes, no fail/migrating states. -func WaitForChaosReady(ctx context.Context, dc dynamic.Interface, clientset kubernetes.Interface, namespace, clusterName string, timeout time.Duration) error { +// WaitForRedkeyClusterReady waits for the Redis cluster to be fully healthy. +// Checks: CR status == Ready, pod count matches spec, redis-cli --cluster check passes, no fail/migrating states. 
+func WaitForRedkeyClusterReady(ctx context.Context, dc dynamic.Interface, clientset kubernetes.Interface, namespace, clusterName string, timeout time.Duration) error { if timeout == 0 { - timeout = defaultChaosReadyTimeout + timeout = defaultReadyTimeout } var lastReason string @@ -108,7 +108,7 @@ func WaitForChaosReady(ctx context.Context, dc dynamic.Interface, clientset kube return true, nil }) if err != nil && lastReason != "" { - return fmt.Errorf("WaitForChaosReady(%s/%s): last check: %s: %w", namespace, clusterName, lastReason, err) + return fmt.Errorf("WaitForRedkeyClusterReady(%s/%s): last check: %s: %w", namespace, clusterName, lastReason, err) } return err } @@ -130,55 +130,3 @@ func clusterNodesHasFailure(ctx context.Context, namespace, podName string) bool } return strings.Contains(stdout, "fail") || strings.Contains(stdout, "->") || strings.Contains(stdout, "<-") } - -// AssertAllSlotsAssigned verifies that all 16384 slots are assigned. -func AssertAllSlotsAssigned(ctx context.Context, clientset kubernetes.Interface, namespace, clusterName string) error { - pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ - LabelSelector: RedisPodsSelector(clusterName), - }) - if err != nil { - return err - } - - if len(pods.Items) == 0 { - return fmt.Errorf("no redis pods found") - } - - stdout, _, err := RemoteCommand(ctx, namespace, pods.Items[0].Name, "redis-cli cluster info") - if err != nil { - return fmt.Errorf("failed to get cluster info: %w", err) - } - - if !strings.Contains(stdout, "cluster_slots_ok:16384") { - return fmt.Errorf("not all slots assigned: %s", stdout) - } - - return nil -} - -// AssertNoNodesInFailState verifies no nodes are in fail state. 
-func AssertNoNodesInFailState(ctx context.Context, clientset kubernetes.Interface, namespace, clusterName string) error { - pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ - LabelSelector: RedisPodsSelector(clusterName), - }) - if err != nil { - return err - } - - if len(pods.Items) == 0 { - return fmt.Errorf("no redis pods found") - } - - for _, pod := range pods.Items { - stdout, _, err := RemoteCommand(ctx, namespace, pod.Name, "redis-cli cluster nodes") - if err != nil { - return fmt.Errorf("failed to get cluster nodes from %s: %w", pod.Name, err) - } - - if strings.Contains(stdout, "fail") { - return fmt.Errorf("node in fail state detected in pod %s: %s", pod.Name, stdout) - } - } - - return nil -} diff --git a/test/chaos/framework/redis_chaos.go b/test/chaos/framework/redis_chaos.go index 5cd35cd..3d80844 100644 --- a/test/chaos/framework/redis_chaos.go +++ b/test/chaos/framework/redis_chaos.go @@ -12,13 +12,17 @@ import ( "time" "github.com/onsi/ginkgo/v2" + "gopkg.in/yaml.v3" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" "k8s.io/utils/ptr" + + "github.com/inditextech/redkeyoperator/internal/robin" ) const ( @@ -312,9 +316,83 @@ func ForcePrimaryToReplica(ctx context.Context, clientset kubernetes.Interface, return nil } +// CorruptCRStatus overwrites the CR status subresource to simulate the +// operator being stuck mid-reconcile. This is needed to reproduce issue #48: +// the operator loops in doFastScaling → SubstatusEndingFastScaling, polling +// Robin forever because the Robin ConfigMap has stale primaries. +// +// The caller must stop the operator BEFORE calling this function to prevent +// the operator from immediately reconciling the status back. 
+func CorruptCRStatus(ctx context.Context, dc dynamic.Interface, namespace, clusterName, status, substatus string) error { + unstr, err := dc.Resource(RedkeyClusterGVR).Namespace(namespace).Get(ctx, clusterName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("get RedkeyCluster %s/%s: %w", namespace, clusterName, err) + } + + currentStatus, _, _ := unstructured.NestedString(unstr.Object, "status", "status") + currentSubstatus, _, _ := unstructured.NestedString(unstr.Object, "status", "substatus", "status") + + ginkgo.GinkgoWriter.Printf("CorruptCRStatus: %s/%s: status %q -> %q, substatus %q -> %q\n", + namespace, clusterName, currentStatus, status, currentSubstatus, substatus) + + if err := unstructured.SetNestedField(unstr.Object, status, "status", "status"); err != nil { + return fmt.Errorf("set status.status on %s/%s: %w", namespace, clusterName, err) + } + if err := unstructured.SetNestedField(unstr.Object, substatus, "status", "substatus", "status"); err != nil { + return fmt.Errorf("set status.substatus.status on %s/%s: %w", namespace, clusterName, err) + } + + if _, err := dc.Resource(RedkeyClusterGVR).Namespace(namespace).UpdateStatus(ctx, unstr, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update status subresource for %s/%s: %w", namespace, clusterName, err) + } + + return nil +} + func trimNewline(s string) string { for len(s) > 0 && (s[len(s)-1] == '\n' || s[len(s)-1] == '\r') { s = s[:len(s)-1] } return s } + +// CorruptRobinConfigMapPrimaries overwrites the primaries and status fields +// in the Robin ConfigMap. This simulates the state left behind when +// PersistRobinReplicas silently fails during a purgeKeysOnRebalance +// scale-down (issue #48): the ConfigMap retains the old (higher) primaries +// count while the cluster has already scaled down to fewer nodes. 
+func CorruptRobinConfigMapPrimaries(ctx context.Context, clientset kubernetes.Interface, namespace, clusterName string, primaries int, status string) error { + cmName := clusterName + "-robin" + cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("get ConfigMap %s/%s: %w", namespace, cmName, err) + } + + data, ok := cm.Data["application-configmap.yml"] + if !ok { + return fmt.Errorf("ConfigMap %s/%s has no key 'application-configmap.yml'", namespace, cmName) + } + + var config robin.Configuration + if err := yaml.Unmarshal([]byte(data), &config); err != nil { + return fmt.Errorf("unmarshal robin config from %s/%s: %w", namespace, cmName, err) + } + + ginkgo.GinkgoWriter.Printf("CorruptRobinConfigMapPrimaries: %s/%s: primaries %d -> %d, status %q -> %q\n", + namespace, cmName, config.Redis.Cluster.Primaries, primaries, config.Redis.Cluster.Status, status) + + config.Redis.Cluster.Primaries = primaries + config.Redis.Cluster.Status = status + + updated, err := yaml.Marshal(config) + if err != nil { + return fmt.Errorf("marshal robin config: %w", err) + } + + cm.Data["application-configmap.yml"] = string(updated) + if _, err := clientset.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update ConfigMap %s/%s: %w", namespace, cmName, err) + } + + return nil +} diff --git a/test/chaos/helpers_test.go b/test/chaos/helpers_test.go index 5d2778c..649c909 100644 --- a/test/chaos/helpers_test.go +++ b/test/chaos/helpers_test.go @@ -30,16 +30,10 @@ func stopK6Load(namespace, depName string) { Expect(framework.StopK6Load(ctx, k8sClientset, namespace, depName)).To(Succeed(), "failed to stop k6 deployment %s in namespace %s", depName, namespace) } -// verifyClusterHealthy runs all cluster health checks. +// verifyClusterHealthy waits for the cluster to be fully healthy. 
func verifyClusterHealthy(namespace, clusterName string) { By("verifying cluster readiness") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed()) - - By("verifying all slots assigned") - Expect(framework.AssertAllSlotsAssigned(ctx, k8sClientset, namespace, clusterName)).To(Succeed()) - - By("verifying no nodes in fail state") - Expect(framework.AssertNoNodesInFailState(ctx, k8sClientset, namespace, clusterName)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed()) } // --------------------------------------------------------------------------- @@ -86,7 +80,7 @@ func runScalingChaos(rng *rand.Rand, namespace, clusterName string, purgeKeysOnR GinkgoWriter.Printf("Deleted pods: %v\n", deleted) By(fmt.Sprintf("iteration %d/%d: waiting for cluster recovery", i, chaosIterations)) - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), fmt.Sprintf("iteration %d/%d: cluster did not recover after pod deletion", i, chaosIterations)) // Verify the cluster spec matches what we scaled to. 
@@ -102,7 +96,7 @@ func runScalingChaos(rng *rand.Rand, namespace, clusterName string, purgeKeysOnR Expect(framework.ScaleCluster(ctx, dynamicClient, namespace, clusterName, downSize)).To(Succeed(), fmt.Sprintf("iteration %d/%d: failed to scale cluster down to %d", i, chaosIterations, downSize)) - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), fmt.Sprintf("iteration %d/%d: cluster did not become ready after scaling down", i, chaosIterations)) // Verify the cluster spec matches what we scaled to. @@ -143,7 +137,7 @@ func runOperatorDeletionChaos(rng *rand.Rand, namespace, clusterName string) str GinkgoWriter.Printf("Deleted pods: %v\n", deleted) By(fmt.Sprintf("iteration %d/%d: waiting for recovery", i, chaosIterations)) - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), fmt.Sprintf("iteration %d/%d: cluster did not recover after operator deletion", i, chaosIterations)) // Rate limit between iterations @@ -154,7 +148,7 @@ func runOperatorDeletionChaos(rng *rand.Rand, namespace, clusterName string) str stopK6Load(namespace, k6DepName) By("verifying final cluster state") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed()) return k6DepName } @@ -180,7 +174,7 @@ func runRobinDeletionChaos(rng *rand.Rand, namespace, clusterName string) string GinkgoWriter.Printf("Deleted pods: %v\n", deletedRedis) By(fmt.Sprintf("iteration %d/%d: waiting 
for recovery", i, chaosIterations)) - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), fmt.Sprintf("iteration %d/%d: cluster did not recover after robin deletion", i, chaosIterations)) // Rate limit between iterations @@ -191,7 +185,7 @@ func runRobinDeletionChaos(rng *rand.Rand, namespace, clusterName string) string stopK6Load(namespace, k6DepName) By("verifying final cluster state") - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed()) + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed()) return k6DepName } @@ -247,7 +241,7 @@ func runFullChaos(rng *rand.Rand, namespace, clusterName string) string { } By(fmt.Sprintf("iteration %d/%d: waiting for recovery", i, chaosIterations)) - Expect(framework.WaitForChaosReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), + Expect(framework.WaitForRedkeyClusterReady(ctx, dynamicClient, k8sClientset, namespace, clusterName, chaosReadyTimeout)).To(Succeed(), fmt.Sprintf("iteration %d/%d: cluster did not recover after chaos actions", i, chaosIterations)) // Verify the cluster spec matches what we expect after recovery.