From 139e835fab994888f4846b79d1c49a604db9b03c Mon Sep 17 00:00:00 2001 From: Adam Kariv Date: Tue, 11 Jun 2024 14:22:38 +0300 Subject: [PATCH] Revamp filters --- odds/backend/backend.py | 19 ++-- odds/backend/processor/dataset_processor.py | 8 +- odds/common/filters.py | 102 ++++++++++---------- 3 files changed, 61 insertions(+), 68 deletions(-) diff --git a/odds/backend/backend.py b/odds/backend/backend.py index 59e13b9..43ede07 100644 --- a/odds/backend/backend.py +++ b/odds/backend/backend.py @@ -20,7 +20,7 @@ def __init__(self): self.scanner_factory = ScannerFactory() self.catalogs = catalog_repo.load_catalogs() - async def scan(self, catalogFilter: CatalogFilter, datasetFilterCls, *datasetFilterArgs) -> None: + async def scan(self, catalogFilter: CatalogFilter, datasetFilter: DatasetFilter, *datasetFilterArgs) -> None: dataset_processor.set_concurrency(config.dataset_processor_concurrency_limit or 3) rts.clearAll() scanner_ctx = '' @@ -38,8 +38,7 @@ async def scan(self, catalogFilter: CatalogFilter, datasetFilterCls, *datasetFil if existing: existing.merge(dataset) dataset = existing - datasetFilter = datasetFilterCls(catalog, dataset, *datasetFilterArgs) - if await datasetFilter.consider(): + if await datasetFilter.consider(dataset): rts.set(cat_ctx, f'CONSIDER DATASET {dataset.id}') await db.storeDataset(dataset, ctx) dataset_processor.queue(dataset, catalog, datasetFilter, ctx) @@ -54,23 +53,21 @@ async def scan(self, catalogFilter: CatalogFilter, datasetFilterCls, *datasetFil await dataset_processor.wait() def scan_required(self) -> None: - asyncio.run(self.scan(CatalogFilter(), DatasetFilterIncomplete)) + asyncio.run(self.scan(CatalogFilter(), DatasetFilterIncomplete())) def scan_all(self) -> None: - asyncio.run(self.scan(CatalogFilter(), DatasetFilterForce)) + asyncio.run(self.scan(CatalogFilter(), DatasetFilterForce())) def scan_new(self) -> None: - asyncio.run(self.scan(CatalogFilter(), DatasetFilterNew)) + asyncio.run(self.scan(CatalogFilter(), 
DatasetFilterNew())) def scan_specific(self, catalogId: str = None, datasetId: str = None) -> None: if catalogId: catalogFilter = CatalogFilterById(catalogId) else: catalogFilter = CatalogFilter() - args = [] if datasetId: - datasetFilter = DatasetFilterById - args.append(datasetId) + datasetFilter = DatasetFilterById(datasetId) else: - datasetFilter = DatasetFilter - asyncio.run(self.scan(catalogFilter, datasetFilter, args)) \ No newline at end of file + datasetFilter = DatasetFilterIncomplete() + asyncio.run(self.scan(catalogFilter, datasetFilter)) \ No newline at end of file diff --git a/odds/backend/processor/dataset_processor.py b/odds/backend/processor/dataset_processor.py index 463c06f..8813f4f 100644 --- a/odds/backend/processor/dataset_processor.py +++ b/odds/backend/processor/dataset_processor.py @@ -36,7 +36,7 @@ async def process(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: D if config.debug: rts.set(ctx, f'PROCESS DATASET {dataset.versions.get('resource_analyzer')} {dataset.title}') resources = self.prune_resources(dataset, ctx) - if await datasetFilter.analyze(): + if await datasetFilter.analyze(dataset): if len(resources) > 0: await asyncio.gather( *[ @@ -49,14 +49,14 @@ async def process(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: D rts.set(ctx, f'SKIP ANALYZE') resources = [resource for resource in resources if resource.status_loaded] if len(resources) > 0: - if await datasetFilter.describe(): + if await datasetFilter.describe(dataset): await self.meta_describer.describe(dataset, ctx) else: if config.debug: rts.set(ctx, f'SKIP DESCRIBE') - if await datasetFilter.embed(): + if await datasetFilter.embed(dataset): await self.embedder.embed(dataset, ctx) - if await datasetFilter.index(): + if await datasetFilter.index(dataset): await self.indexer.index(dataset, ctx) await store.storeDataset(dataset, ctx) await db.storeDataset(dataset, ctx) diff --git a/odds/common/filters.py b/odds/common/filters.py index 
68b8aa5..0059eb4 100644 --- a/odds/common/filters.py +++ b/odds/common/filters.py @@ -4,49 +4,63 @@ class DatasetFilter: + async def consider(self, dataset: Dataset) -> bool: + return False + + async def analyze(self, dataset: Dataset) -> bool: + return True + + async def describe(self, dataset: Dataset) -> bool: + return True + + async def embed(self, dataset: Dataset) -> bool: + return True + + async def index(self, dataset: Dataset) -> bool: + return True + - def __init__(self, catalog: DataCatalog, dataset: Dataset): - self.catalog = catalog - self.dataset = dataset - self.analyzed = False - self.described = False - self.embedded = False - self.indexed = False +class DatasetFilterIncomplete(DatasetFilter): - async def consider(self) -> bool: - return True + def __init__(self) -> None: + super().__init__() + + async def consider(self, dataset: Dataset) -> bool: + if await self.analyze(dataset): + return True + if await self.describe(dataset): + return True + if await self.embed(dataset): + return True + if await self.index(dataset): + return True + return False - async def analyze(self) -> bool: + async def analyze(self, dataset: Dataset) -> bool: # print('FILTER ANALYZE', dataset.id, len(dataset.resources), all([not r.status_selected for r in dataset.resources]), dataset.versions.get('resource_analyzer') != config.feature_versions.resource_analyzer) - self.analyzed = ( - len(self.dataset.resources) and - all([not r.status_selected for r in self.dataset.resources]) - ) or self.dataset.versions.get('resource_analyzer') != config.feature_versions.resource_analyzer - return self.analyzed + return ( + len(dataset.resources) and + all([not r.status_selected for r in dataset.resources]) + ) or dataset.versions.get('resource_analyzer') != config.feature_versions.resource_analyzer - async def describe(self) -> bool: + async def describe(self, dataset: Dataset) -> bool: # print('FILTER DESCRIBE', dataset.id, dataset.better_title is None, dataset.better_description is None, 
dataset.versions.get('meta_describer') != config.feature_versions.meta_describer) - self.described = ( - self.analyzed or - self.dataset.better_title is None or - self.dataset.better_description is None or - self.dataset.versions.get('meta_describer') != config.feature_versions.meta_describer + return ( + dataset.better_title is None or + dataset.better_description is None or + dataset.versions.get('meta_describer') != config.feature_versions.meta_describer ) - return self.described - async def embed(self) -> bool: + async def embed(self, dataset: Dataset) -> bool: # print('FILTER EMBED', dataset.id, dataset.embedding is None, dataset.versions.get('embedder') != config.feature_versions.embedder) - self.embedded = self.dataset.better_title is not None and ( - self.described or - self.dataset.status_embedding is None or - self.dataset.versions.get('embedder') != config.feature_versions.embedder or - await store.getEmbedding(self.dataset) is None + return dataset.better_title is not None and ( + dataset.status_embedding is None or + dataset.versions.get('embedder') != config.feature_versions.embedder or + await store.getEmbedding(dataset) is None ) - return self.embedded - async def index(self) -> bool: - self.indexed = self.embedded or not self.dataset.status_indexing or self.dataset.versions.get('indexer') != config.feature_versions.indexer - return self.indexed + async def index(self, dataset: Dataset) -> bool: + return not dataset.status_indexing or dataset.versions.get('indexer') != config.feature_versions.indexer class DatasetFilterNew(DatasetFilter): @@ -57,36 +70,19 @@ async def consider(self) -> bool: class DatasetFilterForce: - async def analyze(self) -> bool: + async def consider(self, dataset: Dataset) -> bool: return True -class DatasetFilterById(DatasetFilterForce): - def __init__(self, catalog: DataCatalog, dataset: Dataset, datasetId: str): - super().__init__(catalog, dataset) +class DatasetFilterById(DatasetFilter): + def __init__(self, datasetId: str): 
+ super().__init__() self.datasetId = datasetId - async def consider(self) -> bool: - return self.dataset.id == self.datasetId + async def consider(self, dataset: Dataset) -> bool: + return dataset.id == self.datasetId -class DatasetFilterIncomplete(DatasetFilter): - - async def consider(self) -> bool: - dataset = await store.getDataset(self.dataset.storeId()) - if dataset is None: - return True - if await self.analyze(): - return True - if await self.describe(): - return True - if await self.embed(): - return True - if await self.index(): - return True - return False - - class CatalogFilter: async def include(self, catalog: DataCatalog) -> bool: