Skip to content

Commit

Permalink
feat(restore): integrate new indexing and batching with codebase
Browse files Browse the repository at this point in the history
This commit makes use of the new indexing and batching
approaches and uses them in the restore tables codebase.
  • Loading branch information
Michal-Leszczynski committed Sep 27, 2024
1 parent cd67d27 commit 9ab3dcd
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 496 deletions.
4 changes: 0 additions & 4 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,6 @@ func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*Run
return iter.Close()
}

func (pr *RunProgress) idCnt() int64 {
return int64(len(pr.SSTableID))
}

func (pr *RunProgress) setRestoreStartedAt() {
t := timeutc.Now()
pr.RestoreStartedAt = &t
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI
for _, unit := range w.run.Units {
totalBytesToRestore += unit.Size
}
tw := newTablesWorker(w, s.repairSvc, totalBytesToRestore)
err = tw.restore(ctx)
tw, workerErr := newTablesWorker(w, s.repairSvc, totalBytesToRestore)
if workerErr != nil {
err = workerErr
} else {
err = tw.restore(ctx)
}
} else {
sw := &schemaWorker{worker: w}
err = sw.restore(ctx)
Expand Down
11 changes: 1 addition & 10 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,15 +852,6 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
t.Fatalf("Expected context error but got: %+v", err)
}

pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID)
if err != nil {
t.Fatal(err)
}
Printf("And: restore progress: %+#v\n", pr)
if pr.Downloaded == 0 {
t.Fatal("Expected partial restore progress")
}

Print("When: resume restore and stop in during repair")
dstH.RunID = uuid.MustRandom()
err = dstH.service.Restore(ctx2, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target))
Expand All @@ -872,7 +863,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
t.Fatalf("Expected context error but got: %+v", err)
}

pr, err = dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID)
pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID)
if err != nil {
t.Fatal(err)
}
Expand Down
129 changes: 71 additions & 58 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

type tablesWorker struct {
worker

repairSvc *repair.Service
progress *TotalRestoreProgress
tableVersion map[TableName]string
repairSvc *repair.Service
progress *TotalRestoreProgress
}

// TotalRestoreProgress is a struct that holds information about the total progress of the restore job.
Expand Down Expand Up @@ -61,12 +65,27 @@ func (p *TotalRestoreProgress) Update(bytesRestored int64) {
p.restoredBytes += bytesRestored
}

func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) *tablesWorker {
return &tablesWorker{
worker: w,
repairSvc: repairSvc,
progress: NewTotalRestoreProgress(totalBytes),
func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) {
versions := make(map[TableName]string)
for _, u := range w.run.Units {
for _, t := range u.Tables {
v, err := query.GetTableVersion(w.clusterSession, u.Keyspace, t.Table)
if err != nil {
return nil, errors.Wrapf(err, "get %s.%s version", u.Keyspace, t.Table)
}
versions[TableName{
Keyspace: u.Keyspace,
Table: t.Table,
}] = v
}
}

return &tablesWorker{
worker: w,
tableVersion: versions,
repairSvc: repairSvc,
progress: NewTotalRestoreProgress(totalBytes),
}, nil
}

// restore files from every location specified in restore target.
Expand Down Expand Up @@ -149,66 +168,60 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
w.logger.Info(ctx, "Started restoring tables")
defer w.logger.Info(ctx, "Restoring tables finished")

// Restore locations in deterministic order
for _, l := range w.target.Location {
if err := w.restoreLocation(ctx, l); err != nil {
return err
}
workload, err := w.IndexWorkload(ctx, w.target.Location)
if err != nil {
return err
}
return nil
}

func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) error {
w.logger.Info(ctx, "Restoring location", "location", location)
defer w.logger.Info(ctx, "Restoring location finished", "location", location)
w.initMetrics(workload)

restoreManifest := func(miwc ManifestInfoWithContent) error {
w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo)
defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo)

return miwc.ForEachIndexIterWithError(nil, w.restoreDir(ctx, miwc))
bd := newBatchDispatcher(workload, w.target.BatchSize, w.target.locationHosts)
hostsS := strset.New()
for _, h := range w.target.locationHosts {
hostsS.Add(h...)
}
hosts := hostsS.List()

f := func(n int) (err error) {
h := hosts[n]
for {
// Download and stream in parallel
b, ok := bd.DispatchBatch(h)
if !ok {
w.logger.Info(ctx, "No more batches to restore", "host", h)
return nil
}
w.metrics.IncreaseBatchSize(w.run.ClusterID, h, b.Size)
w.logger.Info(ctx, "Got batch to restore",
"host", h,
"keyspace", b.Keyspace,
"table", b.Table,
"size", b.Size,
"sstable count", len(b.SSTables),
)

return w.forEachManifest(ctx, location, restoreManifest)
}

func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithContent) func(fm FilesMeta) error {
return func(fm FilesMeta) error {
if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
return nil
}

w.logger.Info(ctx, "Restoring table", "keyspace", fm.Keyspace, "table", fm.Table)
defer w.logger.Info(ctx, "Restoring table finished", "keyspace", fm.Keyspace, "table", fm.Table)
w.insertRun(ctx)

dw, err := newTablesDirWorker(ctx, w.worker, miwc, fm, w.progress)
if err != nil {
return errors.Wrap(err, "create dir worker")
}
if err := dw.resumePrevProgress(); err != nil {
return errors.Wrap(err, "resume prev run progress")
}

if err := dw.restore(ctx); err != nil {
if ctx.Err() != nil {
return ctx.Err()
pr, err := w.newRunProgress(ctx, h, b)
if err != nil {
return errors.Wrap(err, "create new run progress")
}
// In case all SSTables have been restored, restore can proceed even
// with errors from some hosts.
if len(dw.bundleIDPool) > 0 {
return errors.Wrapf(err, "not restored bundles %v", dw.bundleIDPool.drain())
if err := w.restoreBatch(ctx, b, pr); err != nil {
return errors.Wrap(err, "restore batch")
}

w.logger.Error(ctx, "Restore table failed on some hosts but restore will proceed",
"keyspace", fm.Keyspace,
"table", fm.Table,
"error", err,
)
w.decreaseRemainingBytesMetric(b)
}
}

notify := func(n int, err error) {
w.logger.Error(ctx, "Failed to restore files on host",
"host", hosts[n],
"error", err,
)
}

return nil
err = parallel.Run(len(hosts), w.target.Parallel, f, notify)
if err == nil {
return bd.ValidateAllDispatched()
}
return err
}

func (w *tablesWorker) stageRepair(ctx context.Context) error {
Expand Down
Loading

0 comments on commit 9ab3dcd

Please sign in to comment.