From 503bd5c5c57dd99701fae3e71aa90ba0f47d5f31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 2 Oct 2024 20:24:36 +0200 Subject: [PATCH] feat(restore): index, log workload info Workload info contains location/table/remote sstable dir sstable count, total size, max and average sstable size. --- pkg/service/restore/index.go | 69 +++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index 140cfdf80..dd7b7b72c 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -74,12 +74,14 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") } if w.target.Continue { - rawWorkload, err = w.filterPreviouslyRestoredSStables(rawWorkload) + rawWorkload, err = w.filterPreviouslyRestoredSStables(ctx, rawWorkload) if err != nil { return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables") } } - return aggregateLocationWorkload(rawWorkload), nil + workload := aggregateLocationWorkload(rawWorkload) + w.logWorkloadInfo(ctx, workload) + return workload, nil } func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { @@ -115,7 +117,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo Size: size, SSTables: remoteSSTables, } - rawWorkload = append(rawWorkload, workload) + if size > 0 { + rawWorkload = append(rawWorkload, workload) + } return nil }) }) @@ -125,7 +129,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo return rawWorkload, nil } -func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) { +func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) { + w.logger.Info(ctx, "Filter out previously restored sstables") + remoteSSTableDirToRestoredIDs := make(map[string][]string) err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) { if validateTimeIsSet(pr.RestoreCompletedAt) { @@ -139,7 +145,11 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW return rawWorkload, nil } - var filtered []RemoteDirWorkload + var ( + filtered []RemoteDirWorkload + skippedCount int + skippedSize int64 + ) for _, rw := range rawWorkload { var filteredSSTables []RemoteSSTable var size int64 @@ -147,6 +157,9 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW if !slices.Contains(remoteSSTableDirToRestoredIDs[rw.RemoteSSTableDir], sst.ID) { filteredSSTables = append(filteredSSTables, sst) size += sst.Size + } else { + skippedCount++ + skippedSize += sst.Size } } if len(filteredSSTables) > 0 { @@ -157,9 +170,12 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW Size: size, SSTables: filteredSSTables, }) + } else { + w.logger.Info(ctx, "Completely filtered out remote sstable dir", "remote dir", rw.RemoteSSTableDir) } } + w.logger.Info(ctx, "Filtered out sstables info", "count", skippedCount, "size", skippedSize) return filtered, nil } @@ -200,6 +216,49 @@ func (w *tablesWorker) initMetrics(workload []LocationWorkload) { }, float64(totalSize-workloadSize)/float64(totalSize)*100) } +func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload LocationWorkload) { + if workload.Size == 0 { + return + } + var locMax, locCnt int64 + for _, twl := range workload.Tables { + if twl.Size == 0 { + continue + } + var tabMax, tabCnt int64 + for _, rdwl := range twl.RemoteDirs { + if rdwl.Size == 0 { + continue + } + var dirMax int64 + for _, sst := range rdwl.SSTables { + dirMax = max(dirMax, sst.Size) + } + dirCnt := int64(len(rdwl.SSTables)) + w.logger.Info(ctx, "Remote sstable dir workload info", + "path", rdwl.RemoteSSTableDir, + "max size", dirMax, + "average size", rdwl.Size/dirCnt, + "count", dirCnt) + tabCnt += dirCnt + tabMax = max(tabMax, dirMax) + } + w.logger.Info(ctx, "Table workload info", + "keyspace", twl.Keyspace, + "table", twl.Table, + "max size", tabMax, + "average size", twl.Size/tabCnt, + "count", tabCnt) + locCnt += tabCnt + locMax = max(locMax, tabMax) + } + w.logger.Info(ctx, "Location workload info", + "location", workload.Location.String(), + "max size", locMax, + "average size", workload.Size/locCnt, + "count", locCnt) +} + func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) for _, rw := range rawWorkload {