Skip to content

Commit

Permalink
Merge pull request #4 from nats-io/client-updates
Browse files Browse the repository at this point in the history
Refactor and improvements to client
  • Loading branch information
Waldemar Quevedo committed May 22, 2016
2 parents 4147fce + 97324f6 commit 8b0dfae
Show file tree
Hide file tree
Showing 26 changed files with 1,195 additions and 1,031 deletions.
76 changes: 76 additions & 0 deletions benchmark/latency_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import argparse, sys
import tornado.ioloop
import tornado.gen
import time
from random import randint
from nats.io.client import Client as NATS

DEFAULT_ITERATIONS = 10000
HASH_MODULO = 1000

def show_usage():
message = """
Usage: latency_perf [options]
options:
-n ITERATIONS Iterations to spec (default: 1000)
-S SUBJECT Send subject (default: (test)
"""
print(message)

def show_usage_and_die():
show_usage()
sys.exit(1)

global received
received = 0

@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--iterations', default=DEFAULT_ITERATIONS, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('--servers', default=[], action='append')
args = parser.parse_args()

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = { "servers": servers }

# Make sure we're connected to a server first...
nc = NATS()
try:
yield nc.connect(**opts)
except Exception, e:
sys.stderr.write("ERROR: {0}".format(e))
show_usage_and_die()

@tornado.gen.coroutine
def handler(msg):
yield nc.publish(msg.reply, "")
yield nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.time()
to_send = args.iterations

print("Sending {0} request/responses on [{1}]".format(
args.iterations, args.subject))
while to_send > 0:
to_send -= 1
if to_send == 0:
break

yield nc.timed_request(args.subject, "")
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("+")
sys.stdout.flush()

duration = time.time() - start
ms = "%.3f" % ((duration/args.iterations) * 1000)
print("\nTest completed : {0} ms avg request/response latency".format(ms))
yield nc.close()

if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
81 changes: 81 additions & 0 deletions benchmark/parser_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import cProfile as prof
import tornado.gen
from nats.protocol.parser import *

class DummyNatsClient:

def __init__(self):
self._subs = {}
self._pongs = []
self._pings_outstanding = 0
self._pongs_received = 0
self._server_info = {"max_payload": 1048576, "auth_required": False }
self.stats = {
'in_msgs': 0,
'out_msgs': 0,
'in_bytes': 0,
'out_bytes': 0,
'reconnects': 0,
'errors_received': 0
}

@tornado.gen.coroutine
def _send_command(self, cmd):
pass

@tornado.gen.coroutine
def _process_pong(self):
pass

@tornado.gen.coroutine
def _process_ping(self):
pass

@tornado.gen.coroutine
def _process_msg(self, sid, subject, reply, data):
self.stats['in_msgs'] += 1
self.stats['in_bytes'] += len(data)

@tornado.gen.coroutine
def _process_err(self, err=None):
pass

def generate_msg(subject, nbytes, reply=""):
msg = []
protocol_line = "MSG {subject} 1 {reply} {nbytes}\r\n".format(
subject=subject, reply=reply, nbytes=nbytes).encode()
msg.append(protocol_line)
msg.append(b'A' * nbytes)
msg.append(b'r\n')
return b''.join(msg)

def parse_msgs(max_msgs=1, nbytes=1):
buf = bytearray()
buf.extend(b''.join([generate_msg("foo", nbytes) for i in range(0, max_msgs)]))
print("--- buffer size: {0}".format(len(buf)))
loop = tornado.ioloop.IOLoop.instance()
ps = Parser(DummyNatsClient())
ps.buf = buf
loop.run_sync(ps.parse)
print("--- stats: ", ps.nc.stats)

if __name__ == '__main__':

benchs = [
"parse_msgs(max_msgs=10000, nbytes=1)",
"parse_msgs(max_msgs=100000, nbytes=1)",
"parse_msgs(max_msgs=10000, nbytes=64)",
"parse_msgs(max_msgs=100000, nbytes=64)",
"parse_msgs(max_msgs=10000, nbytes=256)",
"parse_msgs(max_msgs=100000, nbytes=256)",
"parse_msgs(max_msgs=10000, nbytes=1024)",
"parse_msgs(max_msgs=100000, nbytes=1024)",
"parse_msgs(max_msgs=10000, nbytes=8192)",
"parse_msgs(max_msgs=100000, nbytes=8192)",
"parse_msgs(max_msgs=10000, nbytes=16384)",
"parse_msgs(max_msgs=100000, nbytes=16384)",
]

for bench in benchs:
print("=== {0}".format(bench))
prof.run(bench)
88 changes: 88 additions & 0 deletions benchmark/pub_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import argparse, sys
import tornado.ioloop
import tornado.gen
import time
from random import randint
from nats.io.client import Client as NATS

DEFAULT_NUM_MSGS = 100000
DEFAULT_NUM_PUBS = 1
DEFAULT_MSG_SIZE = 16
DEFAULT_BATCH_SIZE = 100
HASH_MODULO = 1000

def show_usage():
message = """
Usage: pub_perf [options]
options:
-n COUNT Messages to send (default: 100000}
-s SIZE Message size (default: 16)
-S SUBJECT Send subject (default: (test)
-b BATCH Batch size (default: (100)
"""
print(message)

def show_usage_and_die():
show_usage()
sys.exit(1)

@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--count', default=DEFAULT_NUM_MSGS, type=int)
parser.add_argument('-s', '--size', default=DEFAULT_MSG_SIZE, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('-b', '--batch', default=DEFAULT_BATCH_SIZE, type=int)
parser.add_argument('--servers', default=[], action='append')
args = parser.parse_args()

data = []
for i in range(0, args.size):
data.append(b"%01x" % randint(0, 16))
payload = b''.join(data)

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = { "servers": servers }

# Make sure we're connected to a server first..
nc = NATS()
try:
yield nc.connect(**opts)
except Exception, e:
sys.stderr.write("ERROR: {0}".format(e))
show_usage_and_die()

# Start the benchmark
start = time.time()
to_send = args.count

print("Sending {0} messages of size {1} bytes on [{2}]".format(
args.count, args.size, args.subject))
while to_send > 0:
for i in range(0, args.batch):
to_send -= 1
yield nc.publish(args.subject, payload)
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("#")
sys.stdout.flush()
if to_send == 0:
break

# Minimal pause in between batches sent to server
yield tornado.gen.sleep(0.00001)

# Ensure that all commands have been processed by server already.
yield nc.flush()

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count)/elapsed) / (1024*1024))
print("\nTest completed : {0} msgs/sec ({1}) MB/sec\n".format(
args.count/elapsed,
mbytes))
yield nc.close()

if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
98 changes: 98 additions & 0 deletions benchmark/pub_sub_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import argparse, sys
import tornado.ioloop
import tornado.gen
import time
from random import randint
from nats.io.client import Client as NATS

DEFAULT_NUM_MSGS = 100000
DEFAULT_MSG_SIZE = 16
DEFAULT_BATCH_SIZE = 100
HASH_MODULO = 1000

def show_usage():
message = """
Usage: pub_sub_perf [options]
options:
-n COUNT Messages to send (default: 100000}
-s SIZE Message size (default: 16)
-S SUBJECT Send subject (default: (test)
-b BATCH Batch size (default: (100)
"""
print(message)

def show_usage_and_die():
show_usage()
sys.exit(1)

global received
received = 0

@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--count', default=DEFAULT_NUM_MSGS, type=int)
parser.add_argument('-s', '--size', default=DEFAULT_MSG_SIZE, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('-b', '--batch', default=DEFAULT_BATCH_SIZE, type=int)
parser.add_argument('--servers', default=[], action='append')
args = parser.parse_args()

data = []
for i in range(0, args.size):
data.append(b"%01x" % randint(0, 16))
payload = b''.join(data)

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = { "servers": servers }

# Make sure we're connected to a server first...
nc = NATS()
try:
yield nc.connect(**opts)
except Exception, e:
sys.stderr.write("ERROR: {0}".format(e))
show_usage_and_die()

@tornado.gen.coroutine
def handler(msg):
global received
received += 1

yield nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.time()
to_send = args.count

print("Sending {0} messages of size {1} bytes on [{2}]".format(
args.count, args.size, args.subject))
while to_send > 0:
for i in range(0, args.batch):
to_send -= 1
yield nc.publish(args.subject, payload)
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("#")
sys.stdout.flush()
if to_send == 0:
break

# Minimal pause in between batches sent to server
yield tornado.gen.sleep(0.00001)

# Ensure that all commands have been processed by server already.
yield nc.flush()

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count)/elapsed) / (1024*1024))
print("\nTest completed : {0} msgs/sec sent ({1}) MB/sec\n".format(
args.count/elapsed,
mbytes))
print("Received {0} messages ({1} msgs/sec)".format(received, received/elapsed))
yield nc.close()

if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
Loading

0 comments on commit 8b0dfae

Please sign in to comment.