Skip to content

Commit

Permalink
[DONE] Initialize more nodes for cassandra (#4)
Browse files Browse the repository at this point in the history
[DONE] Initialize more nodes for cassandra
  • Loading branch information
rayandrew authored Jul 8, 2019
2 parents 886bfe2 + 29ce35d commit 61bf77c
Show file tree
Hide file tree
Showing 8 changed files with 479 additions and 32 deletions.
91 changes: 59 additions & 32 deletions cassandra/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
import argparse
import mmap
import time
import csv

import itertools

from ccmlib.cluster import Cluster
from ccmlib import common, extension, repository
from ccmlib.node import Node, NodeError, TimeoutError


class CustomCluster(Cluster):
def __update_pids(self, started):
for node, p, _ in started:
Expand Down Expand Up @@ -38,7 +40,8 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True,
if os.path.exists(node.logfilename()):
mark = node.mark_log()

p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options, verbose=verbose, quiet_start=quiet_start, allow_root=allow_root)
p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options,
verbose=verbose, quiet_start=quiet_start, allow_root=allow_root)

# Prior to JDK8, starting every node at once could lead to a
# nanotime collision where the RNG that generates a node's tokens
Expand All @@ -48,17 +51,20 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True,

# [RAYANDREW] modify this
# print('Waiting 10s before starting other node')
time.sleep(10) # wait 10 seconds before starting other node
time.sleep(10) # wait 10 seconds before starting other node

started.append((node, p, mark))

if no_wait:
time.sleep(2) # waiting 2 seconds to check for early errors and for the pid to be set
# waiting 2 seconds to check for early errors and for the pid to be set
time.sleep(2)
else:
for node, p, mark in started:
try:
start_message = "Listening for thrift clients..." if self.cassandra_version() < "2.2" else "Starting listening for CQL clients"
node.watch_log_for(start_message, timeout=kwargs.get('timeout',60), process=p, verbose=verbose, from_mark=mark)
start_message = "Listening for thrift clients..." if self.cassandra_version(
) < "2.2" else "Starting listening for CQL clients"
node.watch_log_for(start_message, timeout=kwargs.get(
'timeout', 60), process=p, verbose=verbose, from_mark=mark)
except RuntimeError:
return None

Expand All @@ -75,12 +81,13 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True,

if wait_for_binary_proto:
for node, p, mark in started:
node.wait_for_binary_interface(process=p, verbose=verbose, from_mark=mark)
node.wait_for_binary_interface(
process=p, verbose=verbose, from_mark=mark)

extension.post_cluster_start(self)

return started


def _read_logs(filename, text='Used Memory'):
line = None
Expand All @@ -90,27 +97,31 @@ def _read_logs(filename, text='Used Memory'):

return line


def _read_logs_2(filename, text='Used Memory'):
    """Return the tail of the last line in *filename* that contains *text*.

    The file is memory-mapped and searched backwards, so only the final
    occurrence is considered.

    Args:
        filename: Path of the log file to scan.
        text: Marker to look for (``str`` or ``bytes``).

    Returns:
        The decoded text from the start of the last match up to and
        including the end of that line, or ``None`` when the file is empty
        or the marker does not occur.
    """
    needle = text if isinstance(text, bytes) else text.encode('utf-8')

    with open(filename, 'rb') as f:
        try:
            # Length 0 maps the whole file; ACCESS_READ is the portable
            # spelling of a read-only mapping (prot= is *nix only).
            m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        except ValueError:
            # mmap refuses to map a zero-length file.
            return None

        with m:
            i = m.rfind(needle)  # offset of the last occurrence, -1 if none
            if i == -1:
                return None
            m.seek(i)
            line = m.readline()  # read from the match to the end of line

    return line.decode('utf-8', errors='replace')


def log_parser(args, node_count):
    """Collect one memory reading per node from the nodes' system logs.

    For node ``i`` (1-based) the log is read from
    ``<cluster_path>/<cluster_name>/node<i>/logs/system.log``.

    Args:
        args: Object exposing ``cluster_path`` and ``cluster_name``.
        node_count: Number of nodes whose logs are inspected.

    Returns:
        A list with the first standalone integer token of each node's
        matched log line. Nodes with no matched line, or whose matched line
        carries no integer token, are left out, so the list may be shorter
        than ``node_count``.
    """
    mems = []

    for index in range(1, node_count + 1):
        log_path = os.path.join(
            args.cluster_path, args.cluster_name,
            'node{}'.format(index), 'logs', 'system.log')
        line = _read_logs(log_path)
        if line is None:
            continue

        # split() (rather than split(' ')) also drops the trailing newline,
        # so a number at the very end of the line is still recognised.
        mem_digits = [int(tok) for tok in line.split() if tok.isdigit()]
        if mem_digits:
            mems.append(mem_digits[0])

    return mems


def deploy_cluster(args, node_count):
cluster = CustomCluster(
path=args.cluster_path,
Expand All @@ -120,42 +131,58 @@ def deploy_cluster(args, node_count):

return cluster


def stop_remove_cluster(cluster):
    """Stop *cluster* and then remove it.

    ``cluster.remove()`` is called even when ``cluster.stop()`` raises, so a
    failed shutdown does not leave the cluster behind for the next run. Any
    exception raised by ``stop()`` still propagates to the caller.

    Args:
        cluster: Object exposing ``stop()`` and ``remove()``.
    """
    try:
        cluster.stop()
    finally:
        cluster.remove()


if __name__ == '__main__':
    # Deploy clusters of increasing size (10, 20, ...), read the memory
    # figure each node reports in its system log, and record the per-cluster
    # total in result.csv.
    parser = argparse.ArgumentParser(
        description='[Cassandra] - Memory Reader')

    parser.add_argument('--node_count', '-nc', default=5,
                        type=int, help='Cassandra Node Count')
    parser.add_argument('--cassandra_dir', '-cd',
                        default='/mnt/extra/cassandra', help='cassandra source dir')
    parser.add_argument('--cluster_name', '-cn',
                        default='test', help='cluster name')
    parser.add_argument('--cluster_path', '-cp',
                        default='/mnt/extra/working', help='ccm conf dir')
    args = parser.parse_args()

    # newline='' lets the csv module control line endings itself.
    with open('result.csv', mode='w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=['nodes', 'mems'])
        writer.writeheader()

        for node_count in range(10, args.node_count + 10, 10):
            print('Starting Cluster consists of {} nodes'.format(node_count))
            cluster = deploy_cluster(args, node_count)

            try:
                print('Delay about 1 minute before trying to read memory logs')
                time.sleep(60)

                print('Start reading the logs')

                # Poll until every node has produced a reading; only sleep
                # when another attempt is actually needed.
                while True:
                    mems = log_parser(args, node_count)
                    if len(mems) == node_count:
                        break
                    time.sleep(2)

                total_mems = sum(mems)
                print('List of mem used ', mems)
                print('Total memory used for {} nodes is : {} MB'.format(
                    node_count, total_mems))

                writer.writerow({'nodes': node_count, 'mems': total_mems})
                # Keep finished rows on disk even if a later cluster fails.
                csv_file.flush()
            finally:
                # Tear the cluster down even when reading the logs failed.
                print('Stopping and Remove Cluster')
                stop_remove_cluster(cluster)

            print('Delaying 10 secs before spawning another cluster\n')
            time.sleep(10)
82 changes: 82 additions & 0 deletions cassandra/log_100.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
Starting Cluster consists of 10 nodes
/usr/local/lib/python3.4/dist-packages/ccmlib/node.py:1501: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
data = yaml.load(f)
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
Total memory used for 10 nodes is : 10 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 20 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 1, 2, 2, 1, 2, 2, 1, 1, 2, 2, 1, 2, 2, 2, 1, 2, 2, 2, 2]
Total memory used for 20 nodes is : 34 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 30 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 1, 2, 1, 2, 2, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2]
Total memory used for 30 nodes is : 54 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 40 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 1, 2, 2, 2, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2]
Total memory used for 40 nodes is : 74 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 50 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
Total memory used for 50 nodes is : 96 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 60 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2]
Total memory used for 60 nodes is : 117 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 70 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
Total memory used for 70 nodes is : 136 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 80 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
Total memory used for 80 nodes is : 155 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 90 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
Total memory used for 90 nodes is : 176 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

Starting Cluster consists of 100 nodes
Delay about 1 minute before trying to read memory logs
Start reading the logs
List of mem used [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2]
Total memory used for 100 nodes is : 199 MB
Stopping and Remove Cluster
Delaying 10 secs before spawning another cluster

11 changes: 11 additions & 0 deletions cassandra/result.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
nodes,mems
10,10
20,34
30,54
40,74
50,96
60,117
70,136
80,155
90,176
100,199
2 changes: 2 additions & 0 deletions test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
nodes,mems
0,1
21 changes: 21 additions & 0 deletions visualization/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Memory Usage Visualization

This project uses matplotlib to plot the memory usage of distributed systems.

## Distributed Systems

- [x] Cassandra
- [x] HDFS
- [ ] HBase
- [ ] Tensorflow Distributed
- [ ] ...

## Running

```bash
jupyter lab
```

## Results

![mem-usages](plot.png)
Loading

0 comments on commit 61bf77c

Please sign in to comment.