Skip to content

Commit

Permalink
Merge pull request #17 from wallyqs/max-reconnect-attempts
Browse files Browse the repository at this point in the history
Add back off between reconnections and infinite reconnects
  • Loading branch information
Waldemar Quevedo authored Mar 25, 2017
2 parents 5dabb4a + 289f4d4 commit 6c261f8
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 30 deletions.
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ python:
- 2.6.9
- 2.7

env:
- DEBUG_NATS_TEST=true

before_install:
- bash ./script/install_gnatsd.sh

Expand Down
2 changes: 1 addition & 1 deletion nats/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = b'0.4.0'
__version__ = b'0.5.0'
__lang__ = b'python2'
34 changes: 21 additions & 13 deletions nats/io/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def connect(self,
tcp_nodelay=False,
connect_timeout=DEFAULT_CONNECT_TIMEOUT,
max_reconnect_attempts=MAX_RECONNECT_ATTEMPTS,
reconnect_time_wait=RECONNECT_TIME_WAIT,
tls=None
):
"""
Expand All @@ -157,6 +158,7 @@ def connect(self,
self.options["name"] = name
self.options["max_outstanding_pings"] = max_outstanding_pings
self.options["max_reconnect_attempts"] = max_reconnect_attempts
self.options["reconnect_time_wait"] = reconnect_time_wait
self.options["dont_randomize"] = dont_randomize
self.options["allow_reconnect"] = allow_reconnect
self.options["tcp_nodelay"] = tcp_nodelay
Expand Down Expand Up @@ -191,8 +193,15 @@ def connect(self,
if s is None:
raise ErrNoServers

# Check when the last attempt was and back off before reconnecting
if s.last_attempt is not None:
now = time.time()
if (now - s.last_attempt) < self.options["reconnect_time_wait"]:
yield tornado.gen.sleep(self.options["reconnect_time_wait"])

# Mark that we have attempted to connect
s.reconnects += 1
s.last_attempt = time.time()
yield self._server_connect(s)
self._current_server = s
s.did_connect = True
Expand Down Expand Up @@ -569,15 +578,10 @@ def _process_connect_init(self):
self._loop.remove_handler(self._socket.fileno())

tls_opts = {}
tls_version = None
if "tls" in self.options:
# Allow customizing the TLS version though default
# to one that the server supports at least.
tls_opts = self.options["tls"]
if "ssl_version" in tls_opts:
tls_version = tls_opts["ssl_version"]
else:
tls_version = ssl.PROTOCOL_TLSv1_2

# Rewrap using a TLS connection, can't do handshake on connect
# as the socket is non blocking.
Expand Down Expand Up @@ -638,7 +642,7 @@ def _next_server(self):

s = None
for server in self._server_pool:
if server.reconnects > self.options["max_reconnect_attempts"]:
if self.options["max_reconnect_attempts"] > 0 and (server.reconnects > self.options["max_reconnect_attempts"]):
continue
else:
s = server
Expand Down Expand Up @@ -678,7 +682,7 @@ def _unbind(self):
if self.is_connected:
self._status = Client.RECONNECTING

if self._ping_timer.is_running():
if self._ping_timer is not None and self._ping_timer.is_running():
self._ping_timer.stop()

while True:
Expand Down Expand Up @@ -729,15 +733,19 @@ def _schedule_primary_and_connect(self):
s = self._next_server()
if s is None:
raise ErrNoServers
s.reconnects += 1
self.stats['reconnects'] += 1

# For the reconnection logic, we need to consider
# sleeping for a bit before trying to reconnect
# too soon to a server which has failed previously.
yield tornado.gen.Task(
self._loop.add_timeout,
timedelta(seconds=RECONNECT_TIME_WAIT))
# Check when the last attempt was and back off before reconnecting
if s.last_attempt is not None:
now = time.time()
if (now - s.last_attempt) < self.options["reconnect_time_wait"]:
yield tornado.gen.sleep(self.options["reconnect_time_wait"])

s.reconnects += 1
s.last_attempt = time.time()
self.stats['reconnects'] += 1
try:
yield self._server_connect(s)
self._current_server = s
Expand Down Expand Up @@ -786,7 +794,7 @@ def _close(self, status, do_callbacks=True):
return

self._status = Client.CLOSED
if self._ping_timer.is_running():
if self._ping_timer is not None and self._ping_timer.is_running():
self._ping_timer.stop()

if not self.io.closed():
Expand Down
70 changes: 57 additions & 13 deletions tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,20 +285,59 @@ class SampleClient():
def __init__(self):
self.nc = Client()
self.disconnected_cb_called = False
self.closed_cb_called = False

def disconnected_cb(self):
self.disconnected_cb_called = True

def closed_cb(self):
self.closed_cb_called = True

client = SampleClient()
with self.assertRaises(ErrNoServers):
options = {
"servers": ["nats://127.0.0.1:4223"],
"close_cb": client.disconnected_cb,
"disconnected_cb": client.disconnected_cb,
"close_cb": client.closed_cb,
"allow_reconnect": True,
"io_loop": self.io_loop
"io_loop": self.io_loop,
"max_reconnect_attempts": 2
}
yield client.nc.connect(**options)
self.assertFalse(client.disconnected_cb_called)
self.assertFalse(client.closed_cb_called)

@tornado.testing.gen_test(timeout=5)
def test_connect_fails_allow_reconnect_forever_until_close(self):

class SampleClient():
def __init__(self):
self.nc = Client()
self.disconnected_cb_called = False
self.closed_cb_called = False

def disconnected_cb(self):
self.disconnected_cb_called = True

def close_cb(self):
self.closed_cb_called = True

client = SampleClient()
options = {
"servers": ["nats://127.0.0.1:4223"],
"close_cb": client.close_cb,
"disconnected_cb": client.disconnected_cb,
"allow_reconnect": True,
"io_loop": self.io_loop,
"max_reconnect_attempts": -1,
"reconnect_time_wait": 0.1
}
self.io_loop.spawn_callback(client.nc.connect, **options)
yield tornado.gen.sleep(2)
yield client.nc.close()
self.assertTrue(client.nc._server_pool[0].reconnects > 10)
self.assertTrue(client.disconnected_cb_called)
self.assertTrue(client.closed_cb_called)

@tornado.testing.gen_test
def test_subscribe(self):
Expand Down Expand Up @@ -898,12 +937,14 @@ def __init__(self, nc):
self.nc = nc
self.error = None
self.error_cb_called = False
self.errors = []
self.close_cb_called = False
self.disconnected_cb_called = False
self.reconnected_cb_called = False

def error_cb(self, err):
self.error = err
self.errors.append(err)
self.error_cb_called = True

def close_cb(self):
Expand All @@ -927,7 +968,9 @@ def reconnected_cb(self):
"close_cb": component.close_cb,
"error_cb": component.error_cb,
"disconnected_cb": component.disconnected_cb,
"reconnected_cb": component.reconnected_cb
"reconnected_cb": component.reconnected_cb,
"max_reconnect_attempts": 5,
"reconnect_time_wait": 0.1
}
yield component.nc.connect(**options)
self.assertEqual(True, component.nc.is_connected)
Expand Down Expand Up @@ -967,7 +1010,7 @@ def reconnected_cb(self):
orig_gnatsd.finish()

# Wait for the reconnect logic to kick in and fail due to an authorization error.
yield tornado.gen.sleep(1)
yield tornado.gen.sleep(0.5)
self.assertFalse(component.nc.is_connected)
self.assertTrue(component.nc.is_reconnecting)
self.assertTrue(component.disconnected_cb_called)
Expand All @@ -979,9 +1022,10 @@ def reconnected_cb(self):
# self.assertTrue(component.error_cb_called)

# Connection is closed at this point after reconnect failed.
yield tornado.gen.sleep(2)
self.assertTrue(component.nc.is_closed)
yield tornado.gen.sleep(1)
self.assertTrue(component.reconnected_cb_called)
self.assertEqual(6, len(component.errors))
self.assertTrue(component.close_cb_called)

@tornado.testing.gen_test(timeout=15)
def test_auth_pending_bytes_handling(self):
Expand Down Expand Up @@ -1027,6 +1071,7 @@ def publisher(self):
"reconnected_cb": c.reconnected_cb,
"disconnected_cb": c.disconnected_cb,
"error_cb": c.error_cb,
"reconnect_time_wait": 0.1
}
yield c.nc.connect(**options)
self.assertEqual(True, nc._server_info["auth_required"])
Expand All @@ -1035,18 +1080,16 @@ def publisher(self):
yield c.nc.subscribe("foo", "", log.persist)
self.io_loop.spawn_callback(c.publisher)

a = nc._current_server
yield tornado.gen.sleep(0.001)
orig_gnatsd = self.server_pool.pop(0)
orig_gnatsd.finish()
yield tornado.gen.sleep(0.001)

try:
a = nc._current_server
# Wait for reconnect logic kick in...
yield tornado.gen.sleep(3)
finally:
b = nc._current_server
self.assertNotEqual(a.uri, b.uri)
# Wait for the reconnect logic to kick in...
yield tornado.gen.sleep(3)
b = nc._current_server
self.assertNotEqual(a.uri, b.uri)

# Should have reconnected already
self.assertTrue(nc.is_connected)
Expand Down Expand Up @@ -1440,6 +1483,7 @@ def test_tls_verify_fails(self):
"error_cb": c.error_cb,
"disconnected_cb": c.disconnected_cb,
"reconnected_cb": c.reconnected_cb,
"reconnect_time_wait": 0.1,
"tls": {
"cert_reqs": ssl.CERT_REQUIRED,
# "ca_certs": "./tests/configs/certs/ca.pem",
Expand Down

0 comments on commit 6c261f8

Please sign in to comment.