From 05876ac6d66af8cb87162a4d89335017fc2cadf3 Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Tue, 10 Sep 2024 14:19:49 -0700 Subject: [PATCH] Cherry pick changes from #1 and #7 (#27) --- p2p/communication.go | 9 ++++++--- p2p/party_coordinator.go | 10 ++++++---- tss/keysign.go | 2 ++ 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/p2p/communication.go b/p2p/communication.go index c00a3fc..81ab547 100644 --- a/p2p/communication.go +++ b/p2p/communication.go @@ -177,13 +177,16 @@ func (c *Communication) readFromStream(stream network.Stream) { if nil == channel { c.logger.Debug().Msgf("no MsgID %s found for this message", wrappedMsg.MsgID) c.logger.Debug().Msgf("no MsgID %s found for this message", wrappedMsg.MessageType) - _ = stream.Close() + _ = stream.Reset() return } c.streamMgr.AddStream(wrappedMsg.MsgID, stream) - channel <- &Message{ + select { + case <-time.After(10 * time.Second): + c.logger.Warn().Msgf("timeout to send message to channel: protocol ID: %s, msg type %s, peer ID %s", stream.Protocol(), wrappedMsg.MessageType.String(), peerID) + case channel <- &Message{ PeerID: stream.Conn().RemotePeer(), - Payload: dataBuf, + Payload: dataBuf}: } } diff --git a/p2p/party_coordinator.go b/p2p/party_coordinator.go index f85c9b1..414d6a8 100644 --- a/p2p/party_coordinator.go +++ b/p2p/party_coordinator.go @@ -72,7 +72,7 @@ func (pc *PartyCoordinator) processRespMsg(respMsg *messages.JoinPartyLeaderComm pc.joinPartyGroupLock.Unlock() if !ok { pc.logger.Info().Msgf("message ID from peer(%s) can not be found", remotePeer) - _ = stream.Close() + _ = stream.Reset() return } pc.streamMgr.AddStream(respMsg.ID, stream) @@ -95,6 +95,7 @@ func (pc *PartyCoordinator) processReqMsg(requestMsg *messages.JoinPartyLeaderCo peerGroup, ok := pc.peersGroup[requestMsg.ID] pc.joinPartyGroupLock.Unlock() if !ok { + _ = stream.Reset() pc.logger.Info().Msg("this party is not ready") return } @@ -130,6 +131,7 @@ func (pc *PartyCoordinator) HandleStream(stream network.Stream) { peerGroup, ok := pc.peersGroup[msg.ID] pc.joinPartyGroupLock.Unlock() if !ok { + _ = stream.Reset() pc.logger.Info().Msg("this party is not ready") return } @@ -180,7 +182,7 @@ func (pc *PartyCoordinator) HandleStreamWithLeader(stream network.Stream) { } } -func (pc *PartyCoordinator) removePeerGroup(messageID string) { +func (pc *PartyCoordinator) RemovePeerGroup(messageID string) { pc.joinPartyGroupLock.Lock() defer pc.joinPartyGroupLock.Unlock() delete(pc.peersGroup, messageID) @@ -445,7 +447,7 @@ func (pc *PartyCoordinator) JoinPartyWithLeader(msgID string, blockHeight int64, pc.logger.Error().Err(err).Msg("error creating peerStatus") return nil, leader, err } - defer pc.removePeerGroup(msgID) + defer pc.RemovePeerGroup(msgID) if pc.host.ID() == leaderID { onlines, err := pc.joinPartyLeader(msgID, peerGroup, sigChan) @@ -477,7 +479,7 @@ func (pc *PartyCoordinator) JoinPartyWithRetry(msgID string, peers []string) ([] pc.logger.Error().Err(err).Msg("fail to create the join party group") return nil, err } - defer pc.removePeerGroup(msg.ID) + defer pc.RemovePeerGroup(msg.ID) _, offline := peerGroup.getPeersStatus() var wg sync.WaitGroup done := make(chan struct{}) diff --git a/tss/keysign.go b/tss/keysign.go index 9dcba9a..6d723c6 100644 --- a/tss/keysign.go +++ b/tss/keysign.go @@ -219,6 +219,8 @@ func (t *TssServer) KeySign(req keysign.Request) (keysign.Response, error) { t.p2pCommunication.ReleaseStream(msgID) t.signatureNotifier.ReleaseStream(msgID) t.partyCoordinator.ReleaseStream(msgID) + + t.partyCoordinator.RemovePeerGroup(msgID) }() localStateItem, err := t.stateManager.GetLocalState(req.PoolPubKey)