From 7b0980a715e3c7c87bab2584a279cf80068b0525 Mon Sep 17 00:00:00 2001 From: Tharindu N Date: Fri, 25 Feb 2022 00:28:07 +0530 Subject: [PATCH 1/3] update docstrings --- ReallySimpleDB/manager.py | 50 ++++++++++++++-------------- tests/test_manager.py | 70 +++++++++++++++++++-------------------- 2 files changed, 59 insertions(+), 61 deletions(-) diff --git a/ReallySimpleDB/manager.py b/ReallySimpleDB/manager.py index 1a1668d..d0c30f5 100644 --- a/ReallySimpleDB/manager.py +++ b/ReallySimpleDB/manager.py @@ -4,7 +4,8 @@ from .utils import DATA_TYPES class ReallySimpleDB: - """ ReallySimpleDB class + """ + ReallySimpleDB class. ReallySimpleDB objects are the ones responsible of creating DBs, connecting with them, creating tables, adding records, geting records, among tasks. In @@ -12,17 +13,15 @@ class ReallySimpleDB: """ def __init__(self) -> None: - """ - create a object - """ + """Create a object.""" self._add_columns_cmd = "" self.connection = "" def clean(self): """ - cleans add_columns data + Clean add_columns data. - why? _add_columns_cmd variable is for define SQL command. when using add_column, + Why? _add_columns_cmd variable is for define SQL command. when using add_column, it sets up a string here. but when it is finished this is not clean and the data continues to exist. when use add_column again and again, it will be processed along with the existing data. this should be used to prevent it. @@ -30,12 +29,12 @@ def clean(self): self._add_columns_cmd = "" def create_connection(self, database): - """opens a connection to the SQLite database file""" + """Open a connection to the SQLite database file.""" self.connection = sqlite3.connect(database) return True def create_db(self, dbpath:str="", replace:bool=False): - """creates a new database in a given path""" + """Create a new database in a given path.""" if self.connection == "" and not dbpath: raise TypeError("create_db() missing 1 required positional argument: 'dbpath'") @@ -61,14 +60,13 @@ def add_columns(self, database:str="", table:str=""): """ - add columns to an existing table / define columns before creating a table + Add columns to an existing table / define columns before creating a table. - if use for create new table: sqlite cannot create table without columns. + If use for create new table: sqlite cannot create table without columns. so user must first define the columns and create a table. important: user have to close connection here. if not, code returns error. because it tries to add column to existing table. """ - # checks if the user is trying to add unsupported data type if datatype.upper() not in DATA_TYPES: raise TypeError("datatype not supported, '{}'".format(datatype)) @@ -103,7 +101,7 @@ def add_columns(self, return True def create_table(self, table_name:str, database:str=""): - """creates new table in database""" + """Create new table in database.""" if self.connection == "" and not database: raise TypeError("create_table() missing 1 required positional argument: 'database'") @@ -122,7 +120,7 @@ def create_table(self, table_name:str, database:str=""): return True def all_tables(self, database:str=""): - """get a list of all the tables in the database""" + """Get a list of all the tables in the database.""" if self.connection == "" and not database: raise TypeError("all_tables() missing 1 required positional argument: 'database'") @@ -134,7 +132,7 @@ def all_tables(self, database:str=""): return [tables[0] for tables in cursor.execute(sql_cmd)] def is_table(self, table_name:str, database:str=""): - """checks if the given table is exists in the database""" + """Check if the given table is exists in the database.""" if self.connection == "" and not database: raise TypeError("is_table() missing 1 required positional argument: 'database'") @@ -146,7 +144,7 @@ def is_table(self, table_name:str, database:str=""): return False def delete_table(self, table:str, database:str=""): - """delete a table from the database""" + """Delete a table from the database.""" if self.connection == "" and not database: raise TypeError("delete_table() missing 1 required positional argument: 'database'") @@ -164,7 +162,7 @@ def delete_table(self, table:str, database:str=""): raise sqlite3.OperationalError("no such table: {}".format(table)) def get_all_column_types(self, table:str, database:str=""): - """get all the column names with the data types in a table""" + """Get all the column names with the data types in a table.""" if self.connection == "" and not database: raise TypeError( "get_all_column_types() missing 1 required positional argument: 'database'") @@ -188,7 +186,7 @@ def get_all_column_types(self, table:str, database:str=""): raise sqlite3.OperationalError("no such table: {}".format(table)) def get_column_type(self, table:str, column:str, database:str=""): - """get data type of a column in a table""" + """Get data type of a column in a table.""" all_data = self.get_all_column_types(table=table, database=database) # if columns exists in the table and given column in the table @@ -198,7 +196,7 @@ def get_column_type(self, table:str, column:str, database:str=""): raise sqlite3.OperationalError("no such column: {}".format(column)) def get_columns(self, table:str, database:str=""): - """get all the column names list in a table""" + """Get all the column names list in a table.""" if self.connection == "" and not database: raise TypeError("get_columns() missing 1 required positional argument: 'database'") @@ -215,7 +213,7 @@ def get_columns(self, table:str, database:str=""): return columns def get_primary_key(self, table:str, database:str=""): - """find and get primary key of a table""" + """Find and get primary key of a table.""" if self.connection == "" and not database: raise TypeError("get_primary_key() missing 1 required positional argument: 'database'") @@ -233,7 +231,7 @@ def get_primary_key(self, table:str, database:str=""): raise sqlite3.OperationalError("no such table: {}".format(table)) def add_record(self, table:str, record, database:str=""): - """add a new record to a table""" + """Add a new record to a table.""" if self.connection == "" and not database: raise TypeError("add_record() missing 1 required positional argument: 'database'") @@ -288,7 +286,7 @@ def add_record(self, table:str, record, database:str=""): raise sqlite3.OperationalError("no such table: {}".format(table)) def get_record(self, table:str, primary_key, database:str=""): - """get row data / record from a table using the primary key""" + """Get row data / record from a table using the primary key.""" if self.connection == "" and not database: raise TypeError("get_record() missing 1 required positional argument: 'database'") @@ -321,7 +319,7 @@ def get_record(self, table:str, primary_key, database:str=""): raise sqlite3.OperationalError("no such table: {}".format(table)) def get_all_records(self, table:str, database:str=""): - """get all data / records of a table""" + """Get all data / records of a table.""" if self.connection == "" and not database: raise TypeError("get_all_records() missing 1 required positional argument: 'database'") @@ -353,7 +351,7 @@ def get_all_records(self, table:str, database:str=""): raise sqlite3.OperationalError("no such table: {}".format(table)) def delete_record(self, table:str, primary_key, database:str=""): - """delete record from a table""" + """Delete record from a table.""" if self.connection == "" and not database: raise TypeError("delete_record() missing 1 required positional argument: 'database'") @@ -374,9 +372,9 @@ def delete_record(self, table:str, primary_key, database:str=""): def filter_records(self, table:str, values:dict, database:str=""): """ - get filtered record list from a table + Get filtered record list from a table. - this will return one or more records by checking the values. + This will return one or more records by checking the values. """ if self.connection == "" and not database: raise TypeError("filter_records() missing 1 required positional argument: 'database'") @@ -427,6 +425,6 @@ def filter_records(self, table:str, values:dict, database:str=""): raise sqlite3.OperationalError("no such table: {}".format(table)) def close_connection(self): - """close the connection with the SQLite database file""" + """Close the connection with the SQLite database file.""" self.connection.close() return True diff --git a/tests/test_manager.py b/tests/test_manager.py index a2a3b6e..e5f9f9c 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -5,12 +5,12 @@ _dbmanager = dbmanager() def test_create_db(): - """creates new database""" + """Create new database.""" assert _dbmanager.create_db(dbpath="test.db", replace=True) delete_db() def test_create_table_1(): - """creates new database and new table with columns""" + """Create new database and new table with columns.""" _dbmanager.create_db(dbpath="test.db", replace=True) _dbmanager.close_connection() @@ -20,7 +20,7 @@ def test_create_table_1(): assert _dbmanager.create_table(database="test.db", table_name="STUDENTS") def test_create_table_2(): - """creates new table with columns""" + """Create new table with columns.""" _dbmanager.clean() _dbmanager.add_columns(column_name="teacher_id", primary_key=True) _dbmanager.add_columns(column_name="name", not_null=True) @@ -28,7 +28,7 @@ def test_create_table_2(): assert _dbmanager.create_table(database="test.db", table_name="TEACHERS") def test_create_table_3(): - """creates new table with columns""" + """Create new table with columns.""" _dbmanager.clean() _dbmanager.add_columns(column_name="emp_id", primary_key=True) _dbmanager.add_columns(column_name="name", not_null=True) @@ -36,43 +36,43 @@ def test_create_table_3(): assert _dbmanager.create_table(table_name="EMPLOYEES") def test_add_columns(): - """add new column to a table""" + """Add new column to a table.""" assert _dbmanager.add_columns(column_name="year", database="test.db", table="STUDENTS") def test_all_tables_1(): - """if table exists in the database""" + """If table exists in the database.""" assert "STUDENTS" in _dbmanager.all_tables("test.db") def test_all_tables_2(): - """if table not in the database""" + """If table not in the database.""" assert "NON" not in _dbmanager.all_tables("test.db") def test_all_tables_3(): - """if table exists in the database.. without define the db""" + """If table exists in the database.. without define the db.""" assert "STUDENTS" in _dbmanager.all_tables() def test_all_tables_4(): - """if table not in the database.. without define the db""" + """If table not in the database.. without define the db.""" assert "NON" not in _dbmanager.all_tables() def test_is_table_1(): - """checks if the given table is exists in the database""" + """Check if the given table is exists in the database.""" assert _dbmanager.is_table(database="test.db", table_name="STUDENTS") def test_is_table_2(): - """checks if the given table is not in the database""" + """Check if the given table is not in the database.""" assert not _dbmanager.is_table(database="test.db", table_name="NON") def test_is_table_3(): - """checks if the given table is exists in the database.. without define the db""" + """Check if the given table is exists in the database.. without define the db.""" assert _dbmanager.is_table(table_name="STUDENTS") def test_is_table_4(): - """checks if the given table is not in the database.. without define the db""" + """Check if the given table is not in the database.. without define the db.""" assert not _dbmanager.is_table(table_name="NON") def test_delete_table_1(): - """delete table if table exists""" + """Delete table if table exists.""" try: _dbmanager.delete_table(table="EMPLOYEES") assert True @@ -80,7 +80,7 @@ def test_delete_table_1(): assert False def test_delete_table_2(): - """delete table if table not exists""" + """Delete table if table not exists.""" try: _dbmanager.delete_table(table="EMPLOYEES") assert False @@ -88,16 +88,16 @@ def test_delete_table_2(): assert True def test_get_all_column_types_1(): - """get all the column names with the data types in a table""" + """Get all the column names with the data types in a table.""" assert _dbmanager.get_all_column_types(table="STUDENTS") \ == {"student_id": "TEXT", "name": "TEXT", "mark": "INT", "year": "TEXT"} def test_get_column_type_1(): - """get data type of a column in a table""" + """Get data type of a column in a table.""" assert _dbmanager.get_column_type(table="STUDENTS", column="student_id") == "TEXT" def test_get_column_type_2(): - """get data type of not exists column in a table""" + """Get data type of not exists column in a table.""" try: _dbmanager.get_column_type(table="STUDENTS", column="address") assert False @@ -105,27 +105,27 @@ def test_get_column_type_2(): assert True def test_get_columns_1(): - """get all the column names list in a table""" + """Get all the column names list in a table.""" assert _dbmanager.get_columns(table="STUDENTS") == ["student_id", "name", "mark", "year"] def test_get_primary_key_1(): - """find and get primary key of a table""" + """Find and get primary key of a table.""" assert _dbmanager.get_primary_key(table="STUDENTS") == "student_id" def test_get_primary_key_2(): - """find and get primary key of a table""" + """Find and get primary key of a table.""" assert _dbmanager.get_primary_key(table="TEACHERS") == "teacher_id" def test_add_record_1(): - """add a new record to a table""" + """Add a new record to a table.""" assert _dbmanager.add_record(table="STUDENTS", record={"student_id": "1010", "name":"ABC", "mark":10, "year":"2022"}) == True def test_add_record_2(): - """add a new record to a table""" + """Add a new record to a table.""" assert _dbmanager.add_record(table="STUDENTS", record={"student_id": "1011", "name":"DEF", "mark":100, "year":"2022"}) == True def test_add_record_3(): - """add a new record to a table with datatype errors""" + """Add a new record to a table with datatype errors.""" try: _dbmanager.add_record(table="STUDENTS", record={"student_id": 10, "name":"ABC", "mark":10, "year":"2022"}) assert False @@ -133,41 +133,41 @@ def test_add_record_3(): assert True def test_get_record_1(): - """get row data / record from a table using the primary key""" + """Get row data / record from a table using the primary key.""" assert _dbmanager.get_record(table="STUDENTS", primary_key="1010") == {'student_id': '1010', 'name': 'ABC', 'mark': 10, 'year': '2022'} def test_get_record_2(): - """get row data / record from a table using the primary key""" + """Get row data / record from a table using the primary key.""" assert _dbmanager.get_record(table="STUDENTS", primary_key="10101") == {} def test_get_all_records(): - """get all data / records of a table""" + """Get all data / records of a table.""" assert _dbmanager.get_all_records(table="STUDENTS") == [{'student_id': '1010', 'name': 'ABC', 'mark': 10, 'year': '2022'}, {'student_id': '1011', 'name': 'DEF', 'mark': 100, 'year': '2022'}] def test_filter_record_1(): - """get filtered record list from a table""" + """Get filtered record list from a table.""" assert _dbmanager.filter_records(table="STUDENTS", values={"year":"2022"}) == [{'student_id': '1010', 'name': 'ABC', 'mark': 10, 'year': '2022'}, {'student_id': '1011', 'name': 'DEF', 'mark': 100, 'year': '2022'}] def test_filter_record_2(): - """get filtered record list from a table""" + """Get filtered record list from a table.""" assert _dbmanager.filter_records(table="STUDENTS", values={"mark":100, "year":"2022"}) == [{'student_id': '1011', 'name': 'DEF', 'mark': 100, 'year': '2022'}] def test_filter_record_3(): - """get filtered record list from a table: Comparison""" + """Get filtered record list from a table: Comparison.""" assert _dbmanager.filter_records(table="STUDENTS", values={"mark":" <= 100"}) == [{'student_id': '1010', 'name': 'ABC', 'mark': 10, 'year': '2022'}, {'student_id': '1011', 'name': 'DEF', 'mark': 100, 'year': '2022'}] def test_filter_record_4(): - """get filtered record list from a table: Comparison""" + """Get filtered record list from a table: Comparison.""" assert _dbmanager.filter_records(table="STUDENTS", values={"mark":" != 100"}) == [{'student_id': '1010', 'name': 'ABC', 'mark': 10, 'year': '2022'}] def test_delete_record_1(): - """delete record from a table""" + """Delete record from a table.""" assert _dbmanager.delete_record(table="STUDENTS", primary_key="1010") def test_delete_record_2(): - """delete record from a table when table is not exists""" + """Delete record from a table when table is not exists.""" try: _dbmanager.delete_record(table="STUDENTSS", primary_key="1010") assert False @@ -175,10 +175,10 @@ def test_delete_record_2(): assert True def test_finally(): - """deletes the database""" + """Delete the database.""" delete_db() def delete_db(database="test.db"): - """close connection and deletes the database""" + """Close connection and deletes the database.""" _dbmanager.close_connection() os.remove(database) From 918cba5fcea7ca5a95647e3d47f744bb9dfc3704 Mon Sep 17 00:00:00 2001 From: Tharindu N Date: Mon, 28 Feb 2022 11:07:48 +0530 Subject: [PATCH 2/3] Fixed: Possible SQL injection vector through string-based query construction --- ReallySimpleDB/manager.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/ReallySimpleDB/manager.py b/ReallySimpleDB/manager.py index d0c30f5..9338ec6 100644 --- a/ReallySimpleDB/manager.py +++ b/ReallySimpleDB/manager.py @@ -79,7 +79,7 @@ def add_columns(self, # column to an existing table. self.create_connection(database=database) cursor = self.connection.cursor() - sql_cmd = "ALTER TABLE {} ADD COLUMN {} {}".format(table, column_name, datatype) + sql_cmd = "ALTER TABLE " + table + " ADD COLUMN " + column_name + " " + datatype if not_null: sql_cmd += " NOT NULL" if primary_key: @@ -90,7 +90,7 @@ def add_columns(self, # if table is not defines, it means that the user is trying to add / define # a column to a new table. so the following code add SQL syntax globally for # use when creating new table - self._add_columns_cmd += (",{} {}".format(column_name, datatype)) + self._add_columns_cmd += "," + column_name + " " + datatype if primary_key: self._add_columns_cmd += " PRIMARY KEY" @@ -114,7 +114,7 @@ def create_table(self, table_name:str, database:str=""): if self._add_columns_cmd == "": raise NotImplementedError("call 'add_columns' function before create table") - sql_cmd = "CREATE TABLE {} ({})".format(table_name, self._add_columns_cmd[1:]) + sql_cmd = "CREATE TABLE " + table_name + " (" + self._add_columns_cmd[1:] + ")" self.connection.execute(sql_cmd) return True @@ -153,7 +153,7 @@ def delete_table(self, table:str, database:str=""): if self.is_table(table_name=table): cursor = self.connection.cursor() - sql_cmd = "DROP TABLE {};".format(table) + sql_cmd = "DROP TABLE " + table + ";" cursor.execute(sql_cmd) return True @@ -173,7 +173,7 @@ def get_all_column_types(self, table:str, database:str=""): if self.is_table(table_name=table, database=database): cursor = self.connection.cursor() - sql_cmd = "PRAGMA TABLE_INFO({});".format(table) + sql_cmd = "PRAGMA TABLE_INFO(" + table + ");" fetch = cursor.execute(sql_cmd) data_dict = {} @@ -223,8 +223,8 @@ def get_primary_key(self, table:str, database:str=""): if self.is_table(table_name=table, database=database): cursor = self.connection.cursor() - sql_cmd = "SELECT * FROM pragma_table_info('{}') WHERE pk;".format(table) - fetch = cursor.execute(sql_cmd) + sql_cmd = "SELECT * FROM pragma_table_info(?) WHERE pk;" + fetch = cursor.execute(sql_cmd, (table,)) return fetch.fetchall()[0][1] # raise OperationalError if the given table not exists @@ -250,7 +250,7 @@ def add_record(self, table:str, record, database:str=""): all_columns[column] = "" fields = [] - sql_cmd = "INSERT INTO {} VALUES(".format(table) + sql_cmd = "INSERT INTO " + table + " VALUES(" # if record is dict type,.. if isinstance(record, dict): @@ -296,8 +296,7 @@ def get_record(self, table:str, primary_key, database:str=""): if self.is_table(table_name=table, database=database): cursor = self.connection.cursor() - sql_cmd = "SELECT * FROM {} WHERE {}=?;".format( - table, self.get_primary_key(table=table, database=database)) + sql_cmd = "SELECT * FROM " + table + " WHERE " + self.get_primary_key(table=table, database=database) + "=?;" fetch = cursor.execute(sql_cmd, (primary_key,)) # get columns list using get_columns @@ -329,7 +328,7 @@ def get_all_records(self, table:str, database:str=""): if self.is_table(table_name=table, database=database): cursor = self.connection.cursor() - sql_cmd = "SELECT * FROM {}".format(table) + sql_cmd = "SELECT * FROM " + table cursor.execute(sql_cmd) rows = cursor.fetchall() @@ -360,8 +359,7 @@ def delete_record(self, table:str, primary_key, database:str=""): if self.is_table(table_name=table, database=database): cursor = self.connection.cursor() - sql = "DELETE FROM {} WHERE {}=?".format( - table, self.get_primary_key(table=table, database=database)) + sql = "DELETE FROM " + table + " WHERE " + self.get_primary_key(table=table, database=database) + "=?" cursor.execute(sql, (primary_key,)) self.connection.commit() @@ -387,7 +385,7 @@ def filter_records(self, table:str, values:dict, database:str=""): operators = [">", "<", "!", "="] - sql = "SELECT * FROM {} WHERE ".format(table) + sql = "SELECT * FROM " + table + " WHERE " for value in values: try: From a78a296d90d98b56c184317faaa843d8e445f95b Mon Sep 17 00:00:00 2001 From: Tharindu N Date: Mon, 28 Feb 2022 11:09:39 +0530 Subject: [PATCH 3/3] version 1.2 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f938ab4..568c008 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name="ReallySimpleDB", - version="1.1", + version="1.2", description="A tool for easily manage databases with Python", long_description=long_description, long_description_content_type="text/markdown",