Skip to content

Commit

Permalink
Configurable pprof listener [#3614]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Jun 28, 2024
1 parent 2bf4728 commit 39b5a5c
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 53 deletions.
3 changes: 3 additions & 0 deletions cmd/accumulated/run/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ Instrumentation:
type: Monitor
marshal-as: reference
pointer: true
- name: PprofListen
type: p2p.Multiaddr
marshal-as: union

Monitor:
non-binary: true
Expand Down
7 changes: 3 additions & 4 deletions cmd/accumulated/run/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package run

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -246,12 +247,10 @@ func (c *ConsensusService) start(inst *Instance) error {
return errors.UnknownError.WithFormat("start consensus: %w", err)
}

inst.cleanup(func() {
inst.cleanup(func(context.Context) error {
err := node.Stop()
if err != nil {
slog.ErrorContext(inst.context, "Error while stopping node", "error", err)
}
node.Wait()
return err
})

err = consensusProvidesEventBus.Register(inst.services, c, d.eventBus)
Expand Down
3 changes: 2 additions & 1 deletion cmd/accumulated/run/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package run

import (
"context"
"time"

"gitlab.com/accumulatenetwork/accumulate/exp/ioc"
Expand Down Expand Up @@ -98,7 +99,7 @@ func (f *FaucetService) start(inst *Instance) error {
return
}

inst.cleanup(func() { impl.Stop() })
inst.cleanup(func(context.Context) error { impl.Stop(); return nil })
registerRpcService(inst, impl.ServiceAddress(), message.Faucet{Faucet: impl})
inst.logger.Info("Ready", "module", "run", "service", "faucet")
}()
Expand Down
21 changes: 3 additions & 18 deletions cmd/accumulated/run/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package run

import (
"context"
"encoding/json"
"log/slog"
"net"
Expand Down Expand Up @@ -154,28 +153,14 @@ func (h *HttpListener) startHTTP(inst *Instance, handler http.Handler) (*http.Se
ReadHeaderTimeout: *h.ReadHeaderTimeout,
}

inst.cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := server.Shutdown(ctx)
slog.Error("Server stopped", "error", err)
})
inst.cleanup(server.Shutdown)

for _, l := range h.Listen {
proto, addr, port, http, err := decomposeListen(l)
if err != nil {
return nil, err
}
if proto == "" || port == "" {
return nil, errors.UnknownError.WithFormat("invalid listen address: %v", l)
}
addr += ":" + port

l, err := net.Listen(proto, addr)
l, secure, err := httpListen(l)
if err != nil {
return nil, err
}
err = h.serveHTTP(inst, server, l, http == "https")
err = h.serveHTTP(inst, server, l, secure)
if err != nil {
return nil, err
}
Expand Down
16 changes: 10 additions & 6 deletions cmd/accumulated/run/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ func (inst *Instance) StartFiltered(predicate func(Service) bool) (err error) {
}
}()

// Start metrics
if inst.config.Instrumentation == nil {
inst.config.Instrumentation = new(Instrumentation)
}
// Start instrumentation
setDefaultVal(&inst.config.Instrumentation, new(Instrumentation))
err = inst.config.Instrumentation.start(inst)
if err != nil {
return err
Expand Down Expand Up @@ -227,12 +225,18 @@ func (i *Instance) run(fn func()) {
}()
}

func (i *Instance) cleanup(fn func()) {
func (i *Instance) cleanup(fn func(context.Context) error) {
i.running.Add(1)
go func() {
defer i.running.Done()
<-i.context.Done()
fn()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := fn(ctx)
if err != nil {
slog.Error("Error during shutdown", "error", err)
}
}()
}

Expand Down
40 changes: 35 additions & 5 deletions cmd/accumulated/run/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
package run

import (
"context"
"fmt"
"log/slog"
"net/http"
"os"
"runtime"
"runtime/pprof"
Expand All @@ -20,14 +22,17 @@ import (
)

func (i *Instrumentation) start(inst *Instance) error {
err := i.listen(inst)
err := i.startPprof(inst)
if err != nil {
return errors.UnknownError.WithFormat("listen: %w", err)
return errors.UnknownError.WithFormat("pprof: %w", err)
}

if i.Monitoring == nil {
i.Monitoring = new(Monitor)
err = i.listen(inst)
if err != nil {
return errors.UnknownError.WithFormat("listen: %w", err)
}

setDefaultVal(&i.Monitoring, new(Monitor))
err = i.Monitoring.start(inst)
if err != nil {
return errors.UnknownError.WithFormat("monitoring: %w", err)
Expand Down Expand Up @@ -55,6 +60,31 @@ func (i *Instrumentation) listen(inst *Instance) error {
return err
}

func (i *Instrumentation) startPprof(inst *Instance) error {
if i.PprofListen == nil {
return nil
}

l, secure, err := httpListen(i.PprofListen)
if err != nil {
return err
}
if secure {
return errors.BadRequest.With("https pprof not supported")
}

// Default HTTP server plus slow-loris prevention
s := &http.Server{ReadHeaderTimeout: time.Minute}

inst.run(func() {
err := s.Serve(l)
slog.Error("Server stopped (pprof)", "error", err)
})

inst.cleanup(s.Shutdown)
return nil
}

func (m *Monitor) start(inst *Instance) error {
setDefaultPtr(&m.ProfileMemory, false) // Enabled = false
setDefaultPtr(&m.MemoryPollingRate, time.Minute) // Polling rate = every minute
Expand All @@ -75,7 +105,7 @@ func (m *Monitor) start(inst *Instance) error {

func (m *Monitor) pollMemory(inst *Instance) {
tick := time.NewTicker(*m.MemoryPollingRate)
inst.cleanup(tick.Stop)
inst.cleanup(func(context.Context) error { tick.Stop(); return nil })

var s1, s2 runtime.MemStats
runtime.ReadMemStats(&s1)
Expand Down
9 changes: 5 additions & 4 deletions cmd/accumulated/run/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package run

import (
"context"
"encoding/json"
"log/slog"
"strconv"
Expand Down Expand Up @@ -45,13 +46,13 @@ func (p *P2P) start(inst *Instance) error {

slog.InfoContext(inst.context, "We are", "id", node.ID(), "module", "run")

inst.cleanup(func() {
inst.cleanup(func(context.Context) error {
err := node.Close()
if err != nil {
slog.ErrorContext(inst.context, "Error while stopping node", "module", "run", "id", node.ID(), "error", err)
} else {
slog.InfoContext(inst.context, "Stopped", "id", node.ID(), "module", "run")
return err
}
slog.InfoContext(inst.context, "Stopped", "id", node.ID(), "module", "run")
return nil
})
return nil
}
Expand Down
19 changes: 5 additions & 14 deletions cmd/accumulated/run/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
package run

import (
"context"
"io"
"log/slog"

"gitlab.com/accumulatenetwork/accumulate/exp/ioc"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue"
Expand Down Expand Up @@ -115,7 +115,7 @@ func (s *BadgerStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanupCloser(db, "Error while closing database")
inst.cleanup(func(context.Context) error { return db.Close() })
return db, nil
}

Expand All @@ -125,7 +125,7 @@ func (s *BoltStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanupCloser(db, "Error while closing database")
inst.cleanup(func(context.Context) error { return db.Close() })
return db, nil
}

Expand All @@ -135,7 +135,7 @@ func (s *LevelDBStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanupCloser(db, "Error while closing database")
inst.cleanup(func(context.Context) error { return db.Close() })
return db, nil
}

Expand All @@ -145,15 +145,6 @@ func (s *ExpBlockDBStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanupCloser(db, "Error while closing database")
inst.cleanup(func(context.Context) error { return db.Close() })
return db, nil
}

func (i *Instance) cleanupCloser(c io.Closer, msg string) {
i.cleanup(func() {
err := c.Close()
if err != nil {
slog.ErrorContext(i.context, msg, "error", err)
}
})
}
19 changes: 18 additions & 1 deletion cmd/accumulated/run/types_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ type HttpService struct {

type Instrumentation struct {
HttpListener
Monitoring *Monitor `json:"monitoring,omitempty" form:"monitoring" query:"monitoring" validate:"required"`
Monitoring *Monitor `json:"monitoring,omitempty" form:"monitoring" query:"monitoring" validate:"required"`
PprofListen p2p.Multiaddr `json:"pprofListen,omitempty" form:"pprofListen" query:"pprofListen" validate:"required"`
}

type LevelDBStorage struct {
Expand Down Expand Up @@ -666,6 +667,9 @@ func (v *Instrumentation) Copy() *Instrumentation {
if v.Monitoring != nil {
u.Monitoring = (v.Monitoring).Copy()
}
if v.PprofListen != nil {
u.PprofListen = p2p.CopyMultiaddr(v.PprofListen)
}

return u
}
Expand Down Expand Up @@ -1383,6 +1387,9 @@ func (v *Instrumentation) Equal(u *Instrumentation) bool {
case !((v.Monitoring).Equal(u.Monitoring)):
return false
}
if !(p2p.EqualMultiaddr(v.PprofListen, u.PprofListen)) {
return false
}

return true
}
Expand Down Expand Up @@ -2307,6 +2314,7 @@ func (v *Instrumentation) MarshalJSON() ([]byte, error) {
TlsCertPath string `json:"tlsCertPath,omitempty"`
TlsKeyPath string `json:"tlsKeyPath,omitempty"`
Monitoring *Monitor `json:"monitoring,omitempty"`
PprofListen *encoding.JsonUnmarshalWith[p2p.Multiaddr] `json:"pprofListen,omitempty"`
}{}
if !(len(v.HttpListener.Listen) == 0) {
u.Listen = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.HttpListener.Listen, Func: p2p.UnmarshalMultiaddrJSON}
Expand All @@ -2328,6 +2336,9 @@ func (v *Instrumentation) MarshalJSON() ([]byte, error) {
if !(v.Monitoring == nil) {
u.Monitoring = v.Monitoring
}
if !(p2p.EqualMultiaddr(v.PprofListen, nil)) {
u.PprofListen = &encoding.JsonUnmarshalWith[p2p.Multiaddr]{Value: v.PprofListen, Func: p2p.UnmarshalMultiaddrJSON}
}
return json.Marshal(&u)
}

Expand Down Expand Up @@ -3127,6 +3138,7 @@ func (v *Instrumentation) UnmarshalJSON(data []byte) error {
TlsCertPath string `json:"tlsCertPath,omitempty"`
TlsKeyPath string `json:"tlsKeyPath,omitempty"`
Monitoring *Monitor `json:"monitoring,omitempty"`
PprofListen *encoding.JsonUnmarshalWith[p2p.Multiaddr] `json:"pprofListen,omitempty"`
}{}
u.Listen = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.HttpListener.Listen, Func: p2p.UnmarshalMultiaddrJSON}
u.ConnectionLimit = v.HttpListener.ConnectionLimit
Expand All @@ -3136,6 +3148,7 @@ func (v *Instrumentation) UnmarshalJSON(data []byte) error {
u.TlsCertPath = v.HttpListener.TlsCertPath
u.TlsKeyPath = v.HttpListener.TlsKeyPath
u.Monitoring = v.Monitoring
u.PprofListen = &encoding.JsonUnmarshalWith[p2p.Multiaddr]{Value: v.PprofListen, Func: p2p.UnmarshalMultiaddrJSON}
err := json.Unmarshal(data, &u)
if err != nil {
return err
Expand All @@ -3157,6 +3170,10 @@ func (v *Instrumentation) UnmarshalJSON(data []byte) error {
v.HttpListener.TlsCertPath = u.TlsCertPath
v.HttpListener.TlsKeyPath = u.TlsKeyPath
v.Monitoring = u.Monitoring
if u.PprofListen != nil {
v.PprofListen = u.PprofListen.Value
}

return nil
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/accumulated/run/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ func decomposeListen(addr multiaddr.Multiaddr) (proto, host, port, http string,
return
}

func httpListen(ma multiaddr.Multiaddr) (net.Listener, bool, error) {
proto, addr, port, http, err := decomposeListen(ma)
if err != nil {
return nil, false, err
}
if proto == "" || port == "" {
return nil, false, errors.UnknownError.WithFormat("invalid listen address: %v", ma)
}
addr += ":" + port

l, err := net.Listen(proto, addr)
return l, http == "https", err
}

func isPrivate(addr multiaddr.Multiaddr) bool {
var private bool
multiaddr.ForEach(addr, func(c multiaddr.Component) bool {
Expand Down

0 comments on commit 39b5a5c

Please sign in to comment.