diff --git a/nats/__init__.py b/nats/__init__.py index cd3de55..922a94f 100644 --- a/nats/__init__.py +++ b/nats/__init__.py @@ -1,2 +1,2 @@ -__version__ = b'0.5.2' +__version__ = b'0.5.4' __lang__ = b'python2' diff --git a/nats/io/client.py b/nats/io/client.py index 00fa363..7adf8a9 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -266,7 +266,7 @@ def _send_ping(self, future=None): if self._pings_outstanding > self.options["max_outstanding_pings"]: yield self._unbind() else: - yield self.io.write(PING_PROTO) + yield self.send_command(PING_PROTO) yield self._flush_pending() if future is None: future = tornado.concurrent.Future() @@ -308,7 +308,7 @@ def send_command(self, cmd, priority=False): self._pending.append(cmd) self._pending_size += len(cmd) - if len(self._pending) > DEFAULT_PENDING_SIZE: + if self._pending_size > DEFAULT_PENDING_SIZE: yield self._flush_pending() @tornado.gen.coroutine @@ -858,8 +858,8 @@ def _flusher_loop(self): and then flushes them to the socket. """ while True: - tmp_pending = [] - tmp_pending_size = 0 + pending = [] + pending_size = 0 try: # Block and wait for the flusher to be kicked yield self._flush_queue.get() @@ -873,17 +873,16 @@ def _flusher_loop(self): cmds = b''.join(self._pending) # Reset pending queue and store tmp in case write fails - self._pending, tmp_pending = [], self._pending - self._pending_size, tmp_pending_size = 0, self._pending_size - + self._pending, pending = [], self._pending + self._pending_size, pending_size = 0, self._pending_size yield self.io.write(cmds) except tornado.iostream.StreamBufferFullError: # Acumulate as pending data size and flush when possible. - self._pending = tmp_pending + self._pending - self._pending_size += tmp_pending_size + self._pending = pending + self._pending + self._pending_size += pending_size except tornado.iostream.StreamClosedError as e: - self._pending = tmp_pending + self._pending - self._pending_size += tmp_pending_size + self._pending = pending + self._pending + self._pending_size += pending_size self._err = e if self._error_cb is not None and not self.is_reconnecting: self._error_cb(e) diff --git a/tests/client_test.py b/tests/client_test.py index 76c3fd2..98a1aca 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -495,36 +495,126 @@ def test_publish(self): self.assertEqual(2, nc.stats['in_msgs']) self.assertEqual(2, nc.stats['out_msgs']) - @tornado.testing.gen_test + @tornado.testing.gen_test(timeout=15) def test_publish_race_condition(self): - # This tests a race condition fixed in #23 where a series of large publishes - # followed by a flush and another publish will cause the last publish to never get written + # This tests a race condition fixed in #23 where a series of + # large publishes followed by a flush and another publish + # will cause the last publish to never get written. nc = Client() yield nc.connect(io_loop=self.io_loop) - self.assertEqual(Client.CONNECTED, nc._status) - info_keys = nc._server_info.keys() - self.assertTrue(len(info_keys) > 0) + self.assertTrue(nc.is_connected) - inbox = new_inbox() - for i in range(100): - yield nc.publish("help.%s" % i, "A" * 1000000) - yield tornado.gen.moment # Unblocks flusher - yield nc.publish("help.final", "A") - yield tornado.gen.sleep(1.0) + @tornado.gen.coroutine + def sub(msg): + sub.msgs.append(msg) + if len(sub.msgs) == 501: + sub.future.set_result(True) + + sub.msgs = [] + sub.future = tornado.concurrent.Future() + yield nc.subscribe("help.*", cb=sub) + + # Close to 1MB payload + payload = "A" * 1000000 + + # Publish messages from 0..499 + for i in range(500): + yield nc.publish("help.%s" % i, payload) + + # Relinquish control often to unblock the flusher + yield tornado.gen.moment + yield nc.publish("help.500", "A") + + # Wait for the future to yield after receiving all the messages. + try: + yield tornado.gen.with_timeout(timedelta(seconds=10), sub.future) + except: + # Skip timeout in case it may occur and let test fail + # when checking how many messages we received in the end. + pass + + # We should definitely have all the messages + self.assertEqual(len(sub.msgs), 501) + + for i in range(501): + self.assertEqual(sub.msgs[i].subject, u"help.%s" % (i)) http = tornado.httpclient.AsyncHTTPClient() response = yield http.fetch('http://127.0.0.1:%d/varz' % self.server_pool[0].http_port) varz = json.loads(response.body) - self.assertEqual(100000001, varz['in_bytes']) - self.assertEqual(0, varz['out_bytes']) - self.assertEqual(101, varz['in_msgs']) - self.assertEqual(0, varz['out_msgs']) - self.assertEqual(0, nc.stats['in_bytes']) - self.assertEqual(100000001, nc.stats['out_bytes']) - self.assertEqual(0, nc.stats['in_msgs']) - self.assertEqual(101, nc.stats['out_msgs']) + self.assertEqual(500000001, varz['in_bytes']) + self.assertEqual(500000001, varz['out_bytes']) + self.assertEqual(501, varz['in_msgs']) + self.assertEqual(501, varz['out_msgs']) + self.assertEqual(500000001, nc.stats['in_bytes']) + self.assertEqual(500000001, nc.stats['out_bytes']) + self.assertEqual(501, nc.stats['in_msgs']) + self.assertEqual(501, nc.stats['out_msgs']) + + @tornado.testing.gen_test(timeout=15) + def test_publish_flush_race_condition(self): + # This tests a race condition fixed in #23 where a series of + # large publishes followed by a flush and another publish + # will cause the last publish to never get written. + nc = Client() + + yield nc.connect(io_loop=self.io_loop) + self.assertTrue(nc.is_connected) + + @tornado.gen.coroutine + def sub(msg): + sub.msgs.append(msg) + if len(sub.msgs) == 501: + sub.future.set_result(True) + + sub.msgs = [] + sub.future = tornado.concurrent.Future() + yield nc.subscribe("help.*", cb=sub) + + # Close to 1MB payload + payload = "A" * 1000000 + + # Publish messages from 0..499 + for i in range(500): + yield nc.publish("help.%s" % i, payload) + if i % 10 == 0: + # Relinquish control often to unblock the flusher + yield tornado.gen.moment + + yield nc.publish("help.500", "A") + + # Flushing and doing ping/pong should not cause commands + # to be dropped either. + yield nc.flush() + + # Wait for the future to yield after receiving all the messages. + try: + yield tornado.gen.with_timeout(timedelta(seconds=10), sub.future) + except: + # Skip timeout in case it may occur and let test fail + # when checking how many messages we received in the end. + pass + + # We should definitely have all the messages + self.assertEqual(len(sub.msgs), 501) + + for i in range(501): + self.assertEqual(sub.msgs[i].subject, u"help.%s" % (i)) + + http = tornado.httpclient.AsyncHTTPClient() + response = yield http.fetch('http://127.0.0.1:%d/varz' % self.server_pool[0].http_port) + varz = json.loads(response.body) + + self.assertEqual(500000001, varz['in_bytes']) + self.assertEqual(500000001, varz['out_bytes']) + self.assertEqual(501, varz['in_msgs']) + self.assertEqual(501, varz['out_msgs']) + self.assertEqual(500000001, nc.stats['in_bytes']) + self.assertEqual(500000001, nc.stats['out_bytes']) + self.assertEqual(501, nc.stats['in_msgs']) + self.assertEqual(501, nc.stats['out_msgs']) @tornado.testing.gen_test def test_unsubscribe(self):