diff --git a/examples/advanced-example.py b/examples/advanced-example.py index 94aaa6c..b9fc2ea 100644 --- a/examples/advanced-example.py +++ b/examples/advanced-example.py @@ -15,68 +15,58 @@ import tornado.ioloop import tornado.gen -import time from nats.io import Client as NATS - +from nats.io.errors import ErrNoServers @tornado.gen.coroutine def main(): nc = NATS() - # Set pool servers in the cluster and give a name to the client. - options = { - "name": - "worker", - "servers": [ - "nats://secret:pass@127.0.0.1:4222", - "nats://secret:pass@127.0.0.1:4223", - "nats://secret:pass@127.0.0.1:4224" - ] - } - - # Explicitly set loop to use for the reactor. - options["io_loop"] = tornado.ioloop.IOLoop.instance() - - yield nc.connect(**options) + try: + # Setting explicit list of servers in a cluster and + # max reconnect retries. + servers = [ + "nats://127.0.0.1:4222", + "nats://127.0.0.1:4223", + "nats://127.0.0.1:4224" + ] + yield nc.connect(max_reconnect_attempts=2, servers=servers) + except ErrNoServers: + print("No servers available!") + return @tornado.gen.coroutine - def subscriber(msg): - yield nc.publish("discover", "pong") + def message_handler(msg): + subject = msg.subject + reply = msg.reply + data = msg.data.decode() + for i in range(0, 20): + yield nc.publish(reply, "i={i}".format(i=i).encode()) - yield nc.subscribe("discover", "", subscriber) + yield nc.subscribe("help.>", cb=message_handler) @tornado.gen.coroutine - def async_subscriber(msg): - # First request takes longer, while others are still processed. - if msg.subject == "requests.1": - yield tornado.gen.sleep(0.5) - print("Processed request [{0}]: {1}".format(msg.subject, msg)) - - # Create asynchronous subscription and make roundtrip to server - # to ensure that subscriptions have been processed. - yield nc.subscribe_async("requests.*", cb=async_subscriber) + def request_handler(msg): + subject = msg.subject + reply = msg.reply + data = msg.data.decode() + print("Received a message on '{subject} {reply}': {data}".format( + subject=subject, reply=reply, data=data)) + + # Signal the server to stop sending messages after we got 10 already. + yield nc.request( + "help.please", b'help', expected=10, cb=request_handler) + + # Flush connection to server, returns when all messages have been processed. + # It raises a timeout if roundtrip takes longer than 1 second. yield nc.flush() - for i in range(1, 10): - yield nc.publish("requests.{0}".format(i), "example") - yield tornado.gen.sleep(1) - while True: - # Confirm stats to implement basic throttling logic. - sent = nc.stats["out_msgs"] - received = nc.stats["in_msgs"] - delta = sent - received - - if delta > 2000: - print("Waiting... Sent: {0}, Received: {1}, Delta: {2}".format( - sent, received, delta)) - yield tornado.gen.sleep(1) - - if nc.stats["reconnects"] > 10: - print("[WARN] Reconnected over 10 times!") - - for i in range(1000): - yield nc.publish("discover", "ping") + # Drain gracefully closes the connection, allowing all subscribers to + # handle any pending messages inflight that the server may have sent. + yield nc.drain() + # Drain works async in the background. + yield tornado.gen.sleep(1) if __name__ == '__main__': tornado.ioloop.IOLoop.instance().run_sync(main) diff --git a/examples/example.py b/examples/example.py index e130356..1656c2b 100644 --- a/examples/example.py +++ b/examples/example.py @@ -54,7 +54,7 @@ def help_request_handler(msg): msg = yield nc.request("help", b"Hi, need help!", timeout=0.2) print("[Response]: %s" % msg.data) except tornado.gen.TimeoutError: - print("Timeout!") + print("Response Timeout!") # Remove interest in subscription. yield nc.unsubscribe(sid) diff --git a/examples/wildcard.py b/examples/wildcard.py new file mode 100644 index 0000000..dfdce6a --- /dev/null +++ b/examples/wildcard.py @@ -0,0 +1,26 @@ +import tornado.ioloop +import tornado.gen +import time +from nats.io import Client as NATS + +@tornado.gen.coroutine +def main(): + nc = NATS() + + yield nc.connect("demo.nats.io") + + @tornado.gen.coroutine + def subscriber(msg): + print("Msg received on [{0}]: {1}".format(msg.subject, msg.data)) + + yield nc.subscribe("foo.*.baz", "", subscriber) + yield nc.subscribe("foo.bar.*", "", subscriber) + yield nc.subscribe("foo.>", "", subscriber) + yield nc.subscribe(">", "", subscriber) + + # Matches all of above + yield nc.publish("foo.bar.baz", b"Hello World") + yield tornado.gen.sleep(1) + +if __name__ == '__main__': + tornado.ioloop.IOLoop.current().run_sync(main) diff --git a/nats/io/client.py b/nats/io/client.py index fb2e981..2794dd1 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -61,6 +61,7 @@ MAX_RECONNECT_ATTEMPTS = 60 RECONNECT_TIME_WAIT = 2 # seconds DEFAULT_CONNECT_TIMEOUT = 2 # seconds +DEFAULT_DRAIN_TIMEOUT = 30 # seconds DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 * 10 DEFAULT_WRITE_BUFFER_SIZE = None @@ -145,6 +146,8 @@ class Client(object): CLOSED = 2 RECONNECTING = 3 CONNECTING = 4 + DRAINING_SUBS = 5 + DRAINING_PUBS = 6 def __repr__(self): return "".format(__version__) @@ -237,6 +240,7 @@ def connect(self, max_write_buffer_size=DEFAULT_WRITE_BUFFER_SIZE, read_chunk_size=DEFAULT_READ_CHUNK_SIZE, tcp_nodelay=False, + drain_timeout=DEFAULT_DRAIN_TIMEOUT, ): """ Establishes a connection to a NATS server. @@ -278,6 +282,7 @@ def connect(self, # In seconds self.options["connect_timeout"] = connect_timeout self.options["ping_interval"] = ping_interval + self.options["drain_timeout"] = drain_timeout # TLS customizations if tls is not None: @@ -523,6 +528,9 @@ def request(self, subject, payload, timeout=0.5, expected=1, cb=None): <<- MSG hello 2 _INBOX.gnKUg9bmAHANjxIsDiQsWO 5 """ + if self.is_draining: + raise ErrConnectionDraining + # If callback given then continue to use old style. if cb is not None: next_inbox = INBOX_PREFIX[:] @@ -654,6 +662,9 @@ def subscribe( if self.is_closed: raise ErrConnectionClosed + if self.is_draining: + raise ErrConnectionDraining + self._ssid += 1 sid = self._ssid sub = Subscription( @@ -794,6 +805,12 @@ def auto_unsubscribe(self, sid, limit=1): blocks in order to be able to define request/response semantics via pub/sub by announcing the server limited interest a priori. """ + if self.is_draining: + raise ErrConnectionDraining + yield self._unsubscribe(sid, limit) + + @tornado.gen.coroutine + def _unsubscribe(self, sid, limit=1): b_limit = b'' if limit > 0: b_limit = ("%d" % limit).encode() @@ -1015,12 +1032,20 @@ def is_reconnecting(self): @property def is_connected(self): - return self._status == Client.CONNECTED + return self._status == Client.CONNECTED or self.is_draining @property def is_connecting(self): return self._status == Client.CONNECTING + @property + def is_draining(self): + return (self._status == Client.DRAINING_SUBS or self._status == Client.DRAINING_PUBS) + + @property + def is_draining_pubs(self): + return self._status == Client.DRAINING_PUBS + @tornado.gen.coroutine def _process_op_err(self, err=None): """ @@ -1225,6 +1250,84 @@ def _close(self, status, do_callbacks=True): if self._closed_cb is not None: self._closed_cb() + @tornado.gen.coroutine + def drain(self, sid=None): + """ + Drain will put a connection into a drain state. All subscriptions will + immediately be put into a drain state. Upon completion, the publishers + will be drained and can not publish any additional messages. Upon draining + of the publishers, the connection will be closed. Use the `closed_cb' + option to know when the connection has moved from draining to closed. + + If a sid is passed, just the subscription with that sid will be drained + without closing the connection. + """ + if self.is_draining: + return + + if self.is_closed: + raise ErrConnectionClosed + + if self.is_connecting or self.is_reconnecting: + raise ErrConnectionReconnecting + + # Drain a single subscription + if sid is not None: + raise tornado.gen.Return(self._drain_sub(sid)) + + # Start draining the subscriptions + self._status = Client.DRAINING_SUBS + + drain_tasks = [] + for ssid, sub in self._subs.items(): + task = self._drain_sub(ssid) + drain_tasks.append(task) + + # Wait for subscriptions to stop handling messages. + drain_is_done = tornado.gen.multi(drain_tasks) + try: + yield tornado.gen.with_timeout( + timedelta(seconds=self.options["drain_timeout"]), + drain_is_done, + ) + except tornado.gen.TimeoutError: + if self._error_cb is not None: + yield self._error_cb(ErrDrainTimeout()) + finally: + self._status = Client.DRAINING_PUBS + yield self.flush() + yield self.close() + + def _drain_sub(self, sid): + sub = self._subs.get(sid) + if sub is None: + raise ErrBadSubscription + + drained = tornado.concurrent.Future() + + # Draining happens async under a task per sub which can be awaited. + @tornado.gen.coroutine + def drain_subscription(): + # Announce server that no longer want to receive more + # messages in this sub and just process the ones remaining. + yield self._unsubscribe(sid, limit=0) + yield self.flush() + + # Wait until no more messages are left or enough received, + # then cancel the subscription task. + while sub.pending_queue.qsize() > 0 and not sub.closed: + yield tornado.gen.sleep(0.1) + + # Subscription is done and won't be receiving further + # messages so can throw it away now. + self._subs.pop(sid, None) + self._remove_subscription(sub) + drained.set_result(True) + self._loop.spawn_callback(drain_subscription) + + # Return future which can be used to await draining is done. + return drained + @tornado.gen.coroutine def _process_err(self, err=None): """ diff --git a/nats/io/errors.py b/nats/io/errors.py index 7669fe8..a395639 100644 --- a/nats/io/errors.py +++ b/nats/io/errors.py @@ -38,6 +38,7 @@ class ErrJsonParse(NatsError): pass + class ErrSlowConsumer(NatsError): """ The client becomes a slow consumer if the server ends up @@ -78,3 +79,19 @@ class ErrServerConnect(socket.error): Raised when it could not establish a connection with server. """ pass + +class ErrBadSubscription(NatsError): + def __str__(self): + return "nats: Invalid Subscription" + +class ErrDrainTimeout(NatsError): + def __str__(self): + return "nats: Draining Connection Timed Out" + +class ErrConnectionDraining(NatsError): + def __str__(self): + return "nats: Connection Draining" + +class ErrConnectionReconnecting(NatsError): + def __str__(self): + return "nats: Connection Reconnecting" diff --git a/readme.md b/readme.md index 0cc8f4c..350ad0b 100644 --- a/readme.md +++ b/readme.md @@ -165,6 +165,7 @@ def main(): yield nc.connect("demo.nats.io") + @tornado.gen.coroutine def subscriber(msg): print("Msg received on [{0}]: {1}".format(msg.subject, msg.data)) @@ -186,67 +187,61 @@ if __name__ == '__main__': ```python import tornado.ioloop import tornado.gen -import time from nats.io import Client as NATS +from nats.io.errors import ErrNoServers @tornado.gen.coroutine def main(): nc = NATS() - # Set pool servers in the cluster and give a name to the client. - options = { - "name": "worker", - "servers": [ - "nats://secret:pass@127.0.0.1:4222", - "nats://secret:pass@127.0.0.1:4223", - "nats://secret:pass@127.0.0.1:4224" + try: + # Setting explicit list of servers in a cluster and + # max reconnect retries. + servers = [ + "nats://127.0.0.1:4222", + "nats://127.0.0.1:4223", + "nats://127.0.0.1:4224" ] - } - - # Explicitly set loop to use for the reactor. - options["loop"] = tornado.ioloop.IOLoop.current() - - yield nc.connect(**options) + yield nc.connect(max_reconnect_attempts=2, servers=servers) + except ErrNoServers: + print("No servers available!") + return @tornado.gen.coroutine - def subscriber(msg): - yield nc.publish("discover", "pong") + def message_handler(msg): + subject = msg.subject + reply = msg.reply + data = msg.data.decode() + for i in range(0, 20): + yield nc.publish(reply, "i={i}".format(i=i).encode()) - yield nc.subscribe("discover", "", subscriber) + yield nc.subscribe("help.>", cb=message_handler) @tornado.gen.coroutine - def async_subscriber(msg): - # First request takes longer, while others are still processed. - if msg.subject == "requests.1": - yield tornado.gen.sleep(0.5) - print("Processed request [{0}]: {1}".format(msg.subject, msg)) - - # Create asynchronous subscription and make roundtrip to server - # to ensure that subscriptions have been processed. - yield nc.subscribe_async("requests.*", cb=async_subscriber) - yield nc.flush() - for i in range(1, 10): - yield nc.publish("requests.{0}".format(i), "example") - yield tornado.gen.sleep(1) + def request_handler(msg): + subject = msg.subject + reply = msg.reply + data = msg.data.decode() + print("Received a message on '{subject} {reply}': {data}".format( + subject=subject, reply=reply, data=data)) - while True: - # Confirm stats to implement basic throttling logic. - sent = nc.stats["out_msgs"] - received = nc.stats["in_msgs"] - delta = sent - received + # Signal the server to stop sending messages after we got 10 already. + yield nc.request( + "help.please", b'help', expected=10, cb=request_handler) - if delta > 2000: - print("Waiting... Sent: {0}, Received: {1}, Delta: {2}".format(sent, received, delta)) - yield tornado.gen.sleep(1) + # Flush connection to server, returns when all messages have been processed. + # It raises a timeout if roundtrip takes longer than 1 second. + yield nc.flush() - if nc.stats["reconnects"] > 10: - print("[WARN] Reconnected over 10 times!") + # Drain gracefully closes the connection, allowing all subscribers to + # handle any pending messages inflight that the server may have sent. + yield nc.drain() - for i in range(1000): - yield nc.publish("discover", "ping") + # Drain works async in the background. + yield tornado.gen.sleep(1) if __name__ == '__main__': - tornado.ioloop.IOLoop.current().run_sync(main) + tornado.ioloop.IOLoop.instance().run_sync(main) ``` ### TLS @@ -255,18 +250,25 @@ Advanced [customizations options](https://docs.python.org/2/library/ssl.html#ssl for setting up a secure connection can be done by including them on connect: ```python - # Establish secure connection to the server, tls options parameterize - # the wrap_socket available from ssl python package. - options = { - "servers": ["nats://127.0.0.1:4444"], - "tls": { - "cert_reqs": ssl.CERT_REQUIRED, - "ca_certs": "./configs/certs/ca.pem", - "keyfile": "./configs/certs/client-key.pem", - "certfile": "./configs/certs/client-cert.pem" - } - } - yield nc.connect(**options) +# Establish secure connection to the server, tls options parameterize +# the wrap_socket available from ssl python package. +options = { + "servers": ["nats://127.0.0.1:4444"], + "tls": { + "cert_reqs": ssl.CERT_REQUIRED, + "ca_certs": "./configs/certs/ca.pem", + "keyfile": "./configs/certs/client-key.pem", + "certfile": "./configs/certs/client-cert.pem" + } + } +yield nc.connect(**options) +``` + +The client will also automatically create a TLS context with defaults +in case it detects that it should connect securely against the server: + +``` +yield nc.connect("tls://demo.nats.io:4443") ``` ## Examples diff --git a/tests/client_test.py b/tests/client_test.py index 02e653a..bf52932 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1937,7 +1937,7 @@ def disconnected_cb(self): self.disconnected_cb_called = True if not self.disconnected_future.done(): self.disconnected_future.set_result(True) - + def reconnected_cb(self): self.reconnected_cb_called = True if not self.reconnected_future.done(): @@ -2341,6 +2341,332 @@ def test_servers_discovery_no_randomize(self): self.assertEqual([4222, 4223, 4224], srvs) yield nc.close() +class ClientDrainTest(tornado.testing.AsyncTestCase): + def setUp(self): + print("\n=== RUN {0}.{1}".format(self.__class__.__name__, + self._testMethodName)) + self.threads = [] + self.server_pool = [] + + server = Gnatsd(port=4225, http_port=8225) + self.server_pool.append(server) + + for gnatsd in self.server_pool: + t = threading.Thread(target=gnatsd.start) + self.threads.append(t) + t.start() + + http = tornado.httpclient.HTTPClient() + while True: + try: + response = http.fetch('http://127.0.0.1:8225/varz') + if response.code == 200: + break + continue + except: + time.sleep(0.1) + continue + super(ClientDrainTest, self).setUp() + + def tearDown(self): + for gnatsd in self.server_pool: + gnatsd.finish() + for t in self.threads: + t.join() + super(ClientDrainTest, self).tearDown() + + @tornado.testing.gen_test + def test_drain_closes_connection(self): + nc = Client() + future = tornado.concurrent.Future() + + @tornado.gen.coroutine + def closed_cb(): + future.set_result(True) + + @tornado.gen.coroutine + def cb(msg): + pass + + yield nc.connect("127.0.0.1:4225", + loop=self.io_loop, + closed_cb=closed_cb, + ) + yield nc.subscribe("foo", cb=cb) + yield nc.subscribe("bar", cb=cb) + yield nc.subscribe("quux", cb=cb) + yield nc.drain() + self.assertEqual(0, len(nc._subs)) + self.assertTrue(True, nc.is_closed) + + @tornado.testing.gen_test + def test_drain_closes_connection(self): + nc = Client() + future = tornado.concurrent.Future() + + @tornado.gen.coroutine + def closed_cb(): + future.set_result(True) + + @tornado.gen.coroutine + def cb(msg): + pass + + yield nc.connect("127.0.0.1:4225", + loop=self.io_loop, + closed_cb=closed_cb, + ) + yield nc.subscribe("foo", cb=cb) + yield nc.subscribe("bar", cb=cb) + yield nc.subscribe("quux", cb=cb) + yield nc.drain() + yield tornado.gen.with_timeout(timedelta(seconds=1), future) + self.assertEqual(0, len(nc._subs)) + self.assertTrue(True, nc.is_closed) + + @tornado.testing.gen_test + def test_drain_invalid_subscription(self): + nc = NATS() + + yield nc.connect("127.0.0.1:4225", + loop=self.io_loop, + ) + msgs = [] + + @tornado.gen.coroutine + def cb(msg): + msgs.append(msg) + + yield nc.subscribe("foo", cb=cb) + yield nc.subscribe("bar", cb=cb) + yield nc.subscribe("quux", cb=cb) + + with self.assertRaises(ErrBadSubscription): + yield nc.drain(sid=4) + yield nc.close() + self.assertTrue(nc.is_closed) + + @tornado.testing.gen_test + def test_drain_single_subscription(self): + nc = NATS() + yield nc.connect("127.0.0.1:4225", loop=self.io_loop) + + msgs = [] + + @tornado.gen.coroutine + def handler(msg): + msgs.append(msg) + if len(msgs) == 10: + yield tornado.gen.sleep(0.5) + + sid = yield nc.subscribe("foo", cb=handler) + + for i in range(0, 200): + yield nc.publish("foo", b'hi') + + # Relinquish control so that messages are processed. + yield tornado.gen.moment + yield nc.flush() + + sub = nc._subs[sid] + before_drain = sub.pending_queue.qsize() + self.assertTrue(before_drain > 0) + + drain_task = yield nc.drain(sid=sid) + yield tornado.gen.with_timeout(timedelta(seconds=1), drain_task) + + for i in range(0, 200): + yield nc.publish("foo", b'hi') + + # Relinquish control so that messages are processed. + yield tornado.gen.moment + + # No more messages should have been processed. + after_drain = sub.pending_queue.qsize() + self.assertEqual(0, after_drain) + self.assertEqual(200, len(msgs)) + + yield nc.close() + self.assertTrue(nc.is_closed) + self.assertFalse(nc.is_connected) + + @tornado.testing.gen_test(timeout=15) + def test_drain_connection(self): + nc = NATS() + errors = [] + drain_done = tornado.concurrent.Future() + + @tornado.gen.coroutine + def error_cb(e): + errors.append(e) + + @tornado.gen.coroutine + def closed_cb(): + drain_done.set_result(True) + + yield nc.connect("127.0.0.1:4225", + loop=self.io_loop, + closed_cb=closed_cb, + error_cb=error_cb, + ) + + nc2 = NATS() + yield nc2.connect("127.0.0.1:4225", loop=self.io_loop) + + msgs = [] + + @tornado.gen.coroutine + def foo_handler(msg): + if len(msgs) % 20 == 1: + yield tornado.gen.sleep(0.2) + if len(msgs) % 50 == 1: + yield tornado.gen.sleep(0.5) + if msg.reply != "": + yield nc.publish_request(msg.reply, "foo", b'OK!') + yield nc.flush() + + @tornado.gen.coroutine + def bar_handler(msg): + if len(msgs) % 20 == 1: + yield tornado.gen.sleep(0.2) + if len(msgs) % 50 == 1: + yield tornado.gen.sleep(0.5) + if msg.reply != "": + yield nc.publish_request(msg.reply, "bar", b'OK!') + yield nc.flush() + + @tornado.gen.coroutine + def quux_handler(msg): + if len(msgs) % 20 == 1: + yield tornado.gen.sleep(0.2) + if len(msgs) % 50 == 1: + yield tornado.gen.sleep(0.5) + if msg.reply != "": + yield nc.publish_request(msg.reply, "quux", b'OK!') + yield nc.flush() + + sid_foo = yield nc.subscribe("foo", cb=foo_handler) + sid_bar = yield nc.subscribe("bar", cb=bar_handler) + sid_quux = yield nc.subscribe("quux", cb=quux_handler) + + @tornado.gen.coroutine + def replies(msg): + msgs.append(msg) + + yield nc2.subscribe("my-replies.*", cb=replies) + for i in range(0, 201): + yield nc2.publish_request("foo", "my-replies.AAA", b'help') + yield nc2.publish_request("bar", "my-replies.BBB", b'help') + yield nc2.publish_request("quux", "my-replies.CCC", b'help') + + # Relinquish control so that messages are processed. + yield tornado.gen.moment + yield nc2.flush() + + sub_foo = nc._subs[sid_foo] + sub_bar = nc._subs[sid_bar] + sub_quux = nc._subs[sid_quux] + self.assertTrue(sub_foo.pending_queue.qsize() > 0) + self.assertTrue(sub_bar.pending_queue.qsize() > 0) + self.assertTrue(sub_quux.pending_queue.qsize() > 0) + + # Drain and close the connection. In case of timeout then + # an async error will be emitted via the error callback. + self.io_loop.spawn_callback(nc.drain) + + # Let the draining task a bit of time to run... + yield tornado.gen.sleep(0.5) + + # Subscribe and request cause errors at this point. + with self.assertRaises(ErrConnectionDraining): + yield nc.subscribe("hello", cb=foo_handler) + with self.assertRaises(ErrConnectionDraining): + yield nc.request("hello", b"world") + + # Should be no-op or bail if connection closed. + yield nc.drain() + + # State should be closed here already, + yield tornado.gen.with_timeout(timedelta(seconds=10), drain_done) + + self.assertEqual(sub_foo.pending_queue.qsize(), 0) + self.assertEqual(sub_bar.pending_queue.qsize(), 0) + self.assertEqual(sub_quux.pending_queue.qsize(), 0) + self.assertEqual(0, len(nc._subs.items())) + self.assertEqual(1, len(nc2._subs.items())) + self.assertTrue(len(msgs) > 599) + + # No need to close first connection since drain reaches + # the closed state. + yield nc2.close() + self.assertTrue(nc.is_closed) + self.assertFalse(nc.is_connected) + self.assertTrue(nc2.is_closed) + self.assertFalse(nc2.is_connected) + + @tornado.testing.gen_test(timeout=15) + def test_drain_connection_timeout(self): + nc = NATS() + errors = [] + drain_done = tornado.concurrent.Future() + + @tornado.gen.coroutine + def error_cb(e): + errors.append(e) + + @tornado.gen.coroutine + def closed_cb(): + drain_done.set_result(True) + + yield nc.connect("127.0.0.1:4225", + loop=self.io_loop, + closed_cb=closed_cb, + error_cb=error_cb, + drain_timeout=0.1, + ) + + nc2 = NATS() + yield nc2.connect("127.0.0.1:4225", loop=self.io_loop) + + msgs = [] + + @tornado.gen.coroutine + def handler(msg): + if len(msgs) % 20 == 1: + yield tornado.gen.sleep(0.2) + if len(msgs) % 50 == 1: + yield tornado.gen.sleep(0.5) + if msg.reply != "": + yield nc.publish_request(msg.reply, "foo", b'OK!') + yield nc.flush() + sid_foo = yield nc.subscribe("foo", cb=handler) + + @tornado.gen.coroutine + def replies(msg): + msgs.append(msg) + + yield nc2.subscribe("my-replies.*", cb=replies) + for i in range(0, 201): + yield nc2.publish_request("foo", "my-replies.AAA", b'help') + yield nc2.publish_request("bar", "my-replies.BBB", b'help') + yield nc2.publish_request("quux", "my-replies.CCC", b'help') + + # Relinquish control so that messages are processed. + yield tornado.gen.moment + yield nc2.flush() + + # Drain and close the connection. In case of timeout then + # an async error will be emitted via the error callback. + yield nc.drain() + self.assertTrue(type(errors[0]) is ErrDrainTimeout) + + # No need to close first connection since drain reaches + # the closed state. + yield nc2.close() + self.assertTrue(nc.is_closed) + self.assertFalse(nc.is_connected) + self.assertTrue(nc2.is_closed) + self.assertFalse(nc2.is_connected) if __name__ == '__main__': runner = unittest.TextTestRunner(stream=sys.stdout)