From 5568e3258491c35a15a9e19d210b05958bf68f44 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 1 Mar 2017 01:30:26 -0800 Subject: [PATCH 1/7] Add reconnect time back off and infinite reconnects support --- nats/io/client.py | 29 +++++++++++++++++++++-------- tests/client_test.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index 27e2ede..5dd5103 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -136,6 +136,7 @@ def connect(self, tcp_nodelay=False, connect_timeout=DEFAULT_CONNECT_TIMEOUT, max_reconnect_attempts=MAX_RECONNECT_ATTEMPTS, + reconnect_time_wait=RECONNECT_TIME_WAIT, tls=None ): """ @@ -157,6 +158,7 @@ def connect(self, self.options["name"] = name self.options["max_outstanding_pings"] = max_outstanding_pings self.options["max_reconnect_attempts"] = max_reconnect_attempts + self.options["reconnect_time_wait"] = reconnect_time_wait self.options["dont_randomize"] = dont_randomize self.options["allow_reconnect"] = allow_reconnect self.options["tcp_nodelay"] = tcp_nodelay @@ -191,8 +193,15 @@ def connect(self, if s is None: raise ErrNoServers + # Check when was the last attempt and back off before reconnecting + if s.last_attempt is not None: + now = time.time() + if (now - s.last_attempt) < self.options["reconnect_time_wait"]: + yield tornado.gen.sleep(self.options["reconnect_time_wait"]) + # Mark that we have attempted to connect s.reconnects += 1 + s.last_attempt = time.time() yield self._server_connect(s) self._current_server = s s.did_connect = True @@ -643,7 +652,7 @@ def _next_server(self): s = None for server in self._server_pool: - if server.reconnects > self.options["max_reconnect_attempts"]: + if self.options["max_reconnect_attempts"] > 0 and (server.reconnects > self.options["max_reconnect_attempts"]): continue else: s = server @@ -683,7 +692,7 @@ def _unbind(self): if self.is_connected: self._status = Client.RECONNECTING - if self._ping_timer.is_running(): + if self._ping_timer is not None and self._ping_timer.is_running(): self._ping_timer.stop() while True: @@ -734,15 +743,19 @@ def _schedule_primary_and_connect(self): s = self._next_server() if s is None: raise ErrNoServers - s.reconnects += 1 - self.stats['reconnects'] += 1 # For the reconnection logic, we need to consider # sleeping for a bit before trying to reconnect # too soon to a server which has failed previously. - yield tornado.gen.Task( - self._loop.add_timeout, - timedelta(seconds=RECONNECT_TIME_WAIT)) + # Check when was the last attempt and back off before reconnecting + if s.last_attempt is not None: + now = time.time() + if (now - s.last_attempt) < self.options["reconnect_time_wait"]: + yield tornado.gen.sleep(self.options["reconnect_time_wait"]) + + s.reconnects += 1 + s.last_attempt = time.time() + self.stats['reconnects'] += 1 try: yield self._server_connect(s) self._current_server = s @@ -791,7 +804,7 @@ def _close(self, status, do_callbacks=True): return self._status = Client.CLOSED - if self._ping_timer.is_running(): + if self._ping_timer is not None and self._ping_timer.is_running(): self._ping_timer.stop() if not self.io.closed(): diff --git a/tests/client_test.py b/tests/client_test.py index b73f0a3..4230b37 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -300,6 +300,38 @@ def disconnected_cb(self): yield client.nc.connect(**options) self.assertFalse(client.disconnected_cb_called) + @tornado.testing.gen_test(timeout=5) + def test_connect_fails_allow_reconnect_forever_until_close(self): + + class SampleClient(): + def __init__(self): + self.nc = Client() + self.disconnected_cb_called = False + self.closed_cb_called = False + + def disconnected_cb(self): + self.disconnected_cb_called = True + + def close_cb(self): + self.closed_cb_called = True + + client = SampleClient() + options = { + "servers": ["nats://127.0.0.1:4223"], + "close_cb": client.close_cb, + "disconnected_cb": client.disconnected_cb, + "allow_reconnect": True, + "io_loop": self.io_loop, + "max_reconnect_attempts": -1, + "reconnect_time_wait": 0.1 + } + self.io_loop.spawn_callback(client.nc.connect, **options) + yield tornado.gen.sleep(2) + yield client.nc.close() + self.assertTrue(client.nc._server_pool[0].reconnects > 10) + self.assertTrue(client.disconnected_cb_called) + self.assertTrue(client.closed_cb_called) + @tornado.testing.gen_test def test_subscribe(self): nc = Client() From 746e22e4e9cd4ade06c33589fb0e3180f3b85d3a Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 1 Mar 2017 01:45:30 -0800 Subject: [PATCH 2/7] Update reconnecting tests with max_reconnect_attempts --- tests/client_test.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/tests/client_test.py b/tests/client_test.py index 4230b37..055b1a7 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -285,20 +285,27 @@ class SampleClient(): def __init__(self): self.nc = Client() self.disconnected_cb_called = False + self.closed_cb_called = False def disconnected_cb(self): self.disconnected_cb_called = True + def closed_cb(self): + self.closed_cb_called = True + client = SampleClient() with self.assertRaises(ErrNoServers): options = { "servers": ["nats://127.0.0.1:4223"], - "close_cb": client.disconnected_cb, + "disconnected_cb": client.disconnected_cb, + "close_cb": client.closed_cb, "allow_reconnect": True, - "io_loop": self.io_loop + "io_loop": self.io_loop, + "max_reconnect_attempts": 2 } yield client.nc.connect(**options) self.assertFalse(client.disconnected_cb_called) + self.assertFalse(client.closed_cb_called) @tornado.testing.gen_test(timeout=5) def test_connect_fails_allow_reconnect_forever_until_close(self): @@ -960,7 +967,9 @@ def reconnected_cb(self): "close_cb": component.close_cb, "error_cb": component.error_cb, "disconnected_cb": component.disconnected_cb, - "reconnected_cb": component.reconnected_cb + "reconnected_cb": component.reconnected_cb, + "max_reconnect_attempts": 10, + "reconnect_time_wait": 0.1 } yield component.nc.connect(**options) self.assertEqual(True, component.nc.is_connected) @@ -1060,6 +1069,7 @@ def publisher(self): "reconnected_cb": c.reconnected_cb, "disconnected_cb": c.disconnected_cb, "error_cb": c.error_cb, + "reconnect_time_wait": 0.1 } yield c.nc.connect(**options) self.assertEqual(True, nc._server_info["auth_required"]) @@ -1068,18 +1078,16 @@ def publisher(self): yield c.nc.subscribe("foo", "", log.persist) self.io_loop.spawn_callback(c.publisher) + a = nc._current_server yield tornado.gen.sleep(0.001) orig_gnatsd = self.server_pool.pop(0) orig_gnatsd.finish() yield tornado.gen.sleep(0.001) - try: - a = nc._current_server - # Wait for reconnect logic kick in... - yield tornado.gen.sleep(3) - finally: - b = nc._current_server - self.assertNotEqual(a.uri, b.uri) + # Wait for reconnect logic kick in... + yield tornado.gen.sleep(3) + b = nc._current_server + self.assertNotEqual(a.uri, b.uri) # Should have reconnected already self.assertTrue(nc.is_connected) From 439d7c616d24924ffc29998a04ec19e6f00e2a76 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 24 Mar 2017 14:52:19 +0000 Subject: [PATCH 3/7] Bump version to 0.5.0 --- nats/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/__init__.py b/nats/__init__.py index 2cae124..51d01f9 100644 --- a/nats/__init__.py +++ b/nats/__init__.py @@ -1,2 +1,2 @@ -__version__ = b'0.4.0' +__version__ = b'0.5.0' __lang__ = b'python2' From 6204d4b6b48eb804f979a928e21f984b0c353d3c Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 24 Mar 2017 14:55:33 +0000 Subject: [PATCH 4/7] Update travis --- .travis.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 80d83e1..48b4382 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,9 +8,6 @@ python: - 2.6.9 - 2.7 -env: - - DEBUG_NATS_TEST=true - before_install: - bash ./script/install_gnatsd.sh From 14716fd3c4479b4ce6e9f6911d0f39658ec84851 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 24 Mar 2017 15:26:36 +0000 Subject: [PATCH 5/7] Check for number of reconnect attempts in auth test --- tests/client_test.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/client_test.py b/tests/client_test.py index 055b1a7..e9a22aa 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -938,12 +938,14 @@ def __init__(self, nc): self.nc = nc self.error = None self.error_cb_called = False + self.errors = [] self.close_cb_called = False self.disconnected_cb_called = False self.reconnected_cb_called = False def error_cb(self, err): self.error = err + self.errors.append(err) self.error_cb_called = True def close_cb(self): @@ -968,7 +970,7 @@ def reconnected_cb(self): "error_cb": component.error_cb, "disconnected_cb": component.disconnected_cb, "reconnected_cb": component.reconnected_cb, - "max_reconnect_attempts": 10, + "max_reconnect_attempts": 5, "reconnect_time_wait": 0.1 } yield component.nc.connect(**options) @@ -1009,7 +1011,7 @@ def reconnected_cb(self): orig_gnatsd.finish() # Wait for reconnect logic kick in and fail due to authorization error. - yield tornado.gen.sleep(1) + yield tornado.gen.sleep(0.5) self.assertFalse(component.nc.is_connected) self.assertTrue(component.nc.is_reconnecting) self.assertTrue(component.disconnected_cb_called) @@ -1021,9 +1023,10 @@ def reconnected_cb(self): # self.assertTrue(component.error_cb_called) # Connection is closed at this point after reconnect failed. - yield tornado.gen.sleep(2) - self.assertTrue(component.nc.is_closed) + yield tornado.gen.sleep(1) self.assertTrue(component.reconnected_cb_called) + self.assertEqual(6, len(component.errors)) + self.assertTrue(component.close_cb_called) @tornado.testing.gen_test(timeout=15) def test_auth_pending_bytes_handling(self): From 614e29058079b973a962c8af49e74d78abd4029d Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 24 Mar 2017 15:51:12 +0000 Subject: [PATCH 6/7] Remove unused tls config param --- nats/io/client.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/nats/io/client.py b/nats/io/client.py index 5dd5103..a57a593 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -583,15 +583,10 @@ def _process_connect_init(self): self._loop.remove_handler(self._socket.fileno()) tls_opts = {} - tls_version = None if "tls" in self.options: # Allow customizing the TLS version though default # to one that the server supports at least. tls_opts = self.options["tls"] - if "ssl_version" in tls_opts: - tls_version = tls_opts["ssl_version"] - else: - tls_version = ssl.PROTOCOL_TLSv1_2 # Rewrap using a TLS connection, can't do handshake on connect # as the socket is non blocking. From 289f4d44ddb10cba94114ff59ee0166bff083de5 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 24 Mar 2017 16:01:55 +0000 Subject: [PATCH 7/7] Add reconnect wait to TLS test --- tests/client_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/client_test.py b/tests/client_test.py index e9a22aa..5973f76 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1484,6 +1484,7 @@ def test_tls_verify_fails(self): "error_cb": c.error_cb, "disconnected_cb": c.disconnected_cb, "reconnected_cb": c.reconnected_cb, + "reconnect_time_wait": 0.1, "tls": { "cert_reqs": ssl.CERT_REQUIRED, # "ca_certs": "./tests/configs/certs/ca.pem",