diff --git a/.travis.yml b/.travis.yml index 80d83e1..48b4382 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,9 +8,6 @@ python: - 2.6.9 - 2.7 -env: - - DEBUG_NATS_TEST=true - before_install: - bash ./script/install_gnatsd.sh diff --git a/nats/__init__.py b/nats/__init__.py index 2cae124..51d01f9 100644 --- a/nats/__init__.py +++ b/nats/__init__.py @@ -1,2 +1,2 @@ -__version__ = b'0.4.0' +__version__ = b'0.5.0' __lang__ = b'python2' diff --git a/nats/io/client.py b/nats/io/client.py index 509b005..35e9096 100644 --- a/nats/io/client.py +++ b/nats/io/client.py @@ -136,6 +136,7 @@ def connect(self, tcp_nodelay=False, connect_timeout=DEFAULT_CONNECT_TIMEOUT, max_reconnect_attempts=MAX_RECONNECT_ATTEMPTS, + reconnect_time_wait=RECONNECT_TIME_WAIT, tls=None ): """ @@ -157,6 +158,7 @@ def connect(self, self.options["name"] = name self.options["max_outstanding_pings"] = max_outstanding_pings self.options["max_reconnect_attempts"] = max_reconnect_attempts + self.options["reconnect_time_wait"] = reconnect_time_wait self.options["dont_randomize"] = dont_randomize self.options["allow_reconnect"] = allow_reconnect self.options["tcp_nodelay"] = tcp_nodelay @@ -191,8 +193,15 @@ def connect(self, if s is None: raise ErrNoServers + # Check when was the last attempt and back off before reconnecting + if s.last_attempt is not None: + now = time.time() + if (now - s.last_attempt) < self.options["reconnect_time_wait"]: + yield tornado.gen.sleep(self.options["reconnect_time_wait"]) + # Mark that we have attempted to connect s.reconnects += 1 + s.last_attempt = time.time() yield self._server_connect(s) self._current_server = s s.did_connect = True @@ -569,15 +578,10 @@ def _process_connect_init(self): self._loop.remove_handler(self._socket.fileno()) tls_opts = {} - tls_version = None if "tls" in self.options: # Allow customizing the TLS version though default # to one that the server supports at least. tls_opts = self.options["tls"] - if "ssl_version" in tls_opts: - tls_version = tls_opts["ssl_version"] - else: - tls_version = ssl.PROTOCOL_TLSv1_2 # Rewrap using a TLS connection, can't do handshake on connect # as the socket is non blocking. @@ -638,7 +642,7 @@ def _next_server(self): s = None for server in self._server_pool: - if server.reconnects > self.options["max_reconnect_attempts"]: + if self.options["max_reconnect_attempts"] > 0 and (server.reconnects > self.options["max_reconnect_attempts"]): continue else: s = server @@ -678,7 +682,7 @@ def _unbind(self): if self.is_connected: self._status = Client.RECONNECTING - if self._ping_timer.is_running(): + if self._ping_timer is not None and self._ping_timer.is_running(): self._ping_timer.stop() while True: @@ -729,15 +733,19 @@ def _schedule_primary_and_connect(self): s = self._next_server() if s is None: raise ErrNoServers - s.reconnects += 1 - self.stats['reconnects'] += 1 # For the reconnection logic, we need to consider # sleeping for a bit before trying to reconnect # too soon to a server which has failed previously. - yield tornado.gen.Task( - self._loop.add_timeout, - timedelta(seconds=RECONNECT_TIME_WAIT)) + # Check when was the last attempt and back off before reconnecting + if s.last_attempt is not None: + now = time.time() + if (now - s.last_attempt) < self.options["reconnect_time_wait"]: + yield tornado.gen.sleep(self.options["reconnect_time_wait"]) + + s.reconnects += 1 + s.last_attempt = time.time() + self.stats['reconnects'] += 1 try: yield self._server_connect(s) self._current_server = s @@ -786,7 +794,7 @@ def _close(self, status, do_callbacks=True): return self._status = Client.CLOSED - if self._ping_timer.is_running(): + if self._ping_timer is not None and self._ping_timer.is_running(): self._ping_timer.stop() if not self.io.closed(): diff --git a/tests/client_test.py b/tests/client_test.py index 4138e10..0cd1254 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -285,20 +285,59 @@ class SampleClient(): def __init__(self): self.nc = Client() self.disconnected_cb_called = False + self.closed_cb_called = False def disconnected_cb(self): self.disconnected_cb_called = True + def closed_cb(self): + self.closed_cb_called = True + client = SampleClient() with self.assertRaises(ErrNoServers): options = { "servers": ["nats://127.0.0.1:4223"], - "close_cb": client.disconnected_cb, + "disconnected_cb": client.disconnected_cb, + "close_cb": client.closed_cb, "allow_reconnect": True, - "io_loop": self.io_loop + "io_loop": self.io_loop, + "max_reconnect_attempts": 2 } yield client.nc.connect(**options) self.assertFalse(client.disconnected_cb_called) + self.assertFalse(client.closed_cb_called) + + @tornado.testing.gen_test(timeout=5) + def test_connect_fails_allow_reconnect_forever_until_close(self): + + class SampleClient(): + def __init__(self): + self.nc = Client() + self.disconnected_cb_called = False + self.closed_cb_called = False + + def disconnected_cb(self): + self.disconnected_cb_called = True + + def close_cb(self): + self.closed_cb_called = True + + client = SampleClient() + options = { + "servers": ["nats://127.0.0.1:4223"], + "close_cb": client.close_cb, + "disconnected_cb": client.disconnected_cb, + "allow_reconnect": True, + "io_loop": self.io_loop, + "max_reconnect_attempts": -1, + "reconnect_time_wait": 0.1 + } + self.io_loop.spawn_callback(client.nc.connect, **options) + yield tornado.gen.sleep(2) + yield client.nc.close() + self.assertTrue(client.nc._server_pool[0].reconnects > 10) + self.assertTrue(client.disconnected_cb_called) + self.assertTrue(client.closed_cb_called) @tornado.testing.gen_test def test_subscribe(self): @@ -898,12 +937,14 @@ def __init__(self, nc): self.nc = nc self.error = None self.error_cb_called = False + self.errors = [] self.close_cb_called = False self.disconnected_cb_called = False self.reconnected_cb_called = False def error_cb(self, err): self.error = err + self.errors.append(err) self.error_cb_called = True def close_cb(self): @@ -927,7 +968,9 @@ def reconnected_cb(self): "close_cb": component.close_cb, "error_cb": component.error_cb, "disconnected_cb": component.disconnected_cb, - "reconnected_cb": component.reconnected_cb + "reconnected_cb": component.reconnected_cb, + "max_reconnect_attempts": 5, + "reconnect_time_wait": 0.1 } yield component.nc.connect(**options) self.assertEqual(True, component.nc.is_connected) @@ -967,7 +1010,7 @@ def reconnected_cb(self): orig_gnatsd.finish() # Wait for reconnect logic kick in and fail due to authorization error. - yield tornado.gen.sleep(1) + yield tornado.gen.sleep(0.5) self.assertFalse(component.nc.is_connected) self.assertTrue(component.nc.is_reconnecting) self.assertTrue(component.disconnected_cb_called) @@ -979,9 +1022,10 @@ def reconnected_cb(self): # self.assertTrue(component.error_cb_called) # Connection is closed at this point after reconnect failed. - yield tornado.gen.sleep(2) - self.assertTrue(component.nc.is_closed) + yield tornado.gen.sleep(1) self.assertTrue(component.reconnected_cb_called) + self.assertEqual(6, len(component.errors)) + self.assertTrue(component.close_cb_called) @tornado.testing.gen_test(timeout=15) def test_auth_pending_bytes_handling(self): @@ -1027,6 +1071,7 @@ def publisher(self): "reconnected_cb": c.reconnected_cb, "disconnected_cb": c.disconnected_cb, "error_cb": c.error_cb, + "reconnect_time_wait": 0.1 } yield c.nc.connect(**options) self.assertEqual(True, nc._server_info["auth_required"]) @@ -1035,18 +1080,16 @@ def publisher(self): yield c.nc.subscribe("foo", "", log.persist) self.io_loop.spawn_callback(c.publisher) + a = nc._current_server yield tornado.gen.sleep(0.001) orig_gnatsd = self.server_pool.pop(0) orig_gnatsd.finish() yield tornado.gen.sleep(0.001) - try: - a = nc._current_server - # Wait for reconnect logic kick in... - yield tornado.gen.sleep(3) - finally: - b = nc._current_server - self.assertNotEqual(a.uri, b.uri) + # Wait for reconnect logic kick in... + yield tornado.gen.sleep(3) + b = nc._current_server + self.assertNotEqual(a.uri, b.uri) # Should have reconnected already self.assertTrue(nc.is_connected) @@ -1440,6 +1483,7 @@ def test_tls_verify_fails(self): "error_cb": c.error_cb, "disconnected_cb": c.disconnected_cb, "reconnected_cb": c.reconnected_cb, + "reconnect_time_wait": 0.1, "tls": { "cert_reqs": ssl.CERT_REQUIRED, # "ca_certs": "./tests/configs/certs/ca.pem",