diff --git a/pkg/service/repair/plan.go b/pkg/service/repair/plan.go index 07bd779913..cd15c8d933 100644 --- a/pkg/service/repair/plan.go +++ b/pkg/service/repair/plan.go @@ -49,18 +49,24 @@ func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (* } skip := false - for _, rep := range ring.ReplicaTokens { - rtr := scyllaclient.ReplicaTokenRanges{ - ReplicaSet: filteredReplicaSet(rep.ReplicaSet, filtered, target.Host), - Ranges: rep.Ranges, + for _, rtr := range ring.ReplicaTokens { + // Skip the whole keyspace based on repaired dcs only + // (unless it's a single node cluster). + replicas := 0 + for _, h := range rtr.ReplicaSet { + if slice.ContainsString(target.DC, ring.HostDC[h]) { + replicas++ + } } - - // Don't add keyspace with some ranges not replicated in filtered hosts, - // unless it's a single node cluster. - if len(rtr.ReplicaSet) <= 1 && len(status) > 1 { + if replicas <= 1 && len(status) > 1 { skip = true break } + // Skip given replica sets based on all filtering factors + rtr.ReplicaSet = filteredReplicaSet(rtr.ReplicaSet, filtered, target.Host) + if len(rtr.ReplicaSet) <= 1 && len(status) > 1 { + continue + } for _, r := range rtr.Ranges { kp.TokenRepIdx[r] = len(kp.Replicas) @@ -68,7 +74,7 @@ func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (* kp.Replicas = append(kp.Replicas, rtr) } - if skip { + if skip || len(kp.Replicas) == 0 { p.SkippedKeyspaces = append(p.SkippedKeyspaces, u.Keyspace) continue } diff --git a/pkg/service/repair/service_repair_integration_test.go b/pkg/service/repair/service_repair_integration_test.go index 5526d9aba7..dd440ca991 100644 --- a/pkg/service/repair/service_repair_integration_test.go +++ b/pkg/service/repair/service_repair_integration_test.go @@ -146,7 +146,7 @@ const ( shortWait = 60 * time.Second // longWait specifies that condition shall be met after a while, this is // useful for waiting for repair to significantly advance or finish. - longWait = 20 * time.Second + longWait = 2 * shortWait _interval = 500 * time.Millisecond ) @@ -1122,7 +1122,7 @@ func TestServiceRepairIntegration(t *testing.T) { h := newRepairTestHelper(t, session, defaultConfig()) clusterSession := CreateSessionAndDropAllKeyspaces(t, h.Client) - createKeyspace(t, clusterSession, "test_repair", 3, 3) + createKeyspace(t, clusterSession, "test_repair", 2, 2) WriteData(t, clusterSession, "test_repair", 1, "test_table_0", "test_table_1") defer dropKeyspace(t, clusterSession, "test_repair") @@ -1167,7 +1167,7 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertRunning(shortWait) Print("When: repair is done") - h.assertDone(2 * longWait) + h.assertDone(longWait) Print("Then: dc2 is used for repair") prog, err := h.service.GetProgress(context.Background(), h.ClusterID, h.TaskID, h.RunID) @@ -1211,7 +1211,7 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertRunning(shortWait) Print("When: repair is done") - h.assertDone(2 * longWait) + h.assertDone(longWait) Print("Then: ignored node is not repaired") prog, err := h.service.GetProgress(context.Background(), h.ClusterID, h.TaskID, h.RunID) @@ -1242,7 +1242,7 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertRunning(shortWait) Print("When: repair is done") - h.assertDone(2 * longWait) + h.assertDone(longWait) }) t.Run("repair dc local keyspace mismatch", func(t *testing.T) { @@ -1472,7 +1472,7 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertParallelIntensity(controlParallel, controlIntensity) Print("Then: repair is done") - h.assertDone(3 * longWait) + h.assertDone(longWait) Print("Then: assert resumed, finished parallel/intensity from control") h.assertParallelIntensity(controlParallel, controlIntensity) @@ -1488,7 +1488,7 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertParallelIntensity(propParallel, propIntensity) Print("Then: repair is done") - h.assertDone(3 * longWait) + h.assertDone(longWait) Print("Then: assert fresh, finished repair parallel/intensity from control") h.assertParallelIntensity(propParallel, propIntensity) @@ -1901,7 +1901,7 @@ func TestServiceRepairIntegration(t *testing.T) { }) Print("Then: repair is done") - h.assertDone(3 * longWait) + h.assertDone(longWait) Print("And: more than one repair jobs were scheduled") if repairCalled <= 1 { @@ -1990,7 +1990,7 @@ func TestServiceRepairIntegration(t *testing.T) { }) Print("Then: repair is done") - h.assertDone(3 * longWait) + h.assertDone(longWait) Print("And: jobs were scheduled") if repairCalled < 1 {