From 35133905c75f1d68ecfe0cb873e5468a4b94d801 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Wed, 12 Jun 2024 20:10:42 -0700 Subject: [PATCH] revert skipWaitForMerge. use peer info from state --- tests/clients/cli/wrapper.go | 4 ++-- tests/integration/p2p.go | 29 ++++++++--------------------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index 501557b391..1681dd6e7b 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -65,11 +65,11 @@ func (w *Wrapper) PeerInfo() peer.AddrInfo { data, err := w.cmd.execute(context.Background(), args) if err != nil { - panic(fmt.Sprintf("failed to get peer info: %v %s", err, data)) + panic(fmt.Sprintf("failed to get peer info: %v", err)) } var info peer.AddrInfo if err := json.Unmarshal(data, &info); err != nil { - panic(fmt.Sprintf("failed to get peer info: %v %s", err, data)) + panic(fmt.Sprintf("failed to get peer info: %v", err)) } return info } diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 7a2d22a9ef..d990a3d322 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -175,7 +175,6 @@ func setupPeerWaitSync( nodeCollections := map[int][]int{} waitIndex := 0 - skipWaitForMerge := true for i := startIndex; i < len(s.testCase.Actions); i++ { switch action := s.testCase.Actions[i].(type) { case SubscribeToCollection: @@ -248,18 +247,14 @@ func setupPeerWaitSync( case WaitForSync: waitIndex += 1 - skipWaitForMerge = false targetToSourceEvents = append(targetToSourceEvents, 0) sourceToTargetEvents = append(sourceToTargetEvents, 0) } } - // skip waiting for a merge if we aren't interested in waiting for a sync to complete - if !skipWaitForMerge { - nodeSynced := make(chan struct{}) - go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced) - s.syncChans = append(s.syncChans, nodeSynced) - } + nodeSynced := make(chan struct{}) + go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced) + s.syncChans = append(s.syncChans, nodeSynced) } // collectionSubscribedTo returns true if the collection on the given node @@ -328,7 +323,6 @@ func setupReplicatorWaitSync( docIDsSyncedToSource := map[int]struct{}{} waitIndex := 0 currentDocID := 0 - skipWaitForMerge := true for i := startIndex; i < len(s.testCase.Actions); i++ { switch action := s.testCase.Actions[i].(type) { case CreateDoc: @@ -366,18 +360,14 @@ func setupReplicatorWaitSync( case WaitForSync: waitIndex += 1 - skipWaitForMerge = false targetToSourceEvents = append(targetToSourceEvents, 0) sourceToTargetEvents = append(sourceToTargetEvents, 0) } } - // skip waiting for a merge if we aren't interested in waiting for a sync to complete - if !skipWaitForMerge { - nodeSynced := make(chan struct{}) - go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced) - s.syncChans = append(s.syncChans, nodeSynced) - } + nodeSynced := make(chan struct{}) + go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced) + s.syncChans = append(s.syncChans, nodeSynced) } // subscribeToCollection sets up a collection subscription on the given node/collection. @@ -511,14 +501,11 @@ func waitForMerge( targetToSourceEvents []int, nodeSynced chan struct{}, ) { - sourceNode := s.nodes[sourceNodeID] - targetNode := s.nodes[targetNodeID] - sourceSub := s.eventSubs[sourceNodeID] targetSub := s.eventSubs[targetNodeID] - sourcePeerInfo := sourceNode.PeerInfo() - targetPeerInfo := targetNode.PeerInfo() + sourcePeerInfo := s.nodeAddresses[sourceNodeID] + targetPeerInfo := s.nodeAddresses[targetNodeID] for waitIndex := 0; waitIndex < len(sourceToTargetEvents); waitIndex++ { for i := 0; i < targetToSourceEvents[waitIndex]; i++ {