Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: replace DHT with private peer discovery #34

Merged
merged 22 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7d2b4f3
removing dht code from communication constructor add custom peer disc…
brewmaster012 Oct 23, 2024
0c977fc
fix linter complaints
brewmaster012 Oct 25, 2024
064fcc6
remove unnecessary arguments
brewmaster012 Oct 25, 2024
7e55943
add unit test
brewmaster012 Oct 25, 2024
50a27bd
remove rendezvous; no longer needed as DHT is removed
brewmaster012 Oct 25, 2024
d3ee7da
disable relay protocol
brewmaster012 Oct 27, 2024
c18043d
cleanup go.mod
brewmaster012 Oct 28, 2024
243c7de
merged with tip with #31, connection gater PR
brewmaster012 Oct 28, 2024
01921b3
WIP: fix some unit tests
brewmaster012 Oct 29, 2024
dbaee46
fix keygen/keysign unit tests
brewmaster012 Oct 29, 2024
13177a3
limit read size in stream
brewmaster012 Oct 29, 2024
ef1479e
make gossip connections concurrent
brewmaster012 Oct 29, 2024
3537fb6
increase timeout in gossip connections
brewmaster012 Oct 29, 2024
f87f1a1
add missing continue/return in gossip
brewmaster012 Oct 29, 2024
1d6e788
fix a bug of capturing loop variable in closure
brewmaster012 Oct 29, 2024
4441df3
make linter happy by renaming
brewmaster012 Oct 29, 2024
a9757e6
make linter happy again
brewmaster012 Oct 29, 2024
dae1f21
allow multiple addrs in gossip for the same peer
brewmaster012 Oct 29, 2024
694f4b4
use structured log if appropriate
brewmaster012 Oct 29, 2024
ba90276
fix unlimited growing of addrs in peer info
brewmaster012 Oct 29, 2024
e53a71f
Merge remote-tracking branch 'origin/master' into dht-removal
brewmaster012 Oct 29, 2024
d3ccaf1
review comments
brewmaster012 Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/tss/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/cosmos/cosmos-sdk/client/input"
golog "github.com/ipfs/go-log"

"gitlab.com/thorchain/tss/go-tss/common"
"gitlab.com/thorchain/tss/go-tss/conversion"
"gitlab.com/thorchain/tss/go-tss/p2p"
Expand Down
12 changes: 12 additions & 0 deletions conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,15 @@ func GetEDDSAPrivateKeyRawBytes(privateKey crypto2.PrivKey) ([]byte, error) {
copy(keyBytesArray[:], pk[:])
return keyBytesArray[:], nil
}

func Bech32PubkeyToPeerID(pubKey string) (peer.ID, error) {
bech32PubKey, err := sdk.UnmarshalPubKey(sdk.AccPK, pubKey)
if err != nil {
return "", err
}
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
secp256k1PubKey, err := crypto2.UnmarshalSecp256k1PublicKey(bech32PubKey.Bytes())
if err != nil {
return "", err
}
return peer.IDFromPublicKey(secp256k1PubKey)
}
46 changes: 16 additions & 30 deletions p2p/communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@ import (
"time"

libp2p "github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
discovery_routing "github.com/libp2p/go-libp2p/p2p/discovery/routing"
discovery_util "github.com/libp2p/go-libp2p/p2p/discovery/util"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
Expand Down Expand Up @@ -244,7 +241,7 @@ func (c *Communication) bootStrapConnectivityCheck() error {
}

func (c *Communication) startChannel(privKeyBytes []byte) error {
ctx := context.Background()
c.logger.Warn().Msgf("No DHT enabled")
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
p2pPriKey, err := crypto.UnmarshalSecp256k1PrivateKey(privKeyBytes)
if err != nil {
c.logger.Error().Msgf("error is %f", err)
Expand Down Expand Up @@ -312,14 +309,6 @@ func (c *Communication) startChannel(privKeyBytes []byte) error {
// client because we want each peer to maintain its own local copy of the
// DHT, so that the bootstrapping node of the DHT can go down without
// inhibiting future peer discovery.
kademliaDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeServer))
if err != nil {
return fmt.Errorf("fail to create DHT: %w", err)
}
c.logger.Debug().Msg("Bootstrapping the DHT")
if err = kademliaDHT.Bootstrap(ctx); err != nil {
return fmt.Errorf("fail to bootstrap DHT: %w", err)
}

var connectionErr error
for i := 0; i < 5; i++ {
Expand All @@ -334,30 +323,27 @@ func (c *Communication) startChannel(privKeyBytes []byte) error {
return fmt.Errorf("fail to connect to bootstrap peer: %w", connectionErr)
}

// We use a rendezvous point "meet me here" to announce our location.
// This is like telling your friends to meet you at the Eiffel Tower.
routingDiscovery := discovery_routing.NewRoutingDiscovery(kademliaDHT)
discovery_util.Advertise(ctx, routingDiscovery, c.rendezvous)

// Create a goroutine to shut down the DHT after 5 minutes
go func() {
select {
case <-time.After(5 * time.Minute):
c.logger.Info().Msg("Closing Kademlia DHT after 5 minutes")
if err := kademliaDHT.Close(); err != nil {
c.logger.Error().Err(err).Msg("Failed to close Kademlia DHT")
}
case <-ctx.Done():
c.logger.Info().Msg("Context done, not waiting for 5 minutes to close DHT")
}
}()

err = c.bootStrapConnectivityCheck()
if err != nil {
return err
}

c.logger.Info().Msg("Successfully announced!")

c.logger.Info().Msg("Start peer discovery/gossip...")
//c.bootstrapPeers
bootstrapPeerAddrInfos := make([]peer.AddrInfo, 0, len(c.bootstrapPeers))
for _, addr := range c.bootstrapPeers {
peerInfo, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
c.logger.Error().Err(err).Msgf("fail to convert multiaddr to peer info: %s", addr)
continue
}
bootstrapPeerAddrInfos = append(bootstrapPeerAddrInfos, *peerInfo)
}
discovery := NewPeerDiscovery(c.host, bootstrapPeerAddrInfos)
discovery.Start(context.Background())
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
176 changes: 176 additions & 0 deletions p2p/discovery.go
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package p2p

import (
"context"
"encoding/json"
"io"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

const DiscoveryProtocol = "/tss/discovery/1.0.0"

var GossipInterval = 10 * time.Second
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

type PeerDiscovery struct {
host host.Host
knownPeers map[peer.ID]peer.AddrInfo
bootstrapPeers []peer.AddrInfo
mu sync.RWMutex
logger zerolog.Logger
}

func NewPeerDiscovery(h host.Host, bootstrapPeers []peer.AddrInfo) *PeerDiscovery {
pd := &PeerDiscovery{
host: h,
knownPeers: make(map[peer.ID]peer.AddrInfo),
bootstrapPeers: bootstrapPeers,
logger: log.With().Str("module", "peer-discovery").Logger(),
}

// Set up discovery protocol handler
h.SetStreamHandler(DiscoveryProtocol, pd.handleDiscovery)

brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
return pd
}
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

// Start begins the discovery process
func (pd *PeerDiscovery) Start(ctx context.Context) {
pd.logger.Info().Msgf("Starting peer discovery with bootstrap peers: %v", pd.bootstrapPeers)
// Connect to bootstrap peers first
for _, pinfo := range pd.bootstrapPeers {
if err := pd.host.Connect(ctx, pinfo); err != nil {
pd.logger.Error().Err(err).Msgf("Failed to connect to bootstrap peer %s", pinfo.ID)
continue
}
pd.addPeer(pinfo)
}

// Start periodic gossip
go pd.startGossip(ctx)
}

// addPeer adds a peer to known peers
func (pd *PeerDiscovery) addPeer(pinfo peer.AddrInfo) {
pd.mu.Lock()
defer pd.mu.Unlock()

if pinfo.ID == pd.host.ID() {
return // Don't add ourselves
}
pd.knownPeers[pinfo.ID] = pinfo
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
}
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

// GetPeers returns all known peers
func (pd *PeerDiscovery) GetPeers() []peer.AddrInfo {
pd.mu.RLock()
defer pd.mu.RUnlock()

peers := make([]peer.AddrInfo, 0, len(pd.knownPeers))
for _, p := range pd.knownPeers {
peers = append(peers, p)
}
return peers
}

// handleDiscovery handles incoming discovery streams
func (pd *PeerDiscovery) handleDiscovery(s network.Stream) {
pd.logger.Debug().Msgf("Received discovery stream from %s", s.Conn().RemotePeer())
defer s.Close()

ma := s.Conn().RemoteMultiaddr()

ai := peer.AddrInfo{
ID: s.Conn().RemotePeer(),
Addrs: []multiaddr.Multiaddr{ma},
}
pd.addPeer(ai)

// Share our known peers
peers := pd.GetPeers()
data, err := json.Marshal(peers)
if err != nil {
pd.logger.Error().Err(err).Msgf("Failed to marshal peers")
return
}
_, err = s.Write(data)
if err != nil {
pd.logger.Error().Err(err).Msgf("Failed to write to stream")
}
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
}
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

// startGossip periodically shares peer information
func (pd *PeerDiscovery) startGossip(ctx context.Context) {
ticker := time.NewTicker(GossipInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:

pd.gossipPeers(ctx)
}
}
}

func (pd *PeerDiscovery) gossipPeers(ctx context.Context) {
pd.logger.Debug().Msgf("Gossiping known peers")
peers := pd.GetPeers()
pd.logger.Debug().Msgf("current peers: %v", peers)

ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
for _, p := range peers {
if p.ID == pd.host.ID() {
continue
}

err := pd.host.Connect(ctx, p)
if err != nil {
pd.logger.Error().Err(err).Msgf("Failed to connect to peer %s", p)
}
pd.logger.Debug().Msgf("Connected to peer %s", p)

brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
// Open discovery stream
s, err := pd.host.NewStream(ctx, p.ID, DiscoveryProtocol)
if err != nil {
pd.logger.Error().Err(err).Msgf("Failed to open discovery stream to %s", p)
continue
}
pd.logger.Debug().Msgf("Opened discovery stream to %s", p)

// Read peer info from stream
// This is a simplified example - implement proper serialization
buf, err := io.ReadAll(s)
if err != nil {
s.Close()
pd.logger.Error().Err(err).Msgf("Failed to read from stream")
continue
}
pd.logger.Info().Msgf("Received peer data: %s", string(buf))
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

// Parse received peer info and add to known peers
var recvPeers []peer.AddrInfo
err = json.Unmarshal(buf, &recvPeers)
if err != nil {
s.Close()
pd.logger.Error().Err(err).Msgf("Failed to unmarshal peer data")
continue
}
for _, p := range recvPeers {
pd.logger.Debug().Msgf("Adding peer %s", p)
pd.addPeer(p)
}

s.Close()
}
}
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
69 changes: 69 additions & 0 deletions p2p/discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package p2p

import (
"crypto/rand"
"encoding/base64"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
maddr "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
)

func TestDiscovery(t *testing.T) {
GossipInterval = 1 * time.Second
bootstrapPeer := "/ip4/127.0.0.1/tcp/2220/p2p/16Uiu2HAm4TmEzUqy3q3Dv7HvdoSboHk5sFj2FH3npiN5vDbJC6gh"
bootstrapPrivKey := "6LABmWB4iXqkqOJ9H0YFEA2CSSx6bA7XAKGyI/TDtas="
externalIP := "127.0.0.1"
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
//fakeExternalMultiAddr := "/ip4/127.0.0.1/tcp/2220"
validMultiAddr, err := maddr.NewMultiaddr(bootstrapPeer)
assert.NoError(t, err)

privKey, err := base64.StdEncoding.DecodeString(bootstrapPrivKey)
assert.NoError(t, err)
comm, err := NewCommunication("commTest", nil, 2220, externalIP)
assert.NoError(t, err)
assert.NoError(t, comm.Start(privKey))
defer comm.Stop()
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

sk1, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
sk1raw, _ := sk1.Raw()
assert.NoError(t, err)
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
comm2, err := NewCommunication("commTest", []maddr.Multiaddr{validMultiAddr}, 2221, externalIP)
assert.NoError(t, err)
err = comm2.Start(sk1raw)
assert.NoError(t, err)
defer comm2.Stop()

// we connect to an invalid peer and see
sk2, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
assert.NoError(t, err)
sk2raw, _ := sk2.Raw()
//id, err := peer.IDFromPrivateKey(sk2)
//assert.NoError(t, err)
//invalidAddr := "/ip4/127.0.0.1/tcp/2220/p2p/" + id.String()
//invalidMultiAddr, err := maddr.NewMultiaddr(invalidAddr)
assert.NoError(t, err)
comm3, err := NewCommunication("commTest", []maddr.Multiaddr{validMultiAddr}, 2222, externalIP)
assert.NoError(t, err)
err = comm3.Start(sk2raw)
defer comm3.Stop()
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved

// we connect to one invalid and one valid address
sk3, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
assert.NoError(t, err)
sk3raw, _ := sk3.Raw()
comm4, err := NewCommunication("commTest", []maddr.Multiaddr{validMultiAddr}, 2223, externalIP)
assert.NoError(t, err)
err = comm4.Start(sk3raw)
assert.NoError(t, err)
defer comm4.Stop()

brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(5 * time.Second)

assert.Equal(t, len(comm.host.Peerstore().Peers()), 4)
assert.Equal(t, len(comm2.host.Peerstore().Peers()), 4)
assert.Equal(t, len(comm3.host.Peerstore().Peers()), 4)
assert.Equal(t, len(comm4.host.Peerstore().Peers()), 4)
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 2 additions & 1 deletion tss/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tss

import (
"github.com/libp2p/go-libp2p/core/peer"
"gitlab.com/thorchain/tss/go-tss/keygen"
"gitlab.com/thorchain/tss/go-tss/keysign"
)
Expand All @@ -10,7 +11,7 @@ type Server interface {
Start() error
Stop()
GetLocalPeerID() string
GetKnownPeers() []PeerInfo
GetKnownPeers() []peer.AddrInfo
brewmaster012 marked this conversation as resolved.
Show resolved Hide resolved
Keygen(req keygen.Request) (keygen.Response, error)
KeygenAllAlgo(req keygen.Request) ([]keygen.Response, error)
KeySign(req keysign.Request) (keysign.Response, error)
Expand Down
Loading
Loading