Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/improve-btc-monitor' into restap…
Browse files Browse the repository at this point in the history
…i-oracle
  • Loading branch information
HenrikJannsen committed Jul 16, 2024
2 parents 895dd84 + d99e251 commit f81ab85
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public CompletableFuture<Void> start() {
return simpleHttpServer.start()
.thenCompose(nil -> proxySetup.createSocksProxy())
.thenAccept(peerGroupService::applySocks5Proxy)
.thenCompose(nil -> peerGroupService.start())
.thenRun(peerGroupService::connectToAll);
.thenCompose(nil -> peerGroupService.start());
}

public CompletableFuture<Void> shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public BtcNodeMonitorMain(String[] args) {

@Override
public void gracefulShutDown(ResultHandler resultHandler) {
log.info("gracefulShutDown");
btcNodeMonitor.shutdown().join();
System.exit(0);
resultHandler.handleResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public String getAddress() {
}

public String getShortId() {
return getAddress().substring(0, 12) + "...";
String address = getAddress();
int endIndex = Math.min(address.length(), 12);
return address.substring(0, endIndex) + "...";
}

public int getNumConnectionAttempts() {
Expand Down
203 changes: 147 additions & 56 deletions btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package bisq.btcnodemonitor.btc;

import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.SingleThreadExecutorUtils;

import org.bitcoinj.core.Context;
Expand All @@ -27,6 +25,8 @@
import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.Utils;
import org.bitcoinj.core.VersionMessage;
import org.bitcoinj.core.listeners.PeerConnectedEventListener;
import org.bitcoinj.core.listeners.PeerDisconnectedEventListener;
import org.bitcoinj.net.BlockingClientManager;
import org.bitcoinj.net.ClientConnectionManager;

Expand All @@ -39,6 +39,8 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -55,9 +57,15 @@ public class PeerConnection {
private final int connectTimeoutMillis;
private final int vMinRequiredProtocolVersion;
private final PeerConncetionInfo peerConncetionInfo;
private Optional<Timer> disconnectScheduler = Optional.empty();
private Optional<Timer> reconnectScheduler = Optional.empty();
private final AtomicBoolean shutdownCalled = new AtomicBoolean();
private final ExecutorService connectionExecutor;
private final ExecutorService onConnectionExecutor;
private final ExecutorService onDisConnectionExecutor;
private Optional<PeerConnectedEventListener> peerConnectedEventListener = Optional.empty();
private Optional<PeerDisconnectedEventListener> peerDisconnectedEventListener = Optional.empty();
private Optional<CompletableFuture<Void>> connectAndDisconnectFuture = Optional.empty();
private Optional<CompletableFuture<Void>> innerConnectAndDisconnectFuture = Optional.empty();
private Optional<CompletableFuture<SocketAddress>> openConnectionFuture = Optional.empty();

public PeerConnection(Context context,
PeerConncetionInfo peerConncetionInfo,
Expand All @@ -72,37 +80,110 @@ public PeerConnection(Context context,
this.disconnectIntervalSec = disconnectIntervalSec;
this.reconnectIntervalSec = reconnectIntervalSec;

connectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("connection-" + peerConncetionInfo.getShortId());
onConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onConnection-" + peerConncetionInfo.getShortId());
onDisConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onDisConnection-" + peerConncetionInfo.getShortId());

this.params = context.getParams();
vMinRequiredProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.BLOOM_FILTER);
}

public void start() {
CompletableFuture.runAsync(() -> {
do {
connectAndDisconnectFuture = Optional.of(connectAndDisconnect());
try {
connectAndDisconnectFuture.get().join();
} catch (Exception ignore) {
}
}
while (!shutdownCalled.get() && !Thread.currentThread().isInterrupted());
log.info("Exiting startConnectAndDisconnectLoop loop. Expected at shutdown");
}, connectionExecutor);
}

public CompletableFuture<Void> shutdown() {
log.info("Shutdown");
shutdownCalled.set(true);
disconnectScheduler.ifPresent(Timer::stop);
reconnectScheduler.ifPresent(Timer::stop);
peerConncetionInfo.getCurrentConnectionAttempt().ifPresent(connectionAttempt -> {
Peer peer = connectionAttempt.getPeer();
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
});

return CompletableFuture.runAsync(() -> {
log.info("shutdown {}", peerConncetionInfo);
Context.propagate(context);
disconnect();
peerConncetionInfo.getCurrentConnectionAttempt()
.ifPresent(currentConnectionAttempt -> currentConnectionAttempt.getPeer().close());

connectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null));
innerConnectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null));

connectionExecutor.shutdownNow();
onConnectionExecutor.shutdownNow();
onDisConnectionExecutor.shutdownNow();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown-" + peerConncetionInfo.getShortId()));
}

public void connect() {
CompletableFuture.runAsync(() -> {
log.info("connect {}", peerConncetionInfo);
private CompletableFuture<Void> connectAndDisconnect() {
CompletableFuture<Void> future = new CompletableFuture<>();
innerConnectAndDisconnectFuture = Optional.of(CompletableFuture.runAsync(() -> {
log.info("\n>> Connect to {}", peerConncetionInfo.getAddress());
Context.propagate(context);
Peer peer = createPeer(peerConncetionInfo.getPeerAddress());
PeerConncetionInfo.ConnectionAttempt connectionAttempt = peerConncetionInfo.newConnectionAttempt(peer);
long ts = System.currentTimeMillis();
connectionAttempt.setConnectionStartedTs(ts);
try {
peer.addConnectedEventListener((peer1, peerCount) -> {
connectionAttempt.setDurationUntilConnection(System.currentTimeMillis() - ts);
connectionAttempt.setConnectionSuccessTs(System.currentTimeMillis());

peerConnectedEventListener = Optional.of((p, peerCount) -> {
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
try {
log.info("\n## Successfully connected to {}", peer.getAddress());
long now = System.currentTimeMillis();
connectionAttempt.setDurationUntilConnection(now - ts);
connectionAttempt.setConnectionSuccessTs(now);
connectionAttempt.onConnected();
startAutoDisconnectAndReconnect();
});
peer.addDisconnectedEventListener((peer1, peerCount) -> {

try {
Thread.sleep(disconnectIntervalSec * 1000L); // 2 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
log.info("Close peer {}", peer.getAddress());
peer.close();
} catch (Exception exception) {
log.warn("Exception at onConnection handler. {}", exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
});
peer.addConnectedEventListener(onConnectionExecutor, peerConnectedEventListener.get());

peerDisconnectedEventListener = Optional.of((p, peerCount) -> {
// At socket timeouts we get called twice from Bitcoinj
if (peerDisconnectedEventListener.isEmpty()) {
log.error("We got called twice at socketimeout from BitcoinJ and ignore the 2nd call.");
return;
}
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
if (openConnectionFuture.isPresent() && !openConnectionFuture.get().isDone()) {
// BitcoinJ calls onDisconnect at sockettimeout without throwing an error on the open connection future.
openConnectionFuture.get().completeExceptionally(new TimeoutException("Open connection failed due timeout at PeerSocketHandler"));
return;
}
try {
log.info("\n<< Disconnected from {}", peer.getAddress());
long passed = System.currentTimeMillis() - ts;
// Timeout is not handled as error in bitcoinj, but it simply disconnects
// If we had a successful connect before we got versionMessage set, otherwise its from an error.
Expand All @@ -113,52 +194,62 @@ public void connect() {
connectionAttempt.setDurationUntilDisConnection(passed);
connectionAttempt.onDisconnected();
}
startAutoDisconnectAndReconnect();
});
openConnection(peer).join();
try {
Thread.sleep(reconnectIntervalSec * 2000L); // 120 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
future.complete(null);
} catch (Exception exception) {
log.warn("Exception at onDisconnection handler. {}", exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
});
peer.addDisconnectedEventListener(onDisConnectionExecutor, peerDisconnectedEventListener.get());

try {
openConnectionFuture = Optional.of(openConnection(peer));
openConnectionFuture.get().join();
} catch (Exception exception) {
log.warn("Error at opening connection to peer {}", peerConncetionInfo, exception);
connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts);
connectionAttempt.onException(exception);
startAutoDisconnectAndReconnect();
log.warn("Error at opening connection to peer {}. {}", peerConncetionInfo, exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
}, SingleThreadExecutorUtils.getSingleThreadExecutor("connect-" + peerConncetionInfo.getShortId()));
}
}, MoreExecutors.directExecutor()));

private CompletableFuture<Void> disconnect() {
return peerConncetionInfo.getCurrentConnectionAttempt()
.map(currentConnectionAttempt -> CompletableFuture.runAsync(() -> {
log.info("disconnect {}", peerConncetionInfo);
Context.propagate(context);
currentConnectionAttempt.getPeer().close();
},
SingleThreadExecutorUtils.getSingleThreadExecutor("disconnect-" + peerConncetionInfo.getShortId())))
.orElse(CompletableFuture.completedFuture(null));
return future;
}

private void startAutoDisconnectAndReconnect() {
private void handleException(Throwable throwable,
Peer peer,
PeerConncetionInfo.ConnectionAttempt connectionAttempt,
long ts,
CompletableFuture<Void> future) {
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
disconnectScheduler.ifPresent(Timer::stop);
disconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
disconnect()
.thenRun(() -> {
if (shutdownCalled.get()) {
return;
}
reconnectScheduler.ifPresent(Timer::stop);
reconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
connect();
}, reconnectIntervalSec));
});
}, disconnectIntervalSec));
connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts);
connectionAttempt.onException(throwable);
try {
// Try disconnect
log.info("Try close peer {}", peer.getAddress());
peer.close();
} catch (Exception ignore) {
}

try {
Thread.sleep(reconnectIntervalSec * 1000L); // 120 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
future.completeExceptionally(throwable);
}

private Peer createPeer(PeerAddress address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,24 @@ public CompletableFuture<Void> start() {
Context.propagate(context);
blockingClientManager.startAsync();
blockingClientManager.awaitRunning();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("start"));
}

public void connectToAll() {
peerConnections.forEach(PeerConnection::connect);
peerConnections.forEach(PeerConnection::start);
}, SingleThreadExecutorUtils.getSingleThreadExecutor("start"));
}

public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> {
log.info("shutdown");
log.info("Shutdown all peerConnections");
Context.propagate(context);
CountDownLatch latch = new CountDownLatch(peerConnections.size());
peerConnections.forEach(e -> e.shutdown().thenRun(latch::countDown));
peerConnections.forEach(peerConnection -> peerConnection.shutdown()
.thenRun(latch::countDown));
try {
latch.await(5, TimeUnit.SECONDS);
if (latch.await(3, TimeUnit.SECONDS)) {
log.info("All peerConnections shut down.");
} else {
log.info("Shutdown of peerConnections not completed in time.");
}
blockingClientManager.stopAsync();
blockingClientManager.awaitTerminated(Duration.ofSeconds(2));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,24 @@ public class SimpleHttpServer {
private final static String CLOSE_TAG = "</font><br/>";
private final static String WARNING_ICON = "&#9888; ";
private final static String ALERT_ICON = "&#9760; "; // &#9889; &#9889;
private final Config config;
@Getter
private final List<BtcNodes.BtcNode> providedBtcNodes;
private final Map<String, BtcNodes.BtcNode> btcNodeByAddress;
private final int port;
private final PeerConncetionModel peerConncetionModel;
private final String started;

private final String networkInfo;
private String html;
private int requestCounter;
private String networkInfo;


///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////

public SimpleHttpServer(Config config, PeerConncetionModel peerConncetionModel) {
this.config = config;
this.peerConncetionModel = peerConncetionModel;
started = new Date().toString();

this.providedBtcNodes = peerConncetionModel.getProvidedBtcNodes();
providedBtcNodes = peerConncetionModel.getProvidedBtcNodes();

BaseCurrencyNetwork network = config.baseCurrencyNetwork;
if (config.useTorForBtcMonitor) {
Expand Down Expand Up @@ -121,9 +117,9 @@ public CompletableFuture<Void> start() {

public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> {
log.info("shutDown");
log.info("stop Spark server");
Spark.stop();
log.info("shutDown completed");
log.info("Spark server stopped");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public CompletableFuture<Optional<Socks5Proxy>> createSocksProxy() {
}

public CompletableFuture<Void> shutdown() {
log.warn("start shutdown");
log.info("Shutdown tor");
return CompletableFuture.runAsync(() -> {
if (tor != null) {
tor.shutdown();
log.info("Tor shutdown completed");
}
log.warn("shutdown completed");
})
.orTimeout(2, TimeUnit.SECONDS);
}
Expand Down
Loading

0 comments on commit f81ab85

Please sign in to comment.