From 208c62d2a82c16ced8218484d1be45045a929f49 Mon Sep 17 00:00:00 2001 From: Jonathan Fallon Date: Fri, 4 Oct 2024 11:03:19 +0200 Subject: [PATCH] fix: export columns, stale and status improvements (#2633) * properly type the filter * filter csv columns from the config * consolidate status from acquisition, fraud and anomaly * fix error notification * documentation update * fail stale exports after 4 hours --- .../export/commands/ProcessCommand.ts | 11 +++-- api/src/pdc/services/export/config/export.ts | 4 +- .../pdc/services/export/models/CSVWriter.ts | 15 ++++++- .../pdc/services/export/models/CarpoolRow.ts | 4 +- .../export/repositories/CarpoolRepository.ts | 4 +- .../export/repositories/ExportRepository.ts | 41 +++++++++++-------- .../repositories/queries/CarpoolListQuery.ts | 17 ++++++-- .../services/export/services/FieldService.ts | 5 +-- doc/docs/operateurs/exports-de-trajets.md | 2 +- 9 files changed, 71 insertions(+), 32 deletions(-) diff --git a/api/src/pdc/services/export/commands/ProcessCommand.ts b/api/src/pdc/services/export/commands/ProcessCommand.ts index 4f1484d789..68e3c71b68 100644 --- a/api/src/pdc/services/export/commands/ProcessCommand.ts +++ b/api/src/pdc/services/export/commands/ProcessCommand.ts @@ -4,6 +4,7 @@ import { CommandOptionType, } from "@/ilos/common/index.ts"; import { getPerformanceTimer, logger } from "@/lib/logger/index.ts"; +import { staleDelay } from "@/pdc/services/export/config/export.ts"; import { NotificationService } from "@/pdc/services/export/services/NotificationService.ts"; import { StorageService } from "@/pdc/services/export/services/StorageService.ts"; import { CSVWriter } from "../models/CSVWriter.ts"; @@ -31,13 +32,17 @@ export class ProcessCommand implements CommandInterface { ) {} public async call(): Promise { + // init the storage service await this.storage.init(); - let counter = 50; + // fail stale exports running for too long + await this.exportRepository.failStaleExports(); + logger.info(`Patched stale exports running for more than ${staleDelay}`); // process pending exports until there are no more // picking one at a time to avoid concurrency issues // and let multiple workers process the queue in parallel + let counter = 50; let exp = await this.exportRepository.pickPending(); while (exp && counter > 0) { await this.process(exp); @@ -82,8 +87,8 @@ export class ProcessCommand implements CommandInterface { logger.info(`Export ${uuid} done in ${timer.stop()} ms`); } catch (e) { await this.exportRepository.error(_id, e.message); - await this.notify.error(exp); - await this.notify.support(exp); + await this.notify.error({ ...exp, error: e.message }); + await this.notify.support({ ...exp, error: e.message }); } } } diff --git a/api/src/pdc/services/export/config/export.ts b/api/src/pdc/services/export/config/export.ts index 064c0397e6..f6fec22984 100644 --- a/api/src/pdc/services/export/config/export.ts +++ b/api/src/pdc/services/export/config/export.ts @@ -1,4 +1,4 @@ -import { env_or_int } from "@/lib/env/index.ts"; +import { env_or_default, env_or_int } from "@/lib/env/index.ts"; // default min start 3 years ago export const minStartDefault = env_or_int( @@ -11,3 +11,5 @@ export const maxEndDefault = env_or_int( "APP_EXPORT_MAX_END", 1000 * 60 * 60 * 24 * 5, ); + +export const staleDelay = env_or_default("APP_EXPORT_STALE_DELAY", "4 hours"); diff --git a/api/src/pdc/services/export/models/CSVWriter.ts b/api/src/pdc/services/export/models/CSVWriter.ts index f65398c5c3..a2adc1d62f 100644 --- a/api/src/pdc/services/export/models/CSVWriter.ts +++ b/api/src/pdc/services/export/models/CSVWriter.ts @@ -8,6 +8,7 @@ import { import { logger } from "@/lib/logger/index.ts"; import { join } from "@/lib/path/index.ts"; import { sanitize } from "@/pdc/helpers/string.helper.ts"; +import { castToStatusEnum } from "@/pdc/providers/carpool/helpers/castStatus.ts"; import { AllowedComputedFields, CarpoolRow, @@ -51,6 +52,18 @@ export class CSVWriter { cleanup: true, fields: [], computed: [ + { + name: "status", + compute(row) { + return castToStatusEnum( + row.get([ + "acquisition_status", + "anomaly_status", + "fraud_status", + ]), + ); + }, + }, { name: "incentive_type", compute(row, datasources) { @@ -131,7 +144,7 @@ export class CSVWriter { ); }); - this.stringifier.write(carpoolRow.get()); + this.stringifier.write(carpoolRow.get(this.options.fields as string[])); return this; } diff --git a/api/src/pdc/services/export/models/CarpoolRow.ts b/api/src/pdc/services/export/models/CarpoolRow.ts index efa7882a84..318459ddbd 100644 --- a/api/src/pdc/services/export/models/CarpoolRow.ts +++ b/api/src/pdc/services/export/models/CarpoolRow.ts @@ -1,7 +1,9 @@ import { pick } from "@/lib/object/index.ts"; +import { CarpoolStatusEnum } from "@/pdc/providers/carpool/interfaces/common.ts"; import { CarpoolListType } from "@/pdc/services/export/repositories/queries/CarpoolListQuery.ts"; export type AllowedComputedFields = { + status: CarpoolStatusEnum; incentive_type: string; has_incentive: boolean; }; @@ -28,7 +30,7 @@ export class CarpoolRow { * @param fields */ public get(fields?: string[]): Partial { - return fields && fields.length ? pick(this.data, fields) : this.data; + return fields && fields.length ? pick(this.data, fields as any) : this.data; } // type makes sure the field exists in the root dataset to avoid having diff --git a/api/src/pdc/services/export/repositories/CarpoolRepository.ts b/api/src/pdc/services/export/repositories/CarpoolRepository.ts index 4356e78a98..2c87f301b4 100644 --- a/api/src/pdc/services/export/repositories/CarpoolRepository.ts +++ b/api/src/pdc/services/export/repositories/CarpoolRepository.ts @@ -70,10 +70,10 @@ export class CarpoolRepository implements CarpoolRepositoryInterface { if (progress) await progress(((done / total) * 100) | 0); } while (count !== 0); - await cursor.release(); + cursor && await cursor.release(); } catch (e) { logger.error(`[export:CarpoolRepository] ${e.message}`, { values }); - await cursor.release(); + cursor && await cursor.release(); throw e; } } diff --git a/api/src/pdc/services/export/repositories/ExportRepository.ts b/api/src/pdc/services/export/repositories/ExportRepository.ts index 7f72dd8a4b..eef04b1a59 100644 --- a/api/src/pdc/services/export/repositories/ExportRepository.ts +++ b/api/src/pdc/services/export/repositories/ExportRepository.ts @@ -1,5 +1,7 @@ import { provider } from "@/ilos/common/index.ts"; import { PostgresConnection } from "@/ilos/connection-postgres/index.ts"; +import sql, { raw } from "@/pdc/providers/carpool/helpers/sql.ts"; +import { staleDelay } from "@/pdc/services/export/config/export.ts"; import { Export, ExportStatus } from "../models/Export.ts"; import { ExportRecipient } from "../models/ExportRecipient.ts"; import { LogServiceInterfaceResolver } from "../services/LogService.ts"; @@ -12,23 +14,7 @@ export type ExportUpdateData = Partial< >; export type ExportProgress = (progress: number) => Promise; -export interface ExportRepositoryInterface { - create(data: ExportCreateData): Promise; - get(id: number): Promise; - get(id: string): Promise; - update(id: number, data: ExportUpdateData): Promise; - delete(id: number): Promise; - list(): Promise; - status(id: number, status: ExportStatus): Promise; - error(id: number, error: string): Promise; - progress(id: number): Promise; - pickPending(): Promise; - recipients(id: number): Promise; - addRecipient(export_id: number, recipient: ExportRecipient): Promise; -} - -export abstract class ExportRepositoryInterfaceResolver - implements ExportRepositoryInterface { +export abstract class ExportRepositoryInterfaceResolver { /** * Create an new export in the database * @@ -169,6 +155,16 @@ export abstract class ExportRepositoryInterfaceResolver ): Promise { throw new Error("Not implemented"); } + + /** + * Fail stale exports + * + * This method is called by the process command to fail exports that are + * stuck in the `running` status for too long. + */ + public async failStaleExports(): Promise { + throw new Error("Not implemented"); + } } @provider({ @@ -330,4 +326,15 @@ export class ExportRepository implements ExportRepositoryInterface { ], }); } + + public async failStaleExports(): Promise { + const query = sql` + UPDATE ${raw(this.exportsTable)} + SET status = ${ExportStatus.FAILURE} + WHERE status = ${ExportStatus.RUNNING} + AND created_at < NOW() - ${staleDelay}::interval + `; + + await this.connection.getClient().query(query); + } } diff --git a/api/src/pdc/services/export/repositories/queries/CarpoolListQuery.ts b/api/src/pdc/services/export/repositories/queries/CarpoolListQuery.ts index d866537259..5fc2df6088 100644 --- a/api/src/pdc/services/export/repositories/queries/CarpoolListQuery.ts +++ b/api/src/pdc/services/export/repositories/queries/CarpoolListQuery.ts @@ -1,4 +1,9 @@ /* eslint-disable max-len */ +import { + CarpoolAcquisitionStatusEnum, + CarpoolAnomalyStatusEnum, + CarpoolFraudStatusEnum, +} from "@/pdc/providers/carpool/interfaces/common.ts"; import { AbstractQuery } from "./AbstractQuery.ts"; // List the {{template}} used in the query for string replacement @@ -9,7 +14,9 @@ export type CarpoolListType = { operator_trip_id: string; operator_journey_id: string; operator_class: string; - status: string; + acquisition_status: CarpoolAcquisitionStatusEnum; + fraud_status: CarpoolFraudStatusEnum; + anomaly_status: CarpoolAnomalyStatusEnum; start_datetime: Date; start_date: string; @@ -115,7 +122,9 @@ export class CarpoolListQuery extends AbstractQuery { SELECT cc._id, cc.legacy_id, - cs.fraud_status as status, + cs.acquisition_status, + cs.fraud_status, + cs.anomaly_status, -- time cc.start_datetime as start_at, @@ -241,7 +250,9 @@ export class CarpoolListQuery extends AbstractQuery { trips.operator_trip_id, trips.operator_journey_id, trips.operator_class, - trips.status, + trips.acquisition_status, + trips.fraud_status, + trips.anomaly_status, -- dates and times are in UTC -- ceil times to 10 minutes and format for user's convenience diff --git a/api/src/pdc/services/export/services/FieldService.ts b/api/src/pdc/services/export/services/FieldService.ts index 4b919b0eb8..c48f534814 100644 --- a/api/src/pdc/services/export/services/FieldService.ts +++ b/api/src/pdc/services/export/services/FieldService.ts @@ -22,9 +22,8 @@ export class FieldService { public byTarget(target: ExportTarget): Partial { const fields = this.config.get("workbook.fields", []) as Fields; - const filter = this.config.get("workbook.filters", []).find(( - filter: FieldFilter, - ) => filter.target === target); + const filter = this.config.get("workbook.filters", []) + .find((filter: FieldFilter) => filter.target === target); if (!filter) { logger.warn(`No filter found for target ${target}`); diff --git a/doc/docs/operateurs/exports-de-trajets.md b/doc/docs/operateurs/exports-de-trajets.md index 4b32b0d8d1..48f5fc2a17 100644 --- a/doc/docs/operateurs/exports-de-trajets.md +++ b/doc/docs/operateurs/exports-de-trajets.md @@ -136,7 +136,7 @@ trajets. | | passenger_identity_id | Identifiant personnel inter-opérateurs | | operator_driver_id | operator_driver_id | | | | driver_identity_key | Identifiant personnel inter-opérateurs | -| status | status | Détection des anomalies | +| status | status | Statut du trajet : `acquisition_error`, `validation_error`, `normalization_error`, `fraud_error`, `anomaly_error`, `ok`, `expired`, `canceled`, `pending`, `unknown` | | passenger_id | | | | passenger_contribution | passenger_contribution | | | passenger_incentive_N_siret | | |