Merge pull request #36 from ajoubertza/recover-from-lost-pings
Recover from flush timeouts on lost PING-PONG message
wallyqs authored May 1, 2018
2 parents 594ddb2 + e8eaa87 commit 0280980
Showing 2 changed files with 74 additions and 3 deletions.
16 changes: 13 additions & 3 deletions nats/io/client.py
@@ -392,7 +392,12 @@ def _flush_timeout(self, timeout):
         """
         future = tornado.concurrent.Future()
         yield self._send_ping(future)
-        result = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future)
+        try:
+            result = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future)
+        except tornado.gen.TimeoutError:
+            # Set the future to False so it can be ignored in _process_pong.
+            future.set_result(False)
+            raise
         raise tornado.gen.Return(result)
 
     @tornado.gen.coroutine
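
The new except branch above resolves the pending pong future to False before re-raising, which is what later lets _process_pong recognize and discard it. A minimal standalone sketch of that pattern (illustrative names only; flush_like stands in for the client's _flush_timeout machinery):

    from datetime import timedelta

    import tornado.concurrent
    import tornado.gen
    import tornado.ioloop

    @tornado.gen.coroutine
    def flush_like(pongs, timeout):
        # Register a pong future, as the client does with nc._pongs.
        future = tornado.concurrent.Future()
        pongs.append(future)
        try:
            result = yield tornado.gen.with_timeout(
                timedelta(seconds=timeout), future)
        except tornado.gen.TimeoutError:
            # Resolve the stale future before re-raising so a later
            # _process_pong-style consumer can spot and drop it.
            future.set_result(False)
            raise
        raise tornado.gen.Return(result)

    pongs = []
    try:
        # Nothing ever resolves the future, so this flush times out.
        tornado.ioloop.IOLoop.current().run_sync(lambda: flush_like(pongs, 0.1))
    except tornado.gen.TimeoutError:
        print('timed out; stale future is done:', pongs[0].done())
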
@@ -539,12 +544,17 @@ def _process_pong(self):
         The client will send a PING soon after CONNECT and then periodically
         to the server as a failure detector to close connections to unhealthy servers.
         For each PING the client sends, we will add a respective PONG future.
+        Here we want to find the oldest PONG future that is still running. If the
+        flush PING-PONG already timed out, then just drop those old items.
         """
-        if len(self._pongs) > 0:
+        while len(self._pongs) > 0:
             future = self._pongs.pop(0)
-            future.set_result(True)
             self._pongs_received += 1
             self._pings_outstanding -= 1
+            # Only exit loop if future still running (hasn't exceeded flush timeout).
+            if future.running():
+                future.set_result(True)
+                break
 
     @tornado.gen.coroutine
     def _process_msg(self, sid, subject, reply, data):
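The reworked loop above drains any futures a timed-out flush left behind and resolves only the first one still pending. A self-contained sketch of that drain logic, assuming the Tornado 4.x Future API this client targets (future.running() does not exist on the asyncio-based Futures of Tornado 5+):

    import tornado.concurrent

    def process_pong_like(pongs):
        # Drop stale (already-done) pong futures; resolve only the first
        # one still pending, then stop.
        while len(pongs) > 0:
            future = pongs.pop(0)
            if future.running():
                future.set_result(True)
                break

    stale = tornado.concurrent.Future()
    stale.set_result(False)             # left behind by a timed-out flush
    live = tornado.concurrent.Future()  # belongs to the flush in progress

    pongs = [stale, live]
    process_pong_like(pongs)

    assert live.result() is True  # the in-progress flush completes
    assert len(pongs) == 0        # the stale entry was cleared as well
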
61 changes: 61 additions & 0 deletions tests/client_test.py
@@ -947,6 +947,38 @@ def parse(self, data=''):
         self.assertTrue(nc.is_connected)
         self.assertFalse(nc.is_reconnecting)
 
+    @tornado.testing.gen_test
+    def test_ping_slow_replies(self):
+        pongs = []
+        class Parser():
+            def __init__(self, nc):
+                self.nc = nc
+
+            @tornado.gen.coroutine
+            def parse(self, data=''):
+                pongs.append(data)  # but, don't process now
+
+        nc = Client()
+        nc._ps = Parser(nc)
+        yield nc.connect(io_loop=self.io_loop,
+                         ping_interval=0.1,
+                         max_outstanding_pings=20)
+        yield tornado.gen.sleep(1)
+
+        # Should have received more than 5 pongs, but processed none.
+        self.assertTrue(len(pongs) > 5)
+        self.assertTrue(len(pongs) <= nc._pings_outstanding)
+        self.assertEqual(0, nc._pongs_received)
+        self.assertEqual(len(nc._pongs), nc._pings_outstanding)
+        # Process all that were sent.
+        expected_outstanding = nc._pings_outstanding
+        for i in range(nc._pings_outstanding):
+            yield nc._process_pong()
+            expected_outstanding -= 1
+            self.assertEqual(expected_outstanding, nc._pings_outstanding)
+            self.assertEqual(expected_outstanding, len(nc._pongs))
+            self.assertEqual(i + 1, nc._pongs_received)
+
     @tornado.testing.gen_test
     def test_flush_timeout(self):
 
@@ -969,6 +1001,35 @@ def parse(self, data=''):
         self.assertEqual(1, len(nc._pongs))
         self.assertEqual(1, nc._pings_outstanding)
 
+    @tornado.testing.gen_test
+    def test_flush_timeout_lost_message(self):
+
+        class Parser():
+            def __init__(self, nc):
+                self.nc = nc
+                self.drop_messages = False
+
+            @tornado.gen.coroutine
+            def parse(self, data=''):
+                if not self.drop_messages:
+                    yield self.nc._process_pong()
+
+        nc = Client()
+        nc._ps = Parser(nc)
+        yield nc.connect(io_loop=self.io_loop)
+        nc._ps.drop_messages = True
+        with self.assertRaises(tornado.gen.TimeoutError):
+            yield nc.flush(timeout=1)
+        self.assertEqual(1, len(nc._pongs))
+        self.assertEqual(1, nc._pings_outstanding)
+        self.assertEqual(0, nc._pongs_received)
+        # Successful flush must clear timed out pong and the new one.
+        nc._ps.drop_messages = False
+        yield nc.flush(timeout=1)
+        self.assertEqual(0, len(nc._pongs))
+        self.assertEqual(0, nc._pings_outstanding)
+        self.assertEqual(2, nc._pongs_received)
+
     @tornado.testing.gen_test
     def test_timed_request_timeout(self):
 
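Both regression tests stub the client's private protocol parser (nc._ps) to simulate lost PONGs. Condensed into a runnable sketch, that fault-injection pattern looks like this; it assumes the same nc._ps internals the tests rely on and a gnatsd server listening on the default localhost port:

    import tornado.gen
    import tornado.ioloop

    from nats.io.client import Client

    class DroppingParser(object):
        # Stub parser: treat every inbound payload as a PONG, and drop
        # them on demand to simulate a lost PING-PONG exchange.
        def __init__(self, nc):
            self.nc = nc
            self.drop_messages = False

        @tornado.gen.coroutine
        def parse(self, data=''):
            if not self.drop_messages:
                yield self.nc._process_pong()

    @tornado.gen.coroutine
    def demo():
        nc = Client()
        nc._ps = DroppingParser(nc)  # swap in the stub, as the tests do
        yield nc.connect(io_loop=tornado.ioloop.IOLoop.current())
        nc._ps.drop_messages = True
        try:
            yield nc.flush(timeout=1)  # PONG is dropped, so this times out
        except tornado.gen.TimeoutError:
            pass
        nc._ps.drop_messages = False
        yield nc.flush(timeout=1)  # clears the stale pong and the new one

    tornado.ioloop.IOLoop.current().run_sync(demo)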
