From 6f8add5b23b96a99895aaa994ce9aa938f7dfaf3 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Wed, 6 Sep 2023 15:16:04 -0400 Subject: [PATCH 1/5] update --- evadb/catalog/sql_config.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/evadb/catalog/sql_config.py b/evadb/catalog/sql_config.py index efba395b8f..da4acf6042 100644 --- a/evadb/catalog/sql_config.py +++ b/evadb/catalog/sql_config.py @@ -21,8 +21,13 @@ from evadb.utils.generic_utils import is_postgres_uri, parse_config_yml +# Permanent identifier column. IDENTIFIER_COLUMN = "_row_id" +# Runtime generated column. +ROW_NUM_COLUMN = "_row_number" +ROW_NUM_MAGIC = 0xffffffff + CATALOG_TABLES = [ "column_catalog", "table_catalog", From d6514c7fcf2993fd093293a29ea3e159756ad560 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Thu, 7 Sep 2023 09:52:32 -0400 Subject: [PATCH 2/5] update --- evadb/catalog/sql_config.py | 2 +- .../long/test_single_document_similarity.py | 118 ++++++++++++++++++ 2 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 test/integration_tests/long/test_single_document_similarity.py diff --git a/evadb/catalog/sql_config.py b/evadb/catalog/sql_config.py index da4acf6042..f4893ba997 100644 --- a/evadb/catalog/sql_config.py +++ b/evadb/catalog/sql_config.py @@ -26,7 +26,7 @@ # Runtime generated column. ROW_NUM_COLUMN = "_row_number" -ROW_NUM_MAGIC = 0xffffffff +ROW_NUM_MAGIC = 0xFFFFFFFF CATALOG_TABLES = [ "column_catalog", diff --git a/test/integration_tests/long/test_single_document_similarity.py b/test/integration_tests/long/test_single_document_similarity.py new file mode 100644 index 0000000000..bc3e031471 --- /dev/null +++ b/test/integration_tests/long/test_single_document_similarity.py @@ -0,0 +1,118 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import unittest +from copy import deepcopy +from test.markers import gpu_skip_marker, qdrant_skip_marker +from test.util import ( + create_sample_image, + get_evadb_for_testing, + load_functions_for_testing, + shutdown_ray, +) + +import numpy as np +import pandas as pd +import pytest + +from evadb.models.storage.batch import Batch +from evadb.server.command_handler import execute_query_fetch_all +from evadb.storage.storage_engine import StorageEngine +from evadb.utils.generic_utils import try_to_import_fitz + + +@pytest.mark.notparallel +class SingleDocumentSimilarityTests(unittest.TestCase): + def setUp(self): + self.evadb = get_evadb_for_testing() + self.evadb.catalog().reset() + + load_functions_for_testing(self.evadb) + + def test_single_pdf_should_work(self): + try_to_import_fitz() + import fitz + + text_list = [ + "EvaDB is an AI-powered database", + "I love playing switch when I am not doing research", + "Playing basketball is a good exercise", + ] + + # Create a PDF. + doc = fitz.open() + for text in text_list: + doc.insert_page( + -1, + text=text, + fontsize=11, + width=595, + height=842, + fontname="Helvetica", + color=(0, 0, 0), + ) + doc.save("test.pdf") + + # Check PDF read. + all_text = set(deepcopy(text_list)) + execute_query_fetch_all( + self.evadb, + "LOAD PDF 'test.pdf' INTO MyPDF", + ) + res_batch = execute_query_fetch_all(self.evadb, "SELECT * FROM MyPDF") + for i in range(len(res_batch)): + all_text.remove(res_batch.frames["mypdf.data"][i]) + self.assertEqual(len(all_text), 0) + + # Create feature extractor. 
+ execute_query_fetch_all( + self.evadb, "DROP FUNCTION IF EXISTS SentenceFeatureExtractor" + ) + execute_query_fetch_all( + self.evadb, + "CREATE FUNCTION SentenceFeatureExtractor IMPL 'evadb/functions/sentence_feature_extractor.py'", + ) + + # Create index. + execute_query_fetch_all( + self.evadb, + """ + CREATE INDEX qdrant_index + ON MyPDF (SentenceFeatureExtractor(data)) + USING FAISS + """, + ) + + # Ensure index scan is used. + query = """ + SELECT data + FROM MyPDF + ORDER BY Similarity(SentenceFeatureExtractor('{}'), SentenceFeatureExtractor(data)) + LIMIT 1 + """ + res_batch = execute_query_fetch_all(self.evadb, f"EXPLAIN {query.format('xx')}") + self.assertTrue("IndexScan" in res_batch.frames[0][0]) + + # Search top match. + all_text = set(deepcopy(text_list)) + for text in text_list: + res_batch = execute_query_fetch_all(self.evadb, query.format(text)) + res_text = res_batch.frames["mypdf.data"][0] + self.assertTrue(res_text in all_text) + all_text.remove(res_text) + + # Remove PDF and table. 
+ execute_query_fetch_all(self.evadb, "DROP TABLE IF EXISTS MyPDF") + os.remove("test.pdf") From a29b1f0d729d96d56c40fed3d17c81bbe275e790 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Thu, 7 Sep 2023 21:57:28 -0400 Subject: [PATCH 3/5] add row number to all and fix tests --- evadb/executor/create_index_executor.py | 6 ++--- evadb/executor/vector_index_scan_executor.py | 22 +++++++++---------- evadb/readers/decord_reader.py | 3 +++ evadb/readers/document/document_reader.py | 5 ++++- evadb/readers/pdf_reader.py | 4 ++++ evadb/storage/document_storage_engine.py | 4 +++- evadb/storage/image_storage_engine.py | 4 +++- evadb/storage/pdf_storage_engine.py | 4 +++- evadb/storage/sqlite_storage_engine.py | 9 ++++---- evadb/storage/video_storage_engine.py | 4 +++- test/unit_tests/readers/test_decord_reader.py | 5 ++++- .../storage/test_sqlite_storage_engine.py | 4 +++- test/util.py | 1 + 13 files changed, 50 insertions(+), 25 deletions(-) diff --git a/evadb/executor/create_index_executor.py b/evadb/executor/create_index_executor.py index 0dee98b163..4bc30183c3 100644 --- a/evadb/executor/create_index_executor.py +++ b/evadb/executor/create_index_executor.py @@ -16,7 +16,7 @@ import pandas as pd -from evadb.catalog.sql_config import IDENTIFIER_COLUMN +from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.database import EvaDBDatabase from evadb.executor.abstract_executor import AbstractExecutor from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params @@ -87,7 +87,7 @@ def _create_index(self): # array. Use zero index to get the actual numpy array. feat = input_batch.column_as_numpy_array(feat_col_name) - row_id = input_batch.column_as_numpy_array(IDENTIFIER_COLUMN) + row_num = input_batch.column_as_numpy_array(ROW_NUM_COLUMN) for i in range(len(input_batch)): row_feat = feat[i].reshape(1, -1) @@ -103,7 +103,7 @@ def _create_index(self): self.index.create(input_dim) # Row ID for mapping back to the row. 
- self.index.add([FeaturePayload(row_id[i], row_feat)]) + self.index.add([FeaturePayload(row_num[i], row_feat)]) # Persist index. self.index.persist() diff --git a/evadb/executor/vector_index_scan_executor.py b/evadb/executor/vector_index_scan_executor.py index eaf675dbb6..6c74fef534 100644 --- a/evadb/executor/vector_index_scan_executor.py +++ b/evadb/executor/vector_index_scan_executor.py @@ -16,7 +16,7 @@ import pandas as pd -from evadb.catalog.sql_config import IDENTIFIER_COLUMN +from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.database import EvaDBDatabase from evadb.executor.abstract_executor import AbstractExecutor from evadb.executor.executor_utils import handle_vector_store_params @@ -27,11 +27,11 @@ from evadb.utils.logging_manager import logger -# Helper function for getting row_id column alias. -def get_row_id_column_alias(column_list): +# Helper function for getting row_num column alias. +def get_row_num_column_alias(column_list): for column in column_list: alias, col_name = column.split(".") - if col_name == IDENTIFIER_COLUMN: + if col_name == ROW_NUM_COLUMN: return alias @@ -74,10 +74,10 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: ) # todo support queries over distance as well # distance_list = index_result.similarities - row_id_np = index_result.ids + row_num_np = index_result.ids # Load projected columns from disk and join with search results. 
- row_id_col_name = None + row_num_col_name = None # handle the case where the index_results are less than self.limit_count.value num_required_results = self.limit_count.value @@ -90,14 +90,14 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: res_row_list = [None for _ in range(num_required_results)] for batch in self.children[0].exec(**kwargs): column_list = batch.columns - if not row_id_col_name: - row_id_alias = get_row_id_column_alias(column_list) - row_id_col_name = "{}.{}".format(row_id_alias, IDENTIFIER_COLUMN) + if not row_num_col_name: + row_num_alias = get_row_num_column_alias(column_list) + row_num_col_name = "{}.{}".format(row_num_alias, ROW_NUM_COLUMN) # Nested join. for _, row in batch.frames.iterrows(): - for idx, rid in enumerate(row_id_np): - if rid == row[row_id_col_name]: + for idx, row_num in enumerate(row_num_np): + if row_num == row[row_num_col_name]: res_row = dict() for col_name in column_list: res_row[col_name] = row[col_name] diff --git a/evadb/readers/decord_reader.py b/evadb/readers/decord_reader.py index cfaaea577d..3221aeabda 100644 --- a/evadb/readers/decord_reader.py +++ b/evadb/readers/decord_reader.py @@ -23,6 +23,7 @@ from evadb.readers.abstract_reader import AbstractReader from evadb.utils.generic_utils import try_to_import_decord from evadb.utils.logging_manager import logger +from evadb.catalog.sql_config import ROW_NUM_COLUMN class DecordReader(AbstractReader): @@ -126,6 +127,7 @@ def __get_video_frame(self, frame_id): return { VideoColumnName.id.name: frame_id, + ROW_NUM_COLUMN: frame_id, VideoColumnName.data.name: frame_video, VideoColumnName.seconds.name: round(timestamp, 2), } @@ -136,6 +138,7 @@ def __get_audio_frame(self, frame_id): return { VideoColumnName.id.name: frame_id, + ROW_NUM_COLUMN: frame_id, VideoColumnName.data.name: np.empty(0), VideoColumnName.seconds.name: 0.0, VideoColumnName.audio.name: frame_audio, diff --git a/evadb/readers/document/document_reader.py b/evadb/readers/document/document_reader.py 
index 9011424a1c..e0a183b836 100644 --- a/evadb/readers/document/document_reader.py +++ b/evadb/readers/document/document_reader.py @@ -20,6 +20,7 @@ _lazy_import_loader, _lazy_import_text_splitter, ) +from evadb.catalog.sql_config import ROW_NUM_COLUMN class DocumentReader(AbstractReader): @@ -44,8 +45,10 @@ def _read(self) -> Iterator[Dict]: chunk_size=self._chunk_size, chunk_overlap=self._chunk_overlap ) + row_num = 0 for data in loader.load(): for chunk_id, row in enumerate( langchain_text_splitter.split_documents([data]) ): - yield {"chunk_id": chunk_id, "data": row.page_content} + yield {"chunk_id": chunk_id, "data": row.page_content, ROW_NUM_COLUMN: row_num} + row_num += 1 diff --git a/evadb/readers/pdf_reader.py b/evadb/readers/pdf_reader.py index e102e393f6..dfbbd3ecbd 100644 --- a/evadb/readers/pdf_reader.py +++ b/evadb/readers/pdf_reader.py @@ -16,6 +16,7 @@ from evadb.readers.abstract_reader import AbstractReader from evadb.utils.generic_utils import try_to_import_fitz +from evadb.catalog.sql_config import ROW_NUM_COLUMN class PDFReader(AbstractReader): @@ -35,6 +36,7 @@ def _read(self) -> Iterator[Dict]: doc = fitz.open(self.file_url) # PAGE ID, PARAGRAPH ID, STRING + row_num = 0 for page_no, page in enumerate(doc): blocks = page.get_text("dict")["blocks"] # iterate through the text blocks @@ -51,7 +53,9 @@ def _read(self) -> Iterator[Dict]: if span["text"].strip(): block_string += span["text"] yield { + ROW_NUM_COLUMN: row_num, "page": page_no + 1, "paragraph": paragraph_no + 1, "data": block_string, } + row_num += 1 diff --git a/evadb/storage/document_storage_engine.py b/evadb/storage/document_storage_engine.py index 6d213bbe85..4e40510cb6 100644 --- a/evadb/storage/document_storage_engine.py +++ b/evadb/storage/document_storage_engine.py @@ -20,6 +20,7 @@ from evadb.models.storage.batch import Batch from evadb.readers.document.document_reader import DocumentReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine 
+from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC class DocumentStorageEngine(AbstractMediaStorageEngine): @@ -28,7 +29,7 @@ def __init__(self, db: EvaDBDatabase): def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]: for doc_files in self._rdb_handler.read(self._get_metadata_table(table), 12): - for _, (row_id, file_name) in doc_files.iterrows(): + for _, (row_id, file_name, _) in doc_files.iterrows(): system_file_name = self._xform_file_url_to_file_name(file_name) doc_file = Path(table.file_url) / system_file_name # setting batch_mem_size = 1, we need fix it @@ -38,4 +39,5 @@ def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]: for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(file_name) + batch.frames[ROW_NUM_COLUMN] = row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] yield batch diff --git a/evadb/storage/image_storage_engine.py b/evadb/storage/image_storage_engine.py index eb07bd8ef6..b1e0a19692 100644 --- a/evadb/storage/image_storage_engine.py +++ b/evadb/storage/image_storage_engine.py @@ -20,6 +20,7 @@ from evadb.models.storage.batch import Batch from evadb.readers.image.opencv_image_reader import CVImageReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine +from evadb.catalog.sql_config import ROW_NUM_COLUMN class ImageStorageEngine(AbstractMediaStorageEngine): @@ -28,7 +29,7 @@ def __init__(self, db: EvaDBDatabase): def read(self, table: TableCatalogEntry) -> Iterator[Batch]: for image_files in self._rdb_handler.read(self._get_metadata_table(table)): - for _, (row_id, file_name) in image_files.iterrows(): + for _, (row_id, file_name, _) in image_files.iterrows(): system_file_name = self._xform_file_url_to_file_name(file_name) image_file = Path(table.file_url) / system_file_name # setting batch_mem_size = 1, we need fix it @@ -36,4 +37,5 @@ def read(self, table: TableCatalogEntry) 
-> Iterator[Batch]: for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(file_name) + batch.frames[ROW_NUM_COLUMN] = batch.frames[table.columns[0].name] yield batch diff --git a/evadb/storage/pdf_storage_engine.py b/evadb/storage/pdf_storage_engine.py index 79f9db50ab..cceab82c07 100644 --- a/evadb/storage/pdf_storage_engine.py +++ b/evadb/storage/pdf_storage_engine.py @@ -20,6 +20,7 @@ from evadb.models.storage.batch import Batch from evadb.readers.pdf_reader import PDFReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine +from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC class PDFStorageEngine(AbstractMediaStorageEngine): @@ -28,7 +29,7 @@ def __init__(self, db: EvaDBDatabase): def read(self, table: TableCatalogEntry) -> Iterator[Batch]: for image_files in self._rdb_handler.read(self._get_metadata_table(table), 12): - for _, (row_id, file_name) in image_files.iterrows(): + for _, (row_id, file_name, _) in image_files.iterrows(): system_file_name = self._xform_file_url_to_file_name(file_name) image_file = Path(table.file_url) / system_file_name # setting batch_mem_size = 1, we need fix it @@ -36,4 +37,5 @@ def read(self, table: TableCatalogEntry) -> Iterator[Batch]: for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(file_name) + batch.frames[ROW_NUM_COLUMN] = row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] yield batch diff --git a/evadb/storage/sqlite_storage_engine.py b/evadb/storage/sqlite_storage_engine.py index 494ffa0430..ba6c19fcf7 100644 --- a/evadb/storage/sqlite_storage_engine.py +++ b/evadb/storage/sqlite_storage_engine.py @@ -24,7 +24,7 @@ from evadb.catalog.models.column_catalog import ColumnCatalogEntry from evadb.catalog.models.table_catalog import TableCatalogEntry from evadb.catalog.schema_utils import SchemaUtils -from evadb.catalog.sql_config import 
IDENTIFIER_COLUMN +from evadb.catalog.sql_config import IDENTIFIER_COLUMN, ROW_NUM_COLUMN from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.parser.table_ref import TableInfo @@ -67,6 +67,7 @@ def _deserialize_sql_row(self, sql_row: dict, columns: List[ColumnCatalogEntry]) dict_row[col.name] = self._serializer.deserialize(sql_row[col.name]) else: dict_row[col.name] = sql_row[col.name] + dict_row[ROW_NUM_COLUMN] = dict_row[IDENTIFIER_COLUMN] return dict_row def _try_loading_table_via_reflection(self, table_name: str): @@ -94,7 +95,7 @@ def create(self, table: TableCatalogEntry, **kwargs): # During table creation, assume row_id is automatically handled by # the sqlalchemy engine. - table_columns = [col for col in table.columns if col.name != IDENTIFIER_COLUMN] + table_columns = [col for col in table.columns if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)] sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns) attr_dict.update(sqlalchemy_schema) @@ -148,12 +149,12 @@ def write(self, table: TableCatalogEntry, rows: Batch): # the sqlalchemy engine. Another assumption we make here is the # updated data need not to take care of row_id. 
table_columns = [ - col for col in table.columns if col.name != IDENTIFIER_COLUMN + col for col in table.columns if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN) ] # Todo: validate the data type before inserting into the table for record in rows.frames.values: - row_data = {col: record[idx] for idx, col in enumerate(columns)} + row_data = {col: record[idx] for idx, col in enumerate(columns) if col != ROW_NUM_COLUMN} data.append(self._dict_to_sql_row(row_data, table_columns)) self._sql_session.execute(table_to_update.insert(), data) self._sql_session.commit() diff --git a/evadb/storage/video_storage_engine.py b/evadb/storage/video_storage_engine.py index 142005f711..00b0df2efb 100644 --- a/evadb/storage/video_storage_engine.py +++ b/evadb/storage/video_storage_engine.py @@ -22,6 +22,7 @@ from evadb.models.storage.batch import Batch from evadb.readers.decord_reader import DecordReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine +from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC class DecordStorageEngine(AbstractMediaStorageEngine): @@ -39,7 +40,7 @@ def read( read_video: bool = True, ) -> Iterator[Batch]: for video_files in self._rdb_handler.read(self._get_metadata_table(table), 12): - for _, (row_id, video_file_name) in video_files.iterrows(): + for _, (row_id, video_file_name, _) in video_files.iterrows(): system_file_name = self._xform_file_url_to_file_name(video_file_name) video_file = Path(table.file_url) / system_file_name # increase batch size when reading audio so that @@ -58,4 +59,5 @@ def read( for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(video_file_name) + batch.frames[ROW_NUM_COLUMN] = row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] yield batch diff --git a/test/unit_tests/readers/test_decord_reader.py b/test/unit_tests/readers/test_decord_reader.py index a4d6163f68..ab81290fc1 100644 --- 
a/test/unit_tests/readers/test_decord_reader.py +++ b/test/unit_tests/readers/test_decord_reader.py @@ -59,7 +59,7 @@ def _batches_to_reader_convertor(self, batches): new_batches = [] for batch in batches: batch.drop_column_alias() - new_batches.append(batch.project(["id", "data", "seconds"])) + new_batches.append(batch.project(["id", "data", "seconds", "_row_number"])) return new_batches def test_should_sample_only_iframe(self): @@ -75,6 +75,9 @@ def test_should_sample_only_iframe(self): create_dummy_batches(filters=[i for i in range(0, NUM_FRAMES, k)]) ) + print(batches[0].frames) + print(expected[0].frames) + self.assertEqual(batches, expected) def test_should_sample_every_k_frame_with_predicate(self): diff --git a/test/unit_tests/storage/test_sqlite_storage_engine.py b/test/unit_tests/storage/test_sqlite_storage_engine.py index 11268d84e2..b4f1395326 100644 --- a/test/unit_tests/storage/test_sqlite_storage_engine.py +++ b/test/unit_tests/storage/test_sqlite_storage_engine.py @@ -26,6 +26,7 @@ from evadb.catalog.models.column_catalog import ColumnCatalogEntry from evadb.catalog.models.table_catalog import TableCatalogEntry from evadb.storage.sqlite_storage_engine import SQLStorageEngine +from evadb.catalog.sql_config import IDENTIFIER_COLUMN @pytest.mark.notparallel @@ -40,12 +41,13 @@ def create_sample_table(self): str(suffix_pytest_xdist_worker_id_to_dir("dataset")), table_type=TableType.VIDEO_DATA, ) + column_pk = ColumnCatalogEntry(IDENTIFIER_COLUMN, ColumnType.INTEGER, is_nullable=False) column_0 = ColumnCatalogEntry("name", ColumnType.TEXT, is_nullable=False) column_1 = ColumnCatalogEntry("id", ColumnType.INTEGER, is_nullable=False) column_2 = ColumnCatalogEntry( "data", ColumnType.NDARRAY, False, NdArrayType.UINT8, [2, 2, 3] ) - table_info.schema = [column_0, column_1, column_2] + table_info.columns = [column_pk, column_0, column_1, column_2] return table_info def setUp(self): diff --git a/test/util.py b/test/util.py index 39be10449c..4ffb2d9a2c 100644 
--- a/test/util.py +++ b/test/util.py @@ -461,6 +461,7 @@ def create_dummy_batches( data.append( { "myvideo._row_id": 1, + "myvideo._row_number": i + start_id, "myvideo.name": os.path.join(video_dir, "dummy.avi"), "myvideo.id": i + start_id, "myvideo.data": np.array( From 74f247ecd8d1d59e4c02bf805a4ac63fd2964bb0 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Thu, 7 Sep 2023 22:21:05 -0400 Subject: [PATCH 4/5] update test --- test/unit_tests/readers/test_decord_reader.py | 12 ++++++------ test/util.py | 5 ++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/test/unit_tests/readers/test_decord_reader.py b/test/unit_tests/readers/test_decord_reader.py index ab81290fc1..f6fd7138c7 100644 --- a/test/unit_tests/readers/test_decord_reader.py +++ b/test/unit_tests/readers/test_decord_reader.py @@ -72,7 +72,7 @@ def test_should_sample_only_iframe(self): batches = list(video_loader.read()) expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(0, NUM_FRAMES, k)]) + create_dummy_batches(filters=[i for i in range(0, NUM_FRAMES, k)], is_from_storage=True) ) print(batches[0].frames) @@ -96,7 +96,7 @@ def test_should_sample_every_k_frame_with_predicate(self): value = NUM_FRAMES // 2 start = value + k - (value % k) if value % k else value expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(start, NUM_FRAMES, k)]) + create_dummy_batches(filters=[i for i in range(start, NUM_FRAMES, k)], is_from_storage=True) ) self.assertEqual(batches, expected) @@ -123,7 +123,7 @@ def test_should_sample_every_k_frame_with_predicate(self): batches = list(video_loader.read()) start = value + k - (value % k) if value % k else value expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(start, 8, k)]) + create_dummy_batches(filters=[i for i in range(start, 8, k)], is_from_storage=True) ) self.assertEqual(batches, expected) @@ -132,7 +132,7 @@ def 
test_should_return_one_batch(self): file_url=self.video_file_url, ) batches = list(video_loader.read()) - expected = self._batches_to_reader_convertor(create_dummy_batches()) + expected = self._batches_to_reader_convertor(create_dummy_batches(is_from_storage=True)) self.assertEqual(batches, expected) def test_should_return_batches_equivalent_to_number_of_frames(self): @@ -141,7 +141,7 @@ def test_should_return_batches_equivalent_to_number_of_frames(self): batch_mem_size=self.frame_size, ) batches = list(video_loader.read()) - expected = self._batches_to_reader_convertor(create_dummy_batches(batch_size=1)) + expected = self._batches_to_reader_convertor(create_dummy_batches(batch_size=1, is_from_storage=True)) self.assertEqual(batches, expected) def test_should_sample_every_k_frame(self): @@ -152,7 +152,7 @@ def test_should_sample_every_k_frame(self): ) batches = list(video_loader.read()) expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(0, NUM_FRAMES, k)]) + create_dummy_batches(filters=[i for i in range(0, NUM_FRAMES, k)], is_from_storage=True) ) self.assertEqual(batches, expected) diff --git a/test/util.py b/test/util.py index 4ffb2d9a2c..4e759b4f02 100644 --- a/test/util.py +++ b/test/util.py @@ -451,6 +451,7 @@ def create_dummy_batches( batch_size=10, start_id=0, video_dir=None, + is_from_storage=False, # if cover test directly from storage, it needs to append a _row_number ): video_dir = video_dir or get_tmp_dir() @@ -461,7 +462,6 @@ def create_dummy_batches( data.append( { "myvideo._row_id": 1, - "myvideo._row_number": i + start_id, "myvideo.name": os.path.join(video_dir, "dummy.avi"), "myvideo.id": i + start_id, "myvideo.data": np.array( @@ -471,6 +471,9 @@ def create_dummy_batches( } ) + if is_from_storage: + data[-1]["myvideo._row_number"] = i + start_id + if len(data) % batch_size == 0: yield Batch(pd.DataFrame(data)) data = [] From 181aa89c1744a236da51a8f7970f3a71a2dd0c3b Mon Sep 17 00:00:00 2001 From: Jiashen 
Cao Date: Thu, 7 Sep 2023 22:22:37 -0400 Subject: [PATCH 5/5] fix lint --- evadb/readers/decord_reader.py | 2 +- evadb/readers/document/document_reader.py | 8 ++++-- evadb/readers/pdf_reader.py | 2 +- evadb/storage/document_storage_engine.py | 6 ++-- evadb/storage/image_storage_engine.py | 2 +- evadb/storage/pdf_storage_engine.py | 6 ++-- evadb/storage/sqlite_storage_engine.py | 16 +++++++++-- evadb/storage/video_storage_engine.py | 6 ++-- .../long/test_single_document_similarity.py | 14 ++-------- test/unit_tests/readers/test_decord_reader.py | 28 +++++++++++++------ .../storage/test_sqlite_storage_engine.py | 6 ++-- test/util.py | 2 +- 12 files changed, 60 insertions(+), 38 deletions(-) diff --git a/evadb/readers/decord_reader.py b/evadb/readers/decord_reader.py index 3221aeabda..3097f4ea1a 100644 --- a/evadb/readers/decord_reader.py +++ b/evadb/readers/decord_reader.py @@ -17,13 +17,13 @@ import numpy as np from evadb.catalog.catalog_type import VideoColumnName +from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.constants import AUDIORATE, IFRAMES from evadb.expression.abstract_expression import AbstractExpression from evadb.expression.expression_utils import extract_range_list_from_predicate from evadb.readers.abstract_reader import AbstractReader from evadb.utils.generic_utils import try_to_import_decord from evadb.utils.logging_manager import logger -from evadb.catalog.sql_config import ROW_NUM_COLUMN class DecordReader(AbstractReader): diff --git a/evadb/readers/document/document_reader.py b/evadb/readers/document/document_reader.py index e0a183b836..a11d0c7481 100644 --- a/evadb/readers/document/document_reader.py +++ b/evadb/readers/document/document_reader.py @@ -15,12 +15,12 @@ from pathlib import Path from typing import Dict, Iterator +from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.readers.abstract_reader import AbstractReader from evadb.readers.document.registry import ( _lazy_import_loader, _lazy_import_text_splitter, ) 
-from evadb.catalog.sql_config import ROW_NUM_COLUMN class DocumentReader(AbstractReader): @@ -50,5 +50,9 @@ def _read(self) -> Iterator[Dict]: for chunk_id, row in enumerate( langchain_text_splitter.split_documents([data]) ): - yield {"chunk_id": chunk_id, "data": row.page_content, ROW_NUM_COLUMN: row_num} + yield { + "chunk_id": chunk_id, + "data": row.page_content, + ROW_NUM_COLUMN: row_num, + } row_num += 1 diff --git a/evadb/readers/pdf_reader.py b/evadb/readers/pdf_reader.py index dfbbd3ecbd..e7502d83d0 100644 --- a/evadb/readers/pdf_reader.py +++ b/evadb/readers/pdf_reader.py @@ -14,9 +14,9 @@ # limitations under the License. from typing import Dict, Iterator +from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.readers.abstract_reader import AbstractReader from evadb.utils.generic_utils import try_to_import_fitz -from evadb.catalog.sql_config import ROW_NUM_COLUMN class PDFReader(AbstractReader): diff --git a/evadb/storage/document_storage_engine.py b/evadb/storage/document_storage_engine.py index 4e40510cb6..cd97c4ea31 100644 --- a/evadb/storage/document_storage_engine.py +++ b/evadb/storage/document_storage_engine.py @@ -16,11 +16,11 @@ from typing import Iterator from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.readers.document.document_reader import DocumentReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine -from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC class DocumentStorageEngine(AbstractMediaStorageEngine): @@ -39,5 +39,7 @@ def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]: for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(file_name) - batch.frames[ROW_NUM_COLUMN] = row_id * ROW_NUM_MAGIC + 
batch.frames[ROW_NUM_COLUMN] + batch.frames[ROW_NUM_COLUMN] = ( + row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] + ) yield batch diff --git a/evadb/storage/image_storage_engine.py b/evadb/storage/image_storage_engine.py index b1e0a19692..88c1f28ebb 100644 --- a/evadb/storage/image_storage_engine.py +++ b/evadb/storage/image_storage_engine.py @@ -16,11 +16,11 @@ from typing import Iterator from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.readers.image.opencv_image_reader import CVImageReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine -from evadb.catalog.sql_config import ROW_NUM_COLUMN class ImageStorageEngine(AbstractMediaStorageEngine): diff --git a/evadb/storage/pdf_storage_engine.py b/evadb/storage/pdf_storage_engine.py index cceab82c07..fc5ac39764 100644 --- a/evadb/storage/pdf_storage_engine.py +++ b/evadb/storage/pdf_storage_engine.py @@ -16,11 +16,11 @@ from typing import Iterator from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.readers.pdf_reader import PDFReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine -from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC class PDFStorageEngine(AbstractMediaStorageEngine): @@ -37,5 +37,7 @@ def read(self, table: TableCatalogEntry) -> Iterator[Batch]: for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(file_name) - batch.frames[ROW_NUM_COLUMN] = row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] + batch.frames[ROW_NUM_COLUMN] = ( + row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] + ) yield batch diff --git 
a/evadb/storage/sqlite_storage_engine.py b/evadb/storage/sqlite_storage_engine.py index ba6c19fcf7..a1c90d412d 100644 --- a/evadb/storage/sqlite_storage_engine.py +++ b/evadb/storage/sqlite_storage_engine.py @@ -95,7 +95,11 @@ def create(self, table: TableCatalogEntry, **kwargs): # During table creation, assume row_id is automatically handled by # the sqlalchemy engine. - table_columns = [col for col in table.columns if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)] + table_columns = [ + col + for col in table.columns + if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN) + ] sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns) attr_dict.update(sqlalchemy_schema) @@ -149,12 +153,18 @@ def write(self, table: TableCatalogEntry, rows: Batch): # the sqlalchemy engine. Another assumption we make here is the # updated data need not to take care of row_id. table_columns = [ - col for col in table.columns if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN) + col + for col in table.columns + if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN) ] # Todo: validate the data type before inserting into the table for record in rows.frames.values: - row_data = {col: record[idx] for idx, col in enumerate(columns) if col != ROW_NUM_COLUMN} + row_data = { + col: record[idx] + for idx, col in enumerate(columns) + if col != ROW_NUM_COLUMN + } data.append(self._dict_to_sql_row(row_data, table_columns)) self._sql_session.execute(table_to_update.insert(), data) self._sql_session.commit() diff --git a/evadb/storage/video_storage_engine.py b/evadb/storage/video_storage_engine.py index 00b0df2efb..bef5644bfe 100644 --- a/evadb/storage/video_storage_engine.py +++ b/evadb/storage/video_storage_engine.py @@ -17,12 +17,12 @@ from typing import Iterator from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC from evadb.database import 
EvaDBDatabase from evadb.expression.abstract_expression import AbstractExpression from evadb.models.storage.batch import Batch from evadb.readers.decord_reader import DecordReader from evadb.storage.abstract_media_storage_engine import AbstractMediaStorageEngine -from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC class DecordStorageEngine(AbstractMediaStorageEngine): @@ -59,5 +59,7 @@ def read( for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(video_file_name) - batch.frames[ROW_NUM_COLUMN] = row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] + batch.frames[ROW_NUM_COLUMN] = ( + row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN] + ) yield batch diff --git a/test/integration_tests/long/test_single_document_similarity.py b/test/integration_tests/long/test_single_document_similarity.py index bc3e031471..58a93e2383 100644 --- a/test/integration_tests/long/test_single_document_similarity.py +++ b/test/integration_tests/long/test_single_document_similarity.py @@ -15,21 +15,11 @@ import os import unittest from copy import deepcopy -from test.markers import gpu_skip_marker, qdrant_skip_marker -from test.util import ( - create_sample_image, - get_evadb_for_testing, - load_functions_for_testing, - shutdown_ray, -) +from test.util import get_evadb_for_testing, load_functions_for_testing -import numpy as np -import pandas as pd import pytest -from evadb.models.storage.batch import Batch from evadb.server.command_handler import execute_query_fetch_all -from evadb.storage.storage_engine import StorageEngine from evadb.utils.generic_utils import try_to_import_fitz @@ -97,7 +87,7 @@ def test_single_pdf_should_work(self): # Ensure index scan is used. 
query = """ - SELECT data + SELECT data FROM MyPDF ORDER BY Similarity(SentenceFeatureExtractor('{}'), SentenceFeatureExtractor(data)) LIMIT 1 diff --git a/test/unit_tests/readers/test_decord_reader.py b/test/unit_tests/readers/test_decord_reader.py index f6fd7138c7..5a11bafa01 100644 --- a/test/unit_tests/readers/test_decord_reader.py +++ b/test/unit_tests/readers/test_decord_reader.py @@ -72,12 +72,11 @@ def test_should_sample_only_iframe(self): batches = list(video_loader.read()) expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(0, NUM_FRAMES, k)], is_from_storage=True) + create_dummy_batches( + filters=[i for i in range(0, NUM_FRAMES, k)], is_from_storage=True + ) ) - print(batches[0].frames) - print(expected[0].frames) - self.assertEqual(batches, expected) def test_should_sample_every_k_frame_with_predicate(self): @@ -96,7 +95,10 @@ def test_should_sample_every_k_frame_with_predicate(self): value = NUM_FRAMES // 2 start = value + k - (value % k) if value % k else value expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(start, NUM_FRAMES, k)], is_from_storage=True) + create_dummy_batches( + filters=[i for i in range(start, NUM_FRAMES, k)], + is_from_storage=True, + ) ) self.assertEqual(batches, expected) @@ -123,7 +125,9 @@ def test_should_sample_every_k_frame_with_predicate(self): batches = list(video_loader.read()) start = value + k - (value % k) if value % k else value expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(start, 8, k)], is_from_storage=True) + create_dummy_batches( + filters=[i for i in range(start, 8, k)], is_from_storage=True + ) ) self.assertEqual(batches, expected) @@ -132,7 +136,9 @@ def test_should_return_one_batch(self): file_url=self.video_file_url, ) batches = list(video_loader.read()) - expected = self._batches_to_reader_convertor(create_dummy_batches(is_from_storage=True)) + expected = 
self._batches_to_reader_convertor( + create_dummy_batches(is_from_storage=True) + ) self.assertEqual(batches, expected) def test_should_return_batches_equivalent_to_number_of_frames(self): @@ -141,7 +147,9 @@ def test_should_return_batches_equivalent_to_number_of_frames(self): batch_mem_size=self.frame_size, ) batches = list(video_loader.read()) - expected = self._batches_to_reader_convertor(create_dummy_batches(batch_size=1, is_from_storage=True)) + expected = self._batches_to_reader_convertor( + create_dummy_batches(batch_size=1, is_from_storage=True) + ) self.assertEqual(batches, expected) def test_should_sample_every_k_frame(self): @@ -152,7 +160,9 @@ def test_should_sample_every_k_frame(self): ) batches = list(video_loader.read()) expected = self._batches_to_reader_convertor( - create_dummy_batches(filters=[i for i in range(0, NUM_FRAMES, k)], is_from_storage=True) + create_dummy_batches( + filters=[i for i in range(0, NUM_FRAMES, k)], is_from_storage=True + ) ) self.assertEqual(batches, expected) diff --git a/test/unit_tests/storage/test_sqlite_storage_engine.py b/test/unit_tests/storage/test_sqlite_storage_engine.py index b4f1395326..fc0058cd6c 100644 --- a/test/unit_tests/storage/test_sqlite_storage_engine.py +++ b/test/unit_tests/storage/test_sqlite_storage_engine.py @@ -25,8 +25,8 @@ from evadb.catalog.catalog_type import ColumnType, NdArrayType, TableType from evadb.catalog.models.column_catalog import ColumnCatalogEntry from evadb.catalog.models.table_catalog import TableCatalogEntry -from evadb.storage.sqlite_storage_engine import SQLStorageEngine from evadb.catalog.sql_config import IDENTIFIER_COLUMN +from evadb.storage.sqlite_storage_engine import SQLStorageEngine @pytest.mark.notparallel @@ -41,7 +41,9 @@ def create_sample_table(self): str(suffix_pytest_xdist_worker_id_to_dir("dataset")), table_type=TableType.VIDEO_DATA, ) - column_pk = ColumnCatalogEntry(IDENTIFIER_COLUMN, ColumnType.INTEGER, is_nullable=False) + column_pk = ColumnCatalogEntry( + 
IDENTIFIER_COLUMN, ColumnType.INTEGER, is_nullable=False + ) column_0 = ColumnCatalogEntry("name", ColumnType.TEXT, is_nullable=False) column_1 = ColumnCatalogEntry("id", ColumnType.INTEGER, is_nullable=False) column_2 = ColumnCatalogEntry( diff --git a/test/util.py b/test/util.py index 4e759b4f02..732aabc15a 100644 --- a/test/util.py +++ b/test/util.py @@ -451,7 +451,7 @@ def create_dummy_batches( batch_size=10, start_id=0, video_dir=None, - is_from_storage=False, # if cover test directly from storage, it needs to append a _row_number + is_from_storage=False, # if cover test directly from storage, it needs to append a _row_number ): video_dir = video_dir or get_tmp_dir()