-
Notifications
You must be signed in to change notification settings - Fork 2
/
coverage_journeys.py
96 lines (79 loc) · 3.58 KB
/
coverage_journeys.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import sys
from datetime import datetime, timedelta
import json
import os
import shutil
from pyspark import SparkContext, SparkConf
from glob import glob
def get_journey_count_from_stat_dict(stat_dict):
    """Map one decoded stat record to ``(key, nb_journeys)`` pairs.

    One pair is produced per *distinct*, non-empty ``region_id`` found in
    ``stat_dict['coverages']``, so a record counts at most once per coverage.
    The key is ``(request_date, region_id, is_internal_call)`` where
    ``request_date`` is the UTC date of the ``stat_dict['request_date']``
    timestamp and ``is_internal_call`` is 1 when ``'canaltp'`` appears in
    ``stat_dict['user_name']`` (0 otherwise). ``nb_journeys`` is
    ``len(stat_dict['journeys'])``, or 0 when that key is absent.

    :param stat_dict: a decoded JSON stat record (dict).
    :return: list of ``((date, region_id, is_internal_call), nb_journeys)``
        tuples; empty when the record has no usable coverage.
    :raises KeyError: if ``coverages`` is missing, or if ``request_date`` /
        ``user_name`` is missing on a record that has a coverage.
    """
    # Build the distinct region ids across *all* coverage entries (first-seen
    # order). Re-creating this list per entry would defeat the de-duplication.
    region_ids = []
    for cov in stat_dict['coverages']:
        region_id = cov.get('region_id', '')
        # The empty coverages key is represented by [{}]; skip it.
        if region_id and region_id not in region_ids:
            region_ids.append(region_id)
    if not region_ids:
        return []

    request_date = datetime.utcfromtimestamp(stat_dict['request_date']).date()
    is_internal_call = 1 if 'canaltp' in stat_dict['user_name'] else 0
    nb_journeys = len(stat_dict['journeys']) if 'journeys' in stat_dict else 0
    return [
        ((request_date, region_id, is_internal_call), nb_journeys)
        for region_id in region_ids
    ]
if __name__ == "__main__":
if len(sys.argv) < 3:
raise SystemExit("Missing arguments. Usage: " + sys.argv[0] + " <source_root> <start_date> <end_date> ")
# treatment_day = datetime.strptime(sys.argv[1], '%Y-%m-%d').date()
# source_root = '/home/vlepot/dev/navitia-stat-logger/tmp'
# source_root = 'gs://hdp_test'
source_root = sys.argv[1]
treatment_day_start = datetime.strptime(sys.argv[2], '%Y-%m-%d').date()
treatment_day_end = datetime.strptime(sys.argv[3], '%Y-%m-%d').date()
print "Go for dates: " + treatment_day_start.strftime('%Y-%m-%d') + " -> " + treatment_day_end.strftime('%Y-%m-%d')
print "Source root dir: " + source_root
conf = SparkConf().setAppName("coverage_journeys_compiler")
sc = SparkContext(conf=conf)
statsLines = sc.emptyRDD()
treatment_day = treatment_day_start
while treatment_day <= treatment_day_end:
if source_root.startswith("/") and \
len(glob(source_root + '/' + treatment_day.strftime('%Y/%m/%d') + '/*.json.log*')) > 0:
statsLines = statsLines.union(sc.textFile(
source_root + '/' + treatment_day.strftime('%Y/%m/%d') + '/*.json.log*')
)
treatment_day += timedelta(days=1)
dayStats = statsLines.map(
lambda stat: json.loads(stat)
).filter(
lambda line: line["api"] == 'v1.journeys'
)
coverage_journeys = dayStats.flatMap(
get_journey_count_from_stat_dict
).reduceByKey(
lambda a, b: a+b
).filter(
lambda item: item[1] > 0 # Only keep items where nb_journeys > 0
).map(
lambda tuple_of_tuples: [str(element) for element in tuple_of_tuples[0]] + [str(tuple_of_tuples[1])]
).map(
lambda line: ";".join(line)
)
# print requests_calls.count()
# Store on FS
compiled_storage_rootdir = source_root + "/compiled/" + treatment_day_start.strftime('%Y/%m/%d')
compiled_storage_dir = compiled_storage_rootdir + "/coverage_journeys_" + treatment_day_start.strftime('%Y%m%d')
if source_root.startswith('/'): # In case of local file system do some preparation first
if os.path.isdir(compiled_storage_dir):
shutil.rmtree(compiled_storage_dir)
else:
if not os.path.isdir(compiled_storage_rootdir):
os.makedirs(compiled_storage_rootdir)
elif source_root.startswith('gs://'): # In case of Google Cloud Storage do some preparation first
pass
coverage_journeys.saveAsTextFile(compiled_storage_dir)