Skip to content

Commit

Permalink
merge: branch '3596-alloc-spike-debug' into 'main'
Browse files Browse the repository at this point in the history
Debug allocation spikes [#3596]

Closes #3596

See merge request accumulatenetwork/accumulate!1063
  • Loading branch information
firelizzard18 committed May 10, 2024
2 parents 874c10d + c64e771 commit d3c0c0a
Show file tree
Hide file tree
Showing 26 changed files with 829 additions and 231 deletions.
9 changes: 2 additions & 7 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,7 @@
"mode": "auto",
"program": "${workspaceFolder}/cmd/accumulated",
"args": [
"run",
"devnet",
"-w=${workspaceFolder}/.nodes/devnet",
"${workspaceFolder}/.nodes/devnet",
]
},
{
Expand Down Expand Up @@ -524,10 +522,7 @@
"mode": "auto",
"program": "${workspaceFolder}/tools/cmd/debug",
"args": [
"comet",
"download-genesis",
"http://206.189.97.165:16592",
"${env:HOME}/Desktop/apollo-genesis.json"
"db", "analyze", "badger://${workspaceFolder}/.nodes/devnet/bvn1-1/bvnn/data/accumulate.db", "--accounts",
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21 as build
FROM golang:1.22 as build

ARG GIT_DESCRIBE
ARG GIT_COMMIT
Expand Down
79 changes: 75 additions & 4 deletions cmd/accumulated/cmd_init_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"github.com/spf13/cobra"
"gitlab.com/accumulatenetwork/accumulate/exp/faucet"
"gitlab.com/accumulatenetwork/accumulate/internal/core"
coredb "gitlab.com/accumulatenetwork/accumulate/internal/database"
"gitlab.com/accumulatenetwork/accumulate/internal/database/smt/storage"
"gitlab.com/accumulatenetwork/accumulate/internal/logging"
accumulated "gitlab.com/accumulatenetwork/accumulate/internal/node/daemon"
"gitlab.com/accumulatenetwork/accumulate/internal/node/genesis"
ioutil2 "gitlab.com/accumulatenetwork/accumulate/internal/util/io"
"gitlab.com/accumulatenetwork/accumulate/pkg/database"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"gitlab.com/accumulatenetwork/accumulate/test/testing"
)

var cmdInitNetwork = &cobra.Command{
Expand All @@ -43,8 +47,15 @@ var cmdInitGenesis = &cobra.Command{
Args: cobra.ExactArgs(1),
}

var cmdInitPrepareGenesis = &cobra.Command{
Use: "prepare-genesis <output> <inputs...>",
Short: "Ingests data from snapshots and produces a single, stripped-down snapshot for genesis",
Run: prepareGenesis,
Args: cobra.MinimumNArgs(2),
}

func init() {
cmdInit.AddCommand(cmdInitGenesis)
cmdInit.AddCommand(cmdInitGenesis, cmdInitPrepareGenesis)
}

func loadNetworkConfiguration(file string) (ret *accumulated.NetworkInit, err error) {
Expand Down Expand Up @@ -129,7 +140,68 @@ func initGenesis(cmd *cobra.Command, args []string) {
}
}

func initNetworkLocalFS(cmd *cobra.Command, netInit *accumulated.NetworkInit) {
func prepareGenesis(cmd *cobra.Command, args []string) {
// Timer for updating progress
tick := time.NewTicker(time.Second / 2)
defer tick.Stop()

db := coredb.OpenInMemory(nil)
db.SetObserver(testing.NullObserver{})
for _, path := range args[1:] {
fmt.Println("Processing", path)
file, err := os.Open(path)
check(err)
defer file.Close()
_, err = genesis.Extract(db, file, func(u *url.URL) bool {
select {
case <-tick.C:
h := database.NewKey("Account", u).Hash()
fmt.Printf("\033[A\r\033[KProcessing [%x] %v\n", h[:4], u)
default:
return true
}

// Retain everything
return true
})
check(err)
}

file, err := os.Create(args[0])
check(err)
defer file.Close()

// Collect
var metrics coredb.CollectMetrics
fmt.Println("Collecting into", args[0])
err = db.Collect(file, nil, &coredb.CollectOptions{
Metrics: &metrics,
Predicate: func(r database.Record) (bool, error) {
select {
case <-tick.C:
default:
return true, nil
}

// Print progress
switch r.Key().Get(0) {
case "Account":
k := r.Key().SliceJ(2)
h := k.Hash()
fmt.Printf("\033[A\r\033[KCollecting [%x] (%d) %v\n", h[:4], metrics.Messages.Count, k.Get(1))

case "Message", "Transaction":
fmt.Printf("\033[A\r\033[KCollecting (%d/%d) %x\n", metrics.Messages.Collecting, metrics.Messages.Count, r.Key().Get(1).([32]byte))
}

// Retain everything
return true, nil
},
})
check(err)
}

func initNetworkLocalFS(_ *cobra.Command, netInit *accumulated.NetworkInit) {
if flagInit.LogLevels != "" {
_, _, err := logging.ParseLogLevel(flagInit.LogLevels, io.Discard)
checkf(err, "--log-level")
Expand Down Expand Up @@ -222,8 +294,7 @@ func buildGenesis(network *accumulated.NetworkInit) map[string][]byte {
})
}

values := new(core.GlobalValues)
genDocs, err := accumulated.BuildGenesisDocs(network, values, time.Now(), newLogger(), factomAddresses, snapshots)
genDocs, err := accumulated.BuildGenesisDocs(network, network.Globals, time.Now(), newLogger(), factomAddresses, snapshots)
checkf(err, "build genesis documents")
return genDocs
}
Expand Down
27 changes: 27 additions & 0 deletions cmd/accumulated/run/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@ Instrumentation:
fields:
- type: HttpListener
marshal-as: reference
- name: Monitoring
type: Monitor
marshal-as: reference
pointer: true

Monitor:
non-binary: true
fields:
- name: Directory
description: is the directory traces and profiles are written to
type: string
- name: ProfileMemory
description: enables profiling when memory usage increases dramatically
type: bool
pointer: true
- name: MemoryPollingRate
description: is rate at which to poll memory usage
type: duration
pointer: true
- name: AllocRateTrigger
description: is the rate of allocation in bytes/second that triggers a profile
type: float
pointer: true

HttpListener:
non-binary: true
Expand Down Expand Up @@ -407,6 +430,10 @@ CoreValidatorConfiguration:
type: bool
optional: true
pointer: true
- name: EnableSnapshots
type: bool
optional: true
pointer: true
- name: MaxEnvelopesPerBlock
type: uint
optional: true
Expand Down
7 changes: 7 additions & 0 deletions cmd/accumulated/run/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ func (c *ConsensusService) start(inst *Instance) error {
return err
}

if d.config.Instrumentation.Prometheus {
d.config.Instrumentation.Namespace = c.MetricsNamespace
}

case errors.Is(err, fs.ErrNotExist):
d.config.NodeKey = ""
d.config.PrivValidatorKey = ""
Expand Down Expand Up @@ -500,6 +504,9 @@ func (c *CoreConsensusApp) start(inst *Instance, d *tendermint) (types.Applicati
Querier: v3.Querier2{Querier: client},
Dispatcher: execOpts.NewDispatcher(),
RunTask: execOpts.BackgroundTaskLauncher,

// TODO Fix the flooding issues and enable this by default
EnableAnchorHealing: Ptr(false),
}
err = conductor.Start(d.eventBus)
if err != nil {
Expand Down
38 changes: 15 additions & 23 deletions cmd/accumulated/run/core_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type partOpts struct {
}

func (p partOpts) apply(cfg *Config) error {
setDefaultPtr(&p.EnableSnapshots, false)

// Consensus
addService(cfg,
&ConsensusService{
Expand All @@ -138,33 +140,23 @@ func (p partOpts) apply(cfg *Config) error {

// Storage
if !haveService2(cfg, p.ID, func(s *StorageService) string { return s.Name }, nil) {
switch *p.StorageType {
case StorageTypeMemory:
cfg.Services = append(cfg.Services, &StorageService{
Name: p.ID,
Storage: &MemoryStorage{},
})

case StorageTypeBadger:
cfg.Services = append(cfg.Services, &StorageService{
Name: p.ID,
Storage: &BadgerStorage{
Path: filepath.Join(p.Dir, "data", "accumulate.db"),
},
})

default:
return errors.BadRequest.WithFormat("unsupported storage type %v", p.StorageType)
storage, err := NewStorage(*p.StorageType)
if err != nil {
return errors.UnknownError.Wrap(err)
}

storage.setPath(filepath.Join(p.Dir, "data", "accumulate.db"))
cfg.Services = append(cfg.Services, &StorageService{Name: p.ID, Storage: storage})
}

// Snapshots
addService(cfg,
&SnapshotService{
Partition: p.ID,
Directory: filepath.Join(p.Dir, "snapshots"),
},
func(s *SnapshotService) string { return s.Partition })
if *p.EnableSnapshots {
addService(cfg,
&SnapshotService{
Partition: p.ID,
Directory: filepath.Join(p.Dir, "snapshots")},
func(s *SnapshotService) string { return s.Partition })
}

// Services
addService(cfg, &Querier{Partition: p.ID}, func(s *Querier) string { return s.Partition })
Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated/run/devnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (d *DevnetConfiguration) addSubNode(inst *Instance, root *Config, name stri
return cfg, sub, nil
}

func (d *DevnetConfiguration) writeSubNode(inst *Instance, root, cfg *Config, sub *SubnodeService, ip ipOffset) error {
func (d *DevnetConfiguration) writeSubNode(_ *Instance, root, cfg *Config, sub *SubnodeService, ip ipOffset) error {
// Update the subnode configuration
sub.NodeKey = cfg.P2P.Key
sub.Services = cfg.Services
Expand Down
3 changes: 3 additions & 0 deletions cmd/accumulated/run/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (inst *Instance) StartFiltered(predicate func(Service) bool) (err error) {
}()

// Start metrics
if inst.config.Instrumentation == nil {
inst.config.Instrumentation = new(Instrumentation)
}
err = inst.config.Instrumentation.start(inst)
if err != nil {
return err
Expand Down
89 changes: 87 additions & 2 deletions cmd/accumulated/run/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,36 @@
package run

import (
"fmt"
"os"
"runtime"
"runtime/pprof"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"golang.org/x/exp/slog"
)

func (i *Instrumentation) start(inst *Instance) error {
err := i.listen(inst)
if err != nil {
return errors.UnknownError.WithFormat("listen: %w", err)
}

if i.Monitoring == nil {
i.Monitoring = new(Monitor)
}
err = i.Monitoring.start(inst)
if err != nil {
return errors.UnknownError.WithFormat("monitoring: %w", err)
}

return nil
}

func (i *Instrumentation) listen(inst *Instance) error {
// If there's no listening address, there's nothing to do. Someone else
// (e.g. Tendermint) may be setting up a listener but we're not so we're
// done.
Expand All @@ -20,8 +45,6 @@ func (i *Instrumentation) start(inst *Instance) error {
if i == nil || len(i.Listen) == 0 {
return nil
}
i.applyHttpDefaults()

_, err := i.startHTTP(inst, promhttp.InstrumentMetricHandler(
prometheus.DefaultRegisterer, promhttp.HandlerFor(
prometheus.DefaultGatherer,
Expand All @@ -30,3 +53,65 @@ func (i *Instrumentation) start(inst *Instance) error {
))
return err
}

func (m *Monitor) start(inst *Instance) error {
setDefaultPtr(&m.ProfileMemory, false) // Enabled = false
setDefaultPtr(&m.MemoryPollingRate, time.Minute) // Polling rate = every minute
setDefaultPtr(&m.AllocRateTrigger, 50<<20) // Trigger rate = 50 MiB/s
setDefaultVal(&m.Directory, "traces") // Directory = ./traces

if *m.ProfileMemory {
err := os.MkdirAll(inst.path(m.Directory), 0700)
if err != nil {
return err
}

go m.pollMemory(inst)
}

return nil
}

func (m *Monitor) pollMemory(inst *Instance) {
tick := time.NewTicker(*m.MemoryPollingRate)
inst.cleanup(tick.Stop)

var s1, s2 runtime.MemStats
runtime.ReadMemStats(&s1)
t1 := time.Now()

for t2 := range tick.C {
runtime.ReadMemStats(&s2)
rate := (float64(s2.Alloc) - float64(s1.Alloc)) / t2.Sub(t1).Seconds()
s1, t1 = s2, t2
if rate < *m.AllocRateTrigger {
continue
}

// TODO Capture at a higher frequency, regardless of allocation rate,
// for some period after the spike

m.takeHeapProfile(inst)
}
}

func (m *Monitor) takeHeapProfile(inst *Instance) {
time := time.Now().Format("2006-01-02-15-04-05.999")
name := inst.path(m.Directory, fmt.Sprintf("memory-%s.pb.gz", time))
f, err := os.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0600)
if err != nil {
slog.Error("Failed to open file for heap profile", "error", err, "path", name)
return
}
defer func() {
err = f.Close()
if err != nil {
slog.Error("Failed to close file after writing heap profile", "error", err, "path", name)
}
}()

err = pprof.Lookup("heap").WriteTo(f, 0)
if err != nil {
slog.Error("Failed to capture heap profile", "error", err, "path", name)
}
}
Loading

0 comments on commit d3c0c0a

Please sign in to comment.