Restore improvement: compaction (#4060)
* feat(restore): disable compaction during restore

Fixes #3956

* feat(command/restore): allow controlling auto compaction

* feat(restore_test): test setup during restore data stage

This commit adds TestRestoreTablesPreparationIntegration.
Its purpose is to check the setup changes performed by SM during
the restore data stage. Right now it validates the tombstone_gc mode
and auto compaction, but it can be easily extended in the future
(a standalone sketch of how the test holds restore in the data stage
follows this commit message).
Test scenario:
- Run restore - hang on restore data stage
- Validate setup
- Pause restore
- Validate setup
- Resume restore - hang on restore data stage
- Validate setup
- Resume restore - wait for success
- Validate setup
- Validate restore success

* fix(backup_test): remove partial upload check from TestBackupResumeIntegration

This check wasn't implemented correctly: starting from 27b1d23,
file names are no longer stored in the SM DB, which means that 'sfs'
was always empty. Nevertheless, the check could still fail the test
when 's3f' was empty as well, and since the test does not implement
any breakpoint ensuring that the backup is paused only after at least
a single file has been transferred (it just waits for the transfers
to start), that could legitimately happen. That's why this check was
flaky and didn't verify anything meaningful, so it is removed from
the test.
Michal-Leszczynski authored Oct 8, 2024
1 parent f7a2ea9 commit 9fb5be9
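
The new integration test keeps the restore parked in the data stage by swapping in an HTTP round-tripper that blocks any call to the agent's /agent/rclone/sync/copypaths endpoint until a channel is closed (see makeCopyPathsHang in the restore_integration_test.go diff below). A minimal, self-contained sketch of that interception pattern, assuming only the standard library in place of SM's httpx.RoundTripperFunc helper and a throwaway httptest server in place of the agent:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync/atomic"
)

// blockingTripper holds any request to the rclone copypaths endpoint until
// release is closed, which is how the integration test keeps the restore
// task parked in the data stage.
type blockingTripper struct {
	next    http.RoundTripper
	reached *atomic.Bool  // flips to true on the first intercepted call
	hit     chan struct{} // closed once the data stage is reached
	release chan struct{} // closed by the test to let transfers continue
}

func (t *blockingTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
		if t.reached.CompareAndSwap(false, true) {
			close(t.hit)
		}
		<-t.release
	}
	return t.next.RoundTrip(req)
}

func main() {
	// Stand-in for the agent; the real test talks to Scylla Manager Agent.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	tr := &blockingTripper{
		next:    http.DefaultTransport,
		reached: &atomic.Bool{},
		hit:     make(chan struct{}),
		release: make(chan struct{}),
	}
	client := &http.Client{Transport: tr}

	go func() {
		<-tr.hit
		// The test validates tombstone_gc and compaction settings here,
		// then either pauses the restore or releases the transfers.
		fmt.Println("reached data stage")
		close(tr.release)
	}()

	resp, err := client.Get(srv.URL + "/agent/rclone/sync/copypaths")
	if err != nil {
		panic(err)
	}
	fmt.Println("copy paths released, status:", resp.StatusCode)
}
```

In the real test the interceptor returns (nil, nil) so the original request proceeds untouched once released; the sketch forwards to the wrapped transport instead, which amounts to the same thing for illustration purposes.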
Showing 8 changed files with 221 additions and 59 deletions.
5 changes: 5 additions & 0 deletions docs/source/sctool/partials/sctool_restore.yaml
@@ -7,6 +7,11 @@ description: |
a specific follow-up action described by selected type.
usage: sctool restore --cluster <id|name> --location [<dc>:]<provider>:<bucket> --snapshot-tag <tag> [flags]
options:
- name: allow-compaction
default_value: "false"
usage: |
Defines if auto compactions should be running on Scylla nodes during restore.
Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done.
- name: batch-size
default_value: "2"
usage: |
5 changes: 5 additions & 0 deletions docs/source/sctool/partials/sctool_restore_update.yaml
@@ -5,6 +5,11 @@ description: |
If there is one restore task the 'restore/task-id' argument is not needed.
usage: sctool restore update --cluster <id|name> [flags] [<restore/task-id>]
options:
- name: allow-compaction
default_value: "false"
usage: |
Defines if auto compactions should be running on Scylla nodes during restore.
Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done.
- name: batch-size
default_value: "2"
usage: |
26 changes: 16 additions & 10 deletions pkg/command/restore/cmd.go
@@ -23,16 +23,17 @@ type command struct {
flag.TaskBase
client *managerclient.Client

cluster string
location []string
keyspace []string
snapshotTag string
batchSize int
parallel int
restoreSchema bool
restoreTables bool
dryRun bool
showTables bool
cluster string
location []string
keyspace []string
snapshotTag string
batchSize int
parallel int
allowCompaction bool
restoreSchema bool
restoreTables bool
dryRun bool
showTables bool
}

func NewCommand(client *managerclient.Client) *cobra.Command {
@@ -78,6 +79,7 @@ func (cmd *command) init() {
w.Unwrap().StringVarP(&cmd.snapshotTag, "snapshot-tag", "T", "", "")
w.Unwrap().IntVar(&cmd.batchSize, "batch-size", 2, "")
w.Unwrap().IntVar(&cmd.parallel, "parallel", 1, "")
w.Unwrap().BoolVar(&cmd.allowCompaction, "allow-compaction", false, "")
w.Unwrap().BoolVar(&cmd.restoreSchema, "restore-schema", false, "")
w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "")
w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "")
@@ -144,6 +146,10 @@ func (cmd *command) run(args []string) error {
props["parallel"] = cmd.parallel
ok = true
}
if cmd.Flag("allow-compaction").Changed {
props["allow_compaction"] = cmd.allowCompaction
ok = true
}
if cmd.Flag("restore-schema").Changed {
if cmd.Update() {
return wrapper("restore-schema")
4 changes: 4 additions & 0 deletions pkg/command/restore/res.yaml
@@ -33,6 +33,10 @@ parallel: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
Each node can take part in at most one restore at any given moment.
allow-compaction: |
Defines if auto compactions should be running on Scylla nodes during restore.
Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done.
restore-schema: |
Specifies restore type (alternative to '--restore-tables' flag).
Restore will recreate schema by applying the backed up output of DESCRIBE SCHEMA WITH INTERNALS via CQL.
41 changes: 0 additions & 41 deletions pkg/service/backup/service_backup_integration_test.go
@@ -26,7 +26,6 @@ import (
"github.com/scylladb/go-log"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"go.uber.org/atomic"
@@ -187,35 +186,6 @@ func (h *backupTestHelper) listS3Files() (manifests, schemas, files []string) {
return
}

func (h *backupTestHelper) progressFilesSet() *strset.Set {
h.T.Helper()

files := strset.New()
q := table.BackupRunProgress.SelectQuery(h.Session).BindMap(qb.M{
"cluster_id": h.ClusterID,
"task_id": h.TaskID,
"run_id": h.RunID,
})
iter := q.Iter()
defer func() {
iter.Close()
q.Release()
}()

pr := &backup.RunProgress{}
for iter.StructScan(pr) {
fs := pr.Files()
for i := range fs {
if strings.Contains(fs[i].Name, ScyllaManifest) {
continue
}
files.Add(fs[i].Name)
}
}

return files
}

func restartAgents(h *CommonTestHelper) {
execOnAllHosts(h, "supervisorctl restart scylla-manager-agent")
}
@@ -1064,17 +1034,6 @@ func TestBackupResumeIntegration(t *testing.T) {
Print("And: nothing is transferring")
h.waitNoTransfers()

Print("And: snapshot is partially uploaded")
_, _, s3Files := h.listS3Files()
sfs := h.progressFilesSet()
s3f := strset.New()
for i := range s3Files {
s3f.Add(path.Base(s3Files[i]))
}
if s3f.IsEqual(sfs) {
h.T.Fatalf("Expected partial upload, got\n%v,\n%v", sfs, s3f)
}

Print("When: backup is resumed with new RunID")
err := h.service.Backup(context.Background(), h.ClusterID, h.TaskID, uuid.NewTime(), target)
if err != nil {
17 changes: 9 additions & 8 deletions pkg/service/restore/model.go
@@ -23,14 +23,15 @@ import (

// Target specifies what data should be restored and from which locations.
type Target struct {
Location []Location `json:"location"`
Keyspace []string `json:"keyspace,omitempty"`
SnapshotTag string `json:"snapshot_tag"`
BatchSize int `json:"batch_size,omitempty"`
Parallel int `json:"parallel,omitempty"`
RestoreSchema bool `json:"restore_schema,omitempty"`
RestoreTables bool `json:"restore_tables,omitempty"`
Continue bool `json:"continue"`
Location []Location `json:"location"`
Keyspace []string `json:"keyspace,omitempty"`
SnapshotTag string `json:"snapshot_tag"`
BatchSize int `json:"batch_size,omitempty"`
Parallel int `json:"parallel,omitempty"`
AllowCompaction bool `json:"allow_compaction,omitempty"`
RestoreSchema bool `json:"restore_schema,omitempty"`
RestoreTables bool `json:"restore_tables,omitempty"`
Continue bool `json:"continue"`

// Cache for host with access to remote location
locationHosts map[Location][]string `json:"-"`
153 changes: 153 additions & 0 deletions pkg/service/restore/restore_integration_test.go
@@ -9,7 +9,9 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync/atomic"
"testing"
"time"

@@ -19,6 +21,7 @@ import (
. "github.com/scylladb/scylla-manager/v3/pkg/testutils"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
"github.com/scylladb/scylla-manager/v3/pkg/util/httpx"
"github.com/scylladb/scylla-manager/v3/pkg/util/maputil"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
@@ -465,3 +468,153 @@ func TestRestoreTablesPausedIntegration(t *testing.T) {
}
}
}

func TestRestoreTablesPreparationIntegration(t *testing.T) {
// Scenario - setup corresponds to things like tombstone_gc mode or compaction being enabled:
// Run restore - hang on restore data stage
// Validate setup
// Pause restore
// Validate setup
// Resume restore - hang on restore data stage
// Validate setup
// Resume restore - wait for success
// Validate setup
// Validate restore success

h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())

Print("Keyspace setup")
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
ks := randomizedName("prep_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks, 2))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks, 2))

Print("Table setup")
tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int) WITH tombstone_gc = {'mode': 'repair'}"
tab := randomizedName("tab_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))

Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 100, ks, tab)

Print("Run backup")
loc := []Location{testLocation("preparation", "")}
S3InitBucket(t, loc[0].Path)
ksFilter := []string{ks}
tag := h.runBackup(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
})

runRestore := func(ctx context.Context, finishedRestore chan error) {
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
h.dstCluster.RunID = uuid.NewTime()
rawProps, err := json.Marshal(map[string]any{
"location": loc,
"keyspace": ksFilter,
"snapshot_tag": tag,
"restore_tables": true,
})
if err != nil {
t.Error(err)
}
finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps)
}

validateState := func(tombstone string, compaction bool) {
// Validate tombstone_gc mode
if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got {
t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
}
// Validate compaction
for _, host := range ManagedClusterHosts() {
enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
if err != nil {
t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
}
if compaction != enabled {
t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
}
}
}

makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) {
h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
if reachedDataStage.CompareAndSwap(false, true) {
Print("Reached data stage")
close(reachedDataStageChan)
}
Print("Wait for copy paths to stop hanging")
<-hangCopyPaths
}
return nil, nil
}))
}

var (
reachedDataStage = &atomic.Bool{}
reachedDataStageChan = make(chan struct{})
hangCopyPathsChan = make(chan struct{})
)
Print("Make copy paths hang")
makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)

Print("Run restore")
finishedRestore := make(chan error)
restoreCtx, restoreCancel := context.WithCancel(context.Background())
go runRestore(restoreCtx, finishedRestore)

Print("Wait for data stage")
<-reachedDataStageChan

Print("Validate state during restore data")
validateState("disabled", false)

Print("Pause restore")
restoreCancel()

Print("Release copy paths")
close(hangCopyPathsChan)

Print("Wait for restore")
err := <-finishedRestore
if !errors.Is(err, context.Canceled) {
t.Fatalf("Expected restore to be paused, got: %s", err)
}

Print("Validate state during pause")
validateState("disabled", true)

reachedDataStage = &atomic.Bool{}
reachedDataStageChan = make(chan struct{})
hangCopyPathsChan = make(chan struct{})
Print("Make copy paths hang after pause")
makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)

Print("Run restore after pause")
finishedRestore = make(chan error)
go runRestore(context.Background(), finishedRestore)

Print("Wait for data stage")
<-reachedDataStageChan

Print("Validate state during restore data after pause")
validateState("disabled", false)

Print("Release copy paths")
close(hangCopyPathsChan)

Print("Wait for restore")
err = <-finishedRestore
if err != nil {
t.Fatalf("Expected restore to success, got: %s", err)
}

Print("Validate state after restore success")
validateState("repair", true)

Print("Validate table contents")
h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
}
29 changes: 29 additions & 0 deletions pkg/service/restore/tables_worker.go
@@ -188,6 +188,17 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
w.logger.Info(ctx, "Host shard count", "host", h, "shards", sh)
}

if !w.target.AllowCompaction {
defer func() {
if err := w.setAutoCompaction(context.Background(), hosts, true); err != nil {
w.logger.Error(ctx, "Couldn't enable auto compaction", "error", err)
}
}()
if err := w.setAutoCompaction(ctx, hosts, false); err != nil {
return errors.Wrapf(err, "disable auto compaction")
}
}

bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts)

f := func(n int) (err error) {
@@ -313,3 +324,21 @@ func (w *tablesWorker) initRestoreMetrics(ctx context.Context) {
}
}
}

// setAutoCompaction enables or disables auto compaction on all provided hosts and units.
func (w *tablesWorker) setAutoCompaction(ctx context.Context, hosts []string, enabled bool) error {
f := w.client.EnableAutoCompaction
if !enabled {
f = w.client.DisableAutoCompaction
}
for _, h := range hosts {
for _, u := range w.run.Units {
for _, t := range u.Tables {
if err := f(ctx, h, u.Keyspace, t.Table); err != nil {
return errors.Wrapf(err, "set autocompaction on %s to %v", h, enabled)
}
}
}
}
return nil
}
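
A note on the stageRestoreData hunk above: the deferred re-enable uses context.Background() so auto compaction is switched back on even when the stage context is canceled by a pause, which is exactly the state the new integration test asserts at its "Validate state during pause" step. A stripped-down sketch of that pattern, with toggleCompaction as a hypothetical stand-in for the per-host, per-table Enable/DisableAutoCompaction client calls:

```go
package main

import (
	"context"
	"fmt"
)

// toggleCompaction is a hypothetical stand-in for the client's per-host,
// per-table EnableAutoCompaction/DisableAutoCompaction calls.
func toggleCompaction(_ context.Context, enabled bool) error {
	fmt.Println("auto compaction enabled:", enabled)
	return nil
}

// restoreData mimics stageRestoreData: compaction is disabled for the
// duration of the stage and re-enabled on the way out. The deferred call
// uses a fresh context so it still runs after ctx has been canceled.
func restoreData(ctx context.Context, allowCompaction bool) error {
	if !allowCompaction {
		if err := toggleCompaction(ctx, false); err != nil {
			return fmt.Errorf("disable auto compaction: %w", err)
		}
		defer func() {
			if err := toggleCompaction(context.Background(), true); err != nil {
				fmt.Println("couldn't enable auto compaction:", err)
			}
		}()
	}
	<-ctx.Done() // pretend the data stage runs until the task is paused
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate pausing the restore mid data stage
	fmt.Println("stage result:", restoreData(ctx, false))
	// Prints the disable, the deferred re-enable, then "context canceled".
}
```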
