Skip to content

Commit

Permalink
Make BackedUpItems thread safe
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Seago <[email protected]>
  • Loading branch information
sseago committed Nov 27, 2024
1 parent 3c06fc8 commit d2da99d
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 44 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8366-sseago
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make BackedUpItems thread safe
87 changes: 87 additions & 0 deletions pkg/backup/backed_up_items_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package backup

import (
"fmt"
"sort"
"sync"
)

// backedUpItemsMap keeps track of the items already backed up for the current Velero Backup
type backedUpItemsMap struct {
*sync.RWMutex
BackedUpItems map[itemKey]struct{}
}

func NewBackedUpItemsMap() *backedUpItemsMap {
return &backedUpItemsMap{
RWMutex: &sync.RWMutex{},
BackedUpItems: make(map[itemKey]struct{}),
}
}

func (m *backedUpItemsMap) CopyItemList() map[itemKey]struct{} {
m.RLock()
defer m.RUnlock()
returnMap := map[itemKey]struct{}{}
for key, val := range m.BackedUpItems {
returnMap[key] = val
}
return returnMap
}

func (m *backedUpItemsMap) ResourceList() map[string][]string {
m.RLock()
defer m.RUnlock()

resources := map[string][]string{}
for i := range m.BackedUpItems {
entry := i.name
if i.namespace != "" {
entry = fmt.Sprintf("%s/%s", i.namespace, i.name)
}
resources[i.resource] = append(resources[i.resource], entry)
}

// sort namespace/name entries for each GVK
for _, v := range resources {
sort.Strings(v)
}

return resources
}

func (m *backedUpItemsMap) Len() int {
m.RLock()
defer m.RUnlock()
return len(m.BackedUpItems)
}

func (m *backedUpItemsMap) ItemInBackup(key itemKey) bool {
m.RLock()
defer m.RUnlock()

_, exists := m.BackedUpItems[key]
return exists
}

func (m *backedUpItemsMap) AddItem(key itemKey) {
m.Lock()
defer m.Unlock()
m.BackedUpItems[key] = struct{}{}
}
31 changes: 15 additions & 16 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
return err
}

backupRequest.BackedUpItems = map[itemKey]struct{}{}

podVolumeTimeout := kb.podVolumeTimeout
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
parsed, err := time.ParseDuration(val)
Expand Down Expand Up @@ -499,20 +497,21 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))

// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: len(backupRequest.BackedUpItems),
itemsBackedUp: backedUpItems,
}

log.WithFields(map[string]interface{}{
"progress": "",
"resource": items[i].groupResource.String(),
"namespace": items[i].namespace,
"name": items[i].name,
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
}

// no more progress updates will be sent on the 'update' channel
Expand All @@ -538,8 +537,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
if updated.Status.Progress == nil {
updated.Status.Progress = &velerov1api.BackupProgress{}
}
updated.Status.Progress.TotalItems = len(backupRequest.BackedUpItems)
updated.Status.Progress.ItemsBackedUp = len(backupRequest.BackedUpItems)
backedUpItems := backupRequest.BackedUpItems.Len()
updated.Status.Progress.TotalItems = backedUpItems
updated.Status.Progress.ItemsBackedUp = backedUpItems

// update the hooks execution status
if updated.Status.HookStatus == nil {
Expand All @@ -558,8 +558,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
log.Infof("Summary for skipped PVs: %s", skippedPVSummary)
}

backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: backedUpItems, ItemsBackedUp: backedUpItems}
log.WithField("progress", "").Infof("Backed up a total of %d items", backedUpItems)

return nil
}
Expand Down Expand Up @@ -667,7 +667,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
continue
}
// Don't run hooks if pod has already been backed up
if _, exists := itemBlock.itemBackupper.backupRequest.BackedUpItems[key]; !exists {
if !itemBlock.itemBackupper.backupRequest.BackedUpItems.ItemInBackup(key) {
preHookPods = append(preHookPods, item)
}
}
Expand All @@ -681,7 +681,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
continue
}
itemBlock.itemBackupper.backupRequest.BackedUpItems[key] = struct{}{}
itemBlock.itemBackupper.backupRequest.BackedUpItems.AddItem(key)
}

itemBlock.Log.Debug("Backing up items in BackupItemBlock")
Expand Down Expand Up @@ -861,8 +861,6 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return err
}

backupRequest.BackedUpItems = map[itemKey]struct{}{}

// set up a temp dir for the itemCollector to use to temporarily
// store items as they're scraped from the API.
tempDir, err := os.MkdirTemp("", "")
Expand Down Expand Up @@ -947,14 +945,15 @@ func (kb *kubernetesBackupper) FinalizeBackup(

// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))

Check warning on line 949 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L948-L949

Added lines #L948 - L949 were not covered by tests

log.WithFields(map[string]interface{}{
"progress": "",
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", len(backupRequest.BackedUpItems), totalItems)
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", backedUpItems, totalItems)

Check warning on line 956 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L956

Added line #L956 was not covered by tests
}

volumeInfos, err := backupStore.GetBackupVolumeInfos(backupRequest.Backup.Name)
Expand All @@ -979,7 +978,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return err
}

log.WithField("progress", "").Infof("Updated a total of %d items", len(backupRequest.BackedUpItems))
log.WithField("progress", "").Infof("Updated a total of %d items", backupRequest.BackedUpItems.Len())

Check warning on line 981 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L981

Added line #L981 was not covered by tests

return nil
}
Expand Down
Loading

0 comments on commit d2da99d

Please sign in to comment.