Skip to content

Commit

Permalink
Merge pull request #9 from wallyqs/flusher-queue-pending
Browse files Browse the repository at this point in the history
Read loop and flusher improvements
  • Loading branch information
Waldemar Quevedo authored Oct 3, 2016
2 parents a8123cf + d63f091 commit 961e3dc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
2 changes: 1 addition & 1 deletion benchmark/pub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def main():

data = []
for i in range(0, args.size):
data.append(b"%01x" % randint(0, 16))
data.append(b'W')
payload = b''.join(data)

servers = args.servers
Expand Down
19 changes: 17 additions & 2 deletions benchmark/pub_sub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ def show_usage_and_die():
global received
received = 0

def close_cb():
print("Closed connection to NATS")

def disconnected_cb():
print("Disconnected from NATS")

def reconnected_cb():
print("Reconnected to NATS")

@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
Expand All @@ -41,13 +50,19 @@ def main():

data = []
for i in range(0, args.size):
data.append(b"%01x" % randint(0, 16))
data.append("W")
payload = b''.join(data)

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = { "servers": servers }
opts = {
"servers": servers,
"disconnected_cb": disconnected_cb,
"close_cb": close_cb,
"reconnected_cb": reconnected_cb,
"allow_reconnect": False,
}

# Make sure we're connected to a server first...
nc = NATS()
Expand Down
31 changes: 23 additions & 8 deletions nats/io/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
RECONNECT_TIME_WAIT = 2 # seconds
DEFAULT_CONNECT_TIMEOUT = 2 # seconds

DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 * 100
DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 * 10
DEFAULT_WRITE_BUFFER_SIZE = None
DEFAULT_READ_CHUNK_SIZE = 32768 * 2
DEFAULT_PENDING_SIZE = 1024 * 1024
Expand Down Expand Up @@ -831,8 +831,17 @@ def _read_loop(self, data=''):
of maximum MAX_CONTROL_LINE_SIZE, then received bytes are streamed
to the parsing callback for processing.
"""
if not self.io.closed():
self.io.read_bytes(MAX_CONTROL_LINE_SIZE, callback=self._read_loop, streaming_callback=self._ps.parse, partial=True)
while True:
if not self.is_connected or self.is_connecting or self.io.closed():
break

try:
yield self.io.read_bytes(DEFAULT_READ_CHUNK_SIZE, streaming_callback=self._ps.parse, partial=True)
except tornado.iostream.StreamClosedError as e:
self._err = e
if self._error_cb is not None and not self.is_reconnecting:
self._error_cb(e)
break

@tornado.gen.coroutine
def _flusher_loop(self):
Expand All @@ -841,13 +850,19 @@ def _flusher_loop(self):
and then flushes them to the socket.
"""
while True:
if self.io.closed():
break
try:
# Block and wait for the flusher to be kicked
yield self._flush_queue.get()
yield self.io.write(b''.join(self._pending))
self._pending = []
self._pending_size = 0

# Check whether we should bail first
if not self.is_connected or self.is_connecting or self.io.closed():
break

# Flush only when we actually have something in buffer...
if self._pending_size > 0:
yield self.io.write(b''.join(self._pending))
self._pending = []
self._pending_size = 0
except tornado.iostream.StreamBufferFullError:
# Acumulate as pending data size and flush when possible.
pass
Expand Down
6 changes: 4 additions & 2 deletions tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,10 @@ def publisher(self):
yield c.nc.subscribe("foo", "", log.persist)
self.io_loop.spawn_callback(c.publisher)

yield tornado.gen.sleep(0.001)
orig_gnatsd = self.server_pool.pop(0)
orig_gnatsd.finish()
yield tornado.gen.sleep(0.001)

try:
a = nc._current_server
Expand Down Expand Up @@ -1297,7 +1299,7 @@ def reconnected_cb(self):

self.assertTrue(c.disconnected_cb_called)
self.assertFalse(c.close_cb_called)
self.assertFalse(c.error_cb_called)
self.assertTrue(c.error_cb_called)
self.assertTrue(c.reconnected_cb_called)

for i in range(0, 5):
Expand All @@ -1309,7 +1311,7 @@ def reconnected_cb(self):
yield c.nc.close()
self.assertTrue(c.disconnected_cb_called)
self.assertTrue(c.close_cb_called)
self.assertFalse(c.error_cb_called)
self.assertTrue(c.error_cb_called)
self.assertTrue(c.reconnected_cb_called)

class ClientTLSCertsTest(tornado.testing.AsyncTestCase):
Expand Down

0 comments on commit 961e3dc

Please sign in to comment.