diff --git a/egy.py b/egy.py index 1e3eb76..8e07c84 100644 --- a/egy.py +++ b/egy.py @@ -28,36 +28,37 @@ def get_target_url(): while True: - user_input = input("Enter the target URL (e.g., https://example.com): ") - - if not re.match(r"https?://", user_input): + user_input = input("Enter the target URL (e.g., https://example.com): ").strip() + if not user_input: + print("URL cannot be empty. Please try again.") + continue + + # Ensure the URL starts with http:// or https:// + if not re.match(r"^https?://", user_input): + print("Adding 'http://' to your URL for a proper format.") user_input = "http://" + user_input try: response = requests.get(user_input) - response.raise_for_status() + response.raise_for_status() # Raises an HTTPError for bad responses + print("Successfully connected to the URL.") return user_input except requests.exceptions.MissingSchema: print( - "Invalid URL format. Make sure to include 'http://' or 'https://'." - " Please try again." + "Invalid URL format. Please include 'http://' or 'https://'. Try again." ) except requests.exceptions.HTTPError as e: - if response.status_code == 403: - print( - f"Access to the URL is forbidden (HTTP 403 Forbidden): {user_input}" - ) - else: - print( - f"HTTP error {response.status_code} occurred while connecting to" - f" the URL: {e}" - ) + print( + f"HTTP error {response.status_code} occurred while connecting to the URL: {str(e)}" + ) + except requests.exceptions.ConnectionError: + print( + "Failed to connect to the URL. Please check your connection or the URL and try again." + ) except requests.exceptions.RequestException as e: - print(f"Unable to connect to the URL: {e}") + print(f"An error occurred while connecting to the URL: {str(e)}") except Exception as e: - print(f"An error occurred: {e}") - - init(autoreset=True) + print(f"An unexpected error occurred: {str(e)}") PAYLOADS = [ @@ -2315,28 +2316,34 @@ def inject_payloads_into_params(url): def save_vulnerable_urls(vulnerable_urls): + """Append found vulnerable URLs to a file.""" with open("vulnerable_urls.txt", "a") as file: for url in vulnerable_urls: file.write(url + "\n") def print_colorful(message, color=Fore.GREEN): + """Print messages in color.""" print(color + message + Style.RESET_ALL) def print_warning(message): - print_colorful("\n[Bingo]" + message, Fore.CYAN) + """Print warnings in cyan.""" + print_colorful("\n[Bingo] " + message, Fore.CYAN) def print_error(message): - print_colorful("[Error]" + message, Fore.RED) + """Print errors in red.""" + print_colorful("[Error] " + message, Fore.RED) def print_info(message): - print_colorful("[Info]" + message, Fore.MAGENTA) + """Print information in magenta.""" + print_colorful("[Info] " + message, Fore.MAGENTA) def collect_urls_from_file(file_path): + """Collect URLs from a given file path.""" try: with open(file_path, "r") as file: websites = file.read().splitlines() @@ -2347,6 +2354,7 @@ def collect_urls_from_file(file_path): def get_target_url(): + """Prompt user to enter a URL and parse it to ensure it includes a scheme.""" target_url = input("Enter the target URL to scan for vulnerabilities: ") parsed_url = urlparse(target_url) if not parsed_url.scheme: @@ -2354,143 +2362,79 @@ def get_target_url(): return target_url -def create_session(cookies=None): - session = requests.Session() - session.verify = True - session.headers = { - "User-Agent": random.choice(USER_AGENTS), - } - if cookies: - session.headers["Cookie"] = cookies - return session - - def main(): print_logo() - while True: user_choice = input( - "Choose an option:\n1. Enter the target URL to scan for vulnerabilities\n2." - " Load a list of websites from a txt file\nEnter your choice (1 or 2): " + "Choose an option:\n1. Enter the target URL to scan for vulnerabilities\n2. Load a list of websites from a txt file\nEnter your choice (1 or 2): " ) - if user_choice == "1": target_url = get_target_url() - scan_user_dashboard = input( - "Do you want to scan inside the user dashboard? (yes/no): " - ).lower() - - if scan_user_dashboard not in ["yes", "no"]: - print_error("Invalid input. Please enter 'yes' or 'no'.") - continue - - if scan_user_dashboard == "yes": - request_file = input( - "Please enter the path or name of the request file: " - ) - try: - with open(request_file, "r") as file: - request_content = file.read() - headers, body = request_content.split("\n\n \n", 1) - cookies = headers.split("Cookie: ")[1].strip() - headers = headers.split("\n")[1:] - session = create_session(cookies) - except FileNotFoundError: - print_error("File not found. Please enter a valid file path.") - continue - except Exception as e: - print_error( - f"Error occurred while processing the request file: {e}" - ) - continue - else: - session = create_session() - - try: - print_info("Collecting URLs from the target website...") - urls = collect_urls(target_url) - - print(f"Found {len(urls)} URLs to scan.") - - print_info("Scanning collected URLs for vulnerabilities...") - vulnerable_urls = set() - - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit( - scan_for_vulnerabilities, url, PAYLOADS, vulnerable_urls - ) - for url in urls - ] - for future in tqdm( - concurrent.futures.as_completed(futures), - total=len(futures), - desc="Scanning Website", - unit="URL", - ): - try: - future.result() - except Exception as e: - print_error(f"Error occurred while scanning URL: {e}") - - print_info("Scanning completed!") - - save_vulnerable_urls(vulnerable_urls) - print_info("Vulnerable URLs saved to 'vulnerable_urls.txt'.") - break - - except Exception as e: - print_error(f"Error occurred during the scan process: {e}") + urls = collect_urls(target_url) + print(f"Found {len(urls)} URLs to scan.") + vulnerable_urls = scan_urls(urls) + save_vulnerable_urls(vulnerable_urls) + break elif user_choice == "2": file_path = input( "Enter the path of the txt file containing the list of websites: " ) websites = collect_urls_from_file(file_path) - if not websites: print_error("No websites loaded from the file.") continue + print_info(f"Loaded {len(websites)} websites from the file.") + vulnerable_urls = scan_websites(websites) + save_vulnerable_urls(vulnerable_urls) + break + else: + print_error("Invalid choice. Please choose option 1 or 2.") - try: - print_info(f"Loaded {len(websites)} websites from the file.") - - print_info("Scanning websites from the file...") - vulnerable_urls = set() - - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit(collect_urls, website) for website in websites - ] - for future in tqdm( - concurrent.futures.as_completed(futures), - total=len(futures), - desc="Scanning Websites", - unit="Website", - ): - try: - urls = future.result() - for url in urls: - executor.submit( - scan_for_vulnerabilities, - url, - PAYLOADS, - vulnerable_urls, - ) - except Exception as e: - print_error(f"Error occurred while scanning website: {e}") - - print_info("Scanning completed!") - - save_vulnerable_urls(vulnerable_urls) - print_info("Vulnerable URLs saved to 'vulnerable_urls.txt'.") - break +def scan_urls(urls): + print_info("Scanning collected URLs for vulnerabilities...") + vulnerable_urls = set() + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(scan_for_vulnerabilities, url, PAYLOADS, vulnerable_urls) + for url in urls + ] + for future in tqdm( + concurrent.futures.as_completed(futures), + total=len(futures), + desc="Scanning Website", + unit="URL", + ): + try: + future.result() except Exception as e: - print_error(f"Error occurred during the scan process: {e}") + print_error(f"Error occurred while scanning URL: {e}") + print_info("Scanning completed!") + return vulnerable_urls - else: - print_error("Invalid choice. Please choose option 1 or 2.") + +def scan_websites(websites): + print_info("Scanning websites from the file...") + vulnerable_urls = set() + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(collect_urls, website) for website in websites] + for future in tqdm( + concurrent.futures.as_completed(futures), + total=len(futures), + desc="Scanning Websites", + unit="Website", + ): + try: + urls = future.result() + for url in urls: + executor.submit( + scan_for_vulnerabilities, url, PAYLOADS, vulnerable_urls + ) + except Exception as e: + print_error(f"Error occurred while scanning website: {e}") + print_info("Scanning completed!") + return vulnerable_urls if __name__ == "__main__": diff --git a/egyscan.py b/egyscan.py index 1e3eb76..8e07c84 100644 --- a/egyscan.py +++ b/egyscan.py @@ -28,36 +28,37 @@ def get_target_url(): while True: - user_input = input("Enter the target URL (e.g., https://example.com): ") - - if not re.match(r"https?://", user_input): + user_input = input("Enter the target URL (e.g., https://example.com): ").strip() + if not user_input: + print("URL cannot be empty. Please try again.") + continue + + # Ensure the URL starts with http:// or https:// + if not re.match(r"^https?://", user_input): + print("Adding 'http://' to your URL for a proper format.") user_input = "http://" + user_input try: response = requests.get(user_input) - response.raise_for_status() + response.raise_for_status() # Raises an HTTPError for bad responses + print("Successfully connected to the URL.") return user_input except requests.exceptions.MissingSchema: print( - "Invalid URL format. Make sure to include 'http://' or 'https://'." - " Please try again." + "Invalid URL format. Please include 'http://' or 'https://'. Try again." ) except requests.exceptions.HTTPError as e: - if response.status_code == 403: - print( - f"Access to the URL is forbidden (HTTP 403 Forbidden): {user_input}" - ) - else: - print( - f"HTTP error {response.status_code} occurred while connecting to" - f" the URL: {e}" - ) + print( + f"HTTP error {response.status_code} occurred while connecting to the URL: {str(e)}" + ) + except requests.exceptions.ConnectionError: + print( + "Failed to connect to the URL. Please check your connection or the URL and try again." + ) except requests.exceptions.RequestException as e: - print(f"Unable to connect to the URL: {e}") + print(f"An error occurred while connecting to the URL: {str(e)}") except Exception as e: - print(f"An error occurred: {e}") - - init(autoreset=True) + print(f"An unexpected error occurred: {str(e)}") PAYLOADS = [ @@ -2315,28 +2316,34 @@ def inject_payloads_into_params(url): def save_vulnerable_urls(vulnerable_urls): + """Append found vulnerable URLs to a file.""" with open("vulnerable_urls.txt", "a") as file: for url in vulnerable_urls: file.write(url + "\n") def print_colorful(message, color=Fore.GREEN): + """Print messages in color.""" print(color + message + Style.RESET_ALL) def print_warning(message): - print_colorful("\n[Bingo]" + message, Fore.CYAN) + """Print warnings in cyan.""" + print_colorful("\n[Bingo] " + message, Fore.CYAN) def print_error(message): - print_colorful("[Error]" + message, Fore.RED) + """Print errors in red.""" + print_colorful("[Error] " + message, Fore.RED) def print_info(message): - print_colorful("[Info]" + message, Fore.MAGENTA) + """Print information in magenta.""" + print_colorful("[Info] " + message, Fore.MAGENTA) def collect_urls_from_file(file_path): + """Collect URLs from a given file path.""" try: with open(file_path, "r") as file: websites = file.read().splitlines() @@ -2347,6 +2354,7 @@ def collect_urls_from_file(file_path): def get_target_url(): + """Prompt user to enter a URL and parse it to ensure it includes a scheme.""" target_url = input("Enter the target URL to scan for vulnerabilities: ") parsed_url = urlparse(target_url) if not parsed_url.scheme: @@ -2354,143 +2362,79 @@ def get_target_url(): return target_url -def create_session(cookies=None): - session = requests.Session() - session.verify = True - session.headers = { - "User-Agent": random.choice(USER_AGENTS), - } - if cookies: - session.headers["Cookie"] = cookies - return session - - def main(): print_logo() - while True: user_choice = input( - "Choose an option:\n1. Enter the target URL to scan for vulnerabilities\n2." - " Load a list of websites from a txt file\nEnter your choice (1 or 2): " + "Choose an option:\n1. Enter the target URL to scan for vulnerabilities\n2. Load a list of websites from a txt file\nEnter your choice (1 or 2): " ) - if user_choice == "1": target_url = get_target_url() - scan_user_dashboard = input( - "Do you want to scan inside the user dashboard? (yes/no): " - ).lower() - - if scan_user_dashboard not in ["yes", "no"]: - print_error("Invalid input. Please enter 'yes' or 'no'.") - continue - - if scan_user_dashboard == "yes": - request_file = input( - "Please enter the path or name of the request file: " - ) - try: - with open(request_file, "r") as file: - request_content = file.read() - headers, body = request_content.split("\n\n \n", 1) - cookies = headers.split("Cookie: ")[1].strip() - headers = headers.split("\n")[1:] - session = create_session(cookies) - except FileNotFoundError: - print_error("File not found. Please enter a valid file path.") - continue - except Exception as e: - print_error( - f"Error occurred while processing the request file: {e}" - ) - continue - else: - session = create_session() - - try: - print_info("Collecting URLs from the target website...") - urls = collect_urls(target_url) - - print(f"Found {len(urls)} URLs to scan.") - - print_info("Scanning collected URLs for vulnerabilities...") - vulnerable_urls = set() - - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit( - scan_for_vulnerabilities, url, PAYLOADS, vulnerable_urls - ) - for url in urls - ] - for future in tqdm( - concurrent.futures.as_completed(futures), - total=len(futures), - desc="Scanning Website", - unit="URL", - ): - try: - future.result() - except Exception as e: - print_error(f"Error occurred while scanning URL: {e}") - - print_info("Scanning completed!") - - save_vulnerable_urls(vulnerable_urls) - print_info("Vulnerable URLs saved to 'vulnerable_urls.txt'.") - break - - except Exception as e: - print_error(f"Error occurred during the scan process: {e}") + urls = collect_urls(target_url) + print(f"Found {len(urls)} URLs to scan.") + vulnerable_urls = scan_urls(urls) + save_vulnerable_urls(vulnerable_urls) + break elif user_choice == "2": file_path = input( "Enter the path of the txt file containing the list of websites: " ) websites = collect_urls_from_file(file_path) - if not websites: print_error("No websites loaded from the file.") continue + print_info(f"Loaded {len(websites)} websites from the file.") + vulnerable_urls = scan_websites(websites) + save_vulnerable_urls(vulnerable_urls) + break + else: + print_error("Invalid choice. Please choose option 1 or 2.") - try: - print_info(f"Loaded {len(websites)} websites from the file.") - - print_info("Scanning websites from the file...") - vulnerable_urls = set() - - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit(collect_urls, website) for website in websites - ] - for future in tqdm( - concurrent.futures.as_completed(futures), - total=len(futures), - desc="Scanning Websites", - unit="Website", - ): - try: - urls = future.result() - for url in urls: - executor.submit( - scan_for_vulnerabilities, - url, - PAYLOADS, - vulnerable_urls, - ) - except Exception as e: - print_error(f"Error occurred while scanning website: {e}") - - print_info("Scanning completed!") - - save_vulnerable_urls(vulnerable_urls) - print_info("Vulnerable URLs saved to 'vulnerable_urls.txt'.") - break +def scan_urls(urls): + print_info("Scanning collected URLs for vulnerabilities...") + vulnerable_urls = set() + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(scan_for_vulnerabilities, url, PAYLOADS, vulnerable_urls) + for url in urls + ] + for future in tqdm( + concurrent.futures.as_completed(futures), + total=len(futures), + desc="Scanning Website", + unit="URL", + ): + try: + future.result() except Exception as e: - print_error(f"Error occurred during the scan process: {e}") + print_error(f"Error occurred while scanning URL: {e}") + print_info("Scanning completed!") + return vulnerable_urls - else: - print_error("Invalid choice. Please choose option 1 or 2.") + +def scan_websites(websites): + print_info("Scanning websites from the file...") + vulnerable_urls = set() + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(collect_urls, website) for website in websites] + for future in tqdm( + concurrent.futures.as_completed(futures), + total=len(futures), + desc="Scanning Websites", + unit="Website", + ): + try: + urls = future.result() + for url in urls: + executor.submit( + scan_for_vulnerabilities, url, PAYLOADS, vulnerable_urls + ) + except Exception as e: + print_error(f"Error occurred while scanning website: {e}") + print_info("Scanning completed!") + return vulnerable_urls if __name__ == "__main__":