diff --git a/p2p/communication.go b/p2p/communication.go index e3e2e37..120726f 100644 --- a/p2p/communication.go +++ b/p2p/communication.go @@ -59,7 +59,6 @@ type Communication struct { externalAddr maddr.Multiaddr streamMgr *StreamMgr whitelistedPeers []peer.ID - discovery *PeerDiscovery } // NewCommunication create a new instance of Communication @@ -248,7 +247,6 @@ func (c *Communication) bootStrapConnectivityCheck() error { } func (c *Communication) startChannel(privKeyBytes []byte) error { - c.logger.Info().Msgf("No DHT enabled; use private gossip instead.") p2pPriKey, err := crypto.UnmarshalSecp256k1PrivateKey(privKeyBytes) if err != nil { c.logger.Error().Msgf("error is %f", err) @@ -350,9 +348,6 @@ func (c *Communication) startChannel(privKeyBytes []byte) error { } bootstrapPeerAddrInfos = append(bootstrapPeerAddrInfos, *peerInfo) } - discovery := NewPeerDiscovery(c.host, bootstrapPeerAddrInfos) - c.discovery = discovery - discovery.Start(context.Background()) return nil } @@ -422,9 +417,6 @@ func (c *Communication) Start(priKeyBytes []byte) error { // Stop communication func (c *Communication) Stop() error { - if c.discovery != nil { - c.discovery.Stop() - } // we need to stop the handler and the p2p services firstly, then terminate the our communication threads if err := c.host.Close(); err != nil { c.logger.Err(err).Msg("fail to close host network") diff --git a/p2p/discovery.go b/p2p/discovery.go deleted file mode 100644 index 271f53e..0000000 --- a/p2p/discovery.go +++ /dev/null @@ -1,235 +0,0 @@ -package p2p - -import ( - "context" - "encoding/json" - "io" - "sync" - "time" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" -) - -const DiscoveryProtocol = "/tss/discovery/1.0.0" - -const MaxGossipConcurrency = 50 - -var GossipInterval = 30 * time.Second - -type PeerDiscovery struct { - host host.Host - knownPeers map[peer.ID]peer.AddrInfo - bootstrapPeers []peer.AddrInfo - mu sync.RWMutex - logger zerolog.Logger - closeChan chan struct{} -} - -func NewPeerDiscovery(h host.Host, bootstrapPeers []peer.AddrInfo) *PeerDiscovery { - pd := &PeerDiscovery{ - host: h, - knownPeers: make(map[peer.ID]peer.AddrInfo), - bootstrapPeers: bootstrapPeers, - logger: log.With().Str("module", "peer-discovery").Logger(), - closeChan: make(chan struct{}), - } - - // Set up discovery protocol handler - h.SetStreamHandler(DiscoveryProtocol, pd.handleDiscovery) - - return pd -} - -// Start begins the discovery process -func (pd *PeerDiscovery) Start(ctx context.Context) { - pd.logger.Info().Msg("Starting peer discovery with bootstrap peers") - // Connect to bootstrap peers first - for _, pinfo := range pd.bootstrapPeers { - if err := pd.host.Connect(ctx, pinfo); err != nil { - pd.logger.Error().Err(err). - Stringer("bootstrap_peer_id", pinfo.ID). - Stringer("bootstrap_peer_info", pinfo). - Msgf("Failed to connect to bootstrap peer") - continue - } - pd.addPeer(pinfo) - } - - //before periodic gossip, start two rounds of warmup; this is to ensure keygen/keysign unit test - // success where there might not be enough time for gossip to propagate before the keygen starts. - pd.gossipPeers(ctx) - time.Sleep(1 * time.Second) - pd.gossipPeers(ctx) - // Start periodic gossip - go pd.startGossip(ctx) -} - -func (pd *PeerDiscovery) Stop() { - close(pd.closeChan) -} - -// addPeer adds a peer to known peers -func (pd *PeerDiscovery) addPeer(pinfo peer.AddrInfo) { - pd.mu.Lock() - defer pd.mu.Unlock() - - if pinfo.ID == pd.host.ID() { - return // Don't add ourselves - } - oldPinfo, ok := pd.knownPeers[pinfo.ID] - if ok { - for _, addr := range pinfo.Addrs { - if !multiaddr.Contains(oldPinfo.Addrs, addr) { - oldPinfo.Addrs = append(oldPinfo.Addrs, addr) - } - } - } else { - oldPinfo = pinfo - } - pd.knownPeers[pinfo.ID] = oldPinfo -} - -// GetPeers returns all known peers -func (pd *PeerDiscovery) GetPeers() []peer.AddrInfo { - pd.mu.RLock() - defer pd.mu.RUnlock() - - peers := make([]peer.AddrInfo, 0, len(pd.knownPeers)) - for _, p := range pd.knownPeers { - peers = append(peers, p) - } - return peers -} - -// handleDiscovery handles incoming discovery streams -func (pd *PeerDiscovery) handleDiscovery(s network.Stream) { - pd.logger.Debug(). - Stringer("from_peer", s.Conn().RemotePeer()). - Msgf("Received discovery stream") - defer s.Close() - - ma := s.Conn().RemoteMultiaddr() - - ai := peer.AddrInfo{ - ID: s.Conn().RemotePeer(), - Addrs: []multiaddr.Multiaddr{ma}, - } - pd.addPeer(ai) - - // Share our known peers - peers := pd.GetPeers() - data, err := json.Marshal(peers) - if err != nil { - pd.logger.Error().Err(err).Msgf("Failed to marshal peers") - return - } - _, err = s.Write(data) - if err != nil { - pd.logger.Error().Err(err).Msgf("Failed to write to stream") - } -} - -// startGossip periodically shares peer information -func (pd *PeerDiscovery) startGossip(ctx context.Context) { - ticker := time.NewTicker(GossipInterval) - defer ticker.Stop() - - for { - select { - case _, ok := <-pd.closeChan: - if !ok { - pd.logger.Info().Msg("Peer discovery stopped") - return - } - pd.logger.Warn().Msg("Should not receive from closed channel!") - case <-ctx.Done(): - return - case <-ticker.C: - pd.gossipPeers(ctx) - } - } -} - -func (pd *PeerDiscovery) gossipPeers(ctx context.Context) { - pd.logger.Debug().Msgf("Gossiping known peers") - peers := pd.GetPeers() - pd.logger.Debug(). - Array("peers", zerolog.Arr().Interface(peers)). - Msgf("current peers") - - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - var wg sync.WaitGroup - sem := make(chan struct{}, MaxGossipConcurrency) // Limit concurrency - - for _, p := range peers { - if p.ID == pd.host.ID() { - continue - } - - sem <- struct{}{} - wg.Add(1) - go func(p peer.AddrInfo) { - defer wg.Done() - defer func() { <-sem }() - - err := pd.host.Connect(ctx, p) - if err != nil { - pd.logger.Error().Err(err). - Stringer("to", p.ID). - Msg("Failed to connect to peer") - return - } - pd.logger.Debug(). - Stringer("to", p). - Msg("Connected to peer") - - // Open discovery stream - s, err := pd.host.NewStream(ctx, p.ID, DiscoveryProtocol) - if err != nil { - pd.logger.Error().Err(err). - Stringer("to", p). - Msg("Failed to open discovery stream to peer") - return - } - defer s.Close() - pd.logger.Debug(). - Stringer("to", p). - Msg("Opened discovery stream to peer") - - // Read peer info from stream - // This is a simplified example - implement proper serialization - limitedReader := io.LimitReader(s, 1<<20) // Limit to 1MB - buf, err := io.ReadAll(limitedReader) - if err != nil { - pd.logger.Error().Err(err). - Stringer("from", p). - Msg("Failed to read from stream") - return - } - pd.logger.Debug().Msgf("Received peer data: %s", string(buf)) - - // Parse received peer info and add to known peers - var recvPeers []peer.AddrInfo - err = json.Unmarshal(buf, &recvPeers) - if err != nil { - pd.logger.Error().Err(err). - Stringer("from", p). - Msg("Failed to unmarshal peer data received") - return - } - for _, p := range recvPeers { - pd.logger.Debug(). - Stringer("peer", p). - Msg("Adding peer") - pd.addPeer(p) - } - }(p) - } - wg.Wait() -} diff --git a/p2p/discovery_test.go b/p2p/discovery_test.go deleted file mode 100644 index 405c48e..0000000 --- a/p2p/discovery_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package p2p - -import ( - "crypto/rand" - "encoding/base64" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - maddr "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/assert" -) - -func TestDiscovery(t *testing.T) { - OldGossipInterval := GossipInterval - defer func() { - GossipInterval = OldGossipInterval - }() - GossipInterval = 1 * time.Second - bootstrapPeer := "/ip4/127.0.0.1/tcp/2220/p2p/16Uiu2HAm4TmEzUqy3q3Dv7HvdoSboHk5sFj2FH3npiN5vDbJC6gh" - bootstrapPeerID, err := peer.Decode("16Uiu2HAm4TmEzUqy3q3Dv7HvdoSboHk5sFj2FH3npiN5vDbJC6gh") - - bootstrapPrivKey := "6LABmWB4iXqkqOJ9H0YFEA2CSSx6bA7XAKGyI/TDtas=" - externalIP := "127.0.0.1" - var whitelistedPeers []peer.ID - - //fakeExternalMultiAddr := "/ip4/127.0.0.1/tcp/2220" - validMultiAddr, err := maddr.NewMultiaddr(bootstrapPeer) - assert.NoError(t, err) - privKey, err := base64.StdEncoding.DecodeString(bootstrapPrivKey) - assert.NoError(t, err) - - sk1, _, err := crypto.GenerateSecp256k1Key(rand.Reader) - sk1raw, _ := sk1.Raw() - assert.NoError(t, err) - - sk2, _, err := crypto.GenerateSecp256k1Key(rand.Reader) - assert.NoError(t, err) - sk2raw, _ := sk2.Raw() - sk3, _, err := crypto.GenerateSecp256k1Key(rand.Reader) - assert.NoError(t, err) - sk3raw, _ := sk3.Raw() - id1, err := peer.IDFromPrivateKey(sk1) - id2, err := peer.IDFromPrivateKey(sk2) - id3, err := peer.IDFromPrivateKey(sk3) - whitelistedPeers = append(whitelistedPeers, id1, id2, id3, bootstrapPeerID) - - comm, err := NewCommunication(nil, 2220, externalIP, whitelistedPeers) - assert.NoError(t, err) - assert.NoError(t, comm.Start(privKey)) - defer comm.Stop() - - comm2, err := NewCommunication([]maddr.Multiaddr{validMultiAddr}, 2221, externalIP, whitelistedPeers) - assert.NoError(t, err) - err = comm2.Start(sk1raw) - assert.NoError(t, err) - defer comm2.Stop() - - // we connect to an invalid peer and see - - //id, err := peer.IDFromPrivateKey(sk2) - //assert.NoError(t, err) - //invalidAddr := "/ip4/127.0.0.1/tcp/2220/p2p/" + id.String() - //invalidMultiAddr, err := maddr.NewMultiaddr(invalidAddr) - assert.NoError(t, err) - comm3, err := NewCommunication([]maddr.Multiaddr{validMultiAddr}, 2222, externalIP, whitelistedPeers) - assert.NoError(t, err) - err = comm3.Start(sk2raw) - defer comm3.Stop() - - // we connect to one invalid and one valid address - - comm4, err := NewCommunication([]maddr.Multiaddr{validMultiAddr}, 2223, externalIP, whitelistedPeers) - assert.NoError(t, err) - err = comm4.Start(sk3raw) - assert.NoError(t, err) - defer comm4.Stop() - - time.Sleep(5 * time.Second) - - assert.Equal(t, 4, len(comm.host.Peerstore().Peers())) - assert.Equal(t, 4, len(comm2.host.Peerstore().Peers())) - assert.Equal(t, 4, len(comm3.host.Peerstore().Peers())) - assert.Equal(t, 4, len(comm4.host.Peerstore().Peers())) - - comm.discovery.mu.Lock() - assert.Equal(t, 3, len(comm.discovery.knownPeers)) - for peer, knownPeers := range comm.discovery.knownPeers { - assert.LessOrEqual(t, len(knownPeers.Addrs), 4, "%s has more than 4 addresses (%d)?", peer.String(), len(knownPeers.Addrs)) - } - comm.discovery.mu.Unlock() -} diff --git a/tss/tss_4nodes_test.go b/tss/tss_4nodes_test.go index 3ecf985..899b09d 100644 --- a/tss/tss_4nodes_test.go +++ b/tss/tss_4nodes_test.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "fmt" "os" "path" "strconv" @@ -61,15 +62,25 @@ func TestPackage(t *testing.T) { } type FourNodeTestSuite struct { - servers []*TssServer - ports []int - preParams []*btsskeygen.LocalPreParams - bootstrapPeer string - tssConfig common.TssConfig + servers []*TssServer + ports []int + preParams []*btsskeygen.LocalPreParams + bootstrapPeers []maddr.Multiaddr + tssConfig common.TssConfig } var _ = Suite(&FourNodeTestSuite{}) +func (s *FourNodeTestSuite) setupBootstrapPeers(c *C) { + for i := 0; i < len(s.ports); i++ { + peerId, err := conversion.Bech32PubkeyToPeerID(testPubKeys[i]) + c.Assert(err, IsNil) + peerAddr, err := maddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", s.ports[i], peerId.String())) + c.Assert(err, IsNil) + s.bootstrapPeers = append(s.bootstrapPeers, peerAddr) + } +} + // setup four nodes for test func (s *FourNodeTestSuite) SetUpTest(c *C) { common.InitLog("info", true, "four_nodes_test") @@ -77,7 +88,7 @@ func (s *FourNodeTestSuite) SetUpTest(c *C) { s.ports = []int{ 16666, 16667, 16668, 16669, } - s.bootstrapPeer = "/ip4/127.0.0.1/tcp/16666/p2p/16Uiu2HAmACG5DtqmQsHtXg4G2sLS65ttv84e7MrL4kapkjfmhxAp" + s.setupBootstrapPeers(c) s.preParams = getPreparams(c) s.servers = make([]*TssServer, partyNum) s.tssConfig = common.TssConfig{ @@ -93,9 +104,9 @@ func (s *FourNodeTestSuite) SetUpTest(c *C) { go func(idx int) { defer wg.Done() if idx == 0 { - s.servers[idx] = s.getTssServer(c, idx, s.tssConfig, "") + s.servers[idx] = s.getTssServer(c, idx, s.tssConfig) } else { - s.servers[idx] = s.getTssServer(c, idx, s.tssConfig, s.bootstrapPeer) + s.servers[idx] = s.getTssServer(c, idx, s.tssConfig) } }(i) @@ -315,9 +326,9 @@ func (s *FourNodeTestSuite) doTestBlame(c *C, version string, algo common.Algo) if shutdownIdx == 0 { // don't use a boostrap peer if we are shutting down the first server b/c the first // server is the bootstrap peer, so it doesn't work - s.servers[shutdownIdx] = s.getTssServer(c, shutdownIdx, s.tssConfig, "") + s.servers[shutdownIdx] = s.getTssServer(c, shutdownIdx, s.tssConfig) } else { - s.servers[shutdownIdx] = s.getTssServer(c, shutdownIdx, s.tssConfig, s.bootstrapPeer) + s.servers[shutdownIdx] = s.getTssServer(c, shutdownIdx, s.tssConfig) } c.Assert(s.servers[shutdownIdx].Start(), IsNil) @@ -354,7 +365,7 @@ func (s *FourNodeTestSuite) TearDownTest(c *C) { } } -func (s *FourNodeTestSuite) getTssServer(c *C, index int, conf common.TssConfig, bootstrap string) *TssServer { +func (s *FourNodeTestSuite) getTssServer(c *C, index int, conf common.TssConfig) *TssServer { priKey, err := conversion.GetPriKey(testPriKeyArr[index]) c.Assert(err, IsNil) baseHome := path.Join(os.TempDir(), "4nodes_test", strconv.Itoa(index)) @@ -362,21 +373,13 @@ func (s *FourNodeTestSuite) getTssServer(c *C, index int, conf common.TssConfig, err := os.MkdirAll(baseHome, os.ModePerm) c.Assert(err, IsNil) } - var peerIDs []maddr.Multiaddr - if len(bootstrap) > 0 { - multiAddr, err := maddr.NewMultiaddr(bootstrap) - c.Assert(err, IsNil) - peerIDs = []maddr.Multiaddr{multiAddr} - } else { - peerIDs = nil - } whitelistedPeers := []peer.ID{} for _, pk := range testPubKeys { peer, err := conversion.Bech32PubkeyToPeerID(pk) c.Assert(err, IsNil) whitelistedPeers = append(whitelistedPeers, peer) } - instance, err := NewTss(peerIDs, s.ports[index], priKey, baseHome, conf, s.preParams[index], "", "password", whitelistedPeers) + instance, err := NewTss(s.bootstrapPeers, s.ports[index], priKey, baseHome, conf, s.preParams[index], "", "password", whitelistedPeers) c.Assert(err, IsNil) return instance } diff --git a/tss/tss_4nodes_zeta_test.go b/tss/tss_4nodes_zeta_test.go index 8b2b087..ef629e5 100644 --- a/tss/tss_4nodes_zeta_test.go +++ b/tss/tss_4nodes_zeta_test.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "crypto/sha256" "encoding/base64" + "fmt" "io" "os" "path" @@ -25,18 +26,28 @@ import ( ) type FourNodeScaleZetaSuite struct { - servers []*TssServer - ports []int - preParams []*btsskeygen.LocalPreParams - bootstrapPeer string - tssConfig common.TssConfig - poolPublicKey string - tmpDir string + servers []*TssServer + ports []int + preParams []*btsskeygen.LocalPreParams + bootstrapPeers []maddr.Multiaddr + tssConfig common.TssConfig + poolPublicKey string + tmpDir string } // Run with go test -v -gocheck.vv -gocheck.f FourNodeScaleZetaSuite . var _ = Suite(&FourNodeScaleZetaSuite{}) +func (s *FourNodeScaleZetaSuite) setupBootstrapPeers(c *C) { + for i := 0; i < len(s.ports); i++ { + peerId, err := conversion.Bech32PubkeyToPeerID(testPubKeys[i]) + c.Assert(err, IsNil) + peerAddr, err := maddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", s.ports[i], peerId.String())) + c.Assert(err, IsNil) + s.bootstrapPeers = append(s.bootstrapPeers, peerAddr) + } +} + // setup four nodes for test func (s *FourNodeScaleZetaSuite) SetUpSuite(c *C) { common.InitLog("info", true, "four_nodes_zeta_test") @@ -46,7 +57,7 @@ func (s *FourNodeScaleZetaSuite) SetUpSuite(c *C) { s.ports = []int{ 17666, 17667, 17668, 17669, } - s.bootstrapPeer = "/ip4/127.0.0.1/tcp/17666/p2p/16Uiu2HAmACG5DtqmQsHtXg4G2sLS65ttv84e7MrL4kapkjfmhxAp" + s.setupBootstrapPeers(c) s.preParams = getPreparams(c) s.servers = make([]*TssServer, partyNum) s.tssConfig = common.TssConfig{ @@ -62,9 +73,9 @@ func (s *FourNodeScaleZetaSuite) SetUpSuite(c *C) { go func(idx int) { defer wg.Done() if idx == 0 { - s.servers[idx] = s.getTssServer(c, idx, s.tssConfig, "") + s.servers[idx] = s.getTssServer(c, idx, s.tssConfig) } else { - s.servers[idx] = s.getTssServer(c, idx, s.tssConfig, s.bootstrapPeer) + s.servers[idx] = s.getTssServer(c, idx, s.tssConfig) } }(i) @@ -219,7 +230,7 @@ func (s *FourNodeScaleZetaSuite) TearDownSuite(c *C) { os.RemoveAll(s.tmpDir) } -func (s *FourNodeScaleZetaSuite) getTssServer(c *C, index int, conf common.TssConfig, bootstrap string) *TssServer { +func (s *FourNodeScaleZetaSuite) getTssServer(c *C, index int, conf common.TssConfig) *TssServer { priKey, err := conversion.GetPriKey(testPriKeyArr[index]) c.Assert(err, IsNil) baseHome := path.Join(s.tmpDir, strconv.Itoa(index)) @@ -227,21 +238,13 @@ func (s *FourNodeScaleZetaSuite) getTssServer(c *C, index int, conf common.TssCo err := os.MkdirAll(baseHome, os.ModePerm) c.Assert(err, IsNil) } - var peerIDs []maddr.Multiaddr - if len(bootstrap) > 0 { - multiAddr, err := maddr.NewMultiaddr(bootstrap) - c.Assert(err, IsNil) - peerIDs = []maddr.Multiaddr{multiAddr} - } else { - peerIDs = nil - } whitelistedPeers := []peer.ID{} for _, pk := range testPubKeys { peer, err := conversion.Bech32PubkeyToPeerID(pk) c.Assert(err, IsNil) whitelistedPeers = append(whitelistedPeers, peer) } - instance, err := NewTss(peerIDs, s.ports[index], priKey, baseHome, conf, s.preParams[index], "", "password", whitelistedPeers) + instance, err := NewTss(s.bootstrapPeers, s.ports[index], priKey, baseHome, conf, s.preParams[index], "", "password", whitelistedPeers) c.Assert(err, IsNil) return instance }