From e8eaa87f6b6cb6710836b9b236f560abec83fa6e Mon Sep 17 00:00:00 2001 From: Anton Joubert Date: Fri, 27 Apr 2018 19:46:58 +0200 Subject: [PATCH] Recover from flush timeouts on lost message If the client sends a PING message and that message, or the replying PONG gets lost somehow, this will cause all later `flush` attempts to timeout even if the later PING-PONGs are fast enough. This is due to the `_pongs` list getting out of sync with the flush requests - the oldest item in the list no longer refers to the current flush attempt. The fix is as follows: - If the flush PING-PONG times out, set the future's result to False as an indication that this item in the `_pongs` list can be ignored later. - When a PONG is received, remove all old items that have already timed out, and then one that hasn't yet timed out. Note that `_pongs_received` counter doesn't refer to the number of PONGs actually received, but rather the number of items removed from the the `_pongs` list. --- nats/io/client.py | 16 +++++++++--- tests/client_test.py | 61 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index ee5ee92..c6dd3a7 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -392,7 +392,12 @@ def _flush_timeout(self, timeout): """ future = tornado.concurrent.Future() yield self._send_ping(future) - result = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future) + try: + result = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future) + except tornado.gen.TimeoutError: + # Set the future to False so it can be ignored in _process_pong. + future.set_result(False) + raise raise tornado.gen.Return(result) @tornado.gen.coroutine @@ -539,12 +544,17 @@ def _process_pong(self): The client will send a PING soon after CONNECT and then periodically to the server as a failure detector to close connections to unhealthy servers. For each PING the client sends, we will add a respective PONG future. + Here we want to find the oldest PONG future that is still running. If the + flush PING-PONG already timed out, then just drop those old items. """ - if len(self._pongs) > 0: + while len(self._pongs) > 0: future = self._pongs.pop(0) - future.set_result(True) self._pongs_received += 1 self._pings_outstanding -= 1 + # Only exit loop if future still running (hasn't exceeded flush timeout). + if future.running(): + future.set_result(True) + break @tornado.gen.coroutine def _process_msg(self, sid, subject, reply, data): diff --git a/tests/client_test.py b/tests/client_test.py index 656dda7..8ec5348 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -947,6 +947,38 @@ def parse(self, data=''): self.assertTrue(nc.is_connected) self.assertFalse(nc.is_reconnecting) + @tornado.testing.gen_test + def test_ping_slow_replies(self): + pongs = [] + class Parser(): + def __init__(self, nc): + self.nc = nc + + @tornado.gen.coroutine + def parse(self, data=''): + pongs.append(data) # but, don't process now + + nc = Client() + nc._ps = Parser(nc) + yield nc.connect(io_loop=self.io_loop, + ping_interval=0.1, + max_outstanding_pings=20) + yield tornado.gen.sleep(1) + + # Should have received more than 5 pongs, but processed none. + self.assertTrue(len(pongs) > 5) + self.assertTrue(len(pongs) <= nc._pings_outstanding) + self.assertEqual(0, nc._pongs_received) + self.assertEqual(len(nc._pongs), nc._pings_outstanding) + # Process all that were sent. + expected_outstanding = nc._pings_outstanding + for i in range(nc._pings_outstanding): + yield nc._process_pong() + expected_outstanding -= 1 + self.assertEqual(expected_outstanding, nc._pings_outstanding) + self.assertEqual(expected_outstanding, len(nc._pongs)) + self.assertEqual(i + 1, nc._pongs_received) + @tornado.testing.gen_test def test_flush_timeout(self): @@ -969,6 +1001,35 @@ def parse(self, data=''): self.assertEqual(1, len(nc._pongs)) self.assertEqual(1, nc._pings_outstanding) + @tornado.testing.gen_test + def test_flush_timeout_lost_message(self): + + class Parser(): + def __init__(self, nc): + self.nc = nc + self.drop_messages = False + + @tornado.gen.coroutine + def parse(self, data=''): + if not self.drop_messages: + yield self.nc._process_pong() + + nc = Client() + nc._ps = Parser(nc) + yield nc.connect(io_loop=self.io_loop) + nc._ps.drop_messages = True + with self.assertRaises(tornado.gen.TimeoutError): + yield nc.flush(timeout=1) + self.assertEqual(1, len(nc._pongs)) + self.assertEqual(1, nc._pings_outstanding) + self.assertEqual(0, nc._pongs_received) + # Successful flush must clear timed out pong and the new one. + nc._ps.drop_messages = False + yield nc.flush(timeout=1) + self.assertEqual(0, len(nc._pongs)) + self.assertEqual(0, nc._pings_outstanding) + self.assertEqual(2, nc._pongs_received) + @tornado.testing.gen_test def test_timed_request_timeout(self):