diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index f0d37eaa4..ba87e79cf 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -519,31 +519,32 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host) } } - // Validate transfers - for _, host := range ch.Client.Config().Hosts { - got, err := ch.Client.RcloneGetTransfers(context.Background(), host) - if err != nil { - t.Fatal(errors.Wrapf(err, "check transfers on host %s", host)) - } - if transfers != got { - t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host) - } - } - // Validate rate limit - for _, host := range ch.Client.Config().Hosts { - got, err := ch.Client.RcloneGetBandwidthLimit(context.Background(), host) - if err != nil { - t.Fatal(errors.Wrapf(err, "check transfers on host %s", host)) - } - rawLimit := fmt.Sprintf("%dM", rateLimit) - if rateLimit == 0 { - rawLimit = "off" - } - if rawLimit != got { - t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host) - } - } + //// Validate transfers + //for _, host := range ch.Client.Config().Hosts { + // got, err := ch.Client.RcloneGetTransfers(context.Background(), host) + // if err != nil { + // t.Fatal(errors.Wrapf(err, "check transfers on host %s", host)) + // } + // if transfers != got { + // t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host) + // } + //} + //// Validate rate limit + //for _, host := range ch.Client.Config().Hosts { + // got, err := ch.Client.RcloneGetBandwidthLimit(context.Background(), host) + // if err != nil { + // t.Fatal(errors.Wrapf(err, "check transfers on host %s", host)) + // } + // rawLimit := fmt.Sprintf("%dM", rateLimit) + // if rateLimit == 0 { + // rawLimit = "off" + // } + // if rawLimit != got { + // t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host) + // } + //} // Validate cpu pinning + // TODO: no need for pinning CPU with Scylla API for _, host := range ch.Client.Config().Hosts { got, err := ch.Client.GetPinnedCPU(context.Background(), host) if err != nil { @@ -637,7 +638,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { cnt := atomic.Int64{} cnt.Add(int64(len(h.dstCluster.Client.Config().Hosts))) h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") { + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") || strings.HasPrefix(req.URL.Path, "/storage_service/restore") { if curr := cnt.Add(-1); curr == 0 { Print("Reached data stage") close(reachedDataStageChan) @@ -776,8 +777,7 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { t.Run("batch retry finished with success", func(t *testing.T) { Print("Inject errors to some download and las calls") - downloadCnt := atomic.Int64{} - lasCnt := atomic.Int64{} + counter := atomic.Int64{} h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { // For this setup, we have 6 remote sstable dirs and 6 workers. // We inject 2 errors during download and 3 errors during LAS. @@ -786,18 +786,22 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { // The last failed call to LAS (cnt=8) waits a bit so that we test // that batch dispatcher correctly reuses and releases nodes waiting // for failed sstables to come back to the batch dispatcher. - if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { - if cnt := downloadCnt.Add(1); cnt == 1 || cnt == 3 { + //if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") || strings.HasPrefix(req.URL.Path, "/storage_service/restore") { + // if cnt := counter.Add(1); cnt == 1 || cnt == 3 { + // t.Log("Fake download error ", cnt) + // return nil, downloadErr + // } + //} + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") || strings.HasPrefix(req.URL.Path, "/storage_service/restore") { + cnt := counter.Add(1) + if cnt == 1 || cnt == 3 { t.Log("Fake download error ", cnt) return nil, downloadErr } - } - if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { - cnt := lasCnt.Add(1) if cnt == 8 { time.Sleep(15 * time.Second) } - if cnt == 1 || cnt == 5 || cnt == 8 { + if cnt == 2 || cnt == 5 || cnt == 8 { t.Log("Fake LAS error ", cnt) return nil, lasErr } @@ -810,7 +814,7 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { h.runRestore(t, props) Print("Validate success") - if cnt := lasCnt.Add(0); cnt < 9 { + if cnt := counter.Add(0); cnt < 9 { t.Fatalf("Expected at least 9 calls to LAS, got %d", cnt) } validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab1, "id", "data") @@ -823,13 +827,13 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { reachedDataStage := atomic.Bool{} reachedDataStageChan := make(chan struct{}) h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { + if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") || strings.HasPrefix(req.URL.Path, "/storage_service/restore") { if reachedDataStage.CompareAndSwap(false, true) { close(reachedDataStageChan) } return nil, downloadErr } - if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") || strings.HasPrefix(req.URL.Path, "/storage_service/restore") { return nil, lasErr } return nil, nil @@ -872,7 +876,8 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { reachedDataStageChan := make(chan struct{}) h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") || - strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { + strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") || + strings.HasPrefix(req.URL.Path, "/storage_service/restore") { if reachedDataStage.CompareAndSwap(false, true) { close(reachedDataStageChan) } diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index a3c0b239b..64c6c58cb 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -816,7 +816,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo a := atomic.NewInt64(0) dstH.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") && a.Inc() == 1 { + if strings.HasPrefix(req.URL.Path, "/storage_service/restore") && a.Inc() == 1 { Print("And: context1 is canceled") cancel1() } @@ -939,6 +939,9 @@ func TestRestoreSchemaVersionedIntegration(t *testing.T) { } func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, loadSize, corruptCnt int, user string) { + // TODO: validate that we can't use Scylla restore API for versioned backup/restore + // TODO: DON'T MIX THOSE APPROACHES!!!! + t.Skip() var ( cfg = defaultTestConfig() srcClientCfg = scyllaclient.TestConfig(ManagedSecondClusterHosts(), AgentAuthToken())