Skip to content

Commit

Permalink
Merge pull request #56 from nats-io/drain-mode
Browse files Browse the repository at this point in the history
Drain mode
  • Loading branch information
wallyqs authored Sep 22, 2018
2 parents e04504f + cf1f307 commit d7a690b
Show file tree
Hide file tree
Showing 7 changed files with 570 additions and 106 deletions.
86 changes: 38 additions & 48 deletions examples/advanced-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,68 +15,58 @@

import tornado.ioloop
import tornado.gen
import time
from nats.io import Client as NATS

from nats.io.errors import ErrNoServers

@tornado.gen.coroutine
def main():
nc = NATS()

# Set pool servers in the cluster and give a name to the client.
options = {
"name":
"worker",
"servers": [
"nats://secret:[email protected]:4222",
"nats://secret:[email protected]:4223",
"nats://secret:[email protected]:4224"
]
}

# Explicitly set loop to use for the reactor.
options["io_loop"] = tornado.ioloop.IOLoop.instance()

yield nc.connect(**options)
try:
# Setting explicit list of servers in a cluster and
# max reconnect retries.
servers = [
"nats://127.0.0.1:4222",
"nats://127.0.0.1:4223",
"nats://127.0.0.1:4224"
]
yield nc.connect(max_reconnect_attempts=2, servers=servers)
except ErrNoServers:
print("No servers available!")
return

@tornado.gen.coroutine
def subscriber(msg):
yield nc.publish("discover", "pong")
def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
for i in range(0, 20):
yield nc.publish(reply, "i={i}".format(i=i).encode())

yield nc.subscribe("discover", "", subscriber)
yield nc.subscribe("help.>", cb=message_handler)

@tornado.gen.coroutine
def async_subscriber(msg):
# First request takes longer, while others are still processed.
if msg.subject == "requests.1":
yield tornado.gen.sleep(0.5)
print("Processed request [{0}]: {1}".format(msg.subject, msg))

# Create asynchronous subscription and make roundtrip to server
# to ensure that subscriptions have been processed.
yield nc.subscribe_async("requests.*", cb=async_subscriber)
def request_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))

# Signal the server to stop sending messages after we got 10 already.
yield nc.request(
"help.please", b'help', expected=10, cb=request_handler)

# Flush connection to server, returns when all messages have been processed.
# It raises a timeout if roundtrip takes longer than 1 second.
yield nc.flush()
for i in range(1, 10):
yield nc.publish("requests.{0}".format(i), "example")
yield tornado.gen.sleep(1)

while True:
# Confirm stats to implement basic throttling logic.
sent = nc.stats["out_msgs"]
received = nc.stats["in_msgs"]
delta = sent - received

if delta > 2000:
print("Waiting... Sent: {0}, Received: {1}, Delta: {2}".format(
sent, received, delta))
yield tornado.gen.sleep(1)

if nc.stats["reconnects"] > 10:
print("[WARN] Reconnected over 10 times!")

for i in range(1000):
yield nc.publish("discover", "ping")
# Drain gracefully closes the connection, allowing all subscribers to
# handle any pending messages inflight that the server may have sent.
yield nc.drain()

# Drain works async in the background.
yield tornado.gen.sleep(1)

if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
2 changes: 1 addition & 1 deletion examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def help_request_handler(msg):
msg = yield nc.request("help", b"Hi, need help!", timeout=0.2)
print("[Response]: %s" % msg.data)
except tornado.gen.TimeoutError:
print("Timeout!")
print("Response Timeout!")

# Remove interest in subscription.
yield nc.unsubscribe(sid)
Expand Down
26 changes: 26 additions & 0 deletions examples/wildcard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import tornado.ioloop
import tornado.gen
import time
from nats.io import Client as NATS

@tornado.gen.coroutine
def main():
    """
    Connect to the demo server, register one handler under several
    overlapping wildcard subjects, then publish a single message.
    """
    nc = NATS()
    yield nc.connect("demo.nats.io")

    @tornado.gen.coroutine
    def on_message(msg):
        # Report which subject the message arrived on and its payload.
        print("Msg received on [{0}]: {1}".format(msg.subject, msg.data))

    # Same handler for every pattern, registered in a fixed order.
    for pattern in ("foo.*.baz", "foo.bar.*", "foo.>", ">"):
        yield nc.subscribe(pattern, "", on_message)

    # Matches all of above
    yield nc.publish("foo.bar.baz", b"Hello World")

    # Leave a moment for the handler invocations to be printed.
    yield tornado.gen.sleep(1)

# Only drive the example when executed as a script: run main() to
# completion on the current IOLoop.
if __name__ == '__main__':
    tornado.ioloop.IOLoop.current().run_sync(main)
105 changes: 104 additions & 1 deletion nats/io/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
MAX_RECONNECT_ATTEMPTS = 60
RECONNECT_TIME_WAIT = 2 # seconds
DEFAULT_CONNECT_TIMEOUT = 2 # seconds
DEFAULT_DRAIN_TIMEOUT = 30 # seconds

DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 * 10
DEFAULT_WRITE_BUFFER_SIZE = None
Expand Down Expand Up @@ -145,6 +146,8 @@ class Client(object):
CLOSED = 2
RECONNECTING = 3
CONNECTING = 4
DRAINING_SUBS = 5
DRAINING_PUBS = 6

def __repr__(self):
return "<nats client v{}>".format(__version__)
Expand Down Expand Up @@ -237,6 +240,7 @@ def connect(self,
max_write_buffer_size=DEFAULT_WRITE_BUFFER_SIZE,
read_chunk_size=DEFAULT_READ_CHUNK_SIZE,
tcp_nodelay=False,
drain_timeout=DEFAULT_DRAIN_TIMEOUT,
):
"""
Establishes a connection to a NATS server.
Expand Down Expand Up @@ -278,6 +282,7 @@ def connect(self,
# In seconds
self.options["connect_timeout"] = connect_timeout
self.options["ping_interval"] = ping_interval
self.options["drain_timeout"] = drain_timeout

# TLS customizations
if tls is not None:
Expand Down Expand Up @@ -523,6 +528,9 @@ def request(self, subject, payload, timeout=0.5, expected=1, cb=None):
<<- MSG hello 2 _INBOX.gnKUg9bmAHANjxIsDiQsWO 5
"""
if self.is_draining:
raise ErrConnectionDraining

# If callback given then continue to use old style.
if cb is not None:
next_inbox = INBOX_PREFIX[:]
Expand Down Expand Up @@ -654,6 +662,9 @@ def subscribe(
if self.is_closed:
raise ErrConnectionClosed

if self.is_draining:
raise ErrConnectionDraining

self._ssid += 1
sid = self._ssid
sub = Subscription(
Expand Down Expand Up @@ -794,6 +805,12 @@ def auto_unsubscribe(self, sid, limit=1):
blocks in order to be able to define request/response semantics via pub/sub
by announcing the server limited interest a priori.
"""
if self.is_draining:
raise ErrConnectionDraining
yield self._unsubscribe(sid, limit)

@tornado.gen.coroutine
def _unsubscribe(self, sid, limit=1):
b_limit = b''
if limit > 0:
b_limit = ("%d" % limit).encode()
Expand Down Expand Up @@ -1015,12 +1032,20 @@ def is_reconnecting(self):

@property
def is_connected(self):
return self._status == Client.CONNECTED
return self._status == Client.CONNECTED or self.is_draining

@property
def is_connecting(self):
    """True while the client status is `Client.CONNECTING`."""
    return self._status == Client.CONNECTING

@property
def is_draining(self):
    """
    True while the client is in either drain phase, i.e. its status is
    `Client.DRAINING_SUBS` or `Client.DRAINING_PUBS`.
    """
    return self._status in (Client.DRAINING_SUBS, Client.DRAINING_PUBS)

@property
def is_draining_pubs(self):
    """True while the client status is `Client.DRAINING_PUBS`."""
    return self._status == Client.DRAINING_PUBS

@tornado.gen.coroutine
def _process_op_err(self, err=None):
"""
Expand Down Expand Up @@ -1225,6 +1250,84 @@ def _close(self, status, do_callbacks=True):
if self._closed_cb is not None:
self._closed_cb()

@tornado.gen.coroutine
def drain(self, sid=None):
    """
    Drain a single subscription or the whole connection.

    Without a `sid`, all subscriptions are immediately put into a drain
    state. Upon completion, the publishers are drained and can not
    publish any additional messages. Upon draining of the publishers,
    the connection is closed. Use the `closed_cb` option to know when
    the connection has moved from draining to closed.

    If a `sid` is passed, just the subscription with that sid is drained
    without closing the connection. In that case the coroutine resolves
    to the future returned by `_drain_sub`, which the caller can yield
    again to wait until that subscription has finished draining.

    Calling this while the connection is already draining is a no-op.

    Raises:
      ErrConnectionClosed: the connection is already closed.
      ErrConnectionReconnecting: the client is connecting or reconnecting.
      ErrBadSubscription: `sid` does not refer to a known subscription.
    """
    if self.is_draining:
        return

    if self.is_closed:
        raise ErrConnectionClosed

    if self.is_connecting or self.is_reconnecting:
        raise ErrConnectionReconnecting

    # Drain a single subscription
    if sid is not None:
        raise tornado.gen.Return(self._drain_sub(sid))

    # Start draining the subscriptions
    self._status = Client.DRAINING_SUBS
    drain_tasks = [self._drain_sub(ssid) for ssid in self._subs]

    # Wait for subscriptions to stop handling messages.
    drain_is_done = tornado.gen.multi(drain_tasks)
    try:
        yield tornado.gen.with_timeout(
            timedelta(seconds=self.options["drain_timeout"]),
            drain_is_done,
        )
    except tornado.gen.TimeoutError:
        # Report the timeout through the error callback, then carry on
        # with closing the connection.
        if self._error_cb is not None:
            yield self._error_cb(ErrDrainTimeout())
    finally:
        self._status = Client.DRAINING_PUBS
        try:
            yield self.flush()
        finally:
            # Always close, even when the final flush fails; otherwise
            # the client would be left stuck in the draining state and
            # later drain() calls would return early without closing.
            yield self.close()

def _drain_sub(self, sid):
    """
    Start draining the subscription identified by `sid` in the background.

    Returns a future that is resolved with True once the subscription
    has been unsubscribed, its pending messages are gone (or it has been
    closed) and it has been removed from the client.

    Raises ErrBadSubscription if no subscription is registered for `sid`.
    """
    subscription = self._subs.get(sid)
    if subscription is None:
        raise ErrBadSubscription

    done = tornado.concurrent.Future()

    # One background task per subscription; callers wait on `done`.
    @tornado.gen.coroutine
    def finish_pending():
        # Tell the server we no longer want messages for this
        # subscription, so only the ones already received remain.
        yield self._unsubscribe(sid, limit=0)
        yield self.flush()

        # Poll until the pending queue is empty or the subscription
        # has been closed.
        while (subscription.pending_queue.qsize() > 0
               and not subscription.closed):
            yield tornado.gen.sleep(0.1)

        # Nothing more will arrive for this subscription, so drop it
        # and signal completion.
        self._subs.pop(sid, None)
        self._remove_subscription(subscription)
        done.set_result(True)

    self._loop.spawn_callback(finish_pending)

    # Future the caller can yield to wait for the drain to finish.
    return done

@tornado.gen.coroutine
def _process_err(self, err=None):
"""
Expand Down
17 changes: 17 additions & 0 deletions nats/io/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ErrJsonParse(NatsError):
pass



class ErrSlowConsumer(NatsError):
"""
The client becomes a slow consumer if the server ends up
Expand Down Expand Up @@ -78,3 +79,19 @@ class ErrServerConnect(socket.error):
Raised when it could not establish a connection with server.
"""
pass

class ErrBadSubscription(NatsError):
    """Error for a subscription id that does not match a subscription."""

    def __str__(self):
        return "nats: Invalid Subscription"

class ErrDrainTimeout(NatsError):
    """Error reported when draining does not finish within the timeout."""

    def __str__(self):
        return "nats: Draining Connection Timed Out"

class ErrConnectionDraining(NatsError):
    """Error for operations attempted while the connection is draining."""

    def __str__(self):
        return "nats: Connection Draining"

class ErrConnectionReconnecting(NatsError):
    """Error for operations attempted while the client is (re)connecting."""

    def __str__(self):
        return "nats: Connection Reconnecting"
Loading

0 comments on commit d7a690b

Please sign in to comment.