diff --git a/pkg/metrics/restore.go b/pkg/metrics/restore.go index 509adf439f..42c95815ab 100644 --- a/pkg/metrics/restore.go +++ b/pkg/metrics/restore.go @@ -3,6 +3,8 @@ package metrics import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" @@ -10,11 +12,15 @@ import ( ) type RestoreMetrics struct { - batchSize *prometheus.GaugeVec - remainingBytes *prometheus.GaugeVec - state *prometheus.GaugeVec - progress *prometheus.GaugeVec - viewBuildStatus *prometheus.GaugeVec + batchSize *prometheus.GaugeVec + remainingBytes *prometheus.GaugeVec + state *prometheus.GaugeVec + downloadedBytes *prometheus.GaugeVec + downloadDuration *prometheus.GaugeVec + streamedBytes *prometheus.GaugeVec + streamDuration *prometheus.GaugeVec + progress *prometheus.GaugeVec + viewBuildStatus *prometheus.GaugeVec } func NewRestoreMetrics() RestoreMetrics { @@ -24,9 +30,13 @@ func NewRestoreMetrics() RestoreMetrics { batchSize: g("Cumulative size of the batches of files taken by the host to restore the data.", "batch_size", "cluster", "host"), remainingBytes: g("Remaining bytes of backup to be restored yet.", "remaining_bytes", "cluster", "snapshot_tag", "location", "dc", "node", "keyspace", "table"), - state: g("Defines current state of the restore process (idle/download/load/error).", "state", "cluster", "location", "snapshot_tag", "host"), - progress: g("Defines current progress of the restore process.", "progress", "cluster", "snapshot_tag"), - viewBuildStatus: g("Defines build status of recreated view.", "view_build_status", "cluster", "keyspace", "view"), + state: g("Defines current state of the restore process (idle/download/load/error).", "state", "cluster", "location", "snapshot_tag", "host"), + downloadedBytes: g("Downloaded bytes", "downloaded_bytes", "cluster", "location", "host"), + downloadDuration: g("Download duration in ms", "download_duration", "cluster", "location", "host"), + streamedBytes: g("Load&Streamed bytes", "streamed_bytes", "cluster", "host"), + streamDuration: g("Load&Stream duration in ms", "stream_duration", "cluster", "host"), + progress: g("Defines current progress of the restore process.", "progress", "cluster", "snapshot_tag"), + viewBuildStatus: g("Defines build status of recreated view.", "view_build_status", "cluster", "keyspace", "view"), } } @@ -41,6 +51,10 @@ func (m RestoreMetrics) all() []prometheus.Collector { m.batchSize, m.remainingBytes, m.state, + m.downloadedBytes, + m.downloadDuration, + m.streamedBytes, + m.streamDuration, m.progress, m.viewBuildStatus, } @@ -154,6 +168,44 @@ func (m RestoreMetrics) SetRestoreState(clusterID uuid.UUID, location backupspec m.state.With(l).Set(float64(state)) } +// IncreaseRestoreDownloadedBytes increases restore "downloaded_bytes" metric. +func (m RestoreMetrics) IncreaseRestoreDownloadedBytes(clusterID uuid.UUID, location, host string, bytes int64) { + l := prometheus.Labels{ + "cluster": clusterID.String(), + "location": location, + "host": host, + } + m.downloadedBytes.With(l).Add(float64(bytes)) +} + +// IncreaseRestoreDownloadDuration increases restore "download_duration" metric. +func (m RestoreMetrics) IncreaseRestoreDownloadDuration(clusterID uuid.UUID, location, host string, d time.Duration) { + l := prometheus.Labels{ + "cluster": clusterID.String(), + "location": location, + "host": host, + } + m.downloadDuration.With(l).Add(float64(d.Milliseconds())) +} + +// IncreaseRestoreStreamedBytes increases restore "streamed_bytes" metric. 
+func (m RestoreMetrics) IncreaseRestoreStreamedBytes(clusterID uuid.UUID, host string, bytes int64) { + l := prometheus.Labels{ + "cluster": clusterID.String(), + "host": host, + } + m.streamedBytes.With(l).Add(float64(bytes)) +} + +// IncreaseRestoreStreamDuration increases restore "stream_duration" metric. +func (m RestoreMetrics) IncreaseRestoreStreamDuration(clusterID uuid.UUID, host string, d time.Duration) { + l := prometheus.Labels{ + "cluster": clusterID.String(), + "host": host, + } + m.streamDuration.With(l).Add(float64(d.Milliseconds())) +} + // ViewBuildStatus defines build status of a view. type ViewBuildStatus int diff --git a/pkg/service/restore/progress.go b/pkg/service/restore/progress.go index 15727dbcae..f99389d00e 100644 --- a/pkg/service/restore/progress.go +++ b/pkg/service/restore/progress.go @@ -11,6 +11,7 @@ import ( "github.com/scylladb/gocqlx/v2/qb" "github.com/scylladb/scylla-manager/v3/pkg/schema/table" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" + "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) @@ -203,3 +204,17 @@ func forEachProgress(s gocqlx.Session, clusterID, taskID, runID uuid.UUID, cb fu q.Release() return err } + +// Returns duration between end and start. +// If start is nil, returns 0. +// If end is nil, returns duration between now and start. +func timeSub(start, end *time.Time) time.Duration { + if start != nil { + endV := timeutc.Now() + if end != nil { + endV = *end + } + return endV.Sub(*start) + } + return 0 +} diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 2a6ee0a37f..ee5f33c1ee 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -10,7 +10,6 @@ import ( "github.com/pkg/errors" "github.com/scylladb/go-set/strset" - "github.com/scylladb/scylla-manager/v3/pkg/metrics" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" @@ -91,11 +90,6 @@ func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*ta // restore files from every location specified in restore target. 
func (w *tablesWorker) restore(ctx context.Context) error { - // Init metrics only on fresh start - if w.run.PrevID == uuid.Nil { - w.initRestoreMetrics(ctx) - } - stageFunc := map[Stage]func() error{ StageDropViews: func() error { for _, v := range w.run.Views { @@ -235,14 +229,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Info(ctx, "No more batches to restore", "host", hi.Host) return nil } - w.metrics.IncreaseBatchSize(w.run.ClusterID, hi.Host, b.Size) - w.logger.Info(ctx, "Got batch to restore", - "host", hi.Host, - "keyspace", b.Keyspace, - "table", b.Table, - "size", b.Size, - "sstable count", len(b.SSTables), - ) + w.onBatchDispatch(ctx, b, host) pr, err := w.newRunProgress(ctx, hi, b) if err != nil { @@ -260,7 +247,6 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { continue } bd.ReportSuccess(b) - w.decreaseRemainingBytesMetric(b) } } @@ -314,54 +300,6 @@ func (w *tablesWorker) stageRepair(ctx context.Context) error { return w.repairSvc.Repair(ctx, w.run.ClusterID, w.run.RepairTaskID, repairRunID, repairTarget) } -func (w *tablesWorker) initRestoreMetrics(ctx context.Context) { - for _, location := range w.target.Location { - err := w.forEachManifest( - ctx, - location, - func(miwc ManifestInfoWithContent) error { - sizePerTableAndKeyspace := make(map[string]map[string]int64) - err := miwc.ForEachIndexIterWithError( - nil, - func(fm FilesMeta) error { - if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { - return nil - } - - if sizePerTableAndKeyspace[fm.Keyspace] == nil { - sizePerTableAndKeyspace[fm.Keyspace] = make(map[string]int64) - } - sizePerTableAndKeyspace[fm.Keyspace][fm.Table] += fm.Size - return nil - }) - for kspace, sizePerTable := range sizePerTableAndKeyspace { - for table, size := range sizePerTable { - labels := metrics.RestoreBytesLabels{ - ClusterID: w.run.ClusterID.String(), - SnapshotTag: w.target.SnapshotTag, - Location: location.String(), - DC: miwc.DC, - Node: miwc.NodeID, - Keyspace: kspace, - Table: table, - } - w.metrics.SetRemainingBytes(labels, size) - } - } - return err - }) - progressLabels := metrics.RestoreProgressLabels{ - ClusterID: w.run.ClusterID.String(), - SnapshotTag: w.target.SnapshotTag, - } - w.metrics.SetProgress(progressLabels, 0) - if err != nil { - w.logger.Info(ctx, "Couldn't count restore data size") - continue - } - } -} - // Disables auto compaction on all provided hosts and units. func (w *tablesWorker) setAutoCompaction(ctx context.Context, hosts []string, enabled bool) error { f := w.client.EnableAutoCompaction diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index 199204c940..0f3d81d839 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -20,8 +20,6 @@ func (w *tablesWorker) restoreBatch(ctx context.Context, b batch, pr *RunProgres if err != nil { w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateError) w.cleanupRunProgress(context.Background(), pr) - } else { - w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateIdle) } }() @@ -38,13 +36,11 @@ func (w *tablesWorker) restoreBatch(ctx context.Context, b batch, pr *RunProgres return errors.Wrap(err, "call load and stream") } } - w.logger.Info(ctx, "Restored batch", "host", pr.Host, "sstable_id", pr.SSTableID) return nil } // waitJob waits for rclone job to finish while updating its progress. 
 func (w *tablesWorker) waitJob(ctx context.Context, b batch, pr *RunProgress) (err error) {
-	w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateDownloading)
 	w.logger.Info(ctx, "Waiting for job", "host", pr.Host, "job_id", pr.AgentJobID)
 
 	defer func() {
@@ -71,44 +67,21 @@ func (w *tablesWorker) waitJob(ctx context.Context, b batch, pr *RunProgress) (e
 		case scyllaclient.JobError:
 			return errors.Errorf("job error (%d): %s", pr.AgentJobID, job.Error)
 		case scyllaclient.JobSuccess:
-			w.updateDownloadProgress(ctx, pr, job)
-			w.logger.Info(ctx, "Batch download completed", "host", pr.Host, "job_id", pr.AgentJobID)
+			w.onDownloadUpdate(ctx, b, pr, job)
 			return nil
 		case scyllaclient.JobRunning:
-			w.updateDownloadProgress(ctx, pr, job)
+			w.onDownloadUpdate(ctx, b, pr, job)
 		case scyllaclient.JobNotFound:
 			return errors.New("job not found")
 		}
 	}
 }
 
-func (w *tablesWorker) updateDownloadProgress(ctx context.Context, pr *RunProgress, job *scyllaclient.RcloneJobProgress) {
-	// Set StartedAt and CompletedAt based on Job
-	if t := time.Time(job.StartedAt); !t.IsZero() {
-		pr.DownloadStartedAt = &t
-	}
-	if t := time.Time(job.CompletedAt); !t.IsZero() {
-		pr.DownloadCompletedAt = &t
-	}
-
-	pr.Error = job.Error
-	pr.Downloaded = job.Uploaded
-	pr.Skipped = job.Skipped
-	pr.Failed = job.Failed
-	w.insertRunProgress(ctx, pr)
-}
-
 func (w *tablesWorker) restoreSSTables(ctx context.Context, b batch, pr *RunProgress) error {
-	w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading)
-	if !validateTimeIsSet(pr.RestoreStartedAt) {
-		pr.setRestoreStartedAt()
-		w.insertRunProgress(ctx, pr)
-	}
-
+	w.onLasStart(ctx, b, pr)
 	err := w.worker.restoreSSTables(ctx, pr.Host, pr.Keyspace, pr.Table, true, true)
 	if err == nil {
-		pr.setRestoreCompletedAt()
-		w.insertRunProgress(ctx, pr)
+		w.onLasEnd(ctx, b, pr)
 	}
 	return err
 }
@@ -141,8 +114,7 @@ func (w *tablesWorker) newRunProgress(ctx context.Context, hi HostInfo, b batch)
 		SSTableID:         b.IDs(),
 		VersionedProgress: versionedPr,
 	}
-
-	w.insertRunProgress(ctx, pr)
+	w.onDownloadStart(ctx, b, pr)
 	return pr, nil
 }
@@ -170,10 +142,6 @@ func (w *tablesWorker) startDownload(ctx context.Context, hi HostInfo, b batch)
 	if err != nil {
 		return 0, 0, errors.Wrap(err, "download batch to upload dir")
 	}
-	w.logger.Info(ctx, "Started downloading files",
-		"host", hi.Host,
-		"job_id", jobID,
-	)
 	return jobID, versionedSize, nil
 }
@@ -218,10 +186,75 @@ func (w *tablesWorker) downloadVersioned(ctx context.Context, host, srcDir, dstD
 	return parallel.Run(len(versioned), parallel.NoLimit, f, notify)
 }
 
-func (w *tablesWorker) decreaseRemainingBytesMetric(b batch) {
+func (w *tablesWorker) cleanupRunProgress(ctx context.Context, pr *RunProgress) {
+	w.deleteRunProgress(ctx, pr)
+	tn := TableName{
+		Keyspace: pr.Keyspace,
+		Table:    pr.Table,
+	}
+	if cleanErr := w.cleanUploadDir(ctx, pr.Host, UploadTableDir(pr.Keyspace, pr.Table, w.tableVersion[tn]), nil); cleanErr != nil {
+		w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr)
+	}
+}
+
+func (w *tablesWorker) onBatchDispatch(ctx context.Context, b batch, host string) {
+	w.metrics.IncreaseBatchSize(w.run.ClusterID, host, b.Size)
+	w.logger.Info(ctx, "Got batch to restore",
+		"host", host,
+		"keyspace", b.Keyspace,
+		"table", b.Table,
+		"size", b.Size,
+		"sstable count", len(b.SSTables),
+	)
+}
+
+func (w *tablesWorker) onDownloadStart(ctx context.Context, b batch, pr *RunProgress) {
+	w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateDownloading)
+	w.logger.Info(ctx, "Started downloading batch", "host", pr.Host, "job_id", pr.AgentJobID)
+	w.insertRunProgress(ctx, pr)
+}
+
+func (w *tablesWorker) onDownloadUpdate(ctx context.Context, b batch, pr *RunProgress, job *scyllaclient.RcloneJobProgress) {
+	// Rclone job progress reports cumulative values, while metrics are
+	// increased on every update, so add only the delta since the last poll.
+	w.metrics.IncreaseRestoreDownloadedBytes(w.run.ClusterID, b.Location.StringWithoutDC(), pr.Host, job.Uploaded-pr.Downloaded)
+	prevD := timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt)
+	if t := time.Time(job.StartedAt); !t.IsZero() {
+		pr.DownloadStartedAt = &t
+	}
+	if t := time.Time(job.CompletedAt); !t.IsZero() {
+		pr.DownloadCompletedAt = &t
+	}
+	currD := timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt)
+	w.metrics.IncreaseRestoreDownloadDuration(w.run.ClusterID, b.Location.StringWithoutDC(), pr.Host, currD-prevD)
+
+	pr.Error = job.Error
+	pr.Downloaded = job.Uploaded
+	pr.Skipped = job.Skipped
+	pr.Failed = job.Failed
+	w.insertRunProgress(ctx, pr)
+
+	if scyllaclient.RcloneJobStatus(job.Status) == scyllaclient.JobSuccess {
+		w.logger.Info(ctx, "Downloaded batch", "host", pr.Host, "job_id", pr.AgentJobID)
+	}
+}
+
+func (w *tablesWorker) onLasStart(ctx context.Context, b batch, pr *RunProgress) {
+	w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading)
+	w.logger.Info(ctx, "Started restoring batch", "host", pr.Host)
+	pr.setRestoreStartedAt()
+	w.insertRunProgress(ctx, pr)
+}
+
+func (w *tablesWorker) onLasEnd(ctx context.Context, b batch, pr *RunProgress) {
+	w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.target.SnapshotTag, pr.Host, metrics.RestoreStateIdle)
+	pr.setRestoreCompletedAt()
+	w.metrics.IncreaseRestoreStreamedBytes(w.run.ClusterID, pr.Host, b.Size)
+	w.metrics.IncreaseRestoreStreamDuration(w.run.ClusterID, pr.Host, timeSub(pr.RestoreStartedAt, pr.RestoreCompletedAt))
+
 	labels := metrics.RestoreBytesLabels{
-		ClusterID:   w.run.ClusterID.String(),
-		SnapshotTag: w.run.SnapshotTag,
+		ClusterID:   b.ClusterID.String(),
+		SnapshotTag: b.SnapshotTag,
 		Location:    b.Location.String(),
 		DC:          b.DC,
 		Node:        b.NodeID,
@@ -229,22 +262,14 @@ func (w *tablesWorker) decreaseRemainingBytesMetric(b batch) {
 		Table:       b.Table,
 	}
 	w.metrics.DecreaseRemainingBytes(labels, b.Size)
-	w.progress.Update(b.Size)
 
 	progressLabels := metrics.RestoreProgressLabels{
 		ClusterID:   w.run.ClusterID.String(),
 		SnapshotTag: w.run.SnapshotTag,
 	}
+	w.progress.Update(b.Size)
 	w.metrics.SetProgress(progressLabels, w.progress.CurrentProgress())
-}
 
-func (w *tablesWorker) cleanupRunProgress(ctx context.Context, pr *RunProgress) {
-	w.deleteRunProgress(ctx, pr)
-	tn := TableName{
-		Keyspace: pr.Keyspace,
-		Table:    pr.Table,
-	}
-	if cleanErr := w.cleanUploadDir(ctx, pr.Host, UploadTableDir(pr.Keyspace, pr.Table, w.tableVersion[tn]), nil); cleanErr != nil {
-		w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr)
-	}
+	w.logger.Info(ctx, "Restored batch", "host", pr.Host)
+	w.insertRunProgress(ctx, pr)
 }
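
For reference, below is a minimal, self-contained sketch of the delta accounting performed by onDownloadUpdate; it is illustrative only and not part of the patch. Rclone job polls report cumulative values, so only the difference between the current poll and the last persisted RunProgress snapshot is added to the downloaded_bytes and download_duration gauges; adding deltas, rather than setting the cumulative value, lets a single per-host gauge aggregate progress across many batches and jobs. The jobProgress and runProgress types and the plain counters are stand-ins for scyllaclient.RcloneJobProgress, RunProgress, and the Prometheus gauges; timeSub mirrors the helper added to pkg/service/restore/progress.go.

package main

import (
	"fmt"
	"time"
)

// jobProgress stands in for the cumulative fields of scyllaclient.RcloneJobProgress used here.
type jobProgress struct {
	Uploaded    int64
	StartedAt   time.Time
	CompletedAt time.Time
}

// runProgress stands in for the per-batch RunProgress bookkeeping.
type runProgress struct {
	Downloaded          int64
	DownloadStartedAt   *time.Time
	DownloadCompletedAt *time.Time
}

// timeSub returns end-start, falling back to "now" when end is not set yet,
// and 0 when start is not set.
func timeSub(start, end *time.Time) time.Duration {
	if start != nil {
		endV := time.Now()
		if end != nil {
			endV = *end
		}
		return endV.Sub(*start)
	}
	return 0
}

func main() {
	var downloadedBytes int64          // stands in for the downloaded_bytes gauge
	var downloadDuration time.Duration // stands in for the download_duration gauge

	pr := &runProgress{}
	start := time.Now().Add(-3 * time.Second)

	// Three polls of the same job, each reporting cumulative progress.
	polls := []jobProgress{
		{Uploaded: 100, StartedAt: start},
		{Uploaded: 250, StartedAt: start},
		{Uploaded: 400, StartedAt: start, CompletedAt: start.Add(3 * time.Second)},
	}
	for _, job := range polls {
		// Add only the delta between the cumulative job value and the last snapshot.
		downloadedBytes += job.Uploaded - pr.Downloaded

		prevD := timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt)
		if t := job.StartedAt; !t.IsZero() {
			pr.DownloadStartedAt = &t
		}
		if t := job.CompletedAt; !t.IsZero() {
			pr.DownloadCompletedAt = &t
		}
		downloadDuration += timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt) - prevD

		pr.Downloaded = job.Uploaded
	}

	// Totals converge to the job's final cumulative report: 400 bytes, roughly 3000 ms.
	fmt.Printf("downloaded_bytes=%d download_duration_ms=%d\n",
		downloadedBytes, downloadDuration.Milliseconds())
}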