Commit

Fix file deletion
akariv committed Jun 11, 2024
1 parent 0ff6ed7 commit e274ef3
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions odds/backend/processor/resource_processor.py
@@ -2,6 +2,7 @@
 from collections import Counter
 import httpx
 import uuid
+import os
 
 import dataflows as DF
 from sqlalchemy import create_engine
@@ -88,7 +89,7 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
             DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True),
             DF.update_resource(-1, name='data'),
             DF.validate(on_error=DF.schema_validator.clear),
-            self.updater(ctx, lambda i: 'LOADED {i} ROWS TO DISK'),
+            self.updater(ctx, lambda i: f'LOADED {i} ROWS TO DISK'),
             DF.stream(stream)
         ).process()
         rts.set(ctx, f'VALIDATED {total_size} BYTES from {resource.url} to {stream.name}')
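
Note on the hunk above: without the f prefix, the '{i}' placeholder in the progress message is emitted literally instead of being interpolated. A minimal standalone illustration (not part of the repository code):

    # Plain string literal: the braces are never substituted.
    progress = lambda i: 'LOADED {i} ROWS TO DISK'
    print(progress(100))   # -> LOADED {i} ROWS TO DISK

    # f-string: {i} is interpolated when the lambda is called.
    progress = lambda i: f'LOADED {i} ROWS TO DISK'
    print(progress(100))   # -> LOADED 100 ROWS TO DISK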
@@ -142,7 +143,7 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
         engine = create_engine(sqlite_url)
         DF.Flow(
             DF.unstream(stream.name),
-            self.updater(ctx, lambda i: 'DUMPED {i} ROWS TO SQLITE'),
+            self.updater(ctx, lambda i: f'DUMPED {i} ROWS TO SQLITE'),
             DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine),
         ).process()
         rts.set(ctx, f'DUMPED {total_size} BYTES / {len(data)} ROWS from {resource.url} TO {sqlite_filename}')
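
For context, the hunk above streams previously serialized rows into a local SQLite file via dataflows and SQLAlchemy. A minimal sketch of the same pattern, with a hypothetical input file and output path standing in for the repository's stream and sqlite_url:

    import dataflows as DF
    from sqlalchemy import create_engine

    # Hypothetical paths; the real code reads an unstreamed temp file and uses sqlite_url.
    engine = create_engine('sqlite:///example.sqlite')
    DF.Flow(
        DF.load('data.csv', name='data'),
        DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine),
    ).process()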
@@ -164,8 +165,8 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
         finally:
             for filename in to_delete:
                 try:
-                    filename.unlink()
-                except:
+                    os.unlink(filename)
+                except Exception as e:
                     rts.set(ctx, f'FAILED TO DELETE {filename}: {e}', 'error')
 
     def set_concurrency_limit(self, concurrency_limit):
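
On the deletion fix itself: filename.unlink() only works if filename is a pathlib.Path; presumably the entries in to_delete are plain path strings, which have no unlink() method, so the cleanup switches to os.unlink() (which accepts both) and narrows the bare except: to except Exception so the error can be reported. A standalone sketch of the fixed pattern, with hypothetical file names and print standing in for rts.set:

    import os

    to_delete = ['tmp-resource.csv', 'tmp-resource.sqlite']   # hypothetical temp files
    for filename in to_delete:
        try:
            os.unlink(filename)                           # works for str and pathlib.Path alike
        except Exception as e:
            print(f'FAILED TO DELETE {filename}: {e}')    # stand-in for rts.set(..., 'error')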
