Skip to content

Commit

Permalink
Merge pull request #23 from theisensanders-wf/flusher-race-cond
Browse files Browse the repository at this point in the history
Reset pending queue before IOStream write begins
  • Loading branch information
Waldemar Quevedo authored Aug 23, 2017
2 parents 232dcff + 5b51e49 commit a630cac
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
17 changes: 13 additions & 4 deletions nats/io/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,8 @@ def _flusher_loop(self):
and then flushes them to the socket.
"""
while True:
tmp_pending = []
tmp_pending_size = 0
try:
# Block and wait for the flusher to be kicked
yield self._flush_queue.get()
Expand All @@ -868,13 +870,20 @@ def _flusher_loop(self):

# Flush only when we actually have something in buffer...
if self._pending_size > 0:
yield self.io.write(b''.join(self._pending))
self._pending = []
self._pending_size = 0
cmds = b''.join(self._pending)

# Reset pending queue and store tmp in case write fails
self._pending, tmp_pending = [], self._pending
self._pending_size, tmp_pending_size = 0, self._pending_size

yield self.io.write(cmds)
except tornado.iostream.StreamBufferFullError:
# Acumulate as pending data size and flush when possible.
pass
self._pending = tmp_pending + self._pending
self._pending_size += tmp_pending_size
except tornado.iostream.StreamClosedError as e:
self._pending = tmp_pending + self._pending
self._pending_size += tmp_pending_size
self._err = e
if self._error_cb is not None and not self.is_reconnecting:
self._error_cb(e)
Expand Down
31 changes: 31 additions & 0 deletions tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,37 @@ def test_publish(self):
self.assertEqual(2, nc.stats['in_msgs'])
self.assertEqual(2, nc.stats['out_msgs'])

@tornado.testing.gen_test
def test_publish_race_condition(self):
# This tests a race condition fixed in #23 where a series of large publishes
# followed by a flush and another publish will cause the last publish to never get written
nc = Client()

yield nc.connect(io_loop=self.io_loop)
self.assertEqual(Client.CONNECTED, nc._status)
info_keys = nc._server_info.keys()
self.assertTrue(len(info_keys) > 0)

inbox = new_inbox()
for i in range(100):
yield nc.publish("help.%s" % i, "A" * 1000000)
yield tornado.gen.moment # Unblocks flusher
yield nc.publish("help.final", "A")
yield tornado.gen.sleep(1.0)

http = tornado.httpclient.AsyncHTTPClient()
response = yield http.fetch('http://127.0.0.1:%d/varz' % self.server_pool[0].http_port)
varz = json.loads(response.body)

self.assertEqual(100000001, varz['in_bytes'])
self.assertEqual(0, varz['out_bytes'])
self.assertEqual(101, varz['in_msgs'])
self.assertEqual(0, varz['out_msgs'])
self.assertEqual(0, nc.stats['in_bytes'])
self.assertEqual(100000001, nc.stats['out_bytes'])
self.assertEqual(0, nc.stats['in_msgs'])
self.assertEqual(101, nc.stats['out_msgs'])

@tornado.testing.gen_test
def test_unsubscribe(self):
nc = Client()
Expand Down

0 comments on commit a630cac

Please sign in to comment.