From 590d8aa3fa8519a89688d04bd27e46166738abf0 Mon Sep 17 00:00:00 2001
From: DivAgicha
Date: Mon, 17 May 2021 23:42:52 +0530
Subject: [PATCH] fixes: 1. added 'change search criteria' functionality 2.
modified user data display view (now appears in grid format) 3. added
exception handling in most use cases 4. handled unexpected script termination
upon having found no available centres (sessions) in first attempt 5. many
other small fixes as well
---
CovidVaccineChecker/__init__.py | 143 +++++++++++++++++++---------
schedule_vaccination_appointment.py | 73 ++++++++++----
2 files changed, 156 insertions(+), 60 deletions(-)
diff --git a/CovidVaccineChecker/__init__.py b/CovidVaccineChecker/__init__.py
index ff88905..3a1e68d 100644
--- a/CovidVaccineChecker/__init__.py
+++ b/CovidVaccineChecker/__init__.py
@@ -57,53 +57,84 @@ def __init__(self, mobile):
@staticmethod
- def display_table(dict_list):
+ def display_table(dict_list=None, user_config_data=None):
"""
This function
1. Takes a list of dictionary
2. Add an Index column, and
3. Displays the data in tabular format
"""
+ if user_config_data is not None:
+ # print(tabulate.tabulate(user_config_data, headers=['Variable', 'Value'], tablefmt="grid"))
+ print(tabulate.tabulate(user_config_data, tablefmt="grid"))
+ return
+
header = ["idx"] + list(dict_list[0].keys())
rows = [[idx + 1] + list(x.values()) for idx, x in enumerate(dict_list)]
print(tabulate.tabulate(rows, header, tablefmt="grid"))
- @staticmethod
- def displayConfigFileData(user_config_file):
+ def displayConfigFileData(self, user_config_file):
with open(user_config_file, 'r') as json_file:
user_data = json.load(json_file)
- display_keys = ['mobile', 'state_id', 'district_id', 'pincode_preferences', 'search_criteria',
- 'institution_preferences', 'slot_preference', 'appointment_date']
- for key in user_data.keys():
- if key in display_keys:
- if key in ['state_id', 'district_id']:
- print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}"
- f"\t\t\t{TextColors.UNDERLINE}({user_data['state_name'] if key == 'state_id' else user_data['district_name']}){TextColors.ENDC}")
- elif key == 'search_criteria':
- print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}\t\t\t{TextColors.UNDERLINE}(1: by Pincode, 2: by District){TextColors.ENDC}")
- else:
- print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}")
+ # display_keys = ['mobile', 'state_id', 'district_id', 'pincode_preferences', 'search_criteria',
+ # 'institution_preferences', 'slot_preference', 'appointment_date']
+ # for key in user_data.keys():
+ # if key in display_keys:
+ # if key in ['state_id', 'district_id']:
+ # print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}"
+ # f"\t\t\t{TextColors.UNDERLINE}({user_data['state_name'] if key == 'state_id' else user_data['district_name']}){TextColors.ENDC}")
+ # elif key == 'search_criteria':
+ # print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}\t\t\t{TextColors.UNDERLINE}(1: by Pincode, 2: by District){TextColors.ENDC}")
+ # else:
+ # print(f"{TextColors.SUCCESS}[+]{TextColors.ENDC} {TextColors.BOLD}{key.replace('_', ' ').title()}:{TextColors.ENDC} {user_data[key]}")
+
+ # display_keys = ['mobile', 'state_id', 'state_name', 'district_id', 'district_name', 'pincode_preferences', 'search_criteria',
+ # 'institution_preferences', 'slot_preference', 'appointment_date']
+ # self.display_table(user_config_data=[[f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", value]
+ # for key, value in user_data.items() if key in display_keys])
+
+ display_keys = ['mobile', 'state_id', 'district_id', 'pincode_preferences', 'search_criteria',
+ 'institution_preferences', 'slot_preference', 'appointment_date']
+ key_value_list = list()
+ for key, value in user_data.items():
+ if key in display_keys:
+ if key == 'state_id':
+ key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", f"{value}\t({user_data['state_name']})"])
+ elif key == 'district_id':
+ key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", f"{value}\t({user_data['district_name']})"])
+ elif key == 'search_criteria':
+ key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", f"{value}\t(1: by Pincode, 2: by District)"])
+ else:
+ key_value_list.append([f"{TextColors.WARNING}{key.replace('_', ' ').title()}{TextColors.ENDC}", value])
+ self.display_table(user_config_data=key_value_list)
- def save_user_config(self, user_config_file):
- self.user_data = {
- "mobile": self.mobile,
- "token": self.token,
- "state_id": self.state_id,
- "state_name": self.state_name,
- "district_id": self.district_id,
- "district_name": self.district_name,
- "pincode_preferences": self.pincode_preferences,
- "search_criteria": self.search_criteria,
- "institution_preferences": self.institution_preferences,
- "slot_preference": self.slot_preference,
- "appointment_date": self.appointment_date
- }
- with open(user_config_file, 'x') as json_file:
- json_file.write(json.dumps(self.user_data, indent=4))
+ def save_user_config(self, user_config_file):
+ try:
+ self.user_data = {
+ "mobile": self.mobile,
+ "token": self.token,
+ "state_id": self.state_id,
+ "state_name": self.state_name,
+ "district_id": self.district_id,
+ "district_name": self.district_name,
+ "pincode_preferences": self.pincode_preferences,
+ "search_criteria": self.search_criteria,
+ "institution_preferences": self.institution_preferences,
+ "slot_preference": self.slot_preference,
+ "appointment_date": self.appointment_date
+ }
+
+ with open(user_config_file, 'x') as json_file:
+ json_file.write(json.dumps(self.user_data, indent=4))
+ except Exception as e:
+ if os.path.exists(user_config_file):
+ os.remove(user_config_file)
+ print(f"\n{TextColors.FAIL}File could not be saved (message: {e}). Please try again...{TextColors.ENDC}")
+ exit(1)
def update_user_config(self, list_of_keys_to_update, value_list, user_config_file):
@@ -174,7 +205,7 @@ def create_new_user_config(self, user_config_file):
self.refreshToken(user_config_file, save_token_in_file=False)
while True:
- self.search_criteria = input("\nEnter search criteria ('1' for search by pincode, '2' for search by district): ")
+ self.search_criteria = input("\nEnter search criteria ('1' to search by pincode, '2' to search by district): ")
if self.search_criteria is not None or self.search_criteria.strip() != "":
self.search_criteria = int(self.search_criteria)
@@ -224,11 +255,10 @@ def create_new_user_config(self, user_config_file):
self.save_user_config(user_config_file)
- print("DONE")
-
- def changeAppointmentDate(self, user_config_file):
- self.use_existing_user_config(user_config_file) # to initialise all other variables too, before calling update_user_config()
+ def changeAppointmentDate(self, user_config_file, load_values_from_existing_config_first = True):
+ if load_values_from_existing_config_first:
+ self.use_existing_user_config(user_config_file) # to initialise all other variables too, before calling update_user_config()
date = input("\nEnter appointment date to check available slots for that date "
"(Format: dd-mm-yyyy, defaults to today if nothing is entered): ")
@@ -246,6 +276,25 @@ def changeAppointmentDate(self, user_config_file):
self.update_user_config(['appointment_date'], [self.appointment_date], user_config_file)
+ def changeSearchCriteria(self, user_config_file, load_values_from_existing_config_first = True):
+ if load_values_from_existing_config_first:
+ self.use_existing_user_config(user_config_file) # to initialise all other variables too, before calling update_user_config()
+
+ while True:
+ self.search_criteria = input("\nEnter search criteria ('1' to search by pincode, '2' to search by district): ")
+
+ if self.search_criteria is not None or self.search_criteria.strip() != "":
+ self.search_criteria = int(self.search_criteria)
+ if self.search_criteria in [1, 2]:
+ break
+ else:
+ print(f"\n{TextColors.FAIL}Invalid input! Please enter one of the above two choices{TextColors.ENDC}")
+ else:
+ print(f"\n{TextColors.FAIL}Invalid input! Please enter one of the above two choices{TextColors.ENDC}")
+
+ self.update_user_config(['search_criteria'], [self.search_criteria], user_config_file)
+
+
def refreshToken(self, user_config_file, save_token_in_file=True):
print(f"\n{TextColors.FAIL}Previous TOKEN Expired!!!{TextColors.ENDC}")
@@ -361,6 +410,7 @@ def getStateDistrictPincodePreferences(self):
def findCentresByPin(self):
+ # Returns empty list as response.json()['sessions']=[] if there are institutes present but are all booked or none of them have opened booking slots yet
params = {
"pincode": self.pincode_preferences[0],
"date": self.appointment_date,
@@ -378,6 +428,7 @@ def findCentresByPin(self):
def findCentresByDistrict(self):
+ # Returns empty list as response.json()['sessions']=[] if there are institutes present but are all booked or none of them have opened booking slots yet
params = {
"district_id": self.district_id,
"date": self.appointment_date,
@@ -397,12 +448,13 @@ def findCentresByDistrict(self):
def findCentresBySearchCriteria(self):
+ # Returns empty list as response.json()['sessions']=[] if there are institutes present but are all booked or none of them have opened booking slots yet
if self.search_criteria == 1:
- print(f"\n-->\tGetting list of centres for pincode '{self.pincode_preferences[0]}' (first pincode in your preferences), using date '{self.appointment_date}'")
+ print(f"\n-->\tGetting list of centres for pincode '{self.pincode_preferences[0]}' (first pincode in your preferences), using date '{self.appointment_date}'\n")
all_centres = self.findCentresByPin()
else:
- print(f"\n-->\tGetting list of centres for district '{self.district_name}', using date '{self.appointment_date}'")
+ print(f"\n-->\tGetting list of centres for district '{self.district_name}', using date '{self.appointment_date}'\n")
all_centres = self.findCentresByDistrict()
@@ -415,7 +467,8 @@ def get_beneficiaries(self):
try:
beneficiaries = response.json()['beneficiaries']
except Exception as e:
- print(f"{TextColors.FAIL}{TextColors.FAIL}FAILED ATTEMPT (message: {e}){TextColors.ENDC}{TextColors.ENDC} (response: {response.text})")
+ if "unauthenticated access" not in response.text.lower():
+ print(f"{TextColors.FAIL}{TextColors.FAIL}FAILED ATTEMPT (message: {e}){TextColors.ENDC}{TextColors.ENDC} (response: {response.text})")
beneficiaries = None
return beneficiaries, response.status_code
@@ -432,7 +485,7 @@ def get_appointment_details(appointment_dict):
def generate_captcha(self, user_config_file):
- print(f"\n{TextColors.HEADER}========================================= GETTING CAPTCHA ========================================={TextColors.ENDC}")
+ print(f"\n{TextColors.HEADER}===================================== GENERATING CAPTCHA ====================================={TextColors.ENDC}")
while True:
response = requests.request("POST", self.captcha_url, headers=self.auth_headers)
@@ -475,7 +528,11 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
appointment_booked_flag = False
appointment_id = None
- print(f"Ref. IDs to schedule booking for: {ref_ids}")
+ if len(all_centres) == 0:
+ print(f"\n{TextColors.FAIL}No vaccination centre found!{TextColors.ENDC}", end="")
+ return appointment_booked_flag, appointment_id
+
+ print(f"\nRef. IDs to schedule booking for: {ref_ids}")
for centre in all_centres:
print(f"\ntrying centre '{centre['name']}'\t{TextColors.BOLD}{TextColors.WARNING}(Min Age Limit: {centre['min_age_limit']}){TextColors.ENDC}...", end=" ")
@@ -488,7 +545,7 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
if centre['available_capacity'] >= len(ref_ids):
captcha = self.generate_captcha(user_config_file)
- print(f"{TextColors.BLACKONGREY}Entered Value: {captcha}{TextColors.ENDC}")
+ print(f"\n{TextColors.BLACKONGREY}Entered Captcha Value: {captcha}{TextColors.ENDC}")
payload = json.dumps({
"dose": dose_number,
@@ -504,7 +561,7 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
try:
appointment_id = response.json()['appointment_confirmation_no']
print(f"\n{TextColors.SUCCESS}[+]{TextColors.ENDC} SUCCESS: '{centre['name']}, {centre['address']}' centre successfully booked for {self.appointment_date} for selected beneficiaries")
- print(f"\nAppointment Confirmation Number: {appointment_id}")
+ print(f"\n{TextColors.BLACKONGREY}Appointment Confirmation Number: {appointment_id}{TextColors.ENDC}")
appointment_booked_flag = True
break
except Exception as e:
@@ -513,6 +570,6 @@ def schedule_appointment(self, all_centres, ref_ids, dose_number, min_age_limit,
else:
print(f"\n{TextColors.FAIL}FAILED ATTEMPT (message: {e}){TextColors.ENDC} (response: {response.text})")
else:
- print(f"{TextColors.FAIL}FAILED: Vaccine shots available are less than the number of beneficiaries selected{TextColors.ENDC}")
+ print(f"\n{TextColors.FAIL}FAILED: Vaccine shots available in this centre are less than the number of beneficiaries selected{TextColors.ENDC}")
return appointment_booked_flag, appointment_id
diff --git a/schedule_vaccination_appointment.py b/schedule_vaccination_appointment.py
index 34442bf..c281f03 100644
--- a/schedule_vaccination_appointment.py
+++ b/schedule_vaccination_appointment.py
@@ -1,4 +1,5 @@
import os
+import re
import sys
# import json
import time
@@ -17,7 +18,16 @@
`Y8bood8P' `Y8bod8P' `8' `8' o888o o8o `8 o88o o8888o o888o o888o
""")
-mobile = input("\n-->\tEnter mobile: ")
+mobile_number_pattern = re.compile("[7-9][0-9]{9}")
+
+while True:
+ mobile = input("\n-->\tEnter mobile: ")
+
+ if mobile is not None and mobile.strip() != "" and mobile_number_pattern.match(mobile.strip()):
+ mobile = mobile.strip()
+ break
+ else:
+ print(f"\n{TextColors.FAIL}Invalid input! Please enter correct mobile number (format: 10-digit number starting with either 7,8 or 9){TextColors.ENDC}")
cowinAPI = CoWINAPI(mobile)
@@ -30,25 +40,28 @@
user_config_file = os.path.join(cowinAPI.BASE_PROJECT_DIR, "user_data/user_config_" + mobile + ".json")
if os.path.exists(user_config_file):
- print(f"\nUser configuration file found for '{mobile}'! Listing details...")
+ print(f"\nUser configuration file found for '{mobile}'! Listing details...\n")
cowinAPI.displayConfigFileData(user_config_file)
while True:
answer = input(f"\n-->\tReady to go? {TextColors.WARNING}(Continue with existing configuration (y) / "
- f"Create new configuration (n) / Change only appointment date (c) / Quit (q)){TextColors.ENDC}: ")
+ f"Create new configuration (n) / Change appointment date (c) / Change search criteria (s) / Quit (q)){TextColors.ENDC}: ")
if answer.lower().strip() == 'y':
cowinAPI.use_existing_user_config(user_config_file)
break
+ elif answer.lower().strip() == 'n':
+ cowinAPI.create_new_user_config(user_config_file)
+ break
elif answer.lower().strip() == 'c':
cowinAPI.changeAppointmentDate(user_config_file)
break
- elif answer.lower().strip() == 'n':
- cowinAPI.create_new_user_config(user_config_file)
+ elif answer.lower().strip() == 's':
+ cowinAPI.changeSearchCriteria(user_config_file)
break
elif answer.lower().strip() == 'q':
- print(f"\n{TextColors.WARNING}Exiting program...{TextColors.ENDC}")
+ print(f"\nExiting program...")
exit(0)
else:
print(f"\n{TextColors.FAIL}Invalid input!{TextColors.ENDC}")
@@ -60,17 +73,34 @@
all_centres = cowinAPI.findCentresBySearchCriteria()
if len(all_centres) == 0:
- print(f"\n{TextColors.FAIL}FAILED: All centres are fully booked for the selected appointment date. "
- f"Try running the script again after changing date, district or pincode.{TextColors.ENDC}")
- exit(1)
+ print(f"{TextColors.FAIL}No Centre Found{TextColors.ENDC} (Either all centres are fully booked for the selected appointment date or slots aren't opened yet. "
+ f"You can also try running the script again after changing date, search criteria, district or pincode)")
-centres_list = [{"Name": centre['name'], "District": centre['district_name'], "Pincode": centre['pincode'], "Vaccine Name": centre['vaccine'],
- "Fee Type": centre['fee_type'], "Min Age": centre['min_age_limit'], "Available Capacity": centre['available_capacity'],
- "Slots": "\n".join(centre['slots'])} for centre in all_centres]
+ while True:
+ print(f"\n-->\t{TextColors.UNDERLINE}{TextColors.BOLD}Note{TextColors.ENDC}: {TextColors.WARNING}Continue with existing configuration "
+ f"only if you are sure that slots are gonna open in few minutes!{TextColors.ENDC}")
+ answer = input(f"\nEnter choice {TextColors.WARNING}(Continue with existing configuration (y) / "
+ f"Change appointment date (c) / Change search criteria (s)){TextColors.ENDC}: ")
-cowinAPI.display_table(centres_list)
+ if answer.lower().strip() == 'y':
+ cowinAPI.use_existing_user_config(user_config_file)
+ break
+ elif answer.lower().strip() == 'c':
+ cowinAPI.changeAppointmentDate(user_config_file, load_values_from_existing_config_first=False)
+ break
+ elif answer.lower().strip() == 's':
+ cowinAPI.changeSearchCriteria(user_config_file, load_values_from_existing_config_first=False)
+ break
+ else:
+ print(f"\n{TextColors.FAIL}Invalid input!{TextColors.ENDC}")
+else:
+ centres_list = [{"Name": centre['name'], "District": centre['district_name'], "Pincode": centre['pincode'], "Vaccine Name": centre['vaccine'],
+ "Fee Type": centre['fee_type'], "Min Age": centre['min_age_limit'], "Available Capacity": centre['available_capacity'],
+ "Slots": "\n".join(centre['slots'])} for centre in all_centres]
+
+ cowinAPI.display_table(centres_list)
-print(f"\n{TextColors.BLACKONGREY}Total Centres Found: {len(all_centres)}{TextColors.ENDC}")
+ print(f"\n{TextColors.BLACKONGREY}Total Centres Found: {len(all_centres)}{TextColors.ENDC}")
while True:
print(f"\n-->\tGetting beneficiaries registered with mobile number '{mobile}'\n")
@@ -93,10 +123,15 @@
print(f"\n{TextColors.BLACKONGREY}Total Beneficiaries Found: {len(beneficiaries)}{TextColors.ENDC}")
-ids_input = input(f"\nEnter comma-separated index of beneficiaries to schedule appointment for {TextColors.WARNING}(Enter '0' to select all){TextColors.ENDC}: ")
+ids_input = input(f"\nEnter comma-separated index of beneficiaries to schedule appointment for {TextColors.WARNING}(Enter '0' to select all or 'q' to quit and try after sometime){TextColors.ENDC}: ")
while True:
assert ids_input is not None and ids_input != ""
+
+ if ids_input.strip().lower() == 'q':
+ print("\nExiting program...")
+ exit(0)
+
reference_ids = ids_input.replace(" ", "").split(",")
if not isinstance(reference_ids[0], int):
@@ -111,7 +146,7 @@
if len(reference_ids) == 0:
print(f"\n{TextColors.FAIL}Please enter correct indexes to proceed to booking{TextColors.ENDC}")
- ids_input = input(f"\nEnter comma-separated index of beneficiaries to schedule appointment for {TextColors.WARNING}(Enter '0' to select all){TextColors.ENDC}: ")
+ ids_input = input(f"\nEnter comma-separated index of beneficiaries to schedule appointment for {TextColors.WARNING}(Enter '0' to select all or 'q' to quit and try after sometime){TextColors.ENDC}: ")
else:
break
@@ -161,7 +196,11 @@
sys.stdout.flush()
time.sleep(1)
all_centres = cowinAPI.findCentresBySearchCriteria()
- print(f"\n{TextColors.BLACKONGREY}Total Centres Found: {len(all_centres)}{TextColors.ENDC}", end="")
+ if len(all_centres) > 0:
+ print(f"{TextColors.BLACKONGREY}Total Centres Found: {len(all_centres)}{TextColors.ENDC}", end="")
+ else:
+ print(f"{TextColors.FAIL}No Centre Found{TextColors.ENDC} (Either all centres are fully booked for the selected appointment date or slots aren't opened yet. "
+ f"You can also try running the script again after changing date, search criteria, district or pincode)", end="")
time.sleep(1)
else:
break