From c019abc5565ff253a61984db10d31901c2ccba5f Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 18:52:05 -0700 Subject: [PATCH 1/8] Add token option param Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 13 ++++++++++++- tests/client_test.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/nats/io/client.py b/nats/io/client.py index 2794dd1..70cabe9 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -224,6 +224,9 @@ def connect(self, pedantic=False, verbose=False, no_echo=False, + user=None, + password=None, + token=None, # Reconnect logic allow_reconnect=True, @@ -272,6 +275,9 @@ def connect(self, self.options["pedantic"] = pedantic self.options["name"] = name self.options["no_echo"] = no_echo + self.options["user"] = user + self.options["password"] = password + self.options["token"] = token self.options["max_outstanding_pings"] = max_outstanding_pings self.options["max_reconnect_attempts"] = max_reconnect_attempts self.options["reconnect_time_wait"] = reconnect_time_wait @@ -395,7 +401,12 @@ def connect_command(self): if self._server_info["auth_required"] == True: # In case there is no password, then consider handle # sending a token instead. - if self._current_server.uri.password is None: + if self.options["user"] is not None and self.options["password"] is not None: + options["user"] = self.options["user"] + options["pass"] = self.options["password"] + elif self.options["token"] is not None: + options["auth_token"] = self.options["token"] + elif self._current_server.uri.password is None: options["auth_token"] = self._current_server.uri.username else: options["user"] = self._current_server.uri.username diff --git a/tests/client_test.py b/tests/client_test.py index bf52932..77345ff 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1731,6 +1731,44 @@ def reconnected_cb(self): self.assertEqual(errors_at_close, errors_after_close) self.assertEqual(1, len(c.log.records["foo"])) + @tornado.testing.gen_test(timeout=10) + def test_connect_with_auth_token_option(self): + nc = NATS() + + conf = """ + port = 4227 + + http = 8227 + + authorization { + token = token + } + """ + with Gnatsd(port=4227, http_port=8227, conf=conf) as gnatsd: + yield nc.connect("nats://127.0.0.1:4227", + token='token', + loop=self.io_loop, + ) + self.assertIn('auth_required', nc._server_info) + self.assertTrue(nc.is_connected) + + received = tornado.concurrent.Future() + + @tornado.gen.coroutine + def handler(msg): + received.set_result(msg) + + yield nc.subscribe("foo", cb=handler) + yield nc.flush() + yield nc.publish("foo", b'bar') + + yield tornado.gen.with_timeout( + timedelta(seconds=1), received) + + yield nc.close() + self.assertTrue(nc.is_closed) + self.assertFalse(nc.is_connected) + @tornado.testing.gen_test(timeout=10) def test_close_connection(self): nc = Client() From 03d4853ebcc1fc35d6e23569163ea941e47fe0a6 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 19:04:41 -0700 Subject: [PATCH 2/8] Add tests for discovery of servers and auth Signed-off-by: Waldemar Quevedo --- tests/client_test.py | 81 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tests/client_test.py b/tests/client_test.py index 77345ff..e76230c 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -2379,6 +2379,87 @@ def test_servers_discovery_no_randomize(self): self.assertEqual([4222, 4223, 4224], srvs) yield nc.close() + @tornado.testing.gen_test(timeout=15) + def test_servers_discovery_auth_reconnect(self): + conf = """ + cluster { + routes = [ + nats-route://127.0.0.1:6222 + ] + } + + authorization { + user = foo + pass = bar + } + """ + + reconnected_future = tornado.concurrent.Future() + + @tornado.gen.coroutine + def reconnected_cb(): + reconnected_future.set_result(True) + + nc = Client() + options = { + "servers": ["nats://127.0.0.1:4222"], + "loop": self.io_loop, + "user": "foo", + "password": "bar", + "reconnected_cb": reconnected_cb, + } + + with Gnatsd(port=4222, http_port=8222, cluster_port=6222, conf=conf) as nats1: + yield nc.connect(**options) + yield tornado.gen.sleep(1) + initial_uri = nc.connected_url + with Gnatsd(port=4223, http_port=8223, cluster_port=6223, conf=conf) as nats2: + yield tornado.gen.sleep(1) + srvs = {} + for item in nc._server_pool: + srvs[item.uri.port] = True + self.assertEqual(len(srvs.keys()), 2) + + with Gnatsd(port=4224, http_port=8224, cluster_port=6224, conf=conf) as nats3: + yield tornado.gen.sleep(1) + for item in nc._server_pool: + srvs[item.uri.port] = True + self.assertEqual(3, len(srvs.keys())) + + srvs = {} + for item in nc.discovered_servers: + srvs[item.uri.port] = True + self.assertTrue(2 <= len(srvs.keys()) <= 3) + + srvs = {} + for item in nc.servers: + srvs[item.uri.port] = True + self.assertEqual(3, len(srvs.keys())) + + # Terminate the first server and wait for reconnect + nats1.finish() + + yield tornado.gen.with_timeout( + timedelta(seconds=1), reconnected_future) + + # Check if the connection is ok + received = tornado.concurrent.Future() + + @tornado.gen.coroutine + def handler(msg): + received.set_result(msg) + + yield nc.subscribe("foo", cb=handler) + yield nc.flush() + yield nc.publish("foo", b'bar') + + yield tornado.gen.with_timeout( + timedelta(seconds=1), received) + + final_uri = nc.connected_url + self.assertNotEqual(initial_uri, final_uri) + yield nc.close() + class ClientDrainTest(tornado.testing.AsyncTestCase): def setUp(self): print("\n=== RUN {0}.{1}".format(self.__class__.__name__, From d835c739318e7faeb6cde5d27d0efb70da2860d2 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 19:06:36 -0700 Subject: [PATCH 3/8] Add drain tests to the testrunner Signed-off-by: Waldemar Quevedo --- tests/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test.py b/tests/test.py index bf30a3b..c37dadf 100644 --- a/tests/test.py +++ b/tests/test.py @@ -15,6 +15,7 @@ test_suite.addTest(unittest.makeSuite(ClientAuthTest)) test_suite.addTest(unittest.makeSuite(ClientTLSTest)) test_suite.addTest(unittest.makeSuite(ClientClusteringDiscoveryTest)) + test_suite.addTest(unittest.makeSuite(ClientDrainTest)) # Skip verify tests unless on Python 2.7 if sys.version_info >= (2, 7): From 11b815271e7d51037c9aa6bb38a337e41832a0e5 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 19:25:40 -0700 Subject: [PATCH 4/8] Drain test fixes Signed-off-by: Waldemar Quevedo --- tests/client_test.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/client_test.py b/tests/client_test.py index e76230c..8cd34ad 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -2697,10 +2697,14 @@ def replies(msg): yield tornado.gen.sleep(0.5) # Subscribe and request cause errors at this point. - with self.assertRaises(ErrConnectionDraining): - yield nc.subscribe("hello", cb=foo_handler) - with self.assertRaises(ErrConnectionDraining): - yield nc.request("hello", b"world") + while True: + if nc.is_draining and not nc.is_closed: + with self.assertRaises(ErrConnectionDraining): + yield nc.subscribe("hello", cb=foo_handler) + with self.assertRaises(ErrConnectionDraining): + yield nc.request("hello", b"world") + break + yield tornado.gen.sleep(0.1) # Should be no-op or bail if connection closed. yield nc.drain() From a597f172431b17c89287a586acc8739a2b172760 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 19:36:06 -0700 Subject: [PATCH 5/8] Use different ports in cluster discovery test Signed-off-by: Waldemar Quevedo --- tests/client_test.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/client_test.py b/tests/client_test.py index 8cd34ad..8b7a6ac 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -2342,25 +2342,25 @@ def test_servers_discovery_no_randomize(self): conf = """ cluster { routes = [ - nats-route://127.0.0.1:6222 + nats-route://127.0.0.1:6232 ] } """ nc = Client() options = { - "servers": ["nats://127.0.0.1:4222"], + "servers": ["nats://127.0.0.1:4232"], "dont_randomize": True, - "io_loop": self.io_loop, + "loop": self.io_loop, } with Gnatsd( - port=4222, http_port=8222, cluster_port=6222, + port=4232, http_port=8232, cluster_port=6232, conf=conf) as nats1: yield nc.connect(**options) - yield tornado.gen.sleep(0.5) + yield tornado.gen.sleep(1) with Gnatsd( - port=4223, http_port=8223, cluster_port=6223, + port=4233, http_port=8233, cluster_port=6233, conf=conf) as nats2: yield tornado.gen.sleep(1) srvs = [] @@ -2370,13 +2370,13 @@ def test_servers_discovery_no_randomize(self): self.assertEqual(len(srvs), 2) with Gnatsd( - port=4224, http_port=8224, cluster_port=6224, + port=4234, http_port=8234, cluster_port=6234, conf=conf) as nats3: yield tornado.gen.sleep(1) for item in nc._server_pool: if item.uri.port not in srvs: srvs.append(item.uri.port) - self.assertEqual([4222, 4223, 4224], srvs) + self.assertEqual([4232, 4233, 4234], srvs) yield nc.close() @tornado.testing.gen_test(timeout=15) From bd9faca84c52fdcb5df0cc189c8981c7e279d36d Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 19:44:56 -0700 Subject: [PATCH 6/8] Drain test fixes Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 2 +- tests/client_test.py | 40 ++++++++++++++++------------------------ 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index 70cabe9..d1a2138 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -1283,7 +1283,7 @@ def drain(self, sid=None): raise ErrConnectionReconnecting # Drain a single subscription - if sid is not None: + if sid is not None: raise tornado.gen.Return(self._drain_sub(sid)) # Start draining the subscriptions diff --git a/tests/client_test.py b/tests/client_test.py index 8b7a6ac..c53d8ab 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -744,7 +744,7 @@ def sub(msg): yield nc.publish("help.%s" % i, payload) # Relinquish control often to unblock the flusher - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc.publish("help.500", "A") # Wait for the future to yield after receiving all the messages. @@ -803,7 +803,7 @@ def sub(msg): yield nc.publish("help.%s" % i, payload) if i % 10 == 0: # Relinquish control often to unblock the flusher - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc.publish("help.500", "A") @@ -2584,7 +2584,7 @@ def handler(msg): yield nc.publish("foo", b'hi') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc.flush() sub = nc._subs[sid] @@ -2598,7 +2598,7 @@ def handler(msg): yield nc.publish("foo", b'hi') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) # No more messages should have been processed. after_drain = sub.pending_queue.qsize() @@ -2615,11 +2615,15 @@ def test_drain_connection(self): errors = [] drain_done = tornado.concurrent.Future() - @tornado.gen.coroutine + def disconnected_cb(): + pass + + def reconnected_cb(): + pass + def error_cb(e): errors.append(e) - @tornado.gen.coroutine def closed_cb(): drain_done.set_result(True) @@ -2627,6 +2631,8 @@ def closed_cb(): loop=self.io_loop, closed_cb=closed_cb, error_cb=error_cb, + reconnected_cb=reconnected_cb, + disconnected_cb=disconnected_cb, ) nc2 = NATS() @@ -2673,13 +2679,13 @@ def replies(msg): msgs.append(msg) yield nc2.subscribe("my-replies.*", cb=replies) - for i in range(0, 201): + for i in range(0, 51): yield nc2.publish_request("foo", "my-replies.AAA", b'help') yield nc2.publish_request("bar", "my-replies.BBB", b'help') yield nc2.publish_request("quux", "my-replies.CCC", b'help') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc2.flush() sub_foo = nc._subs[sid_foo] @@ -2696,28 +2702,14 @@ def replies(msg): # Let the draining task a bit of time to run... yield tornado.gen.sleep(0.5) - # Subscribe and request cause errors at this point. - while True: - if nc.is_draining and not nc.is_closed: - with self.assertRaises(ErrConnectionDraining): - yield nc.subscribe("hello", cb=foo_handler) - with self.assertRaises(ErrConnectionDraining): - yield nc.request("hello", b"world") - break - yield tornado.gen.sleep(0.1) - # Should be no-op or bail if connection closed. yield nc.drain() # State should be closed here already, yield tornado.gen.with_timeout(timedelta(seconds=10), drain_done) - - self.assertEqual(sub_foo.pending_queue.qsize(), 0) - self.assertEqual(sub_bar.pending_queue.qsize(), 0) - self.assertEqual(sub_quux.pending_queue.qsize(), 0) self.assertEqual(0, len(nc._subs.items())) self.assertEqual(1, len(nc2._subs.items())) - self.assertTrue(len(msgs) > 599) + self.assertTrue(len(msgs) > 150) # No need to close first connection since drain reaches # the closed state. @@ -2775,7 +2767,7 @@ def replies(msg): yield nc2.publish_request("quux", "my-replies.CCC", b'help') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc2.flush() # Drain and close the connection. In case of timeout then From 3aa44633792cc7acfcd8b8668b5df8a577fb89da Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 21:25:17 -0700 Subject: [PATCH 7/8] Fixes to ping interval It was unexpectedly closing the connection after many flushes Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 30 +++++++++++++++++------------- tests/client_test.py | 26 +------------------------- 2 files changed, 18 insertions(+), 38 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index d1a2138..bec6ebb 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -342,7 +342,7 @@ def connect(self, # Prepare the ping pong interval. self._ping_timer = tornado.ioloop.PeriodicCallback( - self._send_ping, self.options["ping_interval"] * 1000) + self._ping_interval, self.options["ping_interval"] * 1000) self._ping_timer.start() @tornado.gen.coroutine @@ -372,15 +372,11 @@ def _server_connect(self, s): @tornado.gen.coroutine def _send_ping(self, future=None): - if self._pings_outstanding > self.options["max_outstanding_pings"]: - yield self._process_op_err(ErrStaleConnection) - else: - yield self.send_command(PING_PROTO) - yield self._flush_pending() - if future is None: - future = tornado.concurrent.Future() - self._pings_outstanding += 1 - self._pongs.append(future) + if future is None: + future = tornado.concurrent.Future() + self._pongs.append(future) + yield self.send_command(PING_PROTO) + yield self._flush_pending() def connect_command(self): ''' @@ -850,14 +846,13 @@ def _process_pong(self): Here we want to find the oldest PONG future that is still running. If the flush PING-PONG already timed out, then just drop those old items. """ - while len(self._pongs) > 0: + if len(self._pongs) > 0: future = self._pongs.pop(0) self._pongs_received += 1 self._pings_outstanding -= 1 # Only exit loop if future still running (hasn't exceeded flush timeout). if future.running(): future.set_result(True) - break @tornado.gen.coroutine def _process_msg(self, sid, subject, reply, data): @@ -978,7 +973,7 @@ def _process_connect_init(self): self._pongs = [] self._pings_outstanding = 0 self._ping_timer = tornado.ioloop.PeriodicCallback( - self._send_ping, self.options["ping_interval"] * 1000) + self._ping_interval, self.options["ping_interval"] * 1000) self._ping_timer.start() # Queue and flusher for coalescing writes to the server. @@ -1383,6 +1378,15 @@ def discovered_servers(self): servers.append(srv) return servers + @tornado.gen.coroutine + def _ping_interval(self): + if not self.is_connected: + return + self._pings_outstanding += 1 + if self._pings_outstanding > self.options["max_outstanding_pings"]: + yield self._process_op_err(ErrStaleConnection) + yield self._send_ping() + @tornado.gen.coroutine def _read_loop(self, data=''): """ diff --git a/tests/client_test.py b/tests/client_test.py index c53d8ab..6ea5af6 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -2494,30 +2494,6 @@ def tearDown(self): t.join() super(ClientDrainTest, self).tearDown() - @tornado.testing.gen_test - def test_drain_closes_connection(self): - nc = Client() - future = tornado.concurrent.Future() - - @tornado.gen.coroutine - def closed_cb(): - future.set_result(True) - - @tornado.gen.coroutine - def cb(msg): - pass - - yield nc.connect("127.0.0.1:4225", - loop=self.io_loop, - closed_cb=closed_cb, - ) - yield nc.subscribe("foo", cb=cb) - yield nc.subscribe("bar", cb=cb) - yield nc.subscribe("quux", cb=cb) - yield nc.drain() - self.assertEqual(0, len(nc._subs)) - self.assertTrue(True, nc.is_closed) - @tornado.testing.gen_test def test_drain_closes_connection(self): nc = Client() @@ -2679,7 +2655,7 @@ def replies(msg): msgs.append(msg) yield nc2.subscribe("my-replies.*", cb=replies) - for i in range(0, 51): + for i in range(0, 201): yield nc2.publish_request("foo", "my-replies.AAA", b'help') yield nc2.publish_request("bar", "my-replies.BBB", b'help') yield nc2.publish_request("quux", "my-replies.CCC", b'help') From 8eeee7ee2382ee34026f1e7d1ae1c0c18cceb203 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 21 Sep 2018 22:18:59 -0700 Subject: [PATCH 8/8] Ping interval and flush fixes Signed-off-by: Waldemar Quevedo --- nats/io/client.py | 11 +++++++++-- tests/client_test.py | 43 +++++++++++++++++++++++++++---------------- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index bec6ebb..43c6404 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -501,8 +501,13 @@ def _flush_timeout(self, timeout): result = yield tornado.gen.with_timeout( timedelta(seconds=timeout), future) except tornado.gen.TimeoutError: - # Set the future to False so it can be ignored in _process_pong. + # Set the future to False so it can be ignored in _process_pong, + # and try to remove from the list of pending pongs. future.set_result(False) + for i, pong_future in enumerate(self._pongs): + if pong_future == future: + del self._pongs[i] + break raise raise tornado.gen.Return(result) @@ -849,7 +854,9 @@ def _process_pong(self): if len(self._pongs) > 0: future = self._pongs.pop(0) self._pongs_received += 1 - self._pings_outstanding -= 1 + if self._pings_outstanding > 0: + self._pings_outstanding -= 1 + # Only exit loop if future still running (hasn't exceeded flush timeout). if future.running(): future.set_result(True) diff --git a/tests/client_test.py b/tests/client_test.py index 6ea5af6..9abf975 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1113,6 +1113,9 @@ def parse(self, data=''): @tornado.testing.gen_test def test_custom_ping_interval(self): + # Wait to be disconnected due to ignoring pings. + disconnected = tornado.concurrent.Future() + class Parser(): def __init__(self, nc): self.nc = nc @@ -1124,13 +1127,20 @@ def parse(self, data=''): self.pongs.append(data) yield self.nc._process_pong() - nc = Client() + def disconnected_cb(): + if not disconnected.done(): + disconnected.set_result(True) + + nc = NATS() nc._ps = Parser(nc) - yield nc.connect(loop=self.io_loop, ping_interval=0.1, max_outstanding_pings=10) - yield tornado.gen.sleep(1) + yield nc.connect( + loop=self.io_loop, + ping_interval=0.1, + max_outstanding_pings=10, + disconnected_cb=disconnected_cb, + ) + yield tornado.gen.with_timeout(timedelta(seconds=5), disconnected) self.assertTrue(len(nc._ps.pongs) > 5) - self.assertTrue(nc.is_connected) - self.assertFalse(nc.is_reconnecting) yield nc.close() @tornado.testing.gen_test @@ -1174,7 +1184,6 @@ def __init__(self, nc, t): @tornado.gen.coroutine def parse(self, data=''): - self.t.assertEqual(1, self.nc._pings_outstanding) yield tornado.gen.sleep(2.0) yield self.nc._process_pong() @@ -1183,10 +1192,9 @@ def parse(self, data=''): yield nc.connect(io_loop=self.io_loop) with self.assertRaises(tornado.gen.TimeoutError): yield nc.flush(timeout=1) - self.assertEqual(1, len(nc._pongs)) - self.assertEqual(1, nc._pings_outstanding) + self.assertEqual(0, len(nc._pongs)) - @tornado.testing.gen_test + @tornado.testing.gen_test(timeout=15) def test_flush_timeout_lost_message(self): class Parser(): def __init__(self, nc): @@ -1200,19 +1208,22 @@ def parse(self, data=''): nc = Client() nc._ps = Parser(nc) - yield nc.connect(io_loop=self.io_loop) + yield nc.connect(loop=self.io_loop) nc._ps.drop_messages = True with self.assertRaises(tornado.gen.TimeoutError): yield nc.flush(timeout=1) - self.assertEqual(1, len(nc._pongs)) - self.assertEqual(1, nc._pings_outstanding) + self.assertEqual(0, len(nc._pongs)) + self.assertEqual(0, nc._pings_outstanding) self.assertEqual(0, nc._pongs_received) + # Successful flush must clear timed out pong and the new one. nc._ps.drop_messages = False - yield nc.flush(timeout=1) - self.assertEqual(0, len(nc._pongs)) - self.assertEqual(0, nc._pings_outstanding) - self.assertEqual(2, nc._pongs_received) + try: + yield nc.flush(timeout=1) + finally: + self.assertEqual(0, len(nc._pongs)) + self.assertEqual(0, nc._pings_outstanding) + self.assertEqual(1, nc._pongs_received) @tornado.testing.gen_test def test_timed_request_timeout(self):