Skip to content

Commit

Permalink
Ensure dataset context is cleared always
Browse files: browse the repository at this point in the history
  • Loading branch information
akariv committed Jun 11, 2024
1 parent c483877 commit e5523ae
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions odds/backend/processor/dataset_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,36 @@ async def wait(self):
async def process(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: DatasetFilter, ctx: str):
    """Run the processing pipeline for one dataset and always clear its context.

    Steps, in order:
      1. Prune the dataset's resources via ``self.prune_resources``.
      2. If ``datasetFilter.analyze`` approves, process every pruned resource
         concurrently with ``self.resource_processor``.
      3. Keep only resources whose ``status_loaded`` is truthy; when any
         remain, optionally describe, embed and index the dataset, each
         gated by the matching ``datasetFilter`` check.
      4. Persist the dataset through ``store`` and ``db``.

    Any ``Exception`` raised along the way is reported on ``ctx`` through
    ``rts.set`` and is not re-raised; ``rts.clear(ctx)`` runs in every case.

    Args:
        dataset: The dataset to process.
        catalog: The catalog the dataset belongs to; forwarded to the
            resource processor.
        datasetFilter: Decides which pipeline stages run for this dataset.
        ctx: Status-tracking context key; per-resource sub-contexts are
            derived from it.
    """
    if config.debug:
        # Double quotes for the inner key: reusing the enclosing quote
        # character inside an f-string is a SyntaxError before Python 3.12.
        rts.set(ctx, f'PROCESS DATASET {dataset.versions.get("resource_analyzer")} {dataset.title}')
    try:
        resources = self.prune_resources(dataset, ctx)
        if await datasetFilter.analyze(dataset):
            if len(resources) > 0:
                # Each resource gets its own sub-context so its status is
                # tracked separately. With the default gather() settings the
                # first failure propagates here while sibling tasks keep
                # running.
                await asyncio.gather(
                    *[
                        self.resource_processor.process(resource, dataset, catalog, ctx + f'/RES.{resource.file_format}[{i}]')
                        for i, resource in enumerate(resources)
                    ]
                )
        else:
            if config.debug:
                rts.set(ctx, 'SKIP ANALYZE')
        # Only resources that were actually loaded feed the later stages.
        resources = [resource for resource in resources if resource.status_loaded]
        # NOTE(review): the nesting of the embed/index steps (inside this
        # check) and of the two store calls (outside it) was reconstructed
        # from a diff view that had lost its indentation -- confirm against
        # the repository before merging.
        if len(resources) > 0:
            if await datasetFilter.describe(dataset):
                await self.meta_describer.describe(dataset, ctx)
            else:
                if config.debug:
                    rts.set(ctx, 'SKIP DESCRIBE')
            if await datasetFilter.embed(dataset):
                await self.embedder.embed(dataset, ctx)
            if await datasetFilter.index(dataset):
                await self.indexer.index(dataset, ctx)
        await store.storeDataset(dataset, ctx)
        await db.storeDataset(dataset, ctx)
    except Exception as e:
        # Deliberate best-effort: one failing dataset is reported on its
        # context instead of propagating to the caller.
        # NOTE(review): the finally clause clears ctx right after this error
        # is recorded -- confirm that 'error' entries remain visible after
        # rts.clear().
        rts.set(ctx, f'ERROR {e}', 'error')
    finally:
        rts.clear(ctx)

def prune_resources(self, dataset: Dataset, ctx: str):
resources = dataset.resources
Expand Down

0 comments on commit e5523ae

Please sign in to comment.