-
Notifications
You must be signed in to change notification settings - Fork 1
/
Location_Service.py
128 lines (102 loc) · 4.83 KB
/
Location_Service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import sys
from geopy.geocoders import Nominatim
from pymongo import MongoClient
from pymongo import ASCENDING
from helpers.Cache_Dump import Cache_Dump
from models.Location_Country_Pair import LocationCountryPair
from constants.Constants import *
from config.conf import MONGO
# Logging setup: attach a Logstash TCP handler to this module's logger.
import logging
import logstash
from config.conf import logstash_host, logstash_port
log = logging.getLogger(__name__)
# The logger level is INFO, so the log.debug(...) calls in this module are
# not emitted unless this level is lowered.
log.setLevel(logging.INFO)
log.addHandler(logstash.TCPLogstashHandler(logstash_host, logstash_port, version=1))
# NOTE(review): this runs after every import above, so it cannot influence how
# those modules were located. The path is relative, so it is resolved against
# the process's current working directory, not this file's location.
sys.path.append('../resources')
class LocationService:
    """
    Geocoding service built on Nominatim with a MongoDB-backed coordinate cache.

    Resolved coordinates are kept in the in-memory dict ``local_cache``, which is
    pre-populated from the ``lat_lon_cache`` collection when the service is built.
    Newly resolved entries are additionally queued in ``things_to_dump``; that dict
    is handed to a background ``Cache_Dump`` thread together with the collection
    and the dump interval.
    """

    def __init__(self, cache_dump_interval):
        """
        Construct LocationService.
        Provide the time interval at which the current cache will be dumped to the database.
        See Location_Service_Test.py for usage.
        :param int cache_dump_interval: an integer to set the dump interval
        :return: LocationService Object
        """
        self.user_agent = USER_AGENT
        self.local_cache = {}
        self.things_to_dump = {}
        self.geocoder = Nominatim(user_agent=self.user_agent)

        # MongoDB setup
        self.client = MongoClient(MONGO["URI_CACHE"])
        self.cache_db = self.client["cache_db"]
        self.lat_lon_cache = self.cache_db["lat_lon_cache"]
        # One document per location_id; the unique index enforces that.
        self.lat_lon_cache.create_index([('location_id', ASCENDING)], unique=True)

        self.load_from_database()
        self.cache_dump_interval = cache_dump_interval
        log.info('LocationService is initialised')

        self.dump_thread = Cache_Dump(self.lat_lon_cache, self.things_to_dump, self.cache_dump_interval)
        # NOTE(review): assumes Cache_Dump subclasses threading.Thread; the
        # daemon/name attributes replace the deprecated setDaemon()/setName().
        self.dump_thread.daemon = True
        self.dump_thread.name = "Coordinates cache dumping thread"
        self.dump_thread.start()

    def load_from_database(self):
        """
        Fill ``local_cache`` from every document in the ``lat_lon_cache`` collection.

        Each document is expected to carry a ``location_id`` of the form
        ``<location>__<country_code>`` and a ``coords`` mapping holding LAT and LON.
        Documents that do not match this shape are logged and skipped so that one
        bad record does not prevent the rest of the cache from loading.
        :return: None
        """
        try:
            for item in self.lat_lon_cache.find():
                log.debug('item is:%s', item)
                try:
                    # location,country_code,lat,long
                    _loc = item['location_id'].split('__')
                    _cords = item['coords']
                    _key = LocationCountryPair(_loc[0].strip(), _loc[1].strip())
                    _val = {LAT: float(_cords[LAT]), LON: float(_cords[LON])}
                except (AttributeError, IndexError, KeyError, TypeError, ValueError) as ex:
                    log.warning("Skipping malformed cache entry " + str(item) + ": " + repr(ex))
                    continue
                self.local_cache[_key] = _val
        except FileNotFoundError as ex:
            # Log the exception itself; no file path variable exists in this scope.
            log.error("Load file. File not found: " + str(ex))
        log.info("Total number of entries in cache: " + str(len(self.local_cache)))

    def get_coordinates_for_city(self, query):
        """
        Method to get the geo coordinates for the query.
        :param dict query: the query must be a python type dict. It must contain keys - CITY and COUNTRY which are defined in Constants.py
        :return: A python type dict containing LAT and LON as keys; each value is a floating point number.
        On a cache hit the cached dict object itself is returned, so callers should not mutate it.
        If the geocoder finds nothing, or raises, the reason is logged and an empty dict is returned.
        """
        # Cache keys are normalised: surrounding whitespace stripped, lower-cased.
        _key = LocationCountryPair(query[CITY].strip().lower(), query[COUNTRY].strip().lower())
        if _key in self.local_cache:
            log.debug("Location service using Cached results")
            return self.local_cache[_key]

        try:
            response = self.geocoder.geocode(query)
            if response is None:
                log.warning("Can not find lat lon for query:" + str(query))
                return {}
            lat_lon = {LAT: response.latitude, LON: response.longitude}
            # Only successful lookups are cached; misses are retried on the next call.
            self.cache_lat_lon(_key, lat_lon)
            return lat_lon
        except Exception as ex:
            # Documented contract: any geocoder failure yields an empty dict.
            log.error(str(ex))
            return {}

    def get_city_boundary(self, query):
        """
        Method to get the bounding box for the query.
        :param dict query: the query must be a python type dict. It must contain keys - CITY and COUNTRY which are defined in Constants.py
        :return: The value stored under BOUNDING_BOX in the raw geocoder response.
        NOTE(review): for Nominatim this is presumably the four-element bounding box list - confirm against Constants.py.
        If the geocoder finds nothing, or raises, the reason is logged and an empty list is returned.
        """
        try:
            # `bounded` only restricts results to a viewbox and none is supplied
            # here. Newer geopy releases take it as a geocode() argument rather
            # than a constructor argument, so the shared geocoder is reused.
            response = self.geocoder.geocode(query)
            if response is not None and response.raw is not None:
                return response.raw[BOUNDING_BOX]
            log.warning("Can not find bounding box for query:" + str(query))
            return []
        except Exception as ex:
            log.error(str(ex))
            return []

    def cache_lat_lon(self, key, lat_lon):
        """
        Record freshly resolved coordinates.
        The same dict object is stored both in ``things_to_dump`` (the dict shared
        with the dump thread) and in ``local_cache``.
        :param key: the LocationCountryPair identifying the location
        :param dict lat_lon: dict with LAT and LON entries
        :return: None
        """
        self.things_to_dump[key] = lat_lon
        self.local_cache[key] = lat_lon