Restore improvement: batch (#4055)
* feat(restore): batch, scale batch size with shard count

This was the initial design, which was changed by mistake during implementation.
We should always send shard_cnt * --batch-size SSTables in a single restore job,
even when the user wants to restore into a running cluster. In that case, setting
--parallel=1 and --batch-size=1 should be enough for a slow-running restore.

* feat(restore): batch, introduce 5% batching for --batch-size=0

This commit allows setting --batch-size=0.
When this is used, batches are created so that each contains
about 5% of the expected node workload during restore.
This allows for creating big, yet evenly distributed batches
without the need to tune the --batch-size flag.
It should also work better when the backed-up cluster
had a different number of nodes than the restore destination
cluster.

Fixes #4059

* feat(docs): restore, add --batch-size=0 description

* fix(restore): batch, remove panics and improve safety

* feat(scyllaclient): return error on 0 host shard count

This shouldn't be possible, but we can still validate that.

* feat(restore): log host shard count

* feat(restore): batch, order sstables by size

This results in batches of SSTables of more similar size.

Fixes #3979

* feat(restore): extend batch with small leftovers

This avoids having to deal with small leftover batches
that are badly distributed over shards.

* feat(restore): batch, add test for simple batching scenario
Michal-Leszczynski authored Oct 7, 2024
1 parent ddd3cbf commit 78982d3
Showing 9 changed files with 257 additions and 21 deletions.
1 change: 1 addition & 0 deletions docs/source/sctool/partials/sctool_restore.yaml
@@ -12,6 +12,7 @@ options:
usage: |
Number of SSTables per shard to process in one request by one node.
Increasing the default batch size might significantly increase restore performance, as only one shard can work on restoring a single SSTable bundle.
Set to 0 for best performance (batches will contain sstables of total size up to 5% of expected total node workload).
- name: cluster
shorthand: c
usage: |
1 change: 1 addition & 0 deletions docs/source/sctool/partials/sctool_restore_update.yaml
@@ -10,6 +10,7 @@ options:
usage: |
Number of SSTables per shard to process in one request by one node.
Increasing the default batch size might significantly increase restore performance, as only one shard can work on restoring a single SSTable bundle.
Set to 0 for best performance (batches will contain sstables of total size up to 5% of expected total node workload).
- name: cluster
shorthand: c
usage: |
1 change: 1 addition & 0 deletions pkg/command/restore/res.yaml
@@ -27,6 +27,7 @@ snapshot-tag: |
batch-size: |
Number of SSTables per shard to process in one request by one node.
Increasing the default batch size might significantly increase restore performance, as only one shard can work on restoring a single SSTable bundle.
Set to 0 for best performance (batches will contain sstables of total size up to 5% of expected total node workload).
parallel: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
3 changes: 3 additions & 0 deletions pkg/scyllaclient/client_scylla.go
@@ -417,6 +417,9 @@ func (c *Client) HostsShardCount(ctx context.Context, hosts []string) (map[string]uint, error) {

out := make(map[string]uint)
for i, h := range hosts {
if shards[i] == 0 {
return nil, errors.Errorf("host %s reported 0 shard count", h)
}
out[h] = shards[i]
}
return out, nil
111 changes: 94 additions & 17 deletions pkg/service/restore/batch.go
@@ -11,18 +11,34 @@ import (
)

type batchDispatcher struct {
mu sync.Mutex
workload []LocationWorkload
batchSize int
locationHosts map[Location][]string
mu sync.Mutex
workload []LocationWorkload
batchSize int
expectedShardWorkload int64
hostShardCnt map[string]uint
locationHosts map[Location][]string
}

func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher {
func newBatchDispatcher(workload []LocationWorkload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
sortWorkloadBySizeDesc(workload)
var size int64
for _, t := range workload {
size += t.Size
}
var shards uint
for _, sh := range hostShardCnt {
shards += sh
}
if shards == 0 {
shards = 1
}
return &batchDispatcher{
mu: sync.Mutex{},
workload: workload,
batchSize: batchSize,
locationHosts: locationHosts,
mu: sync.Mutex{},
workload: workload,
batchSize: batchSize,
expectedShardWorkload: size / int64(shards),
hostShardCnt: hostShardCnt,
locationHosts: locationHosts,
}
}

@@ -113,8 +129,7 @@ func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) {
if dir == nil {
return batch{}, false
}
out := b.createBatch(l, t, dir)
return out, true
return b.createBatch(l, t, dir, host)
}

// Returns location for which batch should be created.
@@ -153,15 +168,56 @@ func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload {
}

// Returns batch and updates RemoteDirWorkload and its parents.
func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch {
i := min(b.batchSize, len(dir.SSTables))
sstables := dir.SSTables[:i]
dir.SSTables = dir.SSTables[i:]
func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload, host string) (batch, bool) {
shardCnt := b.hostShardCnt[host]
if shardCnt == 0 {
shardCnt = 1
}

var i int
var size int64
for _, sst := range sstables {
size += sst.Size
if b.batchSize == maxBatchSize {
// Create batch containing multiple of node shard count sstables
// and size up to 5% of expected node workload.
expectedNodeWorkload := b.expectedShardWorkload * int64(shardCnt)
sizeLimit := expectedNodeWorkload / 20
for {
for j := 0; j < int(shardCnt); j++ {
if i >= len(dir.SSTables) {
break
}
size += dir.SSTables[i].Size
i++
}
if i >= len(dir.SSTables) {
break
}
if size > sizeLimit {
break
}
}
} else {
// Create batch containing node_shard_count*batch_size sstables.
i = min(b.batchSize*int(shardCnt), len(dir.SSTables))
for j := 0; j < i; j++ {
size += dir.SSTables[j].Size
}
}

if i == 0 {
return batch{}, false
}
// Extend batch if it was to leave less than
// 1 sstable per shard for the next one.
if len(dir.SSTables)-i < int(shardCnt) {
for ; i < len(dir.SSTables); i++ {
size += dir.SSTables[i].Size
}
}

sstables := dir.SSTables[:i]
dir.SSTables = dir.SSTables[i:]

dir.Size -= size
t.Size -= size
l.Size -= size
@@ -171,5 +227,26 @@ func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir
RemoteSSTableDir: dir.RemoteSSTableDir,
Size: size,
SSTables: sstables,
}, true
}

func sortWorkloadBySizeDesc(workload []LocationWorkload) {
slices.SortFunc(workload, func(a, b LocationWorkload) int {
return int(b.Size - a.Size)
})
for _, loc := range workload {
slices.SortFunc(loc.Tables, func(a, b TableWorkload) int {
return int(b.Size - a.Size)
})
for _, tab := range loc.Tables {
slices.SortFunc(tab.RemoteDirs, func(a, b RemoteDirWorkload) int {
return int(b.Size - a.Size)
})
for _, dir := range tab.RemoteDirs {
slices.SortFunc(dir.SSTables, func(a, b RemoteSSTable) int {
return int(b.Size - a.Size)
})
}
}
}
}
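To make the new sizing rules easier to follow, here is a minimal, self-contained sketch of the same logic outside the dispatcher. This is an illustration only, not code from this commit: pickBatch, its parameters, and the sample sizes are invented for the example, and the authoritative behavior is the createBatch method above. With --batch-size=N (N > 0) a batch takes N SSTables per shard; with --batch-size=0 it grows in shard-count steps until it passes 5% of the expected node workload, so a node ends up with roughly 20 large, similarly sized batches.

package main

import "fmt"

// pickBatch restates the sizing rules for a single remote SSTable dir:
// count is how many SSTables the next batch takes and size is their total.
// batchSize == 0 stands for the new --batch-size=0 (5%) mode.
// Requires Go 1.21+ for the built-in min.
func pickBatch(batchSize, shardCnt int, expectedShardWorkload int64, sstableSizes []int64) (count int, size int64) {
	if batchSize > 0 {
		// Fixed mode: node_shard_count * batch_size SSTables.
		count = min(batchSize*shardCnt, len(sstableSizes))
		for _, s := range sstableSizes[:count] {
			size += s
		}
	} else {
		// 5% mode: grow in steps of shardCnt SSTables until the batch
		// exceeds 5% of the expected node workload.
		sizeLimit := expectedShardWorkload * int64(shardCnt) / 20
		for count < len(sstableSizes) && size <= sizeLimit {
			for j := 0; j < shardCnt && count < len(sstableSizes); j++ {
				size += sstableSizes[count]
				count++
			}
		}
	}
	// Leftover extension: don't leave fewer than shardCnt SSTables behind.
	if rest := len(sstableSizes) - count; rest > 0 && rest < shardCnt {
		for ; count < len(sstableSizes); count++ {
			size += sstableSizes[count]
		}
	}
	return count, size
}

func main() {
	sizes := []int64{60, 50, 40, 30, 20, 10} // hypothetical, sorted by size desc

	// --batch-size=2 on a 2-shard node: 2*2=4 SSTables, total size 180.
	fmt.Println(pickBatch(2, 2, 0, sizes))

	// --batch-size=0 on a 2-shard node with expected shard workload 500:
	// size limit is 500*2/20 = 50, so the batch stops once one
	// shard-count step overshoots it: 2 SSTables, total size 110.
	fmt.Println(pickBatch(0, 2, 500, sizes))
}

Note that the 5% limit is a soft threshold: a batch always grows by whole shard-count steps, so it can overshoot the limit, as the second example shows.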
142 changes: 142 additions & 0 deletions pkg/service/restore/batch_test.go
@@ -0,0 +1,142 @@
// Copyright (C) 2024 ScyllaDB

package restore

import (
"testing"

"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

func TestBatchDispatcher(t *testing.T) {
l1 := backupspec.Location{
Provider: "s3",
Path: "l1",
}
l2 := backupspec.Location{
Provider: "s3",
Path: "l2",
}
workload := []LocationWorkload{
{
Location: l1,
Size: 170,
Tables: []TableWorkload{
{
Size: 60,
RemoteDirs: []RemoteDirWorkload{
{
RemoteSSTableDir: "a",
Size: 20,
SSTables: []RemoteSSTable{
{Size: 5},
{Size: 15},
},
},
{
RemoteSSTableDir: "e",
Size: 10,
SSTables: []RemoteSSTable{
{Size: 2},
{Size: 4},
{Size: 4},
},
},
{
RemoteSSTableDir: "b",
Size: 30,
SSTables: []RemoteSSTable{
{Size: 10},
{Size: 20},
},
},
},
},
{
Size: 110,
RemoteDirs: []RemoteDirWorkload{
{
RemoteSSTableDir: "c",
Size: 110,
SSTables: []RemoteSSTable{
{Size: 50},
{Size: 60},
},
},
},
},
},
},
{
Location: l2,
Size: 200,
Tables: []TableWorkload{
{
Size: 200,
RemoteDirs: []RemoteDirWorkload{
{
RemoteSSTableDir: "d",
Size: 200,
SSTables: []RemoteSSTable{
{Size: 110},
{Size: 90},
},
},
},
},
},
},
}
locationHosts := map[backupspec.Location][]string{
l1: {"h1", "h2"},
l2: {"h3"},
}
hostToShard := map[string]uint{
"h1": 1,
"h2": 2,
"h3": 3,
}

bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts)

scenario := []struct {
host string
ok bool
dir string
size int64
count int
}{
{host: "h1", ok: true, dir: "c", size: 60, count: 1},
{host: "h1", ok: true, dir: "c", size: 50, count: 1},
{host: "h2", ok: true, dir: "b", size: 30, count: 2},
{host: "h3", ok: true, dir: "d", size: 200, count: 2},
{host: "h3", ok: false},
{host: "h2", ok: true, dir: "a", size: 20, count: 2},
{host: "h2", ok: true, dir: "e", size: 10, count: 3}, // batch extended with leftovers < shard_cnt
{host: "h1", ok: false},
{host: "h2", ok: false},
}

for _, step := range scenario {
b, ok := bd.DispatchBatch(step.host)
if ok != step.ok {
t.Fatalf("Step: %+v, expected ok=%v, got ok=%v", step, step.ok, ok)
}
if ok == false {
continue
}
if b.RemoteSSTableDir != step.dir {
t.Fatalf("Step: %+v, expected dir=%v, got dir=%v", step, step.dir, b.RemoteSSTableDir)
}
if b.Size != step.size {
t.Fatalf("Step: %+v, expected size=%v, got size=%v", step, step.size, b.Size)
}
if len(b.SSTables) != step.count {
t.Fatalf("Step: %+v, expected count=%v, got count=%v", step, step.count, len(b.SSTables))
}
}

if err := bd.ValidateAllDispatched(); err != nil {
t.Fatalf("Expected sstables to be batched: %s", err)
}
}
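To read one step of the scenario: when h2 (2 shards, batch size 1) reaches dir "e", it first takes 2 SSTables (4+4=8), which would leave a single SSTable of size 2, fewer than the host's shard count, so the batch is extended to all 3 SSTables with a total size of 10. Similarly, h3 (3 shards) asks for up to 3 SSTables from dir "d" but only 2 exist, so it takes both (110+90=200).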
6 changes: 4 additions & 2 deletions pkg/service/restore/model.go
@@ -36,6 +36,8 @@ type Target struct {
locationHosts map[Location][]string `json:"-"`
}

const maxBatchSize = 0

func defaultTarget() Target {
return Target{
BatchSize: 2,
@@ -53,8 +55,8 @@ func (t Target) validateProperties() error {
if _, err := SnapshotTagTime(t.SnapshotTag); err != nil {
return err
}
if t.BatchSize <= 0 {
return errors.New("batch size param has to be greater than zero")
if t.BatchSize < 0 {
return errors.New("batch size param has to be greater or equal to zero")
}
if t.Parallel < 0 {
return errors.New("parallel param has to be greater or equal to zero")
2 changes: 1 addition & 1 deletion pkg/service/restore/service_restore_integration_test.go
@@ -394,7 +394,7 @@ func TestRestoreGetTargetUnitsViewsErrorIntegration(t *testing.T) {
{
name: "non-positive batch size",
input: "testdata/get_target/non_positive_batch_size.input.json",
error: "batch size param has to be greater than zero",
error: "batch size param has to be greater or equal to zero",
},
{
name: "no data matching keyspace pattern",
11 changes: 10 additions & 1 deletion pkg/service/restore/tables_worker.go
@@ -174,13 +174,22 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
}
w.initMetrics(workload)

bd := newBatchDispatcher(workload, w.target.BatchSize, w.target.locationHosts)
hostsS := strset.New()
for _, h := range w.target.locationHosts {
hostsS.Add(h...)
}
hosts := hostsS.List()

hostToShard, err := w.client.HostsShardCount(ctx, hosts)
if err != nil {
return errors.Wrap(err, "get hosts shard count")
}
for h, sh := range hostToShard {
w.logger.Info(ctx, "Host shard count", "host", h, "shards", sh)
}

bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts)

f := func(n int) (err error) {
h := hosts[n]
for {
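The hunk above is truncated by the diff view. As a reading aid, here is a hypothetical sketch, not code from this commit, of how a per-host worker could drain the dispatcher; drainHost and the restore callback are invented names, and the sketch assumes it sits next to the restore package types and its errors package.

// drainHost keeps requesting batches for a host until the dispatcher has
// nothing left for it. The restore callback stands in for the actual
// per-batch restore call issued to the node.
func drainHost(ctx context.Context, bd *batchDispatcher, host string,
	restore func(context.Context, string, batch) error) error {
	for {
		b, ok := bd.DispatchBatch(host)
		if !ok {
			return nil // no more SSTables this host can restore
		}
		if err := restore(ctx, host, b); err != nil {
			return errors.Wrapf(err, "restore batch from %s", b.RemoteSSTableDir)
		}
	}
}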
