
Commit 2718837

Merge pull request #24 from wallyqs/flusher-race-cond-fixes
Flusher fixes
Waldemar Quevedo authored Aug 23, 2017
2 parents a630cac + 064c6ad commit 2718837
Showing 3 changed files with 121 additions and 32 deletions.
nats/__init__.py (1 addition & 1 deletion)
@@ -1,2 +1,2 @@
-__version__ = b'0.5.2'
+__version__ = b'0.5.4'
 __lang__ = b'python2'
nats/io/client.py (10 additions & 11 deletions)
@@ -266,7 +266,7 @@ def _send_ping(self, future=None):
         if self._pings_outstanding > self.options["max_outstanding_pings"]:
             yield self._unbind()
         else:
-            yield self.io.write(PING_PROTO)
+            yield self.send_command(PING_PROTO)
             yield self._flush_pending()
             if future is None:
                 future = tornado.concurrent.Future()
@@ -308,7 +308,7 @@ def send_command(self, cmd, priority=False):
         self._pending.append(cmd)
         self._pending_size += len(cmd)
 
-        if len(self._pending) > DEFAULT_PENDING_SIZE:
+        if self._pending_size > DEFAULT_PENDING_SIZE:
             yield self._flush_pending()
 
     @tornado.gen.coroutine
@@ -858,8 +858,8 @@ def _flusher_loop(self):
         and then flushes them to the socket.
         """
         while True:
-            tmp_pending = []
-            tmp_pending_size = 0
+            pending = []
+            pending_size = 0
             try:
                 # Block and wait for the flusher to be kicked
                 yield self._flush_queue.get()
@@ -873,17 +873,16 @@
                 cmds = b''.join(self._pending)
 
                 # Reset pending queue and store tmp in case write fails
-                self._pending, tmp_pending = [], self._pending
-                self._pending_size, tmp_pending_size = 0, self._pending_size
-
+                self._pending, pending = [], self._pending
+                self._pending_size, pending_size = 0, self._pending_size
                 yield self.io.write(cmds)
             except tornado.iostream.StreamBufferFullError:
                 # Accumulate as pending data size and flush when possible.
-                self._pending = tmp_pending + self._pending
-                self._pending_size += tmp_pending_size
+                self._pending = pending + self._pending
+                self._pending_size += pending_size
             except tornado.iostream.StreamClosedError as e:
-                self._pending = tmp_pending + self._pending
-                self._pending_size += tmp_pending_size
+                self._pending = pending + self._pending
+                self._pending_size += pending_size
                 self._err = e
                 if self._error_cb is not None and not self.is_reconnecting:
                     self._error_cb(e)
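Taken together, the client.py changes route PING through the same pending buffer as every other command, flush once the buffered bytes (rather than the number of buffered commands) exceed DEFAULT_PENDING_SIZE, and have the flusher swap the pending list out before writing so that a failed write can restore the data instead of dropping it. Below is a minimal, tornado-free sketch of that swap-and-restore pattern; _pending and _pending_size mirror the client's attribute names, while MiniClient, FakeStream and PENDING_BYTES_LIMIT are illustrative stand-ins, not part of the library.

# Sketch of the pending-buffer handling applied in this commit.
# FakeStream and PENDING_BYTES_LIMIT are illustrative, not the nats client API.

class StreamClosedError(Exception):
    pass


class FakeStream(object):
    """Pretend socket that can be told to fail the next write."""

    def __init__(self):
        self.written = []
        self.fail_next = False

    def write(self, data):
        if self.fail_next:
            self.fail_next = False
            raise StreamClosedError("connection lost")
        self.written.append(data)


PENDING_BYTES_LIMIT = 1024 * 1024  # assumed threshold, standing in for DEFAULT_PENDING_SIZE


class MiniClient(object):
    def __init__(self, stream):
        self.io = stream
        self._pending = []
        self._pending_size = 0

    def send_command(self, cmd):
        # Buffer the command and flush on accumulated bytes, not command count.
        self._pending.append(cmd)
        self._pending_size += len(cmd)
        if self._pending_size > PENDING_BYTES_LIMIT:
            self.flush_pending()

    def flush_pending(self):
        cmds = b''.join(self._pending)
        # Swap the buffer out first, keeping the old contents aside so a
        # failed write can restore them instead of dropping them.
        self._pending, pending = [], self._pending
        self._pending_size, pending_size = 0, self._pending_size
        try:
            self.io.write(cmds)
        except StreamClosedError:
            # Put back what was taken, ahead of anything buffered meanwhile.
            self._pending = pending + self._pending
            self._pending_size += pending_size


if __name__ == '__main__':
    stream = FakeStream()
    nc = MiniClient(stream)
    nc.send_command(b'PUB help 5\r\nhello\r\n')
    stream.fail_next = True
    nc.flush_pending()                  # write fails, data is restored
    assert nc._pending_size > 0
    nc.flush_pending()                  # retry succeeds
    assert nc._pending == [] and stream.written

Restoring as pending + self._pending keeps the failed batch ahead of anything buffered while the write was in flight, which preserves command ordering.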
tests/client_test.py (110 additions & 20 deletions)
@@ -495,36 +495,126 @@ def test_publish(self):
         self.assertEqual(2, nc.stats['in_msgs'])
         self.assertEqual(2, nc.stats['out_msgs'])
 
-    @tornado.testing.gen_test
+    @tornado.testing.gen_test(timeout=15)
     def test_publish_race_condition(self):
-        # This tests a race condition fixed in #23 where a series of large publishes
-        # followed by a flush and another publish will cause the last publish to never get written
+        # This tests a race condition fixed in #23 where a series of
+        # large publishes followed by a flush and another publish
+        # will cause the last publish to never get written.
         nc = Client()
 
         yield nc.connect(io_loop=self.io_loop)
-        self.assertEqual(Client.CONNECTED, nc._status)
-        info_keys = nc._server_info.keys()
-        self.assertTrue(len(info_keys) > 0)
+        self.assertTrue(nc.is_connected)
 
-        inbox = new_inbox()
-        for i in range(100):
-            yield nc.publish("help.%s" % i, "A" * 1000000)
-        yield tornado.gen.moment # Unblocks flusher
-        yield nc.publish("help.final", "A")
-        yield tornado.gen.sleep(1.0)
+        @tornado.gen.coroutine
+        def sub(msg):
+            sub.msgs.append(msg)
+            if len(sub.msgs) == 501:
+                sub.future.set_result(True)
+
+        sub.msgs = []
+        sub.future = tornado.concurrent.Future()
+        yield nc.subscribe("help.*", cb=sub)
+
+        # Close to 1MB payload
+        payload = "A" * 1000000
+
+        # Publish messages from 0..499
+        for i in range(500):
+            yield nc.publish("help.%s" % i, payload)
+
+        # Relinquish control often to unblock the flusher
+        yield tornado.gen.moment
+        yield nc.publish("help.500", "A")
+
+        # Wait for the future to yield after receiving all the messages.
+        try:
+            yield tornado.gen.with_timeout(timedelta(seconds=10), sub.future)
+        except:
+            # Skip timeout in case it may occur and let test fail
+            # when checking how many messages we received in the end.
+            pass
+
+        # We should definitely have all the messages
+        self.assertEqual(len(sub.msgs), 501)
+
+        for i in range(501):
+            self.assertEqual(sub.msgs[i].subject, u"help.%s" % (i))
+
         http = tornado.httpclient.AsyncHTTPClient()
         response = yield http.fetch('http://127.0.0.1:%d/varz' % self.server_pool[0].http_port)
         varz = json.loads(response.body)
 
-        self.assertEqual(100000001, varz['in_bytes'])
-        self.assertEqual(0, varz['out_bytes'])
-        self.assertEqual(101, varz['in_msgs'])
-        self.assertEqual(0, varz['out_msgs'])
-        self.assertEqual(0, nc.stats['in_bytes'])
-        self.assertEqual(100000001, nc.stats['out_bytes'])
-        self.assertEqual(0, nc.stats['in_msgs'])
-        self.assertEqual(101, nc.stats['out_msgs'])
+        self.assertEqual(500000001, varz['in_bytes'])
+        self.assertEqual(500000001, varz['out_bytes'])
+        self.assertEqual(501, varz['in_msgs'])
+        self.assertEqual(501, varz['out_msgs'])
+        self.assertEqual(500000001, nc.stats['in_bytes'])
+        self.assertEqual(500000001, nc.stats['out_bytes'])
+        self.assertEqual(501, nc.stats['in_msgs'])
+        self.assertEqual(501, nc.stats['out_msgs'])
 
+    @tornado.testing.gen_test(timeout=15)
+    def test_publish_flush_race_condition(self):
+        # This tests a race condition fixed in #23 where a series of
+        # large publishes followed by a flush and another publish
+        # will cause the last publish to never get written.
+        nc = Client()
+
+        yield nc.connect(io_loop=self.io_loop)
+        self.assertTrue(nc.is_connected)
+
+        @tornado.gen.coroutine
+        def sub(msg):
+            sub.msgs.append(msg)
+            if len(sub.msgs) == 501:
+                sub.future.set_result(True)
+
+        sub.msgs = []
+        sub.future = tornado.concurrent.Future()
+        yield nc.subscribe("help.*", cb=sub)
+
+        # Close to 1MB payload
+        payload = "A" * 1000000
+
+        # Publish messages from 0..499
+        for i in range(500):
+            yield nc.publish("help.%s" % i, payload)
+            if i % 10 == 0:
+                # Relinquish control often to unblock the flusher
+                yield tornado.gen.moment
+
+        yield nc.publish("help.500", "A")
+
+        # Flushing and doing ping/pong should not cause commands
+        # to be dropped either.
+        yield nc.flush()
+
+        # Wait for the future to yield after receiving all the messages.
+        try:
+            yield tornado.gen.with_timeout(timedelta(seconds=10), sub.future)
+        except:
+            # Skip timeout in case it may occur and let test fail
+            # when checking how many messages we received in the end.
+            pass
+
+        # We should definitely have all the messages
+        self.assertEqual(len(sub.msgs), 501)
+
+        for i in range(501):
+            self.assertEqual(sub.msgs[i].subject, u"help.%s" % (i))
+
+        http = tornado.httpclient.AsyncHTTPClient()
+        response = yield http.fetch('http://127.0.0.1:%d/varz' % self.server_pool[0].http_port)
+        varz = json.loads(response.body)
+
+        self.assertEqual(500000001, varz['in_bytes'])
+        self.assertEqual(500000001, varz['out_bytes'])
+        self.assertEqual(501, varz['in_msgs'])
+        self.assertEqual(501, varz['out_msgs'])
+        self.assertEqual(500000001, nc.stats['in_bytes'])
+        self.assertEqual(500000001, nc.stats['out_bytes'])
+        self.assertEqual(501, nc.stats['in_msgs'])
+        self.assertEqual(501, nc.stats['out_msgs'])
 
     @tornado.testing.gen_test
     def test_unsubscribe(self):
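Both tests rely on the flusher being just another coroutine on the shared IOLoop: a publish loop that never yields control starves it, which is why the tests insert yield tornado.gen.moment between publishes. The sketch below illustrates that scheduling effect; it assumes tornado 4.x-style coroutine APIs (tornado.queues.Queue, tornado.gen.moment), and the kick queue and flusher coroutine are simplified stand-ins for the client's internal _flush_queue and _flusher_loop.

# Sketch of why the tests yield tornado.gen.moment inside the publish loop.
# Names here are illustrative, not the client's internals.
import tornado.gen
import tornado.ioloop
import tornado.queues


@tornado.gen.coroutine
def main():
    pending = []
    flushed = []
    kick = tornado.queues.Queue()   # stand-in for the client's _flush_queue

    @tornado.gen.coroutine
    def flusher():
        while True:
            yield kick.get()        # block until kicked, then drain
            flushed.extend(pending)
            del pending[:]

    tornado.ioloop.IOLoop.current().spawn_callback(flusher)

    for i in range(500):
        pending.append(i)
        kick.put_nowait(True)
        if i % 10 == 0:
            # Without yielding here, the flusher would not run until
            # this loop finishes, mirroring the tests above.
            yield tornado.gen.moment

    yield tornado.gen.moment
    print("flushed: %d, still pending: %d" % (len(flushed), len(pending)))


tornado.ioloop.IOLoop.current().run_sync(main)

Dropping the yield inside the loop leaves everything pending until the loop completes, which is the starvation the yield tornado.gen.moment calls in the tests are there to avoid.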
