diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitor.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitor.java index 44fd3dd3cfe..9d4ad9292e3 100644 --- a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitor.java +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitor.java @@ -54,8 +54,7 @@ public CompletableFuture start() { return simpleHttpServer.start() .thenCompose(nil -> proxySetup.createSocksProxy()) .thenAccept(peerGroupService::applySocks5Proxy) - .thenCompose(nil -> peerGroupService.start()) - .thenRun(peerGroupService::connectToAll); + .thenCompose(nil -> peerGroupService.start()); } public CompletableFuture shutdown() { diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitorMain.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitorMain.java index 13a14f10fd2..8ba2210ec27 100644 --- a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitorMain.java +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitorMain.java @@ -51,6 +51,7 @@ public BtcNodeMonitorMain(String[] args) { @Override public void gracefulShutDown(ResultHandler resultHandler) { + log.info("gracefulShutDown"); btcNodeMonitor.shutdown().join(); System.exit(0); resultHandler.handleResult(); diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionInfo.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionInfo.java index 26e2bd8c683..f6487ca5035 100644 --- a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionInfo.java +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionInfo.java @@ -64,7 +64,9 @@ public String getAddress() { } public String getShortId() { - return getAddress().substring(0, 12) + "..."; + String address = getAddress(); + int endIndex = Math.min(address.length(), 12); + return address.substring(0, endIndex) + "..."; } public int getNumConnectionAttempts() { diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java index 535ef2b89f4..ebe11205734 100644 --- a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java @@ -17,8 +17,6 @@ package bisq.btcnodemonitor.btc; -import bisq.common.Timer; -import bisq.common.UserThread; import bisq.common.util.SingleThreadExecutorUtils; import org.bitcoinj.core.Context; @@ -27,6 +25,8 @@ import org.bitcoinj.core.PeerAddress; import org.bitcoinj.core.Utils; import org.bitcoinj.core.VersionMessage; +import org.bitcoinj.core.listeners.PeerConnectedEventListener; +import org.bitcoinj.core.listeners.PeerDisconnectedEventListener; import org.bitcoinj.net.BlockingClientManager; import org.bitcoinj.net.ClientConnectionManager; @@ -39,6 +39,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -55,9 +57,15 @@ public class PeerConnection { private final int connectTimeoutMillis; private final int vMinRequiredProtocolVersion; private final PeerConncetionInfo peerConncetionInfo; - private Optional disconnectScheduler = Optional.empty(); - private Optional reconnectScheduler = Optional.empty(); private final AtomicBoolean shutdownCalled = new AtomicBoolean(); + private final ExecutorService connectionExecutor; + private final ExecutorService onConnectionExecutor; + private final ExecutorService onDisConnectionExecutor; + private Optional peerConnectedEventListener = Optional.empty(); + private Optional peerDisconnectedEventListener = Optional.empty(); + private Optional> connectAndDisconnectFuture = Optional.empty(); + private Optional> innerConnectAndDisconnectFuture = Optional.empty(); + private Optional> openConnectionFuture = Optional.empty(); public PeerConnection(Context context, PeerConncetionInfo peerConncetionInfo, @@ -72,37 +80,110 @@ public PeerConnection(Context context, this.disconnectIntervalSec = disconnectIntervalSec; this.reconnectIntervalSec = reconnectIntervalSec; + connectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("connection-" + peerConncetionInfo.getShortId()); + onConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onConnection-" + peerConncetionInfo.getShortId()); + onDisConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onDisConnection-" + peerConncetionInfo.getShortId()); + this.params = context.getParams(); vMinRequiredProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.BLOOM_FILTER); } + public void start() { + CompletableFuture.runAsync(() -> { + do { + connectAndDisconnectFuture = Optional.of(connectAndDisconnect()); + try { + connectAndDisconnectFuture.get().join(); + } catch (Exception ignore) { + } + } + while (!shutdownCalled.get() && !Thread.currentThread().isInterrupted()); + log.info("Exiting startConnectAndDisconnectLoop loop. Expected at shutdown"); + }, connectionExecutor); + } + public CompletableFuture shutdown() { + log.info("Shutdown"); shutdownCalled.set(true); - disconnectScheduler.ifPresent(Timer::stop); - reconnectScheduler.ifPresent(Timer::stop); + peerConncetionInfo.getCurrentConnectionAttempt().ifPresent(connectionAttempt -> { + Peer peer = connectionAttempt.getPeer(); + peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener); + peerDisconnectedEventListener = Optional.empty(); + peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener); + peerConnectedEventListener = Optional.empty(); + }); + return CompletableFuture.runAsync(() -> { - log.info("shutdown {}", peerConncetionInfo); Context.propagate(context); - disconnect(); + peerConncetionInfo.getCurrentConnectionAttempt() + .ifPresent(currentConnectionAttempt -> currentConnectionAttempt.getPeer().close()); + + connectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null)); + innerConnectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null)); + + connectionExecutor.shutdownNow(); + onConnectionExecutor.shutdownNow(); + onDisConnectionExecutor.shutdownNow(); }, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown-" + peerConncetionInfo.getShortId())); } - public void connect() { - CompletableFuture.runAsync(() -> { - log.info("connect {}", peerConncetionInfo); + private CompletableFuture connectAndDisconnect() { + CompletableFuture future = new CompletableFuture<>(); + innerConnectAndDisconnectFuture = Optional.of(CompletableFuture.runAsync(() -> { + log.info("\n>> Connect to {}", peerConncetionInfo.getAddress()); Context.propagate(context); Peer peer = createPeer(peerConncetionInfo.getPeerAddress()); PeerConncetionInfo.ConnectionAttempt connectionAttempt = peerConncetionInfo.newConnectionAttempt(peer); long ts = System.currentTimeMillis(); connectionAttempt.setConnectionStartedTs(ts); - try { - peer.addConnectedEventListener((peer1, peerCount) -> { - connectionAttempt.setDurationUntilConnection(System.currentTimeMillis() - ts); - connectionAttempt.setConnectionSuccessTs(System.currentTimeMillis()); + + peerConnectedEventListener = Optional.of((p, peerCount) -> { + peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener); + peerConnectedEventListener = Optional.empty(); + if (shutdownCalled.get()) { + return; + } + try { + log.info("\n## Successfully connected to {}", peer.getAddress()); + long now = System.currentTimeMillis(); + connectionAttempt.setDurationUntilConnection(now - ts); + connectionAttempt.setConnectionSuccessTs(now); connectionAttempt.onConnected(); - startAutoDisconnectAndReconnect(); - }); - peer.addDisconnectedEventListener((peer1, peerCount) -> { + + try { + Thread.sleep(disconnectIntervalSec * 1000L); // 2 sec + } catch (InterruptedException ignore) { + } + if (shutdownCalled.get()) { + return; + } + log.info("Close peer {}", peer.getAddress()); + peer.close(); + } catch (Exception exception) { + log.warn("Exception at onConnection handler. {}", exception.toString()); + handleException(exception, peer, connectionAttempt, ts, future); + } + }); + peer.addConnectedEventListener(onConnectionExecutor, peerConnectedEventListener.get()); + + peerDisconnectedEventListener = Optional.of((p, peerCount) -> { + // At socket timeouts we get called twice from Bitcoinj + if (peerDisconnectedEventListener.isEmpty()) { + log.error("We got called twice at socketimeout from BitcoinJ and ignore the 2nd call."); + return; + } + peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener); + peerDisconnectedEventListener = Optional.empty(); + if (shutdownCalled.get()) { + return; + } + if (openConnectionFuture.isPresent() && !openConnectionFuture.get().isDone()) { + // BitcoinJ calls onDisconnect at sockettimeout without throwing an error on the open connection future. + openConnectionFuture.get().completeExceptionally(new TimeoutException("Open connection failed due timeout at PeerSocketHandler")); + return; + } + try { + log.info("\n<< Disconnected from {}", peer.getAddress()); long passed = System.currentTimeMillis() - ts; // Timeout is not handled as error in bitcoinj, but it simply disconnects // If we had a successful connect before we got versionMessage set, otherwise its from an error. @@ -113,52 +194,62 @@ public void connect() { connectionAttempt.setDurationUntilDisConnection(passed); connectionAttempt.onDisconnected(); } - startAutoDisconnectAndReconnect(); - }); - openConnection(peer).join(); + try { + Thread.sleep(reconnectIntervalSec * 2000L); // 120 sec + } catch (InterruptedException ignore) { + } + if (shutdownCalled.get()) { + return; + } + future.complete(null); + } catch (Exception exception) { + log.warn("Exception at onDisconnection handler. {}", exception.toString()); + handleException(exception, peer, connectionAttempt, ts, future); + } + }); + peer.addDisconnectedEventListener(onDisConnectionExecutor, peerDisconnectedEventListener.get()); + + try { + openConnectionFuture = Optional.of(openConnection(peer)); + openConnectionFuture.get().join(); } catch (Exception exception) { - log.warn("Error at opening connection to peer {}", peerConncetionInfo, exception); - connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts); - connectionAttempt.onException(exception); - startAutoDisconnectAndReconnect(); + log.warn("Error at opening connection to peer {}. {}", peerConncetionInfo, exception.toString()); + handleException(exception, peer, connectionAttempt, ts, future); } - }, SingleThreadExecutorUtils.getSingleThreadExecutor("connect-" + peerConncetionInfo.getShortId())); - } + }, MoreExecutors.directExecutor())); - private CompletableFuture disconnect() { - return peerConncetionInfo.getCurrentConnectionAttempt() - .map(currentConnectionAttempt -> CompletableFuture.runAsync(() -> { - log.info("disconnect {}", peerConncetionInfo); - Context.propagate(context); - currentConnectionAttempt.getPeer().close(); - }, - SingleThreadExecutorUtils.getSingleThreadExecutor("disconnect-" + peerConncetionInfo.getShortId()))) - .orElse(CompletableFuture.completedFuture(null)); + return future; } - private void startAutoDisconnectAndReconnect() { + private void handleException(Throwable throwable, + Peer peer, + PeerConncetionInfo.ConnectionAttempt connectionAttempt, + long ts, + CompletableFuture future) { + peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener); + peerDisconnectedEventListener = Optional.empty(); + peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener); + peerConnectedEventListener = Optional.empty(); if (shutdownCalled.get()) { return; } - disconnectScheduler.ifPresent(Timer::stop); - disconnectScheduler = Optional.of(UserThread.runAfter(() -> { - if (shutdownCalled.get()) { - return; - } - disconnect() - .thenRun(() -> { - if (shutdownCalled.get()) { - return; - } - reconnectScheduler.ifPresent(Timer::stop); - reconnectScheduler = Optional.of(UserThread.runAfter(() -> { - if (shutdownCalled.get()) { - return; - } - connect(); - }, reconnectIntervalSec)); - }); - }, disconnectIntervalSec)); + connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts); + connectionAttempt.onException(throwable); + try { + // Try disconnect + log.info("Try close peer {}", peer.getAddress()); + peer.close(); + } catch (Exception ignore) { + } + + try { + Thread.sleep(reconnectIntervalSec * 1000L); // 120 sec + } catch (InterruptedException ignore) { + } + if (shutdownCalled.get()) { + return; + } + future.completeExceptionally(throwable); } private Peer createPeer(PeerAddress address) { diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerGroupService.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerGroupService.java index 0a277cb1434..9f39b00d50b 100644 --- a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerGroupService.java +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerGroupService.java @@ -119,21 +119,24 @@ public CompletableFuture start() { Context.propagate(context); blockingClientManager.startAsync(); blockingClientManager.awaitRunning(); - }, SingleThreadExecutorUtils.getSingleThreadExecutor("start")); - } - public void connectToAll() { - peerConnections.forEach(PeerConnection::connect); + peerConnections.forEach(PeerConnection::start); + }, SingleThreadExecutorUtils.getSingleThreadExecutor("start")); } public CompletableFuture shutdown() { return CompletableFuture.runAsync(() -> { - log.info("shutdown"); + log.info("Shutdown all peerConnections"); Context.propagate(context); CountDownLatch latch = new CountDownLatch(peerConnections.size()); - peerConnections.forEach(e -> e.shutdown().thenRun(latch::countDown)); + peerConnections.forEach(peerConnection -> peerConnection.shutdown() + .thenRun(latch::countDown)); try { - latch.await(5, TimeUnit.SECONDS); + if (latch.await(3, TimeUnit.SECONDS)) { + log.info("All peerConnections shut down."); + } else { + log.info("Shutdown of peerConnections not completed in time."); + } blockingClientManager.stopAsync(); blockingClientManager.awaitTerminated(Duration.ofSeconds(2)); } catch (Exception e) { diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/server/SimpleHttpServer.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/server/SimpleHttpServer.java index 1b776dd69ce..a0385613a36 100644 --- a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/server/SimpleHttpServer.java +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/server/SimpleHttpServer.java @@ -56,28 +56,24 @@ public class SimpleHttpServer { private final static String CLOSE_TAG = "
"; private final static String WARNING_ICON = "⚠ "; private final static String ALERT_ICON = "☠ "; // ⚡ ⚡ - private final Config config; @Getter private final List providedBtcNodes; private final Map btcNodeByAddress; private final int port; private final PeerConncetionModel peerConncetionModel; private final String started; - + private final String networkInfo; private String html; - private int requestCounter; - private String networkInfo; + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// public SimpleHttpServer(Config config, PeerConncetionModel peerConncetionModel) { - this.config = config; this.peerConncetionModel = peerConncetionModel; started = new Date().toString(); - - this.providedBtcNodes = peerConncetionModel.getProvidedBtcNodes(); + providedBtcNodes = peerConncetionModel.getProvidedBtcNodes(); BaseCurrencyNetwork network = config.baseCurrencyNetwork; if (config.useTorForBtcMonitor) { @@ -121,9 +117,9 @@ public CompletableFuture start() { public CompletableFuture shutdown() { return CompletableFuture.runAsync(() -> { - log.info("shutDown"); + log.info("stop Spark server"); Spark.stop(); - log.info("shutDown completed"); + log.info("Spark server stopped"); }); } diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/ProxySetup.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/ProxySetup.java index 44cfa144cf9..5a04a534a6c 100644 --- a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/ProxySetup.java +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/ProxySetup.java @@ -81,12 +81,12 @@ public CompletableFuture> createSocksProxy() { } public CompletableFuture shutdown() { - log.warn("start shutdown"); + log.info("Shutdown tor"); return CompletableFuture.runAsync(() -> { if (tor != null) { tor.shutdown(); + log.info("Tor shutdown completed"); } - log.warn("shutdown completed"); }) .orTimeout(2, TimeUnit.SECONDS); } diff --git a/common/src/main/java/bisq/common/util/Utilities.java b/common/src/main/java/bisq/common/util/Utilities.java index 30e0e7a05f7..87125850fea 100644 --- a/common/src/main/java/bisq/common/util/Utilities.java +++ b/common/src/main/java/bisq/common/util/Utilities.java @@ -63,7 +63,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -144,9 +143,9 @@ private static ThreadPoolExecutor getThreadPoolExecutor(String name, return executor; } - public static void shutdownAndAwaitTermination(ExecutorService executor, long timeout, TimeUnit unit) { + public static boolean shutdownAndAwaitTermination(ExecutorService executor, long timeout, TimeUnit unit) { //noinspection UnstableApiUsage - MoreExecutors.shutdownAndAwaitTermination(executor, timeout, unit); + return MoreExecutors.shutdownAndAwaitTermination(executor, timeout, unit); } public static FutureCallback failureCallback(Consumer errorHandler) { diff --git a/p2p/src/main/java/bisq/network/p2p/network/NewTor.java b/p2p/src/main/java/bisq/network/p2p/network/NewTor.java index 7cd6589cc86..64a4c65cfee 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NewTor.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NewTor.java @@ -17,20 +17,21 @@ package bisq.network.p2p.network; +import org.berndpruenster.netlayer.tor.NativeTor; +import org.berndpruenster.netlayer.tor.Tor; +import org.berndpruenster.netlayer.tor.TorCtlException; +import org.berndpruenster.netlayer.tor.Torrc; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; + import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.LinkedHashMap; import java.util.stream.Collectors; -import org.berndpruenster.netlayer.tor.NativeTor; -import org.berndpruenster.netlayer.tor.Tor; -import org.berndpruenster.netlayer.tor.TorCtlException; -import org.berndpruenster.netlayer.tor.Torrc; - import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; @@ -65,7 +66,7 @@ public Tor getTor() throws IOException, TorCtlException { long ts1 = new Date().getTime(); Collection bridgeEntries = bridgeAddressProvider.getBridgeAddresses(); - if (bridgeEntries != null) + if (bridgeEntries != null && !bridgeEntries.isEmpty()) log.info("Using bridges: {}", bridgeEntries.stream().collect(Collectors.joining(","))); Torrc override = null;