Skip to content

Commit

Permalink
Revamp filters
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Jun 11, 2024
1 parent ebc878e commit 139e835
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 68 deletions.
19 changes: 8 additions & 11 deletions odds/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self):
self.scanner_factory = ScannerFactory()
self.catalogs = catalog_repo.load_catalogs()

async def scan(self, catalogFilter: CatalogFilter, datasetFilterCls, *datasetFilterArgs) -> None:
async def scan(self, catalogFilter: CatalogFilter, datasetFilter: DatasetFilter, *datasetFilterArgs) -> None:
dataset_processor.set_concurrency(config.dataset_processor_concurrency_limit or 3)
rts.clearAll()
scanner_ctx = ''
Expand All @@ -38,8 +38,7 @@ async def scan(self, catalogFilter: CatalogFilter, datasetFilterCls, *datasetFil
if existing:
existing.merge(dataset)
dataset = existing
datasetFilter = datasetFilterCls(catalog, dataset, *datasetFilterArgs)
if await datasetFilter.consider():
if await datasetFilter.consider(dataset):
rts.set(cat_ctx, f'CONSIDER DATASET {dataset.id}')
await db.storeDataset(dataset, ctx)
dataset_processor.queue(dataset, catalog, datasetFilter, ctx)
Expand All @@ -54,23 +53,21 @@ async def scan(self, catalogFilter: CatalogFilter, datasetFilterCls, *datasetFil
await dataset_processor.wait()

def scan_required(self) -> None:
asyncio.run(self.scan(CatalogFilter(), DatasetFilterIncomplete))
asyncio.run(self.scan(CatalogFilter(), DatasetFilterIncomplete()))

def scan_all(self) -> None:
asyncio.run(self.scan(CatalogFilter(), DatasetFilterForce))
asyncio.run(self.scan(CatalogFilter(), DatasetFilterForce()))

def scan_new(self) -> None:
asyncio.run(self.scan(CatalogFilter(), DatasetFilterNew))
asyncio.run(self.scan(CatalogFilter(), DatasetFilterNew()))

def scan_specific(self, catalogId: str = None, datasetId: str = None) -> None:
if catalogId:
catalogFilter = CatalogFilterById(catalogId)
else:
catalogFilter = CatalogFilter()
args = []
if datasetId:
datasetFilter = DatasetFilterById
args.append(datasetId)
datasetFilter = DatasetFilterById(datasetId)
else:
datasetFilter = DatasetFilter
asyncio.run(self.scan(catalogFilter, datasetFilter, args))
datasetFilter = DatasetFilterIncomplete()
asyncio.run(self.scan(catalogFilter, datasetFilter))
8 changes: 4 additions & 4 deletions odds/backend/processor/dataset_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def process(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: D
if config.debug:
rts.set(ctx, f'PROCESS DATASET {dataset.versions.get('resource_analyzer')} {dataset.title}')
resources = self.prune_resources(dataset, ctx)
if await datasetFilter.analyze():
if await datasetFilter.analyze(datasetFilter):
if len(resources) > 0:
await asyncio.gather(
*[
Expand All @@ -49,14 +49,14 @@ async def process(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: D
rts.set(ctx, f'SKIP ANALYZE')
resources = [resource for resource in resources if resource.status_loaded]
if len(resources) > 0:
if await datasetFilter.describe():
if await datasetFilter.describe(dataset):
await self.meta_describer.describe(dataset, ctx)
else:
if config.debug:
rts.set(ctx, f'SKIP DESCRIBE')
if await datasetFilter.embed():
if await datasetFilter.embed(dataset):
await self.embedder.embed(dataset, ctx)
if await datasetFilter.index():
if await datasetFilter.index(dataset):
await self.indexer.index(dataset, ctx)
await store.storeDataset(dataset, ctx)
await db.storeDataset(dataset, ctx)
Expand Down
102 changes: 49 additions & 53 deletions odds/common/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,62 @@


class DatasetFilter:
    """Base policy object deciding which pipeline stages run for a dataset.

    The default policy admits no dataset at the ``consider`` gate (subclasses
    override it to opt datasets in), but once a dataset has been admitted it
    allows every downstream stage — analyze, describe, embed, index.
    """

    async def consider(self, dataset: Dataset) -> bool:
        # Entry gate: should this dataset be processed at all?
        # Base class rejects everything; subclasses opt datasets in.
        return False

    async def analyze(self, dataset: Dataset) -> bool:
        # Should resource analysis run for this dataset? Default: yes.
        return True

    async def describe(self, dataset: Dataset) -> bool:
        # Should metadata description run for this dataset? Default: yes.
        return True

    async def embed(self, dataset: Dataset) -> bool:
        # Should embedding generation run for this dataset? Default: yes.
        return True

    async def index(self, dataset: Dataset) -> bool:
        # Should the dataset be (re)indexed? Default: yes.
        return True


def __init__(self, catalog: DataCatalog, dataset: Dataset):
self.catalog = catalog
self.dataset = dataset
self.analyzed = False
self.described = False
self.embedded = False
self.indexed = False
class DatasetFilterIncomplete(DatasetFilter):

async def consider(self) -> bool:
return True
def __init__(self) -> None:
super().__init__()

async def consider(self, dataset: Dataset) -> bool:
if await self.analyze(dataset):
return True
if await self.describe(dataset):
return True
if await self.embed(dataset):
return True
if await self.index(dataset):
return True

async def analyze(self) -> bool:
async def analyze(self, dataset: Dataset) -> bool:
# print('FILTER ANALYZE', dataset.id, len(dataset.resources), all([not r.status_selected for r in dataset.resources]), dataset.versions.get('resource_analyzer') != config.feature_versions.resource_analyzer)
self.analyzed = (
len(self.dataset.resources) and
all([not r.status_selected for r in self.dataset.resources])
) or self.dataset.versions.get('resource_analyzer') != config.feature_versions.resource_analyzer
return self.analyzed
return (
len(dataset.resources) and
all([not r.status_selected for r in dataset.resources])
) or dataset.versions.get('resource_analyzer') != config.feature_versions.resource_analyzer

async def describe(self) -> bool:
async def describe(self, dataset: Dataset) -> bool:
# print('FILTER DESCRIBE', dataset.id, dataset.better_title is None, dataset.better_description is None, dataset.versions.get('meta_describer') != config.feature_versions.meta_describer)
self.described = (
self.analyzed or
self.dataset.better_title is None or
self.dataset.better_description is None or
self.dataset.versions.get('meta_describer') != config.feature_versions.meta_describer
return (
dataset.better_title is None or
dataset.better_description is None or
dataset.versions.get('meta_describer') != config.feature_versions.meta_describer
)
return self.described

async def embed(self) -> bool:
async def embed(self, dataset: Dataset) -> bool:
# print('FILTER EMBED', dataset.id, dataset.embedding is None, dataset.versions.get('embedder') != config.feature_versions.embedder)
self.embedded = self.dataset.better_title is not None and (
self.described or
self.dataset.status_embedding is None or
self.dataset.versions.get('embedder') != config.feature_versions.embedder or
await store.getEmbedding(self.dataset) is None
return dataset.better_title is not None and (
dataset.status_embedding is None or
dataset.versions.get('embedder') != config.feature_versions.embedder or
await store.getEmbedding(dataset) is None
)
return self.embedded

async def index(self) -> bool:
self.indexed = self.embedded or not self.dataset.status_indexing or self.dataset.versions.get('indexer') != config.feature_versions.indexer
return self.indexed
async def index(self, dataset: Dataset) -> bool:
return not dataset.status_indexing or dataset.versions.get('indexer') != config.feature_versions.indexer


class DatasetFilterNew(DatasetFilter):
Expand All @@ -57,36 +70,19 @@ async def consider(self) -> bool:

class DatasetFilterForce(DatasetFilter):

async def analyze(self) -> bool:
async def consider(self) -> bool:
return True


class DatasetFilterById(DatasetFilterForce):
def __init__(self, catalog: DataCatalog, dataset: Dataset, datasetId: str):
super().__init__(catalog, dataset)
class DatasetFilterById(DatasetFilter):
def __init__(self, datasetId: str):
super().__init__()
self.datasetId = datasetId

async def consider(self) -> bool:
return self.dataset.id == self.datasetId


class DatasetFilterIncomplete(DatasetFilter):

async def consider(self) -> bool:
dataset = await store.getDataset(self.dataset.storeId())
if dataset is None:
return True
if await self.analyze():
return True
if await self.describe():
return True
if await self.embed():
return True
if await self.index():
return True
return False


class CatalogFilter:

async def include(self, catalog: DataCatalog) -> bool:
Expand Down

0 comments on commit 139e835

Please sign in to comment.