diff --git a/cmd/accumulated/run/config.yml b/cmd/accumulated/run/config.yml index f7b9ddc6e..df205165b 100644 --- a/cmd/accumulated/run/config.yml +++ b/cmd/accumulated/run/config.yml @@ -81,6 +81,9 @@ Instrumentation: type: Monitor marshal-as: reference pointer: true + - name: PprofListen + type: p2p.Multiaddr + marshal-as: union Monitor: non-binary: true diff --git a/cmd/accumulated/run/consensus.go b/cmd/accumulated/run/consensus.go index f6f299ee1..e83535e39 100644 --- a/cmd/accumulated/run/consensus.go +++ b/cmd/accumulated/run/consensus.go @@ -7,6 +7,7 @@ package run import ( + "context" "crypto/sha256" "encoding/hex" "fmt" @@ -246,12 +247,10 @@ func (c *ConsensusService) start(inst *Instance) error { return errors.UnknownError.WithFormat("start consensus: %w", err) } - inst.cleanup(func() { + inst.cleanup(func(context.Context) error { err := node.Stop() - if err != nil { - slog.ErrorContext(inst.context, "Error while stopping node", "error", err) - } node.Wait() + return err }) err = consensusProvidesEventBus.Register(inst.services, c, d.eventBus) diff --git a/cmd/accumulated/run/faucet.go b/cmd/accumulated/run/faucet.go index 1cbb47a70..8678049cd 100644 --- a/cmd/accumulated/run/faucet.go +++ b/cmd/accumulated/run/faucet.go @@ -7,6 +7,7 @@ package run import ( + "context" "time" "gitlab.com/accumulatenetwork/accumulate/exp/ioc" @@ -98,7 +99,7 @@ func (f *FaucetService) start(inst *Instance) error { return } - inst.cleanup(func() { impl.Stop() }) + inst.cleanup(func(context.Context) error { impl.Stop(); return nil }) registerRpcService(inst, impl.ServiceAddress(), message.Faucet{Faucet: impl}) inst.logger.Info("Ready", "module", "run", "service", "faucet") }() diff --git a/cmd/accumulated/run/http.go b/cmd/accumulated/run/http.go index fc8e6e359..b0f9463bc 100644 --- a/cmd/accumulated/run/http.go +++ b/cmd/accumulated/run/http.go @@ -7,7 +7,6 @@ package run import ( - "context" "encoding/json" "log/slog" "net" @@ -154,28 +153,14 @@ func (h *HttpListener) startHTTP(inst *Instance, handler http.Handler) (*http.Se ReadHeaderTimeout: *h.ReadHeaderTimeout, } - inst.cleanup(func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - err := server.Shutdown(ctx) - slog.Error("Server stopped", "error", err) - }) + inst.cleanup(server.Shutdown) for _, l := range h.Listen { - proto, addr, port, http, err := decomposeListen(l) - if err != nil { - return nil, err - } - if proto == "" || port == "" { - return nil, errors.UnknownError.WithFormat("invalid listen address: %v", l) - } - addr += ":" + port - - l, err := net.Listen(proto, addr) + l, secure, err := httpListen(l) if err != nil { return nil, err } - err = h.serveHTTP(inst, server, l, http == "https") + err = h.serveHTTP(inst, server, l, secure) if err != nil { return nil, err } diff --git a/cmd/accumulated/run/instance.go b/cmd/accumulated/run/instance.go index c78ef1394..48ccf9db7 100644 --- a/cmd/accumulated/run/instance.go +++ b/cmd/accumulated/run/instance.go @@ -108,10 +108,8 @@ func (inst *Instance) StartFiltered(predicate func(Service) bool) (err error) { } }() - // Start metrics - if inst.config.Instrumentation == nil { - inst.config.Instrumentation = new(Instrumentation) - } + // Start instrumentation + setDefaultVal(&inst.config.Instrumentation, new(Instrumentation)) err = inst.config.Instrumentation.start(inst) if err != nil { return err @@ -227,12 +225,18 @@ func (i *Instance) run(fn func()) { }() } -func (i *Instance) cleanup(fn func()) { +func (i *Instance) cleanup(fn func(context.Context) error) { i.running.Add(1) go func() { defer i.running.Done() <-i.context.Done() - fn() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err := fn(ctx) + if err != nil { + slog.Error("Error during shutdown", "error", err) + } }() } diff --git a/cmd/accumulated/run/instrumentation.go b/cmd/accumulated/run/instrumentation.go index bb9ac1e1f..c0d395072 100644 --- a/cmd/accumulated/run/instrumentation.go +++ b/cmd/accumulated/run/instrumentation.go @@ -7,8 +7,10 @@ package run import ( + "context" "fmt" "log/slog" + "net/http" "os" "runtime" "runtime/pprof" @@ -20,14 +22,17 @@ import ( ) func (i *Instrumentation) start(inst *Instance) error { - err := i.listen(inst) + err := i.startPprof(inst) if err != nil { - return errors.UnknownError.WithFormat("listen: %w", err) + return errors.UnknownError.WithFormat("pprof: %w", err) } - if i.Monitoring == nil { - i.Monitoring = new(Monitor) + err = i.listen(inst) + if err != nil { + return errors.UnknownError.WithFormat("listen: %w", err) } + + setDefaultVal(&i.Monitoring, new(Monitor)) err = i.Monitoring.start(inst) if err != nil { return errors.UnknownError.WithFormat("monitoring: %w", err) @@ -55,6 +60,31 @@ func (i *Instrumentation) listen(inst *Instance) error { return err } +func (i *Instrumentation) startPprof(inst *Instance) error { + if i.PprofListen == nil { + return nil + } + + l, secure, err := httpListen(i.PprofListen) + if err != nil { + return err + } + if secure { + return errors.BadRequest.With("https pprof not supported") + } + + // Default HTTP server plus slow-loris prevention + s := &http.Server{ReadHeaderTimeout: time.Minute} + + inst.run(func() { + err := s.Serve(l) + slog.Error("Server stopped (pprof)", "error", err) + }) + + inst.cleanup(s.Shutdown) + return nil +} + func (m *Monitor) start(inst *Instance) error { setDefaultPtr(&m.ProfileMemory, false) // Enabled = false setDefaultPtr(&m.MemoryPollingRate, time.Minute) // Polling rate = every minute @@ -75,7 +105,7 @@ func (m *Monitor) start(inst *Instance) error { func (m *Monitor) pollMemory(inst *Instance) { tick := time.NewTicker(*m.MemoryPollingRate) - inst.cleanup(tick.Stop) + inst.cleanup(func(context.Context) error { tick.Stop(); return nil }) var s1, s2 runtime.MemStats runtime.ReadMemStats(&s1) diff --git a/cmd/accumulated/run/p2p.go b/cmd/accumulated/run/p2p.go index 90a20b8bf..e89aacf1a 100644 --- a/cmd/accumulated/run/p2p.go +++ b/cmd/accumulated/run/p2p.go @@ -7,6 +7,7 @@ package run import ( + "context" "encoding/json" "log/slog" "strconv" @@ -45,13 +46,13 @@ func (p *P2P) start(inst *Instance) error { slog.InfoContext(inst.context, "We are", "id", node.ID(), "module", "run") - inst.cleanup(func() { + inst.cleanup(func(context.Context) error { err := node.Close() if err != nil { - slog.ErrorContext(inst.context, "Error while stopping node", "module", "run", "id", node.ID(), "error", err) - } else { - slog.InfoContext(inst.context, "Stopped", "id", node.ID(), "module", "run") + return err } + slog.InfoContext(inst.context, "Stopped", "id", node.ID(), "module", "run") + return nil }) return nil } diff --git a/cmd/accumulated/run/storage.go b/cmd/accumulated/run/storage.go index 240ce297b..cb7b74466 100644 --- a/cmd/accumulated/run/storage.go +++ b/cmd/accumulated/run/storage.go @@ -7,8 +7,8 @@ package run import ( + "context" "io" - "log/slog" "gitlab.com/accumulatenetwork/accumulate/exp/ioc" "gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue" @@ -115,7 +115,7 @@ func (s *BadgerStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanupCloser(db, "Error while closing database") + inst.cleanup(func(context.Context) error { return db.Close() }) return db, nil } @@ -125,7 +125,7 @@ func (s *BoltStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanupCloser(db, "Error while closing database") + inst.cleanup(func(context.Context) error { return db.Close() }) return db, nil } @@ -135,7 +135,7 @@ func (s *LevelDBStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanupCloser(db, "Error while closing database") + inst.cleanup(func(context.Context) error { return db.Close() }) return db, nil } @@ -145,15 +145,6 @@ func (s *ExpBlockDBStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanupCloser(db, "Error while closing database") + inst.cleanup(func(context.Context) error { return db.Close() }) return db, nil } - -func (i *Instance) cleanupCloser(c io.Closer, msg string) { - i.cleanup(func() { - err := c.Close() - if err != nil { - slog.ErrorContext(i.context, msg, "error", err) - } - }) -} diff --git a/cmd/accumulated/run/types_gen.go b/cmd/accumulated/run/types_gen.go index 4543b2d5d..7b1877721 100644 --- a/cmd/accumulated/run/types_gen.go +++ b/cmd/accumulated/run/types_gen.go @@ -155,7 +155,8 @@ type HttpService struct { type Instrumentation struct { HttpListener - Monitoring *Monitor `json:"monitoring,omitempty" form:"monitoring" query:"monitoring" validate:"required"` + Monitoring *Monitor `json:"monitoring,omitempty" form:"monitoring" query:"monitoring" validate:"required"` + PprofListen p2p.Multiaddr `json:"pprofListen,omitempty" form:"pprofListen" query:"pprofListen" validate:"required"` } type LevelDBStorage struct { @@ -666,6 +667,9 @@ func (v *Instrumentation) Copy() *Instrumentation { if v.Monitoring != nil { u.Monitoring = (v.Monitoring).Copy() } + if v.PprofListen != nil { + u.PprofListen = p2p.CopyMultiaddr(v.PprofListen) + } return u } @@ -1383,6 +1387,9 @@ func (v *Instrumentation) Equal(u *Instrumentation) bool { case !((v.Monitoring).Equal(u.Monitoring)): return false } + if !(p2p.EqualMultiaddr(v.PprofListen, u.PprofListen)) { + return false + } return true } @@ -2307,6 +2314,7 @@ func (v *Instrumentation) MarshalJSON() ([]byte, error) { TlsCertPath string `json:"tlsCertPath,omitempty"` TlsKeyPath string `json:"tlsKeyPath,omitempty"` Monitoring *Monitor `json:"monitoring,omitempty"` + PprofListen *encoding.JsonUnmarshalWith[p2p.Multiaddr] `json:"pprofListen,omitempty"` }{} if !(len(v.HttpListener.Listen) == 0) { u.Listen = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.HttpListener.Listen, Func: p2p.UnmarshalMultiaddrJSON} @@ -2328,6 +2336,9 @@ func (v *Instrumentation) MarshalJSON() ([]byte, error) { if !(v.Monitoring == nil) { u.Monitoring = v.Monitoring } + if !(p2p.EqualMultiaddr(v.PprofListen, nil)) { + u.PprofListen = &encoding.JsonUnmarshalWith[p2p.Multiaddr]{Value: v.PprofListen, Func: p2p.UnmarshalMultiaddrJSON} + } return json.Marshal(&u) } @@ -3127,6 +3138,7 @@ func (v *Instrumentation) UnmarshalJSON(data []byte) error { TlsCertPath string `json:"tlsCertPath,omitempty"` TlsKeyPath string `json:"tlsKeyPath,omitempty"` Monitoring *Monitor `json:"monitoring,omitempty"` + PprofListen *encoding.JsonUnmarshalWith[p2p.Multiaddr] `json:"pprofListen,omitempty"` }{} u.Listen = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.HttpListener.Listen, Func: p2p.UnmarshalMultiaddrJSON} u.ConnectionLimit = v.HttpListener.ConnectionLimit @@ -3136,6 +3148,7 @@ func (v *Instrumentation) UnmarshalJSON(data []byte) error { u.TlsCertPath = v.HttpListener.TlsCertPath u.TlsKeyPath = v.HttpListener.TlsKeyPath u.Monitoring = v.Monitoring + u.PprofListen = &encoding.JsonUnmarshalWith[p2p.Multiaddr]{Value: v.PprofListen, Func: p2p.UnmarshalMultiaddrJSON} err := json.Unmarshal(data, &u) if err != nil { return err @@ -3157,6 +3170,10 @@ func (v *Instrumentation) UnmarshalJSON(data []byte) error { v.HttpListener.TlsCertPath = u.TlsCertPath v.HttpListener.TlsKeyPath = u.TlsKeyPath v.Monitoring = u.Monitoring + if u.PprofListen != nil { + v.PprofListen = u.PprofListen.Value + } + return nil } diff --git a/cmd/accumulated/run/utils.go b/cmd/accumulated/run/utils.go index ac062329c..a717b46a5 100644 --- a/cmd/accumulated/run/utils.go +++ b/cmd/accumulated/run/utils.go @@ -158,6 +158,20 @@ func decomposeListen(addr multiaddr.Multiaddr) (proto, host, port, http string, return } +func httpListen(ma multiaddr.Multiaddr) (net.Listener, bool, error) { + proto, addr, port, http, err := decomposeListen(ma) + if err != nil { + return nil, false, err + } + if proto == "" || port == "" { + return nil, false, errors.UnknownError.WithFormat("invalid listen address: %v", ma) + } + addr += ":" + port + + l, err := net.Listen(proto, addr) + return l, http == "https", err +} + func isPrivate(addr multiaddr.Multiaddr) bool { var private bool multiaddr.ForEach(addr, func(c multiaddr.Component) bool {