diff --git a/odds/backend/processor/dataset_processor.py b/odds/backend/processor/dataset_processor.py index 39a126a..5e063c2 100644 --- a/odds/backend/processor/dataset_processor.py +++ b/odds/backend/processor/dataset_processor.py @@ -35,32 +35,36 @@ async def wait(self): async def process(self, dataset: Dataset, catalog: DataCatalog, datasetFilter: DatasetFilter, ctx: str): if config.debug: rts.set(ctx, f'PROCESS DATASET {dataset.versions.get('resource_analyzer')} {dataset.title}') - resources = self.prune_resources(dataset, ctx) - if await datasetFilter.analyze(dataset): - if len(resources) > 0: - await asyncio.gather( - *[ - self.resource_processor.process(resource, dataset, catalog, ctx + f'/RES.{resource.file_format}[{i}]') - for i, resource in enumerate(resources) - ] - ) - else: - if config.debug: - rts.set(ctx, f'SKIP ANALYZE') - resources = [resource for resource in resources if resource.status_loaded] - if len(resources) > 0: - if await datasetFilter.describe(dataset): - await self.meta_describer.describe(dataset, ctx) + try: + resources = self.prune_resources(dataset, ctx) + if await datasetFilter.analyze(dataset): + if len(resources) > 0: + await asyncio.gather( + *[ + self.resource_processor.process(resource, dataset, catalog, ctx + f'/RES.{resource.file_format}[{i}]') + for i, resource in enumerate(resources) + ] + ) else: if config.debug: - rts.set(ctx, f'SKIP DESCRIBE') - if await datasetFilter.embed(dataset): - await self.embedder.embed(dataset, ctx) - if await datasetFilter.index(dataset): - await self.indexer.index(dataset, ctx) - await store.storeDataset(dataset, ctx) - await db.storeDataset(dataset, ctx) - rts.clear(ctx) + rts.set(ctx, f'SKIP ANALYZE') + resources = [resource for resource in resources if resource.status_loaded] + if len(resources) > 0: + if await datasetFilter.describe(dataset): + await self.meta_describer.describe(dataset, ctx) + else: + if config.debug: + rts.set(ctx, f'SKIP DESCRIBE') + if await datasetFilter.embed(dataset): + await self.embedder.embed(dataset, ctx) + if await datasetFilter.index(dataset): + await self.indexer.index(dataset, ctx) + await store.storeDataset(dataset, ctx) + await db.storeDataset(dataset, ctx) + except Exception as e: + rts.set(ctx, f'ERROR {e}', 'error') + finally: + rts.clear(ctx) def prune_resources(self, dataset: Dataset, ctx: str): resources = dataset.resources