From 3642b8cd1c0c95befab4c5d3ddfd4a2da26ed43c Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 31 Oct 2024 10:45:15 -1000 Subject: [PATCH] feat: purge peer connections and information Connections to a specific peer, or to all peers, can be closed and the peer information removed from the peer store. This can be useful to help determine if the presence/absence of a connection to a peer is affecting behavior. Be aware that purging a connection is inherently racey as it is possible for the peer to reestablish a connection at any time following a purge. - `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=` purges connection and info for peer identifid by peer_id - `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=all` purges connections and info for all peers - `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/peers` returns a list of currently connected peers Closes #193 --- README.md | 13 +++++++ handlers.go | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 2 + 3 files changed, 122 insertions(+) diff --git a/README.md b/README.md index 36e9398..2aa6eb9 100644 --- a/README.md +++ b/README.md @@ -142,6 +142,19 @@ possible to dynamically modify the logging at runtime. - `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/level?subsystem=&level=` will set the logging level for a subsystem - `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/ls` will return a comma separated list of available logging subsystems +## Purging Peer Connections + +Connections to a specific peer, or to all peers, can be closed and the peer information removed from the peer store. This can be useful to help determine if the presence/absence of a connection to a peer is affecting behavior. Be aware that purging a connection is inherently racey as it is possible for the peer to reestablish a connection at any time following a purge. + +- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=` purges connection and info for peer identifid by peer_id +- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=all` purges connections and info for all peers +- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/peers` returns a list of currently connected peers + +Example cURL commmand to show connected peers and purge peer connection: + + curl http://127.0.0.1:8091/mgr/peers + curl http://127.0.0.1:8091/mgr/purge?peer=QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC + ## Tracing See [docs/tracing.md](docs/tracing.md). diff --git a/handlers.go b/handlers.go index 1afcd6e..b4b5856 100644 --- a/handlers.go +++ b/handlers.go @@ -13,6 +13,8 @@ import ( "github.com/ipfs/boxo/blockstore" leveldb "github.com/ipfs/go-ds-leveldb" "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" _ "embed" _ "net/http/pprof" @@ -92,6 +94,111 @@ func GCHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) { } } +func PurgePeerHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + q := r.URL.Query() + peerIDStr := q.Get("peer") + if peerIDStr == "" { + http.Error(w, "missing peer id", http.StatusBadRequest) + return + } + + if peerIDStr == "all" { + purgeCount, err := purgeAllConnections(p2pHost) + if err != nil { + goLog.Errorw("Error closing all libp2p connections", "err", err) + http.Error(w, "error closing connections", http.StatusInternalServerError) + return + } + goLog.Infow("Purged connections", "count", purgeCount) + + h := w.Header() + h.Set("Content-Type", "text/plain; charset=utf-8") + fmt.Fprintln(w, "Peer connections purged:", purgeCount) + return + } + + peerID, err := peer.Decode(peerIDStr) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = purgeConnection(p2pHost, peerID) + if err != nil { + goLog.Errorw("Error closing libp2p connection", "err", err, "peer", peerID) + http.Error(w, "error closing connection", http.StatusInternalServerError) + return + } + goLog.Infow("Purged connection", "peer", peerID) + + h := w.Header() + h.Set("Content-Type", "text/plain; charset=utf-8") + fmt.Fprintln(w, "Purged connection to peer", peerID) + } +} + +func purgeConnection(p2pHost host.Host, peerID peer.ID) error { + peerStore := p2pHost.Peerstore() + if peerStore != nil { + peerStore.RemovePeer(peerID) + peerStore.ClearAddrs(peerID) + } + return p2pHost.Network().ClosePeer(peerID) +} + +func purgeAllConnections(p2pHost host.Host) (int, error) { + net := p2pHost.Network() + peers := net.Peers() + + peerStore := p2pHost.Peerstore() + if peerStore != nil { + for _, peerID := range peers { + peerStore.RemovePeer(peerID) + peerStore.ClearAddrs(peerID) + } + } + + var errCount, purgeCount int + for _, peerID := range peers { + err := net.ClosePeer(peerID) + if err != nil { + goLog.Errorw("Closing libp2p connection", "err", err, "peer", peerID) + errCount++ + } else { + purgeCount++ + } + } + + if errCount != 0 { + return 0, fmt.Errorf("error closing connections to %d peers", errCount) + } + + return purgeCount, nil +} + +func showPeersHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + h := w.Header() + h.Set("Content-Type", "text/plain; charset=utf-8") + + peers := p2pHost.Network().Peers() + if len(peers) == 0 { + fmt.Fprintln(w, "no connected peers") + return + } + + fmt.Fprintln(w, "Connected peers:", len(peers)) + for _, peerID := range peers { + fmt.Fprintln(w, peerID.String()) + } + } +} + func withConnect(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // ServeMux does not support requests with CONNECT method, diff --git a/main.go b/main.go index be37c14..e2a6a39 100644 --- a/main.go +++ b/main.go @@ -590,6 +590,8 @@ share the same seed as long as the indexes are different. apiMux := makeMetricsAndDebuggingHandler() apiMux.HandleFunc("/mgr/gc", GCHandler(gnd)) + apiMux.HandleFunc("/mgr/purge", PurgePeerHandler(gnd.host)) + apiMux.HandleFunc("/mgr/peers", showPeersHandler(gnd.host)) addLogHandlers(apiMux) apiSrv := &http.Server{