From 717676018de2a09d455826425f3b1d8b3e2081a1 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 7 Nov 2024 01:01:13 -0700 Subject: [PATCH 01/39] Moved erd test function to test_file_writer and deleted test_plugin.py/CI file --- .github/workflows/test_plugin.yml | 36 --------------------------- dsi/plugins/tests/test_file_writer.py | 22 ++++++++++++++-- dsi/tests/test_plugin.py | 22 ---------------- 3 files changed, 20 insertions(+), 60 deletions(-) delete mode 100644 .github/workflows/test_plugin.yml delete mode 100644 dsi/tests/test_plugin.py diff --git a/.github/workflows/test_plugin.yml b/.github/workflows/test_plugin.yml deleted file mode 100644 index 6d1b8e01..00000000 --- a/.github/workflows/test_plugin.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: test_plugin.py test - -on: - push: - branches: - - main - pull_request: - branches: - - main - - -jobs: - linux: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ['3.11'] - - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - python -m pip install opencv-python - pip install . - pip install graphviz - sudo apt-get install graphviz - - name: Test reader - run: | - pip install pytest - pytest dsi/tests/test_plugin.py \ No newline at end of file diff --git a/dsi/plugins/tests/test_file_writer.py b/dsi/plugins/tests/test_file_writer.py index f5a7f1d6..2fa1cdb8 100644 --- a/dsi/plugins/tests/test_file_writer.py +++ b/dsi/plugins/tests/test_file_writer.py @@ -2,8 +2,11 @@ from collections import OrderedDict import git -import dsi.plugins.file_writer as wCSV +# import dsi.plugins.file_writer as wCSV from dsi.backends.sqlite import Sqlite +import cv2 +import numpy as np +import os def get_git_root(path): git_repo = git.Repo(path, search_parent_directories=True) @@ -14,4 +17,19 @@ def test_csv_plugin_type(): path = '/'.join([get_git_root('.'), 'examples/data', 'wildfiredata.sqlite_db']) back = Sqlite(filename=path) - #assert type(plug.output_collector) == OrderedDict \ No newline at end of file + #assert type(plug.output_collector) == OrderedDict + +def test_export_db_erd(): + a=Terminal(debug_flag=False) + a.load_module('plugin', 'Schema', 'reader', filename="examples/data/example_schema.json" , target_table_prefix = "student") + a.load_module('plugin', 'YAML', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") + a.load_module('plugin', 'TOML', 'reader', filenames=["examples/data/results.toml"], target_table_prefix = "results") + a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'erd_test_output.png') + a.transload() + + er_image = cv2.imread("erd_test_output.png") + assert er_image is not None #check if image generated at all + + pixel_mean = np.mean(er_image) + os.remove("erd_test_output.png") + assert pixel_mean != 255 #check if image is all white pixels (no diagram generated) diff --git a/dsi/tests/test_plugin.py b/dsi/tests/test_plugin.py deleted file mode 100644 index 5d3c7d0d..00000000 --- a/dsi/tests/test_plugin.py +++ /dev/null @@ -1,22 +0,0 @@ -from dsi.plugins import file_writer as fw -from dsi.core import Terminal - -import cv2 -import numpy as np -import subprocess -import os - -def test_export_db_erd(): - a=Terminal(debug_flag=False) - a.load_module('plugin', 'Schema', 'reader', 
filename="examples/data/example_schema.json" , target_table_prefix = "student") - a.load_module('plugin', 'YAML', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") - a.load_module('plugin', 'TOML', 'reader', filenames=["examples/data/results.toml"], target_table_prefix = "results") - a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'erd_test_output.png') - a.transload() - - er_image = cv2.imread("erd_test_output.png") - assert er_image is not None #check if image generated at all - - pixel_mean = np.mean(er_image) - os.remove("erd_test_output.png") - assert pixel_mean != 255 #check if image is all white pixels (no diagram generated) From f874bd58574816ef229bab956b97907a8df99062 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 7 Nov 2024 01:15:11 -0700 Subject: [PATCH 02/39] created test functions for schema, yaml, toml readers --- dsi/plugins/tests/test_file_reader.py | 35 ++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/dsi/plugins/tests/test_file_reader.py b/dsi/plugins/tests/test_file_reader.py index bb72478f..190e0a98 100644 --- a/dsi/plugins/tests/test_file_reader.py +++ b/dsi/plugins/tests/test_file_reader.py @@ -93,4 +93,37 @@ def test_csv_plugin_leaves_active_metadata_wellformed(): columns = list(term.active_metadata["Csv"].values()) assert all([len(columns[0]) == len(col) - for col in columns]) # all same length \ No newline at end of file + for col in columns]) # all same length + +def test_yaml_reader(): + a=Terminal() + a.load_module('plugin', 'YAML', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") + a.transload() + + assert len(a.active_metadata.keys()) == 3 + for tableData in a.active_metadata.values(): + assert isinstance(tableData, OrderedDict) + numRows = 2 + assert all(len(lst) == numRows for lst in tableData.values()) + +def test_toml_reader(): + a=Terminal() + a.load_module('plugin', 'TOML', 'reader', filenames="examples/data/results.toml", target_table_prefix = "results") + a.transload() + + assert len(a.active_metadata.keys()) == 1 + for tableData in a.active_metadata.values(): + assert isinstance(tableData, OrderedDict) + numRows = 1 + assert all(len(lst) == numRows for lst in tableData.values()) + +def test_schema_reader(): + a=Terminal() + a.load_module('plugin', 'Schema', 'reader', filename="examples/data/example_schema.json" , target_table_prefix = "student") + a.transload() + + assert len(a.active_metadata.keys()) == 1 + assert "dsi_relations" in a.active_metadata.keys() + for tableData in a.active_metadata.values(): + assert isinstance(tableData, OrderedDict) + assert len(tableData["primary_key"]) == len(tableData["foreign_key"]) \ No newline at end of file From 8544983786f969047b727f2f21520e25e2b0e8bf Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 7 Nov 2024 01:19:30 -0700 Subject: [PATCH 03/39] updated file writer CI test with extra package installs for erd test --- .github/workflows/test_file_writer.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test_file_writer.yml b/.github/workflows/test_file_writer.yml index 7e51dad7..ef4f0c0d 100644 --- a/.github/workflows/test_file_writer.yml +++ b/.github/workflows/test_file_writer.yml @@ -26,7 +26,10 @@ jobs: run: | python -m pip install --upgrade pip pip install -r requirements.txt - pip install . + python -m pip install opencv-python + pip install . 
+ pip install graphviz + sudo apt-get install graphviz - name: Test reader run: | pip install pytest From 56e9d15e24f2c409a3a064c2e0076a965b29b03e Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 7 Nov 2024 01:20:55 -0700 Subject: [PATCH 04/39] created a unified dsi_units table for all ingested data and updated handling for writers --- dsi/plugins/file_reader.py | 37 +++++++++++++++++++++---------------- dsi/plugins/file_writer.py | 12 +++++++++--- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index c1606f33..19b9b472 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -278,18 +278,20 @@ def add_rows(self) -> None: yaml_load_data = list(yaml.safe_load_all(editedString)) if not self.schema_is_set(): + self.yaml_data["dsi_units"] = OrderedDict() for table in yaml_load_data: tableName = table["segment"] if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + table["segment"] self.yaml_data[tableName] = OrderedDict((key, []) for key in table["columns"].keys()) - self.yaml_data[tableName + "_units"] = OrderedDict((key, []) for key in table["columns"].keys()) - self.yaml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + self.yaml_data["dsi_units"][tableName] = [] + # self.yaml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) self.pack_header() + unit_row = [] for table in yaml_load_data: row = [] - unit_row = [] + table_unit_row = [] tableName = table["segment"] if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + table["segment"] @@ -299,13 +301,14 @@ def add_rows(self) -> None: unit_data = data[data.find(' ')+1:] data = self.check_type(data[:data.find(" ")]) self.yaml_data[tableName][col_name].append(data) - if len(self.yaml_data[tableName + "_units"][col_name]) < 1: - unit_row.append(unit_data) - self.yaml_data[tableName + "_units"][col_name].append(unit_data) + if (col_name, unit_data) not in self.yaml_data["dsi_units"][tableName]: + table_unit_row.append((col_name, unit_data)) + self.yaml_data["dsi_units"][tableName].append((col_name, unit_data)) row.append(data) self.add_to_output(row, tableName) - if len(next(iter(self.output_collector[tableName + "_units"].values()))) < 1: - self.add_to_output(unit_row, tableName + "_units") + unit_row.append(table_unit_row) + if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: + self.add_to_output(unit_row, "dsi_units") class TOML(FileReader): ''' @@ -355,13 +358,14 @@ def add_rows(self) -> None: if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + tableName self.toml_data[tableName] = OrderedDict((key, []) for key in tableData.keys()) - self.toml_data[tableName + "_units"] = OrderedDict((key, []) for key in tableData.keys()) - self.toml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + self.toml_data["dsi_units"] = OrderedDict([(tableName,[])]) + # self.toml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) self.pack_header() + unit_row = [] for tableName, tableData in toml_load_data.items(): row = [] - unit_row = [] + table_unit_row = [] if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + tableName for col_name, data in tableData.items(): @@ -375,10 +379,11 @@ def add_rows(self) -> None: # unit_data = data["units"] # data = data["value"] 
self.toml_data[tableName][col_name].append(data) - if len(self.toml_data[tableName + "_units"][col_name]) < 1: - unit_row.append(unit_data) - self.toml_data[tableName + "_units"][col_name].append(unit_data) + if (col_name, unit_data) not in self.toml_data["dsi_units"][tableName]: + table_unit_row.append((col_name, unit_data)) + self.toml_data["dsi_units"][tableName].append((col_name, unit_data)) row.append(data) self.add_to_output(row, tableName) - if len(next(iter(self.output_collector[tableName + "_units"].values()))) < 1: - self.add_to_output(unit_row, tableName + "_units") + unit_row.append(table_unit_row) + if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: + self.add_to_output(unit_row, "dsi_units") diff --git a/dsi/plugins/file_writer.py b/dsi/plugins/file_writer.py index 1804e3cf..61c10091 100644 --- a/dsi/plugins/file_writer.py +++ b/dsi/plugins/file_writer.py @@ -77,7 +77,9 @@ def get_rows(self, collection) -> None: dot_file.write("overlap=false ") for tableName, tableData in collection.items(): - if tableName == "dsi_relations" or (self.target_table_prefix is not None and self.target_table_prefix not in tableName): + if tableName == "dsi_relations" or tableName == "sqlite_sequence": + continue + elif self.target_table_prefix is not None and self.target_table_prefix not in tableName: continue dot_file.write(f"{tableName} [label=<") @@ -304,11 +306,15 @@ def get_rows(self, collection) -> None: numeric_cols = [] col_len = None for colName, colData in collection[self.table_name].items(): + if colName == "run_id": + continue if col_len == None: col_len = len(colData) if isinstance(colData[0], str) == False: - if self.table_name + "_units" in collection.keys() and collection[self.table_name + "_units"][colName][0] != "NULL": - numeric_cols.append((colName + f" ({collection[self.table_name + '_units'][colName][0]})", colData)) + if self.table_name in collection["dsi_units"].keys(): + unit_tuple = next((t[1] for t in collection["dsi_units"][self.table_name] if t[0] == colName), "NULL") + if unit_tuple != "NULL": + numeric_cols.append((colName + f" ({unit_tuple})", colData)) else: numeric_cols.append((colName, colData)) From 17c27349edc3717d19089aaf459ccc700c436a6e Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 7 Nov 2024 01:27:39 -0700 Subject: [PATCH 05/39] Updated other files to handle unified units table and separate back-write/read in core calls --- dsi/core.py | 26 +++++++++++++++++++++----- dsi/plugins/metadata.py | 5 ++++- examples/coreterminal.py | 18 +++++++++--------- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 2ca4c6da..0ea406fe 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -207,10 +207,17 @@ def transload(self, **kwargs): self.active_metadata[table_name] = table_metadata else: for colName, colData in table_metadata.items(): - if colName in self.active_metadata[table_name].keys(): + if colName in self.active_metadata[table_name].keys() and table_name != "dsi_units": self.active_metadata[table_name][colName] += colData + elif table_name == "dsi_units": #allow overwrite of unit data + self.active_metadata[table_name][colName] = colData else: raise ValueError(f"Mismatched column input for table {table_name}") + # NO OVERWRITE OF UNIT DATA + # elif colName not in self.active_metadata[table_name].keys() and table_name == "dsi_units": + # self.active_metadata[table_name][colName] = colData + # elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": + # raise 
ValueError(f"Mismatched column input for table {table_name}") end = datetime.now() self.logger.info(f"Runtime: {end-start}") elif module_type == "writer": @@ -253,9 +260,8 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): operation_success = False # Perform artifact movement first, because inspect implementation may rely on # self.active_metadata or some stored artifact. - selected_function_modules = dict( - (k, self.active_modules[k]) for k in ('back-read', 'back-write')) - for module_type, objs in selected_function_modules.items(): + selected_write_backends = dict((k, self.active_modules[k]) for k in (['back-write'])) + for module_type, objs in selected_write_backends.items(): for obj in objs: self.logger.info(f"-------------------------------------") self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") @@ -277,11 +283,21 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): self.active_metadata = obj.inspect_artifacts( collection=self.active_metadata, **kwargs) operation_success = True - elif interaction_type == "read": + end = datetime.now() + self.logger.info(f"Runtime: {end-start}") + + selected_read_backends = dict((k, self.active_modules[k]) for k in (['back-read'])) + for module_type, objs in selected_read_backends.items(): + for obj in objs: + self.logger.info(f"-------------------------------------") + self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") + start = datetime.now() + if interaction_type == "read": self.active_metadata = obj.read_to_artifact() operation_success = True end = datetime.now() self.logger.info(f"Runtime: {end-start}") + if operation_success: if interaction_type == 'get' and self.active_metadata: return self.active_metadata diff --git a/dsi/plugins/metadata.py b/dsi/plugins/metadata.py index 93858ecf..cc6f4ba2 100644 --- a/dsi/plugins/metadata.py +++ b/dsi/plugins/metadata.py @@ -85,7 +85,10 @@ def add_to_output(self, row: list, tableName = None) -> None: raise RuntimeError(f"For {tableName}, incorrect number of values was given") for key, row_elem in zip(self.output_collector[tableName].keys(), row): - self.output_collector[tableName][key].append(row_elem) + if "dsi_units" != tableName: + self.output_collector[tableName][key].append(row_elem) + else: + self.output_collector[tableName][key] = row_elem def schema_is_set(self) -> bool: """ Helper method to see if the schema has been set """ diff --git a/examples/coreterminal.py b/examples/coreterminal.py index 7d326a69..722ea9ae 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -16,16 +16,15 @@ # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") # a.transload() -a.load_module('backend','Sqlite','back-write', filename='data/data.db') -# a.load_module('backend','Parquet','back-write',filename='./data/bueno.pq') +# a.load_module('backend','Sqlite','back-write', filename='data/data.db') +# a.load_module('backend','Parquet','back-write',filename='data/bueno.pq') -a.artifact_handler(interaction_type='put') -# data = a.artifact_handler(interaction_type='get', query = "SELECT * FROM sqlite_master WHERE type='table';")#, isVerbose = True) +# a.artifact_handler(interaction_type='put') +# data = a.artifact_handler(interaction_type='get', query = "SELECT * FROM run_table;")#, isVerbose = True) # print(data) # a.unload_module('backend', 'Sqlite', 'back-write') - # LIST MODULES # a.list_available_modules('plugin') # # ['GitInfo', 
'Hostname', 'SystemKernel', 'Bueno', 'Csv'] @@ -50,7 +49,8 @@ #Example use 2 -# a.load_module('backend','SqliteReader','back-read', filename='data/data.db') -# a.artifact_handler(interaction_type="read") -# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") -# a.transload() \ No newline at end of file +a.load_module('backend','SqliteReader','back-read', filename='data/data.db') +a.artifact_handler(interaction_type="read") +a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") +a.load_module('plugin', "Table_Plot", "writer", table_name = "student__physics", filename = "student__physics") +a.transload() \ No newline at end of file From 10d0e54767d48b6df103427626ddbaa13ec5e627 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 7 Nov 2024 01:29:16 -0700 Subject: [PATCH 06/39] Created run table to store metadata for every workflow run and handled unified unit table in sqlite reader/writer classes --- dsi/backends/sqlite.py | 75 +++++++++++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 20 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 9a9fb824..d254288a 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -7,6 +7,7 @@ # import os import nbconvert as nbc import nbformat as nbf +from datetime import datetime from collections import OrderedDict from dsi.backends.filesystem import Filesystem @@ -40,21 +41,23 @@ class Artifact: class SqliteReader(Filesystem): - def __init__(self, filename): + def __init__(self, filename, append = False): self.filename = filename self.con = sqlite3.connect(filename) self.cur = self.con.cursor() def read_to_artifact(self): artifact = OrderedDict() + artifact["dsi_relations"] = OrderedDict([("primary_key",[]), ("foreign_key", [])]) tableList = self.cur.execute("SELECT name FROM sqlite_master WHERE type ='table';").fetchall() - - artifact["dsi_relations"] = OrderedDict([("primary_key",[]), ("foreign_key", [])]) pkList = [] - for item in tableList: tableName = item[0] + if tableName == "dsi_units": + artifact["dsi_units"] = self.read_units_helper() + continue + tableInfo = self.cur.execute(f"PRAGMA table_info({tableName});").fetchall() colDict = OrderedDict() for colInfo in tableInfo: @@ -81,6 +84,17 @@ def read_to_artifact(self): artifact["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) return artifact + def read_units_helper(self): + unitsDict = OrderedDict() + unitsTable = self.cur.execute("SELECT * FROM dsi_units;").fetchall() + for row in unitsTable: + tableName = row[0] + if tableName not in unitsDict.keys(): + unitsDict[tableName] = [] + unitsDict[tableName].append(eval(row[1])) + return unitsDict + + # Closes connection to server def close(self): self.con.close() @@ -95,10 +109,11 @@ class Sqlite(Filesystem): con = None cur = None - def __init__(self, filename): + def __init__(self, filename, run_table = True): self.filename = filename self.con = sqlite3.connect(filename) self.cur = self.con.cursor() + self.run_flag = run_table def check_type(self, text): """ @@ -125,17 +140,19 @@ def put_artifact_type(self, types, foreign_query = None, isVerbose=False): `return`: none """ - # key_names = types.properties.keys() - key_names = types.unit_keys - if "_units" in types.name: - key_names = [item + " UNIQUE" for item in types.unit_keys] - - col_names = ', '.join(key_names) - str_query = "CREATE TABLE IF NOT EXISTS {} ({}".format(str(types.name), col_names) + col_names = ', 
'.join(types.unit_keys) + if self.run_flag: + str_query = "CREATE TABLE IF NOT EXISTS {} (run_id, {}".format(str(types.name), col_names) + else: + str_query = "CREATE TABLE IF NOT EXISTS {} ({}".format(str(types.name), col_names) if foreign_query != None: str_query += foreign_query - str_query += ");" + + if self.run_flag: + str_query += ", FOREIGN KEY (run_id) REFERENCES runTable (run_id));" + else: + str_query += ");" if isVerbose: print(str_query) @@ -174,10 +191,18 @@ def put_artifacts(self, collection, isVerbose=False): # Core compatibility name assignment artifacts = collection + if self.run_flag: + runTable_create = "CREATE TABLE IF NOT EXISTS runTable (run_id INTEGER PRIMARY KEY AUTOINCREMENT, run_timestamp TEXT UNIQUE);" + self.cur.execute(runTable_create) + self.con.commit() + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + runTable_insert = f"INSERT INTO runTable (run_timestamp) VALUES ('{timestamp}');" + self.cur.execute(runTable_insert) + self.con.commit() for tableName, tableData in artifacts.items(): - if tableName == "dsi_relations": + if tableName == "dsi_relations" or tableName == "dsi_units": continue types = DataType() @@ -212,14 +237,14 @@ def put_artifacts(self, collection, isVerbose=False): col_names = ', '.join(types.properties.keys()) placeholders = ', '.join('?' * len(types.properties)) - if "_units" in tableName: - str_query = "INSERT OR IGNORE INTO {} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) + str_query = "INSERT INTO " + if self.run_flag: + run_id = self.cur.execute("SELECT run_id FROM runTable ORDER BY run_id DESC LIMIT 1;").fetchone()[0] + str_query += "{} (run_id, {}) VALUES ({}, {});".format(str(types.name), col_names, run_id, placeholders) else: - str_query = "INSERT INTO {} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) + str_query += "{} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) - # col_list helps access the specific keys of the dictionary in the for loop col_list = col_names.split(', ') - # loop through the contents of each column and insert into table as a row for ind1 in range(len(types.properties[col_list[0]])): vals = [] @@ -232,7 +257,17 @@ def put_artifacts(self, collection, isVerbose=False): print(str_query) self.con.commit() - self.types = types #This will only copy the last table from artifacts (collections input) + self.types = types #This will only copy the last table from artifacts (collections input) + + for tableName, tableData in artifacts["dsi_units"].items(): + create_query = "CREATE TABLE IF NOT EXISTS dsi_units (table_name TEXT, column_and_unit TEXT UNIQUE)" + self.cur.execute(create_query) + self.con.commit() + if len({t[1] for t in tableData}) > 1: + for col_unit_pair in tableData: + str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' + self.cur.execute(str_query) + self.con.commit() def put_artifacts_only(self, artifacts, isVerbose=False): """ From 7eb97dd2d69f8dfa421e7ec30558cd4f3ac19995 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:11:43 -0700 Subject: [PATCH 07/39] updated backend actions and split back-write and read actions --- dsi/core.py | 101 +++++++++++++++++++++------------------------------- 1 file changed, 41 insertions(+), 60 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 0ea406fe..c84a5217 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -31,7 +31,7 @@ class Terminal(): 'backend': ['back-read', 'back-write']} VALID_ARTIFACT_INTERACTION_TYPES = ['get', 'set', 'put', 
'inspect', 'read'] - def __init__(self, debug_flag = False): + def __init__(self, debug_flag = False, backup_db_flag = False): # Helper function to get parent module names. def static_munge(prefix, implementations): return (['.'.join(i) for i in product(prefix, implementations)]) @@ -57,6 +57,8 @@ def static_munge(prefix, implementations): self.active_metadata = OrderedDict() self.transload_lock = False + self.backup_db_flag = backup_db_flag + self.logger = logging.getLogger(self.__class__.__name__) if debug_flag: @@ -194,7 +196,6 @@ def transload(self, **kwargs): """ selected_function_modules = dict( (k, self.active_modules[k]) for k in ('reader', 'writer')) - # Note this transload supports plugin.env Environment types now. for module_type, objs in selected_function_modules.items(): for obj in objs: self.logger.info(f"-------------------------------------") @@ -209,15 +210,10 @@ def transload(self, **kwargs): for colName, colData in table_metadata.items(): if colName in self.active_metadata[table_name].keys() and table_name != "dsi_units": self.active_metadata[table_name][colName] += colData - elif table_name == "dsi_units": #allow overwrite of unit data + elif colName not in self.active_metadata[table_name].keys() and table_name == "dsi_units": self.active_metadata[table_name][colName] = colData - else: + elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": raise ValueError(f"Mismatched column input for table {table_name}") - # NO OVERWRITE OF UNIT DATA - # elif colName not in self.active_metadata[table_name].keys() and table_name == "dsi_units": - # self.active_metadata[table_name][colName] = colData - # elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": - # raise ValueError(f"Mismatched column input for table {table_name}") end = datetime.now() self.logger.info(f"Runtime: {end-start}") elif module_type == "writer": @@ -225,23 +221,6 @@ def transload(self, **kwargs): obj.get_rows(self.active_metadata, **kwargs) end = datetime.now() self.logger.info(f"Runtime: {end-start}") - # Plugins may add one or more rows (vector vs matrix data). - # You may have two or more plugins with different numbers of rows. - # Consequently, transload operations may have unstructured shape for - # some plugin configurations. We must force structure to create a valid - # middleware data structure. - # To resolve, we pad the shorter columns to match the max length column. - #COMMENTED OUT TILL LATER - ''' - max_len = max([len(col) for col in self.active_metadata.values()]) - for colname, coldata in self.active_metadata.items(): - if len(coldata) != max_len: - self.active_metadata[colname].extend( # add None's until reaching max_len - [None] * (max_len - len(coldata))) - - assert all([len(col) == max_len for col in self.active_metadata.values( - )]), "All columns must have the same number of rows" - ''' self.transload_lock = True @@ -260,43 +239,45 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): operation_success = False # Perform artifact movement first, because inspect implementation may rely on # self.active_metadata or some stored artifact. 
- selected_write_backends = dict((k, self.active_modules[k]) for k in (['back-write'])) - for module_type, objs in selected_write_backends.items(): - for obj in objs: - self.logger.info(f"-------------------------------------") - self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") - start = datetime.now() - if interaction_type == 'put' or interaction_type == 'set': - obj.put_artifacts( - collection=self.active_metadata, **kwargs) - operation_success = True - elif interaction_type == 'get': - self.logger.info(f"Query to get data: {query}") - if query != None: - self.active_metadata = obj.get_artifacts(query, **kwargs) - else: - raise ValueError("Need to specify a query of the database to return data") - operation_success = True - elif interaction_type == 'inspect': - obj.put_artifacts( + for obj in self.active_modules['back-write']: + self.logger.info(f"-------------------------------------") + self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") + start = datetime.now() + if interaction_type == 'put' or interaction_type == 'set': + if self.backup_db_flag == True and os.path.getsize(obj.filename) > 100: + backup_file = obj.filename[:obj.filename.rfind('.')] + "_backup" + obj.filename[obj.filename.rfind('.'):] + shutil.copyfile(obj.filename, backup_file) + db_size = os.path.getsize(obj.filename) + errorMessage = obj.put_artifacts( + collection=self.active_metadata, **kwargs) + if db_size == os.path.getsize(obj.filename): + print(errorMessage) + operation_success = True + elif interaction_type == 'get': + self.logger.info(f"Query to get data: {query}") + if query != None: + self.active_metadata = obj.get_artifacts(query, **kwargs) + else: + raise ValueError("Need to specify a query of the database to return data") + operation_success = True + elif interaction_type == 'inspect': + obj.put_artifacts( collection=self.active_metadata, **kwargs) - self.active_metadata = obj.inspect_artifacts( + obj.inspect_artifacts( collection=self.active_metadata, **kwargs) - operation_success = True - end = datetime.now() - self.logger.info(f"Runtime: {end-start}") + operation_success = True + end = datetime.now() + self.logger.info(f"Runtime: {end-start}") - selected_read_backends = dict((k, self.active_modules[k]) for k in (['back-read'])) - for module_type, objs in selected_read_backends.items(): - for obj in objs: - self.logger.info(f"-------------------------------------") - self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") - start = datetime.now() - if interaction_type == "read": - self.active_metadata = obj.read_to_artifact() - operation_success = True - end = datetime.now() - self.logger.info(f"Runtime: {end-start}") + for obj in self.active_modules['back-read']: + self.logger.info(f"-------------------------------------") + self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") + start = datetime.now() + if interaction_type == "read": + self.active_metadata = obj.read_to_artifact() + operation_success = True + end = datetime.now() + self.logger.info(f"Runtime: {end-start}") if operation_success: if interaction_type == 'get' and self.active_metadata: From fdcb7a38ca0d239681e462b01233469f05ce7673 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:12:48 -0700 Subject: [PATCH 08/39] created set_schema2 which sets file reader data dict to metadata dict --- dsi/plugins/metadata.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dsi/plugins/metadata.py 
b/dsi/plugins/metadata.py index cc6f4ba2..6b194366 100644 --- a/dsi/plugins/metadata.py +++ b/dsi/plugins/metadata.py @@ -63,6 +63,11 @@ def set_schema(self, table_data: list, validation_model=None) -> None: if not self.strict_mode_lock: self.strict_mode_lock = True + def set_schema_2(self, collection, validation_model=None) -> None: + self.output_collector = collection + self.table_cnt = len(collection.keys()) + self.validation_model = validation_model + def add_to_output(self, row: list, tableName = None) -> None: """ Adds a row of data to the output_collector and guarantees good structure. From 5a68c19802b5bec4bc79e3a74d079fcaf6db5e65 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:14:33 -0700 Subject: [PATCH 09/39] added inspect artifact handler function example workflow --- examples/coreterminal.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/examples/coreterminal.py b/examples/coreterminal.py index 722ea9ae..0859787f 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -3,24 +3,25 @@ '''This is an example workflow using core.py''' -a=Terminal(debug_flag=False) +a=Terminal(debug_flag=False, backup_db_flag=False) # a.load_module('plugin','Bueno','reader', filenames='data/bueno1.data') # a.load_module('plugin','Hostname','reader') -# a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json" , target_table_prefix = "student") -# a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"], target_table_prefix = "student") -# a.load_module('plugin', 'TOML', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") +a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json" , target_table_prefix = "student") +a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"], target_table_prefix = "student") +a.load_module('plugin', 'TOML', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") # a.load_module('plugin', "Table_Plot", "writer", table_name = "schema__physics", filename = "schema__physics") # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") -# a.transload() +a.transload() -# a.load_module('backend','Sqlite','back-write', filename='data/data.db') +a.load_module('backend','Sqlite','back-write', filename='data/data.db') # a.load_module('backend','Parquet','back-write',filename='data/bueno.pq') -# a.artifact_handler(interaction_type='put') +a.artifact_handler(interaction_type='put') # data = a.artifact_handler(interaction_type='get', query = "SELECT * FROM run_table;")#, isVerbose = True) +a.artifact_handler(interaction_type="inspect") # print(data) # a.unload_module('backend', 'Sqlite', 'back-write') @@ -49,8 +50,8 @@ #Example use 2 -a.load_module('backend','SqliteReader','back-read', filename='data/data.db') -a.artifact_handler(interaction_type="read") -a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") -a.load_module('plugin', "Table_Plot", "writer", table_name = "student__physics", filename = "student__physics") -a.transload() \ No newline at end of file +# a.load_module('backend','SqliteReader','back-read', filename='data/data.db') +# a.artifact_handler(interaction_type="read") +# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") +# 
a.load_module('plugin', "Table_Plot", "writer", table_name = "student__physics", filename = "student__physics") +# a.transload() \ No newline at end of file From bd205d96dc5d837c11d04fc71a9c2179a2fd7299 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:15:39 -0700 Subject: [PATCH 10/39] updated file writers to handle unified unit table --- dsi/plugins/file_writer.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dsi/plugins/file_writer.py b/dsi/plugins/file_writer.py index 61c10091..cf800c76 100644 --- a/dsi/plugins/file_writer.py +++ b/dsi/plugins/file_writer.py @@ -84,9 +84,12 @@ def get_rows(self, collection) -> None: dot_file.write(f"{tableName} [label=<
{tableName}
") + col_list = tableData.keys() + if tableName == "dsi_units": + col_list = ["table_name", "column_and_unit"] curr_row = 0 inner_brace = 0 - for col_name in tableData.keys(): + for col_name in col_list: if curr_row % num_tbl_cols == 0: inner_brace = 1 dot_file.write("") @@ -311,6 +314,7 @@ def get_rows(self, collection) -> None: if col_len == None: col_len = len(colData) if isinstance(colData[0], str) == False: + unit_tuple = "NULL" if self.table_name in collection["dsi_units"].keys(): unit_tuple = next((t[1] for t in collection["dsi_units"][self.table_name] if t[0] == colName), "NULL") if unit_tuple != "NULL": From f638ee1484d8491653a30f5aa6ca12a327ba784a Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:16:40 -0700 Subject: [PATCH 11/39] updated schema, yaml, toml readers to use set_schema2 instead of old metadata functions --- dsi/plugins/file_reader.py | 194 ++++++++++++++++++++++++++----------- 1 file changed, 135 insertions(+), 59 deletions(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index 19b9b472..4eb6fcfa 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -199,28 +199,52 @@ def pack_header(self) -> None: table_info.append((table_name, list(self.schema_data[table_name].keys()))) self.set_schema(table_info) - def add_rows(self) -> None: - if not self.schema_is_set(): - self.schema_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) - self.pack_header() - + def add_rows(self) -> None: + self.schema_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) with open(self.schema_file, 'r') as fh: schema_content = json.load(fh) for tableName, tableData in schema_content.items(): if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + tableName - if tableData["primary_key"] != "NULL": - self.schema_data["dsi_relations"]["primary_key"].append((tableName, tableData["primary_key"])) - self.schema_data["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) - self.add_to_output([(tableName, tableData["primary_key"]), ("NULL", "NULL")], "dsi_relations") - + + pkList = [] for colName, colData in tableData["foreign_key"].items(): if self.target_table_prefix is not None: colData[0] = self.target_table_prefix + "__" + colData[0] self.schema_data["dsi_relations"]["primary_key"].append((colData[0], colData[1])) self.schema_data["dsi_relations"]["foreign_key"].append((tableName, colName)) - self.add_to_output([(colData[0], colData[1]), (tableName, colName)], "dsi_relations") + + if tableData["primary_key"] != "NULL": + pkList.append((tableName, tableData["primary_key"])) + + for pk in pkList: + if pk not in self.schema_data["dsi_relations"]["primary_key"]: + self.schema_data["dsi_relations"]["primary_key"].append(pk) + self.schema_data["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) + self.set_schema_2(self.schema_data) + + # if not self.schema_is_set(): + # self.schema_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + # self.pack_header() + + # with open(self.schema_file, 'r') as fh: + # schema_content = json.load(fh) + + # for tableName, tableData in schema_content.items(): + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + tableName + # if tableData["primary_key"] != "NULL": + # self.schema_data["dsi_relations"]["primary_key"].append((tableName, tableData["primary_key"])) + # self.schema_data["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) + # 
self.add_to_output([(tableName, tableData["primary_key"]), ("NULL", "NULL")], "dsi_relations") + + # for colName, colData in tableData["foreign_key"].items(): + # if self.target_table_prefix is not None: + # colData[0] = self.target_table_prefix + "__" + colData[0] + # self.schema_data["dsi_relations"]["primary_key"].append((colData[0], colData[1])) + # self.schema_data["dsi_relations"]["foreign_key"].append((tableName, colName)) + # self.add_to_output([(colData[0], colData[1]), (tableName, colName)], "dsi_relations") class YAML(FileReader): ''' @@ -276,39 +300,63 @@ def add_rows(self) -> None: editedString = re.sub('specification', f'columns:\n{self.yamlSpace}specification', editedString) editedString = re.sub(r'(!.+)\n', r"'\1'\n", editedString) yaml_load_data = list(yaml.safe_load_all(editedString)) - - if not self.schema_is_set(): + + if "dsi_units" not in self.yaml_data.keys(): self.yaml_data["dsi_units"] = OrderedDict() - for table in yaml_load_data: - tableName = table["segment"] - if self.target_table_prefix is not None: - tableName = self.target_table_prefix + "__" + table["segment"] - self.yaml_data[tableName] = OrderedDict((key, []) for key in table["columns"].keys()) - self.yaml_data["dsi_units"][tableName] = [] - # self.yaml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) - self.pack_header() - - unit_row = [] for table in yaml_load_data: - row = [] - table_unit_row = [] tableName = table["segment"] if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + table["segment"] + if tableName not in self.yaml_data.keys(): + self.yaml_data[tableName] = OrderedDict() + unitsList = [] for col_name, data in table["columns"].items(): unit_data = "NULL" if isinstance(data, str) and not isinstance(self.check_type(data[:data.find(" ")]), str): unit_data = data[data.find(' ')+1:] data = self.check_type(data[:data.find(" ")]) + if col_name not in self.yaml_data[tableName].keys(): + self.yaml_data[tableName][col_name] = [] self.yaml_data[tableName][col_name].append(data) - if (col_name, unit_data) not in self.yaml_data["dsi_units"][tableName]: - table_unit_row.append((col_name, unit_data)) - self.yaml_data["dsi_units"][tableName].append((col_name, unit_data)) - row.append(data) - self.add_to_output(row, tableName) - unit_row.append(table_unit_row) - if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: - self.add_to_output(unit_row, "dsi_units") + if unit_data != "NULL" and (col_name, unit_data) not in unitsList: + unitsList.append((col_name, unit_data)) + if len(unitsList) > 0 and tableName not in self.yaml_data["dsi_units"].keys(): + self.yaml_data["dsi_units"][tableName] = unitsList + + self.set_schema_2(self.yaml_data) + + # if not self.schema_is_set(): + # self.yaml_data["dsi_units"] = OrderedDict() + # for table in yaml_load_data: + # tableName = table["segment"] + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + table["segment"] + # self.yaml_data[tableName] = OrderedDict((key, []) for key in table["columns"].keys()) + # self.yaml_data["dsi_units"][tableName] = [] + # # self.yaml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + # self.pack_header() + + # unit_row = [] + # for table in yaml_load_data: + # row = [] + # table_unit_row = [] + # tableName = table["segment"] + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + table["segment"] + # for col_name, data in table["columns"].items(): + # 
unit_data = "NULL" + # if isinstance(data, str) and not isinstance(self.check_type(data[:data.find(" ")]), str): + # unit_data = data[data.find(' ')+1:] + # data = self.check_type(data[:data.find(" ")]) + # self.yaml_data[tableName][col_name].append(data) + # if (col_name, unit_data) not in self.yaml_data["dsi_units"][tableName]: + # table_unit_row.append((col_name, unit_data)) + # self.yaml_data["dsi_units"][tableName].append((col_name, unit_data)) + # row.append(data) + # self.add_to_output(row, tableName) + # unit_row.append(table_unit_row) + # if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: + # self.add_to_output(unit_row, "dsi_units") class TOML(FileReader): ''' @@ -329,12 +377,12 @@ def __init__(self, filenames, target_table_prefix = None, **kwargs): self.toml_data = OrderedDict() self.target_table_prefix = target_table_prefix - def pack_header(self) -> None: - """Set schema with TOML data.""" - table_info = [] - for table_name in list(self.toml_data.keys()): - table_info.append((table_name, list(self.toml_data[table_name].keys()))) - self.set_schema(table_info) + # def pack_header(self) -> None: + # """Set schema with TOML data.""" + # table_info = [] + # for table_name in list(self.toml_data.keys()): + # table_info.append((table_name, list(self.toml_data[table_name].keys()))) + # self.set_schema(table_info) def add_rows(self) -> None: """ @@ -353,21 +401,14 @@ def add_rows(self) -> None: with open(filename, 'rb') as toml_file: toml_load_data = tomllib.load(toml_file) - if not self.schema_is_set(): - for tableName, tableData in toml_load_data.items(): - if self.target_table_prefix is not None: - tableName = self.target_table_prefix + "__" + tableName - self.toml_data[tableName] = OrderedDict((key, []) for key in tableData.keys()) - self.toml_data["dsi_units"] = OrderedDict([(tableName,[])]) - # self.toml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) - self.pack_header() - - unit_row = [] + if "dsi_units" not in self.toml_data.keys(): + self.toml_data["dsi_units"] = OrderedDict() for tableName, tableData in toml_load_data.items(): - row = [] - table_unit_row = [] if self.target_table_prefix is not None: tableName = self.target_table_prefix + "__" + tableName + if tableName not in self.toml_data.keys(): + self.toml_data[tableName] = OrderedDict() + unitsList = [] for col_name, data in tableData.items(): unit_data = "NULL" if isinstance(data, dict): @@ -378,12 +419,47 @@ def add_rows(self) -> None: # data = ast.literal_eval(data) # unit_data = data["units"] # data = data["value"] + if col_name not in self.toml_data[tableName].keys(): + self.toml_data[tableName][col_name] = [] self.toml_data[tableName][col_name].append(data) - if (col_name, unit_data) not in self.toml_data["dsi_units"][tableName]: - table_unit_row.append((col_name, unit_data)) - self.toml_data["dsi_units"][tableName].append((col_name, unit_data)) - row.append(data) - self.add_to_output(row, tableName) - unit_row.append(table_unit_row) - if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: - self.add_to_output(unit_row, "dsi_units") + if unit_data != "NULL" and (col_name, unit_data) not in unitsList: + unitsList.append((col_name, unit_data)) + if len(unitsList) > 0 and tableName not in self.toml_data["dsi_units"].keys(): + self.toml_data["dsi_units"][tableName] = unitsList + + self.set_schema_2(self.toml_data) + + # if not self.schema_is_set(): + # for tableName, tableData in toml_load_data.items(): + # if self.target_table_prefix is not None: + # 
tableName = self.target_table_prefix + "__" + tableName + # self.toml_data[tableName] = OrderedDict((key, []) for key in tableData.keys()) + # self.toml_data["dsi_units"] = OrderedDict([(tableName,[])]) + # # self.toml_data["dsi_relations"] = OrderedDict([('primary_key', []), ('foreign_key', [])]) + # self.pack_header() + + # unit_row = [] + # for tableName, tableData in toml_load_data.items(): + # row = [] + # table_unit_row = [] + # if self.target_table_prefix is not None: + # tableName = self.target_table_prefix + "__" + tableName + # for col_name, data in tableData.items(): + # unit_data = "NULL" + # if isinstance(data, dict): + # unit_data = data["units"] + # data = data["value"] + # # IF statement for manual data parsing for python 3.10 and below + # # if isinstance(data, str) and data[0] == "{" and data[-1] == "}": + # # data = ast.literal_eval(data) + # # unit_data = data["units"] + # # data = data["value"] + # self.toml_data[tableName][col_name].append(data) + # if (col_name, unit_data) not in self.toml_data["dsi_units"][tableName]: + # table_unit_row.append((col_name, unit_data)) + # self.toml_data["dsi_units"][tableName].append((col_name, unit_data)) + # row.append(data) + # self.add_to_output(row, tableName) + # unit_row.append(table_unit_row) + # if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: + # self.add_to_output(unit_row, "dsi_units") From e4229f88faae6098c261e60355b53b3f0947af02 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:20:20 -0700 Subject: [PATCH 12/39] Only commit db insert if all data in workflow is stable/non repetitive, and updated inspect artifact function to generate Jupyter notebook --- dsi/backends/sqlite.py | 144 ++++++++++++++++++++++++++++++++--------- 1 file changed, 112 insertions(+), 32 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index d254288a..b939e621 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -2,12 +2,11 @@ import sqlite3 import json import re -# import yaml -# import subprocess -# import os +import subprocess import nbconvert as nbc import nbformat as nbf from datetime import datetime +import textwrap from collections import OrderedDict from dsi.backends.filesystem import Filesystem @@ -157,7 +156,6 @@ def put_artifact_type(self, types, foreign_query = None, isVerbose=False): if isVerbose: print(str_query) self.cur.execute(str_query) - self.con.commit() self.types = types @@ -190,7 +188,8 @@ def put_artifacts(self, collection, isVerbose=False): """ # Core compatibility name assignment artifacts = collection - + insertError = False + errorString = None if self.run_flag: runTable_create = "CREATE TABLE IF NOT EXISTS runTable (run_id INTEGER PRIMARY KEY AUTOINCREMENT, run_timestamp TEXT UNIQUE);" self.cur.execute(runTable_create) @@ -198,8 +197,12 @@ def put_artifacts(self, collection, isVerbose=False): timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') runTable_insert = f"INSERT INTO runTable (run_timestamp) VALUES ('{timestamp}');" - self.cur.execute(runTable_insert) - self.con.commit() + try: + self.cur.execute(runTable_insert) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True for tableName, tableData in artifacts.items(): if tableName == "dsi_relations" or tableName == "dsi_units": @@ -208,10 +211,6 @@ def put_artifacts(self, collection, isVerbose=False): types = DataType() types.properties = {} types.unit_keys = [] - - # Check if this has been defined from helper function - '''if self.types != 
None: - types.name = self.types.name''' types.name = tableName foreign_query = "" @@ -243,31 +242,42 @@ def put_artifacts(self, collection, isVerbose=False): str_query += "{} (run_id, {}) VALUES ({}, {});".format(str(types.name), col_names, run_id, placeholders) else: str_query += "{} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) - - col_list = col_names.split(', ') - # loop through the contents of each column and insert into table as a row - for ind1 in range(len(types.properties[col_list[0]])): - vals = [] - for ind2 in range(len(types.properties.keys())): - vals.append(str(types.properties[col_list[ind2]][ind1])) - tup_vals = tuple(vals) - self.cur.execute(str_query,tup_vals) + + rows = zip(*types.properties.values()) + try: + self.cur.executemany(str_query,rows) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True if isVerbose: print(str_query) + self.types = types #This will only copy the last table from artifacts (collections input) - self.con.commit() - self.types = types #This will only copy the last table from artifacts (collections input) - - for tableName, tableData in artifacts["dsi_units"].items(): + if "dsi_units" in artifacts.keys(): create_query = "CREATE TABLE IF NOT EXISTS dsi_units (table_name TEXT, column_and_unit TEXT UNIQUE)" self.cur.execute(create_query) + for tableName, tableData in artifacts["dsi_units"].items(): + if len(tableData) > 0: + for col_unit_pair in tableData: + str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' + try: + self.cur.execute(str_query) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + + try: + assert insertError == False self.con.commit() - if len({t[1] for t in tableData}) > 1: - for col_unit_pair in tableData: - str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' - self.cur.execute(str_query) - self.con.commit() + except Exception as e: + self.con.rollback() + if type(e) is AssertionError: + return f"No data was inserted into {self.filename} due to the error: {errorString}" + else: + return f"No data was inserted into {self.filename} due to the error: {e}" def put_artifacts_only(self, artifacts, isVerbose=False): """ @@ -470,13 +480,83 @@ def get_artifacts(self, query, isVerbose=False): return data def inspect_artifacts(self, collection, interactive=False): + dsi_relations = dict(collection["dsi_relations"]) + dsi_units = dict(collection["dsi_units"]) + nb = nbf.v4.new_notebook() + text = """\ - # This notebook was auto-generated by a DSI Backend for SQLite. - # Execute the Jupyter notebook cells below and interact with "collection" - # to explore your data. + This notebook was auto-generated by a DSI Backend for SQLite. + Due to the possibility of several tables stored in the DSI abstraction (OrderedDict), the data is stored as several dataframes in a list + Execute the Jupyter notebook cells below and interact with table_list to explore your data. 
+ """ + code1 = """\ + import pandas as pd + import sqlite3 + """ + code2 = f"""\ + db_path = '{self.filename}' + conn = sqlite3.connect(db_path) + tables = pd.read_sql_query('SELECT name FROM sqlite_master WHERE type="table";', conn) + dsi_units = {dsi_units} + dsi_relations = {dsi_relations} + """ + code3 = """\ + table_list = [] + for table_name in tables['name']: + if table_name not in ['dsi_relations', 'dsi_units', 'sqlite_sequence']: + query = 'SELECT * FROM ' + table_name + df = pd.read_sql_query(query, conn) + df.attrs['name'] = table_name + if table_name in dsi_units: + df.attrs['units'] = dsi_units[table_name] + table_list.append(df) + + df = pd.DataFrame(dsi_relations) + df.attrs['name'] = 'dsi_relations' + table_list.append(df) + """ + code4 = """\ + for table_df in table_list: + print(table_df.attrs) + print(table_df) + # table_df.info() + # table_df.describe() """ + nb['cells'] = [nbf.v4.new_markdown_cell(text), + nbf.v4.new_code_cell(textwrap.dedent(code1)), + nbf.v4.new_code_cell(textwrap.dedent(code2)), + nbf.v4.new_code_cell(textwrap.dedent(code3)), + nbf.v4.new_code_cell(textwrap.dedent(code4))] + + fname = 'dsi_sqlite_backend_output.ipynb' + print('Writing Jupyter notebook...') + with open(fname, 'w') as fh: + nbf.write(nb, fh) + + # open the jupyter notebook for static page generation + with open(fname, 'r', encoding='utf-8') as fh: + nb_content = nbf.read(fh, as_version=4) + run_nb = nbc.preprocessors.ExecutePreprocessor(timeout=-1) # No timeout + run_nb.preprocess(nb_content, {'metadata':{'path':'.'}}) + + if interactive: + print('Opening Jupyter notebook...') + + proc = subprocess.run(['jupyter-lab ./dsi_sqlite_backend_output.ipynb'], capture_output=True, shell=True) + if proc.stderr != b"": + raise Exception(proc.stderr) + return proc.stdout.strip().decode("utf-8") + else: + # Init HTML exporter + html_exporter = nbc.HTMLExporter() + html_content,_ = html_exporter.from_notebook_node(nb_content) + # Save HTML file + html_filename = 'dsi_sqlite_backend_output.html' + with open(html_filename, 'w', encoding='utf-8') as fh: + fh.write(html_content) + # Closes connection to server def close(self): self.con.close() From bd3b2f87cbe210621a845d6acfea8daa54b33ac6 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 14 Nov 2024 12:58:28 -0700 Subject: [PATCH 13/39] only execute sql statements if no insertion error else rollback all previous --- dsi/backends/sqlite.py | 46 ++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index b939e621..6cfad6e9 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -197,12 +197,14 @@ def put_artifacts(self, collection, isVerbose=False): timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') runTable_insert = f"INSERT INTO runTable (run_timestamp) VALUES ('{timestamp}');" - try: - self.cur.execute(runTable_insert) - except sqlite3.Error as e: - if errorString is None: - errorString = e - insertError = True + if insertError == False: + try: + self.cur.execute(runTable_insert) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + self.con.rollback() for tableName, tableData in artifacts.items(): if tableName == "dsi_relations" or tableName == "dsi_units": @@ -244,12 +246,14 @@ def put_artifacts(self, collection, isVerbose=False): str_query += "{} ({}) VALUES ({});".format(str(types.name), col_names, placeholders) rows = zip(*types.properties.values()) - try: - 
self.cur.executemany(str_query,rows) - except sqlite3.Error as e: - if errorString is None: - errorString = e - insertError = True + if insertError == False: + try: + self.cur.executemany(str_query,rows) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + self.con.rollback() if isVerbose: print(str_query) @@ -262,12 +266,14 @@ def put_artifacts(self, collection, isVerbose=False): if len(tableData) > 0: for col_unit_pair in tableData: str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' - try: - self.cur.execute(str_query) - except sqlite3.Error as e: - if errorString is None: - errorString = e - insertError = True + if insertError == False: + try: + self.cur.execute(str_query) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + self.con.rollback() try: assert insertError == False @@ -275,9 +281,9 @@ def put_artifacts(self, collection, isVerbose=False): except Exception as e: self.con.rollback() if type(e) is AssertionError: - return f"No data was inserted into {self.filename} due to the error: {errorString}" + return str(errorString) else: - return f"No data was inserted into {self.filename} due to the error: {e}" + return str(e) def put_artifacts_only(self, artifacts, isVerbose=False): """ From 592a2185eb6a589e6f1a6fdb17f470459008edfe Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 14 Nov 2024 13:00:56 -0700 Subject: [PATCH 14/39] customized print error if duplicate/error data ingested to backend --- dsi/core.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index c84a5217..b68a4051 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -233,12 +233,10 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): the provided ``interaction_type``. """ if interaction_type not in self.VALID_ARTIFACT_INTERACTION_TYPES: - print( - 'Hint: Did you declare your artifact interaction type in the Terminal Global vars?') + print('Hint: Did you declare your artifact interaction type in the Terminal Global vars?') raise NotImplementedError operation_success = False - # Perform artifact movement first, because inspect implementation may rely on - # self.active_metadata or some stored artifact. 
+ # Perform artifact movement first, because inspect implementation may rely on self.active_metadata for obj in self.active_modules['back-write']: self.logger.info(f"-------------------------------------") self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") @@ -247,22 +245,23 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): if self.backup_db_flag == True and os.path.getsize(obj.filename) > 100: backup_file = obj.filename[:obj.filename.rfind('.')] + "_backup" + obj.filename[obj.filename.rfind('.'):] shutil.copyfile(obj.filename, backup_file) - db_size = os.path.getsize(obj.filename) errorMessage = obj.put_artifacts( collection=self.active_metadata, **kwargs) - if db_size == os.path.getsize(obj.filename): - print(errorMessage) + if errorMessage is not None: + print(f"No data was inserted into {obj.filename} due to the error: {errorMessage}") operation_success = True elif interaction_type == 'get': self.logger.info(f"Query to get data: {query}") if query != None: self.active_metadata = obj.get_artifacts(query, **kwargs) else: - raise ValueError("Need to specify a query of the database to return data") + raise ValueError("Need to specify a query for the database to return data") operation_success = True elif interaction_type == 'inspect': - obj.put_artifacts( + errorMessage = obj.put_artifacts( collection=self.active_metadata, **kwargs) + if errorMessage is not None: + print("Error in ingesting data to db in inspect artifact. Generating Jupyter notebook with previous instance of db") obj.inspect_artifacts( collection=self.active_metadata, **kwargs) operation_success = True From 4c3c8d64a20a751050dac9cb617ff726f96329a0 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 14 Nov 2024 13:02:45 -0700 Subject: [PATCH 15/39] updated er diagram writer to generate dot as python object and no external file generated --- dsi/plugins/file_writer.py | 61 +++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/dsi/plugins/file_writer.py b/dsi/plugins/file_writer.py index cf800c76..1bfb15ea 100644 --- a/dsi/plugins/file_writer.py +++ b/dsi/plugins/file_writer.py @@ -1,12 +1,13 @@ -from collections import OrderedDict -from os.path import abspath -from hashlib import sha1 +# from collections import OrderedDict +# from os.path import abspath +# from hashlib import sha1 import json, csv from math import isnan -import sqlite3 +# import sqlite3 import subprocess import os from matplotlib import pyplot as plt +from graphviz import Digraph from dsi.plugins.metadata import StructuredMetadata @@ -53,37 +54,32 @@ def get_rows(self, collection) -> None: # return # else: - file_type = ".png" - if self.output_filename[-4:] == ".png" or self.output_filename[-4:] == ".pdf" or self.output_filename[-4:] == ".jpg": - file_type = self.output_filename[-4:] + file_type = "png" + if self.output_filename[-3:] in ["png", "pdf", "jpg"]: + file_type = self.output_filename[-3:] self.output_filename = self.output_filename[:-4] - elif self.output_filename[-5:] == ".jpeg": - file_type = self.output_filename[-5:] + elif self.output_filename[-4:] == ".jpeg": + file_type = self.output_filename[-4:] self.output_filename = self.output_filename[:-5] if self.target_table_prefix is not None and not any(self.target_table_prefix in element for element in collection.keys()): raise ValueError("Your input for target_table_prefix does not exist in the database. 
Please enter a valid prefix for table names.") - - dot_file = open(self.output_filename + ".dot", "w") - - num_tbl_cols = 1 - dot_file.write("digraph workflow_schema { ") + + dot = Digraph('workflow_schema', format = file_type) if self.target_table_prefix is not None: - dot_file.write(f'label="ER Diagram for {self.target_table_prefix} tables"; ') - dot_file.write('labelloc="t"; ') - dot_file.write("node [shape=plaintext]; ") - dot_file.write("rankdir=LR ") - dot_file.write("splines=true ") - dot_file.write("overlap=false ") + dot.attr(label = f'ER Diagram for {self.target_table_prefix} tables', labelloc='t') + dot.attr('node', shape='plaintext') + dot.attr(dpi='300', rankdir='LR', splines='true', overlap='false') + num_tbl_cols = 1 for tableName, tableData in collection.items(): if tableName == "dsi_relations" or tableName == "sqlite_sequence": continue elif self.target_table_prefix is not None and self.target_table_prefix not in tableName: continue - dot_file.write(f"{tableName} [label=<
{tableName}
") - + html_table = f"<
{tableName}
" + col_list = tableData.keys() if tableName == "dsi_units": col_list = ["table_name", "column_and_unit"] @@ -92,30 +88,27 @@ def get_rows(self, collection) -> None: for col_name in col_list: if curr_row % num_tbl_cols == 0: inner_brace = 1 - dot_file.write("") - - dot_file.write(f"") + html_table += "" + html_table += f"" curr_row += 1 if curr_row % num_tbl_cols == 0: inner_brace = 0 - dot_file.write("") - + html_table += "" + if inner_brace: - dot_file.write("") - dot_file.write("
{tableName}
{col_name}
{col_name}
>]; ") + html_table += "" + html_table += ">" + dot.node(tableName, label = html_table) for f_table, f_col in collection["dsi_relations"]["foreign_key"]: if self.target_table_prefix is not None and self.target_table_prefix not in f_table: continue if f_table != "NULL": foreignIndex = collection["dsi_relations"]["foreign_key"].index((f_table, f_col)) - dot_file.write(f"{f_table}:{f_col} -> {collection['dsi_relations']['primary_key'][foreignIndex][0]}: {collection['dsi_relations']['primary_key'][foreignIndex][1]}; ") + dot.edge(f"{f_table}:{f_col}", f"{collection['dsi_relations']['primary_key'][foreignIndex][0]}:{collection['dsi_relations']['primary_key'][foreignIndex][1]}") - dot_file.write("}") - dot_file.close() + dot.render(self.output_filename, cleanup=True) - subprocess.run(["dot", "-T", file_type[1:], "-o", self.output_filename + file_type, self.output_filename + ".dot"]) - os.remove(self.output_filename + ".dot") # def export_erd(self, dbname, fname): # """ From b41422d50835e2dbd72e7772cf005f572d090004 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 14 Nov 2024 13:29:03 -0700 Subject: [PATCH 16/39] created generic text file reader -- assumes only one table in data source --- dsi/plugins/file_reader.py | 31 +++++++++++++++++++++++++++++++ dsi/plugins/file_writer.py | 6 +++--- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index 4eb6fcfa..e42f94f3 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -463,3 +463,34 @@ def add_rows(self) -> None: # unit_row.append(table_unit_row) # if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: # self.add_to_output(unit_row, "dsi_units") + +class TextFile(FileReader): + ''' + Plugin to read in an individual or a set of text files + + Table names are the keys for the main ordered dictionary and column names are the keys for each table's nested ordered dictionary + ''' + def __init__(self, filenames, target_table_prefix = None, **kwargs): + ''' + `filenames`: one text file or a list of text files to be ingested + `target_table_prefix`: prefix to be added to every table created to differentiate between other text file sources + ''' + super().__init__(filenames, **kwargs) + if isinstance(filenames, str): + self.text_files = [filenames] + else: + self.text_files = filenames + self.text_file_data = OrderedDict() + self.target_table_prefix = target_table_prefix + + def add_rows(self) -> None: + """ + Parses text file data and creates an ordered dict whose keys are table names and values are an ordered dict for each table. 
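# --- Illustrative aside, not part of the patch ------------------------------
# Minimal sketch of the graphviz-based ER diagram approach adopted by the
# ER_Diagram writer above: one node per table with an HTML-like label, one
# edge per foreign key between named ports. The two table names and the
# output name "erd_example" are invented; the real writer derives them from
# the active collection.
from graphviz import Digraph

dot = Digraph('workflow_schema', format='png')
dot.attr('node', shape='plaintext')
dot.attr(rankdir='LR')

for table in ('student__math', 'student__address'):
    dot.node(table, label=(
        '<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">'
        f'<TR><TD><B>{table}</B></TD></TR>'
        '<TR><TD PORT="specification">specification</TD></TR>'
        '</TABLE>>'))

dot.edge('student__address:specification', 'student__math:specification')
dot.render('erd_example', cleanup=True)   # writes erd_example.png, removes the .dot
# -----------------------------------------------------------------------------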
+ """ + for filename in self.text_files: + df = read_csv(filename) + if self.target_table_prefix is not None: + self.text_file_data[f"{self.target_table_prefix}__text_file"] = OrderedDict(df.to_dict(orient='list')) + else: + self.text_file_data["text_file"] = OrderedDict(df.to_dict(orient='list')) + self.set_schema_2(self.text_file_data) \ No newline at end of file diff --git a/dsi/plugins/file_writer.py b/dsi/plugins/file_writer.py index 1bfb15ea..b3496936 100644 --- a/dsi/plugins/file_writer.py +++ b/dsi/plugins/file_writer.py @@ -2,10 +2,10 @@ # from os.path import abspath # from hashlib import sha1 import json, csv -from math import isnan +# from math import isnan # import sqlite3 -import subprocess -import os +# import subprocess +# import os from matplotlib import pyplot as plt from graphviz import Digraph From e1afd9b1a2da20c6ddef4e88ffc74b8729db0091 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 10:15:50 -0700 Subject: [PATCH 17/39] merged changes with main branch --- dsi/backends/sqlite.py | 50 ++++++++++++++++++++++++++++++++++++++++ dsi/core.py | 6 +++-- examples/coreterminal.py | 6 ++--- 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 6cfad6e9..06cac076 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -674,6 +674,56 @@ def export_csv(self, rquery, tname, fname, isVerbose=False): return 1 + # Sqlite reader class + def read_to_artifact(self): + artifact = OrderedDict() + artifact["dsi_relations"] = OrderedDict([("primary_key",[]), ("foreign_key", [])]) + + tableList = self.cur.execute("SELECT name FROM sqlite_master WHERE type ='table';").fetchall() + pkList = [] + for item in tableList: + tableName = item[0] + if tableName == "dsi_units": + artifact["dsi_units"] = self.read_units_helper() + continue + + tableInfo = self.cur.execute(f"PRAGMA table_info({tableName});").fetchall() + colDict = OrderedDict() + for colInfo in tableInfo: + colDict[colInfo[1]] = [] + if colInfo[5] == 1: + pkList.append((tableName, colInfo[1])) + + data = self.cur.execute(f"SELECT * FROM {tableName};").fetchall() + for row in data: + for colName, val in zip(colDict.keys(), row): + colDict[colName].append(val) + artifact[tableName] = colDict + + fkData = self.cur.execute(f"PRAGMA foreign_key_list({tableName});").fetchall() + for row in fkData: + artifact["dsi_relations"]["primary_key"].append((row[2], row[4])) + artifact["dsi_relations"]["foreign_key"].append((tableName, row[3])) + if (row[2], row[4]) in pkList: + pkList.remove((row[2], row[4])) + + for pk_tuple in pkList: + if pk_tuple not in artifact["dsi_relations"]["primary_key"]: + artifact["dsi_relations"]["primary_key"].append(pk_tuple) + artifact["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) + return artifact + + def read_units_helper(self): + unitsDict = OrderedDict() + unitsTable = self.cur.execute("SELECT * FROM dsi_units;").fetchall() + for row in unitsTable: + tableName = row[0] + if tableName not in unitsDict.keys(): + unitsDict[tableName] = [] + unitsDict[tableName].append(eval(row[1])) + return unitsDict + + '''UNUSED QUERY FUNCTIONS''' # Query file name diff --git a/dsi/core.py b/dsi/core.py index b68a4051..130786b3 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -25,7 +25,7 @@ class Terminal(): PLUGIN_PREFIX = ['dsi.plugins'] PLUGIN_IMPLEMENTATIONS = ['env', 'file_reader', 'file_writer'] VALID_PLUGINS = ['Hostname', 'SystemKernel', 'GitInfo', 'Bueno', 'Csv', 'ER_Diagram', 'YAML', 'TOML', "Table_Plot", "Schema"] - 
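# --- Illustrative aside, not part of the patch ------------------------------
# Compact sketch of what read_to_artifact() above builds on: SQLite's PRAGMA
# statements expose each table's columns, primary keys and foreign keys, so a
# database can be walked back into plain Python structures. "demo.db" is a
# placeholder path; a DSI database would be opened the same way.
import sqlite3
from collections import OrderedDict

con = sqlite3.connect("demo.db")
cur = con.cursor()
tables = [r[0] for r in cur.execute("SELECT name FROM sqlite_master WHERE type='table';")]

artifact = OrderedDict()
for name in tables:
    cols = OrderedDict()
    for cid, col, ctype, notnull, default, is_pk in cur.execute(f"PRAGMA table_info({name});").fetchall():
        cols[col] = []                                   # one list per column
    for row in cur.execute(f"SELECT * FROM {name};").fetchall():
        for col, val in zip(cols, row):
            cols[col].append(val)
    artifact[name] = cols
    # foreign_key_list rows look like (id, seq, referenced_table, from_col, to_col, ...)
    for fk in cur.execute(f"PRAGMA foreign_key_list({name});").fetchall():
        print(f"{name}.{fk[3]} -> {fk[2]}.{fk[4]}")
con.close()
# -----------------------------------------------------------------------------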
VALID_BACKENDS = ['Gufi', 'Sqlite', 'Parquet', 'SqliteReader'] + VALID_BACKENDS = ['Gufi', 'Sqlite', 'Parquet'] VALID_MODULES = VALID_PLUGINS + VALID_BACKENDS VALID_MODULE_FUNCTIONS = {'plugin': ['writer', 'reader'], 'backend': ['back-read', 'back-write']} @@ -255,7 +255,9 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): if query != None: self.active_metadata = obj.get_artifacts(query, **kwargs) else: - raise ValueError("Need to specify a query for the database to return data") + #raise ValueError("Need to specify a query of the database to return data") + # This is a valid use-case, may give a warning for now + self.active_metadata = obj.get_artifacts(**kwargs) operation_success = True elif interaction_type == 'inspect': errorMessage = obj.put_artifacts( diff --git a/examples/coreterminal.py b/examples/coreterminal.py index 0859787f..67477f84 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -9,7 +9,7 @@ # a.load_module('plugin','Hostname','reader') a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json" , target_table_prefix = "student") -a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"], target_table_prefix = "student") +a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"]) a.load_module('plugin', 'TOML', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") # a.load_module('plugin', "Table_Plot", "writer", table_name = "schema__physics", filename = "schema__physics") @@ -31,7 +31,7 @@ # # ['GitInfo', 'Hostname', 'SystemKernel', 'Bueno', 'Csv'] # a.list_available_modules('backend') -# # ['Gufi', 'Sqlite', 'Parquet', 'SqliteReader] +# # ['Gufi', 'Sqlite', 'Parquet'] # print(a.list_loaded_modules()) # # {'writer': [], @@ -50,7 +50,7 @@ #Example use 2 -# a.load_module('backend','SqliteReader','back-read', filename='data/data.db') +# a.load_module('backend','Sqlite','back-read', filename='data/data.db') # a.artifact_handler(interaction_type="read") # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") # a.load_module('plugin', "Table_Plot", "writer", table_name = "student__physics", filename = "student__physics") From 0b49b3e0e1d00889529f4507c07f8e6f12409f81 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 10:43:57 -0700 Subject: [PATCH 18/39] combined and tested sqlite read/write class --- dsi/backends/sqlite.py | 63 ++-------------------------------------- examples/coreterminal.py | 2 +- 2 files changed, 3 insertions(+), 62 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 06cac076..560ff51f 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -38,66 +38,6 @@ class Artifact: name = "" properties = {} -class SqliteReader(Filesystem): - - def __init__(self, filename, append = False): - self.filename = filename - self.con = sqlite3.connect(filename) - self.cur = self.con.cursor() - - def read_to_artifact(self): - artifact = OrderedDict() - artifact["dsi_relations"] = OrderedDict([("primary_key",[]), ("foreign_key", [])]) - - tableList = self.cur.execute("SELECT name FROM sqlite_master WHERE type ='table';").fetchall() - pkList = [] - for item in tableList: - tableName = item[0] - if tableName == "dsi_units": - artifact["dsi_units"] = self.read_units_helper() - continue - - tableInfo = self.cur.execute(f"PRAGMA table_info({tableName});").fetchall() - colDict = OrderedDict() - 
for colInfo in tableInfo: - colDict[colInfo[1]] = [] - if colInfo[5] == 1: - pkList.append((tableName, colInfo[1])) - - data = self.cur.execute(f"SELECT * FROM {tableName};").fetchall() - for row in data: - for colName, val in zip(colDict.keys(), row): - colDict[colName].append(val) - artifact[tableName] = colDict - - fkData = self.cur.execute(f"PRAGMA foreign_key_list({tableName});").fetchall() - for row in fkData: - artifact["dsi_relations"]["primary_key"].append((row[2], row[4])) - artifact["dsi_relations"]["foreign_key"].append((tableName, row[3])) - if (row[2], row[4]) in pkList: - pkList.remove((row[2], row[4])) - - for pk_tuple in pkList: - if pk_tuple not in artifact["dsi_relations"]["primary_key"]: - artifact["dsi_relations"]["primary_key"].append(pk_tuple) - artifact["dsi_relations"]["foreign_key"].append(("NULL", "NULL")) - return artifact - - def read_units_helper(self): - unitsDict = OrderedDict() - unitsTable = self.cur.execute("SELECT * FROM dsi_units;").fetchall() - for row in unitsTable: - tableName = row[0] - if tableName not in unitsDict.keys(): - unitsDict[tableName] = [] - unitsDict[tableName].append(eval(row[1])) - return unitsDict - - - # Closes connection to server - def close(self): - self.con.close() - # Main storage class, interfaces with SQL class Sqlite(Filesystem): """ @@ -257,7 +197,8 @@ def put_artifacts(self, collection, isVerbose=False): if isVerbose: print(str_query) - self.types = types #This will only copy the last table from artifacts (collections input) + self.types = types + #This will only copy the last table from artifacts (collections input) if "dsi_units" in artifacts.keys(): create_query = "CREATE TABLE IF NOT EXISTS dsi_units (table_name TEXT, column_and_unit TEXT UNIQUE)" diff --git a/examples/coreterminal.py b/examples/coreterminal.py index 67477f84..d8e2cb5c 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -52,6 +52,6 @@ #Example use 2 # a.load_module('backend','Sqlite','back-read', filename='data/data.db') # a.artifact_handler(interaction_type="read") -# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") +# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.png')#, target_table_prefix = "physics") # a.load_module('plugin', "Table_Plot", "writer", table_name = "student__physics", filename = "student__physics") # a.transload() \ No newline at end of file From 6d458ef756b408150db5c7e32d72d78bf0b69f14 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 12:25:26 -0700 Subject: [PATCH 19/39] Updated other files to handle unified units table and separate back-write/read in core calls --- dsi/backends/sqlite.py | 5 ++- dsi/core.py | 87 +++++++++++++++++++++--------------------- 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 560ff51f..b8b364c5 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -215,6 +215,8 @@ def put_artifacts(self, collection, isVerbose=False): errorString = e insertError = True self.con.rollback() + else: + self.con.rollback() try: assert insertError == False @@ -442,8 +444,7 @@ def inspect_artifacts(self, collection, interactive=False): import sqlite3 """ code2 = f"""\ - db_path = '{self.filename}' - conn = sqlite3.connect(db_path) + conn = {self.con} tables = pd.read_sql_query('SELECT name FROM sqlite_master WHERE type="table";', conn) dsi_units = {dsi_units} dsi_relations = {dsi_relations} diff --git a/dsi/core.py 
b/dsi/core.py index 130786b3..1cb28b45 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -214,6 +214,11 @@ def transload(self, **kwargs): self.active_metadata[table_name][colName] = colData elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": raise ValueError(f"Mismatched column input for table {table_name}") + # NO OVERWRITE OF UNIT DATA + # elif colName not in self.active_metadata[table_name].keys() and table_name == "dsi_units": + # self.active_metadata[table_name][colName] = colData + # elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": + # raise ValueError(f"Mismatched column input for table {table_name}") end = datetime.now() self.logger.info(f"Runtime: {end-start}") elif module_type == "writer": @@ -236,50 +241,46 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): print('Hint: Did you declare your artifact interaction type in the Terminal Global vars?') raise NotImplementedError operation_success = False - # Perform artifact movement first, because inspect implementation may rely on self.active_metadata - for obj in self.active_modules['back-write']: - self.logger.info(f"-------------------------------------") - self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") - start = datetime.now() - if interaction_type == 'put' or interaction_type == 'set': - if self.backup_db_flag == True and os.path.getsize(obj.filename) > 100: - backup_file = obj.filename[:obj.filename.rfind('.')] + "_backup" + obj.filename[obj.filename.rfind('.'):] - shutil.copyfile(obj.filename, backup_file) - errorMessage = obj.put_artifacts( - collection=self.active_metadata, **kwargs) - if errorMessage is not None: - print(f"No data was inserted into {obj.filename} due to the error: {errorMessage}") - operation_success = True - elif interaction_type == 'get': - self.logger.info(f"Query to get data: {query}") - if query != None: - self.active_metadata = obj.get_artifacts(query, **kwargs) - else: - #raise ValueError("Need to specify a query of the database to return data") - # This is a valid use-case, may give a warning for now - self.active_metadata = obj.get_artifacts(**kwargs) - operation_success = True - elif interaction_type == 'inspect': - errorMessage = obj.put_artifacts( - collection=self.active_metadata, **kwargs) - if errorMessage is not None: - print("Error in ingesting data to db in inspect artifact. Generating Jupyter notebook with previous instance of db") - obj.inspect_artifacts( + # Perform artifact movement first, because inspect implementation may rely on + # self.active_metadata or some stored artifact. 
+ selected_active_backends = dict((k, self.active_modules[k]) for k in (['back-write', 'back-read'])) + for module_type, objs in selected_active_backends.items(): + for obj in objs: + self.logger.info(f"-------------------------------------") + self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") + start = datetime.now() + if interaction_type == 'put' or interaction_type == 'set': + if self.backup_db_flag == True and os.path.getsize(obj.filename) > 100: + backup_file = obj.filename[:obj.filename.rfind('.')] + "_backup" + obj.filename[obj.filename.rfind('.'):] + shutil.copyfile(obj.filename, backup_file) + errorMessage = obj.put_artifacts( collection=self.active_metadata, **kwargs) - operation_success = True - end = datetime.now() - self.logger.info(f"Runtime: {end-start}") - - for obj in self.active_modules['back-read']: - self.logger.info(f"-------------------------------------") - self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") - start = datetime.now() - if interaction_type == "read": - self.active_metadata = obj.read_to_artifact() - operation_success = True - end = datetime.now() - self.logger.info(f"Runtime: {end-start}") - + if errorMessage is not None: + print(f"No data was inserted into {obj.filename} due to the error: {errorMessage}") + operation_success = True + elif interaction_type == 'get': + self.logger.info(f"Query to get data: {query}") + if query != None: + self.active_metadata = obj.get_artifacts(query, **kwargs) + else: + #raise ValueError("Need to specify a query of the database to return data") + # This is a valid use-case, may give a warning for now + self.active_metadata = obj.get_artifacts(**kwargs) + operation_success = True + elif interaction_type == 'inspect': + if module_type == 'back-write': + errorMessage = obj.put_artifacts( + collection=self.active_metadata, **kwargs) + if errorMessage is not None: + print("Error in ingesting data to db in inspect artifact. 
Generating Jupyter notebook with previous instance of db") + obj.inspect_artifacts( + collection=self.active_metadata, **kwargs) + operation_success = True + elif interaction_type == "read": + self.active_metadata = obj.read_to_artifact() + operation_success = True + end = datetime.now() + self.logger.info(f"Runtime: {end-start}") if operation_success: if interaction_type == 'get' and self.active_metadata: return self.active_metadata From dec406c8ea2ebf4d4f268ef59bf52f63219b22e1 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 12:26:11 -0700 Subject: [PATCH 20/39] updated primary key handling in schema reader --- dsi/plugins/file_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index e42f94f3..e5a065ca 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -215,7 +215,7 @@ def add_rows(self) -> None: self.schema_data["dsi_relations"]["primary_key"].append((colData[0], colData[1])) self.schema_data["dsi_relations"]["foreign_key"].append((tableName, colName)) - if tableData["primary_key"] != "NULL": + if "primary_key" in tableData.keys(): pkList.append((tableName, tableData["primary_key"])) for pk in pkList: From 750931b57b3d054091363de3beb9102585dde06d Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Thu, 7 Nov 2024 01:29:16 -0700 Subject: [PATCH 21/39] Created run table to store metadata for every workflow run and handled unified unit table in sqlite reader/writer classes --- dsi/backends/sqlite.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index b8b364c5..3cff31a3 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -228,6 +228,16 @@ def put_artifacts(self, collection, isVerbose=False): else: return str(e) + for tableName, tableData in artifacts["dsi_units"].items(): + create_query = "CREATE TABLE IF NOT EXISTS dsi_units (table_name TEXT, column_and_unit TEXT UNIQUE)" + self.cur.execute(create_query) + self.con.commit() + if len({t[1] for t in tableData}) > 1: + for col_unit_pair in tableData: + str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' + self.cur.execute(str_query) + self.con.commit() + def put_artifacts_only(self, artifacts, isVerbose=False): """ Function for insertion of Artifact metadata into a defined schema as a Tuple From 041147009547c82a8f6fe7378dc783b39c339957 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:11:43 -0700 Subject: [PATCH 22/39] updated backend actions and split back-write and read actions --- dsi/core.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 1cb28b45..4ea11508 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -214,11 +214,6 @@ def transload(self, **kwargs): self.active_metadata[table_name][colName] = colData elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": raise ValueError(f"Mismatched column input for table {table_name}") - # NO OVERWRITE OF UNIT DATA - # elif colName not in self.active_metadata[table_name].keys() and table_name == "dsi_units": - # self.active_metadata[table_name][colName] = colData - # elif colName not in self.active_metadata[table_name].keys() and table_name != "dsi_units": - # raise ValueError(f"Mismatched column input for table {table_name}") end = datetime.now() self.logger.info(f"Runtime: {end-start}") elif module_type == "writer": From 3e973401ea9aaaf6d0be4d4869a76e0a593fbcd8 Mon Sep 
17 00:00:00 2001 From: Vedant P Iyer Date: Wed, 13 Nov 2024 16:20:20 -0700 Subject: [PATCH 23/39] Only commit db insert if all data in workflow is stable/non repetitive, and updated inspect artifact function to generate Jupyter notebook --- dsi/backends/sqlite.py | 35 ++++++++++------------------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 3cff31a3..4ef70d2d 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -196,9 +196,8 @@ def put_artifacts(self, collection, isVerbose=False): self.con.rollback() if isVerbose: - print(str_query) - self.types = types - #This will only copy the last table from artifacts (collections input) + print(str_query) + self.types = types #This will only copy the last table from artifacts (collections input) if "dsi_units" in artifacts.keys(): create_query = "CREATE TABLE IF NOT EXISTS dsi_units (table_name TEXT, column_and_unit TEXT UNIQUE)" @@ -207,16 +206,12 @@ def put_artifacts(self, collection, isVerbose=False): if len(tableData) > 0: for col_unit_pair in tableData: str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' - if insertError == False: - try: - self.cur.execute(str_query) - except sqlite3.Error as e: - if errorString is None: - errorString = e - insertError = True - self.con.rollback() - else: - self.con.rollback() + try: + self.cur.execute(str_query) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True try: assert insertError == False @@ -224,19 +219,9 @@ def put_artifacts(self, collection, isVerbose=False): except Exception as e: self.con.rollback() if type(e) is AssertionError: - return str(errorString) + return f"No data was inserted into {self.filename} due to the error: {errorString}" else: - return str(e) - - for tableName, tableData in artifacts["dsi_units"].items(): - create_query = "CREATE TABLE IF NOT EXISTS dsi_units (table_name TEXT, column_and_unit TEXT UNIQUE)" - self.cur.execute(create_query) - self.con.commit() - if len({t[1] for t in tableData}) > 1: - for col_unit_pair in tableData: - str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' - self.cur.execute(str_query) - self.con.commit() + return f"No data was inserted into {self.filename} due to the error: {e}" def put_artifacts_only(self, artifacts, isVerbose=False): """ From 35369907a19e48615755d14558b570fe30402ba3 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 12:42:35 -0700 Subject: [PATCH 24/39] changed coreterminal to merge to main --- examples/coreterminal.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/coreterminal.py b/examples/coreterminal.py index d8e2cb5c..25bd240e 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -8,9 +8,9 @@ # a.load_module('plugin','Bueno','reader', filenames='data/bueno1.data') # a.load_module('plugin','Hostname','reader') -a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json" , target_table_prefix = "student") -a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"]) -a.load_module('plugin', 'TOML', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") +# a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json" , target_table_prefix = "student") +# a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", 
"data/student_test2.yml"]) +# a.load_module('plugin', 'TOML', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") # a.load_module('plugin', "Table_Plot", "writer", table_name = "schema__physics", filename = "schema__physics") # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") @@ -53,5 +53,4 @@ # a.load_module('backend','Sqlite','back-read', filename='data/data.db') # a.artifact_handler(interaction_type="read") # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.png')#, target_table_prefix = "physics") -# a.load_module('plugin', "Table_Plot", "writer", table_name = "student__physics", filename = "student__physics") # a.transload() \ No newline at end of file From 0a149d67868f56e02921425fd0f752094579a6ee Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 12:52:23 -0700 Subject: [PATCH 25/39] updated sqlite to merge to main --- dsi/backends/sqlite.py | 189 +++++------------------------------------ 1 file changed, 20 insertions(+), 169 deletions(-) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 4ef70d2d..d1318c9b 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -2,6 +2,9 @@ import sqlite3 import json import re +# import yaml +# import subprocess +# import os import subprocess import nbconvert as nbc import nbformat as nbf @@ -123,11 +126,11 @@ def put_artifacts(self, collection, isVerbose=False): `collection`: A Python Collection of an Artifact derived class that has multiple regular structures of a defined schema, filled with rows to insert. - `return`: none """ # Core compatibility name assignment artifacts = collection + insertError = False errorString = None if self.run_flag: @@ -145,6 +148,8 @@ def put_artifacts(self, collection, isVerbose=False): errorString = e insertError = True self.con.rollback() + else: + self.con.rollback() for tableName, tableData in artifacts.items(): if tableName == "dsi_relations" or tableName == "dsi_units": @@ -194,6 +199,8 @@ def put_artifacts(self, collection, isVerbose=False): errorString = e insertError = True self.con.rollback() + else: + self.con.rollback() if isVerbose: print(str_query) @@ -206,12 +213,16 @@ def put_artifacts(self, collection, isVerbose=False): if len(tableData) > 0: for col_unit_pair in tableData: str_query = f'INSERT OR IGNORE INTO dsi_units VALUES ("{tableName}", "{col_unit_pair}")' - try: - self.cur.execute(str_query) - except sqlite3.Error as e: - if errorString is None: - errorString = e - insertError = True + if insertError == False: + try: + self.cur.execute(str_query) + except sqlite3.Error as e: + if errorString is None: + errorString = e + insertError = True + self.con.rollback() + else: + self.con.rollback() try: assert insertError == False @@ -424,6 +435,7 @@ def get_artifacts(self, query, isVerbose=False): return data def inspect_artifacts(self, collection, interactive=False): + #nb = nbf.v4.new_notebook() dsi_relations = dict(collection["dsi_relations"]) dsi_units = dict(collection["dsi_units"]) @@ -738,165 +750,4 @@ def read_units_helper(self): # if isVerbose: # print(resout) - # return resout - - '''YAML AND TOML READERS''' - - # def yamlDataToList(self, filenames): - # """ - # Function that reads a YAML file or files into a list - # """ - - # yamlData = [] - # for filename in filenames: - # with open(filename, 'r') as yaml_file: - # editedString = yaml_file.read() - # editedString = re.sub('specification', r'columns:\n specification', editedString) - # 
editedString = re.sub(r'(!.+)\n', r"'\1'\n", editedString) - # yml_data = yaml.safe_load_all(editedString) - - # for table in yml_data: - # yamlData.append(table) - - # return yamlData - - # def yamlToSqlite(self, filenames, db_name, deleteSql=True): - # """ - # Function that ingests a YAML file into a sqlite database based on the given database name - - # `filenames`: name of YAML file or a list of YAML files to be ingested - - # `db_name`: name of database that YAML file should be added to. Database will be created if it does not exist in local directory. - - # `deleteSql`: flag to delete temp SQL file that creates the database. Default is True, but change to False for testing or comparing outputs - # """ - - # sql_statements = [] - # if isinstance(filenames, str): - # filenames = [filenames] - - # with open(db_name+".sql", "w") as sql_file: - # yml_list = self.yamlDataToList(filenames) - # for table in yml_list: - # tableName = table["segment"] - - # data_types = {float: "FLOAT", str: "VARCHAR", int: "INT"} - # if not os.path.isfile(db_name+".db") or os.path.getsize(db_name+".db") == 0: - # createStmt = f"CREATE TABLE IF NOT EXISTS {tableName} ( " - # createUnitStmt = f"CREATE TABLE IF NOT EXISTS {tableName}_units ( " - # insertUnitStmt = f"INSERT INTO {tableName}_units VALUES( " - - # for key, val in table['columns'].items(): - # createUnitStmt+= f"{key} VARCHAR, " - # if data_types[type(val)] == "VARCHAR" and self.check_type(val[:val.find(" ")]) in [" INT", " FLOAT"]: - # createStmt += f"{key}{self.check_type(val[:val.find(' ')])}, " - # insertUnitStmt+= f"'{val[val.find(' ')+1:]}', " - # else: - # createStmt += f"{key} {data_types[type(val)]}, " - # insertUnitStmt+= "NULL, " - - # if createStmt not in sql_statements: - # sql_statements.append(createStmt) - # sql_file.write(createStmt[:-2] + ");\n\n") - # if createUnitStmt not in sql_statements: - # sql_statements.append(createUnitStmt) - # sql_file.write(createUnitStmt[:-2] + ");\n\n") - # if insertUnitStmt not in sql_statements: - # sql_statements.append(insertUnitStmt) - # sql_file.write(insertUnitStmt[:-2] + ");\n\n") - - # insertStmt = f"INSERT INTO {tableName} VALUES( " - # for val in table['columns'].values(): - # if data_types[type(val)] == "VARCHAR" and self.check_type(val[:val.find(" ")]) in [" INT", " FLOAT"]: - # insertStmt+= f"{val[:val.find(' ')]}, " - # elif data_types[type(val)] == "VARCHAR": - # insertStmt+= f"'{val}', " - # else: - # insertStmt+= f"{val}, " - - # if insertStmt not in sql_statements: - # sql_statements.append(insertStmt) - # sql_file.write(insertStmt[:-2] + ");\n\n") - - # subprocess.run(["sqlite3", db_name+".db"], stdin= open(db_name+".sql", "r")) - - # if deleteSql == True: - # os.remove(db_name+".sql") - - # def tomlDataToList(self, filenames): - # """ - # Function that reads a TOML file or files into a list - # """ - - # toml_data = [] - # for filename in filenames: - # with open(filename, 'r') as toml_file: - # data = toml.load(toml_file) - # for tableName, tableData in data.items(): - # toml_data.append([tableName, tableData]) - - # return toml_data - - # def tomlToSqlite(self, filenames, db_name, deleteSql=True): - # """ - # Function that ingests a TOML file into a sqlite database based on the given database name - - # `filenames`: name of TOML file or a list of TOML files to be ingested - - # `db_name`: name of database that TOML file should be added to. Database will be created if it does not exist in local directory. - - # `deleteSql`: flag to delete temp SQL file that creates the database. 
Default is True, but change to False for testing or comparing outputs - # """ - - # sql_statements = [] - # if isinstance(filenames, str): - # filenames = [filenames] - - # with open(db_name+".sql", "w") as sql_file: - # data = self.tomlDataToList(filenames) - - # for item in data: - # tableName, tableData = item - # data_types = {float: "FLOAT", str: "VARCHAR", int: "INT"} - - # if not os.path.isfile(db_name+".db") or os.path.getsize(db_name+".db") == 0: - # createStmt = f"CREATE TABLE IF NOT EXISTS {tableName} ( " - # createUnitStmt = f"CREATE TABLE IF NOT EXISTS {tableName}_units ( " - # insertUnitStmt = f"INSERT INTO {tableName}_units VALUES( " - - # for key, val in tableData.items(): - # createUnitStmt+= f"{key} VARCHAR, " - # if type(val) == list and type(val[0]) == str and self.check_type(val[0]) in [" INT", " FLOAT"]: - # createStmt += f"{key}{self.check_type(val[0])}, " - # insertUnitStmt+= f"'{val[1]}', " - # else: - # createStmt += f"{key} {data_types[type(val)]}, " - # insertUnitStmt+= "NULL, " - - # if createStmt not in sql_statements: - # sql_statements.append(createStmt) - # sql_file.write(createStmt[:-2] + ");\n\n") - # if createUnitStmt not in sql_statements: - # sql_statements.append(createUnitStmt) - # sql_file.write(createUnitStmt[:-2] + ");\n\n") - # if insertUnitStmt not in sql_statements: - # sql_statements.append(insertUnitStmt) - # sql_file.write(insertUnitStmt[:-2] + ");\n\n") - - # insertStmt = f"INSERT INTO {tableName} VALUES( " - # for val in tableData.values(): - # if type(val) == list and type(val[0]) == str and self.check_type(val[0]) in [" INT", " FLOAT"]: - # insertStmt+= f"{val[0]}, " - # elif type(val) == str: - # insertStmt+= f"'{val}', " - # else: - # insertStmt+= f"{val}, " - - # if insertStmt not in sql_statements: - # sql_statements.append(insertStmt) - # sql_file.write(insertStmt[:-2] + ");\n\n") - - # subprocess.run(["sqlite3", db_name+".db"], stdin= open(db_name+".sql", "r")) - - # if deleteSql == True: - # os.remove(db_name+".sql") + # return resout \ No newline at end of file From 402477b17ac3ca841d8ce1ccc1486439697af57b Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 12:59:43 -0700 Subject: [PATCH 26/39] added graphviz to pip install in CI file --- .github/workflows/test_file_reader.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test_file_reader.yml b/.github/workflows/test_file_reader.yml index a0dc0aee..fbe54239 100644 --- a/.github/workflows/test_file_reader.yml +++ b/.github/workflows/test_file_reader.yml @@ -28,6 +28,7 @@ jobs: python -m pip install --upgrade pip pip install -r requirements.txt pip install . 
+ pip install graphviz - name: Test reader run: | pip install pytest From 58958568dd7a92932f9f4fa61cb8221ddd35e594 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 13:08:53 -0700 Subject: [PATCH 27/39] updated tests to reflect units table in collections now --- dsi/plugins/tests/test_file_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dsi/plugins/tests/test_file_reader.py b/dsi/plugins/tests/test_file_reader.py index 190e0a98..469ae8b1 100644 --- a/dsi/plugins/tests/test_file_reader.py +++ b/dsi/plugins/tests/test_file_reader.py @@ -100,7 +100,7 @@ def test_yaml_reader(): a.load_module('plugin', 'YAML', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") a.transload() - assert len(a.active_metadata.keys()) == 3 + assert len(a.active_metadata.keys()) == 4 # 4 tables - math, address, physics, dsi_units for tableData in a.active_metadata.values(): assert isinstance(tableData, OrderedDict) numRows = 2 @@ -111,7 +111,7 @@ def test_toml_reader(): a.load_module('plugin', 'TOML', 'reader', filenames="examples/data/results.toml", target_table_prefix = "results") a.transload() - assert len(a.active_metadata.keys()) == 1 + assert len(a.active_metadata.keys()) == 2 # 2 tables - people and dsi_units for tableData in a.active_metadata.values(): assert isinstance(tableData, OrderedDict) numRows = 1 From 23252d15492f4beaf65fa70f24d8345fe2419634 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 13:16:56 -0700 Subject: [PATCH 28/39] updated test file reader again --- dsi/plugins/tests/test_file_reader.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/dsi/plugins/tests/test_file_reader.py b/dsi/plugins/tests/test_file_reader.py index 469ae8b1..cb9ef239 100644 --- a/dsi/plugins/tests/test_file_reader.py +++ b/dsi/plugins/tests/test_file_reader.py @@ -101,10 +101,12 @@ def test_yaml_reader(): a.transload() assert len(a.active_metadata.keys()) == 4 # 4 tables - math, address, physics, dsi_units - for tableData in a.active_metadata.values(): + for name, tableData in a.active_metadata.items(): assert isinstance(tableData, OrderedDict) numRows = 2 - assert all(len(lst) == numRows for lst in tableData.values()) + if name == "dsi_units": + continue + assert all(len(colData) == numRows for colData in tableData.values()) def test_toml_reader(): a=Terminal() @@ -112,10 +114,12 @@ def test_toml_reader(): a.transload() assert len(a.active_metadata.keys()) == 2 # 2 tables - people and dsi_units - for tableData in a.active_metadata.values(): + for name, tableData in a.active_metadata.items(): assert isinstance(tableData, OrderedDict) + if name == "dsi_units": + continue numRows = 1 - assert all(len(lst) == numRows for lst in tableData.values()) + assert all(len(colData) == numRows for colData in tableData.values()) def test_schema_reader(): a=Terminal() From 718b439861b4834844afd6f271caf4e62c36e5e8 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 15:12:07 -0700 Subject: [PATCH 29/39] moved nbc and nbf dependencies inline for only inspect artifact --- dsi/backends/parquet.py | 5 +++-- dsi/backends/sqlite.py | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/dsi/backends/parquet.py b/dsi/backends/parquet.py index 92f8eb30..7bb38b9a 100644 --- a/dsi/backends/parquet.py +++ b/dsi/backends/parquet.py @@ -1,7 +1,5 @@ import pyarrow as pa from pyarrow import parquet as pq -import nbconvert as nbc -import 
nbformat as nbf import subprocess from dsi.backends.filesystem import Filesystem @@ -46,6 +44,9 @@ def get_cmd_output(cmd: list) -> str: return proc.stdout.strip().decode("utf-8") def inspect_artifacts(self, collection, interactive=False): + import nbconvert as nbc + import nbformat as nbf + """Populate a Jupyter notebook with tools required to look at Parquet data.""" nb = nbf.v4.new_notebook() text = """\ diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index fed56e5a..b85eb680 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -3,8 +3,6 @@ import json import re import subprocess -import nbconvert as nbc -import nbformat as nbf from datetime import datetime import textwrap @@ -227,9 +225,9 @@ def put_artifacts(self, collection, isVerbose=False): except Exception as e: self.con.rollback() if type(e) is AssertionError: - return f"No data was inserted into {self.filename} due to the error: {errorString}" + return errorString else: - return f"No data was inserted into {self.filename} due to the error: {e}" + return e def put_artifacts_only(self, artifacts, isVerbose=False): """ @@ -432,6 +430,9 @@ def get_artifacts(self, query, isVerbose=False): return data def inspect_artifacts(self, collection, interactive=False): + import nbconvert as nbc + import nbformat as nbf + dsi_relations = dict(collection["dsi_relations"]) dsi_units = dict(collection["dsi_units"]) @@ -446,7 +447,8 @@ def inspect_artifacts(self, collection, interactive=False): import sqlite3 """ code2 = f"""\ - conn = {self.con} + dbPath = '{self.filename}' + conn = sqlite3.connect(dbPath) tables = pd.read_sql_query('SELECT name FROM sqlite_master WHERE type="table";', conn) dsi_units = {dsi_units} dsi_relations = {dsi_relations} From 4679e6e6a577ff251e2a8e2ea762b73e74da0325 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 15:13:15 -0700 Subject: [PATCH 30/39] only backend read can call read and only backend write can call put --- dsi/core.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 4ea11508..e9b2845d 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -24,7 +24,7 @@ class Terminal(): BACKEND_IMPLEMENTATIONS = ['gufi', 'sqlite', 'parquet'] PLUGIN_PREFIX = ['dsi.plugins'] PLUGIN_IMPLEMENTATIONS = ['env', 'file_reader', 'file_writer'] - VALID_PLUGINS = ['Hostname', 'SystemKernel', 'GitInfo', 'Bueno', 'Csv', 'ER_Diagram', 'YAML', 'TOML', "Table_Plot", "Schema"] + VALID_PLUGINS = ['Hostname', 'SystemKernel', 'GitInfo', 'Bueno', 'Csv', 'ER_Diagram', 'YAML1', 'TOML1', "Table_Plot", "Schema"] VALID_BACKENDS = ['Gufi', 'Sqlite', 'Parquet'] VALID_MODULES = VALID_PLUGINS + VALID_BACKENDS VALID_MODULE_FUNCTIONS = {'plugin': ['writer', 'reader'], @@ -244,7 +244,7 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): self.logger.info(f"-------------------------------------") self.logger.info(obj.__class__.__name__ + f" backend - {interaction_type} the data") start = datetime.now() - if interaction_type == 'put' or interaction_type == 'set': + if (interaction_type == 'put' or interaction_type == 'set') and module_type == 'back-write': if self.backup_db_flag == True and os.path.getsize(obj.filename) > 100: backup_file = obj.filename[:obj.filename.rfind('.')] + "_backup" + obj.filename[obj.filename.rfind('.'):] shutil.copyfile(obj.filename, backup_file) @@ -271,7 +271,7 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): obj.inspect_artifacts( collection=self.active_metadata, **kwargs) 
operation_success = True - elif interaction_type == "read": + elif interaction_type == "read" and module_type == 'back-read': self.active_metadata = obj.read_to_artifact() operation_success = True end = datetime.now() @@ -281,9 +281,8 @@ def artifact_handler(self, interaction_type, query = None, **kwargs): return self.active_metadata return else: - print( - 'Hint: Did you implement a case for your artifact interaction in the \ - artifact_handler loop?') + print('Is your artifact interaction spelled correct and is it implemented in your backend?') + print('Remember that backend writers cannot read a db and backend readers cannot write to a db') raise NotImplementedError class Sync(): From 25435223be8e5ae256e72beaaa19e653ebc0f164 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 15:14:01 -0700 Subject: [PATCH 31/39] removed extra imports and updated erd writer if graphviz installed or not(manual dot file) --- dsi/plugins/file_writer.py | 75 ++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/dsi/plugins/file_writer.py b/dsi/plugins/file_writer.py index b3496936..70644b83 100644 --- a/dsi/plugins/file_writer.py +++ b/dsi/plugins/file_writer.py @@ -1,13 +1,5 @@ -# from collections import OrderedDict -# from os.path import abspath -# from hashlib import sha1 -import json, csv -# from math import isnan -# import sqlite3 -# import subprocess -# import os +import csv from matplotlib import pyplot as plt -from graphviz import Digraph from dsi.plugins.metadata import StructuredMetadata @@ -54,22 +46,36 @@ def get_rows(self, collection) -> None: # return # else: - file_type = "png" - if self.output_filename[-3:] in ["png", "pdf", "jpg"]: - file_type = self.output_filename[-3:] - self.output_filename = self.output_filename[:-4] - elif self.output_filename[-4:] == ".jpeg": + file_type = ".png" + if self.output_filename[-4:] == ".png" or self.output_filename[-4:] == ".pdf" or self.output_filename[-4:] == ".jpg": file_type = self.output_filename[-4:] + self.output_filename = self.output_filename[:-4] + elif self.output_filename[-5:] == ".jpeg": + file_type = self.output_filename[-5:] self.output_filename = self.output_filename[:-5] if self.target_table_prefix is not None and not any(self.target_table_prefix in element for element in collection.keys()): raise ValueError("Your input for target_table_prefix does not exist in the database. 
Please enter a valid prefix for table names.") - dot = Digraph('workflow_schema', format = file_type) - if self.target_table_prefix is not None: - dot.attr(label = f'ER Diagram for {self.target_table_prefix} tables', labelloc='t') - dot.attr('node', shape='plaintext') - dot.attr(dpi='300', rankdir='LR', splines='true', overlap='false') + manual_dot = False + try: from graphviz import Digraph + except ModuleNotFoundError: + manual_dot = True + import subprocess + import os + + if manual_dot: + dot_file = open(self.output_filename + ".dot", "w") + dot_file.write("digraph workflow_schema { ") + if self.target_table_prefix is not None: + dot_file.write(f'label="ER Diagram for {self.target_table_prefix} tables"; labelloc="t"; ') + dot_file.write("node [shape=plaintext]; dpi=300 rankdir=LR splines=true overlap=false ") + else: + dot = Digraph('workflow_schema', format = file_type[1:]) + if self.target_table_prefix is not None: + dot.attr(label = f'ER Diagram for {self.target_table_prefix} tables', labelloc='t') + dot.attr('node', shape='plaintext') + dot.attr(dpi='300', rankdir='LR', splines='true', overlap='false') num_tbl_cols = 1 for tableName, tableData in collection.items(): @@ -77,8 +83,11 @@ def get_rows(self, collection) -> None: continue elif self.target_table_prefix is not None and self.target_table_prefix not in tableName: continue - - html_table = f"<" + + html_table = "" + if manual_dot: + html_table = f"{tableName} [label=" + html_table += f"<
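# --- Illustrative aside, not part of the patch ------------------------------
# The optional-dependency pattern used by the ER_Diagram writer above: prefer
# the graphviz package, and fall back to writing a plain .dot file and calling
# the `dot` binary through subprocess when the package is not installed.
# "example.dot"/"example.png" are placeholder names.
import subprocess

try:
    from graphviz import Digraph
    have_graphviz = True
except ModuleNotFoundError:
    have_graphviz = False

if have_graphviz:
    g = Digraph('example', format='png')
    g.edge('a', 'b')
    g.render('example', cleanup=True)                    # example.png via the package
else:
    with open('example.dot', 'w') as fh:
        fh.write('digraph example { a -> b }\n')
    subprocess.run(['dot', '-T', 'png', '-o', 'example.png', 'example.dot'])
# -----------------------------------------------------------------------------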
{tableName}
" col_list = tableData.keys() if tableName == "dsi_units": @@ -98,18 +107,30 @@ def get_rows(self, collection) -> None: if inner_brace: html_table += "" html_table += "
{tableName}
>" - dot.node(tableName, label = html_table) + + if manual_dot: dot_file.write(html_table+"]; ") + else: dot.node(tableName, label = html_table) for f_table, f_col in collection["dsi_relations"]["foreign_key"]: if self.target_table_prefix is not None and self.target_table_prefix not in f_table: continue if f_table != "NULL": foreignIndex = collection["dsi_relations"]["foreign_key"].index((f_table, f_col)) - dot.edge(f"{f_table}:{f_col}", f"{collection['dsi_relations']['primary_key'][foreignIndex][0]}:{collection['dsi_relations']['primary_key'][foreignIndex][1]}") - - dot.render(self.output_filename, cleanup=True) + fk_string = f"{f_table}:{f_col}" + pk_string = f"{collection['dsi_relations']['primary_key'][foreignIndex][0]}:{collection['dsi_relations']['primary_key'][foreignIndex][1]}" + + if manual_dot: dot_file.write(f"{fk_string} -> {pk_string}; ") + else: dot.edge(fk_string, pk_string) + + if manual_dot: + dot_file.write("}") + dot_file.close() + subprocess.run(["dot", "-T", file_type[1:], "-o", self.output_filename + file_type, self.output_filename + ".dot"]) + os.remove(self.output_filename + ".dot") + else: + dot.render(self.output_filename, cleanup=True) - + #REALLLLLY OLD CODE # def export_erd(self, dbname, fname): # """ # Function that outputs a ER diagram for the given database. @@ -308,7 +329,7 @@ def get_rows(self, collection) -> None: col_len = len(colData) if isinstance(colData[0], str) == False: unit_tuple = "NULL" - if self.table_name in collection["dsi_units"].keys(): + if "dsi_units" in collection.keys() and self.table_name in collection["dsi_units"].keys(): unit_tuple = next((t[1] for t in collection["dsi_units"][self.table_name] if t[0] == colName), "NULL") if unit_tuple != "NULL": numeric_cols.append((colName + f" ({unit_tuple})", colData)) From 1e4f2e0898b9f3e98716f89922276e3ddf03195e Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 15:16:17 -0700 Subject: [PATCH 32/39] Updated csv reader to be faster and renamed yaml/toml to YAML1 TOML1 --- dsi/plugins/file_reader.py | 129 +++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 56 deletions(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index e5a065ca..c1384433 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -46,55 +46,75 @@ def __init__(self, filenames, **kwargs): super().__init__(filenames, **kwargs) self.csv_data = {} - def pack_header(self) -> None: - """ Set schema based on the CSV columns """ + # def pack_header(self) -> None: + # """ Set schema based on the CSV columns """ - column_names = list(self.file_info.keys()) + list(self.csv_data.keys()) - self.set_schema(column_names) + # column_names = list(self.file_info.keys()) + list(self.csv_data.keys()) + # self.set_schema(column_names) def add_rows(self) -> None: """ Adds a list containing one or more rows of the CSV along with file_info to output. """ - if not self.schema_is_set(): - # use Pandas to append all CSVs together as a - # dataframe, then convert to dict - if self.strict_mode: - total_df = DataFrame() - dfs = [] - for filename in self.filenames: - # Initial case. Empty df collection. - if total_df.empty: - total_df = read_csv(filename) - dfs.append(total_df) - else: # One or more dfs in collection - temp_df = read_csv(filename) - # raise exception if schemas do not match - if any([set(temp_df.columns) != set(df.columns) for df in dfs]): - print('Error: Strict schema mode is on. 
Schemas do not match.') - raise TypeError - dfs.append(temp_df) - total_df = concat([total_df, temp_df]) - - # Reminder: Schema is not set in this block. - else: # self.strict_mode == False - total_df = DataFrame() - for filename in self.filenames: - temp_df = read_csv(filename) - total_df = concat([total_df, temp_df]) - - # Columns are present in the middleware already (schema_is_set==True). - # TODO: Can this go under the else block at line #79? - self.csv_data = total_df.to_dict('list') - for col, coldata in self.csv_data.items(): # replace NaNs with None - self.csv_data[col] = [None if type(item) == float and isnan(item) else item - for item in coldata] - self.pack_header() + total_df = DataFrame() + for filename in self.filenames: + temp_df = read_csv(filename) + try: + total_df = concat([total_df, temp_df]) + except: + raise ValueError(f"Error in adding {filename} to the existing csv data. Please recheck column names and data structure") + + #convert total_df to ordered dict + table_data = OrderedDict(total_df.to_dict(orient='list')) + for col, coldata in table_data.items(): # replace NaNs with None + table_data[col] = [None if type(item) == float and isnan(item) else item for item in coldata] + + if self.db_name is not None: + self.csv_data[self.db_name] = table_data + else: + self.csv_data["CSV"] = table_data + + self.set_schema2(self.csv_data) - total_length = len(self.csv_data[list(self.csv_data.keys())[0]]) - for row_idx in range(total_length): - row = [self.csv_data[k][row_idx] for k in self.csv_data.keys()] - row_w_fileinfo = list(self.file_info.values()) + row - self.add_to_output(row_w_fileinfo) + # if not self.schema_is_set(): + # # use Pandas to append all CSVs together as a + # # dataframe, then convert to dict + # if self.strict_mode: + # total_df = DataFrame() + # dfs = [] + # for filename in self.filenames: + # # Initial case. Empty df collection. + # if total_df.empty: + # total_df = read_csv(filename) + # dfs.append(total_df) + # else: # One or more dfs in collection + # temp_df = read_csv(filename) + # # raise exception if schemas do not match + # if any([set(temp_df.columns) != set(df.columns) for df in dfs]): + # print('Error: Strict schema mode is on. Schemas do not match.') + # raise TypeError + # dfs.append(temp_df) + # total_df = concat([total_df, temp_df]) + + # # Reminder: Schema is not set in this block. + # else: # self.strict_mode == False + # total_df = DataFrame() + # for filename in self.filenames: + # temp_df = read_csv(filename) + # total_df = concat([total_df, temp_df]) + + # # Columns are present in the middleware already (schema_is_set==True). + # # TODO: Can this go under the else block at line #79? 
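# --- Illustrative aside, not part of the patch ------------------------------
# Condensed sketch of the rewritten Csv.add_rows() path shown above: read each
# file with pandas, stack the frames with concat, then flatten to an
# OrderedDict of column lists with NaN replaced by None. The two file names
# are placeholders.
from collections import OrderedDict
from math import isnan
from pandas import DataFrame, read_csv, concat

total_df = DataFrame()
for filename in ["runs_part1.csv", "runs_part2.csv"]:
    total_df = concat([total_df, read_csv(filename)])

table = OrderedDict(total_df.to_dict(orient='list'))
for col, values in table.items():
    table[col] = [None if isinstance(v, float) and isnan(v) else v for v in values]
# -----------------------------------------------------------------------------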
+ # self.csv_data = total_df.to_dict('list') + # for col, coldata in self.csv_data.items(): # replace NaNs with None + # self.csv_data[col] = [None if type(item) == float and isnan(item) else item + # for item in coldata] + # self.pack_header() + + # total_length = len(self.csv_data[list(self.csv_data.keys())[0]]) + # for row_idx in range(total_length): + # row = [self.csv_data[k][row_idx] for k in self.csv_data.keys()] + # row_w_fileinfo = list(self.file_info.values()) + row + # self.add_to_output(row_w_fileinfo) class Bueno(FileReader): @@ -246,10 +266,9 @@ def add_rows(self) -> None: # self.schema_data["dsi_relations"]["foreign_key"].append((tableName, colName)) # self.add_to_output([(colData[0], colData[1]), (tableName, colName)], "dsi_relations") -class YAML(FileReader): +class YAML1(FileReader): ''' Plugin to read in an individual or a set of YAML files - Table names are the keys for the main ordered dictionary and column names are the keys for each table's nested ordered dictionary ''' def __init__(self, filenames, target_table_prefix = None, yamlSpace = ' ', **kwargs): @@ -267,12 +286,12 @@ def __init__(self, filenames, target_table_prefix = None, yamlSpace = ' ', **kw self.yaml_data = OrderedDict() self.target_table_prefix = target_table_prefix - def pack_header(self) -> None: - """Set schema with YAML data.""" - table_info = [] - for table_name in list(self.yaml_data.keys()): - table_info.append((table_name, list(self.yaml_data[table_name].keys()))) - self.set_schema(table_info) + # def pack_header(self) -> None: + # """Set schema with YAML data.""" + # table_info = [] + # for table_name in list(self.yaml_data.keys()): + # table_info.append((table_name, list(self.yaml_data[table_name].keys()))) + # self.set_schema(table_info) def check_type(self, text): """ @@ -323,7 +342,7 @@ def add_rows(self) -> None: if len(unitsList) > 0 and tableName not in self.yaml_data["dsi_units"].keys(): self.yaml_data["dsi_units"][tableName] = unitsList - self.set_schema_2(self.yaml_data) + self.set_schema_2(self.yaml_data) # if not self.schema_is_set(): # self.yaml_data["dsi_units"] = OrderedDict() @@ -358,10 +377,9 @@ def add_rows(self) -> None: # if len(next(iter(self.output_collector["dsi_units"].values()))) < 1: # self.add_to_output(unit_row, "dsi_units") -class TOML(FileReader): +class TOML1(FileReader): ''' Plugin to read in an individual or a set of TOML files - Table names are the keys for the main ordered dictionary and column names are the keys for each table's nested ordered dictionary ''' def __init__(self, filenames, target_table_prefix = None, **kwargs): @@ -427,7 +445,7 @@ def add_rows(self) -> None: if len(unitsList) > 0 and tableName not in self.toml_data["dsi_units"].keys(): self.toml_data["dsi_units"][tableName] = unitsList - self.set_schema_2(self.toml_data) + self.set_schema_2(self.toml_data) # if not self.schema_is_set(): # for tableName, tableData in toml_load_data.items(): @@ -467,7 +485,6 @@ def add_rows(self) -> None: class TextFile(FileReader): ''' Plugin to read in an individual or a set of text files - Table names are the keys for the main ordered dictionary and column names are the keys for each table's nested ordered dictionary ''' def __init__(self, filenames, target_table_prefix = None, **kwargs): From c9e0845850d23230794a0a85cd3c190874d413e3 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 15:17:21 -0700 Subject: [PATCH 33/39] Updated with new toml and yaml reader names --- examples/coreterminal.py | 30 +++++++++++++++--------------- 1 file 
changed, 15 insertions(+), 15 deletions(-) diff --git a/examples/coreterminal.py b/examples/coreterminal.py index 25bd240e..392a3e00 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -8,20 +8,20 @@ # a.load_module('plugin','Bueno','reader', filenames='data/bueno1.data') # a.load_module('plugin','Hostname','reader') -# a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json" , target_table_prefix = "student") -# a.load_module('plugin', 'YAML', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"]) -# a.load_module('plugin', 'TOML', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") +# a.load_module('plugin', 'Schema', 'reader', filename="data/example_schema.json", target_table_prefix = "student") +# a.load_module('plugin', 'YAML1', 'reader', filenames=["data/student_test1.yml", "data/student_test2.yml"], target_table_prefix = "student") +# a.load_module('plugin', 'TOML1', 'reader', filenames=["data/results.toml"], target_table_prefix = "results") -# a.load_module('plugin', "Table_Plot", "writer", table_name = "schema__physics", filename = "schema__physics") -# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") -a.transload() +# # a.load_module('plugin', "Table_Plot", "writer", table_name = "schema__physics", filename = "schema__physics") +# # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") +# a.transload() -a.load_module('backend','Sqlite','back-write', filename='data/data.db') -# a.load_module('backend','Parquet','back-write',filename='data/bueno.pq') +# a.load_module('backend','Sqlite','back-write', filename='data/data.db') +# # a.load_module('backend','Parquet','back-write',filename='data/bueno.pq') -a.artifact_handler(interaction_type='put') -# data = a.artifact_handler(interaction_type='get', query = "SELECT * FROM run_table;")#, isVerbose = True) -a.artifact_handler(interaction_type="inspect") +# a.artifact_handler(interaction_type='put') +# # data = a.artifact_handler(interaction_type='get', query = "SELECT * FROM run_table;")#, isVerbose = True) +# a.artifact_handler(interaction_type="read") # print(data) # a.unload_module('backend', 'Sqlite', 'back-write') @@ -50,7 +50,7 @@ #Example use 2 -# a.load_module('backend','Sqlite','back-read', filename='data/data.db') -# a.artifact_handler(interaction_type="read") -# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.png')#, target_table_prefix = "physics") -# a.transload() \ No newline at end of file +a.load_module('backend','Sqlite','back-read', filename='data/data.db') +a.artifact_handler(interaction_type="read") +a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.png')#, target_table_prefix = "physics") +a.transload() \ No newline at end of file From 2047081398a5d5f75707f79bca224e64d1c5e79f Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 15:22:10 -0700 Subject: [PATCH 34/39] Updated init function of csv reader --- dsi/plugins/file_reader.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index c1384433..f72b8abf 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -42,9 +42,14 @@ class Csv(FileReader): # Default value is False. 
strict_mode = False - def __init__(self, filenames, **kwargs): + def __init__(self, filenames, db_name = None, **kwargs): super().__init__(filenames, **kwargs) - self.csv_data = {} + self.csv_data = OrderedDict() + if isinstance(filenames, str): + self.filenames = [filenames] + else: + self.filenames = filenames + self.db_name = db_name # def pack_header(self) -> None: # """ Set schema based on the CSV columns """ From 61adf5bfea7837ca1c9e54a0a0d5887d0c1309ca Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 15:24:13 -0700 Subject: [PATCH 35/39] Updated name of set schema 2 function call in csv reader --- dsi/plugins/file_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index f72b8abf..54ed98bb 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -78,7 +78,7 @@ def add_rows(self) -> None: else: self.csv_data["CSV"] = table_data - self.set_schema2(self.csv_data) + self.set_schema_2(self.csv_data) # if not self.schema_is_set(): # # use Pandas to append all CSVs together as a From d9cd05906fb51aa309780bf0ca0f490e2a5aa350 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 16:37:36 -0700 Subject: [PATCH 36/39] updated set_schema_2 in metadata to create nested ordered dict --- dsi/plugins/file_reader.py | 2 +- dsi/plugins/metadata.py | 8 +++++++- dsi/plugins/tests/test_file_reader.py | 4 ++-- dsi/plugins/tests/test_file_writer.py | 4 ++-- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/dsi/plugins/file_reader.py b/dsi/plugins/file_reader.py index 54ed98bb..ea8e0ce5 100644 --- a/dsi/plugins/file_reader.py +++ b/dsi/plugins/file_reader.py @@ -76,7 +76,7 @@ def add_rows(self) -> None: if self.db_name is not None: self.csv_data[self.db_name] = table_data else: - self.csv_data["CSV"] = table_data + self.csv_data = table_data self.set_schema_2(self.csv_data) diff --git a/dsi/plugins/metadata.py b/dsi/plugins/metadata.py index 6b194366..15fdfbfe 100644 --- a/dsi/plugins/metadata.py +++ b/dsi/plugins/metadata.py @@ -64,7 +64,13 @@ def set_schema(self, table_data: list, validation_model=None) -> None: self.strict_mode_lock = True def set_schema_2(self, collection, validation_model=None) -> None: - self.output_collector = collection + # Finds file_reader class that called set_schema and assigns that as table_name for this data + if not isinstance(collection[next(iter(collection))], OrderedDict): + caller_frame = inspect.stack()[1] + tableName = caller_frame.frame.f_locals.get('self', None).__class__.__name__ + self.output_collector[tableName] = collection + else: + self.output_collector = collection self.table_cnt = len(collection.keys()) self.validation_model = validation_model diff --git a/dsi/plugins/tests/test_file_reader.py b/dsi/plugins/tests/test_file_reader.py index cb9ef239..36024cd4 100644 --- a/dsi/plugins/tests/test_file_reader.py +++ b/dsi/plugins/tests/test_file_reader.py @@ -97,7 +97,7 @@ def test_csv_plugin_leaves_active_metadata_wellformed(): def test_yaml_reader(): a=Terminal() - a.load_module('plugin', 'YAML', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") + a.load_module('plugin', 'YAML1', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") a.transload() assert len(a.active_metadata.keys()) == 4 # 4 tables - math, address, physics, dsi_units @@ -110,7 +110,7 @@ def test_yaml_reader(): 
def test_toml_reader(): a=Terminal() - a.load_module('plugin', 'TOML', 'reader', filenames="examples/data/results.toml", target_table_prefix = "results") + a.load_module('plugin', 'TOML1', 'reader', filenames="examples/data/results.toml", target_table_prefix = "results") a.transload() assert len(a.active_metadata.keys()) == 2 # 2 tables - people and dsi_units diff --git a/dsi/plugins/tests/test_file_writer.py b/dsi/plugins/tests/test_file_writer.py index 2fa1cdb8..3ca1cbf8 100644 --- a/dsi/plugins/tests/test_file_writer.py +++ b/dsi/plugins/tests/test_file_writer.py @@ -22,8 +22,8 @@ def test_csv_plugin_type(): def test_export_db_erd(): a=Terminal(debug_flag=False) a.load_module('plugin', 'Schema', 'reader', filename="examples/data/example_schema.json" , target_table_prefix = "student") - a.load_module('plugin', 'YAML', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") - a.load_module('plugin', 'TOML', 'reader', filenames=["examples/data/results.toml"], target_table_prefix = "results") + a.load_module('plugin', 'YAML1', 'reader', filenames=["examples/data/student_test1.yml", "examples/data/student_test2.yml"], target_table_prefix = "student") + a.load_module('plugin', 'TOML1', 'reader', filenames=["examples/data/results.toml"], target_table_prefix = "results") a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'erd_test_output.png') a.transload() From 7fe19646170b4d8e21249ad1cf24ef4b6e78f098 Mon Sep 17 00:00:00 2001 From: Vedant P Iyer Date: Fri, 15 Nov 2024 16:42:42 -0700 Subject: [PATCH 37/39] Extraneous columnns expected in csv dictionary in test function are removed --- dsi/plugins/tests/test_file_reader.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dsi/plugins/tests/test_file_reader.py b/dsi/plugins/tests/test_file_reader.py index 36024cd4..e722be72 100644 --- a/dsi/plugins/tests/test_file_reader.py +++ b/dsi/plugins/tests/test_file_reader.py @@ -56,8 +56,8 @@ def test_csv_plugin_adds_rows(): for key, val in plug.output_collector["Csv"].items(): assert len(val) == 4 - # 11 Csv cols + 1 inherited FileReader cols - assert len(plug.output_collector["Csv"].keys()) == 12 + # 11 Csv cols + assert len(plug.output_collector["Csv"].keys()) == 11 def test_csv_plugin_adds_rows_multiple_files(): path1 = '/'.join([get_git_root('.'), 'examples/data', 'wildfiredata.csv']) @@ -69,8 +69,8 @@ def test_csv_plugin_adds_rows_multiple_files(): for key, val in plug.output_collector["Csv"].items(): assert len(val) == 8 - # 13 Csv cols + 2 inherited FileReader cols - assert len(plug.output_collector["Csv"].keys()) == 15 + # 13 Csv cols + assert len(plug.output_collector["Csv"].keys()) == 13 def test_csv_plugin_adds_rows_multiple_files_strict_mode(): path1 = '/'.join([get_git_root('.'), 'examples/data', 'wildfiredata.csv']) From ec9b5a0c5272458bbfbe82f79646945796df220c Mon Sep 17 00:00:00 2001 From: Vedant Iyer Date: Fri, 15 Nov 2024 19:32:38 -0700 Subject: [PATCH 38/39] Logger overwrites not appends to output file --- dsi/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index e9b2845d..815860c9 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -64,7 +64,7 @@ def static_munge(prefix, implementations): if debug_flag: logging.basicConfig( filename='logger.txt', # Name of the log file - filemode='a', # Append mode ('w' for overwrite) + filemode='w', # Append mode ('w' for overwrite) format='%(asctime)s - %(levelname)s - %(message)s', # Log message 
format level=logging.INFO # Minimum log level to capture ) @@ -447,4 +447,4 @@ def get(project_name = "Project"): DSI database ''' True - \ No newline at end of file + From 123c85181e41ae874b07ec8ac470de6c2b1e22f4 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Sun, 17 Nov 2024 10:43:06 -0700 Subject: [PATCH 39/39] fixed description --- dsi/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsi/core.py b/dsi/core.py index 815860c9..6718f0b1 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -64,7 +64,7 @@ def static_munge(prefix, implementations): if debug_flag: logging.basicConfig( filename='logger.txt', # Name of the log file - filemode='w', # Append mode ('w' for overwrite) + filemode='w', # Overwrite mode ('a' for append) format='%(asctime)s - %(levelname)s - %(message)s', # Log message format level=logging.INFO # Minimum log level to capture )
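
Note on PATCH 36/39 (appended for context, not part of any patch): the updated `set_schema_2` uses `inspect.stack()` to name a flat, single-table collection after the reader class that called it, instead of requiring an explicit table name. A minimal standalone sketch of that caller-detection idiom follows; the `StorageBase` class, the `Csv` stub, and the sample columns are illustrative stand-ins, not the actual DSI classes or data.

    import inspect
    from collections import OrderedDict

    class StorageBase:
        """Illustrative stand-in for the metadata base class."""
        def __init__(self):
            self.output_collector = OrderedDict()

        def set_schema_2(self, collection):
            # Flat collection (values are column lists, not nested OrderedDicts):
            # fall back to the calling reader's class name as the table name.
            if not isinstance(collection[next(iter(collection))], OrderedDict):
                caller = inspect.stack()[1]                       # caller's frame info
                reader = caller.frame.f_locals.get('self', None)  # the reader instance
                self.output_collector[reader.__class__.__name__] = collection
            else:
                self.output_collector = collection

    class Csv(StorageBase):
        """Illustrative reader; the real Csv builds its dict from pandas DataFrames."""
        def add_rows(self):
            self.set_schema_2(OrderedDict(temperature=[21.0, 22.5], wind=[3.2, 4.1]))

    r = Csv()
    r.add_rows()
    print(list(r.output_collector.keys()))  # ['Csv'] -- table named after the caller

Because the table name is recovered from the caller's frame, readers that pass a flat dict get a sensible default without extra arguments, while nested (multi-table) collections are stored as-is.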