Skip to content

Commit

Permalink
Replace waitforpeertask with ethpeers method (#8009)
Browse files Browse the repository at this point in the history
* 7582: Add waitForPeer method to PeerSelector and EthPeers

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Replace all usages of WaitForPeer[s]Task with new EthPeers.waitForPeer method

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Fix PivotBlockConfirmerTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Fix broken PivotBlockRetrieverTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Fix broken FastSyncActionsTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Fix issues after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Put AbstractSyncTargetManager.waitForPeerAndThenSetSyncTarget code back separate thread to avoid infinite loop waiting for peers during acceptance tests

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Remove pivot block checks when waiting for peer in FastSyncActions

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Remove estimated chain height check from PivotBlockConfirmer when waiting for peers

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Fix broken PivotBlockRetrieverTest

Signed-off-by: Matilda Clerke <[email protected]>

* Use isSuitablePeer as peer selection criteria when waiting for a peer in AbstractRetryingPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* Remove MetricsSystem from PivotSelectorFromPeers

Signed-off-by: Matilda Clerke <[email protected]>

---------

Signed-off-by: Matilda Clerke <[email protected]>
  • Loading branch information
Matilda-Clerke authored Dec 18, 2024
1 parent 3b4136d commit 320c476
Show file tree
Hide file tree
Showing 18 changed files with 120 additions and 538 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,6 @@ private PivotBlockSelector createPivotSelector(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockchain.getChainHeadHeader());
Expand Down Expand Up @@ -953,7 +952,7 @@ private PivotBlockSelector createPivotSelector(
unsubscribeForkchoiceListener);
} else {
LOG.info("TTD difficulty is not present, creating initial sync phase for PoW");
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem);
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -56,7 +55,6 @@ public class BFTPivotSelectorFromPeers extends PivotSelectorFromPeers {
* @param ethContext the eth context
* @param syncConfig the sync config
* @param syncState the sync state
* @param metricsSystem the metrics
* @param protocolContext the protocol context
* @param nodeKey the node key
* @param blockHeader the block header
Expand All @@ -65,11 +63,10 @@ public BFTPivotSelectorFromPeers(
final EthContext ethContext,
final SynchronizerConfiguration syncConfig,
final SyncState syncState,
final MetricsSystem metricsSystem,
final ProtocolContext protocolContext,
final NodeKey nodeKey,
final BlockHeader blockHeader) {
super(ethContext, syncConfig, syncState, metricsSystem);
super(ethContext, syncConfig, syncState);
this.protocolContext = protocolContext;
this.blockHeader = blockHeader;
this.nodeKey = nodeKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -55,7 +54,6 @@ public class QbftPivotSelectorTest {
@Mock private ProtocolContext protocolContext;
@Mock private BftContext bftContext;
@Mock private SyncState syncState;
@Mock private MetricsSystem metricsSystem;
@Mock private EthContext ethContext;
@Mock private EthPeers ethPeers;
@Mock private ValidatorProvider validatorProvider;
Expand All @@ -80,13 +78,7 @@ public void returnEmptySyncStateIfValidatorWithOtherValidatorsButNoPeers() {
when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
assertThat(pivotState.isEmpty()).isTrue();
}
Expand All @@ -104,13 +96,7 @@ public void returnNoSyncRequiredIfOnlyValidatorAndNoPeers() {
when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);

try {
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
Expand All @@ -126,13 +112,7 @@ public void returnEmptySyncStateIfNonValidatorWithNoBestPeer() {
when(validatorProvider.nodeIsValidator(any())).thenReturn(false);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);

Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
assertThat(pivotState.isEmpty()).isTrue();
Expand All @@ -145,13 +125,7 @@ public void returnEmptySyncStateIfValidatorAndNotAtGenesisAndOtherValidators() {
when(blockHeader.getNumber()).thenReturn(10L);
BFTPivotSelectorFromPeers pivotSelector =
new BFTPivotSelectorFromPeers(
ethContext,
syncConfig,
syncState,
metricsSystem,
protocolContext,
nodeKey,
blockHeader);
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);

Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
assertThat(pivotState.isEmpty()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,40 @@ public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
.min(LEAST_TO_MOST_BUSY);
}

// Part of the PeerSelector interface, to be split apart later
@Override
public CompletableFuture<EthPeer> waitForPeer(final Predicate<EthPeer> filter) {
final CompletableFuture<EthPeer> future = new CompletableFuture<>();
LOG.debug("Waiting for peer matching filter. {} peers currently connected.", peerCount());
// check for an existing peer matching the filter and use that if one is found
Optional<EthPeer> maybePeer = getPeer(filter);
if (maybePeer.isPresent()) {
LOG.debug("Found peer matching filter already connected!");
future.complete(maybePeer.get());
} else {
// no existing peer matches our filter. Subscribe to new connections until we find one
LOG.debug("Subscribing to new peer connections to wait until one matches filter");
final long subscriptionId =
subscribeConnect(
(peer) -> {
if (!future.isDone() && filter.test(peer)) {
LOG.debug("Found new peer matching filter!");
future.complete(peer);
} else {
LOG.debug("New peer does not match filter");
}
});
future.handle(
(peer, throwable) -> {
LOG.debug("Unsubscribing from new peer connections with ID {}", subscriptionId);
unsubscribeConnect(subscriptionId);
return null;
});
}

return future;
}

// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

/** Selects the EthPeers for the PeerTaskExecutor */
Expand All @@ -31,6 +32,14 @@ public interface PeerSelector {
*/
Optional<EthPeer> getPeer(final Predicate<EthPeer> filter);

/**
* Waits for a peer matching the supplied filter
*
* @param filter a Predicate\<EthPeer\> matching desirable peers
* @return a CompletableFuture into which a peer will be placed
*/
CompletableFuture<EthPeer> waitForPeer(final Predicate<EthPeer> filter);

/**
* Attempts to get the EthPeer identified by peerId
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

Expand Down Expand Up @@ -129,13 +130,12 @@ protected void handleTaskError(final Throwable error) {
"No useful peer found, wait max 5 seconds for new peer to connect: current peers {}",
ethContext.getEthPeers().peerCount());

final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem);
executeSubTask(
() ->
ethContext
.getScheduler()
// wait for a new peer for up to 5 seconds
.timeout(waitTask, Duration.ofSeconds(5))
.getEthPeers()
.waitForPeer(this::isSuitablePeer)
.orTimeout(5, TimeUnit.SECONDS)
// execute the task again
.whenComplete((r, t) -> executeTaskTimed()));
return;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
Expand Down Expand Up @@ -117,13 +116,16 @@ protected Optional<SyncTarget> finalizeSelectedSyncTarget(final SyncTarget syncT
protected abstract CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget();

private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
}

private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5));
.scheduleFutureTask(
() ->
ethContext
.getEthPeers()
.waitForPeer((peer) -> true)
.handle((ignored, ignored2) -> null)
.thenCompose((r) -> findSyncTarget()),
Duration.ofSeconds(5));
}

private boolean isCancelled() {
Expand Down
Loading

0 comments on commit 320c476

Please sign in to comment.