Skip to content

Commit

Permalink
Use the new snapshot collector [#3424]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 9, 2023
1 parent 072255c commit 4976c54
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cmd/accumulated/cmd_init_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

tmed25519 "github.com/cometbft/cometbft/crypto/ed25519"
"github.com/ghodss/yaml"
"github.com/spf13/cobra"
"gitlab.com/accumulatenetwork/accumulate/exp/faucet"
"gitlab.com/accumulatenetwork/accumulate/internal/core"
Expand All @@ -25,7 +26,6 @@ import (
accumulated "gitlab.com/accumulatenetwork/accumulate/internal/node/daemon"
ioutil2 "gitlab.com/accumulatenetwork/accumulate/internal/util/io"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"gopkg.in/yaml.v3"
)

var cmdInitNetwork = &cobra.Command{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ require (
github.com/fatih/structtag v1.2.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/fzipp/gocyclo v0.6.0 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-critic/go-critic v0.9.0 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo=
github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
Expand Down
39 changes: 21 additions & 18 deletions internal/database/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,20 @@ type CollectMetrics struct {
}
}

// Collect collects a snapshot of the database.
//
// Collect is a wrapper around [Batch.Collect].
func (db *Database) Collect(file io.WriteSeeker, partition *url.URL, opts *CollectOptions) error {
batch := db.Begin(false)
defer batch.Discard()
return batch.Collect(file, partition, opts)
}

// Collect collects a snapshot of the database.
//
// WARNING: If the batch is nested (if it is not a root batch), Collect may
// cause excessive memory consumption until the root batch is discarded.
func (batch *Batch) Collect(file io.WriteSeeker, partition *url.URL, opts *CollectOptions) error {
if opts == nil {
opts = new(CollectOptions)
}
Expand All @@ -57,7 +70,7 @@ func (db *Database) Collect(file io.WriteSeeker, partition *url.URL, opts *Colle
}

// Write the header
err = db.writeSnapshotHeader(w, partition, opts)
err = batch.writeSnapshotHeader(w, partition, opts)
if err != nil {
return errors.UnknownError.Wrap(err)
}
Expand All @@ -70,7 +83,7 @@ func (db *Database) Collect(file io.WriteSeeker, partition *url.URL, opts *Colle
}

// Collect the BPT
err = db.collectBPT(w, opts)
err = batch.collectBPT(w, opts)
if err != nil {
return errors.UnknownError.Wrap(err)
}
Expand Down Expand Up @@ -102,13 +115,13 @@ func (db *Database) Collect(file io.WriteSeeker, partition *url.URL, opts *Colle
}

// Collect accounts
err = db.collectAccounts(w, index, hashes, opts)
err = batch.collectAccounts(w, index, hashes, opts)
if err != nil {
return errors.UnknownError.Wrap(err)
}

// Collect messages
err = db.collectMessages(w, index, hashes, opts)
err = batch.collectMessages(w, index, hashes, opts)
if err != nil {
return errors.UnknownError.Wrap(err)
}
Expand All @@ -122,12 +135,10 @@ func (db *Database) Collect(file io.WriteSeeker, partition *url.URL, opts *Colle
return nil
}

func (db *Database) writeSnapshotHeader(w *snapshot.Writer, partition *url.URL, opts *CollectOptions) error {
func (batch *Batch) writeSnapshotHeader(w *snapshot.Writer, partition *url.URL, opts *CollectOptions) error {
header := new(snapshot.Header)

// Load the BPT root hash
batch := db.Begin(false)
defer batch.Discard()
var err error
header.RootHash, err = batch.GetBptRootHash()
if err != nil {
Expand All @@ -151,16 +162,14 @@ func (db *Database) writeSnapshotHeader(w *snapshot.Writer, partition *url.URL,
return nil
}

func (db *Database) collectAccounts(w *snapshot.Writer, index, hashes *indexing.Bucket, opts *CollectOptions) error {
func (batch *Batch) collectAccounts(w *snapshot.Writer, index, hashes *indexing.Bucket, opts *CollectOptions) error {
// Open a records section
records, err := w.OpenRecords()
if err != nil {
return errors.UnknownError.Wrap(err)
}

// Iterate over the BPT and collect accounts
batch := db.Begin(false)
defer batch.Discard()
it := batch.IterateAccounts()
copts := collectOptions(index, opts)
for it.Next() {
Expand Down Expand Up @@ -197,16 +206,13 @@ func (db *Database) collectAccounts(w *snapshot.Writer, index, hashes *indexing.
return errors.UnknownError.Wrap(err)
}

func (db *Database) collectMessages(w *snapshot.Writer, index, hashes *indexing.Bucket, opts *CollectOptions) error {
func (batch *Batch) collectMessages(w *snapshot.Writer, index, hashes *indexing.Bucket, opts *CollectOptions) error {
// Open a records section
records, err := w.OpenRecords()
if err != nil {
return errors.UnknownError.Wrap(err)
}

batch := db.Begin(false)
defer batch.Discard()

copts := collectOptions(index, opts)

for i := 0; i < 256; i++ {
Expand Down Expand Up @@ -261,10 +267,7 @@ func (db *Database) collectMessages(w *snapshot.Writer, index, hashes *indexing.
return errors.UnknownError.Wrap(err)
}

func (db *Database) collectBPT(w *snapshot.Writer, opts *CollectOptions) error {
batch := db.Begin(false)
defer batch.Discard()

func (batch *Batch) collectBPT(w *snapshot.Writer, opts *CollectOptions) error {
// Check if the caller wants to skip the BPT
if opts.Predicate != nil {
ok, err := opts.Predicate(batch.BPT())
Expand Down
3 changes: 3 additions & 0 deletions internal/node/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ type Snapshots struct {
// Enable enables snapshots
Enable bool `toml:"enable" mapstructure:"enable"`

// EnableIndexing enables indexing of snapshots
EnableIndexing bool `toml:"enable-indexing" mapstructure:"enable-indexing"`

// Directory is the directory to store snapshots in
Directory string `toml:"directory" mapstructure:"directory"`

Expand Down
47 changes: 39 additions & 8 deletions internal/node/daemon/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (

"gitlab.com/accumulatenetwork/accumulate/internal/core"
"gitlab.com/accumulatenetwork/accumulate/internal/core/events"
"gitlab.com/accumulatenetwork/accumulate/internal/database"
coredb "gitlab.com/accumulatenetwork/accumulate/internal/database"
"gitlab.com/accumulatenetwork/accumulate/internal/database/snapshot"
"gitlab.com/accumulatenetwork/accumulate/internal/node/abci"
"gitlab.com/accumulatenetwork/accumulate/internal/node/config"
ioutil2 "gitlab.com/accumulatenetwork/accumulate/internal/util/io"
"gitlab.com/accumulatenetwork/accumulate/pkg/database"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"golang.org/x/exp/slog"
)

func (d *Daemon) onDidCommitBlock(event events.DidCommitBlock) error {
Expand All @@ -36,7 +38,11 @@ func (d *Daemon) onDidCommitBlock(event events.DidCommitBlock) error {
return nil
}

func (d *Daemon) collectSnapshot(batch *database.Batch, blockTime time.Time, majorBlock, minorBlock uint64) {
func (d *Daemon) collectSnapshot(batch *coredb.Batch, blockTime time.Time, majorBlock, minorBlock uint64) {
if !d.isTimeForSnapshot(blockTime) {
return
}

// Don't collect a snapshot if one is still being collected
if !d.snapshotLock.TryLock() {
return
Expand All @@ -50,10 +56,6 @@ func (d *Daemon) collectSnapshot(batch *database.Batch, blockTime time.Time, maj
}()
defer batch.Discard()

if !d.isTimeForSnapshot(blockTime) {
return
}

d.Logger.Info("Creating a snapshot", "major-block", majorBlock, "minor-block", minorBlock, "module", "snapshot")
snapDir := config.MakeAbsolute(d.Config.RootDir, d.Config.Accumulate.Snapshots.Directory)
err := os.Mkdir(snapDir, 0755)
Expand All @@ -76,7 +78,36 @@ func (d *Daemon) collectSnapshot(batch *database.Batch, blockTime time.Time, maj
}
}()

err = snapshot.FullCollect(batch, file, d.Config.Accumulate.PartitionUrl(), d.Logger.With("module", "snapshot"), false)
// Timer for updating progress
tick := time.NewTicker(10 * time.Second)
defer tick.Stop()

var metrics coredb.CollectMetrics
err = batch.Collect(file, d.Config.Accumulate.PartitionUrl().URL, &coredb.CollectOptions{
Metrics: &metrics,
BuildIndex: d.Config.Accumulate.Snapshots.EnableIndexing,
Predicate: func(r database.Record) (bool, error) {
select {
case <-tick.C:
default:
return true, nil
}

// The sole purpose of this function is to print progress
switch r.Key().Get(0) {
case "Account":
k := r.Key().SliceJ(2)
h := k.Hash()
slog.Info("Collecting an account", "module", "snapshot", "majorBlock", majorBlock, "account", k, "hash", h[:4], "totalMessages", metrics.Messages.Count)

case "Message", "Transaction":
slog.Info("Collecting a message", "module", "snapshot", "majorBlock", majorBlock, "message", r.Key().Get(1), "count", fmt.Sprintf("%d/%d", metrics.Messages.Collecting, metrics.Messages.Count))
}

// Retain everything
return true, nil
},
})
if err != nil {
d.Logger.Error("Failed to create snapshot", "error", err, "major-block", majorBlock, "minor-block", minorBlock, "module", "snapshot")
return
Expand Down Expand Up @@ -127,7 +158,7 @@ func (d *Daemon) collectSnapshot(batch *database.Batch, blockTime time.Time, maj
}

func (d *Daemon) LoadSnapshot(file ioutil2.SectionReader) error {
db, err := database.Open(d.Config, d.Logger)
db, err := coredb.Open(d.Config, d.Logger)
if err != nil {
return fmt.Errorf("failed to open database: %v", err)
}
Expand Down

0 comments on commit 4976c54

Please sign in to comment.