Skip to content

Commit

Permalink
Clear self._urgent_reconnect_needed once more after connection establ…
Browse files Browse the repository at this point in the history
…ished.

Tweak sleep_delay logic so failures during initial RPC sequence also back off.
  • Loading branch information
moodyjon committed Dec 14, 2022
1 parent 93f85d0 commit 154921a
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions lbry/wallet/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,14 @@ async def connect_to_fastest(self) -> Optional[ClientSession]:
return

async def network_loop(self):
sleep_delay = 15
def reset_sleep():
return 10 + random.uniform(0, 5)
sleep_delay = reset_sleep()
while self.running:
await asyncio.wait(
[asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()],
return_when=asyncio.FIRST_COMPLETED
)
if self._urgent_need_reconnect.is_set():
sleep_delay = 10 + random.uniform(0, 5)
self._urgent_need_reconnect.clear()
if not self.is_connected:
client = await self.connect_to_fastest()
Expand All @@ -325,17 +325,24 @@ async def network_loop(self):
sleep_delay = min(sleep_delay, 120)
log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay)
continue
sleep_delay = reset_sleep()
server_str = "%s:%i" % client.server
try:
# Perform initial sequence of RPCs.
log.debug("get spv server features %s:%i", *client.server)
features = await client.send_request('server.features', [])
self.client, self.server_features = client, features
log.debug("discover other hubs %s:%i", *client.server)
await self._update_hubs(await client.send_request('server.peers.subscribe', []))
log.info("subscribe to headers %s:%i", *client.server)
self._update_remote_height((await self.subscribe_headers(),))
self._update_remote_height((await client.send_request('blockchain.headers.subscribe', [True]),))

# All initial RPCs were successful. We're now connected.
self.client, self.server_features = client, features
self._urgent_need_reconnect.clear()
sleep_delay = reset_sleep()
# Release any waiters.
self._on_connected_controller.add(True)
sleep_delay = 15
server_str = "%s:%i" % client.server

log.info("maintaining connection to spv server %s", server_str)
self._keepalive_task = asyncio.create_task(self.client.keepalive_loop())
await asyncio.wait(
Expand All @@ -345,7 +352,9 @@ async def network_loop(self):
if self._urgent_need_reconnect.is_set():
log.warning("urgent reconnect needed")
except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError):
pass
sleep_delay *= 2
sleep_delay = min(sleep_delay, 120)
log.warning("failed to connect to spv server %s, retrying after %.2fs", server_str, sleep_delay)
finally:
if self._keepalive_task and not self._keepalive_task.done():
self._keepalive_task.cancel()
Expand Down Expand Up @@ -436,9 +445,6 @@ def get_history(self, address):
def broadcast(self, raw_transaction):
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)

def subscribe_headers(self):
return self.rpc('blockchain.headers.subscribe', [True], True)

async def subscribe_address(self, address, *addresses):
addresses = list((address, ) + addresses)
server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None
Expand Down

0 comments on commit 154921a

Please sign in to comment.