Skip to content

Commit

Permalink
feat(restore): add bandwidth metrics (#4081)
Browse files Browse the repository at this point in the history
* refactor(restore): separate methods for updating metrics/progress

This should make it easier to see what is updated where and when.

* feat(metrics): restore, add bandwidth metrics

They are really useful for evaluating restore performance.

* feat(restore): set download/stream bytes/duration metrics

It's useful for checking/tracking restore performance.

Ref #4042

* fix(restore): don't initialize metrics twice

This was a left-over from the PR introducing
indexing (14aef7b). It also initialized metrics
as a part of the indexing procedure, but it
forgot to remove the previous metrics initialization
from the code.

* fix(restore): use backup cluster ID in remaining_bytes metric

There was a confusion about which cluster ID should
be used for labeling remaining_bytes metric.
When setting remaining_bytes, we used backup cluster ID,
but when decreasing, we used restore cluster ID.
Backup cluster ID should be used in both places
as this metric describes how many bytes from
which place are yet to be restored. Since we use backup
cluster DC, node ID, etc., we should also use backup
cluster ID.
  • Loading branch information
Michal-Leszczynski authored Oct 30, 2024
1 parent 2c9c18d commit 07ff683
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 122 deletions.
68 changes: 60 additions & 8 deletions pkg/metrics/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

type RestoreMetrics struct {
batchSize *prometheus.GaugeVec
remainingBytes *prometheus.GaugeVec
state *prometheus.GaugeVec
progress *prometheus.GaugeVec
viewBuildStatus *prometheus.GaugeVec
batchSize *prometheus.GaugeVec
remainingBytes *prometheus.GaugeVec
state *prometheus.GaugeVec
downloadedBytes *prometheus.GaugeVec
downloadDuration *prometheus.GaugeVec
streamedBytes *prometheus.GaugeVec
streamDuration *prometheus.GaugeVec
progress *prometheus.GaugeVec
viewBuildStatus *prometheus.GaugeVec
}

func NewRestoreMetrics() RestoreMetrics {
Expand All @@ -24,9 +30,13 @@ func NewRestoreMetrics() RestoreMetrics {
batchSize: g("Cumulative size of the batches of files taken by the host to restore the data.", "batch_size", "cluster", "host"),
remainingBytes: g("Remaining bytes of backup to be restored yet.", "remaining_bytes",
"cluster", "snapshot_tag", "location", "dc", "node", "keyspace", "table"),
state: g("Defines current state of the restore process (idle/download/load/error).", "state", "cluster", "location", "snapshot_tag", "host"),
progress: g("Defines current progress of the restore process.", "progress", "cluster", "snapshot_tag"),
viewBuildStatus: g("Defines build status of recreated view.", "view_build_status", "cluster", "keyspace", "view"),
state: g("Defines current state of the restore process (idle/download/load/error).", "state", "cluster", "location", "snapshot_tag", "host"),
downloadedBytes: g("Downloaded bytes", "downloaded_bytes", "cluster", "location", "host"),
downloadDuration: g("Download duration in ms", "download_duration", "cluster", "location", "host"),
streamedBytes: g("Load&Streamed bytes", "streamed_bytes", "cluster", "host"),
streamDuration: g("Load&Stream duration in ms", "stream_duration", "cluster", "host"),
progress: g("Defines current progress of the restore process.", "progress", "cluster", "snapshot_tag"),
viewBuildStatus: g("Defines build status of recreated view.", "view_build_status", "cluster", "keyspace", "view"),
}
}

Expand All @@ -41,6 +51,10 @@ func (m RestoreMetrics) all() []prometheus.Collector {
m.batchSize,
m.remainingBytes,
m.state,
m.downloadedBytes,
m.downloadDuration,
m.streamedBytes,
m.streamDuration,
m.progress,
m.viewBuildStatus,
}
Expand Down Expand Up @@ -154,6 +168,44 @@ func (m RestoreMetrics) SetRestoreState(clusterID uuid.UUID, location backupspec
m.state.With(l).Set(float64(state))
}

// IncreaseRestoreDownloadedBytes adds bytes to the restore "downloaded_bytes"
// gauge selected by the cluster, location and host labels.
func (m RestoreMetrics) IncreaseRestoreDownloadedBytes(clusterID uuid.UUID, location, host string, bytes int64) {
	gauge := m.downloadedBytes.With(prometheus.Labels{
		"cluster":  clusterID.String(),
		"location": location,
		"host":     host,
	})
	gauge.Add(float64(bytes))
}

// IncreaseRestoreDownloadDuration adds d, expressed in whole milliseconds, to
// the restore "download_duration" gauge selected by the cluster, location and
// host labels.
func (m RestoreMetrics) IncreaseRestoreDownloadDuration(clusterID uuid.UUID, location, host string, d time.Duration) {
	gauge := m.downloadDuration.With(prometheus.Labels{
		"cluster":  clusterID.String(),
		"location": location,
		"host":     host,
	})
	// Milliseconds() yields an integer count, so sub-millisecond parts are dropped.
	millis := d.Milliseconds()
	gauge.Add(float64(millis))
}

// IncreaseRestoreStreamedBytes adds bytes to the restore "streamed_bytes"
// gauge selected by the cluster and host labels.
func (m RestoreMetrics) IncreaseRestoreStreamedBytes(clusterID uuid.UUID, host string, bytes int64) {
	gauge := m.streamedBytes.With(prometheus.Labels{
		"cluster": clusterID.String(),
		"host":    host,
	})
	gauge.Add(float64(bytes))
}

// IncreaseRestoreStreamDuration adds d, expressed in whole milliseconds, to
// the restore "stream_duration" gauge selected by the cluster and host labels.
func (m RestoreMetrics) IncreaseRestoreStreamDuration(clusterID uuid.UUID, host string, d time.Duration) {
	gauge := m.streamDuration.With(prometheus.Labels{
		"cluster": clusterID.String(),
		"host":    host,
	})
	// Milliseconds() yields an integer count, so sub-millisecond parts are dropped.
	millis := d.Milliseconds()
	gauge.Add(float64(millis))
}

// ViewBuildStatus defines build status of a view.
type ViewBuildStatus int

Expand Down
15 changes: 15 additions & 0 deletions pkg/service/restore/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

Expand Down Expand Up @@ -203,3 +204,17 @@ func forEachProgress(s gocqlx.Session, clusterID, taskID, runID uuid.UUID, cb fu
q.Release()
return err
}

// timeSub returns the duration elapsed from start to end.
// A nil start yields 0.
// A nil end is replaced with the current time from timeutc.Now().
func timeSub(start, end *time.Time) time.Duration {
	if start == nil {
		return 0
	}
	if end == nil {
		return timeutc.Now().Sub(*start)
	}
	return end.Sub(*start)
}
64 changes: 1 addition & 63 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
Expand Down Expand Up @@ -91,11 +90,6 @@ func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*ta

// restore files from every location specified in restore target.
func (w *tablesWorker) restore(ctx context.Context) error {
// Init metrics only on fresh start
if w.run.PrevID == uuid.Nil {
w.initRestoreMetrics(ctx)
}

stageFunc := map[Stage]func() error{
StageDropViews: func() error {
for _, v := range w.run.Views {
Expand Down Expand Up @@ -235,14 +229,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
w.logger.Info(ctx, "No more batches to restore", "host", hi.Host)
return nil
}
w.metrics.IncreaseBatchSize(w.run.ClusterID, hi.Host, b.Size)
w.logger.Info(ctx, "Got batch to restore",
"host", hi.Host,
"keyspace", b.Keyspace,
"table", b.Table,
"size", b.Size,
"sstable count", len(b.SSTables),
)
w.onBatchDispatch(ctx, b, host)

pr, err := w.newRunProgress(ctx, hi, b)
if err != nil {
Expand All @@ -260,7 +247,6 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
continue
}
bd.ReportSuccess(b)
w.decreaseRemainingBytesMetric(b)
}
}

Expand Down Expand Up @@ -314,54 +300,6 @@ func (w *tablesWorker) stageRepair(ctx context.Context) error {
return w.repairSvc.Repair(ctx, w.run.ClusterID, w.run.RepairTaskID, repairRunID, repairTarget)
}

// initRestoreMetrics seeds restore metrics for every location in
// w.target.Location.
//
// For each manifest handed to the callback by forEachManifest, it sums
// FilesMeta.Size per keyspace and table (only for tables contained in
// w.run.Units) and publishes each sum with SetRemainingBytes. After each
// location the progress metric for the run's cluster and snapshot tag is set
// to 0.
//
// The procedure is best-effort: a failure for one location is logged together
// with the location and the error, and the remaining locations are still
// processed.
func (w *tablesWorker) initRestoreMetrics(ctx context.Context) {
	for _, location := range w.target.Location {
		err := w.forEachManifest(
			ctx,
			location,
			func(miwc ManifestInfoWithContent) error {
				sizePerTableAndKeyspace := make(map[string]map[string]int64)
				err := miwc.ForEachIndexIterWithError(
					nil,
					func(fm FilesMeta) error {
						// Skip tables that are not part of this restore run.
						if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
							return nil
						}

						if sizePerTableAndKeyspace[fm.Keyspace] == nil {
							sizePerTableAndKeyspace[fm.Keyspace] = make(map[string]int64)
						}
						sizePerTableAndKeyspace[fm.Keyspace][fm.Table] += fm.Size
						return nil
					})
				// Sizes gathered so far are published even if the iteration
				// above returned an error; the error is propagated afterwards.
				for kspace, sizePerTable := range sizePerTableAndKeyspace {
					for table, size := range sizePerTable {
						labels := metrics.RestoreBytesLabels{
							ClusterID:   w.run.ClusterID.String(),
							SnapshotTag: w.target.SnapshotTag,
							Location:    location.String(),
							DC:          miwc.DC,
							Node:        miwc.NodeID,
							Keyspace:    kspace,
							Table:       table,
						}
						w.metrics.SetRemainingBytes(labels, size)
					}
				}
				return err
			})
		progressLabels := metrics.RestoreProgressLabels{
			ClusterID:   w.run.ClusterID.String(),
			SnapshotTag: w.target.SnapshotTag,
		}
		w.metrics.SetProgress(progressLabels, 0)
		if err != nil {
			// Keep going with other locations, but record which location
			// failed and why so the problem can be diagnosed.
			w.logger.Info(ctx, "Couldn't count restore data size",
				"location", location.String(),
				"error", err,
			)
		}
	}
}

// Disables auto compaction on all provided hosts and units.
func (w *tablesWorker) setAutoCompaction(ctx context.Context, hosts []string, enabled bool) error {
f := w.client.EnableAutoCompaction
Expand Down
Loading

0 comments on commit 07ff683

Please sign in to comment.