Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repair: fix repair --host filtering #3737

Merged
merged 2 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions pkg/service/repair/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,32 @@ func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (*
}

skip := false
- for _, rep := range ring.ReplicaTokens {
- rtr := scyllaclient.ReplicaTokenRanges{
- ReplicaSet: filteredReplicaSet(rep.ReplicaSet, filtered, target.Host),
- Ranges: rep.Ranges,
+ for _, rtr := range ring.ReplicaTokens {
+ // Skip the whole keyspace based on repaired dcs only
+ // (unless it's a single node cluster).
+ replicas := 0
+ for _, h := range rtr.ReplicaSet {
+ if slice.ContainsString(target.DC, ring.HostDC[h]) {
+ replicas++
+ }
  }
-
- // Don't add keyspace with some ranges not replicated in filtered hosts,
- // unless it's a single node cluster.
- if len(rtr.ReplicaSet) <= 1 && len(status) > 1 {
+ if replicas <= 1 && len(status) > 1 {
skip = true
break
}
+ // Skip given replica sets based on all filtering factors
+ rtr.ReplicaSet = filteredReplicaSet(rtr.ReplicaSet, filtered, target.Host)
+ if len(rtr.ReplicaSet) <= 1 && len(status) > 1 {
+ continue
+ }

for _, r := range rtr.Ranges {
kp.TokenRepIdx[r] = len(kp.Replicas)
}
kp.Replicas = append(kp.Replicas, rtr)
}

- if skip {
+ if skip || len(kp.Replicas) == 0 {
p.SkippedKeyspaces = append(p.SkippedKeyspaces, u.Keyspace)
continue
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ const (
shortWait = 60 * time.Second
// longWait specifies that condition shall be met after a while, this is
// useful for waiting for repair to significantly advance or finish.
- longWait = 20 * time.Second
+ longWait = 2 * shortWait

_interval = 500 * time.Millisecond
)
Expand Down Expand Up @@ -1122,7 +1122,7 @@ func TestServiceRepairIntegration(t *testing.T) {
h := newRepairTestHelper(t, session, defaultConfig())
clusterSession := CreateSessionAndDropAllKeyspaces(t, h.Client)

- createKeyspace(t, clusterSession, "test_repair", 3, 3)
+ createKeyspace(t, clusterSession, "test_repair", 2, 2)
WriteData(t, clusterSession, "test_repair", 1, "test_table_0", "test_table_1")
defer dropKeyspace(t, clusterSession, "test_repair")

Expand Down Expand Up @@ -1167,7 +1167,7 @@ func TestServiceRepairIntegration(t *testing.T) {
h.assertRunning(shortWait)

Print("When: repair is done")
- h.assertDone(2 * longWait)
+ h.assertDone(longWait)

Print("Then: dc2 is used for repair")
prog, err := h.service.GetProgress(context.Background(), h.ClusterID, h.TaskID, h.RunID)
Expand Down Expand Up @@ -1211,7 +1211,7 @@ func TestServiceRepairIntegration(t *testing.T) {
h.assertRunning(shortWait)

Print("When: repair is done")
- h.assertDone(2 * longWait)
+ h.assertDone(longWait)

Print("Then: ignored node is not repaired")
prog, err := h.service.GetProgress(context.Background(), h.ClusterID, h.TaskID, h.RunID)
Expand Down Expand Up @@ -1242,7 +1242,7 @@ func TestServiceRepairIntegration(t *testing.T) {
h.assertRunning(shortWait)

Print("When: repair is done")
- h.assertDone(2 * longWait)
+ h.assertDone(longWait)
})

t.Run("repair dc local keyspace mismatch", func(t *testing.T) {
Expand Down Expand Up @@ -1472,7 +1472,7 @@ func TestServiceRepairIntegration(t *testing.T) {
h.assertParallelIntensity(controlParallel, controlIntensity)

Print("Then: repair is done")
- h.assertDone(3 * longWait)
+ h.assertDone(longWait)

Print("Then: assert resumed, finished parallel/intensity from control")
h.assertParallelIntensity(controlParallel, controlIntensity)
Expand All @@ -1488,7 +1488,7 @@ func TestServiceRepairIntegration(t *testing.T) {
h.assertParallelIntensity(propParallel, propIntensity)

Print("Then: repair is done")
- h.assertDone(3 * longWait)
+ h.assertDone(longWait)

Print("Then: assert fresh, finished repair parallel/intensity from control")
h.assertParallelIntensity(propParallel, propIntensity)
Expand Down Expand Up @@ -1901,7 +1901,7 @@ func TestServiceRepairIntegration(t *testing.T) {
})

Print("Then: repair is done")
- h.assertDone(3 * longWait)
+ h.assertDone(longWait)

Print("And: more than one repair jobs were scheduled")
if repairCalled <= 1 {
Expand Down Expand Up @@ -1990,7 +1990,7 @@ func TestServiceRepairIntegration(t *testing.T) {
})

Print("Then: repair is done")
- h.assertDone(3 * longWait)
+ h.assertDone(longWait)

Print("And: jobs were scheduled")
if repairCalled < 1 {
Expand Down
Loading