From 9d0761126714968c60f22b22cf2bfce948c87ad8 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 31 May 2018 02:48:11 -0700 Subject: [PATCH 01/11] Add slots to Msg class Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index a7b9642..0b69415 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -1039,13 +1039,14 @@ def __init__( class Msg(object): - def __init__( - self, - subject='', - reply='', - data=b'', - sid=0, - ): + __slots__ = 'subject', 'reply', 'data', 'sid' + + def __init__(self, + subject='', + reply='', + data=b'', + sid=0, + ): self.subject = subject self.reply = reply self.data = data From 86be7c80da82184c84ce51ade784bea5f032fb21 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 31 May 2018 06:24:14 -0600 Subject: [PATCH 02/11] Add repr implementation to Msg Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nats/io/client.py b/nats/io/client.py index 0b69415..296933a 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -1052,6 +1052,13 @@ def __init__(self, self.data = data self.sid = sid + def __repr__(self): + return "<{}: subject='{}' reply='{}' data='{}...'>".format( + self.__class__.__name__, + self.subject, + self.reply, + self.data[:10].decode(), + ) class Srv(object): """ From d65f470f8734fb2978c596f60a6e54dd92ab3245 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 31 May 2018 06:27:06 -0600 Subject: [PATCH 03/11] Add message processor cb per sub Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 122 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 92 insertions(+), 30 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index 296933a..c1aad57 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -67,6 +67,10 @@ DEFAULT_PENDING_SIZE = 1024 * 1024 DEFAULT_MAX_PAYLOAD_SIZE = 1048576 +# Default Pending Limits of Subscriptions +DEFAULT_SUB_PENDING_MSGS_LIMIT = 65536 +DEFAULT_SUB_PENDING_BYTES_LIMIT = 65536 * 1024 + PROTOCOL = 1 INBOX_PREFIX = bytearray(b'_INBOX.') @@ -118,7 +122,6 @@ def __init__(self): self._ps = Parser(self) self._err = None self._flush_queue = None - self._flusher_task = None self._nuid = NUID() @@ -466,11 +469,15 @@ def subscribe(self, cb=None, future=None, max_msgs=0, - is_async=False): + is_async=False, + pending_msgs_limit=DEFAULT_SUB_PENDING_MSGS_LIMIT, + pending_bytes_limit=DEFAULT_SUB_PENDING_BYTES_LIMIT, + ): """ - Sends a SUB command to the server. Takes a queue parameter which can be used - in case of distributed queues or left empty if it is not the case, and a callback - that will be dispatched message for processing them. + Sends a SUB command to the server. Takes a queue parameter + which can be used in case of distributed queues or left empty + if it is not the case, and a callback that will be dispatched + message for processing them. """ if self.is_closed: raise ErrConnectionClosed @@ -486,7 +493,55 @@ def subscribe(self, is_async=is_async, ) self._subs[sid] = sub - yield self._subscribe(sub, sid) + + if cb is not None: + sub.pending_msgs_limit = pending_msgs_limit + sub.pending_bytes_limit = pending_bytes_limit + sub.pending_queue = tornado.queues.Queue( + maxsize=pending_msgs_limit, + ) + + @tornado.gen.coroutine + def wait_for_msgs(): + sub = wait_for_msgs.sub + err_cb = wait_for_msgs.err_cb + + while True: + try: + msg = yield sub.pending_queue.get() + sub.pending_size -= len(msg.data) + + # Invoke depending of type of handler. + if sub.is_async: + # NOTE: Deprecate this usage in a next release, + # the handler implementation ought to decide + # the concurrency level at which the messages + # should be processed. + self._loop.spawn_callback(sub.cb, msg) + else: + # Call it and take the possible future in the loop. + yield sub.cb(msg) + except Exception as e: + # All errors from calling an async subscriber + # handler are async errors. + if err_cb is not None: + yield err_cb(e) + + # Bind the subscription and error cb if present + wait_for_msgs.sub = sub + wait_for_msgs.err_cb = self._error_cb + sub.wait_for_msgs_task = self._loop.spawn_callback(wait_for_msgs) + + elif future is not None: + # Used to handle the single response from a request + # based on auto unsubscribe. + sub.future = future + + # Send SUB command... + sub_cmd = b''.join([SUB_OP, _SPC_, sub.subject.encode( + ), _SPC_, sub.queue.encode(), _SPC_, ("%d" % sid).encode(), _CRLF_]) + yield self.send_command(sub_cmd) + yield self._flush_pending() raise tornado.gen.Return(sid) @tornado.gen.coroutine @@ -527,19 +582,6 @@ def unsubscribe(self, ssid, max_msgs=0): if not self.is_reconnecting: yield self.auto_unsubscribe(ssid, max_msgs) - @tornado.gen.coroutine - def _subscribe(self, sub, ssid): - """ - Generates a SUB command given a Subscription and the subject sequence id. - """ - sub_cmd = b''.join([ - SUB_OP, _SPC_, - sub.subject.encode(), _SPC_, - sub.queue.encode(), _SPC_, ("%d" % ssid).encode(), _CRLF_ - ]) - yield self.send_command(sub_cmd) - yield self._flush_pending() - @tornado.gen.coroutine def auto_unsubscribe(self, sid, limit=1): """ @@ -590,8 +632,9 @@ def _process_msg(self, sid, subject, reply, data): dispatched to a passed callback. In case there was not a callback, then it tries to set the message into a future. """ + payload_size = len(data) self.stats['in_msgs'] += 1 - self.stats['in_bytes'] += len(data) + self.stats['in_bytes'] += payload_size msg = Msg(subject=subject.decode(), reply=reply.decode(), data=data) @@ -605,17 +648,29 @@ def _process_msg(self, sid, subject, reply, data): # Enough messages so can throwaway subscription now. self._subs.pop(sid, None) - if sub.cb is not None: - if sub.is_async: - self._loop.spawn_callback(sub.cb, msg) - else: - # Call it and take the possible future in the loop. - maybe_future = sub.cb(msg) - if maybe_future is not None and type( - maybe_future) is tornado.concurrent.Future: - yield maybe_future - elif sub.future is not None: + # Check if it is an old style request. + if sub.future is not None: sub.future.set_result(msg) + raise tornado.gen.Return() + + # Let subscription wait_for_msgs coroutine process the messages, + # but in case sending to the subscription task would block, + # then consider it to be an slow consumer and drop the message. + try: + sub.pending_size += payload_size + if sub.pending_size >= sub.pending_bytes_limit: + # Substract again the bytes since throwing away + # the message so would not be pending data. + sub.pending_size -= payload_size + + if self._error_cb is not None: + yield self._error_cb(ErrSlowConsumer()) + raise tornado.gen.Return() + + yield sub.pending_queue.put_nowait(msg) + except tornado.queues.QueueFull: + if self._error_cb is not None: + yield self._error_cb(ErrSlowConsumer()) @tornado.gen.coroutine def _process_connect_init(self): @@ -1037,6 +1092,13 @@ def __init__( self.is_async = is_async self.received = 0 + # Per subscription message processor + self.pending_msgs_limit = None + self.pending_bytes_limit = None + self.pending_queue = None + self.pending_size = 0 + self.wait_for_msgs_task = None + class Msg(object): __slots__ = 'subject', 'reply', 'data', 'sid' From d27ee473d37767f0c7b59a72c1f972402e5fb939 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 31 May 2018 07:47:23 -0600 Subject: [PATCH 04/11] Add new request/response style implementation Signed-off-by: Waldemar Quevedo --- benchmark/latency_perf.py | 91 +++++++++++++++++++-------------------- nats/io/client.py | 81 ++++++++++++++++++++++++++++++---- 2 files changed, 117 insertions(+), 55 deletions(-) diff --git a/benchmark/latency_perf.py b/benchmark/latency_perf.py index 18fb85c..77ea798 100644 --- a/benchmark/latency_perf.py +++ b/benchmark/latency_perf.py @@ -31,53 +31,50 @@ def show_usage_and_die(): @tornado.gen.coroutine def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - '-n', '--iterations', default=DEFAULT_ITERATIONS, type=int) - parser.add_argument('-S', '--subject', default='test') - parser.add_argument('--servers', default=[], action='append') - args = parser.parse_args() - - servers = args.servers - if len(args.servers) < 1: - servers = ["nats://127.0.0.1:4222"] - opts = {"servers": servers} - - # Make sure we're connected to a server first... - nc = NATS() - try: - yield nc.connect(**opts) - except Exception, e: - sys.stderr.write("ERROR: {0}".format(e)) - show_usage_and_die() - - @tornado.gen.coroutine - def handler(msg): - yield nc.publish(msg.reply, "") - - yield nc.subscribe(args.subject, cb=handler) - - # Start the benchmark - start = time.time() - to_send = args.iterations - - print("Sending {0} request/responses on [{1}]".format( - args.iterations, args.subject)) - while to_send > 0: - to_send -= 1 - if to_send == 0: - break - - yield nc.timed_request(args.subject, "") - if (to_send % HASH_MODULO) == 0: - sys.stdout.write("+") - sys.stdout.flush() - - duration = time.time() - start - ms = "%.3f" % ((duration / args.iterations) * 1000) - print("\nTest completed : {0} ms avg request/response latency".format(ms)) - yield nc.close() - + parser = argparse.ArgumentParser() + parser.add_argument('-n', '--iterations', default=DEFAULT_ITERATIONS, type=int) + parser.add_argument('-S', '--subject', default='test') + parser.add_argument('--servers', default=[], action='append') + args = parser.parse_args() + + servers = args.servers + if len(args.servers) < 1: + servers = ["nats://127.0.0.1:4222"] + opts = { "servers": servers } + + # Make sure we're connected to a server first... + nc = NATS() + try: + yield nc.connect(**opts) + except Exception, e: + sys.stderr.write("ERROR: {0}".format(e)) + show_usage_and_die() + + @tornado.gen.coroutine + def handler(msg): + yield nc.publish(msg.reply, "") + yield nc.subscribe(args.subject, cb=handler) + + # Start the benchmark + start = time.time() + to_send = args.iterations + + print("Sending {0} request/responses on [{1}]".format( + args.iterations, args.subject)) + while to_send > 0: + to_send -= 1 + if to_send == 0: + break + + yield nc.request(args.subject, "") + if (to_send % HASH_MODULO) == 0: + sys.stdout.write("+") + sys.stdout.flush() + + duration = time.time() - start + ms = "%.3f" % ((duration/args.iterations) * 1000) + print("\nTest completed : {0} ms avg request/response latency".format(ms)) + yield nc.close() if __name__ == '__main__': tornado.ioloop.IOLoop.instance().run_sync(main) diff --git a/nats/io/client.py b/nats/io/client.py index c1aad57..48eb6d3 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -73,6 +73,7 @@ PROTOCOL = 1 INBOX_PREFIX = bytearray(b'_INBOX.') +INBOX_PREFIX_LEN = len(INBOX_PREFIX) + 22 + 1 class Client(object): @@ -123,6 +124,10 @@ def __init__(self): self._err = None self._flush_queue = None + # New style request/response + self._resp_sub = None + self._resp_map = None + self._resp_sub_prefix = None self._nuid = NUID() # Ping interval to disconnect from unhealthy servers. @@ -414,7 +419,7 @@ def _flush_timeout(self, timeout): raise tornado.gen.Return(result) @tornado.gen.coroutine - def request(self, subject, payload, expected=1, cb=None): + def request(self, subject, payload, timeout=0.5, expected=1, cb=None): """ Implements the request/response pattern via pub/sub using an ephemeral subscription which will be published @@ -427,13 +432,73 @@ def request(self, subject, payload, expected=1, cb=None): <<- MSG hello 2 _INBOX.gnKUg9bmAHANjxIsDiQsWO 5 """ - next_inbox = INBOX_PREFIX[:] - next_inbox.extend(self._nuid.next()) - inbox = str(next_inbox) - sid = yield self.subscribe(inbox, _EMPTY_, cb) - yield self.auto_unsubscribe(sid, expected) - yield self.publish_request(subject, inbox, payload) - raise tornado.gen.Return(sid) + # If callback given then continue to use old style. + if cb is not None: + next_inbox = INBOX_PREFIX[:] + next_inbox.extend(self._nuid.next()) + inbox = str(next_inbox) + sid = yield self.subscribe(inbox, _EMPTY_, cb) + yield self.auto_unsubscribe(sid, expected) + yield self.publish_request(subject, inbox, payload) + raise tornado.gen.Return(sid) + + if self._resp_sub_prefix is None: + self._resp_map = {} + + # Create a prefix and single wildcard subscription once. + self._resp_sub_prefix = str(INBOX_PREFIX[:]) + self._resp_sub_prefix += self._nuid.next() + self._resp_sub_prefix += b'.' + resp_mux_subject = str(self._resp_sub_prefix[:]) + resp_mux_subject += b'*' + sub = Subscription(subject=str(resp_mux_subject)) + + # FIXME: Allow setting pending limits for responses mux subscription. + sub.pending_msgs_limit = DEFAULT_SUB_PENDING_MSGS_LIMIT + sub.pending_bytes_limit = DEFAULT_SUB_PENDING_BYTES_LIMIT + sub.pending_queue = tornado.queues.Queue( + maxsize=sub.pending_msgs_limit, + ) + + # Single task for handling the requests + @tornado.gen.coroutine + def wait_for_msgs(): + while True: + msg = yield sub.pending_queue.get() + token = msg.subject[INBOX_PREFIX_LEN:] + try: + fut = self._resp_map[token] + fut.set_result(msg) + del self._resp_map[token] + except KeyError: + # Future already handled so drop any extra + # responses which may have made it. + continue + + wait_for_msgs.sub = sub + sub.wait_for_msgs_task = self._loop.spawn_callback(wait_for_msgs) + + # Store the subscription in the subscriptions map, + # then send the protocol commands to the server. + self._ssid += 1 + sid = self._ssid + self._subs[sid] = sub + + # Send SUB command... + sub_cmd = b''.join([SUB_OP, _SPC_, sub.subject.encode( + ), _SPC_, ("%d" % sid).encode(), _CRLF_]) + yield self.send_command(sub_cmd) + yield self._flush_pending() + + # Use a new NUID for the token inbox and then use the future. + token = self._nuid.next() + inbox = self._resp_sub_prefix[:] + inbox.extend(token) + future = tornado.concurrent.Future() + self._resp_map[token.decode()] = future + yield self.publish_request(subject, str(inbox), payload) + msg = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future) + raise tornado.gen.Return(msg) @tornado.gen.coroutine def timed_request(self, subject, payload, timeout=0.5): From f1a336a4b4dfc5bb350b08eaeb3a675456c7620c Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 12 Jun 2018 13:36:09 -0700 Subject: [PATCH 05/11] Add test about concurrent message processing Signed-off-by: Waldemar Quevedo --- tests/client_test.py | 49 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/client_test.py b/tests/client_test.py index 12856fc..6d7c2dc 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1076,6 +1076,55 @@ def test_process_message_subscription_not_present(self): nc = Client() yield nc._process_msg(387, 'some-subject', 'some-reply', [0, 1, 2]) + @tornado.testing.gen_test + def test_subscribe_async_process_messages_concurrently(self): + nc = Client() + + yield nc.connect(io_loop=self.io_loop) + + @tornado.gen.coroutine + def sub_foo_handler(msg): + msgs = sub_foo_handler.msgs + msgs.append(msg) + + # Should not block other subscriptions processing + # the messages in parallel... + yield tornado.gen.sleep(1) + + sub_foo_handler.msgs = [] + yield nc.subscribe("foo", cb=sub_foo_handler) + + @tornado.gen.coroutine + def sub_bar_handler(msg): + nc = sub_bar_handler.nc + msgs = sub_bar_handler.msgs + msgs.append(msg) + yield nc.publish(msg.reply, "OK!") + + sub_bar_handler.nc = nc + sub_bar_handler.msgs = [] + yield nc.subscribe("bar", cb=sub_bar_handler) + + @tornado.gen.coroutine + def sub_quux_handler(msg): + msgs = sub_quux_handler.msgs + msgs.append(msg) + sub_quux_handler.msgs = [] + yield nc.subscribe("quux", cb=sub_quux_handler) + + yield nc.publish("foo", "hello") + for i in range(0, 10): + yield nc.publish("quux", "test-{}".format(i)) + + response = yield nc.request("bar", b'help') + self.assertEqual(response.data, 'OK!') + + yield tornado.gen.sleep(0.2) + self.assertEqual(len(sub_foo_handler.msgs), 1) + self.assertEqual(len(sub_bar_handler.msgs), 1) + self.assertEqual(len(sub_quux_handler.msgs), 10) + + yield nc.close() class ClientAuthTest(tornado.testing.AsyncTestCase): def setUp(self): From d0932d81c2e5a6f1702de95bf0fa93296875600e Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 12 Jun 2018 16:27:50 -0700 Subject: [PATCH 06/11] Add slow consumer tests for pending msgs/bytes limits Signed-off-by: Waldemar Quevedo --- tests/client_test.py | 105 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/tests/client_test.py b/tests/client_test.py index 6d7c2dc..2e57f77 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1126,6 +1126,111 @@ def sub_quux_handler(msg): yield nc.close() + @tornado.testing.gen_test + def test_subscribe_slow_consumer_pending_msgs_limit(self): + nc = Client() + + def error_cb(err): + error_cb.errors.append(err) + error_cb.errors = [] + + yield nc.connect(io_loop=self.io_loop, + error_cb=error_cb) + + @tornado.gen.coroutine + def sub_hello_handler(msg): + msgs = sub_hello_handler.msgs + msgs.append(msg) + + if len(msgs) == 5: + yield tornado.gen.sleep(0.5) + + sub_hello_handler.msgs = [] + yield nc.subscribe("hello", cb=sub_hello_handler, pending_msgs_limit=5) + + for i in range(0, 20): + yield nc.publish("hello", "test-{}".format(i)) + yield nc.flush(1) + + # Wait a bit for subscriber to recover + yield tornado.gen.sleep(0.5) + + for i in range(0, 3): + yield nc.publish("hello", "ok-{}".format(i)) + yield nc.flush(1) + + # Wait a bit to receive the final messages + yield tornado.gen.sleep(0.5) + + # There would be a few async slow consumer errors + errors = error_cb.errors + self.assertTrue(len(errors) > 0) + self.assertTrue(type(errors[0]) is ErrSlowConsumer) + + # We should have received some messages and dropped others, + # but definitely got the last 3 messages after recovering + # from the slow consumer error. + msgs = sub_hello_handler.msgs + self.assertEqual(len(msgs), 13) + + msgs = sub_hello_handler.msgs[-3:] + for i in range(0, 3): + self.assertEqual("ok-{}".format(i), msgs[i].data) + yield nc.close() + + @tornado.testing.gen_test + def test_subscribe_slow_consumer_pending_bytes_limit(self): + nc = Client() + + def error_cb(err): + error_cb.errors.append(err) + error_cb.errors = [] + + yield nc.connect(io_loop=self.io_loop, + error_cb=error_cb) + + @tornado.gen.coroutine + def sub_hello_handler(msg): + msgs = sub_hello_handler.msgs + msgs.append(msg) + sub_hello_handler.data += msg.data + if len(sub_hello_handler.data) == 10: + yield tornado.gen.sleep(0.5) + + sub_hello_handler.msgs = [] + sub_hello_handler.data = '' + yield nc.subscribe("hello", cb=sub_hello_handler, pending_bytes_limit=10) + + for i in range(0, 20): + yield nc.publish("hello", "A") + yield nc.flush(1) + + # Wait a bit for subscriber to recover + yield tornado.gen.sleep(1) + + for i in range(0, 3): + yield nc.publish("hello", "B") + yield nc.flush(1) + + # Wait a bit to receive the final messages + yield tornado.gen.sleep(1) + + # There would be a few async slow consumer errors + errors = error_cb.errors + self.assertTrue(len(errors) > 0) + self.assertTrue(type(errors[0]) is ErrSlowConsumer) + + # We should have received some messages and dropped others, + # but definitely got the last 3 messages after recovering + # from the slow consumer error. + msgs = sub_hello_handler.msgs + self.assertTrue(len(msgs) > 10 and len(msgs) != 23) + + msgs = sub_hello_handler.msgs[-3:] + for i in range(0, 3): + self.assertEqual("B", msgs[i].data) + yield nc.close() + class ClientAuthTest(tornado.testing.AsyncTestCase): def setUp(self): print("\n=== RUN {0}.{1}".format(self.__class__.__name__, From 27a00de9b91f01641ee59e8b3fcece2f19778f23 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 12 Jun 2018 17:05:17 -0700 Subject: [PATCH 07/11] Update request() docstring Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index 48eb6d3..6ebf010 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -421,9 +421,24 @@ def _flush_timeout(self, timeout): @tornado.gen.coroutine def request(self, subject, payload, timeout=0.5, expected=1, cb=None): """ - Implements the request/response pattern via pub/sub - using an ephemeral subscription which will be published - with customizable limited interest. + Implements the request/response pattern via pub/sub using an + unique reply subject and an async subscription. + + If cb is None, then it will wait and return a single message + using the new request/response style that is less chatty over + the network. + + ->> SUB _INBOX.BF6zPVxvScfXGd4VyUMJyo.* 1 + ->> PUB hello _INBOX.BF6zPVxvScfXGd4VyUMJyo.BF6zPVxvScfXCh4VyUMJyo 5 + ->> MSG_PAYLOAD: hello + <<- MSG hello 2 _INBOX.BF6zPVxvScfXGd4VyUMJyo.BF6zPVxvScfXCh4VyUMJyo 5 + ->> PUB _INBOX.BF6zPVxvScfXGd4VyUMJyo.BF6zPVxvScfXCh4VyUMJyo 5 + ->> MSG_PAYLOAD: world + <<- MSG _INBOX.BF6zPVxvScfXGd4VyUMJyo.BF6zPVxvScfXCh4VyUMJyo 1 5 + + If a cb is passed, then it will use auto unsubscribe + functionality and expect a limited number of messages which + will be handled asynchronously in the callback. ->> SUB _INBOX.gnKUg9bmAHANjxIsDiQsWO 90 ->> UNSUB 90 1 From 116b6f659a0fb70d7330b762e12180f01eb10175 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 13 Jun 2018 03:55:29 -0700 Subject: [PATCH 08/11] Fix removing subscriptions using auto unsubscribe Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nats/io/client.py b/nats/io/client.py index 6ebf010..da72483 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -452,7 +452,11 @@ def request(self, subject, payload, timeout=0.5, expected=1, cb=None): next_inbox = INBOX_PREFIX[:] next_inbox.extend(self._nuid.next()) inbox = str(next_inbox) - sid = yield self.subscribe(inbox, _EMPTY_, cb) + sid = yield self.subscribe(inbox, + queue=_EMPTY_, + cb=cb, + max_msgs=expected, + ) yield self.auto_unsubscribe(sid, expected) yield self.publish_request(subject, inbox, payload) raise tornado.gen.Return(sid) From 237fd33551ca7cea8d4e63359785c62d44506ee2 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 13 Jun 2018 03:58:03 -0700 Subject: [PATCH 09/11] Fix removing subscriptions on close Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 29 ++++++++++++++++++++++++-- tests/client_test.py | 48 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index da72483..628a8f1 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -660,12 +660,27 @@ def unsubscribe(self, ssid, max_msgs=0): # remove the callback locally too. if max_msgs == 0 or sub.received >= max_msgs: self._subs.pop(ssid, None) + self._remove_subscription(sub) # We will send these for all subs when we reconnect anyway, # so that we can suppress here. if not self.is_reconnecting: yield self.auto_unsubscribe(ssid, max_msgs) + def _remove_subscription(self, sub): + # Mark as invalid + sub.closed = True + + # Remove the pending queue + if sub.pending_queue is not None: + try: + # Send empty msg to signal cancellation + # and stop the msg processing loop. + sub.pending_queue.put_nowait(None) + except tornado.queues.QueueFull: + # Skip error + return + @tornado.gen.coroutine def auto_unsubscribe(self, sid, limit=1): """ @@ -735,6 +750,9 @@ def _process_msg(self, sid, subject, reply, data): # Check if it is an old style request. if sub.future is not None: sub.future.set_result(msg) + + # Discard subscription since done + self._remove_subscription(sub) raise tornado.gen.Return() # Let subscription wait_for_msgs coroutine process the messages, @@ -1037,6 +1055,12 @@ def _close(self, status, do_callbacks=True): if not self.io.closed(): self.io.close() + # Cleanup subscriptions since not reconnecting so no need + # to replay the subscriptions anymore. + for ssid, sub in self._subs.items(): + self._subs.pop(ssid, None) + self._remove_subscription(sub) + if do_callbacks: if self._disconnected_cb is not None: self._disconnected_cb() @@ -1167,6 +1191,7 @@ def __init__( is_async=False, future=None, max_msgs=0, + sid=None, ): self.subject = subject self.queue = queue @@ -1175,14 +1200,14 @@ def __init__( self.max_msgs = max_msgs self.is_async = is_async self.received = 0 + self.sid = sid # Per subscription message processor self.pending_msgs_limit = None self.pending_bytes_limit = None self.pending_queue = None self.pending_size = 0 - self.wait_for_msgs_task = None - + self.closed = False class Msg(object): __slots__ = 'subject', 'reply', 'data', 'sid' diff --git a/tests/client_test.py b/tests/client_test.py index 2e57f77..41ee844 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1231,6 +1231,54 @@ def sub_hello_handler(msg): self.assertEqual("B", msgs[i].data) yield nc.close() + @tornado.testing.gen_test + def test_close_stops_subscriptions_loops(self): + nc = Client() + + def error_cb(err): + error_cb.errors.append(err) + error_cb.errors = [] + + yield nc.connect(io_loop=self.io_loop, + error_cb=error_cb) + + @tornado.gen.coroutine + def sub_hello_handler(msg): + msgs = sub_hello_handler.msgs + msgs.append(msg) + + sub_hello_handler.msgs = [] + yield nc.subscribe("hello.foo.bar", cb=sub_hello_handler) + yield nc.subscribe("hello.*.*", cb=sub_hello_handler) + yield nc.subscribe("hello.>", cb=sub_hello_handler) + yield nc.subscribe(">", cb=sub_hello_handler) + + self.assertEqual(len(self.io_loop._callbacks), 5) + for i in range(0, 10): + yield nc.publish("hello.foo.bar", "test-{}".format(i)) + yield nc.flush(1) + msgs = sub_hello_handler.msgs + self.assertEqual(len(msgs), 40) + + self.assertEqual(len(nc._subs), 4) + + subs = [] + for _, sub in nc._subs.items(): + subs.append(sub) + self.assertEqual(sub.closed, False) + + yield nc.close() + + # Close should have removed all subscriptions + self.assertEqual(len(nc._subs), 0) + + # Let background message processors stop + yield tornado.gen.sleep(0) + self.assertEqual(len(self.io_loop._callbacks), 0) + + for sub in subs: + self.assertEqual(sub.closed, True) + class ClientAuthTest(tornado.testing.AsyncTestCase): def setUp(self): print("\n=== RUN {0}.{1}".format(self.__class__.__name__, From 128cd8a988166afc9d6edc838a39fe8231cbcae1 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 13 Jun 2018 04:02:29 -0700 Subject: [PATCH 10/11] Remove msg processing coroutine from sub on empty msg Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 34 +++++++++++++++++++++++++++++----- tests/client_test.py | 18 ++++++++++++++++++ 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index 628a8f1..bae99ff 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -483,7 +483,14 @@ def request(self, subject, payload, timeout=0.5, expected=1, cb=None): @tornado.gen.coroutine def wait_for_msgs(): while True: + sub = wait_for_msgs.sub + if sub.closed: + break + msg = yield sub.pending_queue.get() + if msg is None: + break + token = msg.subject[INBOX_PREFIX_LEN:] try: fut = self._resp_map[token] @@ -495,12 +502,13 @@ def wait_for_msgs(): continue wait_for_msgs.sub = sub - sub.wait_for_msgs_task = self._loop.spawn_callback(wait_for_msgs) + self._loop.spawn_callback(wait_for_msgs) # Store the subscription in the subscriptions map, # then send the protocol commands to the server. self._ssid += 1 sid = self._ssid + sub.sid = sid self._subs[sid] = sub # Send SUB command... @@ -575,6 +583,7 @@ def subscribe(self, future=future, max_msgs=max_msgs, is_async=is_async, + sid=sid, ) self._subs[sid] = sub @@ -587,14 +596,24 @@ def subscribe(self, @tornado.gen.coroutine def wait_for_msgs(): - sub = wait_for_msgs.sub - err_cb = wait_for_msgs.err_cb - while True: + sub = wait_for_msgs.sub + err_cb = wait_for_msgs.err_cb + try: + sub = wait_for_msgs.sub + if sub.closed: + break + msg = yield sub.pending_queue.get() + if msg is None: + break sub.pending_size -= len(msg.data) + if sub.max_msgs > 0 and sub.received >= sub.max_msgs: + # If we have hit the max for delivered msgs, remove sub. + self._remove_subscription(sub) + # Invoke depending of type of handler. if sub.is_async: # NOTE: Deprecate this usage in a next release, @@ -610,11 +629,16 @@ def wait_for_msgs(): # handler are async errors. if err_cb is not None: yield err_cb(e) + finally: + if sub.max_msgs > 0 and sub.received >= sub.max_msgs: + # If we have hit the max for delivered msgs, remove sub. + self._remove_subscription(sub) + break # Bind the subscription and error cb if present wait_for_msgs.sub = sub wait_for_msgs.err_cb = self._error_cb - sub.wait_for_msgs_task = self._loop.spawn_callback(wait_for_msgs) + self._loop.spawn_callback(wait_for_msgs) elif future is not None: # Used to handle the single response from a request diff --git a/tests/client_test.py b/tests/client_test.py index 41ee844..f4b9fa1 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -704,8 +704,12 @@ def test_unsubscribe(self): yield nc.publish("foo", b'A') yield nc.publish("foo", b'B') yield tornado.gen.sleep(1) + + sub = nc._subs[sid] yield nc.unsubscribe(sid) yield nc.flush() + self.assertEqual(sub.closed, True) + yield nc.publish("foo", b'C') yield nc.publish("foo", b'D') self.assertEqual(2, len(log.records["foo"])) @@ -737,7 +741,11 @@ def test_unsubscribe_only_if_max_reached(self): yield nc.publish("foo", b'C') yield tornado.gen.sleep(1) self.assertEqual(3, len(log.records["foo"])) + + sub = nc._subs[sid] yield nc.unsubscribe(sid, 3) + self.assertEqual(sub.closed, True) + yield nc.publish("foo", b'D') yield nc.flush() self.assertEqual(3, len(log.records["foo"])) @@ -783,7 +791,16 @@ def respond(self, msg=None): yield nc.subscribe(">", "", log.persist) yield nc.subscribe("help", "", c.respond) yield nc.request("help", "please", expected=2, cb=c.receive_responses) + + subs = [] + for _, sub in nc._subs.items(): + subs.append(sub) + self.assertEqual(len(subs), 3) + + self.assertEqual(len(self.io_loop._callbacks), 4) yield tornado.gen.sleep(0.5) + self.assertEqual(len(nc._subs), 2) + self.assertEqual(len(self.io_loop._callbacks), 0) http = tornado.httpclient.AsyncHTTPClient() response = yield http.fetch( @@ -808,6 +825,7 @@ def respond(self, msg=None): self.assertEqual('please', full_msg) self.assertEqual("ok:1", c.replies[0].data) self.assertEqual("ok:2", c.replies[1].data) + yield nc.close() @tornado.testing.gen_test def test_timed_request(self): From 9c2b25638bc7e0c4127f53830d455a07fd317a59 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 13 Jun 2018 04:15:12 -0700 Subject: [PATCH 11/11] Formatting fixes with yapf Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 77 +++++++++++++++++++++++++------------------- tests/client_test.py | 25 +++++++------- 2 files changed, 57 insertions(+), 45 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index bae99ff..554143d 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -68,7 +68,7 @@ DEFAULT_MAX_PAYLOAD_SIZE = 1048576 # Default Pending Limits of Subscriptions -DEFAULT_SUB_PENDING_MSGS_LIMIT = 65536 +DEFAULT_SUB_PENDING_MSGS_LIMIT = 65536 DEFAULT_SUB_PENDING_BYTES_LIMIT = 65536 * 1024 PROTOCOL = 1 @@ -452,11 +452,12 @@ def request(self, subject, payload, timeout=0.5, expected=1, cb=None): next_inbox = INBOX_PREFIX[:] next_inbox.extend(self._nuid.next()) inbox = str(next_inbox) - sid = yield self.subscribe(inbox, - queue=_EMPTY_, - cb=cb, - max_msgs=expected, - ) + sid = yield self.subscribe( + inbox, + queue=_EMPTY_, + cb=cb, + max_msgs=expected, + ) yield self.auto_unsubscribe(sid, expected) yield self.publish_request(subject, inbox, payload) raise tornado.gen.Return(sid) @@ -476,8 +477,7 @@ def request(self, subject, payload, timeout=0.5, expected=1, cb=None): sub.pending_msgs_limit = DEFAULT_SUB_PENDING_MSGS_LIMIT sub.pending_bytes_limit = DEFAULT_SUB_PENDING_BYTES_LIMIT sub.pending_queue = tornado.queues.Queue( - maxsize=sub.pending_msgs_limit, - ) + maxsize=sub.pending_msgs_limit) # Single task for handling the requests @tornado.gen.coroutine @@ -512,8 +512,10 @@ def wait_for_msgs(): self._subs[sid] = sub # Send SUB command... - sub_cmd = b''.join([SUB_OP, _SPC_, sub.subject.encode( - ), _SPC_, ("%d" % sid).encode(), _CRLF_]) + sub_cmd = b''.join([ + SUB_OP, _SPC_, + sub.subject.encode(), _SPC_, ("%d" % sid).encode(), _CRLF_ + ]) yield self.send_command(sub_cmd) yield self._flush_pending() @@ -524,7 +526,8 @@ def wait_for_msgs(): future = tornado.concurrent.Future() self._resp_map[token.decode()] = future yield self.publish_request(subject, str(inbox), payload) - msg = yield tornado.gen.with_timeout(timedelta(seconds=timeout), future) + msg = yield tornado.gen.with_timeout( + timedelta(seconds=timeout), future) raise tornado.gen.Return(msg) @tornado.gen.coroutine @@ -555,16 +558,17 @@ def timed_request(self, subject, payload, timeout=0.5): raise tornado.gen.Return(msg) @tornado.gen.coroutine - def subscribe(self, - subject="", - queue="", - cb=None, - future=None, - max_msgs=0, - is_async=False, - pending_msgs_limit=DEFAULT_SUB_PENDING_MSGS_LIMIT, - pending_bytes_limit=DEFAULT_SUB_PENDING_BYTES_LIMIT, - ): + def subscribe( + self, + subject="", + queue="", + cb=None, + future=None, + max_msgs=0, + is_async=False, + pending_msgs_limit=DEFAULT_SUB_PENDING_MSGS_LIMIT, + pending_bytes_limit=DEFAULT_SUB_PENDING_BYTES_LIMIT, + ): """ Sends a SUB command to the server. Takes a queue parameter which can be used in case of distributed queues or left empty @@ -591,8 +595,7 @@ def subscribe(self, sub.pending_msgs_limit = pending_msgs_limit sub.pending_bytes_limit = pending_bytes_limit sub.pending_queue = tornado.queues.Queue( - maxsize=pending_msgs_limit, - ) + maxsize=pending_msgs_limit) @tornado.gen.coroutine def wait_for_msgs(): @@ -604,7 +607,7 @@ def wait_for_msgs(): sub = wait_for_msgs.sub if sub.closed: break - + msg = yield sub.pending_queue.get() if msg is None: break @@ -646,8 +649,11 @@ def wait_for_msgs(): sub.future = future # Send SUB command... - sub_cmd = b''.join([SUB_OP, _SPC_, sub.subject.encode( - ), _SPC_, sub.queue.encode(), _SPC_, ("%d" % sid).encode(), _CRLF_]) + sub_cmd = b''.join([ + SUB_OP, _SPC_, + sub.subject.encode(), _SPC_, + sub.queue.encode(), _SPC_, ("%d" % sid).encode(), _CRLF_ + ]) yield self.send_command(sub_cmd) yield self._flush_pending() raise tornado.gen.Return(sid) @@ -1080,7 +1086,7 @@ def _close(self, status, do_callbacks=True): self.io.close() # Cleanup subscriptions since not reconnecting so no need - # to replay the subscriptions anymore. + # to replay the subscriptions anymore. for ssid, sub in self._subs.items(): self._subs.pop(ssid, None) self._remove_subscription(sub) @@ -1233,15 +1239,17 @@ def __init__( self.pending_size = 0 self.closed = False + class Msg(object): __slots__ = 'subject', 'reply', 'data', 'sid' - def __init__(self, - subject='', - reply='', - data=b'', - sid=0, - ): + def __init__( + self, + subject='', + reply='', + data=b'', + sid=0, + ): self.subject = subject self.reply = reply self.data = data @@ -1253,7 +1261,8 @@ def __repr__(self): self.subject, self.reply, self.data[:10].decode(), - ) + ) + class Srv(object): """ diff --git a/tests/client_test.py b/tests/client_test.py index f4b9fa1..3cd1df5 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -742,7 +742,7 @@ def test_unsubscribe_only_if_max_reached(self): yield tornado.gen.sleep(1) self.assertEqual(3, len(log.records["foo"])) - sub = nc._subs[sid] + sub = nc._subs[sid] yield nc.unsubscribe(sid, 3) self.assertEqual(sub.closed, True) @@ -791,7 +791,7 @@ def respond(self, msg=None): yield nc.subscribe(">", "", log.persist) yield nc.subscribe("help", "", c.respond) yield nc.request("help", "please", expected=2, cb=c.receive_responses) - + subs = [] for _, sub in nc._subs.items(): subs.append(sub) @@ -1127,6 +1127,7 @@ def sub_bar_handler(msg): def sub_quux_handler(msg): msgs = sub_quux_handler.msgs msgs.append(msg) + sub_quux_handler.msgs = [] yield nc.subscribe("quux", cb=sub_quux_handler) @@ -1150,10 +1151,10 @@ def test_subscribe_slow_consumer_pending_msgs_limit(self): def error_cb(err): error_cb.errors.append(err) + error_cb.errors = [] - yield nc.connect(io_loop=self.io_loop, - error_cb=error_cb) + yield nc.connect(io_loop=self.io_loop, error_cb=error_cb) @tornado.gen.coroutine def sub_hello_handler(msg): @@ -1202,10 +1203,10 @@ def test_subscribe_slow_consumer_pending_bytes_limit(self): def error_cb(err): error_cb.errors.append(err) + error_cb.errors = [] - yield nc.connect(io_loop=self.io_loop, - error_cb=error_cb) + yield nc.connect(io_loop=self.io_loop, error_cb=error_cb) @tornado.gen.coroutine def sub_hello_handler(msg): @@ -1217,7 +1218,8 @@ def sub_hello_handler(msg): sub_hello_handler.msgs = [] sub_hello_handler.data = '' - yield nc.subscribe("hello", cb=sub_hello_handler, pending_bytes_limit=10) + yield nc.subscribe( + "hello", cb=sub_hello_handler, pending_bytes_limit=10) for i in range(0, 20): yield nc.publish("hello", "A") @@ -1255,10 +1257,10 @@ def test_close_stops_subscriptions_loops(self): def error_cb(err): error_cb.errors.append(err) + error_cb.errors = [] - yield nc.connect(io_loop=self.io_loop, - error_cb=error_cb) + yield nc.connect(io_loop=self.io_loop, error_cb=error_cb) @tornado.gen.coroutine def sub_hello_handler(msg): @@ -1268,8 +1270,8 @@ def sub_hello_handler(msg): sub_hello_handler.msgs = [] yield nc.subscribe("hello.foo.bar", cb=sub_hello_handler) yield nc.subscribe("hello.*.*", cb=sub_hello_handler) - yield nc.subscribe("hello.>", cb=sub_hello_handler) - yield nc.subscribe(">", cb=sub_hello_handler) + yield nc.subscribe("hello.>", cb=sub_hello_handler) + yield nc.subscribe(">", cb=sub_hello_handler) self.assertEqual(len(self.io_loop._callbacks), 5) for i in range(0, 10): @@ -1297,6 +1299,7 @@ def sub_hello_handler(msg): for sub in subs: self.assertEqual(sub.closed, True) + class ClientAuthTest(tornado.testing.AsyncTestCase): def setUp(self): print("\n=== RUN {0}.{1}".format(self.__class__.__name__,