Skip to content

Commit

Permalink
fix: export columns, stale and status improvements (#2633)
Browse files Browse the repository at this point in the history
* properly type the filter

* filter csv columns from the config

* consolidate status from acquisition, fraud and anomaly

* fix error notification

* documentation update

* fail stale exports after 4 hours
  • Loading branch information
jonathanfallon authored Oct 4, 2024
1 parent b2a98ad commit 208c62d
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 32 deletions.
11 changes: 8 additions & 3 deletions api/src/pdc/services/export/commands/ProcessCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
CommandOptionType,
} from "@/ilos/common/index.ts";
import { getPerformanceTimer, logger } from "@/lib/logger/index.ts";
import { staleDelay } from "@/pdc/services/export/config/export.ts";
import { NotificationService } from "@/pdc/services/export/services/NotificationService.ts";
import { StorageService } from "@/pdc/services/export/services/StorageService.ts";
import { CSVWriter } from "../models/CSVWriter.ts";
Expand Down Expand Up @@ -31,13 +32,17 @@ export class ProcessCommand implements CommandInterface {
) {}

public async call(): Promise<void> {
// init the storage service
await this.storage.init();

let counter = 50;
// fail stale exports running for too long
await this.exportRepository.failStaleExports();
logger.info(`Patched stale exports running for more than ${staleDelay}`);

// process pending exports until there are no more
// picking one at a time to avoid concurrency issues
// and let multiple workers process the queue in parallel
let counter = 50;
let exp = await this.exportRepository.pickPending();
while (exp && counter > 0) {
await this.process(exp);
Expand Down Expand Up @@ -82,8 +87,8 @@ export class ProcessCommand implements CommandInterface {
logger.info(`Export ${uuid} done in ${timer.stop()} ms`);
} catch (e) {
await this.exportRepository.error(_id, e.message);
await this.notify.error(exp);
await this.notify.support(exp);
await this.notify.error({ ...exp, error: e.message });
await this.notify.support({ ...exp, error: e.message });
}
}
}
4 changes: 3 additions & 1 deletion api/src/pdc/services/export/config/export.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { env_or_int } from "@/lib/env/index.ts";
import { env_or_default, env_or_int } from "@/lib/env/index.ts";

// default min start 3 years ago
export const minStartDefault = env_or_int(
Expand All @@ -11,3 +11,5 @@ export const maxEndDefault = env_or_int(
"APP_EXPORT_MAX_END",
1000 * 60 * 60 * 24 * 5,
);

export const staleDelay = env_or_default("APP_EXPORT_STALE_DELAY", "4 hours");
15 changes: 14 additions & 1 deletion api/src/pdc/services/export/models/CSVWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { logger } from "@/lib/logger/index.ts";
import { join } from "@/lib/path/index.ts";
import { sanitize } from "@/pdc/helpers/string.helper.ts";
import { castToStatusEnum } from "@/pdc/providers/carpool/helpers/castStatus.ts";
import {
AllowedComputedFields,
CarpoolRow,
Expand Down Expand Up @@ -51,6 +52,18 @@ export class CSVWriter {
cleanup: true,
fields: [],
computed: [
{
name: "status",
compute(row) {
return castToStatusEnum(
row.get([
"acquisition_status",
"anomaly_status",
"fraud_status",
]),
);
},
},
{
name: "incentive_type",
compute(row, datasources) {
Expand Down Expand Up @@ -131,7 +144,7 @@ export class CSVWriter {
);
});

this.stringifier.write(carpoolRow.get());
this.stringifier.write(carpoolRow.get(this.options.fields as string[]));

return this;
}
Expand Down
4 changes: 3 additions & 1 deletion api/src/pdc/services/export/models/CarpoolRow.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { pick } from "@/lib/object/index.ts";
import { CarpoolStatusEnum } from "@/pdc/providers/carpool/interfaces/common.ts";
import { CarpoolListType } from "@/pdc/services/export/repositories/queries/CarpoolListQuery.ts";

export type AllowedComputedFields = {
status: CarpoolStatusEnum;
incentive_type: string;
has_incentive: boolean;
};
Expand All @@ -28,7 +30,7 @@ export class CarpoolRow {
* @param fields
*/
public get(fields?: string[]): Partial<CarpoolRowData> {
return fields && fields.length ? pick(this.data, fields) : this.data;
return fields && fields.length ? pick(this.data, fields as any) : this.data;
}

// type makes sure the field exists in the root dataset to avoid having
Expand Down
4 changes: 2 additions & 2 deletions api/src/pdc/services/export/repositories/CarpoolRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ export class CarpoolRepository implements CarpoolRepositoryInterface {
if (progress) await progress(((done / total) * 100) | 0);
} while (count !== 0);

await cursor.release();
cursor && await cursor.release();
} catch (e) {
logger.error(`[export:CarpoolRepository] ${e.message}`, { values });
await cursor.release();
cursor && await cursor.release();
throw e;
}
}
Expand Down
41 changes: 24 additions & 17 deletions api/src/pdc/services/export/repositories/ExportRepository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { provider } from "@/ilos/common/index.ts";
import { PostgresConnection } from "@/ilos/connection-postgres/index.ts";
import sql, { raw } from "@/pdc/providers/carpool/helpers/sql.ts";
import { staleDelay } from "@/pdc/services/export/config/export.ts";
import { Export, ExportStatus } from "../models/Export.ts";
import { ExportRecipient } from "../models/ExportRecipient.ts";
import { LogServiceInterfaceResolver } from "../services/LogService.ts";
Expand All @@ -12,23 +14,7 @@ export type ExportUpdateData = Partial<
>;
export type ExportProgress = (progress: number) => Promise<void>;

export interface ExportRepositoryInterface {
create(data: ExportCreateData): Promise<Export>;
get(id: number): Promise<Export>;
get(id: string): Promise<Export>;
update(id: number, data: ExportUpdateData): Promise<void>;
delete(id: number): Promise<void>;
list(): Promise<Export[]>;
status(id: number, status: ExportStatus): Promise<void>;
error(id: number, error: string): Promise<void>;
progress(id: number): Promise<ExportProgress>;
pickPending(): Promise<Export | null>;
recipients(id: number): Promise<ExportRecipient[]>;
addRecipient(export_id: number, recipient: ExportRecipient): Promise<void>;
}

export abstract class ExportRepositoryInterfaceResolver
implements ExportRepositoryInterface {
export abstract class ExportRepositoryInterfaceResolver {
/**
* Create an new export in the database
*
Expand Down Expand Up @@ -169,6 +155,16 @@ export abstract class ExportRepositoryInterfaceResolver
): Promise<void> {
throw new Error("Not implemented");
}

/**
* Fail stale exports
*
* This method is called by the process command to fail exports that are
* stuck in the `running` status for too long.
*/
public async failStaleExports(): Promise<void> {
throw new Error("Not implemented");
}
}

@provider({
Expand Down Expand Up @@ -330,4 +326,15 @@ export class ExportRepository implements ExportRepositoryInterface {
],
});
}

public async failStaleExports(): Promise<void> {
const query = sql`
UPDATE ${raw(this.exportsTable)}
SET status = ${ExportStatus.FAILURE}
WHERE status = ${ExportStatus.RUNNING}
AND created_at < NOW() - ${staleDelay}::interval
`;

await this.connection.getClient().query(query);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
/* eslint-disable max-len */
import {
CarpoolAcquisitionStatusEnum,
CarpoolAnomalyStatusEnum,
CarpoolFraudStatusEnum,
} from "@/pdc/providers/carpool/interfaces/common.ts";
import { AbstractQuery } from "./AbstractQuery.ts";

// List the {{template}} used in the query for string replacement
Expand All @@ -9,7 +14,9 @@ export type CarpoolListType = {
operator_trip_id: string;
operator_journey_id: string;
operator_class: string;
status: string;
acquisition_status: CarpoolAcquisitionStatusEnum;
fraud_status: CarpoolFraudStatusEnum;
anomaly_status: CarpoolAnomalyStatusEnum;

start_datetime: Date;
start_date: string;
Expand Down Expand Up @@ -115,7 +122,9 @@ export class CarpoolListQuery extends AbstractQuery {
SELECT
cc._id,
cc.legacy_id,
cs.fraud_status as status,
cs.acquisition_status,
cs.fraud_status,
cs.anomaly_status,
-- time
cc.start_datetime as start_at,
Expand Down Expand Up @@ -241,7 +250,9 @@ export class CarpoolListQuery extends AbstractQuery {
trips.operator_trip_id,
trips.operator_journey_id,
trips.operator_class,
trips.status,
trips.acquisition_status,
trips.fraud_status,
trips.anomaly_status,
-- dates and times are in UTC
-- ceil times to 10 minutes and format for user's convenience
Expand Down
5 changes: 2 additions & 3 deletions api/src/pdc/services/export/services/FieldService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ export class FieldService {

public byTarget(target: ExportTarget): Partial<Fields> {
const fields = this.config.get("workbook.fields", []) as Fields;
const filter = this.config.get("workbook.filters", []).find((
filter: FieldFilter,
) => filter.target === target);
const filter = this.config.get<FieldFilter[]>("workbook.filters", [])
.find((filter: FieldFilter) => filter.target === target);

if (!filter) {
logger.warn(`No filter found for target ${target}`);
Expand Down
2 changes: 1 addition & 1 deletion doc/docs/operateurs/exports-de-trajets.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ trajets.
| | passenger_identity_id | Identifiant personnel inter-opérateurs |
| operator_driver_id | operator_driver_id | |
| | driver_identity_key | Identifiant personnel inter-opérateurs |
| status | status | Détection des anomalies |
| status | status | Statut du trajet : `acquisition_error`, `validation_error`, `normalization_error`, `fraud_error`, `anomaly_error`, `ok`, `expired`, `canceled`, `pending`, `unknown` |
| passenger_id | | |
| passenger_contribution | passenger_contribution | |
| passenger_incentive_N_siret | | |
Expand Down

0 comments on commit 208c62d

Please sign in to comment.