-
Notifications
You must be signed in to change notification settings - Fork 0
/
zmq_subscriber.py
135 lines (119 loc) · 4.36 KB
/
zmq_subscriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import zmq
import gzip
import json
import os
import time
import threading
from google.cloud import storage
# In-memory accumulator for the received records; the receive loop stores
# one entry per location code and the writer dumps its values to JSON.
combined_data = {}

# Local directory that holds the generated JSON file. It is created at
# import time if it does not exist yet.
base_directory = "OVfiets"
os.makedirs(base_directory, exist_ok=True)
def create_socket(context):
    """
    Build a ZeroMQ SUB socket from *context*, connect it to
    tcp://vid.openov.nl:6703 and subscribe it to the "/OVfiets" topic.

    Returns the connected and subscribed socket.
    """
    topic = "/OVfiets"
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://vid.openov.nl:6703")
    print(f"Subscribing to topic {topic}")
    subscriber.setsockopt_string(zmq.SUBSCRIBE, topic)
    return subscriber
def write_combined_json():
    """
    Dump every value currently held in ``combined_data`` as one JSON array
    to ``combined_data.json`` inside ``base_directory``.

    The file is overwritten on each call and is written as UTF-8 with
    non-ASCII characters left unescaped.
    """
    records = list(combined_data.values())
    target_path = os.path.join(base_directory, "combined_data.json")
    with open(target_path, mode='w', encoding='utf-8') as out_file:
        json.dump(records, out_file, ensure_ascii=False)
    print(f"Combined JSON saved to {target_path}")
def upload_to_gcs(source_file_name, destination_blob_name):
    """
    Upload a local file to the Google Cloud Storage bucket whose name is
    read from the PUBLIC_BUCKET_NAME environment variable.

    Args:
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name the file is stored under.

    The blob's Cache-Control is set to "no-cache, max-age=0" before the
    upload is performed.
    """
    gcs_client = storage.Client()
    target_bucket = os.environ.get("PUBLIC_BUCKET_NAME")
    print(f"Uploading {source_file_name} to bucket {target_bucket} as {destination_blob_name}.")
    target_blob = gcs_client.bucket(target_bucket).blob(destination_blob_name)
    target_blob.cache_control = "no-cache, max-age=0"
    target_blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")
def filter_old_entries():
    """
    Remove entries from ``combined_data`` whose ``extra.fetchTime`` is more
    than two weeks older than the current time.

    ``fetchTime`` is compared against ``int(time.time())``, i.e. it is
    treated as a Unix timestamp in seconds. Entries whose ``fetchTime`` is
    missing, null or not a number cannot be aged, so they are left in place
    instead of raising ``TypeError`` on the comparison.
    """
    cutoff = int(time.time()) - (14 * 24 * 60 * 60)

    # Collect keys first: a dict must not change size while being iterated.
    stale_keys = []
    for location_code, data in combined_data.items():
        fetch_time = (data.get("extra") or {}).get("fetchTime")
        if not isinstance(fetch_time, (int, float)):
            # No usable timestamp; keep the entry rather than crash.
            continue
        if fetch_time < cutoff:
            stale_keys.append(location_code)

    # Remove the old entries
    for key in stale_keys:
        del combined_data[key]
        print(f"Removed old location data: {key}")
# Handle of the most recently scheduled save/upload timer, or None when no
# timer is being tracked.
write_timer = None


def save_and_upload():
    """
    Write the combined JSON file and upload it to the bucket as
    ``locations.json``.

    This is the callback run by the ``threading.Timer`` stored in
    ``write_timer``. After the upload the handle is cleared, but only when
    it still refers to the timer executing this call. If a newer timer was
    scheduled while the write/upload was in progress, its handle is kept so
    that it can still be cancelled later.
    """
    global write_timer
    write_combined_json()
    upload_to_gcs(base_directory+'/combined_data.json', 'locations.json')
    # threading.Timer is a Thread subclass, so inside its callback
    # current_thread() is the Timer object itself.
    if write_timer is threading.current_thread():
        write_timer = None
def save_and_upload_delayed():
    """
    Schedule ``save_and_upload`` to run one second from now.

    Any previously scheduled timer is cancelled first, so a burst of calls
    results in a single save/upload one second after the last call
    (cancelling a timer that has already fired has no effect).
    """
    global write_timer
    previous_timer = write_timer
    if previous_timer is not None:
        previous_timer.cancel()
    # Keep the new timer in a local: the timer thread may reset the global
    # to None at any moment, so re-reading it before start() is not safe.
    new_timer = threading.Timer(1.0, save_and_upload)
    write_timer = new_timer
    new_timer.start()
def get_useful_data(entry):
    """
    Reduce a decoded location record to the fields that are published.

    Args:
        entry: Dict decoded from a feed message. ``entry['link']['uri']``,
            ``entry['extra']['locationCode']`` and
            ``entry['extra']['fetchTime']`` are required; the remaining
            fields are optional.

    Returns:
        A new dict with ``description``, ``stationCode``, ``lat``, ``lng``,
        ``link``, ``extra`` and ``openingHours``. Optional fields that are
        absent become ``None``; a missing or null description becomes ``''``.

    Raises:
        KeyError: If one of the required fields is missing.
    """
    link = entry['link']
    extra = entry['extra']
    # `or ''` also covers an explicit null description, which .get()'s
    # default would not.
    description = entry.get('description') or ''
    return {
        # strip because Duiven has a space at the end of its description.
        'description': description.strip(),
        'stationCode': entry.get('stationCode'),
        'lat': entry.get('lat'),
        'lng': entry.get('lng'),
        'link': {'uri': link['uri']},
        'extra': {
            'locationCode': extra['locationCode'],
            'fetchTime': extra['fetchTime'],
            'rentalBikes': extra.get('rentalBikes', None),
        },
        'openingHours': entry.get('openingHours'),
    }
def receive_messages(socket):
    """
    Handle incoming messages on the given socket and update combined_data.

    Blocks until a topic frame arrives, then drains every message that is
    already queued without blocking. Each message is read as a topic frame
    followed by a gzip-compressed, UTF-8 encoded JSON payload frame. The
    last "/"-separated component of the topic is used as the location code,
    and the payload is stored in ``combined_data`` only when its ``extra``
    object contains ``rentalBikes``.

    A payload that cannot be decompressed, decoded or parsed, or that lacks
    the expected fields, is reported and skipped so that one bad message
    does not stop the subscriber. Errors raised by the socket itself (other
    than "no message available") propagate to the caller.

    Args:
        socket: A subscribed ZeroMQ SUB socket.
    """
    # Local import: zlib.error is one of the failures gzip.decompress raises.
    import zlib

    topic_received = socket.recv_string()
    while True:
        try:
            message = socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            # No more messages available
            return

        location_code = topic_received.split("/")[-1]
        try:
            message_str = gzip.decompress(message).decode('utf-8')
            json_data = json.loads(message_str)
            extra = json_data['extra']
            print(f"Received data for {location_code} with fetchTime {extra['fetchTime']}")
            if 'rentalBikes' in extra:
                combined_data[location_code] = get_useful_data(json_data)
        except (OSError, EOFError, zlib.error, ValueError, KeyError,
                TypeError, AttributeError) as error:
            # Bad gzip data, invalid UTF-8/JSON, or an unexpected structure.
            print(f"Skipping malformed message for {location_code}: {error!r}")

        try:
            topic_received = socket.recv_string(flags=zmq.NOBLOCK)
        except zmq.Again:
            # No more messages available
            return
# Main loop: receive messages, expire old entries and schedule an upload,
# forever. On a ZeroMQ error the socket and context are torn down and
# recreated after a five-minute pause. Ctrl-C ends the loop.
#
# Both handles start as None so the cleanup in `finally` stays valid even
# when creating the context or the socket is what failed.
context = None
socket = None
try:
    context = zmq.Context()
    socket = create_socket(context)
    while True:
        try:
            receive_messages(socket)
            filter_old_entries()
            save_and_upload_delayed()
        except zmq.ZMQError:
            print("Connection lost. Retrying in 5 minutes.")
            # Close the socket before terminating its context.
            socket.close()
            context.term()
            time.sleep(300)  # 5 minutes * 60 = 300 seconds
            context = zmq.Context()
            socket = create_socket(context)
except KeyboardInterrupt:
    print("Interrupted by user.")
finally:
    # Clean up resources. A save/upload that is still pending is cancelled.
    if write_timer is not None:
        write_timer.cancel()
    if socket is not None:
        socket.close()
    if context is not None:
        context.term()