Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore: add and fill host info in restore progress #4088

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ require (
github.com/scylladb/go-reflectx v1.0.1
github.com/scylladb/go-set v1.0.2
github.com/scylladb/gocqlx/v2 v2.8.0
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stoewer/go-strcase v1.3.0
Expand Down Expand Up @@ -128,8 +128,5 @@ require (
replace (
github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0
github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e
github.com/scylladb/scylla-manager/v3/pkg/managerclient => ./v3/pkg/managerclient
github.com/scylladb/scylla-manager/v3/pkg/util => ./v3/pkg/util
github.com/scylladb/scylla-manager/v3/swagger => ./v3/swagger
google.golang.org/api v0.114.0 => github.com/scylladb/google-api-go-client v0.34.1-patched
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,12 @@ github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzV
github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM=
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241030073626-e409ae491c83 h1:1E64zMOGQ1N3FFIpkAKKF/XVBo6WZEL3tuXHgluqDCk=
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241030073626-e409ae491c83/go.mod h1:CkiJ7/3IfAZekjx7dFltltVI7HS1e7k8QKsofX6uitM=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83 h1:6kyuL5XXRTPbBSUessmfz6TxhYgOiIHCwC63wvDExgQ=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83 h1:+2wvERSjYAhGushwWpqw8EemmntHOIcCjHCWzEdpXcM=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 h1:8qmTC5ByIXO3GP/IzBkxcZ/99VITvnIETDhdFz/om7A=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions pkg/schema/table/table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 12 additions & 30 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"

Expand Down Expand Up @@ -192,6 +190,7 @@ type RunProgress struct {
SSTableID []string `db:"sstable_id"`

Host string // IP of the node to which SSTables are downloaded.
ShardCnt int64 // Host shard count used for bandwidth per shard calculation.
AgentJobID int64

DownloadStartedAt *time.Time
Expand All @@ -205,34 +204,6 @@ type RunProgress struct {
VersionedProgress int64
}

// ForEachTableProgress calls cb once for every restore run progress row that
// matches the receiver's cluster ID, task ID, run ID, manifest path (taken from
// RemoteSSTableDir), keyspace and table.
//
// A single RunProgress value is reused as the scan target for all rows, so cb
// receives the same pointer on every call.
// It returns the error reported when the underlying iterator is closed.
func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*RunProgress)) error {
	// Select all columns, restricted to the rows of a single table within the run.
	builder := qb.Select(table.RestoreRunProgress.Name()).Where(
		qb.Eq("cluster_id"),
		qb.Eq("task_id"),
		qb.Eq("run_id"),
		qb.Eq("manifest_path"),
		qb.Eq("keyspace_name"),
		qb.Eq("table_name"),
	)
	q := builder.Query(session)
	defer q.Release()

	binds := qb.M{
		"cluster_id":    pr.ClusterID,
		"task_id":       pr.TaskID,
		"run_id":        pr.RunID,
		"manifest_path": pr.RemoteSSTableDir,
		"keyspace_name": pr.Keyspace,
		"table_name":    pr.Table,
	}
	rows := q.BindMap(binds).Iter()

	// One scan target shared by all iterations.
	var row RunProgress
	for rows.StructScan(&row) {
		cb(&row)
	}
	return rows.Close()
}

func (pr *RunProgress) setRestoreStartedAt() {
t := timeutc.Now()
pr.RestoreStartedAt = &t
Expand Down Expand Up @@ -263,6 +234,7 @@ type Progress struct {

SnapshotTag string `json:"snapshot_tag"`
Keyspaces []KeyspaceProgress `json:"keyspaces,omitempty"`
Hosts []HostProgress `json:"hosts,omitempty"`
Views []ViewProgress `json:"views,omitempty"`
Stage Stage `json:"stage"`
}
Expand All @@ -275,6 +247,16 @@ type KeyspaceProgress struct {
Tables []TableProgress `json:"tables,omitempty"`
}

// HostProgress groups restore progress for the host.
// aggregateProgress builds one value per distinct RunProgress.Host by summing
// over all RunProgress rows of the run that share that host.
type HostProgress struct {
// Host is copied from RunProgress.Host (IP of the node to which SSTables are downloaded).
Host string `json:"host"`
// ShardCnt is copied from RunProgress.ShardCnt (host shard count).
ShardCnt int64 `json:"shard_cnt"`
// DownloadedBytes is the sum of RunProgress.Downloaded over the host's rows.
DownloadedBytes int64 `json:"downloaded_bytes"`
// DownloadDuration is the sum, in milliseconds, of
// timeSub(DownloadStartedAt, DownloadCompletedAt) over the host's rows.
DownloadDuration int64 `json:"download_duration"`
// StreamedBytes is the sum of RunProgress.Downloaded over the host's rows
// that have RestoreCompletedAt set.
StreamedBytes int64 `json:"streamed_bytes"`
// StreamDuration is the sum, in milliseconds, of
// timeSub(RestoreStartedAt, RestoreCompletedAt) over the host's rows
// that have RestoreCompletedAt set.
StreamDuration int64 `json:"stream_duration"`
}

// TableProgress defines restore progress for the table.
type TableProgress struct {
progress
Expand Down
25 changes: 21 additions & 4 deletions pkg/service/restore/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
SnapshotTag: w.run.SnapshotTag,
Stage: w.run.Stage,
}
tableMap = make(map[tableKey]*TableProgress)
key tableKey
tableMap = make(map[tableKey]*TableProgress)
key tableKey
hostProgress = make(map[string]HostProgress)
)

// Initialize tables and their size
Expand All @@ -54,7 +55,20 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
}

// Initialize tables' progress
err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, aggregateRestoreTableProgress(tableMap))
atp := aggregateRestoreTableProgress(tableMap)
err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(runProgress *RunProgress) {
atp(runProgress)
hp := hostProgress[runProgress.Host]
hp.Host = runProgress.Host
hp.ShardCnt = runProgress.ShardCnt
hp.DownloadedBytes += runProgress.Downloaded
hp.DownloadDuration += timeSub(runProgress.DownloadStartedAt, runProgress.DownloadCompletedAt).Milliseconds()
if runProgress.RestoreCompletedAt != nil {
hp.StreamedBytes += runProgress.Downloaded
hp.StreamDuration += timeSub(runProgress.RestoreStartedAt, runProgress.RestoreCompletedAt).Milliseconds()
}
hostProgress[runProgress.Host] = hp
})
if err != nil {
return p, errors.Wrap(err, "iterate over restore progress")
}
Expand Down Expand Up @@ -106,6 +120,9 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
})
}

for _, hp := range hostProgress {
p.Hosts = append(p.Hosts, hp)
}
return p, nil
}

Expand Down Expand Up @@ -216,5 +233,5 @@ func timeSub(start, end *time.Time) time.Duration {
}
return endV.Sub(*start)
}
return 0
return time.Duration(0)
}
2 changes: 1 addition & 1 deletion pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI
for _, unit := range w.run.Units {
totalBytesToRestore += unit.Size
}
tw, workerErr := newTablesWorker(w, s.repairSvc, totalBytesToRestore)
tw, workerErr := newTablesWorker(ctx, w, s.repairSvc, totalBytesToRestore)
if workerErr != nil {
err = workerErr
} else {
Expand Down
52 changes: 28 additions & 24 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
type tablesWorker struct {
worker

hosts []string
hostShardCnt map[string]uint
tableVersion map[TableName]string
repairSvc *repair.Service
progress *TotalRestoreProgress
Expand Down Expand Up @@ -65,7 +67,7 @@ func (p *TotalRestoreProgress) Update(bytesRestored int64) {
p.restoredBytes += bytesRestored
}

func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) {
func newTablesWorker(ctx context.Context, w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) {
versions := make(map[TableName]string)
for _, u := range w.run.Units {
for _, t := range u.Tables {
Expand All @@ -80,8 +82,24 @@ func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*ta
}
}

hostsS := strset.New()
for _, h := range w.target.locationHosts {
hostsS.Add(h...)
}
hosts := hostsS.List()

hostToShard, err := w.client.HostsShardCount(ctx, hosts)
if err != nil {
return nil, errors.Wrap(err, "get hosts shard count")
}
for h, sh := range hostToShard {
w.logger.Info(ctx, "Host shard count", "host", h, "shards", sh)
}

return &tablesWorker{
worker: w,
hosts: hosts,
hostShardCnt: hostToShard,
tableVersion: versions,
repairSvc: repairSvc,
progress: NewTotalRestoreProgress(totalBytes),
Expand Down Expand Up @@ -169,55 +187,41 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
}
w.initMetrics(workload)

hostsS := strset.New()
for _, h := range w.target.locationHosts {
hostsS.Add(h...)
}
hosts := hostsS.List()

hostToShard, err := w.client.HostsShardCount(ctx, hosts)
if err != nil {
return errors.Wrap(err, "get hosts shard count")
}
for h, sh := range hostToShard {
w.logger.Info(ctx, "Host shard count", "host", h, "shards", sh)
}

// This defer is outside of target field check for improved safety.
// We always want to enable auto compaction outside the restore.
defer func() {
if err := w.setAutoCompaction(context.Background(), hosts, true); err != nil {
if err := w.setAutoCompaction(context.Background(), w.hosts, true); err != nil {
w.logger.Error(ctx, "Couldn't enable auto compaction", "error", err)
}
}()
if !w.target.AllowCompaction {
if err := w.setAutoCompaction(ctx, hosts, false); err != nil {
if err := w.setAutoCompaction(ctx, w.hosts, false); err != nil {
return errors.Wrapf(err, "disable auto compaction")
}
}

// Same as above.
// We always want to pin agent to CPUs outside the restore.
defer func() {
if err := w.pinAgentCPU(context.Background(), hosts, true); err != nil {
if err := w.pinAgentCPU(context.Background(), w.hosts, true); err != nil {
w.logger.Error(ctx, "Couldn't re-pin agent to CPUs", "error", err)
}
}()
if w.target.UnpinAgentCPU {
if err := w.pinAgentCPU(ctx, hosts, false); err != nil {
if err := w.pinAgentCPU(ctx, w.hosts, false); err != nil {
return errors.Wrapf(err, "unpin agent from CPUs")
}
}

bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts)
bd := newBatchDispatcher(workload, w.target.BatchSize, w.hostShardCnt, w.target.locationHosts)

f := func(n int) error {
host := hosts[n]
host := w.hosts[n]
dc, err := w.client.HostDatacenter(ctx, host)
if err != nil {
return errors.Wrapf(err, "get host %s data center", host)
}
hi := w.hostInfo(host, dc, hostToShard[host])
hi := w.hostInfo(host, dc, w.hostShardCnt[host])
w.logger.Info(ctx, "Host info", "host", hi.Host, "transfers", hi.Transfers, "rate limit", hi.RateLimit)
for {
if ctx.Err() != nil {
Expand Down Expand Up @@ -252,12 +256,12 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {

notify := func(n int, err error) {
w.logger.Error(ctx, "Failed to restore files on host",
"host", hosts[n],
"host", w.hosts[n],
"error", err,
)
}

err = parallel.Run(len(hosts), w.target.Parallel, f, notify)
err = parallel.Run(len(w.hosts), w.target.Parallel, f, notify)
if err == nil {
if ctx.Err() != nil {
return ctx.Err()
Expand Down
1 change: 1 addition & 0 deletions pkg/service/restore/tablesdir_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (w *tablesWorker) newRunProgress(ctx context.Context, hi HostInfo, b batch)
Keyspace: b.Keyspace,
Table: b.Table,
Host: hi.Host,
ShardCnt: int64(w.hostShardCnt[hi.Host]),
AgentJobID: jobID,
SSTableID: b.IDs(),
VersionedProgress: versionedPr,
Expand Down
4 changes: 3 additions & 1 deletion schema/v3.4.0.cql
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
ALTER TABLE restore_run DROP location;
ALTER TABLE restore_run DROP manifest_path;
ALTER TABLE restore_run DROP keyspace_name;
ALTER TABLE restore_run DROP table_name;
ALTER TABLE restore_run DROP table_name;

ALTER TABLE restore_run_progress ADD shard_cnt bigint;
4 changes: 2 additions & 2 deletions v3/pkg/managerclient/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ require (
github.com/lnquy/cron v1.1.1
github.com/pkg/errors v0.9.1
github.com/scylladb/go-set v1.0.2
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4
)

Expand Down
8 changes: 4 additions & 4 deletions v3/pkg/managerclient/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE=
github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83 h1:yy3k0OEYbsmgDenYuJd7B/nftAUI7VMB/WUoX1Iv/6I=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241028110806-78e39cceec83/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83 h1:cCbhUYGzQ/xbjSN+w3g/H2ZcIoEn6OGRu3l5a+et/vs=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241028110806-78e39cceec83/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83 h1:6kyuL5XXRTPbBSUessmfz6TxhYgOiIHCwC63wvDExgQ=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241030073626-e409ae491c83/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83 h1:+2wvERSjYAhGushwWpqw8EemmntHOIcCjHCWzEdpXcM=
github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241030073626-e409ae491c83/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 h1:8qmTC5ByIXO3GP/IzBkxcZ/99VITvnIETDhdFz/om7A=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
Loading
Loading