Skip to content

Commit

Permalink
fixes: 1. added 'change search criteria' functionality
Browse files Browse the repository at this point in the history
2. modified user data display view (now appears in grid format)
3. added exception handling in most use cases
4. handled unexpected script termination upon having found no available centres (sessions) in first attempt
5. many other small fixes as well
  • Loading branch information
divagicha committed May 17, 2021
1 parent c2fb1d5 commit 590d8aa
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 60 deletions.
143 changes: 100 additions & 43 deletions CovidVaccineChecker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,53 +57,84 @@ def __init__(self, mobile):


@staticmethod
def display_table(dict_list):
def display_table(dict_list=None, user_config_data=None):
"""
This function
1. Takes a list of dictionary
2. Add an Index column, and
3. Displays the data in tabular format
"""
if user_config_data is not None:
# print(tabulate.tabulate(user_config_data, headers=['Variable', 'Value'], tablefmt="grid"))
print(tabulate.tabulate(user_config_data, tablefmt="grid"))
return

header = ["idx"] + list(dict_list[0].keys())
rows = [[idx + 1] + list(x.values()) for idx, x in enumerate(dict_list)]
print(tabulate.tabulate(rows, header, tablefmt="grid"))


@staticmethod
def displayConfigFileData(user_config_file):
def displayConfigFileData(self, user_config_file):
with open(user_config_file, 'r') as json_file:
user_data = json.load(json_file)

display_keys = ['mobile', 'state_id', 'district_id', 'pincode_preferences', 'search_criteria',
'institution_preferences', 'slot_preference', 'appointment_date']
for key in user_data.keys():
if key in display_keys:
if key in ['state_id', 'district_id']:
print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}"
f"\t\t\t{TextColors.UNDERLINE}({user_data['state_name'] if key == 'state_id' else user_data['district_name']}){TextColors.ENDC}")
elif key == 'search_criteria':
print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}\t\t\t{TextColors.UNDERLINE}(1: by Pincode, 2: by District){TextColors.ENDC}")
else:
print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}")
# display_keys = ['mobile', 'state_id', 'district_id', 'pincode_preferences', 'search_criteria',
# 'institution_preferences', 'slot_preference', 'appointment_date']
# for key in user_data.keys():
# if key in display_keys:
# if key in ['state_id', 'district_id']:
# print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}"
# f"\t\t\t{TextColors.UNDERLINE}({user_data['state_name'] if key == 'state_id' else user_data['district_name']}){TextColors.ENDC}")
# elif key == 'search_criteria':
# print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}\t\t\t{TextColors.UNDERLINE}(1: by Pincode, 2: by District){TextColors.ENDC}")
# else:
# print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}")

# display_keys = ['mobile', 'state_id', 'state_name', 'district_id', 'district_name', 'pincode_preferences', 'search_criteria',
# 'institution_preferences', 'slot_preference', 'appointment_date']
# self.display_table(user_config_data=[[f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", value]
# for key, value in user_data.items() if key in display_keys])

display_keys = ['mobile', 'state_id', 'district_id', 'pincode_preferences', 'search_criteria',
'institution_preferences', 'slot_preference', 'appointment_date']
key_value_list = list()
for key, value in user_data.items():
if key in display_keys:
if key == 'state_id':
key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", f"{value}\t({user_data['state_name']})"])
elif key == 'district_id':
key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", f"{value}\t({user_data['district_name']})"])
elif key == 'search_criteria':
key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", f"{value}\t(1: by Pincode, 2: by District)"])
else:
key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", value])

self.display_table(user_config_data=key_value_list)

def save_user_config(self, user_config_file):
self.user_data = {
"mobile": self.mobile,
"token": self.token,
"state_id": self.state_id,
"state_name": self.state_name,
"district_id": self.district_id,
"district_name": self.district_name,
"pincode_preferences": self.pincode_preferences,
"search_criteria": self.search_criteria,
"institution_preferences": self.institution_preferences,
"slot_preference": self.slot_preference,
"appointment_date": self.appointment_date
}

with open(user_config_file, 'x') as json_file:
json_file.write(json.dumps(self.user_data, indent=4))
def save_user_config(self, user_config_file):
try:
self.user_data = {
"mobile": self.mobile,
"token": self.token,
"state_id": self.state_id,
"state_name": self.state_name,
"district_id": self.district_id,
"district_name": self.district_name,
"pincode_preferences": self.pincode_preferences,
"search_criteria": self.search_criteria,
"institution_preferences": self.institution_preferences,
"slot_preference": self.slot_preference,
"appointment_date": self.appointment_date
}

with open(user_config_file, 'x') as json_file:
json_file.write(json.dumps(self.user_data, indent=4))
except Exception as e:
if os.path.exists(user_config_file):
os.remove(user_config_file)
print(f"\n{TextColors.FAIL}File could not be saved (message: {e}). Please try again...{TextColors.ENDC}")
exit(1)


def update_user_config(self, list_of_keys_to_update, value_list, user_config_file):
Expand Down Expand Up @@ -174,7 +205,7 @@ def create_new_user_config(self, user_config_file):
self.refreshToken(user_config_file, save_token_in_file=False)

while True:
self.search_criteria = input("\nEnter search criteria ('1' for search by pincode, '2' for search by district): ")
self.search_criteria = input("\nEnter search criteria ('1' to search by pincode, '2' to search by district): ")

if self.search_criteria is not None or self.search_criteria.strip() != "":
self.search_criteria = int(self.search_criteria)
Expand Down Expand Up @@ -224,11 +255,10 @@ def create_new_user_config(self, user_config_file):

self.save_user_config(user_config_file)

print("DONE")


def changeAppointmentDate(self, user_config_file):
self.use_existing_user_config(user_config_file) # to initialise all other variables too, before calling update_user_config()
def changeAppointmentDate(self, user_config_file, load_values_from_existing_config_first = True):
if load_values_from_existing_config_first:
self.use_existing_user_config(user_config_file) # to initialise all other variables too, before calling update_user_config()

date = input("\nEnter appointment date to check available slots for that date "
"(Format: dd-mm-yyyy, defaults to today if nothing is entered): ")
Expand All @@ -246,6 +276,25 @@ def changeAppointmentDate(self, user_config_file):
self.update_user_config(['appointment_date'], [self.appointment_date], user_config_file)


def changeSearchCriteria(self, user_config_file, load_values_from_existing_config_first = True):
if load_values_from_existing_config_first:
self.use_existing_user_config(user_config_file) # to initialise all other variables too, before calling update_user_config()

while True:
self.search_criteria = input("\nEnter search criteria ('1' to search by pincode, '2' to search by district): ")

if self.search_criteria is not None or self.search_criteria.strip() != "":
self.search_criteria = int(self.search_criteria)
if self.search_criteria in [1, 2]:
break
else:
print(f"\n{TextColors.FAIL}Invalid input! Please enter one of the above two choices{TextColors.ENDC}")
else:
print(f"\n{TextColors.FAIL}Invalid input! Please enter one of the above two choices{TextColors.ENDC}")

self.update_user_config(['search_criteria'], [self.search_criteria], user_config_file)


def refreshToken(self, user_config_file, save_token_in_file=True):
print(f"\n{TextColors.FAIL}Previous TOKEN Expired!!!{TextColors.ENDC}")

Expand Down Expand Up @@ -361,6 +410,7 @@ def getStateDistrictPincodePreferences(self):


def findCentresByPin(self):
# Returns empty list as response.json()['sessions']=[] if there are institutes present but are all booked or none of them have opened booking slots yet
params = {
"pincode": self.pincode_preferences[0],
"date": self.appointment_date,
Expand All @@ -378,6 +428,7 @@ def findCentresByPin(self):


def findCentresByDistrict(self):
# Returns empty list as response.json()['sessions']=[] if there are institutes present but are all booked or none of them have opened booking slots yet
params = {
"district_id": self.district_id,
"date": self.appointment_date,
Expand All @@ -397,12 +448,13 @@ def findCentresByDistrict(self):


def findCentresBySearchCriteria(self):
# Returns empty list as response.json()['sessions']=[] if there are institutes present but are all booked or none of them have opened booking slots yet
if self.search_criteria == 1:
print(f"\n-->\tGetting list of centres for pincode '{self.pincode_preferences[0]}' (first pincode in your preferences), using date '{self.appointment_date}'")
print(f"\n-->\tGetting list of centres for pincode '{self.pincode_preferences[0]}' (first pincode in your preferences), using date '{self.appointment_date}'\n")

all_centres = self.findCentresByPin()
else:
print(f"\n-->\tGetting list of centres for district '{self.district_name}', using date '{self.appointment_date}'")
print(f"\n-->\tGetting list of centres for district '{self.district_name}', using date '{self.appointment_date}'\n")

all_centres = self.findCentresByDistrict()

Expand All @@ -415,7 +467,8 @@ def get_beneficiaries(self):
try:
beneficiaries = response.json()['beneficiaries']
except Exception as e:
print(f"{TextColors.FAIL}{TextColors.FAIL}FAILED ATTEMPT (message: {e}){TextColors.ENDC}{TextColors.ENDC} (response: {response.text})")
if "unauthenticated access" not in response.text.lower():
print(f"{TextColors.FAIL}{TextColors.FAIL}FAILED ATTEMPT (message: {e}){TextColors.ENDC}{TextColors.ENDC} (response: {response.text})")
beneficiaries = None

return beneficiaries, response.status_code
Expand All @@ -432,7 +485,7 @@ def get_appointment_details(appointment_dict):


def generate_captcha(self, user_config_file):
print(f"\n{TextColors.HEADER}========================================= GETTING CAPTCHA ========================================={TextColors.ENDC}")
print(f"\n{TextColors.HEADER}===================================== GENERATING CAPTCHA ====================================={TextColors.ENDC}")

while True:
response = requests.request("POST", self.captcha_url, headers=self.auth_headers)
Expand Down Expand Up @@ -475,7 +528,11 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
appointment_booked_flag = False
appointment_id = None

print(f"Ref. IDs to schedule booking for: {ref_ids}")
if len(all_centres) == 0:
print(f"\n{TextColors.FAIL}No vaccination centre found!{TextColors.ENDC}", end="")
return appointment_booked_flag, appointment_id

print(f"\nRef. IDs to schedule booking for: {ref_ids}")

for centre in all_centres:
print(f"\ntrying centre '{centre['name']}'\t{TextColors.BOLD}{TextColors.WARNING}(Min Age Limit: {centre['min_age_limit']}){TextColors.ENDC}...", end=" ")
Expand All @@ -488,7 +545,7 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
if centre['available_capacity'] >= len(ref_ids):
captcha = self.generate_captcha(user_config_file)

print(f"{TextColors.BLACKONGREY}Entered Value: {captcha}{TextColors.ENDC}")
print(f"\n{TextColors.BLACKONGREY}Entered Captcha Value: {captcha}{TextColors.ENDC}")

payload = json.dumps({
"dose": dose_number,
Expand All @@ -504,7 +561,7 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
try:
appointment_id = response.json()['appointment_confirmation_no']
print(f"\n{TextColors.SUCCESS}[+]{TextColors.ENDC} SUCCESS: '{centre['name']}, {centre['address']}' centre successfully booked for {self.appointment_date} for selected beneficiaries")
print(f"\nAppointment Confirmation Number: {appointment_id}")
print(f"\n{TextColors.BLACKONGREY}Appointment Confirmation Number: {appointment_id}{TextColors.ENDC}")
appointment_booked_flag = True
break
except Exception as e:
Expand All @@ -513,6 +570,6 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
else:
print(f"\n{TextColors.FAIL}FAILED ATTEMPT (message: {e}){TextColors.ENDC} (response: {response.text})")
else:
print(f"{TextColors.FAIL}FAILED: Vaccine shots available are less than the number of beneficiaries selected{TextColors.ENDC}")
print(f"\n{TextColors.FAIL}FAILED: Vaccine shots available in this centre are less than the number of beneficiaries selected{TextColors.ENDC}")

return appointment_booked_flag, appointment_id
Loading

0 comments on commit 590d8aa

Please sign in to comment.