diff --git a/.github/workflows/test_file_reader.yml b/.github/workflows/test_file_reader.yml index a0dc0aee..fbe54239 100644 --- a/.github/workflows/test_file_reader.yml +++ b/.github/workflows/test_file_reader.yml @@ -28,6 +28,7 @@ jobs: python -m pip install --upgrade pip pip install -r requirements.txt pip install . + pip install graphviz - name: Test reader run: | pip install pytest diff --git a/.github/workflows/test_file_writer.yml b/.github/workflows/test_file_writer.yml index 7e51dad7..ef4f0c0d 100644 --- a/.github/workflows/test_file_writer.yml +++ b/.github/workflows/test_file_writer.yml @@ -26,7 +26,10 @@ jobs: run: | python -m pip install --upgrade pip pip install -r requirements.txt - pip install . + python -m pip install opencv-python + pip install . + pip install graphviz + sudo apt-get install graphviz - name: Test reader run: | pip install pytest diff --git a/.github/workflows/test_plugin.yml b/.github/workflows/test_plugin.yml deleted file mode 100644 index 6d1b8e01..00000000 --- a/.github/workflows/test_plugin.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: test_plugin.py test - -on: - push: - branches: - - main - pull_request: - branches: - - main - - -jobs: - linux: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ['3.11'] - - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - python -m pip install opencv-python - pip install . - pip install graphviz - sudo apt-get install graphviz - - name: Test reader - run: | - pip install pytest - pytest dsi/tests/test_plugin.py \ No newline at end of file diff --git a/dsi/backends/parquet.py b/dsi/backends/parquet.py index 92f8eb30..7bb38b9a 100644 --- a/dsi/backends/parquet.py +++ b/dsi/backends/parquet.py @@ -1,7 +1,5 @@ import pyarrow as pa from pyarrow import parquet as pq -import nbconvert as nbc -import nbformat as nbf import subprocess from dsi.backends.filesystem import Filesystem @@ -46,6 +44,9 @@ def get_cmd_output(cmd: list) -> str: return proc.stdout.strip().decode("utf-8") def inspect_artifacts(self, collection, interactive=False): + import nbconvert as nbc + import nbformat as nbf + """Populate a Jupyter notebook with tools required to look at Parquet data.""" nb = nbf.v4.new_notebook() text = """\ diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index b6f9c369..b85eb680 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -2,9 +2,9 @@ import sqlite3 import json import re -# import yaml -# import subprocess -# import os +import subprocess +from datetime import datetime +import textwrap from collections import OrderedDict from dsi.backends.filesystem import Filesystem @@ -46,10 +46,11 @@ class Sqlite(Filesystem): con = None cur = None - def __init__(self, filename): + def __init__(self, filename, run_table = True): self.filename = filename self.con = sqlite3.connect(filename) self.cur = self.con.cursor() + self.run_flag = run_table def check_type(self, text): """ @@ -76,22 +77,23 @@ def put_artifact_type(self, types, foreign_query = None, isVerbose=False): `return`: none """ - # key_names = types.properties.keys() - key_names = types.unit_keys - if "_units" in types.name: - key_names = [item + " UNIQUE" for item in types.unit_keys] - - col_names = ', '.join(key_names) - str_query = "CREATE TABLE IF NOT EXISTS {} ({}".format(str(types.name), 
col_names) + col_names = ', '.join(types.unit_keys) + if self.run_flag: + str_query = "CREATE TABLE IF NOT EXISTS {} (run_id, {}".format(str(types.name), col_names) + else: + str_query = "CREATE TABLE IF NOT EXISTS {} ({}".format(str(types.name), col_names) if foreign_query != None: str_query += foreign_query - str_query += ");" + + if self.run_flag: + str_query += ", FOREIGN KEY (run_id) REFERENCES runTable (run_id));" + else: + str_query += ");" if isVerbose: print(str_query) self.cur.execute(str_query) - self.con.commit() self.types = types @@ -119,25 +121,38 @@ def put_artifacts(self, collection, isVerbose=False): `collection`: A Python Collection of an Artifact derived class that has multiple regular structures of a defined schema, filled with rows to insert. - `return`: none """ # Core compatibility name assignment artifacts = collection - - + + insertError = False + errorString = None + if self.run_flag: + runTable_create = "CREATE TABLE IF NOT EXISTS runTable (run_id INTEGER PRIMARY KEY AUTOINCREMENT, run_timestamp TEXT UNIQUE);" + self.cur.execute(runTable_create) + self.con.commit() + + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + runTable_insert = f"INSERT INTO runTable (run_timestamp) VALUES ('{timestamp}');" + if insertError == False: + try: + self.cur.execute(runTable_insert) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + self.con.rollback() + else: + self.con.rollback() for tableName, tableData in artifacts.items(): - if tableName == "dsi_relations": + if tableName == "dsi_relations" or tableName == "dsi_units": continue types = DataType() types.properties = {} types.unit_keys = [] - - # Check if this has been defined from helper function - '''if self.types != None: - types.name = self.types.name''' types.name = tableName foreign_query = "" @@ -163,27 +178,56 @@ def put_artifacts(self, collection, isVerbose=False): col_names = ', '.join(types.properties.keys()) placeholders = ', '.join('?' 
* len(types.properties)) - if "_units" in tableName: - str_query = "INSERT OR IGNORE INTO {} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) + str_query = "INSERT INTO " + if self.run_flag: + run_id = self.cur.execute("SELECT run_id FROM runTable ORDER BY run_id DESC LIMIT 1;").fetchone()[0] + str_query += "{} (run_id, {}) VALUES ({}, {});".format(str(types.name), col_names, run_id, placeholders) else: - str_query = "INSERT INTO {} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) - - # col_list helps access the specific keys of the dictionary in the for loop - col_list = col_names.split(', ') - - # loop through the contents of each column and insert into table as a row - for ind1 in range(len(types.properties[col_list[0]])): - vals = [] - for ind2 in range(len(types.properties.keys())): - vals.append(str(types.properties[col_list[ind2]][ind1])) - tup_vals = tuple(vals) - self.cur.execute(str_query,tup_vals) + str_query += "{} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) + + rows = zip(*types.properties.values()) + if insertError == False: + try: + self.cur.executemany(str_query,rows) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + self.con.rollback() + else: + self.con.rollback() if isVerbose: - print(str_query) + print(str_query) + self.types = types #This will only copy the last table from artifacts (collections input) + + if "dsi_units" in artifacts.keys(): + create_query = "CREATE TABLE IF NOT EXISTS dsi_units (table_name TEXT, column_and_unit TEXT UNIQUE)" + self.cur.execute(create_query) + for tableName, tableData in artifacts["dsi_units"].items(): + if len(tableData) > 0: + for col_unit_pair in tableData: + str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' + if insertError == False: + try: + self.cur.execute(str_query) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + self.con.rollback() + else: + self.con.rollback() - self.con.commit() - self.types = types #This will only copy the last table from artifacts (collections input) + try: + assert insertError == False + self.con.commit() + except Exception as e: + self.con.rollback() + if type(e) is AssertionError: + return errorString + else: + return e def put_artifacts_only(self, artifacts, isVerbose=False): """ @@ -386,12 +430,84 @@ def get_artifacts(self, query, isVerbose=False): return data def inspect_artifacts(self, collection, interactive=False): - #nb = nbf.v4.new_notebook() + import nbconvert as nbc + import nbformat as nbf + + dsi_relations = dict(collection["dsi_relations"]) + dsi_units = dict(collection["dsi_units"]) + + nb = nbf.v4.new_notebook() text = """\ - # This notebook was auto-generated by a DSI Backend for SQLite. - # Execute the Jupyter notebook cells below and interact with "collection" - # to explore your data. + This notebook was auto-generated by a DSI Backend for SQLite. + Due to the possibility of several tables stored in the DSI abstraction (OrderedDict), the data is stored as several dataframes in a list + Execute the Jupyter notebook cells below and interact with table_list to explore your data. 
+ """ + code1 = """\ + import pandas as pd + import sqlite3 + """ + code2 = f"""\ + dbPath = '{self.filename}' + conn = sqlite3.connect(dbPath) + tables = pd.read_sql_query('SELECT name FROM sqlite_master WHERE type="table";', conn) + dsi_units = {dsi_units} + dsi_relations = {dsi_relations} """ + code3 = """\ + table_list = [] + for table_name in tables['name']: + if table_name not in ['dsi_relations', 'dsi_units', 'sqlite_sequence']: + query = 'SELECT * FROM ' + table_name + df = pd.read_sql_query(query, conn) + df.attrs['name'] = table_name + if table_name in dsi_units: + df.attrs['units'] = dsi_units[table_name] + table_list.append(df) + + df = pd.DataFrame(dsi_relations) + df.attrs['name'] = 'dsi_relations' + table_list.append(df) + """ + code4 = """\ + for table_df in table_list: + print(table_df.attrs) + print(table_df) + # table_df.info() + # table_df.describe() + """ + + nb['cells'] = [nbf.v4.new_markdown_cell(text), + nbf.v4.new_code_cell(textwrap.dedent(code1)), + nbf.v4.new_code_cell(textwrap.dedent(code2)), + nbf.v4.new_code_cell(textwrap.dedent(code3)), + nbf.v4.new_code_cell(textwrap.dedent(code4))] + + fname = 'dsi_sqlite_backend_output.ipynb' + print('Writing Jupyter notebook...') + with open(fname, 'w') as fh: + nbf.write(nb, fh) + + # open the jupyter notebook for static page generation + with open(fname, 'r', encoding='utf-8') as fh: + nb_content = nbf.read(fh, as_version=4) + run_nb = nbc.preprocessors.ExecutePreprocessor(timeout=-1) # No timeout + run_nb.preprocess(nb_content, {'metadata':{'path':'.'}}) + + if interactive: + print('Opening Jupyter notebook...') + + proc = subprocess.run(['jupyter-lab ./dsi_sqlite_backend_output.ipynb'], capture_output=True, shell=True) + if proc.stderr != b"": + raise Exception(proc.stderr) + return proc.stdout.strip().decode("utf-8") + else: + # Init HTML exporter + html_exporter = nbc.HTMLExporter() + html_content,_ = html_exporter.from_notebook_node(nb_content) + # Save HTML file + html_filename = 'dsi_sqlite_backend_output.html' + with open(html_filename, 'w', encoding='utf-8') as fh: + fh.write(html_content) # Closes connection to server def close(self): @@ -507,14 +623,16 @@ def export_csv(self, rquery, tname, fname, isVerbose=False): # Sqlite reader class def read_to_artifact(self): artifact = OrderedDict() + artifact["dsi_relations"] = OrderedDict([("primary_key",[]), ("foreign_key", [])]) tableList = self.cur.execute("SELECT name FROM sqlite_master WHERE type ='table';").fetchall() - - artifact["dsi_relations"] = OrderedDict([("primary_key",[]), ("foreign_key", [])]) pkList = [] - for item in tableList: tableName = item[0] + if tableName == "dsi_units": + artifact["dsi_units"] = self.read_units_helper() + continue + tableInfo = self.cur.execute(f"PRAGMA table_info({tableName});").fetchall() colDict = OrderedDict() for colInfo in tableInfo: @@ -540,7 +658,17 @@ def read_to_artifact(self): artifact["dsi_relations"]["primary_key"].append(pk_tuple) artifact["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) return artifact - + + def read_units_helper(self): + unitsDict = OrderedDict() + unitsTable = self.cur.execute("SELECT * FROM dsi_units;").fetchall() + for row in unitsTable: + tableName = row[0] + if tableName not in unitsDict.keys(): + unitsDict[tableName] = [] + unitsDict[tableName].append(eval(row[1])) + return unitsDict + '''UNUSED QUERY FUNCTIONS''' # Query file name @@ -618,165 +746,4 @@ def read_to_artifact(self): # if isVerbose: # print(resout) - # return resout - - '''YAML AND TOML READERS''' - - # def 
yamlDataToList(self, filenames): - # """ - # Function that reads a YAML file or files into a list - # """ - - # yamlData = [] - # for filename in filenames: - # with open(filename, 'r') as yaml_file: - # editedString = yaml_file.read() - # editedString = re.sub('specification', r'columns:\n specification', editedString) - # editedString = re.sub(r'(!.+)\n', r"'\1'\n", editedString) - # yml_data = yaml.safe_load_all(editedString) - - # for table in yml_data: - # yamlData.append(table) - - # return yamlData - - # def yamlToSqlite(self, filenames, db_name, deleteSql=True): - # """ - # Function that ingests a YAML file into a sqlite database based on the given database name - - # `filenames`: name of YAML file or a list of YAML files to be ingested - - # `db_name`: name of database that YAML file should be added to. Database will be created if it does not exist in local directory. - - # `deleteSql`: flag to delete temp SQL file that creates the database. Default is True, but change to False for testing or comparing outputs - # """ - - # sql_statements = [] - # if isinstance(filenames, str): - # filenames = [filenames] - - # with open(db_name+".sql", "w") as sql_file: - # yml_list = self.yamlDataToList(filenames) - # for table in yml_list: - # tableName = table["segment"] - - # data_types = {float: "FLOAT", str: "VARCHAR", int: "INT"} - # if not os.path.isfile(db_name+".db") or os.path.getsize(db_name+".db") == 0: - # createStmt = f"CREATE TABLE IF NOT EXISTS {tableName} ( " - # createUnitStmt = f"CREATE TABLE IF NOT EXISTS {tableName}_units ( " - # insertUnitStmt = f"INSERT INTO {tableName}_units VALUES( " - - # for key, val in table['columns'].items(): - # createUnitStmt+= f"{key} VARCHAR, " - # if data_types[type(val)] == "VARCHAR" and self.check_type(val[:val.find(" ")]) in [" INT", " FLOAT"]: - # createStmt += f"{key}{self.check_type(val[:val.find(' ')])}, " - # insertUnitStmt+= f"'{val[val.find(' ')+1:]}', " - # else: - # createStmt += f"{key} {data_types[type(val)]}, " - # insertUnitStmt+= "NULL, " - - # if createStmt not in sql_statements: - # sql_statements.append(createStmt) - # sql_file.write(createStmt[:-2] + ");\n\n") - # if createUnitStmt not in sql_statements: - # sql_statements.append(createUnitStmt) - # sql_file.write(createUnitStmt[:-2] + ");\n\n") - # if insertUnitStmt not in sql_statements: - # sql_statements.append(insertUnitStmt) - # sql_file.write(insertUnitStmt[:-2] + ");\n\n") - - # insertStmt = f"INSERT INTO {tableName} VALUES( " - # for val in table['columns'].values(): - # if data_types[type(val)] == "VARCHAR" and self.check_type(val[:val.find(" ")]) in [" INT", " FLOAT"]: - # insertStmt+= f"{val[:val.find(' ')]}, " - # elif data_types[type(val)] == "VARCHAR": - # insertStmt+= f"'{val}', " - # else: - # insertStmt+= f"{val}, " - - # if insertStmt not in sql_statements: - # sql_statements.append(insertStmt) - # sql_file.write(insertStmt[:-2] + ");\n\n") - - # subprocess.run(["sqlite3", db_name+".db"], stdin= open(db_name+".sql", "r")) - - # if deleteSql == True: - # os.remove(db_name+".sql") - - # def tomlDataToList(self, filenames): - # """ - # Function that reads a TOML file or files into a list - # """ - - # toml_data = [] - # for filename in filenames: - # with open(filename, 'r') as toml_file: - # data = toml.load(toml_file) - # for tableName, tableData in data.items(): - # toml_data.append([tableName, tableData]) - - # return toml_data - - # def tomlToSqlite(self, filenames, db_name, deleteSql=True): - # """ - # Function that ingests a TOML file into a sqlite 
database based on the given database name - - # `filenames`: name of TOML file or a list of TOML files to be ingested - - # `db_name`: name of database that TOML file should be added to. Database will be created if it does not exist in local directory. - - # `deleteSql`: flag to delete temp SQL file that creates the database. Default is True, but change to False for testing or comparing outputs - # """ - - # sql_statements = [] - # if isinstance(filenames, str): - # filenames = [filenames] - - # with open(db_name+".sql", "w") as sql_file: - # data = self.tomlDataToList(filenames) - - # for item in data: - # tableName, tableData = item - # data_types = {float: "FLOAT", str: "VARCHAR", int: "INT"} - - # if not os.path.isfile(db_name+".db") or os.path.getsize(db_name+".db") == 0: - # createStmt = f"CREATE TABLE IF NOT EXISTS {tableName} ( " - # createUnitStmt = f"CREATE TABLE IF NOT EXISTS {tableName}_units ( " - # insertUnitStmt = f"INSERT INTO {tableName}_units VALUES( " - - # for key, val in tableData.items(): - # createUnitStmt+= f"{key} VARCHAR, " - # if type(val) == list and type(val[0]) == str and self.check_type(val[0]) in [" INT", " FLOAT"]: - # createStmt += f"{key}{self.check_type(val[0])}, " - # insertUnitStmt+= f"'{val[1]}', " - # else: - # createStmt += f"{key} {data_types[type(val)]}, " - # insertUnitStmt+= "NULL, " - - # if createStmt not in sql_statements: - # sql_statements.append(createStmt) - # sql_file.write(createStmt[:-2] + ");\n\n") - # if createUnitStmt not in sql_statements: - # sql_statements.append(createUnitStmt) - # sql_file.write(createUnitStmt[:-2] + ");\n\n") - # if insertUnitStmt not in sql_statements: - # sql_statements.append(insertUnitStmt) - # sql_file.write(insertUnitStmt[:-2] + ");\n\n") - - # insertStmt = f"INSERT INTO {tableName} VALUES( " - # for val in tableData.values(): - # if type(val) == list and type(val[0]) == str and self.check_type(val[0]) in [" INT", " FLOAT"]: - # insertStmt+= f"{val[0]}, " - # elif type(val) == str: - # insertStmt+= f"'{val}', " - # else: - # insertStmt+= f"{val}, " - - # if insertStmt not in sql_statements: - # sql_statements.append(insertStmt) - # sql_file.write(insertStmt[:-2] + ");\n\n") - - # subprocess.run(["sqlite3", db_name+".db"], stdin= open(db_name+".sql", "r")) - - # if deleteSql == True: - # os.remove(db_name+".sql") + # return resout \ No newline at end of file diff --git a/dsi/core.py b/dsi/core.py index 2a1424b5..6718f0b1 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -24,14 +24,14 @@ class Terminal(): BACKEND_IMPLEMENTATIONS = ['gufi', 'sqlite', 'parquet'] PLUGIN_PREFIX = ['dsi.plugins'] PLUGIN_IMPLEMENTATIONS = ['env', 'file_reader', 'file_writer'] - VALID_PLUGINS = ['Hostname', 'SystemKernel', 'GitInfo', 'Bueno', 'Csv', 'ER_Diagram', 'YAML', 'TOML', "Table_Plot", "Schema"] + VALID_PLUGINS = ['Hostname', 'SystemKernel', 'GitInfo', 'Bueno', 'Csv', 'ER_Diagram', 'YAML1', 'TOML1', "Table_Plot", "Schema"] VALID_BACKENDS = ['Gufi', 'Sqlite', 'Parquet'] VALID_MODULES = VALID_PLUGINS + VALID_BACKENDS VALID_MODULE_FUNCTIONS = {'plugin': ['writer', 'reader'], 'backend': ['back-read', 'back-write']} VALID_ARTIFACT_INTERACTION_TYPES = ['get', 'set', 'put', 'inspect', 'read'] - def __init__(self, debug_flag = False): + def __init__(self, debug_flag = False, backup_db_flag = False): # Helper function to get parent module names. 
def static_munge(prefix, implementations): return (['.'.join(i) for i in product(prefix, implementations)]) @@ -57,12 +57,14 @@ def static_munge(prefix, implementations): self.active_metadata = OrderedDict() self.transload_lock = False + self.backup_db_flag = backup_db_flag + self.logger = logging.getLogger(self.__class__.__name__) if debug_flag: logging.basicConfig( filename='logger.txt', # Name of the log file - filemode='a', # Append mode ('w' for overwrite) + filemode='w', # Overwrite mode ('a' for append) format='%(asctime)s - %(levelname)s - %(message)s', # Log message format level=logging.INFO # Minimum log level to capture ) @@ -194,7 +196,6 @@ def transload(self, **kwargs): """ selected_function_modules = dict( (k, self.active_modules[k]) for k in ('reader', 'writer')) - # Note this transload supports plugin.env Environment types now. for module_type, objs in selected_function_modules.items(): for obj in objs: self.logger.info(f"-------------------------------------") @@ -207,9 +208,11 @@ def transload(self, **kwargs): self.active_metadata[table_name] = table_metadata else: for colName, colData in table_metadata.items(): - if colName in self.active_metadata[table_name].keys(): + if colName in self.active_metadata[table_name].keys() and table_name != "dsi_units": self.active_metadata[table_name][colName] += colData - else: + elif colName not in self.active_metadata[table_name].keys() and table_name == "dsi_units": + self.active_metadata[table_name][colName] = colData + elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": raise ValueError(f"Mismatched column input for table {table_name}") end = datetime.now() self.logger.info(f"Runtime: {end-start}") @@ -218,23 +221,6 @@ def transload(self, **kwargs): obj.get_rows(self.active_metadata, **kwargs) end = datetime.now() self.logger.info(f"Runtime: {end-start}") - # Plugins may add one or more rows (vector vs matrix data). - # You may have two or more plugins with different numbers of rows. - # Consequently, transload operations may have unstructured shape for - # some plugin configurations. We must force structure to create a valid - # middleware data structure. - # To resolve, we pad the shorter columns to match the max length column. - #COMMENTED OUT TILL LATER - ''' - max_len = max([len(col) for col in self.active_metadata.values()]) - for colname, coldata in self.active_metadata.items(): - if len(coldata) != max_len: - self.active_metadata[colname].extend( # add None's until reaching max_len - [None] * (max_len - len(coldata))) - - assert all([len(col) == max_len for col in self.active_metadata.values( - )]), "All columns must have the same number of rows" - ''' self.transload_lock = True @@ -247,22 +233,25 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): the provided ``interaction_type``. """ if interaction_type not in self.VALID_ARTIFACT_INTERACTION_TYPES: - print( - 'Hint: Did you declare your artifact interaction type in the Terminal Global vars?') + print('Hint: Did you declare your artifact interaction type in the Terminal Global vars?') raise NotImplementedError operation_success = False # Perform artifact movement first, because inspect implementation may rely on # self.active_metadata or some stored artifact. 
- selected_function_modules = dict( - (k, self.active_modules[k]) for k in ('back-read', 'back-write')) - for module_type, objs in selected_function_modules.items(): + selected_active_backends = dict((k, self.active_modules[k]) for k in (['back-write', 'back-read'])) + for module_type, objs in selected_active_backends.items(): for obj in objs: self.logger.info(f"-------------------------------------") self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") start = datetime.now() - if interaction_type == 'put' or interaction_type == 'set': - obj.put_artifacts( + if (interaction_type == 'put' or interaction_type == 'set') and module_type == 'back-write': + if self.backup_db_flag == True and os.path.getsize(obj.filename) > 100: + backup_file = obj.filename[:obj.filename.rfind('.')] + "_backup" + obj.filename[obj.filename.rfind('.'):] + shutil.copyfile(obj.filename, backup_file) + errorMessage = obj.put_artifacts( collection=self.active_metadata, **kwargs) + if errorMessage is not None: + print(f"No data was inserted into {obj.filename} due to the error: {errorMessage}") operation_success = True elif interaction_type == 'get': self.logger.info(f"Query to get data: {query}") @@ -274,12 +263,15 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): self.active_metadata = obj.get_artifacts(**kwargs) operation_success = True elif interaction_type == 'inspect': - obj.put_artifacts( - collection=self.active_metadata, **kwargs) - self.active_metadata = obj.inspect_artifacts( - collection=self.active_metadata, **kwargs) + if module_type == 'back-write': + errorMessage = obj.put_artifacts( + collection=self.active_metadata, **kwargs) + if errorMessage is not None: + print("Error in ingesting data to db in inspect artifact. Generating Jupyter notebook with previous instance of db") + obj.inspect_artifacts( + collection=self.active_metadata, **kwargs) operation_success = True - elif interaction_type == "read": + elif interaction_type == "read" and module_type == 'back-read': self.active_metadata = obj.read_to_artifact() operation_success = True end = datetime.now() @@ -289,9 +281,8 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): return self.active_metadata return else: - print( - 'Hint: Did you implement a case for your artifact interaction in the \ - artifact_handler loop?') + print('Is your artifact interaction spelled correct and is it implemented in your backend?') + print('Remember that backend writers cannot read a db and backend readers cannot write to a db') raise NotImplementedError class Sync(): @@ -456,4 +447,4 @@ def get(project_name = "Project"): DSI database ''' True - \ No newline at end of file + diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index c1606f33..ea8e0ce5 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -42,59 +42,84 @@ class Csv(FileReader): # Default value is False. 
strict_mode = False - def __init__(self, filenames, **kwargs): + def __init__(self, filenames, db_name = None, **kwargs): super().__init__(filenames, **kwargs) - self.csv_data = {} + self.csv_data = OrderedDict() + if isinstance(filenames, str): + self.filenames = [filenames] + else: + self.filenames = filenames + self.db_name = db_name - def pack_header(self) -> None: - """ Set schema based on the CSV columns """ + # def pack_header(self) -> None: + # """ Set schema based on the CSV columns """ - column_names = list(self.file_info.keys()) + list(self.csv_data.keys()) - self.set_schema(column_names) + # column_names = list(self.file_info.keys()) + list(self.csv_data.keys()) + # self.set_schema(column_names) def add_rows(self) -> None: """ Adds a list containing one or more rows of the CSV along with file_info to output. """ - if not self.schema_is_set(): - # use Pandas to append all CSVs together as a - # dataframe, then convert to dict - if self.strict_mode: - total_df = DataFrame() - dfs = [] - for filename in self.filenames: - # Initial case. Empty df collection. - if total_df.empty: - total_df = read_csv(filename) - dfs.append(total_df) - else: # One or more dfs in collection - temp_df = read_csv(filename) - # raise exception if schemas do not match - if any([set(temp_df.columns) != set(df.columns) for df in dfs]): - print('Error: Strict schema mode is on. Schemas do not match.') - raise TypeError - dfs.append(temp_df) - total_df = concat([total_df, temp_df]) - - # Reminder: Schema is not set in this block. - else: # self.strict_mode == False - total_df = DataFrame() - for filename in self.filenames: - temp_df = read_csv(filename) - total_df = concat([total_df, temp_df]) - - # Columns are present in the middleware already (schema_is_set==True). - # TODO: Can this go under the else block at line #79? - self.csv_data = total_df.to_dict('list') - for col, coldata in self.csv_data.items(): # replace NaNs with None - self.csv_data[col] = [None if type(item) == float and isnan(item) else item - for item in coldata] - self.pack_header() - - total_length = len(self.csv_data[list(self.csv_data.keys())[0]]) - for row_idx in range(total_length): - row = [self.csv_data[k][row_idx] for k in self.csv_data.keys()] - row_w_fileinfo = list(self.file_info.values()) + row - self.add_to_output(row_w_fileinfo) + total_df = DataFrame() + for filename in self.filenames: + temp_df = read_csv(filename) + try: + total_df = concat([total_df, temp_df]) + except: + raise ValueError(f"Error in adding {filename} to the existing csv data. Please recheck column names and data structure") + + #convert total_df to ordered dict + table_data = OrderedDict(total_df.to_dict(orient='list')) + for col, coldata in table_data.items(): # replace NaNs with None + table_data[col] = [None if type(item) == float and isnan(item) else item for item in coldata] + + if self.db_name is not None: + self.csv_data[self.db_name] = table_data + else: + self.csv_data = table_data + + self.set_schema_2(self.csv_data) + + # if not self.schema_is_set(): + # # use Pandas to append all CSVs together as a + # # dataframe, then convert to dict + # if self.strict_mode: + # total_df = DataFrame() + # dfs = [] + # for filename in self.filenames: + # # Initial case. Empty df collection. 
+ # if total_df.empty: + # total_df = read_csv(filename) + # dfs.append(total_df) + # else: # One or more dfs in collection + # temp_df = read_csv(filename) + # # raise exception if schemas do not match + # if any([set(temp_df.columns) != set(df.columns) for df in dfs]): + # print('Error: Strict schema mode is on. Schemas do not match.') + # raise TypeError + # dfs.append(temp_df) + # total_df = concat([total_df, temp_df]) + + # # Reminder: Schema is not set in this block. + # else: # self.strict_mode == False + # total_df = DataFrame() + # for filename in self.filenames: + # temp_df = read_csv(filename) + # total_df = concat([total_df, temp_df]) + + # # Columns are present in the middleware already (schema_is_set==True). + # # TODO: Can this go under the else block at line #79? + # self.csv_data = total_df.to_dict('list') + # for col, coldata in self.csv_data.items(): # replace NaNs with None + # self.csv_data[col] = [None if type(item) == float and isnan(item) else item + # for item in coldata] + # self.pack_header() + + # total_length = len(self.csv_data[list(self.csv_data.keys())[0]]) + # for row_idx in range(total_length): + # row = [self.csv_data[k][row_idx] for k in self.csv_data.keys()] + # row_w_fileinfo = list(self.file_info.values()) + row + # self.add_to_output(row_w_fileinfo) class Bueno(FileReader): @@ -199,33 +224,56 @@ def pack_header(self) -> None: table_info.append((table_name, list(self.schema_data[table_name].keys()))) self.set_schema(table_info) - def add_rows(self) -> None: - if not self.schema_is_set(): - self.schema_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) - self.pack_header() - + def add_rows(self) -> None: + self.schema_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) with open(self.schema_file, 'r') as fh: schema_content = json.load(fh) for tableName, tableData in schema_content.items(): if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + tableName - if tableData["primary_key"] != "NULL": - self.schema_data["dsi_relations"]["primary_key"].append((tableName, tableData["primary_key"])) - self.schema_data["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) - self.add_to_output([(tableName, tableData["primary_key"]), ("NULL", "NULL")], "dsi_relations") - + + pkList = [] for colName, colData in tableData["foreign_key"].items(): if self.target_table_prefix is not None: colData[0] = self.target_table_prefix + "__" + colData[0] self.schema_data["dsi_relations"]["primary_key"].append((colData[0], colData[1])) self.schema_data["dsi_relations"]["foreign_key"].append((tableName, colName)) - self.add_to_output([(colData[0], colData[1]), (tableName, colName)], "dsi_relations") -class YAML(FileReader): + if "primary_key" in tableData.keys(): + pkList.append((tableName, tableData["primary_key"])) + + for pk in pkList: + if pk not in self.schema_data["dsi_relations"]["primary_key"]: + self.schema_data["dsi_relations"]["primary_key"].append(pk) + self.schema_data["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) + self.set_schema_2(self.schema_data) + + # if not self.schema_is_set(): + # self.schema_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + # self.pack_header() + + # with open(self.schema_file, 'r') as fh: + # schema_content = json.load(fh) + + # for tableName, tableData in schema_content.items(): + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + tableName + # if 
tableData["primary_key"] != "NULL": + # self.schema_data["dsi_relations"]["primary_key"].append((tableName, tableData["primary_key"])) + # self.schema_data["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) + # self.add_to_output([(tableName, tableData["primary_key"]), ("NULL", "NULL")], "dsi_relations") + + # for colName, colData in tableData["foreign_key"].items(): + # if self.target_table_prefix is not None: + # colData[0] = self.target_table_prefix + "__" + colData[0] + # self.schema_data["dsi_relations"]["primary_key"].append((colData[0], colData[1])) + # self.schema_data["dsi_relations"]["foreign_key"].append((tableName, colName)) + # self.add_to_output([(colData[0], colData[1]), (tableName, colName)], "dsi_relations") + +class YAML1(FileReader): ''' Plugin to read in an individual or a set of YAML files - Table names are the keys for the main ordered dictionary and column names are the keys for each table's nested ordered dictionary ''' def __init__(self, filenames, target_table_prefix = None, yamlSpace = ' ', **kwargs): @@ -243,12 +291,12 @@ def __init__(self, filenames, target_table_prefix = None, yamlSpace = ' ', **kw self.yaml_data = OrderedDict() self.target_table_prefix = target_table_prefix - def pack_header(self) -> None: - """Set schema with YAML data.""" - table_info = [] - for table_name in list(self.yaml_data.keys()): - table_info.append((table_name, list(self.yaml_data[table_name].keys()))) - self.set_schema(table_info) + # def pack_header(self) -> None: + # """Set schema with YAML data.""" + # table_info = [] + # for table_name in list(self.yaml_data.keys()): + # table_info.append((table_name, list(self.yaml_data[table_name].keys()))) + # self.set_schema(table_info) def check_type(self, text): """ @@ -276,41 +324,67 @@ def add_rows(self) -> None: editedString = re.sub('specification', f'columns:\n{self.yamlSpace}specification', editedString) editedString = re.sub(r'(!.+)\n', r"'\1'\n", editedString) yaml_load_data = list(yaml.safe_load_all(editedString)) - - if not self.schema_is_set(): - for table in yaml_load_data: - tableName = table["segment"] - if self.target_table_prefix is not None: - tableName = self.target_table_prefix + "__" + table["segment"] - self.yaml_data[tableName] = OrderedDict((key, []) for key in table["columns"].keys()) - self.yaml_data[tableName + "_units"] = OrderedDict((key, []) for key in table["columns"].keys()) - self.yaml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) - self.pack_header() + if "dsi_units" not in self.yaml_data.keys(): + self.yaml_data["dsi_units"] = OrderedDict() for table in yaml_load_data: - row = [] - unit_row = [] tableName = table["segment"] if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + table["segment"] + if tableName not in self.yaml_data.keys(): + self.yaml_data[tableName] = OrderedDict() + unitsList = [] for col_name, data in table["columns"].items(): unit_data = "NULL" if isinstance(data, str) and not isinstance(self.check_type(data[:data.find(" ")]), str): unit_data = data[data.find(' ')+1:] data = self.check_type(data[:data.find(" ")]) + if col_name not in self.yaml_data[tableName].keys(): + self.yaml_data[tableName][col_name] = [] self.yaml_data[tableName][col_name].append(data) - if len(self.yaml_data[tableName + "_units"][col_name]) < 1: - unit_row.append(unit_data) - self.yaml_data[tableName + "_units"][col_name].append(unit_data) - row.append(data) - self.add_to_output(row, tableName) - if 
len(next(iter(self.output_collector[tableName + "_units"].values()))) < 1: - self.add_to_output(unit_row, tableName + "_units") - -class TOML(FileReader): + if unit_data != "NULL" and (col_name, unit_data) not in unitsList: + unitsList.append((col_name, unit_data)) + if len(unitsList) > 0 and tableName not in self.yaml_data["dsi_units"].keys(): + self.yaml_data["dsi_units"][tableName] = unitsList + + self.set_schema_2(self.yaml_data) + + # if not self.schema_is_set(): + # self.yaml_data["dsi_units"] = OrderedDict() + # for table in yaml_load_data: + # tableName = table["segment"] + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + table["segment"] + # self.yaml_data[tableName] = OrderedDict((key, []) for key in table["columns"].keys()) + # self.yaml_data["dsi_units"][tableName] = [] + # # self.yaml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + # self.pack_header() + + # unit_row = [] + # for table in yaml_load_data: + # row = [] + # table_unit_row = [] + # tableName = table["segment"] + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + table["segment"] + # for col_name, data in table["columns"].items(): + # unit_data = "NULL" + # if isinstance(data, str) and not isinstance(self.check_type(data[:data.find(" ")]), str): + # unit_data = data[data.find(' ')+1:] + # data = self.check_type(data[:data.find(" ")]) + # self.yaml_data[tableName][col_name].append(data) + # if (col_name, unit_data) not in self.yaml_data["dsi_units"][tableName]: + # table_unit_row.append((col_name, unit_data)) + # self.yaml_data["dsi_units"][tableName].append((col_name, unit_data)) + # row.append(data) + # self.add_to_output(row, tableName) + # unit_row.append(table_unit_row) + # if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: + # self.add_to_output(unit_row, "dsi_units") + +class TOML1(FileReader): ''' Plugin to read in an individual or a set of TOML files - Table names are the keys for the main ordered dictionary and column names are the keys for each table's nested ordered dictionary ''' def __init__(self, filenames, target_table_prefix = None, **kwargs): @@ -326,12 +400,12 @@ def __init__(self, filenames, target_table_prefix = None, **kwargs): self.toml_data = OrderedDict() self.target_table_prefix = target_table_prefix - def pack_header(self) -> None: - """Set schema with TOML data.""" - table_info = [] - for table_name in list(self.toml_data.keys()): - table_info.append((table_name, list(self.toml_data[table_name].keys()))) - self.set_schema(table_info) + # def pack_header(self) -> None: + # """Set schema with TOML data.""" + # table_info = [] + # for table_name in list(self.toml_data.keys()): + # table_info.append((table_name, list(self.toml_data[table_name].keys()))) + # self.set_schema(table_info) def add_rows(self) -> None: """ @@ -350,20 +424,14 @@ def add_rows(self) -> None: with open(filename, 'rb') as toml_file: toml_load_data = tomllib.load(toml_file) - if not self.schema_is_set(): - for tableName, tableData in toml_load_data.items(): - if self.target_table_prefix is not None: - tableName = self.target_table_prefix + "__" + tableName - self.toml_data[tableName] = OrderedDict((key, []) for key in tableData.keys()) - self.toml_data[tableName + "_units"] = OrderedDict((key, []) for key in tableData.keys()) - self.toml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) - self.pack_header() - + if "dsi_units" not in self.toml_data.keys(): + 
self.toml_data["dsi_units"] = OrderedDict() for tableName, tableData in toml_load_data.items(): - row = [] - unit_row = [] if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + tableName + if tableName not in self.toml_data.keys(): + self.toml_data[tableName] = OrderedDict() + unitsList = [] for col_name, data in tableData.items(): unit_data = "NULL" if isinstance(data, dict): @@ -374,11 +442,77 @@ def add_rows(self) -> None: # data = ast.literal_eval(data) # unit_data = data["units"] # data = data["value"] + if col_name not in self.toml_data[tableName].keys(): + self.toml_data[tableName][col_name] = [] self.toml_data[tableName][col_name].append(data) - if len(self.toml_data[tableName + "_units"][col_name]) < 1: - unit_row.append(unit_data) - self.toml_data[tableName + "_units"][col_name].append(unit_data) - row.append(data) - self.add_to_output(row, tableName) - if len(next(iter(self.output_collector[tableName + "_units"].values()))) < 1: - self.add_to_output(unit_row, tableName + "_units") + if unit_data != "NULL" and (col_name, unit_data) not in unitsList: + unitsList.append((col_name, unit_data)) + if len(unitsList) > 0 and tableName not in self.toml_data["dsi_units"].keys(): + self.toml_data["dsi_units"][tableName] = unitsList + + self.set_schema_2(self.toml_data) + + # if not self.schema_is_set(): + # for tableName, tableData in toml_load_data.items(): + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + tableName + # self.toml_data[tableName] = OrderedDict((key, []) for key in tableData.keys()) + # self.toml_data["dsi_units"] = OrderedDict([(tableName,[])]) + # # self.toml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + # self.pack_header() + + # unit_row = [] + # for tableName, tableData in toml_load_data.items(): + # row = [] + # table_unit_row = [] + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + tableName + # for col_name, data in tableData.items(): + # unit_data = "NULL" + # if isinstance(data, dict): + # unit_data = data["units"] + # data = data["value"] + # # IF statement for manual data parsing for python 3.10 and below + # # if isinstance(data, str) and data[0] == "{" and data[-1] == "}": + # # data = ast.literal_eval(data) + # # unit_data = data["units"] + # # data = data["value"] + # self.toml_data[tableName][col_name].append(data) + # if (col_name, unit_data) not in self.toml_data["dsi_units"][tableName]: + # table_unit_row.append((col_name, unit_data)) + # self.toml_data["dsi_units"][tableName].append((col_name, unit_data)) + # row.append(data) + # self.add_to_output(row, tableName) + # unit_row.append(table_unit_row) + # if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: + # self.add_to_output(unit_row, "dsi_units") + +class TextFile(FileReader): + ''' + Plugin to read in an individual or a set of text files + Table names are the keys for the main ordered dictionary and column names are the keys for each table's nested ordered dictionary + ''' + def __init__(self, filenames, target_table_prefix = None, **kwargs): + ''' + `filenames`: one text file or a list of text files to be ingested + `target_table_prefix`: prefix to be added to every table created to differentiate between other text file sources + ''' + super().__init__(filenames, **kwargs) + if isinstance(filenames, str): + self.text_files = [filenames] + else: + self.text_files = filenames + self.text_file_data = OrderedDict() + 
self.target_table_prefix = target_table_prefix + + def add_rows(self) -> None: + """ + Parses text file data and creates an ordered dict whose keys are table names and values are an ordered dict for each table. + """ + for filename in self.text_files: + df = read_csv(filename) + if self.target_table_prefix is not None: + self.text_file_data[f"{self.target_table_prefix}__text_file"] = OrderedDict(df.to_dict(orient='list')) + else: + self.text_file_data["text_file"] = OrderedDict(df.to_dict(orient='list')) + self.set_schema_2(self.text_file_data) \ No newline at end of file diff --git a/dsi/plugins/file_writer.py b/dsi/plugins/file_writer.py index 1804e3cf..70644b83 100644 --- a/dsi/plugins/file_writer.py +++ b/dsi/plugins/file_writer.py @@ -1,11 +1,4 @@ -from collections import OrderedDict -from os.path import abspath -from hashlib import sha1 -import json, csv -from math import isnan -import sqlite3 -import subprocess -import os +import csv from matplotlib import pyplot as plt from dsi.plugins.metadata import StructuredMetadata @@ -63,55 +56,81 @@ def get_rows(self, collection) -> None: if self.target_table_prefix is not None and not any(self.target_table_prefix in element for element in collection.keys()): raise ValueError("Your input for target_table_prefix does not exist in the database. Please enter a valid prefix for table names.") - - dot_file = open(self.output_filename + ".dot", "w") + + manual_dot = False + try: from graphviz import Digraph + except ModuleNotFoundError: + manual_dot = True + import subprocess + import os + + if manual_dot: + dot_file = open(self.output_filename + ".dot", "w") + dot_file.write("digraph workflow_schema { ") + if self.target_table_prefix is not None: + dot_file.write(f'label="ER Diagram for {self.target_table_prefix} tables"; labelloc="t"; ') + dot_file.write("node [shape=plaintext]; dpi=300 rankdir=LR splines=true overlap=false ") + else: + dot = Digraph('workflow_schema', format = file_type[1:]) + if self.target_table_prefix is not None: + dot.attr(label = f'ER Diagram for {self.target_table_prefix} tables', labelloc='t') + dot.attr('node', shape='plaintext') + dot.attr(dpi='300', rankdir='LR', splines='true', overlap='false') num_tbl_cols = 1 - dot_file.write("digraph workflow_schema { ") - if self.target_table_prefix is not None: - dot_file.write(f'label="ER Diagram for {self.target_table_prefix} tables"; ') - dot_file.write('labelloc="t"; ') - dot_file.write("node [shape=plaintext]; ") - dot_file.write("rankdir=LR ") - dot_file.write("splines=true ") - dot_file.write("overlap=false ") - for tableName, tableData in collection.items(): - if tableName == "dsi_relations" or (self.target_table_prefix is not None and self.target_table_prefix not in tableName): + if tableName == "dsi_relations" or tableName == "sqlite_sequence": continue - - dot_file.write(f"{tableName} [label=<") - + elif self.target_table_prefix is not None and self.target_table_prefix not in tableName: + continue + + html_table = "" + if manual_dot: + html_table = f"{tableName} [label=" + html_table += f"<
{tableName}
" + + col_list = tableData.keys() + if tableName == "dsi_units": + col_list = ["table_name", "column_and_unit"] curr_row = 0 inner_brace = 0 - for col_name in tableData.keys(): + for col_name in col_list: if curr_row % num_tbl_cols == 0: inner_brace = 1 - dot_file.write("") - - dot_file.write(f"") + html_table += "" + html_table += f"" curr_row += 1 if curr_row % num_tbl_cols == 0: inner_brace = 0 - dot_file.write("") - + html_table += "" + if inner_brace: - dot_file.write("") - dot_file.write("
{tableName}
{col_name}
{col_name}
>]; ") + html_table += "" + html_table += ">" + + if manual_dot: dot_file.write(html_table+"]; ") + else: dot.node(tableName, label = html_table) for f_table, f_col in collection["dsi_relations"]["foreign_key"]: if self.target_table_prefix is not None and self.target_table_prefix not in f_table: continue if f_table != "NULL": foreignIndex = collection["dsi_relations"]["foreign_key"].index((f_table, f_col)) - dot_file.write(f"{f_table}:{f_col} -> {collection['dsi_relations']['primary_key'][foreignIndex][0]}: {collection['dsi_relations']['primary_key'][foreignIndex][1]}; ") - - dot_file.write("}") - dot_file.close() + fk_string = f"{f_table}:{f_col}" + pk_string = f"{collection['dsi_relations']['primary_key'][foreignIndex][0]}:{collection['dsi_relations']['primary_key'][foreignIndex][1]}" + + if manual_dot: dot_file.write(f"{fk_string} -> {pk_string}; ") + else: dot.edge(fk_string, pk_string) + + if manual_dot: + dot_file.write("}") + dot_file.close() + subprocess.run(["dot", "-T", file_type[1:], "-o", self.output_filename + file_type, self.output_filename + ".dot"]) + os.remove(self.output_filename + ".dot") + else: + dot.render(self.output_filename, cleanup=True) - subprocess.run(["dot", "-T", file_type[1:], "-o", self.output_filename + file_type, self.output_filename + ".dot"]) - os.remove(self.output_filename + ".dot") - + #REALLLLLY OLD CODE # def export_erd(self, dbname, fname): # """ # Function that outputs a ER diagram for the given database. @@ -304,11 +323,16 @@ def get_rows(self, collection) -> None: numeric_cols = [] col_len = None for colName, colData in collection[self.table_name].items(): + if colName == "run_id": + continue if col_len == None: col_len = len(colData) if isinstance(colData[0], str) == False: - if self.table_name + "_units" in collection.keys() and collection[self.table_name + "_units"][colName][0] != "NULL": - numeric_cols.append((colName + f" ({collection[self.table_name + '_units'][colName][0]})", colData)) + unit_tuple = "NULL" + if "dsi_units" in collection.keys() and self.table_name in collection["dsi_units"].keys(): + unit_tuple = next((t[1] for t in collection["dsi_units"][self.table_name] if t[0] == colName), "NULL") + if unit_tuple != "NULL": + numeric_cols.append((colName + f" ({unit_tuple})", colData)) else: numeric_cols.append((colName, colData)) diff --git a/dsi/plugins/metadata.py b/dsi/plugins/metadata.py index 93858ecf..15fdfbfe 100644 --- a/dsi/plugins/metadata.py +++ b/dsi/plugins/metadata.py @@ -63,6 +63,17 @@ def set_schema(self, table_data: list, validation_model=None) -> None: if not self.strict_mode_lock: self.strict_mode_lock = True + def set_schema_2(self, collection, validation_model=None) -> None: + # Finds file_reader class that called set_schema and assigns that as table_name for this data + if not isinstance(collection[next(iter(collection))], OrderedDict): + caller_frame = inspect.stack()[1] + tableName = caller_frame.frame.f_locals.get('self', None).__class__.__name__ + self.output_collector[tableName] = collection + else: + self.output_collector = collection + self.table_cnt = len(collection.keys()) + self.validation_model = validation_model + def add_to_output(self, row: list, tableName = None) -> None: """ Adds a row of data to the output_collector and guarantees good structure. 
@@ -85,7 +96,10 @@ def add_to_output(self, row: list, tableName = None) -> None: raise RuntimeError(f"For {tableName}, incorrect number of values was given") for key, row_elem in zip(self.output_collector[tableName].keys(), row): - self.output_collector[tableName][key].append(row_elem) + if "dsi_units" != tableName: + self.output_collector[tableName][key].append(row_elem) + else: + self.output_collector[tableName][key] = row_elem def schema_is_set(self) -> bool: """ Helper method to see if the schema has been set """ diff --git a/dsi/plugins/tests/test_file_reader.py b/dsi/plugins/tests/test_file_reader.py index bb72478f..e722be72 100644 --- a/dsi/plugins/tests/test_file_reader.py +++ b/dsi/plugins/tests/test_file_reader.py @@ -56,8 +56,8 @@ def test_csv_plugin_adds_rows(): for key, val in plug.output_collector["Csv"].items(): assert len(val) == 4 - # 11 Csv cols + 1 inherited FileReader cols - assert len(plug.output_collector["Csv"].keys()) == 12 + # 11 Csv cols + assert len(plug.output_collector["Csv"].keys()) == 11 def test_csv_plugin_adds_rows_multiple_files(): path1 = '/'.join([get_git_root('.'), 'examples/data', 'wildfiredata.csv']) @@ -69,8 +69,8 @@ def test_csv_plugin_adds_rows_multiple_files(): for key, val in plug.output_collector["Csv"].items(): assert len(val) == 8 - # 13 Csv cols + 2 inherited FileReader cols - assert len(plug.output_collector["Csv"].keys()) == 15 + # 13 Csv cols + assert len(plug.output_collector["Csv"].keys()) == 13 def test_csv_plugin_adds_rows_multiple_files_strict_mode(): path1 = '/'.join([get_git_root('.'), 'examples/data', 'wildfiredata.csv']) @@ -93,4 +93,41 @@ def test_csv_plugin_leaves_active_metadata_wellformed(): columns = list(term.active_metadata["Csv"].values()) assert all([len(columns[0]) == len(col) - for col in columns]) # all same length \ No newline at end of file + for col in columns]) # all same length + +def test_yaml_reader(): + a=Terminal() + a.load_module('plugin', 'YAML1', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") + a.transload() + + assert len(a.active_metadata.keys()) == 4 # 4 tables - math, address, physics, dsi_units + for name, tableData in a.active_metadata.items(): + assert isinstance(tableData, OrderedDict) + numRows = 2 + if name == "dsi_units": + continue + assert all(len(colData) == numRows for colData in tableData.values()) + +def test_toml_reader(): + a=Terminal() + a.load_module('plugin', 'TOML1', 'reader', filenames="examples/data/results.toml", target_table_prefix = "results") + a.transload() + + assert len(a.active_metadata.keys()) == 2 # 2 tables - people and dsi_units + for name, tableData in a.active_metadata.items(): + assert isinstance(tableData, OrderedDict) + if name == "dsi_units": + continue + numRows = 1 + assert all(len(colData) == numRows for colData in tableData.values()) + +def test_schema_reader(): + a=Terminal() + a.load_module('plugin', 'Schema', 'reader', filename="examples/data/example_schema.json" , target_table_prefix = "student") + a.transload() + + assert len(a.active_metadata.keys()) == 1 + assert "dsi_relations" in a.active_metadata.keys() + for tableData in a.active_metadata.values(): + assert isinstance(tableData, OrderedDict) + assert len(tableData["primary_key"]) == len(tableData["foreign_key"]) \ No newline at end of file diff --git a/dsi/plugins/tests/test_file_writer.py b/dsi/plugins/tests/test_file_writer.py index f5a7f1d6..3ca1cbf8 100644 --- a/dsi/plugins/tests/test_file_writer.py +++ 
b/dsi/plugins/tests/test_file_writer.py @@ -2,8 +2,11 @@ from collections import OrderedDict import git -import dsi.plugins.file_writer as wCSV +# import dsi.plugins.file_writer as wCSV from dsi.backends.sqlite import Sqlite +import cv2 +import numpy as np +import os def get_git_root(path): git_repo = git.Repo(path, search_parent_directories=True) @@ -14,4 +17,19 @@ def test_csv_plugin_type(): path = '/'.join([get_git_root('.'), 'examples/data', 'wildfiredata.sqlite_db']) back = Sqlite(filename=path) - #assert type(plug.output_collector) == OrderedDict \ No newline at end of file + #assert type(plug.output_collector) == OrderedDict + +def test_export_db_erd(): + a=Terminal(debug_flag=False) + a.load_module('plugin', 'Schema', 'reader', filename="examples/data/example_schema.json" , target_table_prefix = "student") + a.load_module('plugin', 'YAML1', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") + a.load_module('plugin', 'TOML1', 'reader', filenames=["examples/data/results.toml"], target_table_prefix = "results") + a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'erd_test_output.png') + a.transload() + + er_image = cv2.imread("erd_test_output.png") + assert er_image is not None #check if image generated at all + + pixel_mean = np.mean(er_image) + os.remove("erd_test_output.png") + assert pixel_mean != 255 #check if image is all white pixels (no diagram generated) diff --git a/dsi/tests/test_plugin.py b/dsi/tests/test_plugin.py deleted file mode 100644 index 5d3c7d0d..00000000 --- a/dsi/tests/test_plugin.py +++ /dev/null @@ -1,22 +0,0 @@ -from dsi.plugins import file_writer as fw -from dsi.core import Terminal - -import cv2 -import numpy as np -import subprocess -import os - -def test_export_db_erd(): - a=Terminal(debug_flag=False) - a.load_module('plugin', 'Schema', 'reader', filename="examples/data/example_schema.json" , target_table_prefix = "student") - a.load_module('plugin', 'YAML', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") - a.load_module('plugin', 'TOML', 'reader', filenames=["examples/data/results.toml"], target_table_prefix = "results") - a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'erd_test_output.png') - a.transload() - - er_image = cv2.imread("erd_test_output.png") - assert er_image is not None #check if image generated at all - - pixel_mean = np.mean(er_image) - os.remove("erd_test_output.png") - assert pixel_mean != 255 #check if image is all white pixels (no diagram generated) diff --git a/examples/coreterminal.py b/examples/coreterminal.py index f7e94417..392a3e00 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -3,29 +3,29 @@ '''This is an example workflow using core.py''' -a=Terminal(debug_flag=False) +a=Terminal(debug_flag=False, backup_db_flag=False) # a.load_module('plugin','Bueno','reader', filenames='data/bueno1.data') # a.load_module('plugin','Hostname','reader') -# a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json" , target_table_prefix = "student") -# a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"]) -# a.load_module('plugin', 'TOML', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") +# a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json", target_table_prefix = "student") +# a.load_module('plugin', 'YAML1', 'reader', 
filenames=["data/student_test1.yml", "data/student_test2.yml"], target_table_prefix = "student") +# a.load_module('plugin', 'TOML1', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") -# a.load_module('plugin', "Table_Plot", "writer", table_name = "schema__physics", filename = "schema__physics") -# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") +# # a.load_module('plugin', "Table_Plot", "writer", table_name = "schema__physics", filename = "schema__physics") +# # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") # a.transload() -a.load_module('backend','Sqlite','back-write', filename='data/data.db') -# a.load_module('backend','Parquet','back-write',filename='./data/bueno.pq') +# a.load_module('backend','Sqlite','back-write', filename='data/data.db') +# # a.load_module('backend','Parquet','back-write',filename='data/bueno.pq') -a.artifact_handler(interaction_type='put') -# data = a.artifact_handler(interaction_type='get', query = "SELECT * FROM sqlite_master WHERE type='table';")#, isVerbose = True) +# a.artifact_handler(interaction_type='put') +# # data = a.artifact_handler(interaction_type='get', query = "SELECT * FROM run_table;")#, isVerbose = True) +# a.artifact_handler(interaction_type="read") # print(data) # a.unload_module('backend', 'Sqlite', 'back-write') - # LIST MODULES # a.list_available_modules('plugin') # # ['GitInfo', 'Hostname', 'SystemKernel', 'Bueno', 'Csv'] @@ -50,7 +50,7 @@ #Example use 2 -# a.load_module('backend','Sqlite','back-read', filename='data/data.db') -# a.artifact_handler(interaction_type="read") -# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.png')#, target_table_prefix = "physics") -# a.transload() \ No newline at end of file +a.load_module('backend','Sqlite','back-read', filename='data/data.db') +a.artifact_handler(interaction_type="read") +a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.png')#, target_table_prefix = "physics") +a.transload() \ No newline at end of file