diff --git a/nats/io/client.py b/nats/io/client.py index 2794dd1..43c6404 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -224,6 +224,9 @@ def connect(self, pedantic=False, verbose=False, no_echo=False, + user=None, + password=None, + token=None, # Reconnect logic allow_reconnect=True, @@ -272,6 +275,9 @@ def connect(self, self.options["pedantic"] = pedantic self.options["name"] = name self.options["no_echo"] = no_echo + self.options["user"] = user + self.options["password"] = password + self.options["token"] = token self.options["max_outstanding_pings"] = max_outstanding_pings self.options["max_reconnect_attempts"] = max_reconnect_attempts self.options["reconnect_time_wait"] = reconnect_time_wait @@ -336,7 +342,7 @@ def connect(self, # Prepare the ping pong interval. self._ping_timer = tornado.ioloop.PeriodicCallback( - self._send_ping, self.options["ping_interval"] * 1000) + self._ping_interval, self.options["ping_interval"] * 1000) self._ping_timer.start() @tornado.gen.coroutine @@ -366,15 +372,11 @@ def _server_connect(self, s): @tornado.gen.coroutine def _send_ping(self, future=None): - if self._pings_outstanding > self.options["max_outstanding_pings"]: - yield self._process_op_err(ErrStaleConnection) - else: - yield self.send_command(PING_PROTO) - yield self._flush_pending() - if future is None: - future = tornado.concurrent.Future() - self._pings_outstanding += 1 - self._pongs.append(future) + if future is None: + future = tornado.concurrent.Future() + self._pongs.append(future) + yield self.send_command(PING_PROTO) + yield self._flush_pending() def connect_command(self): ''' @@ -395,7 +397,12 @@ def connect_command(self): if self._server_info["auth_required"] == True: # In case there is no password, then consider handle # sending a token instead. - if self._current_server.uri.password is None: + if self.options["user"] is not None and self.options["password"] is not None: + options["user"] = self.options["user"] + options["pass"] = self.options["password"] + elif self.options["token"] is not None: + options["auth_token"] = self.options["token"] + elif self._current_server.uri.password is None: options["auth_token"] = self._current_server.uri.username else: options["user"] = self._current_server.uri.username @@ -494,8 +501,13 @@ def _flush_timeout(self, timeout): result = yield tornado.gen.with_timeout( timedelta(seconds=timeout), future) except tornado.gen.TimeoutError: - # Set the future to False so it can be ignored in _process_pong. + # Set the future to False so it can be ignored in _process_pong, + # and try to remove from the list of pending pongs. future.set_result(False) + for i, pong_future in enumerate(self._pongs): + if pong_future == future: + del self._pongs[i] + break raise raise tornado.gen.Return(result) @@ -839,14 +851,15 @@ def _process_pong(self): Here we want to find the oldest PONG future that is still running. If the flush PING-PONG already timed out, then just drop those old items. """ - while len(self._pongs) > 0: + if len(self._pongs) > 0: future = self._pongs.pop(0) self._pongs_received += 1 - self._pings_outstanding -= 1 + if self._pings_outstanding > 0: + self._pings_outstanding -= 1 + # Only exit loop if future still running (hasn't exceeded flush timeout). if future.running(): future.set_result(True) - break @tornado.gen.coroutine def _process_msg(self, sid, subject, reply, data): @@ -967,7 +980,7 @@ def _process_connect_init(self): self._pongs = [] self._pings_outstanding = 0 self._ping_timer = tornado.ioloop.PeriodicCallback( - self._send_ping, self.options["ping_interval"] * 1000) + self._ping_interval, self.options["ping_interval"] * 1000) self._ping_timer.start() # Queue and flusher for coalescing writes to the server. @@ -1272,7 +1285,7 @@ def drain(self, sid=None): raise ErrConnectionReconnecting # Drain a single subscription - if sid is not None: + if sid is not None: raise tornado.gen.Return(self._drain_sub(sid)) # Start draining the subscriptions @@ -1372,6 +1385,15 @@ def discovered_servers(self): servers.append(srv) return servers + @tornado.gen.coroutine + def _ping_interval(self): + if not self.is_connected: + return + self._pings_outstanding += 1 + if self._pings_outstanding > self.options["max_outstanding_pings"]: + yield self._process_op_err(ErrStaleConnection) + yield self._send_ping() + @tornado.gen.coroutine def _read_loop(self, data=''): """ diff --git a/tests/client_test.py b/tests/client_test.py index bf52932..9abf975 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -744,7 +744,7 @@ def sub(msg): yield nc.publish("help.%s" % i, payload) # Relinquish control often to unblock the flusher - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc.publish("help.500", "A") # Wait for the future to yield after receiving all the messages. @@ -803,7 +803,7 @@ def sub(msg): yield nc.publish("help.%s" % i, payload) if i % 10 == 0: # Relinquish control often to unblock the flusher - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc.publish("help.500", "A") @@ -1113,6 +1113,9 @@ def parse(self, data=''): @tornado.testing.gen_test def test_custom_ping_interval(self): + # Wait to be disconnected due to ignoring pings. + disconnected = tornado.concurrent.Future() + class Parser(): def __init__(self, nc): self.nc = nc @@ -1124,13 +1127,20 @@ def parse(self, data=''): self.pongs.append(data) yield self.nc._process_pong() - nc = Client() + def disconnected_cb(): + if not disconnected.done(): + disconnected.set_result(True) + + nc = NATS() nc._ps = Parser(nc) - yield nc.connect(loop=self.io_loop, ping_interval=0.1, max_outstanding_pings=10) - yield tornado.gen.sleep(1) + yield nc.connect( + loop=self.io_loop, + ping_interval=0.1, + max_outstanding_pings=10, + disconnected_cb=disconnected_cb, + ) + yield tornado.gen.with_timeout(timedelta(seconds=5), disconnected) self.assertTrue(len(nc._ps.pongs) > 5) - self.assertTrue(nc.is_connected) - self.assertFalse(nc.is_reconnecting) yield nc.close() @tornado.testing.gen_test @@ -1174,7 +1184,6 @@ def __init__(self, nc, t): @tornado.gen.coroutine def parse(self, data=''): - self.t.assertEqual(1, self.nc._pings_outstanding) yield tornado.gen.sleep(2.0) yield self.nc._process_pong() @@ -1183,10 +1192,9 @@ def parse(self, data=''): yield nc.connect(io_loop=self.io_loop) with self.assertRaises(tornado.gen.TimeoutError): yield nc.flush(timeout=1) - self.assertEqual(1, len(nc._pongs)) - self.assertEqual(1, nc._pings_outstanding) + self.assertEqual(0, len(nc._pongs)) - @tornado.testing.gen_test + @tornado.testing.gen_test(timeout=15) def test_flush_timeout_lost_message(self): class Parser(): def __init__(self, nc): @@ -1200,19 +1208,22 @@ def parse(self, data=''): nc = Client() nc._ps = Parser(nc) - yield nc.connect(io_loop=self.io_loop) + yield nc.connect(loop=self.io_loop) nc._ps.drop_messages = True with self.assertRaises(tornado.gen.TimeoutError): yield nc.flush(timeout=1) - self.assertEqual(1, len(nc._pongs)) - self.assertEqual(1, nc._pings_outstanding) + self.assertEqual(0, len(nc._pongs)) + self.assertEqual(0, nc._pings_outstanding) self.assertEqual(0, nc._pongs_received) + # Successful flush must clear timed out pong and the new one. nc._ps.drop_messages = False - yield nc.flush(timeout=1) - self.assertEqual(0, len(nc._pongs)) - self.assertEqual(0, nc._pings_outstanding) - self.assertEqual(2, nc._pongs_received) + try: + yield nc.flush(timeout=1) + finally: + self.assertEqual(0, len(nc._pongs)) + self.assertEqual(0, nc._pings_outstanding) + self.assertEqual(1, nc._pongs_received) @tornado.testing.gen_test def test_timed_request_timeout(self): @@ -1731,6 +1742,44 @@ def reconnected_cb(self): self.assertEqual(errors_at_close, errors_after_close) self.assertEqual(1, len(c.log.records["foo"])) + @tornado.testing.gen_test(timeout=10) + def test_connect_with_auth_token_option(self): + nc = NATS() + + conf = """ + port = 4227 + + http = 8227 + + authorization { + token = token + } + """ + with Gnatsd(port=4227, http_port=8227, conf=conf) as gnatsd: + yield nc.connect("nats://127.0.0.1:4227", + token='token', + loop=self.io_loop, + ) + self.assertIn('auth_required', nc._server_info) + self.assertTrue(nc.is_connected) + + received = tornado.concurrent.Future() + + @tornado.gen.coroutine + def handler(msg): + received.set_result(msg) + + yield nc.subscribe("foo", cb=handler) + yield nc.flush() + yield nc.publish("foo", b'bar') + + yield tornado.gen.with_timeout( + timedelta(seconds=1), received) + + yield nc.close() + self.assertTrue(nc.is_closed) + self.assertFalse(nc.is_connected) + @tornado.testing.gen_test(timeout=10) def test_close_connection(self): nc = Client() @@ -2304,25 +2353,25 @@ def test_servers_discovery_no_randomize(self): conf = """ cluster { routes = [ - nats-route://127.0.0.1:6222 + nats-route://127.0.0.1:6232 ] } """ nc = Client() options = { - "servers": ["nats://127.0.0.1:4222"], + "servers": ["nats://127.0.0.1:4232"], "dont_randomize": True, - "io_loop": self.io_loop, + "loop": self.io_loop, } with Gnatsd( - port=4222, http_port=8222, cluster_port=6222, + port=4232, http_port=8232, cluster_port=6232, conf=conf) as nats1: yield nc.connect(**options) - yield tornado.gen.sleep(0.5) + yield tornado.gen.sleep(1) with Gnatsd( - port=4223, http_port=8223, cluster_port=6223, + port=4233, http_port=8233, cluster_port=6233, conf=conf) as nats2: yield tornado.gen.sleep(1) srvs = [] @@ -2332,13 +2381,94 @@ def test_servers_discovery_no_randomize(self): self.assertEqual(len(srvs), 2) with Gnatsd( - port=4224, http_port=8224, cluster_port=6224, + port=4234, http_port=8234, cluster_port=6234, conf=conf) as nats3: yield tornado.gen.sleep(1) for item in nc._server_pool: if item.uri.port not in srvs: srvs.append(item.uri.port) - self.assertEqual([4222, 4223, 4224], srvs) + self.assertEqual([4232, 4233, 4234], srvs) + yield nc.close() + + @tornado.testing.gen_test(timeout=15) + def test_servers_discovery_auth_reconnect(self): + conf = """ + cluster { + routes = [ + nats-route://127.0.0.1:6222 + ] + } + + authorization { + user = foo + pass = bar + } + """ + + reconnected_future = tornado.concurrent.Future() + + @tornado.gen.coroutine + def reconnected_cb(): + reconnected_future.set_result(True) + + nc = Client() + options = { + "servers": ["nats://127.0.0.1:4222"], + "loop": self.io_loop, + "user": "foo", + "password": "bar", + "reconnected_cb": reconnected_cb, + } + + with Gnatsd(port=4222, http_port=8222, cluster_port=6222, conf=conf) as nats1: + yield nc.connect(**options) + yield tornado.gen.sleep(1) + initial_uri = nc.connected_url + with Gnatsd(port=4223, http_port=8223, cluster_port=6223, conf=conf) as nats2: + yield tornado.gen.sleep(1) + srvs = {} + for item in nc._server_pool: + srvs[item.uri.port] = True + self.assertEqual(len(srvs.keys()), 2) + + with Gnatsd(port=4224, http_port=8224, cluster_port=6224, conf=conf) as nats3: + yield tornado.gen.sleep(1) + for item in nc._server_pool: + srvs[item.uri.port] = True + self.assertEqual(3, len(srvs.keys())) + + srvs = {} + for item in nc.discovered_servers: + srvs[item.uri.port] = True + self.assertTrue(2 <= len(srvs.keys()) <= 3) + + srvs = {} + for item in nc.servers: + srvs[item.uri.port] = True + self.assertEqual(3, len(srvs.keys())) + + # Terminate the first server and wait for reconnect + nats1.finish() + + yield tornado.gen.with_timeout( + timedelta(seconds=1), reconnected_future) + + # Check if the connection is ok + received = tornado.concurrent.Future() + + @tornado.gen.coroutine + def handler(msg): + received.set_result(msg) + + yield nc.subscribe("foo", cb=handler) + yield nc.flush() + yield nc.publish("foo", b'bar') + + yield tornado.gen.with_timeout( + timedelta(seconds=1), received) + + final_uri = nc.connected_url + self.assertNotEqual(initial_uri, final_uri) yield nc.close() class ClientDrainTest(tornado.testing.AsyncTestCase): @@ -2375,30 +2505,6 @@ def tearDown(self): t.join() super(ClientDrainTest, self).tearDown() - @tornado.testing.gen_test - def test_drain_closes_connection(self): - nc = Client() - future = tornado.concurrent.Future() - - @tornado.gen.coroutine - def closed_cb(): - future.set_result(True) - - @tornado.gen.coroutine - def cb(msg): - pass - - yield nc.connect("127.0.0.1:4225", - loop=self.io_loop, - closed_cb=closed_cb, - ) - yield nc.subscribe("foo", cb=cb) - yield nc.subscribe("bar", cb=cb) - yield nc.subscribe("quux", cb=cb) - yield nc.drain() - self.assertEqual(0, len(nc._subs)) - self.assertTrue(True, nc.is_closed) - @tornado.testing.gen_test def test_drain_closes_connection(self): nc = Client() @@ -2465,7 +2571,7 @@ def handler(msg): yield nc.publish("foo", b'hi') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc.flush() sub = nc._subs[sid] @@ -2479,7 +2585,7 @@ def handler(msg): yield nc.publish("foo", b'hi') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) # No more messages should have been processed. after_drain = sub.pending_queue.qsize() @@ -2496,11 +2602,15 @@ def test_drain_connection(self): errors = [] drain_done = tornado.concurrent.Future() - @tornado.gen.coroutine + def disconnected_cb(): + pass + + def reconnected_cb(): + pass + def error_cb(e): errors.append(e) - @tornado.gen.coroutine def closed_cb(): drain_done.set_result(True) @@ -2508,6 +2618,8 @@ def closed_cb(): loop=self.io_loop, closed_cb=closed_cb, error_cb=error_cb, + reconnected_cb=reconnected_cb, + disconnected_cb=disconnected_cb, ) nc2 = NATS() @@ -2560,7 +2672,7 @@ def replies(msg): yield nc2.publish_request("quux", "my-replies.CCC", b'help') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc2.flush() sub_foo = nc._subs[sid_foo] @@ -2577,24 +2689,14 @@ def replies(msg): # Let the draining task a bit of time to run... yield tornado.gen.sleep(0.5) - # Subscribe and request cause errors at this point. - with self.assertRaises(ErrConnectionDraining): - yield nc.subscribe("hello", cb=foo_handler) - with self.assertRaises(ErrConnectionDraining): - yield nc.request("hello", b"world") - # Should be no-op or bail if connection closed. yield nc.drain() # State should be closed here already, yield tornado.gen.with_timeout(timedelta(seconds=10), drain_done) - - self.assertEqual(sub_foo.pending_queue.qsize(), 0) - self.assertEqual(sub_bar.pending_queue.qsize(), 0) - self.assertEqual(sub_quux.pending_queue.qsize(), 0) self.assertEqual(0, len(nc._subs.items())) self.assertEqual(1, len(nc2._subs.items())) - self.assertTrue(len(msgs) > 599) + self.assertTrue(len(msgs) > 150) # No need to close first connection since drain reaches # the closed state. @@ -2652,7 +2754,7 @@ def replies(msg): yield nc2.publish_request("quux", "my-replies.CCC", b'help') # Relinquish control so that messages are processed. - yield tornado.gen.moment + yield tornado.gen.sleep(0) yield nc2.flush() # Drain and close the connection. In case of timeout then diff --git a/tests/test.py b/tests/test.py index bf30a3b..c37dadf 100644 --- a/tests/test.py +++ b/tests/test.py @@ -15,6 +15,7 @@ test_suite.addTest(unittest.makeSuite(ClientAuthTest)) test_suite.addTest(unittest.makeSuite(ClientTLSTest)) test_suite.addTest(unittest.makeSuite(ClientClusteringDiscoveryTest)) + test_suite.addTest(unittest.makeSuite(ClientDrainTest)) # Skip verify tests unless on Python 2.7 if sys.version_info >= (2, 7):