diff --git a/api/db/migrations/20210618155305-territory_surface_bigint.js b/api/db/migrations/20210618155305-territory_surface_bigint.js new file mode 100644 index 0000000000..dd19326e54 --- /dev/null +++ b/api/db/migrations/20210618155305-territory_surface_bigint.js @@ -0,0 +1,13 @@ +'use strict'; +/** + * Cast all foreign keys *_id as integer to match PostgreSQL types + * Current type is 'varchar' as fkeys were migrated from MongoDB + * as a toString() of ObjectID objects. + */ +var { createMigration } = require('../helpers/createMigration'); + +var { setup, up, down } = createMigration(['territory/20210618155305_bigint_surface'], __dirname); + +exports.setup = setup; +exports.up = up; +exports.down = down; diff --git a/api/db/migrations/territory/20210618155305_bigint_surface.down.sql b/api/db/migrations/territory/20210618155305_bigint_surface.down.sql new file mode 100644 index 0000000000..10e2b1df5b --- /dev/null +++ b/api/db/migrations/territory/20210618155305_bigint_surface.down.sql @@ -0,0 +1,226 @@ +-- ERROR: cannot alter type of a column used by a view or rule +-- DETAIL: rule _RETURN on view trip.list_view depends on column "surface" + +DROP VIEW IF EXISTS trip.list_view; + +ALTER TABLE territory.territories ALTER COLUMN surface TYPE int USING surface::int; + +CREATE VIEW trip.list_view AS ( + SELECT + + -- THIS IS FOR AUTH AND SEARCH ONLY -- + cpp.operator_id as operator_id, + cpp.start_territory_id as start_territory_id, + cpp.end_territory_id as end_territory_id, + COALESCE((pip.policy_id || pid.policy_id)::int[], ARRAY[]::int[]) as applied_policies, + + -- DATA -- + cpp.acquisition_id as journey_id, + cpp.trip_id as trip_id, + + ts_ceil(cpp.datetime, 600) as journey_start_datetime, + extract(isodow from cpp.datetime) as journey_start_weekday, + extract(hour from cpp.datetime) as journey_start_dayhour, + + trunc( + ST_X(cpp.start_position::geometry)::numeric, + CASE WHEN ( + tts.surface > 0 AND + (tts.population::float / (tts.surface::float / 100)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_start_lon, + trunc( + ST_Y(cpp.start_position::geometry)::numeric, + CASE WHEN ( + tts.surface > 0 AND + (tts.population::float / (tts.surface::float / 100)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_start_lat, + + cts.insee[1] as journey_start_insee, + cts.postcode[1] as journey_start_postalcode, + substring(cts.postcode[1] from 1 for 2) as journey_start_department, + bts.town::varchar as journey_start_town, + bts.towngroup::varchar as journey_start_towngroup, + bts.country::varchar as journey_start_country, + + ts_ceil((cpp.datetime + (cpp.duration || ' seconds')::interval), 600) as journey_end_datetime, + + trunc( + ST_X(cpp.end_position::geometry)::numeric, + CASE WHEN ( + tte.surface > 0 AND + (tte.population::float / (tte.surface::float / 100)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_end_lon, + trunc( + ST_Y(cpp.end_position::geometry)::numeric, + CASE WHEN ( + tte.surface > 0 AND + (tte.population::float / (tte.surface::float / 100)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_end_lat, + + cte.insee[1] as journey_end_insee, + cte.postcode[1] as journey_end_postalcode, + substring(cte.postcode[1] from 1 for 2) as journey_end_department, + bte.town::varchar as journey_end_town, + bte.towngroup::varchar as journey_end_towngroup, + bte.country::varchar as journey_end_country, + + (CASE WHEN cpp.distance IS NOT NULL THEN cpp.distance ELSE (cpp.meta::json->>'calc_distance')::int END) as journey_distance, + cpp.distance as journey_distance_anounced, + (cpp.meta::json->>'calc_distance')::int as journey_distance_calculated, + + (CASE WHEN cpp.duration IS NOT NULL THEN cpp.duration ELSE (cpp.meta::json->>'calc_duration')::int END) as journey_duration, + cpp.duration as journey_duration_anounced, + (cpp.meta::json->>'calc_duration')::int as journey_duration_calculated, + + ope.name as operator, + cpp.operator_class as operator_class, + + cip.uuid as passenger_id, + (CASE WHEN cip.travel_pass_name IS NOT NULL THEN '1' ELSE '0' END)::boolean as passenger_card, + cip.over_18 as passenger_over_18, + cpp.seats as passenger_seats, + + abs(cpp.cost) as passenger_contribution, + cpip.incentive::trip.incentive[] as passenger_incentive_raw, + pip.incentive_raw::trip.incentive[] as passenger_incentive_rpc_raw, + pip.incentive_financial_sum as passenger_incentive_rpc_financial_sum, + pip.incentive_sum as passenger_incentive_rpc_sum, + + cid.uuid as driver_id, + (CASE WHEN cid.travel_pass_name IS NOT NULL THEN '1' ELSE '0' END)::boolean as driver_card, + + abs(cpd.cost) as driver_revenue, + cpid.incentive::trip.incentive[] as driver_incentive_raw, + pid.incentive_raw::trip.incentive[] as driver_incentive_rpc_raw, + pid.incentive_financial_sum as driver_incentive_rpc_financial_sum, + pid.incentive_sum as driver_incentive_rpc_sum, + + cpp.status as status + -- status_message + + FROM carpool.carpools as cpp + JOIN operator.operators as ope ON ope._id = cpp.operator_id::int + + LEFT JOIN territory.territories AS tts ON tts._id = cpp.start_territory_id + LEFT JOIN territory.territories AS tte ON tte._id = cpp.end_territory_id + LEFT JOIN carpool.carpools AS cpd ON cpd.acquisition_id = cpp.acquisition_id AND cpd.is_driver = true AND cpd.status = 'ok'::carpool.carpool_status_enum + LEFT JOIN carpool.identities AS cip ON cip._id = cpp.identity_id + LEFT JOIN carpool.identities AS cid ON cid._id = cpd.identity_id + LEFT JOIN territory.get_codes(cpp.start_territory_id, ARRAY[]::int[]) AS cts ON TRUE + LEFT JOIN territory.get_codes(cpp.end_territory_id, ARRAY[]::int[]) AS cte ON TRUE + LEFT JOIN territory.get_breadcrumb( + cpp.start_territory_id, + territory.get_ancestors(ARRAY[cpp.start_territory_id]) + ) AS bts ON TRUE + LEFT JOIN territory.get_breadcrumb( + cpp.end_territory_id, + territory.get_ancestors(ARRAY[cpp.end_territory_id]) + ) AS bte ON TRUE, + LATERAL ( + WITH data AS ( + SELECT + pi.policy_id, + sum(pi.amount) as amount + FROM policy.incentives as pi + WHERE pi.carpool_id = cpp._id + AND pi.status = 'validated'::policy.incentive_status_enum + AND pi.amount > 0 + GROUP BY pi.policy_id + ), + incentive AS ( + SELECT + data.policy_id as policy_id, + ROW( + cc.siret, + data.amount, + pp.unit::varchar, + data.policy_id, + pp.name::varchar, + 'incentive' + )::trip.incentive as value, + data.amount as amount, + CASE WHEN pp.unit = 'point'::policy.policy_unit_enum THEN false ELSE true END as financial + FROM data + LEFT JOIN policy.policies as pp on pp._id = data.policy_id + LEFT JOIN territory.territories as tt on pp.territory_id = tt._id + LEFT JOIN company.companies as cc on cc._id = tt.company_id + ) + SELECT + array_agg( + incentive.value + ) as incentive_raw, + sum(incentive.amount) as incentive_sum, + sum(incentive.amount) FILTER (WHERE incentive.financial IS true) as incentive_financial_sum, + array_agg(incentive.policy_id) as policy_id + FROM incentive + ) as pip, + LATERAL ( + WITH data AS ( + SELECT + pi.policy_id, + sum(pi.amount) as amount + FROM policy.incentives as pi + WHERE pi.carpool_id = cpd._id + AND pi.status = 'validated'::policy.incentive_status_enum + AND pi.amount > 0 + GROUP BY pi.policy_id + ), + incentive AS ( + SELECT + data.policy_id as policy_id, + ROW( + cc.siret, + data.amount, + pp.unit::varchar, + data.policy_id, + pp.name::varchar, + 'incentive' + )::trip.incentive as value, + data.amount as amount, + CASE WHEN pp.unit = 'point'::policy.policy_unit_enum THEN false ELSE true END as financial + FROM data + LEFT JOIN policy.policies as pp on pp._id = data.policy_id + LEFT JOIN territory.territories as tt on pp.territory_id = tt._id + LEFT JOIN company.companies as cc on cc._id = tt.company_id + ) + SELECT + array_agg( + incentive.value + ) as incentive_raw, + sum(incentive.amount) as incentive_sum, + sum(incentive.amount) FILTER (WHERE incentive.financial IS true) as incentive_financial_sum, + array_agg(incentive.policy_id) as policy_id + FROM incentive + ) as pid, + LATERAL ( + SELECT + array_agg( + value::trip.incentive + ) as incentive + FROM json_array_elements(cpp.meta->'payments') + ) as cpip, + LATERAL ( + SELECT + array_agg( + value::trip.incentive + ) as incentive + FROM json_array_elements(cpd.meta->'payments') + ) as cpid + WHERE cpp.is_driver = false AND cpp.status = 'ok'::carpool.carpool_status_enum +); diff --git a/api/db/migrations/territory/20210618155305_bigint_surface.up.sql b/api/db/migrations/territory/20210618155305_bigint_surface.up.sql new file mode 100644 index 0000000000..59c3a8e631 --- /dev/null +++ b/api/db/migrations/territory/20210618155305_bigint_surface.up.sql @@ -0,0 +1,226 @@ +-- ERROR: cannot alter type of a column used by a view or rule +-- DETAIL: rule _RETURN on view trip.list_view depends on column "surface" + +DROP VIEW IF EXISTS trip.list_view; + +ALTER TABLE territory.territories ALTER COLUMN surface TYPE bigint USING surface::bigint; + +CREATE VIEW trip.list_view AS ( + SELECT + + -- THIS IS FOR AUTH AND SEARCH ONLY -- + cpp.operator_id as operator_id, + cpp.start_territory_id as start_territory_id, + cpp.end_territory_id as end_territory_id, + COALESCE((pip.policy_id || pid.policy_id)::int[], ARRAY[]::int[]) as applied_policies, + + -- DATA -- + cpp.acquisition_id as journey_id, + cpp.trip_id as trip_id, + + ts_ceil(cpp.datetime, 600) as journey_start_datetime, + extract(isodow from cpp.datetime) as journey_start_weekday, + extract(hour from cpp.datetime) as journey_start_dayhour, + + trunc( + ST_X(cpp.start_position::geometry)::numeric, + CASE WHEN ( + tts.surface > 0 AND + (tts.population::float / (tts.surface::float / 1000000)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_start_lon, + trunc( + ST_Y(cpp.start_position::geometry)::numeric, + CASE WHEN ( + tts.surface > 0 AND + (tts.population::float / (tts.surface::float / 1000000)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_start_lat, + + cts.insee[1] as journey_start_insee, + cts.postcode[1] as journey_start_postalcode, + substring(cts.postcode[1] from 1 for 2) as journey_start_department, + bts.town::varchar as journey_start_town, + bts.towngroup::varchar as journey_start_towngroup, + bts.country::varchar as journey_start_country, + + ts_ceil((cpp.datetime + (cpp.duration || ' seconds')::interval), 600) as journey_end_datetime, + + trunc( + ST_X(cpp.end_position::geometry)::numeric, + CASE WHEN ( + tte.surface > 0 AND + (tte.population::float / (tte.surface::float / 1000000)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_end_lon, + trunc( + ST_Y(cpp.end_position::geometry)::numeric, + CASE WHEN ( + tte.surface > 0 AND + (tte.population::float / (tte.surface::float / 1000000)) > 40 + ) + THEN 3 + ELSE 2 + END + ) as journey_end_lat, + + cte.insee[1] as journey_end_insee, + cte.postcode[1] as journey_end_postalcode, + substring(cte.postcode[1] from 1 for 2) as journey_end_department, + bte.town::varchar as journey_end_town, + bte.towngroup::varchar as journey_end_towngroup, + bte.country::varchar as journey_end_country, + + (CASE WHEN cpp.distance IS NOT NULL THEN cpp.distance ELSE (cpp.meta::json->>'calc_distance')::int END) as journey_distance, + cpp.distance as journey_distance_anounced, + (cpp.meta::json->>'calc_distance')::int as journey_distance_calculated, + + (CASE WHEN cpp.duration IS NOT NULL THEN cpp.duration ELSE (cpp.meta::json->>'calc_duration')::int END) as journey_duration, + cpp.duration as journey_duration_anounced, + (cpp.meta::json->>'calc_duration')::int as journey_duration_calculated, + + ope.name as operator, + cpp.operator_class as operator_class, + + cip.uuid as passenger_id, + (CASE WHEN cip.travel_pass_name IS NOT NULL THEN '1' ELSE '0' END)::boolean as passenger_card, + cip.over_18 as passenger_over_18, + cpp.seats as passenger_seats, + + abs(cpp.cost) as passenger_contribution, + cpip.incentive::trip.incentive[] as passenger_incentive_raw, + pip.incentive_raw::trip.incentive[] as passenger_incentive_rpc_raw, + pip.incentive_financial_sum as passenger_incentive_rpc_financial_sum, + pip.incentive_sum as passenger_incentive_rpc_sum, + + cid.uuid as driver_id, + (CASE WHEN cid.travel_pass_name IS NOT NULL THEN '1' ELSE '0' END)::boolean as driver_card, + + abs(cpd.cost) as driver_revenue, + cpid.incentive::trip.incentive[] as driver_incentive_raw, + pid.incentive_raw::trip.incentive[] as driver_incentive_rpc_raw, + pid.incentive_financial_sum as driver_incentive_rpc_financial_sum, + pid.incentive_sum as driver_incentive_rpc_sum, + + cpp.status as status + -- status_message + + FROM carpool.carpools as cpp + JOIN operator.operators as ope ON ope._id = cpp.operator_id::int + + LEFT JOIN territory.territories AS tts ON tts._id = cpp.start_territory_id + LEFT JOIN territory.territories AS tte ON tte._id = cpp.end_territory_id + LEFT JOIN carpool.carpools AS cpd ON cpd.acquisition_id = cpp.acquisition_id AND cpd.is_driver = true AND cpd.status = 'ok'::carpool.carpool_status_enum + LEFT JOIN carpool.identities AS cip ON cip._id = cpp.identity_id + LEFT JOIN carpool.identities AS cid ON cid._id = cpd.identity_id + LEFT JOIN territory.get_codes(cpp.start_territory_id, ARRAY[]::int[]) AS cts ON TRUE + LEFT JOIN territory.get_codes(cpp.end_territory_id, ARRAY[]::int[]) AS cte ON TRUE + LEFT JOIN territory.get_breadcrumb( + cpp.start_territory_id, + territory.get_ancestors(ARRAY[cpp.start_territory_id]) + ) AS bts ON TRUE + LEFT JOIN territory.get_breadcrumb( + cpp.end_territory_id, + territory.get_ancestors(ARRAY[cpp.end_territory_id]) + ) AS bte ON TRUE, + LATERAL ( + WITH data AS ( + SELECT + pi.policy_id, + sum(pi.amount) as amount + FROM policy.incentives as pi + WHERE pi.carpool_id = cpp._id + AND pi.status = 'validated'::policy.incentive_status_enum + AND pi.amount > 0 + GROUP BY pi.policy_id + ), + incentive AS ( + SELECT + data.policy_id as policy_id, + ROW( + cc.siret, + data.amount, + pp.unit::varchar, + data.policy_id, + pp.name::varchar, + 'incentive' + )::trip.incentive as value, + data.amount as amount, + CASE WHEN pp.unit = 'point'::policy.policy_unit_enum THEN false ELSE true END as financial + FROM data + LEFT JOIN policy.policies as pp on pp._id = data.policy_id + LEFT JOIN territory.territories as tt on pp.territory_id = tt._id + LEFT JOIN company.companies as cc on cc._id = tt.company_id + ) + SELECT + array_agg( + incentive.value + ) as incentive_raw, + sum(incentive.amount) as incentive_sum, + sum(incentive.amount) FILTER (WHERE incentive.financial IS true) as incentive_financial_sum, + array_agg(incentive.policy_id) as policy_id + FROM incentive + ) as pip, + LATERAL ( + WITH data AS ( + SELECT + pi.policy_id, + sum(pi.amount) as amount + FROM policy.incentives as pi + WHERE pi.carpool_id = cpd._id + AND pi.status = 'validated'::policy.incentive_status_enum + AND pi.amount > 0 + GROUP BY pi.policy_id + ), + incentive AS ( + SELECT + data.policy_id as policy_id, + ROW( + cc.siret, + data.amount, + pp.unit::varchar, + data.policy_id, + pp.name::varchar, + 'incentive' + )::trip.incentive as value, + data.amount as amount, + CASE WHEN pp.unit = 'point'::policy.policy_unit_enum THEN false ELSE true END as financial + FROM data + LEFT JOIN policy.policies as pp on pp._id = data.policy_id + LEFT JOIN territory.territories as tt on pp.territory_id = tt._id + LEFT JOIN company.companies as cc on cc._id = tt.company_id + ) + SELECT + array_agg( + incentive.value + ) as incentive_raw, + sum(incentive.amount) as incentive_sum, + sum(incentive.amount) FILTER (WHERE incentive.financial IS true) as incentive_financial_sum, + array_agg(incentive.policy_id) as policy_id + FROM incentive + ) as pid, + LATERAL ( + SELECT + array_agg( + value::trip.incentive + ) as incentive + FROM json_array_elements(cpp.meta->'payments') + ) as cpip, + LATERAL ( + SELECT + array_agg( + value::trip.incentive + ) as incentive + FROM json_array_elements(cpd.meta->'payments') + ) as cpid + WHERE cpp.is_driver = false AND cpp.status = 'ok'::carpool.carpool_status_enum +); diff --git a/api/proxy/src/Kernel.ts b/api/proxy/src/Kernel.ts index f1aa258c4b..a784e36f69 100644 --- a/api/proxy/src/Kernel.ts +++ b/api/proxy/src/Kernel.ts @@ -19,12 +19,11 @@ import { bootstrap as monitoringBootstrap } from '@pdc/service-monitoring'; import { bootstrap as honorBootstrap } from '@pdc/service-honor'; import { config } from './config'; -import { MigrateInseeCommand } from './commands/MigrateInseeCommand'; import { ProcessJourneyCommand } from './commands/ProcessJourneyCommand'; -import { GeoFetchCommand } from './commands/GeoFetchCommand'; -import { SyncTerritoryInseeCommand } from './commands/SyncTerritoryInseeCommand'; -import { SyncRegionDepCommand } from './commands/SyncRegionDepCommand'; -import { SyncLegacyProductionCommand } from './commands/SyncLegacyProductionCommand'; +import { SyncGeoShapeCommand } from './commands/SyncGeoShapeCommand'; +// import { SyncTerritoryInseeCommand } from './commands/SyncTerritoryInseeCommand'; +// import { SyncRegionDepCommand } from './commands/SyncRegionDepCommand'; +import { SyncGeoSurfacePopulationCommand } from './commands/SyncSurfacePopulationCommand'; import { SeedCommand } from './commands/SeedCommand'; @kernel({ @@ -48,12 +47,11 @@ import { SeedCommand } from './commands/SeedCommand'; providers: [SentryProvider, TokenProvider], commands: [ ProcessJourneyCommand, - MigrateInseeCommand, - SyncTerritoryInseeCommand, - SyncRegionDepCommand, - GeoFetchCommand, - SyncLegacyProductionCommand, + // SyncTerritoryInseeCommand, + // SyncRegionDepCommand, + SyncGeoShapeCommand, SeedCommand, + SyncGeoSurfacePopulationCommand, Commands.CallCommand, ], }) diff --git a/api/proxy/src/commands/GeoFetchCommand.ts b/api/proxy/src/commands/GeoFetchCommand.ts deleted file mode 100644 index 8a3eaec97b..0000000000 --- a/api/proxy/src/commands/GeoFetchCommand.ts +++ /dev/null @@ -1,79 +0,0 @@ -import axios from 'axios'; -import { promisify } from 'util'; -import { command, CommandInterface, CommandOptionType } from '@ilos/common'; -import { PostgresConnection, Cursor } from '@ilos/connection-postgres'; - -@command() -export class GeoFetchCommand implements CommandInterface { - static readonly signature: string = 'geo:fetch'; - static readonly description: string = 'import all missing shapes in common.insee table from geo.data.gouv.fr'; - static readonly options: CommandOptionType[] = [ - { - signature: '-u, --database-uri ', - description: 'Postgres connection string', - default: process.env.APP_POSTGRES_URL, - }, - ]; - - public async call(options): Promise { - try { - const pgConnection = new PostgresConnection({ - connectionString: options.databaseUri, - }); - - await pgConnection.up(); - const pgClient = pgConnection.getClient(); - const pgConnected = await pgClient.connect(); - const cursorCb = pgConnected.query(new Cursor(`SELECT _id FROM common.insee WHERE geo IS NULL`)); - const cursor = promisify(cursorCb.read.bind(cursorCb)); - const ROW_COUNT = 10; - let count = 0; - - const totalStart = await pgClient.query('SELECT COUNT(*) FROM common.insee where geo IS NULL'); - - do { - const results = await cursor(ROW_COUNT); - count = results.length; - - for (const line of results) { - try { - const response = await axios.get( - `https://geo.api.gouv.fr/communes?code=${line._id}&fields=code,nom,codesPostaux,contour`, - ); - - const { data } = response; - - await pgClient.query({ - text: ` - UPDATE common.insee - SET - geo = ST_GeomFromGeoJSON($2), - postcodes = $3, - town = $4 - WHERE _id = $1 - `, - values: [line._id, data[0].contour, data[0].codesPostaux, data[0].nom], - }); - - // prettier-ignore - console.debug(`> UPDATE ${line._id} ${data[0].nom} ${data[0].contour && data[0].contour !== '' ? 'contour' : ''}`); // eslint-disable-line max-len - } catch (e) { - console.error(`> ERROR ${line._id}`, e); - } - } - } while (count !== 0); - - const totalEnd = await pgClient.query('SELECT COUNT(*) FROM common.insee where geo IS NULL'); - - console.table({ - Start: parseInt(totalStart.rows[0].count, 10), - End: parseInt(totalEnd.rows[0].count, 10), - Found: parseInt(totalStart.rows[0].count, 10) - parseInt(totalEnd.rows[0].count, 10), - }); - - return 'Finito'; - } catch (e) { - console.error(e.message, e); - } - } -} diff --git a/api/proxy/src/commands/MigrateInseeCommand.ts b/api/proxy/src/commands/MigrateInseeCommand.ts deleted file mode 100644 index d50700e1dd..0000000000 --- a/api/proxy/src/commands/MigrateInseeCommand.ts +++ /dev/null @@ -1,72 +0,0 @@ -import axios from 'axios'; -import { command, CommandInterface, CommandOptionType } from '@ilos/common'; -import { PostgresConnection } from '@ilos/connection-postgres'; - -@command() -export class MigrateInseeCommand implements CommandInterface { - static readonly signature: string = 'migrate:insee'; - static readonly description: string = 'Seed INSEE data'; - static readonly options: CommandOptionType[] = [ - { - signature: '-u, --database-uri ', - description: 'Postgres connection string', - default: process.env.APP_POSTGRES_URL, - }, - ]; - - public async call(options): Promise { - try { - const pgConnection = new PostgresConnection({ - connectionString: options.databaseUri, - }); - - await pgConnection.up(); - - const pgClient = pgConnection.getClient(); - - const results = await pgClient.query(` - SELECT territory.insee._id - FROM territory.insee - LEFT JOIN common.insee - ON territory.insee._id = common.insee._id - WHERE geo IS NULL - `); - - if (results.rowCount === 0) { - return 'All geo data found'; - } - - const list: { _id: string }[] = [...results.rows]; - while (list.length > 0) { - const { _id: code } = list.pop(); - try { - const response = await axios.get( - `https://geo.api.gouv.fr/communes?code=${code}&fields=code,nom,codesPostaux,contour`, - ); - - const { data } = response; - - await pgClient.query({ - text: `INSERT INTO common.insee ( _id, geo, postcodes, town, country ) - VALUES ( $1, ST_GeomFromGeoJSON($2), $3, $4, $5 )`, - values: [ - code, - data[0].contour, - data[0].codesPostaux, - data[0].nom, - code.substr(0, 2) !== '99' ? 'France' : null, - ], - }); - - console.info(`> INSERT ${code} ${data[0].nom}`); - } catch (e) { - console.error(`> ERROR ${code}`, e); - } - } - - return 'All geo data found'; - } catch (e) { - console.error(e.message, e); - } - } -} diff --git a/api/proxy/src/commands/SyncGeoShapeCommand.ts b/api/proxy/src/commands/SyncGeoShapeCommand.ts new file mode 100644 index 0000000000..63710dc039 --- /dev/null +++ b/api/proxy/src/commands/SyncGeoShapeCommand.ts @@ -0,0 +1,74 @@ +import axios from 'axios'; +import { promisify } from 'util'; +import { command, CommandInterface, CommandOptionType } from '@ilos/common'; +import { PostgresConnection, Cursor } from '@ilos/connection-postgres'; + +@command() +export class SyncGeoShapeCommand implements CommandInterface { + static readonly signature: string = 'geo:shape'; + static readonly description: string = 'get missing shapes in territory.territories from geo.data.gouv.fr'; + static readonly options: CommandOptionType[] = [ + { + signature: '-u, --database-uri ', + description: 'Postgres connection string', + default: process.env.APP_POSTGRES_URL, + }, + ]; + + public async call(options): Promise { + try { + const query = ` + SELECT tt._id, tc.value insee + FROM territory.territories tt + LEFT JOIN territory.territory_codes tc + ON tt._id = tc.territory_id + WHERE tt.level = 'town' + AND tc.type = 'insee' + AND tt.geo IS NULL + `; + + const pgConnection = new PostgresConnection({ connectionString: options.databaseUri }); + + await pgConnection.up(); + const pgClient = pgConnection.getClient(); + const pgConnected = await pgClient.connect(); + const cursorCb = pgConnected.query(new Cursor(query)); + const cursor = promisify(cursorCb.read.bind(cursorCb)); + const ROW_COUNT = 10; + let count = 0; + + do { + const results = await cursor(ROW_COUNT); + count = results.length; + + for (const line of results) { + try { + const response = await axios.get(`https://geo.api.gouv.fr/communes?code=${line.insee}&fields=nom,contour`); + + const { data } = response; + + if (!data.length) throw new Error(`${line._id}/${line.insee} not found`); + if (!data[0].contour) throw new Error(`${line._id}/${line.insee} no shape available`); + + await pgClient.query({ + text: ` + UPDATE territory.territories + SET geo = ST_GeomFromGeoJSON($2) + WHERE _id = $1 + `, + values: [line._id, data[0].contour], + }); + + console.debug(`> UPDATE ${line._id}\t${line.insee}\t${data[0].nom}`); + } catch (e) { + console.error(`> ERROR ${e.message}`); + } + } + } while (count !== 0); + + return 'Finito'; + } catch (e) { + console.error(e.message, e); + } + } +} diff --git a/api/proxy/src/commands/SyncLegacyProductionCommand.ts b/api/proxy/src/commands/SyncLegacyProductionCommand.ts deleted file mode 100644 index c0fd71a895..0000000000 --- a/api/proxy/src/commands/SyncLegacyProductionCommand.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { command, CommandInterface, CommandOptionType } from '@ilos/common'; -import { PostgresConnection } from '@ilos/connection-postgres'; - -type Acquisition = { - _id: number; - created_at: Date; - application_id: number; - operator_id: number; - journey_id: string; - payload: object; -}; - -@command() -export class SyncLegacyProductionCommand implements CommandInterface { - static readonly signature: string = 'sync:legacy'; - static readonly description: string = 'sync legacy production acquisition table'; - static readonly options: CommandOptionType[] = [ - { - signature: '-u, --database-uri ', - description: 'Postgres connection string for writing', - default: process.env.APP_POSTGRES_URL, - }, - { - signature: '-z, --legacy-uri ', - description: 'Postgres legacy connection string for reading', - default: process.env.APP_POSTGRES_LEGACY_URL, - }, - { - signature: '-f, --from ', - description: 'Sync acquisition from this date', - coerce: (s: string): Date => new Date(s), - }, - ]; - - public async call(options: { databaseUri: string; legacyUri: string; from: Date }): Promise { - const pgConRead = new PostgresConnection({ connectionString: options.legacyUri }); - const pgConWrite = new PostgresConnection({ connectionString: options.databaseUri }); - await pgConRead.up(); - await pgConWrite.up(); - const pgRead = pgConRead.getClient(); - const pgWrite = pgConWrite.getClient(); - - // define date - if (!options.from) { - const resLatest = await pgWrite.query(` - SELECT created_at FROM acquisition.acquisitions ORDER BY created_at DESC LIMIT 1 - `); - options.from = resLatest.rows[0].created_at; - } - - console.info(`> Latest acquisition on new db is: ${options.from.toISOString()}`); - - // count acquisitions to import - const resAcquisitions = await pgRead.query({ - text: `SELECT * FROM acquisition.acquisitions WHERE created_at > $1`, - values: [options.from], - }); - - console.info(`> ${resAcquisitions.rowCount} acquisitions to import`); - - let counter = 0; - - if (resAcquisitions.rowCount > 0) { - try { - await pgWrite.query('BEGIN'); - - for (const line of resAcquisitions.rows as Acquisition[]) { - await pgWrite.query({ - text: ` - INSERT INTO acquisition.acquisitions ( - application_id, - operator_id, - journey_id, - payload - ) VALUES ( $1, $2, $3, $4 ) - ON CONFLICT DO NOTHING - `, - values: [line.application_id, line.operator_id, line.journey_id, line.payload], - }); - - counter += 1; - console.debug(`>>> Imported ${line._id}\t${line.created_at.toISOString()}`); - } - - await pgWrite.query('COMMIT'); - } catch (e) { - console.error(e.message, e); - await pgWrite.query('ROLLBACK'); - } - } - - return `> Imported ${counter} acquisitions into DB`; - } -} diff --git a/api/proxy/src/commands/SyncSurfacePopulationCommand.ts b/api/proxy/src/commands/SyncSurfacePopulationCommand.ts new file mode 100644 index 0000000000..802eca159d --- /dev/null +++ b/api/proxy/src/commands/SyncSurfacePopulationCommand.ts @@ -0,0 +1,77 @@ +import axios from 'axios'; +import { promisify } from 'util'; +import { command, CommandInterface, CommandOptionType } from '@ilos/common'; +import { PostgresConnection, Cursor } from '@ilos/connection-postgres'; + +@command() +export class SyncGeoSurfacePopulationCommand implements CommandInterface { + static readonly signature: string = 'geo:surfacepopulation'; + static readonly description: string = 'import all missing surface and population from geo.data.gouv.fr'; + static readonly options: CommandOptionType[] = [ + { + signature: '-u, --database-uri ', + description: 'Postgres connection string', + default: process.env.APP_POSTGRES_URL, + }, + ]; + + public async call(options): Promise { + try { + const selectQuery = ` + SELECT tt._id, tc.value insee + FROM territory.territories tt + LEFT JOIN territory.territory_codes tc + ON tt._id=tc.territory_id + WHERE tc.type='insee' + AND tt.level = 'town' + AND tt.geo IS NOT NULL + AND (tt.surface IS NULL OR tt.population IS NULL) + `; + + const pgConnection = new PostgresConnection({ connectionString: options.databaseUri }); + await pgConnection.up(); + const pgClient = pgConnection.getClient(); + const pgConnected = await pgClient.connect(); + const cursorCb = pgConnected.query(new Cursor(selectQuery)); + const cursor = promisify(cursorCb.read.bind(cursorCb)); + const ROW_COUNT = 10; + let count = 0; + + do { + const results = await cursor(ROW_COUNT); + count = results.length; + + for (const line of results) { + let data; + try { + const url = `https://geo.api.gouv.fr/communes?code=${line.insee}&fields=code,nom,population`; + const response = await axios.get(url); + + data = response.data; + + if (!data.length) throw new Error(`${line.insee} not found`); + + await pgClient.query({ + text: ` + UPDATE territory.territories + SET + surface = ST_Area(geo, true)::int, + population = $2 + WHERE _id = $1 + `, + values: [line._id, data[0].population ?? null], + }); + + console.debug(`> UPDATE #${line._id}\t${line.insee}\t${data[0].population} habitants\t${data[0].nom}`); + } catch (e) { + console.error(`> ERROR #${line._id}\t${line.insee}`, { message: e.message, data }); + } + } + } while (count !== 0); + + return 'Finito'; + } catch (e) { + console.error(e.message, e); + } + } +}