Skip to content

Commit

Permalink
Merge pull request #42 from charliestrawn/yapf
Browse files Browse the repository at this point in the history
Format code with google/yapf
  • Loading branch information
wallyqs authored Jun 11, 2018
2 parents fdceb1b + 5b58dfe commit 4428b1e
Show file tree
Hide file tree
Showing 21 changed files with 512 additions and 425 deletions.
105 changes: 56 additions & 49 deletions benchmark/latency_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,69 +8,76 @@
DEFAULT_ITERATIONS = 10000
HASH_MODULO = 1000


def show_usage():
message = """
message = """
Usage: latency_perf [options]
options:
-n ITERATIONS Iterations to spec (default: 1000)
-S SUBJECT Send subject (default: (test)
"""
print(message)
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)
show_usage()
sys.exit(1)


global received
received = 0


@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--iterations', default=DEFAULT_ITERATIONS, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('--servers', default=[], action='append')
args = parser.parse_args()

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = { "servers": servers }

# Make sure we're connected to a server first...
nc = NATS()
try:
yield nc.connect(**opts)
except Exception, e:
sys.stderr.write("ERROR: {0}".format(e))
show_usage_and_die()

@tornado.gen.coroutine
def handler(msg):
yield nc.publish(msg.reply, "")
yield nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.time()
to_send = args.iterations

print("Sending {0} request/responses on [{1}]".format(
args.iterations, args.subject))
while to_send > 0:
to_send -= 1
if to_send == 0:
break

yield nc.timed_request(args.subject, "")
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("+")
sys.stdout.flush()

duration = time.time() - start
ms = "%.3f" % ((duration/args.iterations) * 1000)
print("\nTest completed : {0} ms avg request/response latency".format(ms))
yield nc.close()
parser = argparse.ArgumentParser()
parser.add_argument(
'-n', '--iterations', default=DEFAULT_ITERATIONS, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('--servers', default=[], action='append')
args = parser.parse_args()

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = {"servers": servers}

# Make sure we're connected to a server first...
nc = NATS()
try:
yield nc.connect(**opts)
except Exception, e:
sys.stderr.write("ERROR: {0}".format(e))
show_usage_and_die()

@tornado.gen.coroutine
def handler(msg):
yield nc.publish(msg.reply, "")

yield nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.time()
to_send = args.iterations

print("Sending {0} request/responses on [{1}]".format(
args.iterations, args.subject))
while to_send > 0:
to_send -= 1
if to_send == 0:
break

yield nc.timed_request(args.subject, "")
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("+")
sys.stdout.flush()

duration = time.time() - start
ms = "%.3f" % ((duration / args.iterations) * 1000)
print("\nTest completed : {0} ms avg request/response latency".format(ms))
yield nc.close()


if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
tornado.ioloop.IOLoop.instance().run_sync(main)
24 changes: 14 additions & 10 deletions benchmark/parser_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@
import tornado.gen
from nats.protocol.parser import *

class DummyNatsClient:

class DummyNatsClient:
def __init__(self):
self._subs = {}
self._pongs = []
self._pings_outstanding = 0
self._pongs_received = 0
self._server_info = {"max_payload": 1048576, "auth_required": False }
self._server_info = {"max_payload": 1048576, "auth_required": False}
self.stats = {
'in_msgs': 0,
'out_msgs': 0,
'in_bytes': 0,
'out_bytes': 0,
'in_msgs': 0,
'out_msgs': 0,
'in_bytes': 0,
'out_bytes': 0,
'reconnects': 0,
'errors_received': 0
}
}

@tornado.gen.coroutine
def _send_command(self, cmd):
Expand All @@ -33,13 +33,14 @@ def _process_ping(self):

@tornado.gen.coroutine
def _process_msg(self, sid, subject, reply, data):
self.stats['in_msgs'] += 1
self.stats['in_msgs'] += 1
self.stats['in_bytes'] += len(data)

@tornado.gen.coroutine
def _process_err(self, err=None):
pass


def generate_msg(subject, nbytes, reply=""):
msg = []
protocol_line = "MSG {subject} 1 {reply} {nbytes}\r\n".format(
Expand All @@ -49,16 +50,19 @@ def generate_msg(subject, nbytes, reply=""):
msg.append(b'r\n')
return b''.join(msg)


def parse_msgs(max_msgs=1, nbytes=1):
buf = bytearray()
buf.extend(b''.join([generate_msg("foo", nbytes) for i in range(0, max_msgs)]))
buf.extend(b''.join(
[generate_msg("foo", nbytes) for i in range(0, max_msgs)]))
print("--- buffer size: {0}".format(len(buf)))
loop = tornado.ioloop.IOLoop.instance()
ps = Parser(DummyNatsClient())
ps.buf = buf
loop.run_sync(ps.parse)
print("--- stats: ", ps.nc.stats)


if __name__ == '__main__':

benchs = [
Expand All @@ -74,7 +78,7 @@ def parse_msgs(max_msgs=1, nbytes=1):
"parse_msgs(max_msgs=100000, nbytes=8192)",
"parse_msgs(max_msgs=10000, nbytes=16384)",
"parse_msgs(max_msgs=100000, nbytes=16384)",
]
]

for bench in benchs:
print("=== {0}".format(bench))
Expand Down
11 changes: 7 additions & 4 deletions benchmark/pub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
DEFAULT_BATCH_SIZE = 100
HASH_MODULO = 1000


def show_usage():
message = """
Usage: pub_perf [options]
Expand All @@ -23,10 +24,12 @@ def show_usage():
"""
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)


@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
Expand All @@ -45,7 +48,7 @@ def main():
servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = { "servers": servers }
opts = {"servers": servers}

# Make sure we're connected to a server first..
nc = NATS()
Expand Down Expand Up @@ -78,11 +81,11 @@ def main():
yield nc.flush()

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count)/elapsed) / (1024*1024))
mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
print("\nTest completed : {0} msgs/sec ({1}) MB/sec\n".format(
args.count/elapsed,
mbytes))
args.count / elapsed, mbytes))
yield nc.close()


if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
18 changes: 13 additions & 5 deletions benchmark/pub_sub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DEFAULT_BATCH_SIZE = 100
HASH_MODULO = 1000


def show_usage():
message = """
Usage: pub_sub_perf [options]
Expand All @@ -22,22 +23,28 @@ def show_usage():
"""
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)


global received
received = 0


def close_cb():
print("Closed connection to NATS")


def disconnected_cb():
print("Disconnected from NATS")


def reconnected_cb():
print("Reconnected to NATS")


@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -66,7 +73,7 @@ def main():

# Make sure we're connected to a server first...
nc = NATS()
try:
try:
yield nc.connect(**opts)
except Exception, e:
sys.stderr.write("ERROR: {0}".format(e))
Expand Down Expand Up @@ -102,12 +109,13 @@ def handler(msg):
yield nc.flush()

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count)/elapsed) / (1024*1024))
mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
print("\nTest completed : {0} msgs/sec sent ({1}) MB/sec\n".format(
args.count/elapsed,
mbytes))
print("Received {0} messages ({1} msgs/sec)".format(received, received/elapsed))
args.count / elapsed, mbytes))
print("Received {0} messages ({1} msgs/sec)".format(
received, received / elapsed))
yield nc.close()


if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
19 changes: 14 additions & 5 deletions benchmark/sub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DEFAULT_BATCH_SIZE = 100
HASH_MODULO = 1000


def show_usage():
message = """
Usage: sub_perf [options]
Expand All @@ -21,13 +22,16 @@ def show_usage():
"""
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)


global received
received = 0


@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
Expand All @@ -40,7 +44,7 @@ def main():
servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
opts = { "servers": servers }
opts = {"servers": servers}

# Make sure we're connected to a server first...
nc = NATS()
Expand All @@ -66,14 +70,16 @@ def handler(msg):
elif args.subtype == 'async':
yield nc.subscribe_async(args.subject, cb=handler)
else:
sys.stderr.write("ERROR: Unsupported type of subscription {0}".format(e))
sys.stderr.write(
"ERROR: Unsupported type of subscription {0}".format(e))
show_usage_and_die()

# Start the benchmark
start = time.time()
to_send = args.count

print("Waiting for {0} messages on [{1}]...".format(args.count, args.subject))
print("Waiting for {0} messages on [{1}]...".format(
args.count, args.subject))
while received < args.count:
# Minimal pause in between batches sent to server
yield tornado.gen.sleep(0.1)
Expand All @@ -82,9 +88,12 @@ def handler(msg):
yield nc.flush()

elapsed = time.time() - start
print("\nTest completed : {0} msgs/sec sent \n".format(args.count/elapsed))
print("Received {0} messages ({1} msgs/sec)".format(received, received/elapsed))
print("\nTest completed : {0} msgs/sec sent \n".format(
args.count / elapsed))
print("Received {0} messages ({1} msgs/sec)".format(
received, received / elapsed))
yield nc.close()


if __name__ == '__main__':
tornado.ioloop.IOLoop.instance().run_sync(main)
Loading

0 comments on commit 4428b1e

Please sign in to comment.