Skip to content

Commit

Permalink
Merge pull request #57 from nats-io/auth-options
Browse files Browse the repository at this point in the history
More auth options
  • Loading branch information
wallyqs authored Sep 22, 2018
2 parents d7a690b + 8eeee7e commit 5086d79
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 84 deletions.
56 changes: 39 additions & 17 deletions nats/io/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ def connect(self,
pedantic=False,
verbose=False,
no_echo=False,
user=None,
password=None,
token=None,

# Reconnect logic
allow_reconnect=True,
Expand Down Expand Up @@ -272,6 +275,9 @@ def connect(self,
self.options["pedantic"] = pedantic
self.options["name"] = name
self.options["no_echo"] = no_echo
self.options["user"] = user
self.options["password"] = password
self.options["token"] = token
self.options["max_outstanding_pings"] = max_outstanding_pings
self.options["max_reconnect_attempts"] = max_reconnect_attempts
self.options["reconnect_time_wait"] = reconnect_time_wait
Expand Down Expand Up @@ -336,7 +342,7 @@ def connect(self,

# Prepare the ping pong interval.
self._ping_timer = tornado.ioloop.PeriodicCallback(
self._send_ping, self.options["ping_interval"] * 1000)
self._ping_interval, self.options["ping_interval"] * 1000)
self._ping_timer.start()

@tornado.gen.coroutine
Expand Down Expand Up @@ -366,15 +372,11 @@ def _server_connect(self, s):

@tornado.gen.coroutine
def _send_ping(self, future=None):
if self._pings_outstanding > self.options["max_outstanding_pings"]:
yield self._process_op_err(ErrStaleConnection)
else:
yield self.send_command(PING_PROTO)
yield self._flush_pending()
if future is None:
future = tornado.concurrent.Future()
self._pings_outstanding += 1
self._pongs.append(future)
if future is None:
future = tornado.concurrent.Future()
self._pongs.append(future)
yield self.send_command(PING_PROTO)
yield self._flush_pending()

def connect_command(self):
'''
Expand All @@ -395,7 +397,12 @@ def connect_command(self):
if self._server_info["auth_required"] == True:
# In case there is no password, then consider handle
# sending a token instead.
if self._current_server.uri.password is None:
if self.options["user"] is not None and self.options["password"] is not None:
options["user"] = self.options["user"]
options["pass"] = self.options["password"]
elif self.options["token"] is not None:
options["auth_token"] = self.options["token"]
elif self._current_server.uri.password is None:
options["auth_token"] = self._current_server.uri.username
else:
options["user"] = self._current_server.uri.username
Expand Down Expand Up @@ -494,8 +501,13 @@ def _flush_timeout(self, timeout):
result = yield tornado.gen.with_timeout(
timedelta(seconds=timeout), future)
except tornado.gen.TimeoutError:
# Set the future to False so it can be ignored in _process_pong.
# Set the future to False so it can be ignored in _process_pong,
# and try to remove from the list of pending pongs.
future.set_result(False)
for i, pong_future in enumerate(self._pongs):
if pong_future == future:
del self._pongs[i]
break
raise
raise tornado.gen.Return(result)

Expand Down Expand Up @@ -839,14 +851,15 @@ def _process_pong(self):
Here we want to find the oldest PONG future that is still running. If the
flush PING-PONG already timed out, then just drop those old items.
"""
while len(self._pongs) > 0:
if len(self._pongs) > 0:
future = self._pongs.pop(0)
self._pongs_received += 1
self._pings_outstanding -= 1
if self._pings_outstanding > 0:
self._pings_outstanding -= 1

# Only exit loop if future still running (hasn't exceeded flush timeout).
if future.running():
future.set_result(True)
break

@tornado.gen.coroutine
def _process_msg(self, sid, subject, reply, data):
Expand Down Expand Up @@ -967,7 +980,7 @@ def _process_connect_init(self):
self._pongs = []
self._pings_outstanding = 0
self._ping_timer = tornado.ioloop.PeriodicCallback(
self._send_ping, self.options["ping_interval"] * 1000)
self._ping_interval, self.options["ping_interval"] * 1000)
self._ping_timer.start()

# Queue and flusher for coalescing writes to the server.
Expand Down Expand Up @@ -1272,7 +1285,7 @@ def drain(self, sid=None):
raise ErrConnectionReconnecting

# Drain a single subscription
if sid is not None:
if sid is not None:
raise tornado.gen.Return(self._drain_sub(sid))

# Start draining the subscriptions
Expand Down Expand Up @@ -1372,6 +1385,15 @@ def discovered_servers(self):
servers.append(srv)
return servers

@tornado.gen.coroutine
def _ping_interval(self):
if not self.is_connected:
return
self._pings_outstanding += 1
if self._pings_outstanding > self.options["max_outstanding_pings"]:
yield self._process_op_err(ErrStaleConnection)
yield self._send_ping()

@tornado.gen.coroutine
def _read_loop(self, data=''):
"""
Expand Down
Loading

0 comments on commit 5086d79

Please sign in to comment.