Skip to content

Commit

Permalink
revert skipWaitForMerge. use peer info from state
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 13, 2024
1 parent 0357385 commit 3513390
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 23 deletions.
4 changes: 2 additions & 2 deletions tests/clients/cli/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ func (w *Wrapper) PeerInfo() peer.AddrInfo {

data, err := w.cmd.execute(context.Background(), args)
if err != nil {
panic(fmt.Sprintf("failed to get peer info: %v %s", err, data))
panic(fmt.Sprintf("failed to get peer info: %v", err))
}
var info peer.AddrInfo
if err := json.Unmarshal(data, &info); err != nil {
panic(fmt.Sprintf("failed to get peer info: %v %s", err, data))
panic(fmt.Sprintf("failed to get peer info: %v", err))
}
return info
}
Expand Down
29 changes: 8 additions & 21 deletions tests/integration/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func setupPeerWaitSync(

nodeCollections := map[int][]int{}
waitIndex := 0
skipWaitForMerge := true
for i := startIndex; i < len(s.testCase.Actions); i++ {
switch action := s.testCase.Actions[i].(type) {
case SubscribeToCollection:
Expand Down Expand Up @@ -248,18 +247,14 @@ func setupPeerWaitSync(

case WaitForSync:
waitIndex += 1
skipWaitForMerge = false
targetToSourceEvents = append(targetToSourceEvents, 0)
sourceToTargetEvents = append(sourceToTargetEvents, 0)
}
}

// skip waiting for a merge if we aren't interested in waiting for a sync to complete
if !skipWaitForMerge {
nodeSynced := make(chan struct{})
go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced)
s.syncChans = append(s.syncChans, nodeSynced)
}
nodeSynced := make(chan struct{})
go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced)
s.syncChans = append(s.syncChans, nodeSynced)
}

// collectionSubscribedTo returns true if the collection on the given node
Expand Down Expand Up @@ -328,7 +323,6 @@ func setupReplicatorWaitSync(
docIDsSyncedToSource := map[int]struct{}{}
waitIndex := 0
currentDocID := 0
skipWaitForMerge := true
for i := startIndex; i < len(s.testCase.Actions); i++ {
switch action := s.testCase.Actions[i].(type) {
case CreateDoc:
Expand Down Expand Up @@ -366,18 +360,14 @@ func setupReplicatorWaitSync(

case WaitForSync:
waitIndex += 1
skipWaitForMerge = false
targetToSourceEvents = append(targetToSourceEvents, 0)
sourceToTargetEvents = append(sourceToTargetEvents, 0)
}
}

// skip waiting for a merge if we aren't interested in waiting for a sync to complete
if !skipWaitForMerge {
nodeSynced := make(chan struct{})
go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced)
s.syncChans = append(s.syncChans, nodeSynced)
}
nodeSynced := make(chan struct{})
go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced)
s.syncChans = append(s.syncChans, nodeSynced)
}

// subscribeToCollection sets up a collection subscription on the given node/collection.
Expand Down Expand Up @@ -511,14 +501,11 @@ func waitForMerge(
targetToSourceEvents []int,
nodeSynced chan struct{},
) {
sourceNode := s.nodes[sourceNodeID]
targetNode := s.nodes[targetNodeID]

sourceSub := s.eventSubs[sourceNodeID]
targetSub := s.eventSubs[targetNodeID]

sourcePeerInfo := sourceNode.PeerInfo()
targetPeerInfo := targetNode.PeerInfo()
sourcePeerInfo := s.nodeAddresses[sourceNodeID]
targetPeerInfo := s.nodeAddresses[targetNodeID]

for waitIndex := 0; waitIndex < len(sourceToTargetEvents); waitIndex++ {
for i := 0; i < targetToSourceEvents[waitIndex]; i++ {
Expand Down

0 comments on commit 3513390

Please sign in to comment.