diff --git a/cassandra/deploy.py b/cassandra/deploy.py index ebfe2b0..aa78b19 100644 --- a/cassandra/deploy.py +++ b/cassandra/deploy.py @@ -2,6 +2,7 @@ import argparse import mmap import time +import csv import itertools @@ -9,6 +10,7 @@ from ccmlib import common, extension, repository from ccmlib.node import Node, NodeError, TimeoutError + class CustomCluster(Cluster): def __update_pids(self, started): for node, p, _ in started: @@ -38,7 +40,8 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True, if os.path.exists(node.logfilename()): mark = node.mark_log() - p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options, verbose=verbose, quiet_start=quiet_start, allow_root=allow_root) + p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options, + verbose=verbose, quiet_start=quiet_start, allow_root=allow_root) # Prior to JDK8, starting every node at once could lead to a # nanotime collision where the RNG that generates a node's tokens @@ -48,17 +51,20 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True, # [RAYANDREW] modify this # print('Waiting 10s before starting other node') - time.sleep(10) # wait 10 seconds before starting other node + time.sleep(10) # wait 10 seconds before starting other node started.append((node, p, mark)) if no_wait: - time.sleep(2) # waiting 2 seconds to check for early errors and for the pid to be set + # waiting 2 seconds to check for early errors and for the pid to be set + time.sleep(2) else: for node, p, mark in started: try: - start_message = "Listening for thrift clients..." if self.cassandra_version() < "2.2" else "Starting listening for CQL clients" - node.watch_log_for(start_message, timeout=kwargs.get('timeout',60), process=p, verbose=verbose, from_mark=mark) + start_message = "Listening for thrift clients..." if self.cassandra_version( + ) < "2.2" else "Starting listening for CQL clients" + node.watch_log_for(start_message, timeout=kwargs.get( + 'timeout', 60), process=p, verbose=verbose, from_mark=mark) except RuntimeError: return None @@ -75,12 +81,13 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True, if wait_for_binary_proto: for node, p, mark in started: - node.wait_for_binary_interface(process=p, verbose=verbose, from_mark=mark) + node.wait_for_binary_interface( + process=p, verbose=verbose, from_mark=mark) extension.post_cluster_start(self) return started - + def _read_logs(filename, text='Used Memory'): line = None @@ -90,27 +97,31 @@ def _read_logs(filename, text='Used Memory'): return line + def _read_logs_2(filename, text='Used Memory'): with open(filename, 'r') as f: # memory-map the file, size 0 means whole file - m = mmap.mmap(f.fileno(), 0, prot=mmap.PROT_READ) - # prot argument is *nix only + m = mmap.mmap(f.fileno(), 0, prot=mmap.PROT_READ) + # prot argument is *nix only i = m.rfind(b'Used Memory') # search for last occurrence of 'word' m.seek(i) # seek to the location line = m.readline() # read to the end of the line return str(line) + def log_parser(args, node_count): mems = [] for i in range(node_count): - line = _read_logs(os.path.join(args.cluster_path, args.cluster_name, 'node{}'.format(i + 1), 'logs', 'system.log')) + line = _read_logs(os.path.join( + args.cluster_path, args.cluster_name, 'node{}'.format(i + 1), 'logs', 'system.log')) if line is not None: mem_digits = [int(s) for s in line.split(' ') if s.isdigit()] mems.append(mem_digits[0]) return mems + def deploy_cluster(args, node_count): cluster = CustomCluster( path=args.cluster_path, @@ -120,42 +131,58 @@ def deploy_cluster(args, node_count): return cluster + def stop_remove_cluster(cluster): cluster.stop() cluster.remove() + if __name__ == '__main__': parser = argparse.ArgumentParser( description='[Cassandra] - Memory Reader') - parser.add_argument('--node_count', '-nc', default=5, type=int, help='Cassandra Node Count') - parser.add_argument('--cassandra_dir', '-cd', default='/mnt/extra/cassandra', help='cassandra source dir') - parser.add_argument('--cluster_name', '-cn', default='test', help='cluster name') - parser.add_argument('--cluster_path', '-cp', default='/mnt/extra/working', help='ccm conf dir') + parser.add_argument('--node_count', '-nc', default=5, + type=int, help='Cassandra Node Count') + parser.add_argument('--cassandra_dir', '-cd', + default='/mnt/extra/cassandra', help='cassandra source dir') + parser.add_argument('--cluster_name', '-cn', + default='test', help='cluster name') + parser.add_argument('--cluster_path', '-cp', + default='/mnt/extra/working', help='ccm conf dir') args = parser.parse_args() - for node_count in range(-5, args.node_count - 5, 5): - print('Starting Cluster consists of {} nodes'.format(node_count + 10)) - cluster = deploy_cluster(args, node_count + 10) + # result = [] - print('Delay about 1 minute before trying to read memory logs') - time.sleep(60) + with open('result.csv', mode='w') as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=['nodes', 'mems']) + writer.writeheader() - print('Start reading the logs') + for node_count in range(10, args.node_count + 10, 10): + print('Starting Cluster consists of {} nodes'.format(node_count)) + cluster = deploy_cluster(args, node_count) - mems = [] + print('Delay about 1 minute before trying to read memory logs') + time.sleep(60) - while True: - mems = log_parser(args, node_count + 10) - time.sleep(2) - if len(mems) == (node_count + 10): - break + print('Start reading the logs') + + mems = [] + + while True: + mems = log_parser(args, node_count) + time.sleep(2) + if len(mems) == node_count: + break + + total_mems = sum(mems) + print('List of mem used ', mems) + print('Total memory used for {} nodes is : {} MB'.format( + node_count, total_mems)) - print('List of mem used ', mems) - print('Total memory used for {} nodes is : {} MB'.format(node_count + 10, sum(mems))) + writer.writerow({'nodes': node_count, 'mems': total_mems}) - print('Stopping and Remove Cluster') - stop_remove_cluster(cluster) + print('Stopping and Remove Cluster') + stop_remove_cluster(cluster) - print('Delaying 10 secs before spawning another cluster\n') - time.sleep(10) \ No newline at end of file + print('Delaying 10 secs before spawning another cluster\n') + time.sleep(10) diff --git a/cassandra/log_100.txt b/cassandra/log_100.txt new file mode 100644 index 0000000..e9e11ff --- /dev/null +++ b/cassandra/log_100.txt @@ -0,0 +1,82 @@ +Starting Cluster consists of 10 nodes +/usr/local/lib/python3.4/dist-packages/ccmlib/node.py:1501: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details. + data = yaml.load(f) +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] +Total memory used for 10 nodes is : 10 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 20 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 1, 2, 2, 1, 2, 2, 1, 1, 2, 2, 1, 2, 2, 2, 1, 2, 2, 2, 2] +Total memory used for 20 nodes is : 34 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 30 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 1, 2, 1, 2, 2, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2] +Total memory used for 30 nodes is : 54 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 40 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 1, 2, 2, 2, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2] +Total memory used for 40 nodes is : 74 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 50 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2] +Total memory used for 50 nodes is : 96 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 60 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2] +Total memory used for 60 nodes is : 117 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 70 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2] +Total memory used for 70 nodes is : 136 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 80 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2] +Total memory used for 80 nodes is : 155 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 90 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2] +Total memory used for 90 nodes is : 176 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + +Starting Cluster consists of 100 nodes +Delay about 1 minute before trying to read memory logs +Start reading the logs +List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2] +Total memory used for 100 nodes is : 199 MB +Stopping and Remove Cluster +Delaying 10 secs before spawning another cluster + diff --git a/cassandra/result.csv b/cassandra/result.csv new file mode 100644 index 0000000..95a9390 --- /dev/null +++ b/cassandra/result.csv @@ -0,0 +1,11 @@ +nodes,mems +10,10 +20,34 +30,54 +40,74 +50,96 +60,117 +70,136 +80,155 +90,176 +100,199 diff --git a/test.csv b/test.csv new file mode 100644 index 0000000..289f3bd --- /dev/null +++ b/test.csv @@ -0,0 +1,2 @@ +nodes,mems +0,1 diff --git a/visualization/README.md b/visualization/README.md new file mode 100644 index 0000000..0a0df08 --- /dev/null +++ b/visualization/README.md @@ -0,0 +1,21 @@ +# Visualization Memory Usage + +This project uses matplotlib to plot memory usage of distributed systems + +## Distributed Systems + +- [x] Cassandra +- [x] HDFS +- [ ] HBase +- [ ] Tensorflow Distributed +- [ ] ... + +## Running + +```bash +jupyter lab +``` + +## Results + +![mem-usages](plot.png) diff --git a/visualization/Visualization.ipynb b/visualization/Visualization.ipynb new file mode 100644 index 0000000..86608f0 --- /dev/null +++ b/visualization/Visualization.ipynb @@ -0,0 +1,293 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Populating the interactive namespace from numpy and matplotlib\n" + ] + } + ], + "source": [ + "# This line configures matplotlib to show figures embedded in the notebook, \n", + "# instead of poping up a new window. More about that later. \n", + "%pylab inline" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "from pylab import *\n", + "import matplotlib.pyplot as plt" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "from scipy.interpolate import interp1d\n", + "import numpy as np" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "distributed_systems = ['cassandra', 'hdfs']" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "df = pd.read_csv('./data.csv')\n", + "df = df.set_index('nodes')" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
cassandrahdfs
nodes
101026
203446
305466
407489
5096109
\n", + "
" + ], + "text/plain": [ + " cassandra hdfs\n", + "nodes \n", + "10 10 26\n", + "20 34 46\n", + "30 54 66\n", + "40 74 89\n", + "50 96 109" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Int64Index([10, 20, 30, 40, 50, 60, 70, 80, 90, 100], dtype='int64', name='nodes')" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.index" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Text(0, 0.5, 'Memory (MB)')" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + }, + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "ax = df.plot.line()\n", + "ax.set_title('Before interpolation')\n", + "ax.set_xlabel(\"Nodes\")\n", + "ax.set_ylabel(\"Memory (MB)\")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "# f = interp1d(df.index, df['memory'], kind='cubic')\n", + "f = []\n", + "for system in distributed_systems:\n", + " f.append(interp1d(df.index, df[system], kind='cubic'))" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "df_int = pd.DataFrame()\n", + "new_index = np.arange(10, 110, 10)\n", + "# df_int['memory'] = f(new_index)\n", + "\n", + "for i in range(len(distributed_systems)):\n", + " df_int[distributed_systems[i]] = f[i](new_index) \n", + " \n", + "df_int.index = new_index\n" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "ax2 = df_int.plot.line(figsize=(15,8), grid=True)\n", + "ax2.set_title('Cassandra JVM Memory Usage')\n", + "ax2.set_xlabel(\"Nodes\")\n", + "ax2.set_ylabel(\"Memory (MB)\")\n", + "plt.savefig('plot.png')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/visualization/data.csv b/visualization/data.csv new file mode 100644 index 0000000..4953072 --- /dev/null +++ b/visualization/data.csv @@ -0,0 +1,11 @@ +nodes,cassandra,hdfs +10,10,26 +20,34,46 +30,54,66 +40,74,89 +50,96,109 +60,117,129 +70,136,152 +80,155,170 +90,176,193 +100,199,213 \ No newline at end of file diff --git a/visualization/plot.png b/visualization/plot.png new file mode 100644 index 0000000..b57b66c Binary files /dev/null and b/visualization/plot.png differ