diff --git a/nats/io/client.py b/nats/io/client.py index 34c0de0..151990c 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -392,7 +392,12 @@ def _flush_timeout(self, timeout): """ future = tornado.concurrent.Future() yield self._send_ping(future) - result = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future) + try: + result = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future) + except tornado.gen.TimeoutError: + # Set the future to False so it can be ignored in _process_pong. + future.set_result(False) + raise raise tornado.gen.Return(result) @tornado.gen.coroutine @@ -539,12 +544,17 @@ def _process_pong(self): The client will send a PING soon after CONNECT and then periodically to the server as a failure detector to close connections to unhealthy servers. For each PING the client sends, we will add a respective PONG future. + Here we want to find the oldest PONG future that is still running. If the + flush PING-PONG already timed out, then just drop those old items. """ - if len(self._pongs) > 0: + while len(self._pongs) > 0: future = self._pongs.pop(0) - future.set_result(True) self._pongs_received += 1 self._pings_outstanding -= 1 + # Only exit loop if future still running (hasn't exceeded flush timeout). + if future.running(): + future.set_result(True) + break @tornado.gen.coroutine def _process_msg(self, sid, subject, reply, data): diff --git a/tests/client_test.py b/tests/client_test.py index 656dda7..8ec5348 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -947,6 +947,38 @@ def parse(self, data=''): self.assertTrue(nc.is_connected) self.assertFalse(nc.is_reconnecting) + @tornado.testing.gen_test + def test_ping_slow_replies(self): + pongs = [] + class Parser(): + def __init__(self, nc): + self.nc = nc + + @tornado.gen.coroutine + def parse(self, data=''): + pongs.append(data) # but, don't process now + + nc = Client() + nc._ps = Parser(nc) + yield nc.connect(io_loop=self.io_loop, + ping_interval=0.1, + max_outstanding_pings=20) + yield tornado.gen.sleep(1) + + # Should have received more than 5 pongs, but processed none. + self.assertTrue(len(pongs) > 5) + self.assertTrue(len(pongs) <= nc._pings_outstanding) + self.assertEqual(0, nc._pongs_received) + self.assertEqual(len(nc._pongs), nc._pings_outstanding) + # Process all that were sent. + expected_outstanding = nc._pings_outstanding + for i in range(nc._pings_outstanding): + yield nc._process_pong() + expected_outstanding -= 1 + self.assertEqual(expected_outstanding, nc._pings_outstanding) + self.assertEqual(expected_outstanding, len(nc._pongs)) + self.assertEqual(i + 1, nc._pongs_received) + @tornado.testing.gen_test def test_flush_timeout(self): @@ -969,6 +1001,35 @@ def parse(self, data=''): self.assertEqual(1, len(nc._pongs)) self.assertEqual(1, nc._pings_outstanding) + @tornado.testing.gen_test + def test_flush_timeout_lost_message(self): + + class Parser(): + def __init__(self, nc): + self.nc = nc + self.drop_messages = False + + @tornado.gen.coroutine + def parse(self, data=''): + if not self.drop_messages: + yield self.nc._process_pong() + + nc = Client() + nc._ps = Parser(nc) + yield nc.connect(io_loop=self.io_loop) + nc._ps.drop_messages = True + with self.assertRaises(tornado.gen.TimeoutError): + yield nc.flush(timeout=1) + self.assertEqual(1, len(nc._pongs)) + self.assertEqual(1, nc._pings_outstanding) + self.assertEqual(0, nc._pongs_received) + # Successful flush must clear timed out pong and the new one. + nc._ps.drop_messages = False + yield nc.flush(timeout=1) + self.assertEqual(0, len(nc._pongs)) + self.assertEqual(0, nc._pings_outstanding) + self.assertEqual(2, nc._pongs_received) + @tornado.testing.gen_test def test_timed_request_timeout(self):