Skip to content

Commit

Permalink
merge: branch '3623-faucet-tweaks' into 'main'
Browse files Browse the repository at this point in the history
Faucet tweaks [#3623]

Closes #3623

See merge request accumulatenetwork/accumulate!1090
  • Loading branch information
firelizzard18 committed Sep 30, 2024
2 parents 098bf61 + 49ff6e5 commit 6cf7191
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 50 deletions.
16 changes: 16 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,22 @@
"${workspaceFolder}/.nodes/devnet/bootstrap",
]
},
{
"name": "Run faucet",
"presentation": {
"group": "20-Run"
},
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/accumulated-faucet",
"args": [
"Kermit",
"--account=acc://7a6f9db5789710a6b27e0c5965e337d8fc7431075290434d/ACME",
"--key=${workspaceFolder}/.nodes/faucet/key",
"--node-key=seed:foo",
]
},
// Services
{
"name": "API (mainnet)",
Expand Down
12 changes: 7 additions & 5 deletions cmd/accumulated-bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ var flag = struct {
External multiaddr.Multiaddr
}{
Key: PrivateKeyFlag{Value: &TransientPrivateKey{}},
PromListen: []multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http"),
},
}

func init() {
Expand All @@ -46,6 +43,12 @@ func init() {
cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.PromListen), "prom-listen", "Prometheus listening address(es) (default /ip4/0.0.0.0/tcp/8081/http)")
cmd.Flags().VarP((*MultiaddrSliceFlag)(&flag.Peers), "peer", "p", "Peers to connect to")
cmd.Flags().Var(MultiaddrFlag{Value: &flag.External}, "external", "External address to advertize")

cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if !cmd.Flag("prom-listen").Changed {
flag.PromListen = []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http")}
}
}
}

func run(*cobra.Command, []string) {
Expand All @@ -67,7 +70,6 @@ func run(*cobra.Command, []string) {
ctx := ContextForMainProcess(context.Background())
inst, err := Start(ctx, cfg)
Check(err)

<-ctx.Done()
<-inst.Done()
inst.Stop()
}
53 changes: 34 additions & 19 deletions cmd/accumulated-faucet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package main
import (
"context"
"log/slog"
"os/user"
"path/filepath"

"github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
Expand All @@ -30,32 +32,43 @@ var cmd = &cobra.Command{
}

var flag = struct {
NodeKey PrivateKeyFlag
Key PrivateKeyFlag
LogLevel string
Account UrlFlag
Listen []multiaddr.Multiaddr
PromListen []multiaddr.Multiaddr
Peers []multiaddr.Multiaddr
}{
PromListen: []multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http"),
},
}
NodeKey PrivateKeyFlag
Key PrivateKeyFlag
LogLevel string
Account UrlFlag
Listen []multiaddr.Multiaddr
PromListen []multiaddr.Multiaddr
Peers []multiaddr.Multiaddr
PeerDatabase string
}{}

var cu = func() *user.User {
cu, _ := user.Current()
return cu
}()

func init() {
flag.Key.Value = &TransientPrivateKey{}
flag.Peers = accumulate.BootstrapServers

if cu != nil {
flag.PeerDatabase = filepath.Join(cu.HomeDir, ".accumulate", "cache", "peerdb.json")
}

cmd.Flags().Var(&flag.NodeKey, "node-key", "The node key - not required but highly recommended. The value can be a key or a file containing a key. The key must be hex, base64, or an Accumulate secret key address.")
cmd.Flags().Var(&flag.Key, "key", "The key used to sign faucet transactions")
cmd.Flags().Var(&flag.Account, "account", "The faucet account")
cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.Listen), "listen", "P2P listening address(es)")
cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.PromListen), "prom-listen", "Prometheus listening address(es) (default /ip4/0.0.0.0/tcp/8081/http)")
cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.PromListen), "prom-listen", "Prometheus listening address(es)")
cmd.Flags().VarP((*MultiaddrSliceFlag)(&flag.Peers), "peer", "p", "Peers to connect to")
cmd.Flags().StringVar(&flag.PeerDatabase, "peer-db", flag.PeerDatabase, "Track peers using a persistent database.")
cmd.Flags().StringVar(&flag.LogLevel, "log-level", "error", "Log level")

_ = cmd.MarkFlagRequired("peer")
cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if !cmd.Flag("peer").Changed {
flag.Peers = accumulate.BootstrapServers
}
}

_ = cmd.MarkFlagRequired("account")
_ = cmd.MarkFlagRequired("key")
}
Expand All @@ -79,16 +92,18 @@ func run(_ *cobra.Command, args []string) {
},
},
P2P: &P2P{
Key: flag.NodeKey.Value,
Listen: flag.Listen,
BootstrapPeers: flag.Peers,
Key: flag.NodeKey.Value,
Listen: flag.Listen,
BootstrapPeers: flag.Peers,
PeerDB: &flag.PeerDatabase,
EnablePeerTracking: true,
},
Services: []Service{svcCfg},
}

ctx := cmdutil.ContextForMainProcess(context.Background())
inst, err := Start(ctx, cfg)
Check(err)
<-ctx.Done()
<-inst.Done()
inst.Stop()
}
18 changes: 11 additions & 7 deletions cmd/accumulated-http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ var flag = struct {
TlsKey string
PeerDatabase string
Pprof string
}{
PromListen: []multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http"),
},
}
}{}

var cu = func() *user.User {
cu, _ := user.Current()
Expand All @@ -67,7 +63,6 @@ var cu = func() *user.User {

func init() {
flag.Key.Value = &TransientPrivateKey{}
flag.Peers = accumulate.BootstrapServers

if cu != nil {
flag.PeerDatabase = filepath.Join(cu.HomeDir, ".accumulate", "cache", "peerdb.json")
Expand All @@ -88,6 +83,15 @@ func init() {
cmd.Flags().StringVar(&flag.PeerDatabase, "peer-db", flag.PeerDatabase, "Track peers using a persistent database.")
cmd.Flags().BoolVar(&jsonrpc2.DebugMethodFunc, "debug", false, "Print out a stack trace if an API method fails")
cmd.Flags().StringVar(&flag.Pprof, "pprof", "", "Address to run net/http/pprof on")

cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if !cmd.Flag("prom-listen").Changed {
flag.PromListen = []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http")}
}
if !cmd.Flag("peer").Changed {
flag.Peers = accumulate.BootstrapServers
}
}
}

func run(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -171,7 +175,7 @@ func run(cmd *cobra.Command, args []string) {
ctx := cmdutil.ContextForMainProcess(context.Background())
inst, err := Start(ctx, cfg)
Check(err)
<-ctx.Done()
<-inst.Done()
inst.Stop()
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func runCfg(c *run.Config, predicate func(run.Service) bool) {

check(i.StartFiltered(predicate))
color.HiBlue("\n----- Running -----\n\n")
<-ctx.Done()
<-i.Done()
color.HiBlack("\n----- Stopping -----\n\n")
i.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated/run/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (c *ConsensusService) start(inst *Instance) error {
return errors.UnknownError.WithFormat("start consensus: %w", err)
}

inst.cleanup(func(context.Context) error {
inst.cleanup("consensus node", func(context.Context) error {
err := node.Stop()
node.Wait()
return err
Expand Down
8 changes: 6 additions & 2 deletions cmd/accumulated/run/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ func (f *FaucetService) start(inst *Instance) error {
go func() {
if r, ok := router.(*routing.RouterInstance); ok {
inst.logger.Info("Waiting for router", "module", "run", "service", "faucet")
<-r.Ready()
if !<-r.Ready() {
inst.logger.Error("Failed to start faucet", "error", "unable to start router")
inst.Stop()
return
}
}

for {
Expand Down Expand Up @@ -99,7 +103,7 @@ func (f *FaucetService) start(inst *Instance) error {
return
}

inst.cleanup(func(context.Context) error { impl.Stop(); return nil })
inst.cleanup("faucet", func(context.Context) error { impl.Stop(); return nil })
registerRpcService(inst, impl.ServiceAddress(), message.Faucet{Faucet: impl})
inst.logger.Info("Ready", "module", "run", "service", "faucet")
}()
Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated/run/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (h *HttpListener) startHTTP(inst *Instance, handler http.Handler) (*http.Se
ReadHeaderTimeout: h.ReadHeaderTimeout.Get(),
}

inst.cleanup(server.Shutdown)
inst.cleanup("http listener", server.Shutdown)

for _, l := range h.Listen {
l, secure, err := httpListen(l)
Expand Down
9 changes: 6 additions & 3 deletions cmd/accumulated/run/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (inst *Instance) StartFiltered(predicate func(Service) bool) (err error) {
serviceUp.Add(inst.context, 1, metric.WithAttributes(
attribute.String("type", svc.Type().String())))

inst.cleanup(func(ctx context.Context) error {
inst.cleanup("service metrics", func(ctx context.Context) error {
serviceUp.Add(inst.context, -1, metric.WithAttributes(
attribute.String("type", svc.Type().String())))
return nil
Expand Down Expand Up @@ -256,17 +256,20 @@ func (i *Instance) run(fn func()) {
}()
}

func (i *Instance) cleanup(fn func(context.Context) error) {
func (i *Instance) cleanup(name string, fn func(context.Context) error) {
i.running.Add(1)
go func() {
defer i.running.Done()
<-i.context.Done()

slog.Debug("Stopping", "process", name)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := fn(ctx)
if err != nil {
slog.Error("Error during shutdown", "error", err)
slog.Error("Error during shutdown", "error", err, "process", name)
} else {
slog.Debug("Stopped", "process", name)
}
}()
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/accumulated/run/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (i *Instrumentation) startPprof(inst *Instance) error {
slog.Error("Server stopped (pprof)", "error", err)
})

inst.cleanup(s.Shutdown)
inst.cleanup("pprof http", s.Shutdown)
return nil
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func (m *Monitor) start(inst *Instance) error {

func (m *Monitor) pollMemory(inst *Instance) {
tick := time.NewTicker(m.MemoryPollingRate.Get())
inst.cleanup(func(context.Context) error { tick.Stop(); return nil })
inst.cleanup("memory tracking", func(context.Context) error { tick.Stop(); return nil })

var s1, s2 runtime.MemStats
runtime.ReadMemStats(&s1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated/run/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (p *P2P) start(inst *Instance) error {

slog.InfoContext(inst.context, "We are", "node-id", node.ID(), "instance-id", inst.id, "module", "run")

inst.cleanup(func(context.Context) error {
inst.cleanup("p2p node", func(context.Context) error {
err := node.Close()
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions cmd/accumulated/run/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *BadgerStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanup(func(context.Context) error { return db.Close() })
inst.cleanup("storage", func(context.Context) error { return db.Close() })
return db, nil
}

Expand All @@ -117,7 +117,7 @@ func (s *BoltStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanup(func(context.Context) error { return db.Close() })
inst.cleanup("storage", func(context.Context) error { return db.Close() })
return db, nil
}

Expand All @@ -127,7 +127,7 @@ func (s *LevelDBStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanup(func(context.Context) error { return db.Close() })
inst.cleanup("storage", func(context.Context) error { return db.Close() })
return db, nil
}

Expand All @@ -137,6 +137,6 @@ func (s *ExpBlockDBStorage) open(inst *Instance) (keyvalue.Beginner, error) {
return nil, err
}

inst.cleanup(func(context.Context) error { return db.Close() })
inst.cleanup("storage", func(context.Context) error { return db.Close() })
return db, nil
}
6 changes: 3 additions & 3 deletions cmd/accumulated/run/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (t *Telemetry) setupTraceProvider(res *resource.Resource, inst *Instance) e
trace.WithBatchTimeout(time.Second)),
)

inst.cleanup(provider.Shutdown)
inst.cleanup("otel (traces)", provider.Shutdown)
otel.SetTracerProvider(provider)
return nil
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (t *Telemetry) setupMeterProvider(res *resource.Resource, inst *Instance) e
}

provider := metric.NewMeterProvider(options...)
inst.cleanup(provider.Shutdown)
inst.cleanup("otel (metrics)", provider.Shutdown)
otel.SetMeterProvider(provider)
return nil
}
Expand All @@ -271,7 +271,7 @@ func (t *Telemetry) setupLoggerProvider(res *resource.Resource, inst *Instance)
log.WithProcessor(log.NewBatchProcessor(exporter)),
)

inst.cleanup(provider.Shutdown)
inst.cleanup("otel (logs)", provider.Shutdown)
global.SetLoggerProvider(provider)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/api/routing/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (r MessageRouter) Route(msg message.Message) (multiaddr.Multiaddr, error) {
if t != nil {
token = t
} else {
return nil, errors.BadRequest.WithFormat("%v is not a lite token address and the request does not specify a token type", t)
return nil, errors.BadRequest.WithFormat("%v is not a lite token address and the request does not specify a token type", msg.Account)
}
}

Expand Down

0 comments on commit 6cf7191

Please sign in to comment.