Skip to content

Commit

Permalink
refactor: remove useless queries in csv.py
Browse files Browse the repository at this point in the history
  • Loading branch information
bolinocroustibat committed Nov 22, 2024
1 parent 8d30751 commit 9067981
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 19 deletions.
23 changes: 10 additions & 13 deletions udata_hydra/analysis/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,14 @@
minio_client = MinIOClient()


async def notify_udata(check_id: int) -> None:
async def notify_udata(check: Record | None, resource: Record | None) -> None:
"""Notify udata of the result of a parsing"""
# Get the check again to get its updated data
check: Record | None = await Check.get_by_id(check_id, with_deleted=True)
resource_id = check["resource_id"]
db = await context.pool()
record = await db.fetchrow("SELECT dataset_id FROM catalog WHERE resource_id = $1", resource_id)
if record:
if not check:
return
if resource:
payload = {
"resource_id": resource_id,
"dataset_id": record["dataset_id"],
"resource_id": check["resource_id"],
"dataset_id": resource["dataset_id"],
"document": {
"analysis:parsing:error": check["parsing_error"],
"analysis:parsing:started_at": check["parsing_started_at"].isoformat()
Expand Down Expand Up @@ -129,7 +126,7 @@ async def analyse_csv(
url = check["url"]

# Update resource status to ANALYSING_CSV
await Resource.update(resource_id, {"status": "ANALYSING_CSV"})
resource: Record | None = await Resource.update(resource_id, {"status": "ANALYSING_CSV"})

# Check if the resource is in the exceptions table
# If it is, get the table_indexes to use them later
Expand All @@ -155,7 +152,7 @@ async def analyse_csv(
timer.mark("download-file")

try:
await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})
check = await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})

# Launch csv-detective against given file
try:
Expand Down Expand Up @@ -186,7 +183,7 @@ async def analyse_csv(
resource_id=resource_id,
)
timer.mark("csv-to-parquet")
await Check.update(
check = await Check.update(
check["id"],
{
"parsing_table": table_name,
Expand All @@ -200,7 +197,7 @@ async def analyse_csv(
except ParseException as e:
await handle_parse_exception(e, table_name, check)
finally:
await notify_udata(check["id"])
await notify_udata(check, resource)
timer.stop()
tmp_file.close()
os.remove(tmp_file.name)
Expand Down
12 changes: 7 additions & 5 deletions udata_hydra/db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json

from asyncpg import Record

from udata_hydra import context


Expand All @@ -22,11 +24,11 @@ def compute_insert_query(table_name: str, data: dict, returning: str = "id") ->
return f"""
INSERT INTO "{table_name}" ({columns})
VALUES ({placeholders})
RETURNING {returning}
RETURNING {returning};
"""


def compute_update_query(table_name: str, data: dict) -> str:
def compute_update_query(table_name: str, data: dict, returning: str = "*") -> str:
columns = data.keys()
# $1, $2...
placeholders = [f"${x + 1}" for x in range(len(data.values()))]
Expand All @@ -35,12 +37,12 @@ def compute_update_query(table_name: str, data: dict) -> str:
UPDATE "{table_name}"
SET {set_clause}
WHERE id = ${len(placeholders) + 1}
RETURNING {returning};
"""


async def update_table_record(table_name: str, record_id: int, data: dict) -> int:
async def update_table_record(table_name: str, record_id: int, data: dict) -> Record | None:
data = convert_dict_values_to_json(data)
q = compute_update_query(table_name, data)
pool = await context.pool()
await pool.execute(q, *data.values(), record_id)
return record_id
return await pool.fetchrow(q, *data.values(), record_id)
2 changes: 1 addition & 1 deletion udata_hydra/db/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async def insert(cls, data: dict) -> Record:
return last_check

@classmethod
async def update(cls, check_id: int, data: dict) -> int:
async def update(cls, check_id: int, data: dict) -> Record | None:
"""Update a check in DB with new data and return the check id in DB"""
return await update_table_record(table_name="checks", record_id=check_id, data=data)

Expand Down

0 comments on commit 9067981

Please sign in to comment.