-
Notifications
You must be signed in to change notification settings - Fork 0
/
update_cars.py
120 lines (101 loc) · 3.45 KB
/
update_cars.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import asyncio
from typing import List, Dict, Any
from pymongo import ASCENDING, IndexModel
from pymongo.collection import Collection
import updater
from db import MongoDBConnection
async def create_indexes(collection: Collection) -> None:
    """Create the ascending single-field and compound indexes on *collection*.

    Six indexes are requested in one ``create_indexes`` call: ``(month, make)``,
    ``month``, ``make``, ``fuel_type``, ``(make, fuel_type)`` and ``number``.

    The call is not awaited, so although this function is a coroutine the
    index creation itself runs synchronously inside it.

    Args:
        collection: The collection that receives the indexes.
    """
    # One tuple per index; the order of names is the key order of that index.
    index_fields = (
        ("month", "make"),
        ("month",),
        ("make",),
        ("fuel_type",),
        ("make", "fuel_type"),
        ("number",),
    )
    index_models = [
        IndexModel([(field, ASCENDING) for field in fields])
        for fields in index_fields
    ]
    collection.create_indexes(index_models)
async def run_aggregations(collection: Collection) -> None:
    """Clean up stored documents in place with three aggregation pipelines.

    Every pipeline ends in a ``$merge`` back into the same collection, matched
    on ``_id``; documents without a match are discarded rather than inserted.
    The pipelines run one after another, in this order:

    1. Documents whose ``number`` is the empty string get ``number`` set to 0
       (matched documents are replaced).
    2. ``make`` has every "." removed and every "-" turned into a space, and
       ``vehicle_type`` has every "/ " collapsed to "/" (fields are merged).
    3. ``make`` is upper-cased (fields are merged).

    The ``aggregate`` calls are not awaited and their return values are
    ignored.

    Args:
        collection: The collection to read from and merge back into.
    """
    target = collection.name

    def write_back(when_matched: str) -> Dict[str, Any]:
        # Final stage shared by all pipelines; only ``whenMatched`` varies.
        return {
            "$merge": {
                "into": target,
                "on": "_id",
                "whenMatched": when_matched,
                "whenNotMatched": "discard",
            }
        }

    # Inner expression: drop dots from the make.
    make_without_dots = {
        "$replaceAll": {
            "input": "$make",
            "find": ".",
            "replacement": "",
        }
    }
    # Outer expression: then swap hyphens for spaces.
    make_without_hyphens = {
        "$replaceAll": {
            "input": make_without_dots,
            "find": "-",
            "replacement": " ",
        }
    }
    compact_vehicle_type = {
        "$replaceAll": {
            "input": "$vehicle_type",
            "find": "/ ",
            "replacement": "/",
        }
    }

    blank_number_to_zero = [
        {"$match": {"number": ""}},
        {"$set": {"number": 0}},
        write_back("replace"),
    ]
    tidy_text_fields = [
        {
            "$addFields": {
                "make": make_without_hyphens,
                "vehicle_type": compact_vehicle_type,
            }
        },
        write_back("merge"),
    ]
    uppercase_make = [
        {"$addFields": {"make": {"$toUpper": "$make"}}},
        write_back("merge"),
    ]

    for pipeline in (blank_number_to_zero, tidy_text_fields, uppercase_make):
        collection.aggregate(pipeline)
async def main() -> Dict[str, Any]:
    """Refresh the ``cars`` collection from the LTA DataMall zip archive.

    Steps:
        1. Open the database and create the indexes on the ``cars`` collection.
        2. Delegate download/import to ``updater.main``, keyed on ``month``.
        3. If the updater reports newly inserted documents, run the clean-up
           aggregations over the collection.

    The MongoDB client is closed in a ``finally`` clause so the connection is
    released even when index creation, the update, or an aggregation raises.

    Returns:
        The message dict returned by ``updater.main``; it is expected to
        contain an ``inserted_count`` entry, which is read here.

    Raises:
        Any exception raised by the steps above is propagated unchanged after
        the client has been closed.
    """
    collection_name: str = "cars"
    zip_file_name: str = "Monthly New Registration of Cars by Make.zip"
    zip_url: str = (
        f"https://datamall.lta.gov.sg/content/dam/datamall/datasets/Facts_Figures/Vehicle Registration/{zip_file_name}"
    )
    key_fields: List[str] = ["month"]

    db = MongoDBConnection().database
    try:
        collection = db[collection_name]
        await create_indexes(collection)

        message = await updater.main(
            collection_name, zip_file_name, zip_url, key_fields
        )

        # Only pay for the clean-up pipelines when new data actually arrived.
        if message["inserted_count"] > 0:
            print("Running aggregation...")
            await run_aggregations(collection)
            print("Aggregation complete.")

        return message
    finally:
        # Always release the connection, including on the error path.
        db.client.close()
def handler(event, context):
    """Run the update synchronously and wrap its result in a response dict.

    The ``(event, context)`` signature looks like an AWS Lambda-style entry
    point -- NOTE(review): confirm against the deployment configuration.

    Args:
        event: The invocation payload; only printed for diagnostics.
        context: Unused.

    Returns:
        A dict with ``statusCode`` 200 and ``body`` set to whatever ``main``
        returned.
    """
    print("Event:", event)
    # Drive the coroutine to completion on a fresh event loop.
    result = asyncio.run(main())
    return dict(statusCode=200, body=result)
# Script entry point: when the file is executed directly, run the update once
# on a fresh event loop. The message returned by main() is discarded here.
if __name__ == "__main__":
    asyncio.run(main())