Restore improvement: batch #4055

Merged 9 commits on Oct 7, 2024
1 change: 1 addition & 0 deletions docs/source/sctool/partials/sctool_restore.yaml
@@ -12,6 +12,7 @@ options:
usage: |
Number of SSTables per shard to process in one request by one node.
Increasing the default batch size might significantly increase restore performance, as only one shard can work on restoring a single SSTable bundle.
Set to 0 for best performance (batches will contain SSTables with a total size of up to 5% of the expected total node workload).
- name: cluster
shorthand: c
usage: |
1 change: 1 addition & 0 deletions docs/source/sctool/partials/sctool_restore_update.yaml
@@ -10,6 +10,7 @@ options:
usage: |
Number of SSTables per shard to process in one request by one node.
Increasing the default batch size might significantly increase restore performance, as only one shard can work on restoring a single SSTable bundle.
Set to 0 for best performance (batches will contain SSTables with a total size of up to 5% of the expected total node workload).
- name: cluster
shorthand: c
usage: |
1 change: 1 addition & 0 deletions pkg/command/restore/res.yaml
@@ -27,6 +27,7 @@ snapshot-tag: |
batch-size: |
Number of SSTables per shard to process in one request by one node.
Increasing the default batch size might significantly increase restore performance, as only one shard can work on restoring a single SSTable bundle.
Set to 0 for best performance (batches will contain SSTables with a total size of up to 5% of the expected total node workload).
parallel: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
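For illustration only, here is a rough, self-contained Go sketch of what the new batch-size 0 semantics described above amount to; the restore size and shard counts are made-up example values, not taken from this PR.

package main

import "fmt"

// Hypothetical numbers illustrating the rule described above: with batch-size
// set to 0, a batch is capped at 5% of the node's expected workload, where the
// expected workload is derived from the total restore size and shard counts.
func main() {
	const (
		totalRestoreSize = int64(1 << 40) // 1 TiB of SSTables to restore (example value)
		totalShards      = 24             // e.g. 3 nodes with 8 shards each
		nodeShards       = 8              // shard count of the node receiving the batch
	)
	expectedShardWorkload := totalRestoreSize / totalShards
	expectedNodeWorkload := expectedShardWorkload * nodeShards
	sizeLimit := expectedNodeWorkload / 20 // 5% of the expected node workload
	fmt.Printf("per-batch size limit: %d bytes (~%.1f GiB)\n", sizeLimit, float64(sizeLimit)/(1<<30))
}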
3 changes: 3 additions & 0 deletions pkg/scyllaclient/client_scylla.go
@@ -417,6 +417,9 @@ func (c *Client) HostsShardCount(ctx context.Context, hosts []string) (map[strin

	out := make(map[string]uint)
	for i, h := range hosts {
		if shards[i] == 0 {
			return nil, errors.Errorf("host %s reported 0 shard count", h)
		}
		out[h] = shards[i]
	}
	return out, nil
104 changes: 87 additions & 17 deletions pkg/service/restore/batch.go
@@ -11,18 +11,34 @@ import (
)

type batchDispatcher struct {
	mu            sync.Mutex
	workload      []LocationWorkload
	batchSize     int
	locationHosts map[Location][]string
	mu                    sync.Mutex
	workload              []LocationWorkload
	batchSize             int
	expectedShardWorkload int64
	hostShardCnt          map[string]uint
	locationHosts         map[Location][]string
}

func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher {
func newBatchDispatcher(workload []LocationWorkload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
	sortWorkloadBySizeDesc(workload)
	var size int64
	for _, t := range workload {
		size += t.Size
	}
	var shards uint
	for _, sh := range hostShardCnt {
		shards += sh
	}
	if shards == 0 {
		shards = 1
	}
	return &batchDispatcher{
		mu:            sync.Mutex{},
		workload:      workload,
		batchSize:     batchSize,
		locationHosts: locationHosts,
		mu:                    sync.Mutex{},
		workload:              workload,
		batchSize:             batchSize,
		expectedShardWorkload: size / int64(shards),
		hostShardCnt:          hostShardCnt,
		locationHosts:         locationHosts,
	}
}

@@ -113,8 +129,7 @@ func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) {
	if dir == nil {
		return batch{}, false
	}
	out := b.createBatch(l, t, dir)
	return out, true
	return b.createBatch(l, t, dir, host)
}

// Returns location for which batch should be created.
@@ -153,15 +168,49 @@ func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorklo
}

// Returns batch and updates RemoteDirWorkload and its parents.
func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch {
	i := min(b.batchSize, len(dir.SSTables))
	sstables := dir.SSTables[:i]
	dir.SSTables = dir.SSTables[i:]
func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload, host string) (batch, bool) {
	shardCnt := b.hostShardCnt[host]
	if shardCnt == 0 {
		shardCnt = 1
	}

	var i int
	var size int64
	for _, sst := range sstables {
		size += sst.Size
	if b.batchSize == maxBatchSize {
		// Create batch containing multiple of node shard count sstables
		// and size up to 5% of expected node workload.
		expectedNodeWorkload := b.expectedShardWorkload * int64(shardCnt)
		sizeLimit := expectedNodeWorkload / 20
		for {
			for j := 0; j < int(shardCnt); j++ {
				if i >= len(dir.SSTables) {
					break
				}
				size += dir.SSTables[i].Size
				i++
			}
			if i >= len(dir.SSTables) {
				break
			}
			if size > sizeLimit {
				break
			}
		}
	} else {
		// Create batch containing node_shard_count*batch_size sstables.
		i = min(b.batchSize*int(shardCnt), len(dir.SSTables))
		for j := 0; j < i; j++ {
			size += dir.SSTables[j].Size
		}
	}

	if i == 0 {
		return batch{}, false
	}

	sstables := dir.SSTables[:i]
	dir.SSTables = dir.SSTables[i:]

	dir.Size -= size
	t.Size -= size
	l.Size -= size
@@ -171,5 +220,26 @@ func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir
		RemoteSSTableDir: dir.RemoteSSTableDir,
		Size:             size,
		SSTables:         sstables,
	}, true
}
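To make the batching rule above easier to follow in isolation, here is a simplified, self-contained sketch; cutBatch is an illustrative stand-in (not the service code) that mimics both modes of createBatch over a plain slice of SSTable sizes, assuming Go 1.21+ for the min builtin.

package main

import "fmt"

// cutBatch returns how many SSTables from one remote dir go into the next
// batch and their total size.
//
// batchSize > 0:  take batchSize * shardCnt SSTables.
// batchSize == 0: keep taking shardCnt SSTables at a time until the batch
// exceeds 5% of the node's expected workload or the dir runs out.
func cutBatch(sstSizes []int64, batchSize, shardCnt int, expectedShardWorkload int64) (count int, size int64) {
	if shardCnt == 0 {
		shardCnt = 1
	}
	if batchSize > 0 {
		count = min(batchSize*shardCnt, len(sstSizes))
		for _, s := range sstSizes[:count] {
			size += s
		}
		return count, size
	}
	sizeLimit := expectedShardWorkload * int64(shardCnt) / 20
	for count < len(sstSizes) && size <= sizeLimit {
		for j := 0; j < shardCnt && count < len(sstSizes); j++ {
			size += sstSizes[count]
			count++
		}
	}
	return count, size
}

func main() {
	sizes := []int64{100, 90, 80, 70, 60, 50, 40, 30}
	n, s := cutBatch(sizes, 0, 2, 500) // adaptive mode: size limit = 500*2/20 = 50
	fmt.Println(n, s)                  // prints "2 190": one round of 2 SSTables already exceeds the limit
}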

func sortWorkloadBySizeDesc(workload []LocationWorkload) {
	slices.SortFunc(workload, func(a, b LocationWorkload) int {
		return int(b.Size - a.Size)
	})
	for _, loc := range workload {
		slices.SortFunc(loc.Tables, func(a, b TableWorkload) int {
			return int(b.Size - a.Size)
		})
		for _, tab := range loc.Tables {
			slices.SortFunc(tab.RemoteDirs, func(a, b RemoteDirWorkload) int {
				return int(b.Size - a.Size)
			})
			for _, dir := range tab.RemoteDirs {
				slices.SortFunc(dir.SSTables, func(a, b RemoteSSTable) int {
					return int(b.Size - a.Size)
				})
			}
		}
	}
}
6 changes: 4 additions & 2 deletions pkg/service/restore/model.go
@@ -36,6 +36,8 @@ type Target struct {
	locationHosts map[Location][]string `json:"-"`
}

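// maxBatchSize is the special batch size value (0) that enables adaptive batching, capping each batch at 5% of the node's expected restore workload.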
const maxBatchSize = 0

func defaultTarget() Target {
	return Target{
		BatchSize: 2,
@@ -53,8 +55,8 @@ func (t Target) validateProperties() error {
	if _, err := SnapshotTagTime(t.SnapshotTag); err != nil {
		return err
	}
	if t.BatchSize <= 0 {
		return errors.New("batch size param has to be greater than zero")
	if t.BatchSize < 0 {
		return errors.New("batch size param has to be greater or equal to zero")
	}
	if t.Parallel < 0 {
		return errors.New("parallel param has to be greater or equal to zero")
2 changes: 1 addition & 1 deletion pkg/service/restore/service_restore_integration_test.go
@@ -394,7 +394,7 @@ func TestRestoreGetTargetUnitsViewsErrorIntegration(t *testing.T) {
		{
			name:  "non-positive batch size",
			input: "testdata/get_target/non_positive_batch_size.input.json",
			error: "batch size param has to be greater than zero",
			error: "batch size param has to be greater or equal to zero",
		},
		{
			name: "no data matching keyspace pattern",
11 changes: 10 additions & 1 deletion pkg/service/restore/tables_worker.go
@@ -174,13 +174,22 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
	}
	w.initMetrics(workload)

	bd := newBatchDispatcher(workload, w.target.BatchSize, w.target.locationHosts)
	hostsS := strset.New()
	for _, h := range w.target.locationHosts {
		hostsS.Add(h...)
	}
	hosts := hostsS.List()

	hostToShard, err := w.client.HostsShardCount(ctx, hosts)
	if err != nil {
		return errors.Wrap(err, "get hosts shard count")
	}
	for h, sh := range hostToShard {
		w.logger.Info(ctx, "Host shard count", "host", h, "shards", sh)
	}

	bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts)

	f := func(n int) (err error) {
		h := hosts[n]
		for {
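The body of the per-host closure is cut off in the excerpt above. As a hedged sketch of how the new (batch, bool) return of DispatchBatch is intended to be consumed, here is a self-contained example; batch, dispatcher, drainHost, and fakeDispatcher are illustrative stand-ins, not the service's actual types.

package main

import "fmt"

// Minimal stand-ins for the dispatcher types shown in the diff.
type batch struct{ Size int64 }

type dispatcher interface {
	DispatchBatch(host string) (batch, bool)
}

// drainHost keeps requesting batches for one host until none remain,
// restoring each one in turn.
func drainHost(d dispatcher, host string, restore func(batch) error) error {
	for {
		b, ok := d.DispatchBatch(host)
		if !ok {
			return nil // nothing left that this host can restore
		}
		if err := restore(b); err != nil {
			return err
		}
	}
}

// fakeDispatcher hands out a fixed queue of batches, ignoring the host.
type fakeDispatcher struct{ queue []batch }

func (f *fakeDispatcher) DispatchBatch(string) (batch, bool) {
	if len(f.queue) == 0 {
		return batch{}, false
	}
	b := f.queue[0]
	f.queue = f.queue[1:]
	return b, true
}

func main() {
	d := &fakeDispatcher{queue: []batch{{Size: 100}, {Size: 50}}}
	err := drainHost(d, "192.168.100.11", func(b batch) error {
		fmt.Println("restoring batch of size", b.Size)
		return nil
	})
	fmt.Println("done, err:", err)
}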