[FEATS] Cassandra + HDFS #1

Merged
merged 50 commits into from
Jul 8, 2019
Commits
50 commits
8c99589
add protobuf
rayandrew Jul 2, 2019
b777d40
chmod scripts
rayandrew Jul 2, 2019
ee7c232
refactor code
rayandrew Jul 2, 2019
266fbc1
add build essentials
rayandrew Jul 2, 2019
a2f9d21
add custom checker
rayandrew Jul 2, 2019
db7bfef
add utils
rayandrew Jul 2, 2019
0b1a8a3
add utils
rayandrew Jul 2, 2019
993b748
add utils
rayandrew Jul 2, 2019
24d458f
add utils
rayandrew Jul 2, 2019
a40d496
add maven
rayandrew Jul 2, 2019
dbe8204
add maven temp dir
rayandrew Jul 2, 2019
24f6e49
add maven temp dir
rayandrew Jul 2, 2019
b572380
add maven source
rayandrew Jul 2, 2019
f92571c
add maven source
rayandrew Jul 2, 2019
177a37a
fix vars
rayandrew Jul 2, 2019
eb9bc93
add hadoop scripts
rayandrew Jul 2, 2019
a90c0ff
add custom hadoop src
rayandrew Jul 2, 2019
7bc4e80
add hadoop installation
rayandrew Jul 2, 2019
b1a3d64
add hadoop installation
rayandrew Jul 2, 2019
8fdc309
add hdfs runner
rayandrew Jul 6, 2019
1868350
add hadoop runner
rayandrew Jul 6, 2019
80f7729
add hadoop runner
rayandrew Jul 6, 2019
c1e1128
add env
rayandrew Jul 6, 2019
55228e1
add env
rayandrew Jul 6, 2019
c95712e
fix for
rayandrew Jul 6, 2019
85415e1
fix spacing
rayandrew Jul 6, 2019
08680ec
[FEATS] add memory reader for hdfs
rayandrew Jul 6, 2019
7840577
[FEATS] add memory reader for hdfs
rayandrew Jul 6, 2019
81dd018
[FEATS] add memory reader for hdfs
rayandrew Jul 6, 2019
08a3697
add config
rayandrew Jul 6, 2019
65cb402
add custom log dir and cassandra
rayandrew Jul 7, 2019
7023172
add custom log dir and cassandra
rayandrew Jul 7, 2019
ded270b
add custom log dir and cassandra
rayandrew Jul 7, 2019
d5950d7
add custom log dir and cassandra
rayandrew Jul 7, 2019
4cf9240
add custom log dir and cassandra
rayandrew Jul 7, 2019
e9b089a
fix env
rayandrew Jul 7, 2019
eceaacd
fix env
rayandrew Jul 7, 2019
c2a9172
fix env
rayandrew Jul 7, 2019
96347f0
fix env and add reader
rayandrew Jul 7, 2019
2b5949b
fix env and add reader
rayandrew Jul 7, 2019
79cca27
fix env and add reader
rayandrew Jul 7, 2019
0aec2d8
fix env and add reader
rayandrew Jul 7, 2019
4706c63
fix env and add reader
rayandrew Jul 7, 2019
cdc50c8
fix env and add reader
rayandrew Jul 7, 2019
f5f4156
fix env and add reader
rayandrew Jul 7, 2019
40e2259
fix env and add reader
rayandrew Jul 7, 2019
a8778bd
fix env and add reader
rayandrew Jul 7, 2019
fc8f0e6
fix env and add reader
rayandrew Jul 7, 2019
4001637
fix env and add reader
rayandrew Jul 7, 2019
02e130b
add plot
rayandrew Jul 7, 2019
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
.bak
.DS_Store
.ipynb_checkpoints
6 changes: 6 additions & 0 deletions .gitmodules
@@ -0,0 +1,6 @@
[submodule "hdfs/source"]
path = hdfs/source
url = [email protected]:rayandrews/custom-hadoop.git
[submodule "cassandra/source"]
path = cassandra/source
url = [email protected]:rayandrews/custom-cassandra.git
14 changes: 14 additions & 0 deletions README.md
@@ -1 +1,15 @@
# ucare-research

Projects from my UCARE research.

## Projects

### Memory Usage Tracker

- [Cassandra](https://github.com/rayandrews/ucare-research/tree/master/cassandra)
- [HDFS](https://github.com/rayandrews/ucare-research/tree/master/hdfs)

## Authors

- Ray Andrew <[email protected]>
- Cesar Stuardo
47 changes: 47 additions & 0 deletions cassandra/README.md
@@ -0,0 +1,47 @@
# Cassandra Memory Usage

This project uses [CCM](https://github.com/riptano/ccm) by Riptano to deploy local Cassandra clusters.

## Installation

1. Build Cassandra

```bash
ant
```

## Running

1. Run CCM

```bash
python3 deploy.py \
--node_count N \
--cassandra_dir CASSANDRA_DIR \
--cluster_name CLUSTER_NAME \
--cluster_path CLUSTER_PATH >> log.txt
```

Or run it as a background task:

```bash
nohup python3 deploy.py \
--node_count N \
--cassandra_dir CASSANDRA_DIR \
--cluster_name CLUSTER_NAME \
--cluster_path CLUSTER_PATH >> log.txt &
```

2. Preprocess the data

Copy the memory totals reported in `log.txt` into a CSV file with the columns `nodes,memory`, or write a script that does this.

3. Plot the data

```bash
jupyter lab # open Visualization.ipynb
```
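
The preprocessing step can be scripted instead of copied by hand. The following is a minimal sketch, assuming `log.txt` holds the output of `deploy.py`, including its summary line `Total memory used for {} nodes is : {} MB`. The helper names `log_to_rows` and `write_csv` are hypothetical, and the sample numbers are made up for illustration.

```python
import csv
import io
import re

# Matches the summary line printed by deploy.py:
#   'Total memory used for {} nodes is : {} MB'
SUMMARY = re.compile(r'Total memory used for (\d+) nodes is : (\d+) MB')


def log_to_rows(lines):
    """Yield (nodes, memory) pairs from lines of deploy.py output."""
    for line in lines:
        match = SUMMARY.search(line)
        if match:
            yield int(match.group(1)), int(match.group(2))


def write_csv(rows, out):
    """Write rows under the 'nodes,memory' header used by data.csv."""
    writer = csv.writer(out, lineterminator='\n')
    writer.writerow(['nodes', 'memory'])
    writer.writerows(rows)


# Illustrative log lines (made-up values, not measured results).
sample_log = [
    'Starting Cluster consists of 5 nodes',
    'List of mem used  [1, 1, 1, 1, 1]',
    'Total memory used for 5 nodes is : 5 MB',
    'Total memory used for 10 nodes is : 10 MB',
]

buf = io.StringIO()
write_csv(log_to_rows(sample_log), buf)
print(buf.getvalue())
```

For a real run, the same two functions could be pointed at files, for example by passing `open('log.txt')` to `log_to_rows` and `open('data.csv', 'w', newline='')` to `write_csv`.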

## Results

![cassandra-mem-usage](plot2.png)
472 changes: 472 additions & 0 deletions cassandra/Visualization.ipynb

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions cassandra/data.csv
@@ -0,0 +1,51 @@
nodes,memory
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9
10,10
11,11
12,12
13,13
14,14
15,15
16,16
17,17
18,18
19,19
20,20
21,21
22,22
23,23
24,24
25,25
26,26
27,27
28,28
29,29
30,30
31,31
32,32
33,33
34,35
35,35
36,38
37,40
38,43
39,44
40,46
41,48
42,50
43,52
44,53
45,55
46,57
47,58
48,63
49,67
50,74
20 changes: 20 additions & 0 deletions cassandra/data2_after_heap.csv
@@ -0,0 +1,20 @@
nodes,memory
5,5
10,10
15,16
20,35
25,43
30,56
35,64
40,76
45,86
50,94
55,107
60,114
65,127
70,136
75,145
80,156
85,166
90,176
100,198
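
One way to read `data2_after_heap.csv` is as memory per node. The sketch below uses a handful of rows copied from the file above and divides the recorded total by the node count. The file does not state its units, so none are assumed here, and the variable names are hypothetical.

```python
import csv
import io

# Selected rows copied from cassandra/data2_after_heap.csv.
DATA = """nodes,memory
5,5
10,10
15,16
20,35
50,94
100,198
"""

rows = [(int(r['nodes']), int(r['memory']))
        for r in csv.DictReader(io.StringIO(DATA))]

# Total memory divided by node count, in the file's own units.
per_node = [memory / nodes for nodes, memory in rows]

for (nodes, memory), ratio in zip(rows, per_node):
    print(f'{nodes:>3} nodes: {memory:>3} total, {ratio:.2f} per node')
```

On these rows the ratio is 1.00 at 5 and 10 nodes and 1.98 at 100 nodes, so the total grows faster than the node count over this range.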
161 changes: 161 additions & 0 deletions cassandra/deploy.py
@@ -0,0 +1,161 @@
import os
import argparse
import mmap
import time

import itertools

from ccmlib.cluster import Cluster
from ccmlib import common, extension, repository
from ccmlib.node import Node, NodeError, TimeoutError

class CustomCluster(Cluster):
    """ccm Cluster that starts nodes with a fixed heap and a pause between starts."""

    def __update_pids(self, started):
        for node, p, _ in started:
            node._update_pid(p)

    def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True,
              wait_other_notice=True, jvm_args=None, profile_options=None,
              quiet_start=False, allow_root=False, **kwargs):
        # Default to a fixed 1 GB heap per node. Using None as the default
        # avoids sharing one mutable list between calls.
        if jvm_args is None:
            jvm_args = ['-Xms1024M', '-Xmx1024M']

        extension.pre_cluster_start(self)

        common.assert_jdk_valid_for_cassandra_version(self.cassandra_version())

        # check whether all loopback aliases are available before starting any nodes
        for node in list(self.nodes.values()):
            if not node.is_running():
                for itf in node.network_interfaces.values():
                    if itf is not None:
                        common.assert_socket_available(itf)

        started = []
        for node in list(self.nodes.values()):
            if not node.is_running():
                mark = 0
                if os.path.exists(node.logfilename()):
                    mark = node.mark_log()

                p = node.start(update_pid=False, jvm_args=jvm_args,
                               profile_options=profile_options, verbose=verbose,
                               quiet_start=quiet_start, allow_root=allow_root)

                # Prior to JDK8, starting every node at once could lead to a
                # nanotime collision where the RNG that generates a node's tokens
                # gives identical tokens to several nodes. Thus, we stagger
                # the node starts
                # if common.get_jdk_version() < '1.8':

                # [RAYANDREW] The JDK check above is disabled: always wait
                # 10 seconds before starting the next node.
                time.sleep(10)

                started.append((node, p, mark))

        if no_wait:
            # wait 2 seconds to check for early errors and for the pid to be set
            time.sleep(2)
        else:
            for node, p, mark in started:
                try:
                    start_message = ("Listening for thrift clients..."
                                     if self.cassandra_version() < "2.2"
                                     else "Starting listening for CQL clients")
                    node.watch_log_for(start_message, timeout=kwargs.get('timeout', 60),
                                       process=p, verbose=verbose, from_mark=mark)
                except RuntimeError:
                    return None

        self.__update_pids(started)

        for node, p, _ in started:
            if not node.is_running():
                raise NodeError("Error starting {0}.".format(node.name), p)

        if not no_wait:
            if wait_other_notice:
                for (node, _, mark), (other_node, _, _) in itertools.permutations(started, 2):
                    node.watch_log_for_alive(other_node, from_mark=mark)

            if wait_for_binary_proto:
                for node, p, mark in started:
                    node.wait_for_binary_interface(process=p, verbose=verbose, from_mark=mark)

        extension.post_cluster_start(self)

        return started


def _read_logs(filename, text='Used Memory'):
    """Return the first line of the file that contains `text`, or None."""
    with open(filename, 'r') as f:
        return next((line for line in f if text in line), None)

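def _read_logs_2(filename, text='Used Memory'):
    """Return the file content from the last occurrence of `text` to the end
    of that line, or None if `text` does not occur."""
    with open(filename, 'rb') as f:
        # memory-map the file, size 0 means whole file
        # prot argument is *nix only
        with mmap.mmap(f.fileno(), 0, prot=mmap.PROT_READ) as m:
            i = m.rfind(text.encode())  # search for last occurrence of `text`
            if i == -1:
                return None  # rfind returns -1 when nothing matches
            m.seek(i)  # seek to the location
            line = m.readline()  # read to the end of the line
            return line.decode()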

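def log_parser(args, node_count):
    """Collect one memory reading per node from each node's system.log."""
    mems = []

    for i in range(node_count):
        log_path = os.path.join(args.cluster_path, args.cluster_name,
                                'node{}'.format(i + 1), 'logs', 'system.log')
        line = _read_logs(log_path)
        if line is not None:
            # Take the first space-separated token that is a plain integer.
            mem_digits = [int(s) for s in line.split(' ') if s.isdigit()]
            if mem_digits:
                mems.append(mem_digits[0])
    return mems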

def deploy_cluster(args, node_count):
    cluster = CustomCluster(
        path=args.cluster_path,
        name=args.cluster_name,
        install_dir=args.cassandra_dir)
    cluster.populate(node_count).start()

    return cluster

def stop_remove_cluster(cluster):
    cluster.stop()
    cluster.remove()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='[Cassandra] - Memory Reader')

    parser.add_argument('--node_count', '-nc', default=5, type=int, help='Cassandra node count')
    parser.add_argument('--cassandra_dir', '-cd', default='/mnt/extra/cassandra', help='Cassandra source dir')
    parser.add_argument('--cluster_name', '-cn', default='test', help='cluster name')
    parser.add_argument('--cluster_path', '-cp', default='/mnt/extra/working', help='ccm conf dir')
    args = parser.parse_args()

    # Deploy clusters of 5, 10, 15, ... nodes, in steps of 5, up to
    # --node_count rounded up to a multiple of 5.
    for cluster_size in range(5, args.node_count + 5, 5):
        print('Starting cluster of {} nodes'.format(cluster_size))
        cluster = deploy_cluster(args, cluster_size)

        print('Waiting about 1 minute before reading the memory logs')
        time.sleep(60)

        print('Start reading the logs')

        # Poll until every node has logged a memory reading.
        while True:
            mems = log_parser(args, cluster_size)
            time.sleep(2)
            if len(mems) == cluster_size:
                break

        print('List of mem used ', mems)
        print('Total memory used for {} nodes is : {} MB'.format(cluster_size, sum(mems)))

        print('Stopping and removing cluster')
        stop_remove_cluster(cluster)

        print('Waiting 10 secs before spawning another cluster\n')
        time.sleep(10)
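
The log-reading helpers above look for a `Used Memory` line and take an integer from it. The sketch below shows one possible variant that reads the last matching line and takes the first integer after the marker text, so digits earlier in the line (such as a timestamp) are ignored. This is a swapped-in regex approach, not the method used in `deploy.py`; the function name `last_used_memory` and the sample log format are assumptions, since the log format written by the custom Cassandra build is not shown in this diff.

```python
import os
import re
import tempfile


def last_used_memory(filename, text='Used Memory'):
    """Return the first integer after `text` on the last matching line, or None."""
    last = None
    with open(filename, 'r') as f:
        for line in f:
            if text in line:
                last = line  # keep overwriting so the final match wins
    if last is None:
        return None
    # Skip any non-digit characters between the marker and the number.
    match = re.search(re.escape(text) + r'\D*(\d+)', last)
    return int(match.group(1)) if match else None


# Hypothetical log content, for illustration only.
sample = (
    'INFO  [main] 2019-07-07 10:00:00,000 Starting up\n'
    'INFO  [main] 2019-07-07 10:00:05,000 Used Memory : 12 MB\n'
    'INFO  [main] 2019-07-07 10:00:10,000 Used Memory : 15 MB\n'
)

with tempfile.NamedTemporaryFile('w', suffix='.log', delete=False) as tmp:
    tmp.write(sample)

try:
    result = last_used_memory(tmp.name)
    missing = last_used_memory(tmp.name, text='No Such Marker')
    print(result, missing)
finally:
    os.remove(tmp.name)
```

Anchoring the number search to the marker text is a design choice for lines that carry other digits; the token-splitting approach in `log_parser` relies instead on the reading being a standalone space-separated integer.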