Skip to content

Commit

Permalink
restore: add and fill host info in restore progress (#4088)
Browse files Browse the repository at this point in the history
Restore: add and fill host info in restore progress

* chore(go.mod): remove replace directive to SM submodules

It was a left-over after feature development:/

* chore(go.mod): bump SM submodules deps

* feat(schema): add shard cnt to restore_run_progress

It's going to be needed for calculating per shard
download/stream bandwidth in progress command.

* feat(restore): add and fill shard cnt in restore run progress

This commit also moves host shard info to the tablesWorker,
as it is commonly reused during restore procedure.

* feat(restore): add and fill host info in progress

This allows to calculate download/stream per shard
bandwidth in 'sctool progress' display.

* feat(managerclient): display bandwidth in sctool progress

Fixes #4042

* feat(managerclient): include B or iB in SizeSuffix display

It is nicer to see:
"Size: 10B" instead of "Size: 10" or
"Size: 20KiB" instead of "Size: 20k".
  • Loading branch information
Michal-Leszczynski authored Nov 4, 2024
1 parent 8121c2e commit aba3560
Show file tree
Hide file tree
Showing 21 changed files with 486 additions and 105 deletions.
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ require (
github.com/scylladb/go-reflectx v1.0.1
github.com/scylladb/go-set v1.0.2
github.com/scylladb/gocqlx/v2 v2.8.0
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stoewer/go-strcase v1.3.0
Expand Down Expand Up @@ -128,8 +128,5 @@ require (
replace (
github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0
github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e
github.com/scylladb/scylla-manager/v3/pkg/managerclient => ./v3/pkg/managerclient
github.com/scylladb/scylla-manager/v3/pkg/util => ./v3/pkg/util
github.com/scylladb/scylla-manager/v3/swagger => ./v3/swagger
google.golang.org/api v0.114.0 => github.com/scylladb/google-api-go-client v0.34.1-patched
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,12 @@ github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzV
github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM=
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241030073626-e409ae491c83 h1:1E64zMOGQ1N3FFIpkAKKF/XVBo6WZEL3tuXHgluqDCk=
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241030073626-e409ae491c83/go.mod h1:CkiJ7/3IfAZekjx7dFltltVI7HS1e7k8QKsofX6uitM=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83 h1:6kyuL5XXRTPbBSUessmfz6TxhYgOiIHCwC63wvDExgQ=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83 h1:+2wvERSjYAhGushwWpqw8EemmntHOIcCjHCWzEdpXcM=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 h1:8qmTC5ByIXO3GP/IzBkxcZ/99VITvnIETDhdFz/om7A=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions pkg/schema/table/table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 12 additions & 30 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"

Expand Down Expand Up @@ -192,6 +190,7 @@ type RunProgress struct {
SSTableID []string `db:"sstable_id"`

Host string // IP of the node to which SSTables are downloaded.
ShardCnt int64 // Host shard count used for bandwidth per shard calculation.
AgentJobID int64

DownloadStartedAt *time.Time
Expand All @@ -205,34 +204,6 @@ type RunProgress struct {
VersionedProgress int64
}

// ForEachTableProgress iterates over all TableProgress belonging to the same run/manifest/table as the receiver.
func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*RunProgress)) error {
q := qb.Select(table.RestoreRunProgress.Name()).Where(
qb.Eq("cluster_id"),
qb.Eq("task_id"),
qb.Eq("run_id"),
qb.Eq("manifest_path"),
qb.Eq("keyspace_name"),
qb.Eq("table_name"),
).Query(session)
defer q.Release()

iter := q.BindMap(qb.M{
"cluster_id": pr.ClusterID,
"task_id": pr.TaskID,
"run_id": pr.RunID,
"manifest_path": pr.RemoteSSTableDir,
"keyspace_name": pr.Keyspace,
"table_name": pr.Table,
}).Iter()

res := new(RunProgress)
for iter.StructScan(res) {
cb(res)
}
return iter.Close()
}

func (pr *RunProgress) setRestoreStartedAt() {
t := timeutc.Now()
pr.RestoreStartedAt = &t
Expand Down Expand Up @@ -263,6 +234,7 @@ type Progress struct {

SnapshotTag string `json:"snapshot_tag"`
Keyspaces []KeyspaceProgress `json:"keyspaces,omitempty"`
Hosts []HostProgress `json:"hosts,omitempty"`
Views []ViewProgress `json:"views,omitempty"`
Stage Stage `json:"stage"`
}
Expand All @@ -275,6 +247,16 @@ type KeyspaceProgress struct {
Tables []TableProgress `json:"tables,omitempty"`
}

// HostProgress groups restore progress for the host.
type HostProgress struct {
Host string `json:"host"`
ShardCnt int64 `json:"shard_cnt"`
DownloadedBytes int64 `json:"downloaded_bytes"`
DownloadDuration int64 `json:"download_duration"`
StreamedBytes int64 `json:"streamed_bytes"`
StreamDuration int64 `json:"stream_duration"`
}

// TableProgress defines restore progress for the table.
type TableProgress struct {
progress
Expand Down
25 changes: 21 additions & 4 deletions pkg/service/restore/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
SnapshotTag: w.run.SnapshotTag,
Stage: w.run.Stage,
}
tableMap = make(map[tableKey]*TableProgress)
key tableKey
tableMap = make(map[tableKey]*TableProgress)
key tableKey
hostProgress = make(map[string]HostProgress)
)

// Initialize tables and their size
Expand All @@ -54,7 +55,20 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
}

// Initialize tables' progress
err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, aggregateRestoreTableProgress(tableMap))
atp := aggregateRestoreTableProgress(tableMap)
err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(runProgress *RunProgress) {
atp(runProgress)
hp := hostProgress[runProgress.Host]
hp.Host = runProgress.Host
hp.ShardCnt = runProgress.ShardCnt
hp.DownloadedBytes += runProgress.Downloaded
hp.DownloadDuration += timeSub(runProgress.DownloadStartedAt, runProgress.DownloadCompletedAt).Milliseconds()
if runProgress.RestoreCompletedAt != nil {
hp.StreamedBytes += runProgress.Downloaded
hp.StreamDuration += timeSub(runProgress.RestoreStartedAt, runProgress.RestoreCompletedAt).Milliseconds()
}
hostProgress[runProgress.Host] = hp
})
if err != nil {
return p, errors.Wrap(err, "iterate over restore progress")
}
Expand Down Expand Up @@ -106,6 +120,9 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
})
}

for _, hp := range hostProgress {
p.Hosts = append(p.Hosts, hp)
}
return p, nil
}

Expand Down Expand Up @@ -216,5 +233,5 @@ func timeSub(start, end *time.Time) time.Duration {
}
return endV.Sub(*start)
}
return 0
return time.Duration(0)
}
2 changes: 1 addition & 1 deletion pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI
for _, unit := range w.run.Units {
totalBytesToRestore += unit.Size
}
tw, workerErr := newTablesWorker(w, s.repairSvc, totalBytesToRestore)
tw, workerErr := newTablesWorker(ctx, w, s.repairSvc, totalBytesToRestore)
if workerErr != nil {
err = workerErr
} else {
Expand Down
52 changes: 28 additions & 24 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
type tablesWorker struct {
worker

hosts []string
hostShardCnt map[string]uint
tableVersion map[TableName]string
repairSvc *repair.Service
progress *TotalRestoreProgress
Expand Down Expand Up @@ -65,7 +67,7 @@ func (p *TotalRestoreProgress) Update(bytesRestored int64) {
p.restoredBytes += bytesRestored
}

func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) {
func newTablesWorker(ctx context.Context, w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) {
versions := make(map[TableName]string)
for _, u := range w.run.Units {
for _, t := range u.Tables {
Expand All @@ -80,8 +82,24 @@ func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*ta
}
}

hostsS := strset.New()
for _, h := range w.target.locationHosts {
hostsS.Add(h...)
}
hosts := hostsS.List()

hostToShard, err := w.client.HostsShardCount(ctx, hosts)
if err != nil {
return nil, errors.Wrap(err, "get hosts shard count")
}
for h, sh := range hostToShard {
w.logger.Info(ctx, "Host shard count", "host", h, "shards", sh)
}

return &tablesWorker{
worker: w,
hosts: hosts,
hostShardCnt: hostToShard,
tableVersion: versions,
repairSvc: repairSvc,
progress: NewTotalRestoreProgress(totalBytes),
Expand Down Expand Up @@ -169,55 +187,41 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
}
w.initMetrics(workload)

hostsS := strset.New()
for _, h := range w.target.locationHosts {
hostsS.Add(h...)
}
hosts := hostsS.List()

hostToShard, err := w.client.HostsShardCount(ctx, hosts)
if err != nil {
return errors.Wrap(err, "get hosts shard count")
}
for h, sh := range hostToShard {
w.logger.Info(ctx, "Host shard count", "host", h, "shards", sh)
}

// This defer is outside of target field check for improved safety.
// We always want to enable auto compaction outside the restore.
defer func() {
if err := w.setAutoCompaction(context.Background(), hosts, true); err != nil {
if err := w.setAutoCompaction(context.Background(), w.hosts, true); err != nil {
w.logger.Error(ctx, "Couldn't enable auto compaction", "error", err)
}
}()
if !w.target.AllowCompaction {
if err := w.setAutoCompaction(ctx, hosts, false); err != nil {
if err := w.setAutoCompaction(ctx, w.hosts, false); err != nil {
return errors.Wrapf(err, "disable auto compaction")
}
}

// Same as above.
// We always want to pin agent to CPUs outside the restore.
defer func() {
if err := w.pinAgentCPU(context.Background(), hosts, true); err != nil {
if err := w.pinAgentCPU(context.Background(), w.hosts, true); err != nil {
w.logger.Error(ctx, "Couldn't re-pin agent to CPUs", "error", err)
}
}()
if w.target.UnpinAgentCPU {
if err := w.pinAgentCPU(ctx, hosts, false); err != nil {
if err := w.pinAgentCPU(ctx, w.hosts, false); err != nil {
return errors.Wrapf(err, "unpin agent from CPUs")
}
}

bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts)
bd := newBatchDispatcher(workload, w.target.BatchSize, w.hostShardCnt, w.target.locationHosts)

f := func(n int) error {
host := hosts[n]
host := w.hosts[n]
dc, err := w.client.HostDatacenter(ctx, host)
if err != nil {
return errors.Wrapf(err, "get host %s data center", host)
}
hi := w.hostInfo(host, dc, hostToShard[host])
hi := w.hostInfo(host, dc, w.hostShardCnt[host])
w.logger.Info(ctx, "Host info", "host", hi.Host, "transfers", hi.Transfers, "rate limit", hi.RateLimit)
for {
if ctx.Err() != nil {
Expand Down Expand Up @@ -252,12 +256,12 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {

notify := func(n int, err error) {
w.logger.Error(ctx, "Failed to restore files on host",
"host", hosts[n],
"host", w.hosts[n],
"error", err,
)
}

err = parallel.Run(len(hosts), w.target.Parallel, f, notify)
err = parallel.Run(len(w.hosts), w.target.Parallel, f, notify)
if err == nil {
if ctx.Err() != nil {
return ctx.Err()
Expand Down
1 change: 1 addition & 0 deletions pkg/service/restore/tablesdir_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (w *tablesWorker) newRunProgress(ctx context.Context, hi HostInfo, b batch)
Keyspace: b.Keyspace,
Table: b.Table,
Host: hi.Host,
ShardCnt: int64(w.hostShardCnt[hi.Host]),
AgentJobID: jobID,
SSTableID: b.IDs(),
VersionedProgress: versionedPr,
Expand Down
4 changes: 3 additions & 1 deletion schema/v3.4.0.cql
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
ALTER TABLE restore_run DROP location;
ALTER TABLE restore_run DROP manifest_path;
ALTER TABLE restore_run DROP keyspace_name;
ALTER TABLE restore_run DROP table_name;
ALTER TABLE restore_run DROP table_name;

ALTER TABLE restore_run_progress ADD shard_cnt bigint;
4 changes: 2 additions & 2 deletions v3/pkg/managerclient/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ require (
github.com/lnquy/cron v1.1.1
github.com/pkg/errors v0.9.1
github.com/scylladb/go-set v1.0.2
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4
)

Expand Down
8 changes: 4 additions & 4 deletions v3/pkg/managerclient/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE=
github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83 h1:yy3k0OEYbsmgDenYuJd7B/nftAUI7VMB/WUoX1Iv/6I=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83 h1:cCbhUYGzQ/xbjSN+w3g/H2ZcIoEn6OGRu3l5a+et/vs=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83 h1:6kyuL5XXRTPbBSUessmfz6TxhYgOiIHCwC63wvDExgQ=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83 h1:+2wvERSjYAhGushwWpqw8EemmntHOIcCjHCWzEdpXcM=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 h1:8qmTC5ByIXO3GP/IzBkxcZ/99VITvnIETDhdFz/om7A=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
Loading

0 comments on commit aba3560

Please sign in to comment.