diff --git a/.vscode/launch.json b/.vscode/launch.json index 156923018..ddc4c28be 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -261,6 +261,22 @@ "${workspaceFolder}/.nodes/devnet/bootstrap", ] }, + { + "name": "Run faucet", + "presentation": { + "group": "20-Run" + }, + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/cmd/accumulated-faucet", + "args": [ + "Kermit", + "--account=acc://7a6f9db5789710a6b27e0c5965e337d8fc7431075290434d/ACME", + "--key=${workspaceFolder}/.nodes/faucet/key", + "--node-key=seed:foo", + ] + }, // Services { "name": "API (mainnet)", diff --git a/cmd/accumulated-bootstrap/main.go b/cmd/accumulated-bootstrap/main.go index 22687f992..8196bfce9 100644 --- a/cmd/accumulated-bootstrap/main.go +++ b/cmd/accumulated-bootstrap/main.go @@ -35,9 +35,6 @@ var flag = struct { External multiaddr.Multiaddr }{ Key: PrivateKeyFlag{Value: &TransientPrivateKey{}}, - PromListen: []multiaddr.Multiaddr{ - multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http"), - }, } func init() { @@ -46,6 +43,12 @@ func init() { cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.PromListen), "prom-listen", "Prometheus listening address(es) (default /ip4/0.0.0.0/tcp/8081/http)") cmd.Flags().VarP((*MultiaddrSliceFlag)(&flag.Peers), "peer", "p", "Peers to connect to") cmd.Flags().Var(MultiaddrFlag{Value: &flag.External}, "external", "External address to advertize") + + cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) { + if !cmd.Flag("prom-listen").Changed { + flag.PromListen = []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http")} + } + } } func run(*cobra.Command, []string) { @@ -67,7 +70,6 @@ func run(*cobra.Command, []string) { ctx := ContextForMainProcess(context.Background()) inst, err := Start(ctx, cfg) Check(err) - - <-ctx.Done() + <-inst.Done() inst.Stop() } diff --git a/cmd/accumulated-faucet/main.go b/cmd/accumulated-faucet/main.go index d65c766db..3eab00be2 100644 --- a/cmd/accumulated-faucet/main.go +++ b/cmd/accumulated-faucet/main.go @@ -9,6 +9,8 @@ package main import ( "context" "log/slog" + "os/user" + "path/filepath" "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" @@ -30,32 +32,43 @@ var cmd = &cobra.Command{ } var flag = struct { - NodeKey PrivateKeyFlag - Key PrivateKeyFlag - LogLevel string - Account UrlFlag - Listen []multiaddr.Multiaddr - PromListen []multiaddr.Multiaddr - Peers []multiaddr.Multiaddr -}{ - PromListen: []multiaddr.Multiaddr{ - multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http"), - }, -} + NodeKey PrivateKeyFlag + Key PrivateKeyFlag + LogLevel string + Account UrlFlag + Listen []multiaddr.Multiaddr + PromListen []multiaddr.Multiaddr + Peers []multiaddr.Multiaddr + PeerDatabase string +}{} + +var cu = func() *user.User { + cu, _ := user.Current() + return cu +}() func init() { flag.Key.Value = &TransientPrivateKey{} - flag.Peers = accumulate.BootstrapServers + + if cu != nil { + flag.PeerDatabase = filepath.Join(cu.HomeDir, ".accumulate", "cache", "peerdb.json") + } cmd.Flags().Var(&flag.NodeKey, "node-key", "The node key - not required but highly recommended. The value can be a key or a file containing a key. The key must be hex, base64, or an Accumulate secret key address.") cmd.Flags().Var(&flag.Key, "key", "The key used to sign faucet transactions") cmd.Flags().Var(&flag.Account, "account", "The faucet account") cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.Listen), "listen", "P2P listening address(es)") - cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.PromListen), "prom-listen", "Prometheus listening address(es) (default /ip4/0.0.0.0/tcp/8081/http)") + cmd.Flags().Var((*MultiaddrSliceFlag)(&flag.PromListen), "prom-listen", "Prometheus listening address(es)") cmd.Flags().VarP((*MultiaddrSliceFlag)(&flag.Peers), "peer", "p", "Peers to connect to") + cmd.Flags().StringVar(&flag.PeerDatabase, "peer-db", flag.PeerDatabase, "Track peers using a persistent database.") cmd.Flags().StringVar(&flag.LogLevel, "log-level", "error", "Log level") - _ = cmd.MarkFlagRequired("peer") + cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) { + if !cmd.Flag("peer").Changed { + flag.Peers = accumulate.BootstrapServers + } + } + _ = cmd.MarkFlagRequired("account") _ = cmd.MarkFlagRequired("key") } @@ -79,9 +92,11 @@ func run(_ *cobra.Command, args []string) { }, }, P2P: &P2P{ - Key: flag.NodeKey.Value, - Listen: flag.Listen, - BootstrapPeers: flag.Peers, + Key: flag.NodeKey.Value, + Listen: flag.Listen, + BootstrapPeers: flag.Peers, + PeerDB: &flag.PeerDatabase, + EnablePeerTracking: true, }, Services: []Service{svcCfg}, } @@ -89,6 +104,6 @@ func run(_ *cobra.Command, args []string) { ctx := cmdutil.ContextForMainProcess(context.Background()) inst, err := Start(ctx, cfg) Check(err) - <-ctx.Done() + <-inst.Done() inst.Stop() } diff --git a/cmd/accumulated-http/main.go b/cmd/accumulated-http/main.go index 2d5d9f6cb..d0ba9d738 100644 --- a/cmd/accumulated-http/main.go +++ b/cmd/accumulated-http/main.go @@ -54,11 +54,7 @@ var flag = struct { TlsKey string PeerDatabase string Pprof string -}{ - PromListen: []multiaddr.Multiaddr{ - multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http"), - }, -} +}{} var cu = func() *user.User { cu, _ := user.Current() @@ -67,7 +63,6 @@ var cu = func() *user.User { func init() { flag.Key.Value = &TransientPrivateKey{} - flag.Peers = accumulate.BootstrapServers if cu != nil { flag.PeerDatabase = filepath.Join(cu.HomeDir, ".accumulate", "cache", "peerdb.json") @@ -88,6 +83,15 @@ func init() { cmd.Flags().StringVar(&flag.PeerDatabase, "peer-db", flag.PeerDatabase, "Track peers using a persistent database.") cmd.Flags().BoolVar(&jsonrpc2.DebugMethodFunc, "debug", false, "Print out a stack trace if an API method fails") cmd.Flags().StringVar(&flag.Pprof, "pprof", "", "Address to run net/http/pprof on") + + cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) { + if !cmd.Flag("prom-listen").Changed { + flag.PromListen = []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http")} + } + if !cmd.Flag("peer").Changed { + flag.Peers = accumulate.BootstrapServers + } + } } func run(cmd *cobra.Command, args []string) { @@ -171,7 +175,7 @@ func run(cmd *cobra.Command, args []string) { ctx := cmdutil.ContextForMainProcess(context.Background()) inst, err := Start(ctx, cfg) Check(err) - <-ctx.Done() + <-inst.Done() inst.Stop() } diff --git a/cmd/accumulated/main.go b/cmd/accumulated/main.go index afb96c58a..f20c49e65 100644 --- a/cmd/accumulated/main.go +++ b/cmd/accumulated/main.go @@ -103,7 +103,7 @@ func runCfg(c *run.Config, predicate func(run.Service) bool) { check(i.StartFiltered(predicate)) color.HiBlue("\n----- Running -----\n\n") - <-ctx.Done() + <-i.Done() color.HiBlack("\n----- Stopping -----\n\n") i.Stop() } diff --git a/cmd/accumulated/run/consensus.go b/cmd/accumulated/run/consensus.go index d8894c7ba..a1df28e00 100644 --- a/cmd/accumulated/run/consensus.go +++ b/cmd/accumulated/run/consensus.go @@ -235,7 +235,7 @@ func (c *ConsensusService) start(inst *Instance) error { return errors.UnknownError.WithFormat("start consensus: %w", err) } - inst.cleanup(func(context.Context) error { + inst.cleanup("consensus node", func(context.Context) error { err := node.Stop() node.Wait() return err diff --git a/cmd/accumulated/run/faucet.go b/cmd/accumulated/run/faucet.go index 8678049cd..6b06b6066 100644 --- a/cmd/accumulated/run/faucet.go +++ b/cmd/accumulated/run/faucet.go @@ -66,7 +66,11 @@ func (f *FaucetService) start(inst *Instance) error { go func() { if r, ok := router.(*routing.RouterInstance); ok { inst.logger.Info("Waiting for router", "module", "run", "service", "faucet") - <-r.Ready() + if !<-r.Ready() { + inst.logger.Error("Failed to start faucet", "error", "unable to start router") + inst.Stop() + return + } } for { @@ -99,7 +103,7 @@ func (f *FaucetService) start(inst *Instance) error { return } - inst.cleanup(func(context.Context) error { impl.Stop(); return nil }) + inst.cleanup("faucet", func(context.Context) error { impl.Stop(); return nil }) registerRpcService(inst, impl.ServiceAddress(), message.Faucet{Faucet: impl}) inst.logger.Info("Ready", "module", "run", "service", "faucet") }() diff --git a/cmd/accumulated/run/http.go b/cmd/accumulated/run/http.go index 50787f0b0..b72a9bb2c 100644 --- a/cmd/accumulated/run/http.go +++ b/cmd/accumulated/run/http.go @@ -154,7 +154,7 @@ func (h *HttpListener) startHTTP(inst *Instance, handler http.Handler) (*http.Se ReadHeaderTimeout: h.ReadHeaderTimeout.Get(), } - inst.cleanup(server.Shutdown) + inst.cleanup("http listener", server.Shutdown) for _, l := range h.Listen { l, secure, err := httpListen(l) diff --git a/cmd/accumulated/run/instance.go b/cmd/accumulated/run/instance.go index 57b206843..3bfb3f272 100644 --- a/cmd/accumulated/run/instance.go +++ b/cmd/accumulated/run/instance.go @@ -199,7 +199,7 @@ func (inst *Instance) StartFiltered(predicate func(Service) bool) (err error) { serviceUp.Add(inst.context, 1, metric.WithAttributes( attribute.String("type", svc.Type().String()))) - inst.cleanup(func(ctx context.Context) error { + inst.cleanup("service metrics", func(ctx context.Context) error { serviceUp.Add(inst.context, -1, metric.WithAttributes( attribute.String("type", svc.Type().String()))) return nil @@ -256,17 +256,20 @@ func (i *Instance) run(fn func()) { }() } -func (i *Instance) cleanup(fn func(context.Context) error) { +func (i *Instance) cleanup(name string, fn func(context.Context) error) { i.running.Add(1) go func() { defer i.running.Done() <-i.context.Done() + slog.Debug("Stopping", "process", name) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := fn(ctx) if err != nil { - slog.Error("Error during shutdown", "error", err) + slog.Error("Error during shutdown", "error", err, "process", name) + } else { + slog.Debug("Stopped", "process", name) } }() } diff --git a/cmd/accumulated/run/instrumentation.go b/cmd/accumulated/run/instrumentation.go index d18eb2d02..0ddda2ad3 100644 --- a/cmd/accumulated/run/instrumentation.go +++ b/cmd/accumulated/run/instrumentation.go @@ -82,7 +82,7 @@ func (i *Instrumentation) startPprof(inst *Instance) error { slog.Error("Server stopped (pprof)", "error", err) }) - inst.cleanup(s.Shutdown) + inst.cleanup("pprof http", s.Shutdown) return nil } @@ -110,7 +110,7 @@ func (m *Monitor) start(inst *Instance) error { func (m *Monitor) pollMemory(inst *Instance) { tick := time.NewTicker(m.MemoryPollingRate.Get()) - inst.cleanup(func(context.Context) error { tick.Stop(); return nil }) + inst.cleanup("memory tracking", func(context.Context) error { tick.Stop(); return nil }) var s1, s2 runtime.MemStats runtime.ReadMemStats(&s1) diff --git a/cmd/accumulated/run/p2p.go b/cmd/accumulated/run/p2p.go index 56b9678a0..9c7bea053 100644 --- a/cmd/accumulated/run/p2p.go +++ b/cmd/accumulated/run/p2p.go @@ -38,7 +38,7 @@ func (p *P2P) start(inst *Instance) error { slog.InfoContext(inst.context, "We are", "node-id", node.ID(), "instance-id", inst.id, "module", "run") - inst.cleanup(func(context.Context) error { + inst.cleanup("p2p node", func(context.Context) error { err := node.Close() if err != nil { return err diff --git a/cmd/accumulated/run/storage.go b/cmd/accumulated/run/storage.go index 00cce75a0..2eba1e0c2 100644 --- a/cmd/accumulated/run/storage.go +++ b/cmd/accumulated/run/storage.go @@ -107,7 +107,7 @@ func (s *BadgerStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanup(func(context.Context) error { return db.Close() }) + inst.cleanup("storage", func(context.Context) error { return db.Close() }) return db, nil } @@ -117,7 +117,7 @@ func (s *BoltStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanup(func(context.Context) error { return db.Close() }) + inst.cleanup("storage", func(context.Context) error { return db.Close() }) return db, nil } @@ -127,7 +127,7 @@ func (s *LevelDBStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanup(func(context.Context) error { return db.Close() }) + inst.cleanup("storage", func(context.Context) error { return db.Close() }) return db, nil } @@ -137,6 +137,6 @@ func (s *ExpBlockDBStorage) open(inst *Instance) (keyvalue.Beginner, error) { return nil, err } - inst.cleanup(func(context.Context) error { return db.Close() }) + inst.cleanup("storage", func(context.Context) error { return db.Close() }) return db, nil } diff --git a/cmd/accumulated/run/telemetry.go b/cmd/accumulated/run/telemetry.go index 366cd7957..8a2d11bca 100644 --- a/cmd/accumulated/run/telemetry.go +++ b/cmd/accumulated/run/telemetry.go @@ -102,7 +102,7 @@ func (t *Telemetry) setupTraceProvider(res *resource.Resource, inst *Instance) e trace.WithBatchTimeout(time.Second)), ) - inst.cleanup(provider.Shutdown) + inst.cleanup("otel (traces)", provider.Shutdown) otel.SetTracerProvider(provider) return nil } @@ -252,7 +252,7 @@ func (t *Telemetry) setupMeterProvider(res *resource.Resource, inst *Instance) e } provider := metric.NewMeterProvider(options...) - inst.cleanup(provider.Shutdown) + inst.cleanup("otel (metrics)", provider.Shutdown) otel.SetMeterProvider(provider) return nil } @@ -271,7 +271,7 @@ func (t *Telemetry) setupLoggerProvider(res *resource.Resource, inst *Instance) log.WithProcessor(log.NewBatchProcessor(exporter)), ) - inst.cleanup(provider.Shutdown) + inst.cleanup("otel (logs)", provider.Shutdown) global.SetLoggerProvider(provider) return nil } diff --git a/internal/api/routing/message.go b/internal/api/routing/message.go index 4f8ed6ba1..bd08dba54 100644 --- a/internal/api/routing/message.go +++ b/internal/api/routing/message.go @@ -156,7 +156,7 @@ func (r MessageRouter) Route(msg message.Message) (multiaddr.Multiaddr, error) { if t != nil { token = t } else { - return nil, errors.BadRequest.WithFormat("%v is not a lite token address and the request does not specify a token type", t) + return nil, errors.BadRequest.WithFormat("%v is not a lite token address and the request does not specify a token type", msg.Account) } }