Skip to content

Commit

Permalink
Cherry pick changes from #1 and #7 (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera authored Sep 10, 2024
1 parent 9f5ae81 commit 05876ac
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
9 changes: 6 additions & 3 deletions p2p/communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,16 @@ func (c *Communication) readFromStream(stream network.Stream) {
if nil == channel {
c.logger.Debug().Msgf("no MsgID %s found for this message", wrappedMsg.MsgID)
c.logger.Debug().Msgf("no MsgID %s found for this message", wrappedMsg.MessageType)
_ = stream.Close()
_ = stream.Reset()
return
}
c.streamMgr.AddStream(wrappedMsg.MsgID, stream)
channel <- &Message{
select {
case <-time.After(10 * time.Second):
c.logger.Warn().Msgf("timeout to send message to channel: protocol ID: %s, msg type %s, peer ID %s", stream.Protocol(), wrappedMsg.MessageType.String(), peerID)
case channel <- &Message{
PeerID: stream.Conn().RemotePeer(),
Payload: dataBuf,
Payload: dataBuf}:
}

}
Expand Down
10 changes: 6 additions & 4 deletions p2p/party_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (pc *PartyCoordinator) processRespMsg(respMsg *messages.JoinPartyLeaderComm
pc.joinPartyGroupLock.Unlock()
if !ok {
pc.logger.Info().Msgf("message ID from peer(%s) can not be found", remotePeer)
_ = stream.Close()
_ = stream.Reset()
return
}
pc.streamMgr.AddStream(respMsg.ID, stream)
Expand All @@ -95,6 +95,7 @@ func (pc *PartyCoordinator) processReqMsg(requestMsg *messages.JoinPartyLeaderCo
peerGroup, ok := pc.peersGroup[requestMsg.ID]
pc.joinPartyGroupLock.Unlock()
if !ok {
_ = stream.Reset()
pc.logger.Info().Msg("this party is not ready")
return
}
Expand Down Expand Up @@ -130,6 +131,7 @@ func (pc *PartyCoordinator) HandleStream(stream network.Stream) {
peerGroup, ok := pc.peersGroup[msg.ID]
pc.joinPartyGroupLock.Unlock()
if !ok {
_ = stream.Reset()
pc.logger.Info().Msg("this party is not ready")
return
}
Expand Down Expand Up @@ -180,7 +182,7 @@ func (pc *PartyCoordinator) HandleStreamWithLeader(stream network.Stream) {
}
}

func (pc *PartyCoordinator) removePeerGroup(messageID string) {
func (pc *PartyCoordinator) RemovePeerGroup(messageID string) {
pc.joinPartyGroupLock.Lock()
defer pc.joinPartyGroupLock.Unlock()
delete(pc.peersGroup, messageID)
Expand Down Expand Up @@ -445,7 +447,7 @@ func (pc *PartyCoordinator) JoinPartyWithLeader(msgID string, blockHeight int64,
pc.logger.Error().Err(err).Msg("error creating peerStatus")
return nil, leader, err
}
defer pc.removePeerGroup(msgID)
defer pc.RemovePeerGroup(msgID)

if pc.host.ID() == leaderID {
onlines, err := pc.joinPartyLeader(msgID, peerGroup, sigChan)
Expand Down Expand Up @@ -477,7 +479,7 @@ func (pc *PartyCoordinator) JoinPartyWithRetry(msgID string, peers []string) ([]
pc.logger.Error().Err(err).Msg("fail to create the join party group")
return nil, err
}
defer pc.removePeerGroup(msg.ID)
defer pc.RemovePeerGroup(msg.ID)
_, offline := peerGroup.getPeersStatus()
var wg sync.WaitGroup
done := make(chan struct{})
Expand Down
2 changes: 2 additions & 0 deletions tss/keysign.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func (t *TssServer) KeySign(req keysign.Request) (keysign.Response, error) {
t.p2pCommunication.ReleaseStream(msgID)
t.signatureNotifier.ReleaseStream(msgID)
t.partyCoordinator.ReleaseStream(msgID)

t.partyCoordinator.RemovePeerGroup(msgID)
}()

localStateItem, err := t.stateManager.GetLocalState(req.PoolPubKey)
Expand Down

0 comments on commit 05876ac

Please sign in to comment.