diff --git a/docs/source/configuracion.rst b/docs/source/configuracion.rst index f55a6d6..c736d4c 100644 --- a/docs/source/configuracion.rst +++ b/docs/source/configuracion.rst @@ -122,6 +122,9 @@ Parámetros de posicionamiento GPS Este parámetro se utiliza para cuando existe una tabla separada con GPS que contenga el posicionamiento de los vehículos o internos. En ese caso, se gelocalizará cada transacción en base a la tabla GPS, uniendo por `id_linea` e `interno` (haciendo a este campo obligatorio) y minimizando el tiempo de la transacción con respecto a la transacción gps del interno de esa linea. Para eso el campo ``fecha`` debe estar completo con dia, hora y minutos. Esto hace obligatoria la existencia de un csv con la información de posicionamiento de los gps. Su nombre y atributos se especifican de modo similar a lo hecho en transacciones. + +En ocasiones en la tabla de GPS puede haber información sobre los servicios prestados por cada vehículo. Para más detalles sobre esta configuración y cómo lo trabaja UrbanTrips ver el apartado **Servicios**. + .. code:: geolocalizar_trx: True @@ -136,7 +139,12 @@ Este parámetro se utiliza para cuando existe una tabla separada con GPS que con fecha_gps: date_time latitud_gps: latitude longitud_gps: longitude + servicios_gps: TYPE + velocity_gps: VELOCITY + trust_service_type_gps: False + valor_inicio_servicio: 7 + valor_fin_servicio: 9 Parámetro de lineas, ramales y paradas diff --git a/docs/source/index.rst b/docs/source/index.rst index a177f86..d4a5c70 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -26,6 +26,7 @@ Cotenido factores_expansion lineas_ramales resultados + kpi servicios diff --git a/docs/source/kpi.rst b/docs/source/kpi.rst new file mode 100644 index 0000000..f773e2e --- /dev/null +++ b/docs/source/kpi.rst @@ -0,0 +1,36 @@ +KPI +============== + +En este apartado se describen los diferentes tipos de KPI que UrbanTrips puede producir en función de los inputs con los que cada ciudad cuente. Se comienza por el caso donde solamente existen datos de demanda. En segundo lugar se abordan los KPI que puede producir cuando existen datos en de la oferta, como puede ser la tabla de GPS como así también información de servicios ofertados. + +En base a demanda +----------------- + +Cuando no existen datos de oferta, expresada fundamentalmente en la tabla GPS de posicionamiento de vehículos, UrbanTrips puede calcular algunos elementos de oferta y demanda en base a la tabla de transacciones en base al concepto de *pasajero equivalente*. Tomando la tabla de etapas se calcula por hora y dia para el interno de cada linea el total de pasajeros que iniciaron una etapa a esa hora en ese interno, la distancia media viajada (*DMT*) y la velocidad comercial promedio a la que circula ese interno en esa hora (siempre tomando solamente las coordenadas del interno cuando recoge pasajeros). + +Tomando estos datos se construye para cada pasajero un *pasajero equivalente* poniendo en relación cuántas posiciones disponibles en ese interno utilizó y por cuánto tiempo. Es decir, si un pasajero recorre 5 km en un interno que circula a 10 kmh, equivaldrá a 0.5 posiciones o pasajeros equivalentes. Para calcular un factor de ocupación se considera que cada interno oferta 60 ubicaciones y se compara el total de pasajeros equivalentes en esa hora en ese interno contra ese stock fijo. Luego se procede a agregar tdos estos estadísticos para diferentes niveles de agregación (interno y linea) así como también para un día de la semana tipo o dia de fin de semana tipo (siempre que hayan días procesados que pertenezcan a uno de esos tipos). + +Los resultados quedan almacenados en las siguientes tablas (para más detalles vea el apartado :doc:`resultados`). + +* ``basic_kpi_by_vehicle_hr``: arroja la batería de estadísticos por vehículo para cada linea, día y hora. +* ``basic_kpi_by_line_hr``: arroja la batería de estadísticos promediando para cada linea, día y hora (incluyendo día de semana y de fín de semana típico). +* ``basic_kpi_by_line_day``: arroja la batería de estadísticos promediando para cada linea y día (incluyendo día de semana y de fín de semana típico). + + +En base a GPS +------------- + +Cuando existe una tabla de GPS se puedem elaborar estadísicos más elaborados y precisos. En primer lugar se procura obtener un factor de ocupación comparando los Espacio Kilómetro Demandados (EKD) como proporción de los Espacio Kilómetro Ofertados (EKO). Para los primeros (EKD) se toma la cantidad de pasajeros transportados multiplicados por una DMT que puede ser utilizando la distancia media o la mediana, para evitar la influencia de medidas extremas que puedan incidir en el indicador. Para los segundos (EKO) se toma el total de kilómetros recorridos en base a la información disponible en la tabla GPS y se los multiplca por las 60 ubicaciones que se considera posee cada interno. + +En segundo lugar, se obtiene un Índice Pasajero Kilómetro (IPK) poniendo en relación el total de pasajeros transportados sobre el total de kilómetros recorridos por la línea. Para obtener estos indicadores principales se obtiene otros insumos que quedan en la base de datos, como el total de pasajeros, el total de kilómetros recorridos, la distancias medias y medianas viajadas, los vehículos totales, los pasajeros por vehículo por día, y los kilómetros por vehículo por día. Esto se calcula agregado para cada linea y día procesado, así como también para un día de la semana tipo o dia de fin de semana tipo. + +Los resultados quedan almacenados en la tabla ``kpi_by_day_line`` (para más detalles vea el apartado :doc:`resultados`). + +En base a servicios +------------------- + +UrbanTrips permite obtener datos a nivel de servicios para cada vehículo de cada línea (para más información pueden leer el aparatdo de :doc:`servicios`). Una vez que esta clafisicación de los datos de GPS en servicios ha tenido lugar, pueden obtenerse diferentes KPI al nivel de cada servicio identificado. Los resultados quedan almacenados en la tabla ``kpi_by_day_line_service`` (para más detalles vea el apartado :doc:`resultados`) + + + + diff --git a/docs/source/resultados.rst b/docs/source/resultados.rst index 34290e3..d8938ac 100644 --- a/docs/source/resultados.rst +++ b/docs/source/resultados.rst @@ -468,7 +468,7 @@ Estas tablas contienen estadísticos calculados por UrbanTrips. Algunos estádis -.. list-table:: indicadores_operativos_linea +.. list-table:: kpi_by_day_line :widths: 25 25 50 :header-rows: 1 @@ -505,15 +505,165 @@ Estas tablas contienen estadísticos calculados por UrbanTrips. Algunos estádis * - *ipk* - float - Índice Pasajero Kilómetro. - * - *fo* + * - *fo_mean* - float - - Factor de ocupación tomando 60 ubicaciónes por vehículo. + - Factor de ocupación tomando 60 ubicaciónes por vehículo tomando la DMT promedio. + * - *fo_median* + - float + - Factor de ocupación tomando 60 ubicaciónes por vehículo tomando la DMT mediana. +.. list-table:: kpi_by_day_line_service + :widths: 25 25 50 + :header-rows: 1 + + * - Campo + - Tipo de dato + - Descripción + * - *id_linea* + - int + - id identificando la linea + * - *dia* + - text + - Fecha del día para el cual fue computado el estadístico + * - *interno* + - int + - numero de interno o vehículo utilizado en la transacción. + * - *service_id* + - int + - numero de service dentro del vehiculo o interno para esa linea y dia. + * - *hora_inicio* + - float + - hora de inicio del servicio. + * - *hora_fin* + - float + - hora de cierre del servicio. + * - *tot_km* + - float + - Total de kilómetros ofertados por el servicio. + * - *tot_pax* + - float + - Total de pasajeros transportados por el servicio. + * - *dmt_mean* + - float + - Distancia media recorrida por pasajero del servicio. + * - *dmt_median* + - float + - Distancia mediana recorrida por pasajero del servicio. + * - *ipk* + - float + - Índice Pasajero Kilómetro. + * - *fo_mean* + - float + - Factor de ocupación tomando 60 ubicaciónes por vehículo tomando la DMT promedio. + * - *fo_median* + - float + - Factor de ocupación tomando 60 ubicaciónes por vehículo tomando la DMT mediana. + +.. list-table:: basic_kpi_by_vehicle_hr + :widths: 25 25 50 + :header-rows: 1 + + * - Campo + - Tipo de dato + - Descripción + * - *dia* + - text + - Fecha del día para el cual fue computado el estadístico + * - *id_linea* + - int + - id identificando la linea + * - *interno* + - int + - numero de interno o vehículo utilizado en la transacción. + * - *hora* + - float + - hora en la que se encuentra circulando el vehiculo. + * - *tot_pax* + - float + - Total de pasajeros transportados por el vehiculo para esa hora. + * - *eq_pax* + - float + - Total de pasajeros equivalentes transportados por ese vehículo durante esa hora. + * - *dmt* + - float + - Distancia media recorrida por pasajero del vehiculo para esa hora. + * - *of* + - float + - Factor de ocupación calculado como la relación entre la DMT y la velocidad comercial. + * - *speed_kmh* + - float + - Velocidad comercial promedio de ese vehiculo a esa hora. + + + +.. list-table:: basic_kpi_by_line_hr + :widths: 25 25 50 + :header-rows: 1 + + * - Campo + - Tipo de dato + - Descripción + * - *dia* + - text + - Fecha del día para el cual fue computado el estadístico + * - *id_linea* + - int + - id identificando la linea + * - *hora* + - float + - hora del día. + * - *veh* + - float + - Total de vehículos únicos circulando a esa hora para esa linea y día. + * - *pax* + - float + - Total de pasajeros que iniciaron una etapa en esa linea a esa hora y día. + * - *dmt* + - float + - Distancia media recorrida por pasajero del vehiculo para esa hora y día. + * - *of* + - float + - Factor de ocupación promedio calculado como la relación entre la DMT y la velocidad comercial. + * - *speed_kmh* + - float + - Velocidad comercial promedio de esa línea a esa hora para ese día. + + +.. list-table:: basic_kpi_by_line_day + :widths: 25 25 50 + :header-rows: 1 + + * - Campo + - Tipo de dato + - Descripción + * - *dia* + - text + - Fecha del día para el cual fue computado el estadístico + * - *id_linea* + - int + - id identificando la linea + * - *veh* + - float + - Total de vehículos únicos circulando para esa linea y día. + * - *pax* + - float + - Total de pasajeros que utilizaron esa linea ese día. + * - *dmt* + - float + - Distancia media recorrida por pasajero en esa línea ese día. + * - *of* + - float + - Factor de ocupación promedio calculado como la relación entre la DMT y la velocidad comercial. + * - *speed_kmh* + - float + - Velocidad comercial promedio de esa línea para ese día. + + Modelo de datos de base ``insumos`` ----------------------------------- diff --git a/docs/source/servicios.rst b/docs/source/servicios.rst index 951fc06..ac23b34 100644 --- a/docs/source/servicios.rst +++ b/docs/source/servicios.rst @@ -12,14 +12,28 @@ Como los servicios son una unidad de información vital para obtener ciertos ind nombres_variables_gps=nombres_variables_gps, formato_fecha=formato_fecha) -Dicha tabla debe tener un atributo donde se especifique el inicio de un servivcio. Tambien puede especificarse el final del mismo. Esto debera cargarse en el archivo de configuracion del especificando el [ATTR] y los valores correspondientes del siguiente modo: +Dicha tabla debe tener un atributo donde se especifique el inicio de un servicio. Tambien puede especificarse el final del mismo. Esto debera cargarse en el archivo de configuracion. Por un lado especificando la columna que almacena los datos de servicio, en el mismo lugar que se especifican las otras columnas de la tabla gps en ``servicios_gps``. Por otro lado los valores que en esa columna indican una apertura y cierre de servicios en los parámetros ``valor_inicio_servicio`` y ``valor_fin_servicio``: + .. code:: yaml - servicios_gps: [ATTR] + nombres_variables_gps: + id_gps: DTSN + id_linea_gps: [ATTR] + id_ramal_gps: [ATTR] + interno_gps: [ATTR] + fecha_gps: [ATTR] + latitud_gps: [ATTR] + longitud_gps: [ATTR] + servicios_gps: [ATTR] + velocity_gps: [ATTR] + + trust_service_type_gps: False valor_inicio_servicio: [VAL] valor_fin_servicio: [VAL] + + A su vez en el archivo de configuración se debe setear el parámetro correspondiente. Si ese atributo es confiable o si UrbanTrips debe, dentro de cada servicio tal como es declarado por el conductor, clasificar nuevos servicios. .. code:: yaml @@ -100,4 +114,18 @@ Para resolverlo, dichas paradas pueden agregarse en un único nodo mediante el c :alt: Clasificacion servicios ramal +Resultados +---------- + +Los resultados de la clasificación de servicios quedan en una serie de tablas dentro de la db de los datos (para más información puede consultar :doc:`resultados`). Estas tablas pueden ofrecer información para diagnósticar la clasificación. + + +* ``services``: agrupa los servicios ofertados por las diferentes lineas, sin clasificarlos por ramal. Cada servicio tiene un id tal cual fue identificado por el conductor del vehículo y otro tal como fue identificado por UrbanTrips. Para cada servicio se agregan algunos datos como la hora de inicio y de fin, la cantidad de puntos gps, el porcentaje de puntos donde el vehículo estuvo detenido, etc. +* ``services_gps_points``: vincula cada punto gps de la tabla ``gps`` con la tabla ``services``. A su vez indican el ``node_id`` más cercano y el ramal al que pertenece. +* ``services_stats``: para cada línea y día arroja una batería de estadísticos comparando los servicios tal cual venían declarados en la información original con los servicios tal cual fueron inferidos por UrbanTrips (la cantidad de servicios nuevos y cuántos de ellos resultan válidos, cúantos de estos son servicios con muy pocos puntos gps o con demasiado tiempo quietos, la distancia recorrida acumulada originalmente y aquella que se obtiene de utilizar sólo los servicios válidos y la proporción de servicios original sin subdividir en otros por parte de UrbanTrips). +* ``services_by_line_hour``: una tabla que resume por linea, dia y hora la cantidad de servicios ofertados. + + + + diff --git a/urbantrips/datamodel/transactions.py b/urbantrips/datamodel/transactions.py index 1275a8e..f868924 100644 --- a/urbantrips/datamodel/transactions.py +++ b/urbantrips/datamodel/transactions.py @@ -252,9 +252,10 @@ def renombrar_columnas_tablas(df, nombres_variables, postfijo): # get the name in the original df holding service type data service_id_col_name = nombres_variables.pop('servicios_gps') - start_service_value = nombres_variables.pop('valor_inicio_servicio') - finish_service_value = nombres_variables.pop( - 'valor_fin_servicio') + # get the values for services start and finish + gps_config = leer_configs_generales() + start_service_value = gps_config['valor_inicio_servicio'] + finish_service_value = gps_config['valor_fin_servicio'] # create a replace values dict service_id_values = { diff --git a/urbantrips/kpi/kpi.py b/urbantrips/kpi/kpi.py index 48130df..9801144 100644 --- a/urbantrips/kpi/kpi.py +++ b/urbantrips/kpi/kpi.py @@ -3,17 +3,80 @@ import warnings import pandas as pd import numpy as np -import h3 import weightedstats as ws from math import floor import re +import h3 from urbantrips.geo import geo from urbantrips.utils.utils import ( duracion, iniciar_conexion_db, - crear_tablas_indicadores_operativos, + leer_configs_generales ) +# KPI WRAPPER + + +@duracion +def compute_kpi(): + """ + Esta funcion toma los datos de oferta de la tabla gps + los datos de demanda de la tabla trx + y produce una serie de indicadores operativos por + dia y linea y por dia, linea, interno + """ + + print("Produciendo indicadores operativos...") + conn_data = iniciar_conexion_db(tipo="data") + + cur = conn_data.cursor() + q = """ + SELECT tbl_name FROM sqlite_master + WHERE type='table' + AND tbl_name='gps'; + """ + listOfTables = cur.execute(q).fetchall() + + if listOfTables == []: + print("No existe tabla GPS en la base") + print("Se calcularán KPI básicos en base a datos de demanda") + + # runing basic kpi + run_basic_kpi() + + # read data + legs, gps = read_data_for_daily_kpi() + + if (len(legs) > 0) & (len(gps) > 0): + # compute KPI per line and date + compute_kpi_by_line_day(legs=legs, gps=gps) + + # compute KPI per line and type of day + compute_kpi_by_line_typeday() + + # Run KPI at service level + cur = conn_data.cursor() + q = "select count(*) from services where valid = 1;" + valid_services = cur.execute(q).fetchall()[0][0] + + if valid_services > 0: + print("Computando estadisticos por servicio") + # compute KPI by service and day + compute_kpi_by_service() + + # compute amount of hourly services by line and day + compute_dispatched_services_by_line_hour_day() + + # compute amount of hourly services by line and type of day + compute_dispatched_services_by_line_hour_typeday() + else: + + print("No hay servicios procesados.") + print("Puede correr la funcion services.process_services()") + print("si cuenta con una tabla de gps que indique servicios") + + +# SECTION LOAD KPI @duracion def compute_route_section_load( @@ -47,7 +110,6 @@ def compute_route_section_load( """ - dat_type_is_a_date = is_date_string(day_type) # check day type format @@ -516,20 +578,27 @@ def get_route_section_id(point, route_geom): return floor_rounding(route_geom.project(point, normalized=True)) -@duracion -def compute_kpi(): +# GENERAL PURPOSE KPIS WITH GPS + +def read_data_for_daily_kpi(): """ - Esta funcion toma los datos de oferta de la tabla gps - los datos de demanda de la tabla trx - y produce una serie de indicadores operativos por - dia y linea y por dia, linea, interno + Read legs and gps micro data from db and + merges distances to legs + + Parameters + ---------- + None + + Returns + ------- + legs: pandas.DataFrame + data frame with legs data + + gps: pandas.DataFrame + gps vehicle tracking data """ - # crear tablas - crear_tablas_indicadores_operativos() - print("Produciendo indicadores operativos...") conn_data = iniciar_conexion_db(tipo="data") - conn_insumos = iniciar_conexion_db(tipo="insumos") cur = conn_data.cursor() q = """ @@ -540,131 +609,223 @@ def compute_kpi(): listOfTables = cur.execute(q).fetchall() if listOfTables == []: - print("No existe tabla GPS en la base (no se pudeden computar indicadores de oferta)") - + print( + "No existe tabla GPS en la base (no se pudeden computar indicadores de oferta)") + return None - res = 11 - distancia_entre_hex = h3.edge_length(resolution=res, unit="km") - distancia_entre_hex = distancia_entre_hex * 2 + # get day with stats computed + processed_days_q = """ + select distinct dia + from kpi_by_day_line + """ + processed_days = pd.read_sql(processed_days_q, conn_data) + processed_days = processed_days.dia + processed_days = ', '.join([f"'{val}'" for val in processed_days]) print("Leyendo datos de oferta") - q = """ + q = f""" select * from gps + where dia not in ({processed_days}) order by dia, id_linea, interno, fecha """ gps = pd.read_sql(q, conn_data) print("Leyendo datos de demanda") - q = """ + q = f""" SELECT e.dia,e.id_linea,e.interno,e.id_tarjeta,e.h3_o, e.h3_d, e.factor_expansion_linea from etapas e where e.od_validado==1 + and dia not in ({processed_days}) + """ + legs = pd.read_sql(q, conn_data) + + if (len(gps) > 0) & (len(legs) > 0): + # add distances + legs = add_distances_to_legs(legs) + else: + print("No hay datos sin KPI procesados") + legs = pd.DataFrame() + gps = pd.DataFrame() + print("Fin carga de datos de oferta y demanda") + return legs, gps + + +def add_distances_to_legs(legs): + """ + Takes legs data and add distances to each leg + + Parameters + ---------- + legs : pandas.DataFrame + DataFrame with legs data + + Returns + ------- + legs : pandas.DataFrame + DataFrame with legs and distances data + """ - etapas = pd.read_sql(q, conn_data) - distancias = pd.read_sql_query( + configs = leer_configs_generales() + h3_original_res = configs['resolucion_h3'] + min_distance = h3.edge_length(resolution=h3_original_res, unit="km") + + conn_insumos = iniciar_conexion_db(tipo="insumos") + + print("Leyendo distancias") + distances = pd.read_sql_query( """ SELECT * FROM distancias """, conn_insumos, ) - # usar distancias h3 cuando no hay osm - distancias.distance_osm_drive = ( - distancias.distance_osm_drive.combine_first(distancias.distance_h3) + + # TODO: USE DIFERENT DISTANCES, GRAPH + + print("Sumando distancias a etapas") + # use distances h3 when osm missing + distances.loc[:, ['distance']] = ( + distances.distance_osm_drive.combine_first(distances.distance_h3) ) + distances = distances.reindex(columns=["h3_o", "h3_d", "distance"]) - # obtener etapas y sus distancias recorridas - etapas = etapas.merge(distancias, how="left", on=["h3_o", "h3_d"]) + # add legs' distances + legs = legs.merge(distances, how="left", on=["h3_o", "h3_d"]) - print("Calculando indicadores de oferta por interno") + # add minimum distance in km as length of h3 + legs.distance = legs.distance.map(lambda x: max(x, min_distance)) - # Calcular kilometros vehiculo dia kvd - oferta_interno = gps\ - .groupby(["id_linea", "dia", "interno"], as_index=False)\ - .agg(kvd=("distance_km", "sum")) + no_distance = legs.distance.isna().sum()/len(legs) * 100 + print("Hay un {:.2f} % de etapas sin distancias ".format(no_distance)) + conn_insumos.close() - # Eliminar los vehiculos que tengan 0 kms recorridos - oferta_interno = oferta_interno.loc[oferta_interno.kvd > 0] + return legs - print("Calculando indicadores de demanda por interno") - # calcular pax veh dia (pvd) y distancia media recorrida (dmt) - demanda_interno = etapas.groupby( - ["id_linea", "dia", "interno"], as_index=False - ).apply(indicadores_demanda_interno) +@duracion +def compute_kpi_by_line_day(legs, gps): + """ + Takes data for supply and demand and computes KPI at line level + for each day - print("Calculando indicadores operativos por dia e interno") - indicadores_interno = oferta_interno.merge( - demanda_interno, how="left", on=["id_linea", "dia", "interno"] - ) - internos_sin_demanda = indicadores_interno.pvd.isna().sum() - internos_sin_demanda = round( - internos_sin_demanda / len(indicadores_interno) * 100, 2 - ) - print(f"Hay {internos_sin_demanda} por ciento de internos sin demanda") + Parameters + ---------- + legs : pandas.DataFrame + DataFrame with legs data + + gps : pandas.DataFrame + DataFrame with vehicle gps data + + Returns + ------- + None + + """ + conn_data = iniciar_conexion_db(tipo="data") + + # demand data + day_demand_stats = legs\ + .groupby(['id_linea', 'dia'], as_index=False)\ + .apply(demand_stats) - print("Calculando IPK y FO") - # calcular indice pasajero kilometros (ipk) y factor de ocupacion (fo) - indicadores_interno["ipk"] = indicadores_interno.pvd / \ - indicadores_interno.kvd + # supply data + day_supply_stats = gps\ + .groupby(['id_linea', 'dia'], as_index=False)\ + .apply(supply_stats) + + day_stats = day_demand_stats\ + .merge(day_supply_stats, + how='inner', on=['id_linea', 'dia']) + + # compute KPI + day_stats['pvd'] = day_stats.tot_pax / \ + day_stats.tot_veh + day_stats['kvd'] = day_stats.tot_km / \ + day_stats.tot_veh + day_stats['ipk'] = day_stats.tot_pax / \ + day_stats.tot_km # Calcular espacios-km ofertados (EKO) y los espacios-km demandados (EKD). - eko = indicadores_interno.kvd * 60 - ekd = indicadores_interno.pvd * indicadores_interno.dmt_mean - indicadores_interno["fo"] = ekd / eko + day_stats['ekd_mean'] = day_stats.tot_pax * \ + day_stats.dmt_mean + day_stats['ekd_median'] = day_stats.tot_pax * \ + day_stats.dmt_median + day_stats['eko'] = day_stats.tot_km * 60 + + day_stats['fo_mean'] = day_stats.ekd_mean / \ + day_stats.eko + day_stats['fo_median'] = day_stats.ekd_median / \ + day_stats.eko - print("Subiendo indicadores por interno a la db") cols = [ "id_linea", "dia", - "interno", - "kvd", - "pvd", + "tot_veh", + "tot_km", + "tot_pax", "dmt_mean", "dmt_median", + "pvd", + "kvd", "ipk", - "fo", + "fo_mean", + "fo_median" ] - indicadores_interno = indicadores_interno.reindex(columns=cols) - indicadores_interno.to_sql( - "indicadores_operativos_interno", + + day_stats = day_stats.reindex(columns=cols) + + day_stats.to_sql( + "kpi_by_day_line", conn_data, if_exists="append", index=False, ) - print("Calculando indicadores de demanda por linea y dia") + return day_stats - demanda_linea = etapas.groupby(["id_linea", "dia"], as_index=False).apply( - indicadores_demanda_linea - ) - print("Calculando indicadores de oferta por linea y dia") +@duracion +def compute_kpi_by_line_typeday(): + """ + Takes data for supply and demand and computes KPI at line level + for weekday and weekend - oferta_linea = oferta_interno\ - .groupby(["id_linea", "dia"], as_index=False)\ - .agg( - tot_veh=("interno", "count"), - tot_km=("kvd", "sum"), - ) + Parameters + ---------- + None - indicadores_linea = oferta_linea.merge( - demanda_linea, how="left", on=["id_linea", "dia"] - ) - indicadores_linea["pvd"] = indicadores_linea.tot_pax / \ - indicadores_linea.tot_veh - indicadores_linea["kvd"] = indicadores_linea.tot_km / \ - indicadores_linea.tot_veh - indicadores_linea["ipk"] = indicadores_linea.tot_pax / \ - indicadores_linea.tot_km + Returns + ------- + None - # Calcular espacios-km ofertados (EKO) y los espacios-km demandados (EKD). - eko = indicadores_linea.tot_km * 60 - ekd = indicadores_linea.tot_pax * indicadores_linea.dmt_mean + """ + conn_data = iniciar_conexion_db(tipo="data") + + # delete old data + delete_q = """ + DELETE FROM kpi_by_day_line + where dia in ('weekday','weekend') + """ + conn_data.execute(delete_q) + conn_data.commit() + + # read daily data + q = """ + select * from kpi_by_day_line + """ + daily_data = pd.read_sql(q, conn_data) + + # get day of the week + weekend = pd.to_datetime(daily_data['dia'].copy()).dt.dayofweek > 4 + daily_data.loc[:, ['dia']] = 'weekday' + daily_data.loc[weekend, ['dia']] = 'weekend' - indicadores_linea["fo"] = ekd / eko + # compute aggregated stats + type_of_day_stats = daily_data\ + .groupby(['id_linea', 'dia'], as_index=False)\ + .mean() print("Subiendo indicadores por linea a la db") @@ -679,36 +840,805 @@ def compute_kpi(): "pvd", "kvd", "ipk", - "fo", + "fo_mean", + "fo_median" ] - indicadores_linea = indicadores_linea.reindex(columns=cols) - indicadores_linea.to_sql( - "indicadores_operativos_linea", + + type_of_day_stats = type_of_day_stats.reindex(columns=cols) + + type_of_day_stats.to_sql( + "kpi_by_day_line", conn_data, if_exists="append", index=False, ) + return type_of_day_stats -def indicadores_demanda_interno(df): - d = {} - d["pvd"] = df["factor_expansion_linea"].sum() - d["dmt_mean"] = np.average( - a=df.distance_osm_drive, weights=df.factor_expansion_linea) - d["dmt_median"] = ws.weighted_median( - data=df.distance_osm_drive.tolist(), - weights=df.factor_expansion_linea.tolist() + +# KPIS BY SERVICE + +@duracion +def compute_kpi_by_service(): + """ + Reads supply and demand data and computes KPI at service level + for each day + + Parameters + ---------- + legs : pandas.DataFrame + DataFrame with legs data + + gps : pandas.DataFrame + DataFrame with vehicle gps data + + Returns + ------- + None + + """ + + conn_data = iniciar_conexion_db(tipo="data") + + print("Leyendo demanda por servicios validos") + q_valid_services = """ + with fechas_procesadas as ( + select distinct dia from kpi_by_day_line_service + ), + demand as ( + select e.id_tarjeta, e.id, id_linea, dia, interno, + cast(strftime('%s',(dia||' '||tiempo)) as int) as ts, tiempo, + e.h3_o, + e.h3_d, e.factor_expansion_linea + from etapas e + where od_validado = 1 + and id_linea in (select distinct id_linea from gps) + and dia not in fechas_procesadas + ), + valid_services as ( + select id_linea,dia,interno, service_id, min_ts, max_ts + from services + where valid = 1 + ), + valid_demand as ( + select d.*, s.service_id + from demand d + join valid_services s + on d.id_linea = s.id_linea + and d.dia = s.dia + and d.interno = s.interno + and d.ts >= s.min_ts + and d.ts <= s.max_ts + ) + select * from valid_demand + ; + """ + + valid_demand = pd.read_sql(q_valid_services, conn_data) + + print("Leyendo demanda por servicios invalidos") + q_invalid_services = """ + with fechas_procesadas as ( + select distinct dia from kpi_by_day_line_service + ), + demand as ( + select e.id_tarjeta, e.id, id_linea, dia, interno, + cast(strftime('%s',(dia||' '||tiempo)) as int) as ts, tiempo, + e.h3_o, + e.h3_d, e.factor_expansion_linea + from etapas e + where od_validado = 1 + and id_linea in (select distinct id_linea from gps) + and dia not in fechas_procesadas + ), + valid_services as ( + select id_linea,dia,interno, service_id, min_ts, max_ts + from services + where valid = 1 + ), + invalid_demand as ( + select d.*, s.service_id + from demand d + left join valid_services s + on d.id_linea = s.id_linea + and d.dia = s.dia + and d.interno = s.interno + and d.ts >= s.min_ts + and d.ts <= s.max_ts + ), + legs_no_service as ( + select e.id_tarjeta, e.id, id_linea, dia, interno, ts, + tiempo, h3_o, h3_d,factor_expansion_linea + from invalid_demand e + where service_id is null + ) + select d.*, s.service_id + from legs_no_service d + left join valid_services s + on d.id_linea = s.id_linea + and d.dia = s.dia + and d.interno = s.interno + and d.ts <= s.min_ts -- valid services begining after the leg start + order by d.id_tarjeta,d.dia,d.id_linea,d.interno, s.min_ts asc + ; + """ + + invalid_demand_dups = pd.read_sql(q_invalid_services, conn_data) + + # remove duplicates leaving the first, i.e. next valid service in time + invalid_demand = invalid_demand_dups.drop_duplicates( + subset=['id'], keep='first') + invalid_demand = invalid_demand.dropna(subset=['service_id']) + + # create single demand by service df + service_demand = pd.concat([valid_demand, invalid_demand]) + + # add distances to demand data + service_demand = add_distances_to_legs(legs=service_demand) + + # TODO: remove this line when factor is corrected + service_demand['factor_expansion_linea'] = ( + service_demand['factor_expansion_linea'].replace(0, 1) + ) + # compute demand stats + service_demand_stats = service_demand\ + .groupby(['dia', 'id_linea', 'interno', 'service_id'], as_index=False)\ + .apply(demand_stats) + + # read supply service data + service_supply_q = """ + select + dia,id_linea,interno,service_id, + distance_km as tot_km, min_datetime,max_datetime + from + services where valid = 1 + """ + service_supply = pd.read_sql(service_supply_q, conn_data) + + # merge supply and demand data + service_stats = service_supply\ + .merge(service_demand_stats, how='left', + on=['dia', 'id_linea', 'interno', 'service_id']) + service_stats.tot_pax = service_stats.tot_pax.fillna(0) + + # compute stats + service_stats['ipk'] = service_stats['tot_pax'] / service_stats['tot_km'] + service_stats['ekd_mean'] = service_stats['tot_pax'] * \ + service_stats['dmt_mean'] + service_stats['ekd_median'] = service_stats['tot_pax'] * \ + service_stats['dmt_median'] + service_stats['eko'] = service_stats['tot_km'] * 60 + service_stats['fo_mean'] = service_stats['ekd_mean'] / service_stats['eko'] + service_stats['fo_median'] = service_stats['ekd_median'] / \ + service_stats['eko'] + + service_stats['hora_inicio'] = service_stats.min_datetime.str[10:13].map( + int) + service_stats['hora_fin'] = service_stats.max_datetime.str[10:13].map(int) + + # reindex to meet schema + cols = ['id_linea', 'dia', 'interno', 'service_id', + 'hora_inicio', 'hora_fin', 'tot_km', 'tot_pax', 'dmt_mean', + 'dmt_median', 'ipk', 'fo_mean', 'fo_median'] + + service_stats = service_stats.reindex(columns=cols) + + service_stats.to_sql( + "kpi_by_day_line_service", + conn_data, + if_exists="append", + index=False, ) - return pd.Series(d, index=["pvd", "dmt_mean", "dmt_median"]) + return service_stats -def indicadores_demanda_linea(df): + +def demand_stats(df): d = {} d["tot_pax"] = df["factor_expansion_linea"].sum() d["dmt_mean"] = np.average( - a=df.distance_osm_drive, weights=df.factor_expansion_linea) + a=df['distance'], weights=df.factor_expansion_linea) d["dmt_median"] = ws.weighted_median( - data=df.distance_osm_drive.tolist(), + data=df['distance'].tolist(), weights=df.factor_expansion_linea.tolist() ) + return pd.Series(d, index=["tot_pax", "dmt_mean", "dmt_median"]) + + +def supply_stats(df): + d = {} + d["tot_veh"] = len(df.interno.unique()) + d["tot_km"] = df.distance_km.sum() + + return pd.Series(d, index=["tot_veh", "tot_km"]) + +# GENERAL PURPOSE KPI WITH NO GPS + + +@duracion +def run_basic_kpi(): + conn_data = iniciar_conexion_db(tipo='data') + + # read already process days + processed_days = get_processed_days(table_name='basic_kpi_by_line_day') + + # read unprocessed data from legs + + q = f""" + select * + from etapas + where od_validado = 1 + and dia not in ({processed_days}) + ; + """ + print("Leyendo datos de demanda") + legs = pd.read_sql(q, conn_data) + + if len(legs) < 5: + return None + + legs = add_distances_to_legs(legs=legs) + legs.loc[:, ['datetime']] = legs.dia + ' ' + legs.tiempo + legs.loc[:, ['time']] = pd.to_datetime( + legs.loc[:, 'datetime'], format="%Y-%m-%d %H:%M:%S") + + print("Calculando velocidades comerciales") + # compute vehicle speed per hour + speed_vehicle_hour = legs\ + .groupby(['dia', 'id_linea', 'interno'])\ + .apply(compute_speed_by_veh_hour) + + speed_vehicle_hour = speed_vehicle_hour.droplevel(3).reset_index() + + # set a max speed te remove outliers + speed_max = 60 + speed_vehicle_hour.loc[speed_vehicle_hour.speed_kmh_veh_h > + speed_max, 'speed_kmh_veh_h'] = speed_max + + print("Eliminando casos atipicos en velocidades comerciales") + # compute standar deviation to remove low speed outliers + speed_dev = speed_vehicle_hour\ + .groupby(['dia', 'id_linea'], as_index=False)\ + .agg( + mean=('speed_kmh_veh_h', 'mean'), + std=('speed_kmh_veh_h', 'std') + ) + speed_dev['speed_min'] = speed_dev['mean'] - \ + (2 * speed_dev['std']).map(lambda x: max(1, x)) + speed_dev = speed_dev.reindex(columns=['dia', 'id_linea', 'speed_min']) + + speed_vehicle_hour = speed_vehicle_hour.merge( + speed_dev, on=['dia', 'id_linea'], how='left') + + speed_mask = (speed_vehicle_hour.speed_kmh_veh_h < speed_max) &\ + (speed_vehicle_hour.speed_kmh_veh_h > speed_vehicle_hour.speed_min) + + speed_vehicle_hour = speed_vehicle_hour.loc[speed_mask, [ + 'dia', 'id_linea', 'interno', 'hora', 'speed_kmh_veh_h']] + + # compute by hour to fill nans in vehicle speed + speed_line_hour = speed_vehicle_hour\ + .drop('interno', axis=1)\ + .groupby(['dia', 'id_linea', 'hora'], as_index=False).mean()\ + .rename(columns={'speed_kmh_veh_h': 'speed_kmh_line_h'}) + + speed_line_day = speed_vehicle_hour\ + .drop('interno', axis=1)\ + .groupby(['dia', 'id_linea'], as_index=False).mean()\ + .rename(columns={'speed_kmh_veh_h': 'speed_kmh_line_day'}) + + # add commercial speed to demand data + legs = legs\ + .merge(speed_vehicle_hour, + on=['dia', 'id_linea', 'interno', 'hora'], how='left')\ + .merge(speed_line_hour, on=['dia', 'id_linea', 'hora'], how='left') + + legs['speed_kmh'] = legs.speed_kmh_veh_h.combine_first( + legs.speed_kmh_line_h) + + print("Calculando pasajero equivalente otros KPI por dia" + ", linea, interno y hora") + + # get an vehicle space equivalent passenger + legs['eq_pax'] = (legs.distance / legs.speed_kmh) * \ + legs.factor_expansion_linea + + # COMPUTE KPI BY DAY LINE VEHICLE HOUR + kpi_by_veh = legs\ + .reindex(columns=['dia', 'id_linea', 'interno', 'hora', + 'factor_expansion_linea', 'eq_pax', 'distance'])\ + .groupby(['dia', 'id_linea', 'interno', 'hora'], as_index=False)\ + .agg( + tot_pax=('factor_expansion_linea', 'sum'), + eq_pax=('eq_pax', 'sum'), + dmt=('distance', 'mean') + ) + + # compute ocupation factor + kpi_by_veh['of'] = kpi_by_veh.eq_pax/60 * 100 + + # add average commercial speed data + kpi_by_veh = kpi_by_veh\ + .merge(speed_vehicle_hour, + on=['dia', 'id_linea', 'interno', 'hora'], how='left') + kpi_by_veh = kpi_by_veh.rename(columns={'speed_kmh_veh_h': 'speed_kmh'}) + + print("Subiendo a la base de datos") + # set schema and upload to db + cols = ['dia', 'id_linea', 'interno', 'hora', 'tot_pax', 'eq_pax', + 'dmt', 'of', 'speed_kmh'] + + kpi_by_veh = kpi_by_veh.reindex(columns=cols) + + kpi_by_veh.to_sql( + "basic_kpi_by_vehicle_hr", + conn_data, + if_exists="append", + index=False, + ) + + print("Calculando pasajero equivalente otros KPI por dia, linea y hora") + + # COMPUTE KPI BY DAY LINE HOUR + + # compute ocupation factor + ocupation_factor_line_hour = kpi_by_veh\ + .reindex(columns=['dia', 'id_linea', 'hora', 'of'])\ + .groupby(['dia', 'id_linea', 'hora'], as_index=False)\ + .mean() + + # compute supply as unique vehicles day per hour + supply = legs\ + .reindex(columns=['dia', 'id_linea', 'interno', 'hora'])\ + .drop_duplicates().groupby(['dia', 'id_linea', 'hora']).size()\ + .reset_index()\ + .rename(columns={0: 'veh'}) + + # compute demand as total legs per hour and DMT + demand = legs\ + .reindex(columns=['dia', 'id_linea', 'hora', + 'factor_expansion_linea', 'distance'])\ + .groupby(['dia', 'id_linea', 'hora'], as_index=False)\ + .agg( + pax=('factor_expansion_linea', 'sum'), + dmt=('distance', 'mean') + ) + + # compute line kpi table + kpi_by_line_hr = supply\ + .merge(demand, on=['dia', 'id_linea', 'hora'], how='left')\ + .merge(ocupation_factor_line_hour, + on=['dia', 'id_linea', 'hora'], how='left') + + kpi_by_line_hr = kpi_by_line_hr.merge( + speed_line_hour, on=['dia', 'id_linea', 'hora'], how='left') + kpi_by_line_hr = kpi_by_line_hr.rename( + columns={'speed_kmh_line_h': 'speed_kmh'}) + + print("Subiendo a la base de datos") + + # set schema and upload to db + cols = ['dia', 'id_linea', 'hora', 'veh', 'pax', 'dmt', 'of', + 'speed_kmh'] + + kpi_by_line_hr = kpi_by_line_hr.reindex(columns=cols) + + kpi_by_line_hr.to_sql( + "basic_kpi_by_line_hr", + conn_data, + if_exists="append", + index=False, + ) + + # COMPUTE KPI BY DAY AND LINE + print("Calculando pasajero equivalente otros KPI por dia y linea") + + # compute daily stats + ocupation_factor_line = kpi_by_veh\ + .reindex(columns=['dia', 'id_linea', 'of'])\ + .groupby(['dia', 'id_linea'], as_index=False).mean() + + # compute supply as unique vehicles day + daily_supply = legs\ + .reindex(columns=['dia', 'id_linea', 'interno'])\ + .drop_duplicates().groupby(['dia', 'id_linea'])\ + .size()\ + .reset_index()\ + .rename(columns={0: 'veh'}) + + # compute demand as total legs per hour and DMT + daily_demand = legs\ + .reindex(columns=['dia', 'id_linea', + 'factor_expansion_linea', 'distance'])\ + .groupby(['dia', 'id_linea'], as_index=False)\ + .agg( + pax=('factor_expansion_linea', 'sum'), + dmt=('distance', 'mean'), + ) + + # compute line kpi table + kpi_by_line_day = daily_supply\ + .merge(daily_demand, on=['dia', 'id_linea'], how='left')\ + .merge(ocupation_factor_line, on=['dia', 'id_linea'], how='left') + + kpi_by_line_day = kpi_by_line_day.merge( + speed_line_day, on=['dia', 'id_linea'], how='left') + kpi_by_line_day = kpi_by_line_day.rename( + columns={'speed_kmh_line_day': 'speed_kmh'}) + + print("Subiendo a la base de datos") + # set schema and upload to db + cols = ['dia', 'id_linea', 'veh', 'pax', 'dmt', 'of', 'speed_kmh'] + + kpi_by_line_day = kpi_by_line_day.reindex(columns=cols) + + kpi_by_line_day.to_sql( + "basic_kpi_by_line_day", + conn_data, + if_exists="append", + index=False, + ) + + # compute aggregated stats by weekday and weekend + compute_basic_kpi_line_typeday() + compute_basic_kpi_line_hr_typeday() + + conn_data.close() + + +def compute_basic_kpi_line_typeday(): + conn_data = iniciar_conexion_db(tipo='data') + + print("Borrando datos desactualizados por tipo de dia") + + # delete old type of day data data + delete_q = """ + DELETE FROM basic_kpi_by_line_day + where dia in ('weekday','weekend') + """ + conn_data.execute(delete_q) + conn_data.commit() + + print("Calculando KPI basicos por tipo de dia") + q = """ + select * from basic_kpi_by_line_day; + """ + kpi_by_line_day = pd.read_sql(q, conn_data) + + # get day of the week + weekend = pd.to_datetime(kpi_by_line_day['dia'].copy()).dt.dayofweek > 4 + kpi_by_line_day.loc[:, ['dia']] = 'weekday' + kpi_by_line_day.loc[weekend, ['dia']] = 'weekend' + kpi_by_line_day + + # compute aggregated stats + kpi_by_line_typeday = kpi_by_line_day\ + .groupby(['dia', 'id_linea',], as_index=False)\ + .mean() + + print("Subiendo a la base de datos") + # set schema and upload to db + cols = ['dia', 'id_linea', 'veh', 'pax', 'dmt', 'of', 'speed_kmh'] + + kpi_by_line_typeday = kpi_by_line_typeday.reindex(columns=cols) + + kpi_by_line_typeday.to_sql( + "basic_kpi_by_line_day", + conn_data, + if_exists="append", + index=False, + ) + + conn_data.close() + + +def compute_basic_kpi_line_hr_typeday(): + conn_data = iniciar_conexion_db(tipo='data') + + print("Borrando datos desactualizados por tipo de dia") + + # delete old type of day data data + delete_q = """ + DELETE FROM basic_kpi_by_line_hr + where dia in ('weekday','weekend') + """ + conn_data.execute(delete_q) + conn_data.commit() + + print("Calculando KPI basicos por tipo de dia") + q = """ + select * from basic_kpi_by_line_hr; + """ + + kpi_by_line_hr = pd.read_sql(q, conn_data) + + # get day of the week + weekend = pd.to_datetime(kpi_by_line_hr['dia'].copy()).dt.dayofweek > 4 + kpi_by_line_hr.loc[:, ['dia']] = 'weekday' + kpi_by_line_hr.loc[weekend, ['dia']] = 'weekend' + + # compute aggregated stats + kpi_by_line_typeday = kpi_by_line_hr\ + .groupby(['dia', 'id_linea', 'hora'], as_index=False)\ + .mean() + + print("Subiendo a la base de datos") + # set schema and upload to db + cols = ['dia', 'id_linea', 'hora', 'veh', 'pax', 'dmt', 'of', 'speed_kmh'] + + kpi_by_line_typeday = kpi_by_line_typeday.reindex(columns=cols) + + kpi_by_line_typeday.to_sql( + "basic_kpi_by_line_hr", + conn_data, + if_exists="append", + index=False, + ) + + conn_data.close() + + +def compute_speed_by_veh_hour(legs_vehicle): + if len(legs_vehicle) < 2: + return None + + res = 11 + distance_between_hex = h3.edge_length(resolution=res, unit="m") + distance_between_hex = distance_between_hex * 2 + + speed = legs_vehicle.reindex( + columns=['interno', 'hora', 'time', 'latitud', 'longitud']) + speed["h3"] = speed.apply( + geo.h3_from_row, axis=1, args=(res, "latitud", "longitud")) + + # get only one h3 per vehicle hour + speed = speed.drop_duplicates(subset=['interno', 'hora', 'h3']) + if len(speed) < 2: + return None + speed = speed.sort_values('time') + + # compute meters between h3 + speed['h3_lag'] = speed['h3'].shift(1) + speed['time_lag'] = speed['time'].shift(1) + + speed = speed.dropna(subset=['h3_lag', 'time_lag']) + + speed['seconds'] = (speed['time'] - speed['time_lag'] + ).map(lambda x: x.total_seconds()) + + speed['meters'] = speed\ + .apply(lambda row: h3.h3_distance(row['h3'], row['h3_lag']), + axis=1) * distance_between_hex + + speed_by_hour = speed\ + .reindex(columns=['hora', 'seconds', 'meters'])\ + .groupby('hora', as_index=False)\ + .agg( + meters=('meters', 'sum'), + seconds=('seconds', 'sum'), + n=('hora', 'count'), + ) + # remove vehicles with less than 2 pax + + speed_by_hour = speed_by_hour.loc[speed_by_hour.n > 2, :] + speed_by_hour['speed_kmh_veh_h'] = speed_by_hour.meters / \ + speed_by_hour.seconds * 3.6 + speed_by_hour = speed_by_hour.reindex(columns=['hora', 'speed_kmh_veh_h']) + + return speed_by_hour + + +def get_processed_days(table_name): + """ + Takes a table name and returns all days present in + that table + + Parameters + ---------- + table_name : str + name of the table with processed data + + Returns + ------- + str + processed days in a coma separated str + + + """ + conn_data = iniciar_conexion_db(tipo='data') + + # get processed days in basic data + processed_days_q = f""" + select distinct dia + from {table_name} + """ + processed_days = pd.read_sql(processed_days_q, conn_data) + processed_days = processed_days.dia + processed_days = ', '.join([f"'{val}'" for val in processed_days]) + + return processed_days + + +# SERVICES' KPIS + + +@duracion +def compute_dispatched_services_by_line_hour_day(): + """ + Reads services' data and computes how many services + by line, day and hour + + Parameters + ---------- + None + + Returns + ------- + None + + """ + conn_data = iniciar_conexion_db(tipo="data") + conn_dash = iniciar_conexion_db(tipo="dash") + + processed_days_q = """ + select distinct dia + from services_by_line_hour + """ + processed_days = pd.read_sql(processed_days_q, conn_data) + processed_days = processed_days.dia + processed_days = ', '.join([f"'{val}'" for val in processed_days]) + + print("Leyendo datos de servicios") + + daily_services_q = f""" + select + id_linea, dia, min_datetime + from + services + where + valid = 1 + and dia not in ({processed_days}) + ; + """ + + daily_services = pd.read_sql(daily_services_q, conn_data) + + if len(daily_services) > 0: + + print("Procesando servicios por hora") + + daily_services['hora'] = daily_services.min_datetime.str[10:13].map( + int) + + daily_services = daily_services.drop(['min_datetime'], axis=1) + + # computing services by hour + dispatched_services_stats = daily_services\ + .groupby(['id_linea', 'dia', 'hora'], as_index=False)\ + .agg(servicios=('hora', 'count')) + + print("Fin procesamiento servicios por hora") + + print("Subiendo datos a la DB") + + cols = [ + "id_linea", + "dia", + "hora", + "servicios" + ] + + dispatched_services_stats = dispatched_services_stats.reindex( + columns=cols) + + dispatched_services_stats.to_sql( + "services_by_line_hour", + conn_data, + if_exists="append", + index=False, + ) + + dispatched_services_stats.to_sql( + "services_by_line_hour", + conn_dash, + if_exists="append", + index=False, + ) + conn_data.close() + conn_dash.close() + + print("Datos subidos a la DB") + else: + print("Todos los dias fueron procesados") + + +@duracion +def compute_dispatched_services_by_line_hour_typeday(): + """ + Reads services' data and computes how many services + by line, type of day (weekday weekend), and hour + + Parameters + ---------- + None + + Returns + ------- + None + + """ + + conn_data = iniciar_conexion_db(tipo="data") + conn_dash = iniciar_conexion_db(tipo="dash") + + # delete old data + delete_q = """ + DELETE FROM services_by_line_hour + where dia in ('weekday','weekend') + """ + conn_data.execute(delete_q) + conn_data.commit() + + # read daily data + q = """ + select * from services_by_line_hour + """ + daily_data = pd.read_sql(q, conn_data) + + if len(daily_data) > 0: + + print("Procesando servicios por tipo de dia") + + # get day of the week + weekend = pd.to_datetime(daily_data['dia'].copy()).dt.dayofweek > 4 + daily_data.loc[:, ['dia']] = 'weekday' + daily_data.loc[weekend, ['dia']] = 'weekend' + + # compute aggregated stats + type_of_day_stats = daily_data\ + .groupby(['id_linea', 'dia', 'hora'], as_index=False)\ + .mean() + + print("Subiendo datos a la DB") + + cols = [ + "id_linea", + "dia", + "hora", + "servicios" + ] + + type_of_day_stats = type_of_day_stats.reindex(columns=cols) + + type_of_day_stats.to_sql( + "services_by_line_hour", + conn_data, + if_exists="append", + index=False, + ) + + # delete old dash data + delete_q = """ + DELETE FROM services_by_line_hour + where dia in ('weekday','weekend') + """ + conn_dash.execute(delete_q) + conn_dash.commit() + + type_of_day_stats.to_sql( + "services_by_line_hour", + conn_dash, + if_exists="append", + index=False, + ) + conn_data.close() + conn_dash.close() + + print("Datos subidos a la DB") + + else: + print("No hay datos de servicios por hora") + print("Correr la funcion kpi.compute_services_by_line_hour_day()") + + return type_of_day_stats diff --git a/urbantrips/tests/test_unit_tests.py b/urbantrips/tests/test_unit_tests.py index 00bf3ca..08dc49c 100644 --- a/urbantrips/tests/test_unit_tests.py +++ b/urbantrips/tests/test_unit_tests.py @@ -771,12 +771,12 @@ def test_gps(matriz_validacion_test_amba): tot_pax = fe.factor_expansion.sum() kpi_df = pd.read_sql( - "select * from indicadores_operativos_linea;", conn_data) + "select * from kpi_by_day_line;", conn_data) - assert round(kpi_df.tot_km.item()) == 16 - assert kpi_df.tot_veh.item() == 2 - assert kpi_df.dmt_mean.item() == mean_distances - assert kpi_df.tot_pax.item() == tot_pax + assert round(kpi_df.tot_km.iloc[0]) == 16 + assert kpi_df.tot_veh.iloc[0] == 2 + assert kpi_df.dmt_mean.iloc[0] == mean_distances + assert kpi_df.tot_pax.iloc[0] == tot_pax carto.create_zones_table() diff --git a/urbantrips/utils/utils.py b/urbantrips/utils/utils.py index e43f202..096344e 100644 --- a/urbantrips/utils/utils.py +++ b/urbantrips/utils/utils.py @@ -511,6 +511,9 @@ def create_db(): """ ) + # create KPI tables + create_kpi_tables() + conn_data.close() conn_insumos.close() conn_dash.close() @@ -790,15 +793,17 @@ def eliminar_tarjetas_trx_unica(trx): return trx -def crear_tablas_indicadores_operativos(): - """Esta funcion crea la tablas en la db para albergar los datos de - los indicadores operativos""" +def create_kpi_tables(): + """ + Creates KPI tables in the data db + """ conn_data = iniciar_conexion_db(tipo='data') + conn_dash = iniciar_conexion_db(tipo='dash') conn_data.execute( """ - CREATE TABLE IF NOT EXISTS indicadores_operativos_linea + CREATE TABLE IF NOT EXISTS kpi_by_day_line ( id_linea int not null, dia text not null, @@ -810,7 +815,8 @@ def crear_tablas_indicadores_operativos(): pvd float, kvd float, ipk float, - fo float + fo_mean float, + fo_median float ) ; """ @@ -818,23 +824,121 @@ def crear_tablas_indicadores_operativos(): conn_data.execute( """ - CREATE TABLE IF NOT EXISTS indicadores_operativos_interno + CREATE TABLE IF NOT EXISTS kpi_by_day_line_service ( id_linea int not null, dia text not null, interno text not null, - kvd float, - pvd float, + service_id int not null, + hora_inicio float, + hora_fin float, + tot_km float, + tot_pax float, dmt_mean float, dmt_median float, ipk float, - fo float + fo_mean float, + fo_median float + ) + ; + """ + ) + + conn_data.execute( + """ + CREATE TABLE IF NOT EXISTS services_by_line_hour + ( + id_linea int not null, + dia text not null, + hora int not null, + servicios float not null + ) + ; + """ + ) + conn_dash.execute( + """ + CREATE TABLE IF NOT EXISTS services_by_line_hour + ( + id_linea int not null, + dia text not null, + hora int not null, + servicios float not null + ) + ; + """ + ) + + conn_data.execute( + """ + CREATE TABLE IF NOT EXISTS basic_kpi_by_vehicle_hr + ( + dia text not null, + id_linea int not null, + interno int not null, + hora int not null, + tot_pax float, + eq_pax float, + dmt float, + of float, + speed_kmh float + ) + ; + """ + ) + + conn_data.execute( + """ + CREATE TABLE IF NOT EXISTS basic_kpi_by_line_hr + ( + dia text not null, + id_linea int not null, + hora int not null, + veh float, + pax float, + dmt float, + of float, + speed_kmh float + ) + ; + """ + ) + + conn_data.execute( + """ + CREATE TABLE IF NOT EXISTS basic_kpi_by_line_day + ( + dia text not null, + id_linea int not null, + veh float, + pax float, + dmt float, + of float, + speed_kmh float + ) + ; + """ + ) + + conn_dash.execute( + """ + CREATE TABLE IF NOT EXISTS basic_kpi_by_line_hr + ( + dia text not null, + id_linea int not null, + hora int not null, + veh float, + pax float, + dmt float, + of float, + speed_kmh float ) ; """ ) conn_data.close() + conn_dash.close() def check_table_in_db(table_name, tipo_db): diff --git a/urbantrips/viz/viz.py b/urbantrips/viz/viz.py index 03e056a..0421299 100644 --- a/urbantrips/viz/viz.py +++ b/urbantrips/viz/viz.py @@ -1144,6 +1144,16 @@ def imprime_graficos_hora(viajes, conn_dash = iniciar_conexion_db(tipo='dash') + query = f""" + DELETE FROM viajes_hora + WHERE desc_dia = "{desc_dia}" + and tipo_dia = "{tipo_dia}" + """ + query = f'' + conn_dash.execute(query) + conn_dash.commit() + + viajesxhora_dash.to_sql("viajes_hora", conn_dash, if_exists="replace", index=False) @@ -1210,8 +1220,11 @@ def imprime_graficos_hora(viajes, vi_dash.columns = ['desc_dia', 'tipo_dia', 'Distancia', 'Viajes', 'Modo'] conn_dash = iniciar_conexion_db(tipo='dash') - - query = f'DELETE FROM distribucion WHERE desc_dia = "{desc_dia}" and tipo_dia = "{tipo_dia}"' + query = f""" + DELETE FROM distribucion + WHERE desc_dia = "{desc_dia}" + and tipo_dia = "{tipo_dia}" + """ conn_dash.execute(query) conn_dash.commit() @@ -1950,13 +1963,14 @@ def lineas_deseo(df, df_folium['filtro1'] = filtro1 conn_dash = iniciar_conexion_db(tipo='dash') + var_zona_q = var_zona.replace('h3_r', 'H3 Resolucion ') query = f""" - DELETE FROM lineas_deseo - WHERE - desc_dia = '{desc_dia}' and + DELETE FROM lineas_deseo + WHERE + desc_dia = '{desc_dia}' and tipo_dia = '{tipo_dia}' and - var_zona = '{var_zona.replace('h3_r', 'H3 Resolucion ')}' and + var_zona = '{var_zona_q}' and filtro1 = '{filtro1}' """ @@ -2240,3 +2254,220 @@ def create_visualizations(): k_jenks=5) save_zones() + + # plor dispatched services + plot_dispatched_services_wrapper() + + # plot basic kpi if exists + plot_basic_kpi_wrapper() + + +def plot_dispatched_services_wrapper(): + conn_data = iniciar_conexion_db(tipo='data') + + q = """ + select * + from services_by_line_hour + where dia = 'weekday'; + """ + service_data = pd.read_sql(q, conn_data) + + if len(service_data) > 0: + service_data.groupby(['id_linea']).apply( + plot_dispatched_services_by_line_day) + + conn_data.close() + + +def plot_dispatched_services_by_line_day(df): + """ + Reads services' data and plots how many services + by line, type of day (weekday weekend), and hour. + Saves it in results dir + + Parameters + ---------- + df : pandas.DataFrame + dataframe with dispatched services by hour from + services_by_line_hour table with + + Returns + ------- + None + + """ + line_id = df.id_linea.unique().item() + day = df.dia.unique().item() + + if day == 'weekend': + day_str = 'Fin de semana tipo' + elif day == 'weekday': + day_str = 'Dia de semana tipo' + else: + day_str = day + + conn_insumos = iniciar_conexion_db(tipo='insumos') + + s = f"select nombre_linea from metadata_lineas" +\ + f" where id_linea = {line_id};" + id_linea_str = pd.read_sql(s, conn_insumos) + conn_insumos.close() + + if len(id_linea_str) > 0: + id_linea_str = id_linea_str.nombre_linea.item() + id_linea_str = id_linea_str + ' -' + else: + id_linea_str = '' + + print("Creando plot de servicios despachados por linea") + print("id linea:", line_id) + + f, ax = plt.subplots(figsize=(8, 6)) + sns.barplot( + data=df, + x="hora", + y="servicios", + hue="id_linea", + ax=ax) + + ax.get_legend().remove() + ax.set_xlabel("Hora") + ax.set_ylabel("Cantidad de servicios despachados") + + f.suptitle(f"Cantidad de servicios despachados por hora y día", + fontdict={'size': 18, + 'weight': 'bold'}) + ax.set_title(f"{id_linea_str} id linea: {line_id} - Dia: {day_str}", + fontdict={"fontsize": 11}) + + ax.spines.right.set_visible(False) + ax.spines.top.set_visible(False) + ax.spines.bottom.set_visible(False) + ax.spines.left.set_visible(False) + ax.spines.left.set_position(('outward', 10)) + ax.spines.bottom.set_position(('outward', 10)) + + ax.grid(axis='y') + + for frm in ['png', 'pdf']: + archivo = f'servicios_despachados_id_linea_{line_id}_{day}.{frm}' + db_path = os.path.join("resultados", frm, archivo) + f.savefig(db_path, dpi=300) + plt.close() + + +def plot_basic_kpi_wrapper(): + conn_data = iniciar_conexion_db(tipo='data') + + q = """ + select * + from basic_kpi_by_line_hr + where dia = 'weekday'; + """ + kpi_data = pd.read_sql(q, conn_data) + + if len(kpi_data) > 0: + kpi_data.groupby(['id_linea']).apply( + plot_basic_kpi) + + conn_data.close() + + +def plot_basic_kpi(kpi_by_line_hr): + line_id = kpi_by_line_hr.id_linea.unique().item() + day = kpi_by_line_hr.dia.unique().item() + + if day == 'weekend': + day_str = 'Fin de semana tipo' + elif day == 'weekday': + day_str = 'Dia de semana tipo' + else: + day_str = day + + conn_insumos = iniciar_conexion_db(tipo='insumos') + + s = f"select nombre_linea from metadata_lineas" +\ + f" where id_linea = {line_id};" + + id_linea_str = pd.read_sql(s, conn_insumos) + conn_insumos.close() + + if len(id_linea_str) > 0: + id_linea_str = id_linea_str.nombre_linea.item() + id_linea_str = id_linea_str + ' -' + else: + id_linea_str = '' + + # Create empty df with 0 - 23 hrs + kpi_stats_line_plot = pd.DataFrame( + {'id_linea': [line_id] * 24, 'hora': range(0, 24)}) + + kpi_stats_line_plot = kpi_stats_line_plot\ + .merge(kpi_by_line_hr.query(f"id_linea == {line_id}"), + on=['id_linea', 'hora'], + how='left') + + supply_factor = kpi_stats_line_plot.of.max()/kpi_stats_line_plot.veh.max() + demand_factor = kpi_stats_line_plot.of.max()/kpi_stats_line_plot.pax.max() + + kpi_stats_line_plot.veh = kpi_stats_line_plot.veh * supply_factor + kpi_stats_line_plot.pax = kpi_stats_line_plot.pax * demand_factor + + print("Creando plot de KPI basicos por linea") + print("id linea:", line_id) + + f, ax = plt.subplots(figsize=(8, 6)) + + sns.barplot(data=kpi_stats_line_plot, x='hora', y='of', + color='silver', ax=ax, label='Factor de ocupación') + + sns.lineplot(data=kpi_stats_line_plot, x="hora", y="veh", ax=ax, + color='Purple', label='Oferta - veh/hr') + sns.lineplot(data=kpi_stats_line_plot, x="hora", y="pax", ax=ax, + color='Orange', label='Demanda - pax/hr') + + ax.set_xlabel("Hora") + ax.set_ylabel("Factor de Ocupación (%)") + + f.suptitle(f"Indicadores de oferta y demanda estadarizados", + fontdict={'size': 18, + 'weight': 'bold'}) + + ax.set_title(f"{id_linea_str} id linea: {line_id} - Dia: {day_str}", + fontdict={"fontsize": 11}) + # Add a footnote below and to the right side of the chart + note = """ + Los indicadores de Oferta y Demanda se estandarizaron para que + coincidan con el eje de Factor de Ocupación + """ + ax_note = ax.annotate(note, + xy=(0, -.18), + xycoords='axes fraction', + ha='left', + va="center", + fontsize=10) + ax.spines.right.set_visible(False) + ax.spines.top.set_visible(False) + ax.spines.bottom.set_visible(False) + ax.spines.left.set_visible(False) + ax.spines.left.set_position(('outward', 10)) + ax.spines.bottom.set_position(('outward', 10)) + + for frm in ['png', 'pdf']: + archivo = f'kpi_basicos_id_linea_{line_id}_{day}.{frm}' + db_path = os.path.join("resultados", frm, archivo) + f.savefig(db_path, dpi=300, bbox_extra_artists=( + ax_note,), bbox_inches='tight') + plt.close() + + # add to dash + kpi_stats_line_plot['nombre_linea'] = id_linea_str + + conn_dash = iniciar_conexion_db(tipo='dash') + kpi_stats_line_plot.to_sql( + "basic_kpi_by_line_hr", + conn_dash, + if_exists="append", + index=False, + ) + conn_dash.close()