Merge pull request #3 from NucciTheBoss/data-manip
Completed data manipulation stage
NucciTheBoss authored Jun 22, 2021
2 parents 63bd246 + 348b6bb commit 6db9a5d
Showing 6 changed files with 279 additions and 28 deletions.
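For orientation before the per-file diffs: the main.py changes below add (1) a manager-side check that the pipeline is launched from its own directory and that every dataset and plugin path in the training macro resolves to an existing file, and (2) a per-worker greenlight/task handshake for the data-manipulation stage. The following is a minimal sketch of the manager-side path normalization and validation; normalize_and_validate is a hypothetical helper name, and indices 1 and 5 are assumed to be the dataset and plugin paths, as the call sites in the diff suggest.

import os


def normalize_and_validate(train_macro_list):
    """Convert dataset/plugin paths to absolute paths and verify they exist.

    Mirrors the loop added to main.py in this commit; entry[1] and entry[5]
    are assumed to be the dataset and plugin paths.
    """
    normalized = []
    for macro in train_macro_list:
        entry = list(macro)
        # abspath leaves already-absolute paths unchanged, so no isabs check is needed
        entry[1] = os.path.abspath(entry[1])   # dataset path
        entry[5] = os.path.abspath(entry[5])   # plugin path

        if not (os.path.isfile(entry[1]) and os.path.isfile(entry[5])):
            raise FileNotFoundError(
                "Specified dataset or plugin was not found: {} / {}".format(entry[1], entry[5])
            )

        # data/$dataset_name/models <- where trained models are stored
        os.makedirs(os.path.join("data", entry[0], "models"), exist_ok=True)
        normalized.append(tuple(entry))
    return normalized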
1 change: 1 addition & 0 deletions .gitignore
@@ -130,3 +130,4 @@ dmypy.json

# jespipe specific directories
/data
/plugins
239 changes: 217 additions & 22 deletions main.py
@@ -1,11 +1,13 @@
from utils.workeradmin.skip import skip_attack
from mpi4py import MPI
import json
import sys
import re
import os
import time
import logging
from utils.workerops.preprocessing import preprocessing
from utils.workerops.recombine import recombine
from utils.workerops.paramfactory import paramfactory


# Global values to be shared across all nodes
@@ -24,7 +26,15 @@
from utils.workeradmin import skip
from utils.macro import xml2dict as x2d
from utils.macro.unwrap import unwrap_train, unwrap_attack
from utils.workerops import scattershot as scat
from utils.workerops import scattershot as sst


# Check if we are working in the same directory as main.py.
# If not, throw an error, as that will impact the pipeline's ability to run.
cwd_contents = [f for f in os.listdir(".") if os.path.isfile(f)]
if sys.argv[0] not in cwd_contents:
gl.killmsg(comm, size, True)
raise OSError("Not in the same directory as {}. Please change the current working directory to where {} is located.".format(sys.argv[0], sys.argv[0]))

# Read in config to get default configurations file
fin = open(CONFIG_FILE, "rt"); config = fin.read(); fin.close()
@@ -68,28 +78,47 @@

train_macro_list = unwrap_train(train_control)

# Loop through train_macro_list and convert relative paths to absolute paths
for i in range(0, len(train_macro_list)):
# Convert to list in order to change elements
dataset = list(train_macro_list[i])

# Check if path to dataset is absolute
if os.path.isabs(dataset[1]) is False:
dataset[1] = os.path.abspath(dataset[1])

# Check if path to plugin is absolute
if os.path.isabs(dataset[5]) is False:
dataset[5] = os.path.abspath(dataset[5])

# Convert back to tuple
train_macro_list[i] = tuple(dataset)

# Loop through train_macro_list, creating directories for
# storing models for each dataset, as well as verifying
# that each specified dataset and plugin exists
for macro in train_macro_list:
# Check that the dataset and plugin exist. If not, raise a FileNotFoundError
if os.path.isfile(macro[1]) is False:
if os.path.isfile(macro[1]) is False or os.path.isfile(macro[5]) is False:
gl.killmsg(comm, size, True)
raise(FileNotFoundError("Specified dataset is not found. Please verify that you are using the correct file path."))
raise FileNotFoundError("Specified dataset is not found. Please verify that you are using the correct file path.")

# If dataset file is verified to exist, create necessary directories
# If the dataset and plugin are verified to exist, create necessary directories
# Create data/$dataset_name/models <- where trained models are stored
os.makedirs("data/" + macro[0] + "/models", exist_ok=True)

# Create directives for the worker nodes
train_directive_list = scat.generate_train(train_macro_list)
sliced_directive_list = scat.slice(train_directive_list, size)
train_directive_list = sst.generate_train(train_macro_list)
sliced_directive_list = sst.slice(train_directive_list, size)

# Create directory for nodes to log their status if it does not exist
os.makedirs("data/.logs", exist_ok=True)

# Broadcast that everything is good to go for the training stage
gl.killmsg(comm, size, False)

# Send greenlight to workers and then follow up with tasks
node_rank = scat.delegate(comm, size, sliced_directive_list)
node_rank = sst.delegate(comm, size, sliced_directive_list)

# Block until manager hears back from all workers
node_status = list()
@@ -160,12 +189,37 @@
skip_stage_training = comm.recv(source=0, tag=1)

if skip_stage_training != 1:
logger.warning("INFO: Beginning execution of model training stage.")
logger.warning("INFO: Waiting for greenlight to start training stage.")

training_greenlight = comm.recv(source=0, tag=1)
if training_greenlight != 1:
logger.warning("ERROR: Received greenlight message {} for training stage. Aborting execution.".format(training_greenlight))
exit(127)

logger.warning("INFO: Received greenlight {}. Beginning execution of model training stage.".format(training_greenlight))

# Receive task from manager
task_list = comm.recv(source=0, tag=1)
logger.warning("INFO: Received task list {} from manager.".format(task_list))
comm.send(1, dest=0, tag=1)

# Check whether the task list sent is empty. If so, return a status message to the manager
if task_list != []:
# Loop through each of the tasks and perform necessary data manipulations
for task in task_list:
logger.warning("INFO: Beginning training of model {} using directive list {}.".format(task[2], task))

# Perform the user-specified data manipulation
feat, label = preprocessing(task[1], task[6], task[8])
recomb = recombine(feat, label, save=True, save_path="data/" + task[0] + "/models/maniped_data", manip_tag=task[7])

# Create dictionary that will be passed to plugin
param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(), task[6], task[7])

comm.send(1, dest=0, tag=1)

else:
logger.warning("WARNING: Received empty task list. Returning status 1 to manager.")
comm.send(1, dest=0, tag=1)

else:
logger.warning("WARNING: Skipping training stage of pipeline.")
@@ -198,12 +252,35 @@
skip_stage_training = comm.recv(source=0, tag=2)

if skip_stage_training != 1:
logger.warning("INFO: Beginning execution of model training stage.")
logger.warning("INFO: Waiting for greenlight to start training stage.")

training_greenlight = comm.recv(source=0, tag=2)
if training_greenlight != 1:
logger.warning("ERROR: Received greenlight message {} for training stage. Aborting execution.".format(training_greenlight))
exit(127)

logger.warning("INFO: Received greenlight {}. Beginning execution of model training stage.".format(training_greenlight))

# Receive task from manager
task_list = comm.recv(source=0, tag=2)
logger.warning("INFO: Received task list {} from manager.".format(task_list))
comm.send(1, dest=0, tag=2)

if task_list != []:
for task in task_list:
logger.warning("INFO: Beginning training of model {} using directive list {}.".format(task[2], task))

# Perform the user-specified data manipulation
feat, label = preprocessing(task[1], task[6], task[8])
recomb = recombine(feat, label, save=True, save_path="data/" + task[0] + "/models/maniped_data", manip_tag=task[7])

# Create dictionary that will be passed to plugin
param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(), task[6], task[7])

comm.send(1, dest=0, tag=2)

else:
logger.warning("WARNING: Received empty task list. Returning status 1 to manager.")
comm.send(1, dest=0, tag=2)

else:
logger.warning("WARNING: Skipping training stage of pipeline.")
@@ -236,12 +313,35 @@
skip_stage_training = comm.recv(source=0, tag=3)

if skip_stage_training != 1:
logger.warning("INFO: Beginning execution of model training stage.")
logger.warning("INFO: Waiting for greenlight to start training stage.")

training_greenlight = comm.recv(source=0, tag=3)
if training_greenlight != 1:
logger.warning("ERROR: Received greenlight message {} for training stage. Aborting execution.".format(training_greenlight))
exit(127)

logger.warning("INFO: Received greenlight {}. Beginning execution of model training stage.".format(training_greenlight))

# Receive task from manager
task_list = comm.recv(source=0, tag=3)
logger.warning("INFO: Received task list {} from manager.".format(task_list))
comm.send(1, dest=0, tag=3)

if task_list != []:
for task in task_list:
logger.warning("INFO: Beginning training of model {} using directive list {}.".format(task[2], task))

# Perform the user-specified data manipulation
feat, label = preprocessing(task[1], task[6], task[8])
recomb = recombine(feat, label, save=True, save_path="data/" + task[0] + "/models/maniped_data", manip_tag=task[7])

# Create dictionary that will be passed to plugin
param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(), task[6], task[7])

comm.send(1, dest=0, tag=3)

else:
logger.warning("WARNING: Received empty task list. Returning status 1 to manager.")
comm.send(1, dest=0, tag=3)

else:
logger.warning("WARNING: Skipping training stage of pipeline.")
@@ -274,16 +374,42 @@
skip_stage_training = comm.recv(source=0, tag=4)

if skip_stage_training != 1:
logger.warning("INFO: Beginning execution of model training stage.")
logger.warning("INFO: Waiting for greenlight to start training stage.")

training_greenlight = comm.recv(source=0, tag=4)
if training_greenlight != 1:
logger.warning("ERROR: Received greenlight message {} for training stage. Aborting execution.".format(training_greenlight))
exit(127)

logger.warning("INFO: Received greenlight {}. Beginning execution of model training stage.".format(training_greenlight))

# Receive task from manager
task_list = comm.recv(source=0, tag=4)
logger.warning("INFO: Received task list {} from manager.".format(task_list))
comm.send(1, dest=0, tag=4)

if task_list != []:
for task in task_list:
logger.warning("INFO: Beginning training of model {} using directive list {}.".format(task[2], task))

# Perform the user-specified data manipulation
feat, label = preprocessing(task[1], task[6], task[8])
recomb = recombine(feat, label, save=True, save_path="data/" + task[0] + "/models/maniped_data", manip_tag=task[7])

# Create dictionary that will be passed to plugin
param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(), task[6], task[7])

comm.send(1, dest=0, tag=4)

else:
logger.warning("WARNING: Received empty task list. Returning status 1 to manager.")
comm.send(1, dest=0, tag=4)

else:
logger.warning("WARNING: Skipping training stage of pipeline.")

# Send message to manager acknowledging the skipping of the training stage
comm.send(1, dest=0, tag=2)

# ATTACK STAGE
skip_stage_attack = comm.recv(source=0, tag=4)

@@ -312,12 +438,35 @@
skip_stage_training = comm.recv(source=0, tag=5)

if skip_stage_training != 1:
logger.warning("INFO: Beginning execution of model training stage.")
logger.warning("INFO: Waiting for greenlight to start training stage.")

training_greenlight = comm.recv(source=0, tag=5)
if training_greenlight != 1:
logger.warning("ERROR: Received greenlight message {} for training stage. Aborting execution.".format(training_greenlight))
exit(127)

logger.warning("INFO: Received greenlight {}. Beginning execution of model training stage.".format(training_greenlight))

# Receive task from manager
task_list = comm.recv(source=0, tag=5)
logger.warning("INFO: Received task list {} from manager.".format(task_list))
comm.send(1, dest=0, tag=5)

if task_list != []:
for task in task_list:
logger.warning("INFO: Beginning training of model {} using directive list {}.".format(task[2], task))

# Perform the user-specified data manipulation
feat, label = preprocessing(task[1], task[6], task[8])
recomb = recombine(feat, label, save=True, save_path="data/" + task[0] + "/models/maniped_data", manip_tag=task[7])

# Create dictionary that will be passed to plugin
param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(), task[6], task[7])

comm.send(1, dest=0, tag=5)

else:
logger.warning("WARNING: Received empty task list. Returning status 1 to manager.")
comm.send(1, dest=0, tag=5)

else:
logger.warning("WARNING: Skipping training stage of pipeline.")
@@ -350,12 +499,35 @@
skip_stage_training = comm.recv(source=0, tag=6)

if skip_stage_training != 1:
logger.warning("INFO: Beginning execution of model training stage.")
logger.warning("INFO: Waiting for greenlight to start training stage.")

training_greenlight = comm.recv(source=0, tag=6)
if training_greenlight != 1:
logger.warning("ERROR: Received greenlight message {} for training stage. Aborting execution.".format(training_greenlight))
exit(127)

logger.warning("INFO: Received greenlight {}. Beginning execution of model training stage.".format(training_greenlight))

# Receive task from manager
task_list = comm.recv(source=0, tag=6)
logger.warning("INFO: Received task list {} from manager.".format(task_list))
comm.send(1, dest=0, tag=6)

if task_list != []:
for task in task_list:
logger.warning("INFO: Beginning training of model {} using directive list {}.".format(task[2], task))

# Perform the user-specified data manipulation
feat, label = preprocessing(task[1], task[6], task[8])
recomb = recombine(feat, label, save=True, save_path="data/" + task[0] + "/models/maniped_data", manip_tag=task[7])

# Create dictionary that will be passed to plugin
param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(), task[6], task[7])

comm.send(1, dest=0, tag=6)

else:
logger.warning("WARNING: Received empty task list. Returning status 1 to manager.")
comm.send(1, dest=0, tag=6)

else:
logger.warning("WARNING: Skipping training stage of pipeline.")
@@ -388,12 +560,35 @@
skip_stage_training = comm.recv(source=0, tag=7)

if skip_stage_training != 1:
logger.warning("INFO: Beginning execution of model training stage.")
logger.warning("INFO: Waiting for greenlight to start training stage.")

training_greenlight = comm.recv(source=0, tag=7)
if training_greenlight != 1:
logger.warning("ERROR: Received greenlight message {} for training stage. Aborting execution.".format(training_greenlight))
exit(127)

logger.warning("INFO: Received greenlight {}. Beginning execution of model training stage.".format(training_greenlight))

# Receive task from manager
task_list = comm.recv(source=0, tag=7)
logger.warning("Received task list {} from manager.".format(task_list))
comm.send(1, dest=0, tag=7)

if task_list != []:
for task in task_list:
logger.warning("INFO: Beginning training of model {} using directive list {}.".format(task[2], task))

# Perform the user-specified data manipulation
feat, label = preprocessing(task[1], task[6], task[8])
recomb = recombine(feat, label, save=True, save_path="data/" + task[0] + "/models/maniped_data", manip_tag=task[7])

# Create dictionary that will be passed to plugin
param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(), task[6], task[7])

comm.send(1, dest=0, tag=7)

else:
logger.warning("WARNING: Received empty task list. Returning status 1 to manager.")
comm.send(1, dest=0, tag=7)

else:
logger.warning("WARNING: Skipping training stage of pipeline.")
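The seven worker blocks above are identical except for the MPI tag each rank uses to talk to the manager; the manager appears to address each worker on a tag matching its rank. The sketch below condenses that per-rank handshake into one function. It is an illustration only: run_training_stage is a hypothetical wrapper name, and preprocessing, recombine, and paramfactory are assumed to accept exactly the arguments shown at their call sites in the diff.

import os
from utils.workerops.preprocessing import preprocessing
from utils.workerops.recombine import recombine
from utils.workerops.paramfactory import paramfactory


def run_training_stage(comm, logger, rank):
    """Per-worker data-manipulation handshake; each rank uses tag == rank."""
    tag = rank

    # Manager first signals whether the training stage should be skipped (1 == skip).
    if comm.recv(source=0, tag=tag) == 1:
        logger.warning("WARNING: Skipping training stage of pipeline.")
        comm.send(1, dest=0, tag=tag)
        return

    # Wait for the greenlight before accepting work.
    greenlight = comm.recv(source=0, tag=tag)
    if greenlight != 1:
        logger.warning("ERROR: Received greenlight message {}. Aborting execution.".format(greenlight))
        exit(127)

    # Pull this rank's slice of the task list and acknowledge receipt.
    task_list = comm.recv(source=0, tag=tag)
    comm.send(1, dest=0, tag=tag)

    for task in task_list:
        # Run the user-specified data manipulation and save the recombined data.
        feat, label = preprocessing(task[1], task[6], task[8])
        recomb = recombine(feat, label, save=True,
                           save_path="data/" + task[0] + "/models/maniped_data",
                           manip_tag=task[7])
        # Build the parameter dictionary for the plugin; in this commit it is
        # assembled but not yet passed on (training comes in a later stage).
        param_dict = paramfactory(task[0], recomb, task[4], os.getcwd(),
                                  task[6], task[7])

    # Report completion back to the manager.
    comm.send(1, dest=0, tag=tag)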
4 changes: 2 additions & 2 deletions test/test-macro-files/google-stock.xml
@@ -17,13 +17,13 @@
</parameters>

<xgboost tag="xgb1">
<n_features>11</n_features>
<n_features>3</n_features>
</xgboost>

<randomforest tag="rf1"></randomforest>

<pca tag="pca1">
<n_features>11</n_features>
<n_features>3</n_features>
</pca>

<candlestick tag="cand1">
