Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More network_loop() exception handling and fix for sleep_delay. #3718

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 36 additions & 30 deletions lbry/wallet/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,47 +309,55 @@ async def connect_to_fastest(self) -> Optional[ClientSession]:
return

async def network_loop(self):
sleep_delay = 30
def reset_sleep():
return 10 + random.uniform(0, 5)
sleep_delay = reset_sleep()
while self.running:
await asyncio.wait(
[asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED
[asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()],
return_when=asyncio.FIRST_COMPLETED
)
if self._urgent_need_reconnect.is_set():
sleep_delay = 30
self._urgent_need_reconnect.clear()
if not self.is_connected:
client = await self.connect_to_fastest()
if not client:
log.warning("failed to connect to any spv servers, retrying later")
sleep_delay *= 2
sleep_delay = min(sleep_delay, 300)
sleep_delay = min(sleep_delay, 120)
log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay)
continue
log.debug("get spv server features %s:%i", *client.server)
features = await client.send_request('server.features', [])
self.client, self.server_features = client, features
log.debug("discover other hubs %s:%i", *client.server)
await self._update_hubs(await client.send_request('server.peers.subscribe', []))
log.info("subscribe to headers %s:%i", *client.server)
self._update_remote_height((await self.subscribe_headers(),))
self._on_connected_controller.add(True)
sleep_delay = reset_sleep()
server_str = "%s:%i" % client.server
log.info("maintaining connection to spv server %s", server_str)
self._keepalive_task = asyncio.create_task(self.client.keepalive_loop())
try:
if not self._urgent_need_reconnect.is_set():
await asyncio.wait(
[self._keepalive_task, self._urgent_need_reconnect.wait()],
return_when=asyncio.FIRST_COMPLETED
)
else:
await self._keepalive_task
# Perform initial sequence of RPCs.
log.debug("get spv server features %s:%i", *client.server)
features = await client.send_request('server.features', [])
log.debug("discover other hubs %s:%i", *client.server)
await self._update_hubs(await client.send_request('server.peers.subscribe', []))
log.info("subscribe to headers %s:%i", *client.server)
self._update_remote_height((await client.send_request('blockchain.headers.subscribe', [True]),))

# All initial RPCs were successful. We're now connected.
self.client, self.server_features = client, features
self._urgent_need_reconnect.clear()
sleep_delay = reset_sleep()
# Release any waiters.
self._on_connected_controller.add(True)

log.info("maintaining connection to spv server %s", server_str)
self._keepalive_task = asyncio.create_task(self.client.keepalive_loop())
await asyncio.wait(
[self._keepalive_task, self._urgent_need_reconnect.wait()],
return_when=asyncio.FIRST_COMPLETED
)
if self._urgent_need_reconnect.is_set():
log.warning("urgent reconnect needed")
except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError):
sleep_delay *= 2
sleep_delay = min(sleep_delay, 120)
log.warning("failed to connect to spv server %s, retrying after %.2fs", server_str, sleep_delay)
finally:
if self._keepalive_task and not self._keepalive_task.done():
self._keepalive_task.cancel()
except asyncio.CancelledError:
pass
finally:
self._keepalive_task = None
self.client = None
self.server_features = None
Expand Down Expand Up @@ -381,9 +389,10 @@ def rpc(self, list_or_method, args, restricted=True, session: Optional[ClientSes
async def retriable_call(self, function, *args, **kwargs):
while self.running:
if not self.is_connected:
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
log.warning("%s: Wallet server unavailable, waiting for it to come back and retry.", function.__name__)
self._urgent_need_reconnect.set()
await self.on_connected.first
log.warning("%s: Wallet server available, proceeding.", function.__name__)
try:
return await function(*args, **kwargs)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -436,9 +445,6 @@ def get_history(self, address):
def broadcast(self, raw_transaction):
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)

def subscribe_headers(self):
return self.rpc('blockchain.headers.subscribe', [True], True)

async def subscribe_address(self, address, *addresses):
addresses = list((address, ) + addresses)
server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None
Expand Down