Skip to content

Commit

Permalink
do not remove state files bigger than maximin(domainsTxNum) (#10510)
Browse files Browse the repository at this point in the history
touches #10317
  • Loading branch information
awskii authored May 28, 2024
1 parent 708078a commit 1822132
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 25 deletions.
35 changes: 10 additions & 25 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,15 @@ func (d *Domain) openList(names []string, readonly bool) error {
return fmt.Errorf("Domain.openList: %w, %s", err, d.filenameBase)
}
d.reCalcVisibleFiles()
d.protectFromHistoryFilesAheadOfDomainFiles(readonly)
d.protectFromHistoryFilesAheadOfDomainFiles()
return nil
}

// protectFromHistoryFilesAheadOfDomainFiles - in some corner-cases app may see more .ef/.v files than .kv:
// - `kill -9` in the middle of `buildFiles()`, then `rm -f db` (restore from backup)
// - `kill -9` in the middle of `buildFiles()`, then `stage_exec --reset` (drop progress - as a hot-fix)
func (d *Domain) protectFromHistoryFilesAheadOfDomainFiles(readonly bool) {
d.removeFilesAfterStep(d.dirtyFilesEndTxNumMinimax()/d.aggregationStep, readonly)
func (d *Domain) protectFromHistoryFilesAheadOfDomainFiles() {
d.closeFilesAfterStep(d.dirtyFilesEndTxNumMinimax() / d.aggregationStep)
}

func (d *Domain) OpenFolder(readonly bool) error {
Expand All @@ -215,7 +215,7 @@ func (d *Domain) GetAndResetStats() DomainStats {
return r
}

func (d *Domain) removeFilesAfterStep(lowerBound uint64, readonly bool) {
func (d *Domain) closeFilesAfterStep(lowerBound uint64) {
var toDelete []*filesItem
d.dirtyFiles.Scan(func(item *filesItem) bool {
if item.startTxNum/d.aggregationStep >= lowerBound {
Expand All @@ -225,13 +225,8 @@ func (d *Domain) removeFilesAfterStep(lowerBound uint64, readonly bool) {
})
for _, item := range toDelete {
d.dirtyFiles.Delete(item)
if !readonly {
log.Debug(fmt.Sprintf("[snapshots] delete %s, because step %d has not enough files (was not complete). stack: %s", item.decompressor.FileName(), lowerBound, dbg.Stack()))
item.closeFilesAndRemove()
} else {
log.Debug(fmt.Sprintf("[snapshots] closing %s, because step %d has not enough files (was not complete). stack: %s", item.decompressor.FileName(), lowerBound, dbg.Stack()))
item.closeFiles()
}
log.Debug(fmt.Sprintf("[snapshots] closing %s, because step %d has not enough files (was not complete). stack: %s", item.decompressor.FileName(), lowerBound, dbg.Stack()))
item.closeFiles()
}

toDelete = toDelete[:0]
Expand All @@ -243,13 +238,8 @@ func (d *Domain) removeFilesAfterStep(lowerBound uint64, readonly bool) {
})
for _, item := range toDelete {
d.History.dirtyFiles.Delete(item)
if !readonly {
log.Debug(fmt.Sprintf("[snapshots] deleting some histor files - because step %d has not enough files (was not complete)", lowerBound))
item.closeFilesAndRemove()
} else {
log.Debug(fmt.Sprintf("[snapshots] closing some histor files - because step %d has not enough files (was not complete)", lowerBound))
item.closeFiles()
}
log.Debug(fmt.Sprintf("[snapshots] closing some histor files - because step %d has not enough files (was not complete)", lowerBound))
item.closeFiles()
}

toDelete = toDelete[:0]
Expand All @@ -261,13 +251,8 @@ func (d *Domain) removeFilesAfterStep(lowerBound uint64, readonly bool) {
})
for _, item := range toDelete {
d.History.InvertedIndex.dirtyFiles.Delete(item)
if !readonly {
log.Debug(fmt.Sprintf("[snapshots] delete %s, because step %d has not enough files (was not complete)", item.decompressor.FileName(), lowerBound))
item.closeFilesAndRemove()
} else {
log.Debug(fmt.Sprintf("[snapshots] closing %s, because step %d has not enough files (was not complete)", item.decompressor.FileName(), lowerBound))
item.closeFiles()
}
log.Debug(fmt.Sprintf("[snapshots] closing %s, because step %d has not enough files (was not complete)", item.decompressor.FileName(), lowerBound))
item.closeFiles()
}
}

Expand Down
98 changes: 98 additions & 0 deletions erigon-lib/state/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"io/fs"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -980,6 +982,102 @@ func TestDomain_PruneOnWrite(t *testing.T) {

}

func TestDomain_OpenFilesWithDeletions(t *testing.T) {
logger := log.New()
keyCount, txCount := uint64(4), uint64(125)
db, dom, data := filledDomainFixedSize(t, keyCount, txCount, 16, logger)
defer db.Close()
clear(data)

logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
ctx := context.Background()

err := db.Update(ctx, func(tx kv.RwTx) error {
for step := uint64(0); step < txCount/dom.aggregationStep-1; step++ {
s, ns := step*dom.aggregationStep, (step+1)*dom.aggregationStep
c, err := dom.collate(ctx, step, s, ns, tx)
require.NoError(t, err)
sf, err := dom.buildFiles(ctx, step, c, background.NewProgressSet())
require.NoError(t, err)
dom.integrateDirtyFiles(sf, s, ns)
dom.reCalcVisibleFiles()

dc := dom.BeginFilesRo()
_, err = dc.Prune(ctx, tx, step, s, ns, math.MaxUint64, false, logEvery)
dc.Close()
require.NoError(t, err)
}
return nil
})

require.NoError(t, err)

run1Doms, run1Hist := make([]string, 0), make([]string, 0)
for i := 0; i < len(dom._visibleFiles); i++ {
run1Doms = append(run1Doms, dom._visibleFiles[i].src.decompressor.FileName())
// should be equal length
run1Hist = append(run1Hist, dom.History._visibleFiles[i].src.decompressor.FileName())
}

removedHist := make(map[string]struct{})
for i := len(dom.History._visibleFiles) - 1; i > 3; i-- {
removedHist[dom.History._visibleFiles[i].src.decompressor.FileName()] = struct{}{}
t.Logf("rm hist: %s\n", dom.History._visibleFiles[i].src.decompressor.FileName())

dom.History._visibleFiles[i].src.closeFilesAndRemove()
}
dom.Close()

err = dom.OpenFolder(false)
require.NoError(t, err)

// domain files for same range should not be available so lengths should match
require.Len(t, dom._visibleFiles, len(run1Doms)-len(removedHist))
require.Len(t, dom.History._visibleFiles, len(dom._visibleFiles))
require.Len(t, dom.History._visibleFiles, len(run1Hist)-len(removedHist))

for i := 0; i < len(dom._visibleFiles); i++ {
require.EqualValuesf(t, run1Doms[i], dom._visibleFiles[i].src.decompressor.FileName(), "kv i=%d", i)
require.EqualValuesf(t, run1Hist[i], dom.History._visibleFiles[i].src.decompressor.FileName(), " v i=%d", i)
}

danglingDomains := make(map[string]bool, len(removedHist))
for i := len(run1Doms) - len(removedHist); i < len(run1Doms); i++ {
t.Logf("dangling: %s\n", run1Doms[i])
danglingDomains[run1Doms[i]] = false
}

//dom.dirtyFiles.Walk(func(items []*filesItem) bool {
// for _, item := range items {
// if _, found := danglingDomains[item.decompressor.FileName()]; found {
// danglingDomains[item.decompressor.FileName()] = true
// }
// }
// return true
//})
//
//for f, persists := range danglingDomains {
// require.True(t, persists, f)
//}

// check files persist on the disk
persistingDomains := make(map[string]bool, 0)
err = fs.WalkDir(os.DirFS(dom.dirs.SnapDomain), ".", func(path string, d fs.DirEntry, err error) error {
persistingDomains[filepath.Base(path)] = false
return nil
})
require.NoError(t, err)

// check all "invalid" kv files persists on disk
for fname := range danglingDomains {
_, found := persistingDomains[fname]
require.True(t, found, fname)
}

dom.Close()
}

func TestScanStaticFilesD(t *testing.T) {

ii := &Domain{History: &History{InvertedIndex: emptyTestInvertedIndex(1)},
Expand Down

0 comments on commit 1822132

Please sign in to comment.