Skip to content

Commit

Permalink
feat(restore): index, log workload info
Browse files Browse the repository at this point in the history
Workload info contains, per location, table, and remote sstable dir: the sstable count,
total size, and the max and average sstable size.
  • Loading branch information
Michal-Leszczynski committed Oct 3, 2024
1 parent ad8a33c commit 503bd5c
Showing 1 changed file with 64 additions and 5 deletions.
69 changes: 64 additions & 5 deletions pkg/service/restore/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat
return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads")
}
if w.target.Continue {
rawWorkload, err = w.filterPreviouslyRestoredSStables(rawWorkload)
rawWorkload, err = w.filterPreviouslyRestoredSStables(ctx, rawWorkload)
if err != nil {
return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables")
}
}
return aggregateLocationWorkload(rawWorkload), nil
workload := aggregateLocationWorkload(rawWorkload)
w.logWorkloadInfo(ctx, workload)
return workload, nil
}

func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) {
Expand Down Expand Up @@ -115,7 +117,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo
Size: size,
SSTables: remoteSSTables,
}
rawWorkload = append(rawWorkload, workload)
if size > 0 {
rawWorkload = append(rawWorkload, workload)
}
return nil
})
})
Expand All @@ -125,7 +129,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo
return rawWorkload, nil
}

func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) {
func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) {
w.logger.Info(ctx, "Filter out previously restored sstables")

remoteSSTableDirToRestoredIDs := make(map[string][]string)
err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) {
if validateTimeIsSet(pr.RestoreCompletedAt) {
Expand All @@ -139,14 +145,21 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW
return rawWorkload, nil
}

var filtered []RemoteDirWorkload
var (
filtered []RemoteDirWorkload
skippedCount int
skippedSize int64
)
for _, rw := range rawWorkload {
var filteredSSTables []RemoteSSTable
var size int64
for _, sst := range rw.SSTables {
if !slices.Contains(remoteSSTableDirToRestoredIDs[rw.RemoteSSTableDir], sst.ID) {
filteredSSTables = append(filteredSSTables, sst)
size += sst.Size
} else {
skippedCount++
skippedSize += sst.Size
}
}
if len(filteredSSTables) > 0 {
Expand All @@ -157,9 +170,12 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW
Size: size,
SSTables: filteredSSTables,
})
} else {
w.logger.Info(ctx, "Completely filtered out remote sstable dir", "remote dir", rw.RemoteSSTableDir)
}
}

w.logger.Info(ctx, "Filtered out sstables info", "count", skippedCount, "size", skippedSize)
return filtered, nil
}

Expand Down Expand Up @@ -200,6 +216,49 @@ func (w *tablesWorker) initMetrics(workload []LocationWorkload) {
}, float64(totalSize-workloadSize)/float64(totalSize)*100)
}

// logWorkloadInfo logs sstable statistics (count, max size, average size)
// aggregated at three levels: per remote sstable dir, per table, and for the
// whole location. It is a no-op for an empty workload (e.g. when everything
// was already restored and filtered out). Dirs and tables with zero size are
// skipped, mirroring the filtering performed when the workload was built.
func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload LocationWorkload) {
	if workload.Size == 0 {
		return
	}
	// avg guards against a division-by-zero panic in case a non-empty
	// workload unexpectedly contains no sstables at some level.
	avg := func(size, cnt int64) int64 {
		if cnt == 0 {
			return 0
		}
		return size / cnt
	}
	var locMax, locCnt int64
	for _, twl := range workload.Tables {
		if twl.Size == 0 {
			continue
		}
		var tabMax, tabCnt int64
		for _, rdwl := range twl.RemoteDirs {
			if rdwl.Size == 0 {
				continue
			}
			var dirMax int64
			for _, sst := range rdwl.SSTables {
				dirMax = max(dirMax, sst.Size)
			}
			dirCnt := int64(len(rdwl.SSTables))
			w.logger.Info(ctx, "Remote sstable dir workload info",
				"path", rdwl.RemoteSSTableDir,
				"max size", dirMax,
				"average size", avg(rdwl.Size, dirCnt),
				"count", dirCnt)
			tabCnt += dirCnt
			tabMax = max(tabMax, dirMax)
		}
		w.logger.Info(ctx, "Table workload info",
			"keyspace", twl.Keyspace,
			"table", twl.Table,
			"max size", tabMax,
			"average size", avg(twl.Size, tabCnt),
			"count", tabCnt)
		locCnt += tabCnt
		locMax = max(locMax, tabMax)
	}
	w.logger.Info(ctx, "Location workload info",
		"location", workload.Location.String(),
		"max size", locMax,
		"average size", avg(workload.Size, locCnt),
		"count", locCnt)
}

func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload {
remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload)
for _, rw := range rawWorkload {
Expand Down

0 comments on commit 503bd5c

Please sign in to comment.