From d6d7041bd05de1a24075d0463f4f0eda2f5139a8 Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Wed, 7 Dec 2022 13:14:05 -0500 Subject: [PATCH] Clear self._urgent_reconnect_needed once more after connection established. Tweak sleep_delay logic so failures during initial RPC sequence also back off. --- lbry/wallet/network.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 3e9b19e531..3664a457d8 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -309,14 +309,14 @@ async def connect_to_fastest(self) -> Optional[ClientSession]: return async def network_loop(self): - sleep_delay = 15 + def reset_sleep(): + return 10 + random.uniform(0, 5) + sleep_delay = reset_sleep() while self.running: await asyncio.wait( [asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED ) - if self._urgent_need_reconnect.is_set(): - sleep_delay = 10 + random.uniform(0, 5) self._urgent_need_reconnect.clear() if not self.is_connected: client = await self.connect_to_fastest() @@ -325,17 +325,24 @@ async def network_loop(self): sleep_delay = min(sleep_delay, 120) log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay) continue + sleep_delay = reset_sleep() + server_str = "%s:%i" % client.server try: + # Perform initial sequence of RPCs. log.debug("get spv server features %s:%i", *client.server) features = await client.send_request('server.features', []) - self.client, self.server_features = client, features log.debug("discover other hubs %s:%i", *client.server) await self._update_hubs(await client.send_request('server.peers.subscribe', [])) log.info("subscribe to headers %s:%i", *client.server) - self._update_remote_height((await self.subscribe_headers(),)) + self._update_remote_height((await client.send_request('blockchain.headers.subscribe', [True]),)) + + # All initial RPCs were successful. We're now connected. + self.client, self.server_features = client, features + self._urgent_need_reconnect.clear() + sleep_delay = reset_sleep() + # Release any waiters. self._on_connected_controller.add(True) - sleep_delay = 15 - server_str = "%s:%i" % client.server + log.info("maintaining connection to spv server %s", server_str) self._keepalive_task = asyncio.create_task(self.client.keepalive_loop()) await asyncio.wait( @@ -345,7 +352,9 @@ async def network_loop(self): if self._urgent_need_reconnect.is_set(): log.warning("urgent reconnect needed") except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError): - pass + sleep_delay *= 2 + sleep_delay = min(sleep_delay, 120) + log.warning("failed to connect to spv server %s, retrying after %.2fs", server_str, sleep_delay) finally: if self._keepalive_task and not self._keepalive_task.done(): self._keepalive_task.cancel() @@ -436,9 +445,6 @@ def get_history(self, address): def broadcast(self, raw_transaction): return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True) - def subscribe_headers(self): - return self.rpc('blockchain.headers.subscribe', [True], True) - async def subscribe_address(self, address, *addresses): addresses = list((address, ) + addresses) server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None