diff --git a/Makefile b/Makefile index 534ddf9b..1e0bdf0f 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ LINT_FILES=maigret wizard.py tests test: - coverage run --source=./maigret -m pytest tests + coverage run --source=./maigret,./maigret/web -m pytest tests coverage report -m coverage html diff --git a/maigret/maigret.py b/maigret/maigret.py index 133c23c5..01620eff 100755 --- a/maigret/maigret.py +++ b/maigret/maigret.py @@ -330,11 +330,9 @@ def setup_arguments_parser(settings: Settings): type=int, nargs='?', const=5000, # default if --web is provided without a port - default=None, + default=settings.web_interface_port, help="Launches the web interface on the specified port (default: 5000 if no PORT is provided).", ) - - output_group = parser.add_argument_group( 'Output options', 'Options to change verbosity and view of the console output' ) @@ -494,12 +492,14 @@ async def main(): elif args.verbose: log_level = logging.WARNING logger.setLevel(log_level) - + if args.web is not None: from maigret.web.app import app - port = args.web if args.web else 5000 # args.web is either the specified port or 5000 by const - app.run(port=port) + port = ( + args.web if args.web else 5000 + ) # args.web is either the specified port or 5000 by const + app.run(port=port) # Usernames initial list usernames = { diff --git a/maigret/web/app.py b/maigret/web/app.py index a4c1bb5b..1494e6f1 100644 --- a/maigret/web/app.py +++ b/maigret/web/app.py @@ -1,5 +1,14 @@ # app.py -from flask import Flask, render_template, request, send_file, Response, flash, redirect, url_for +from flask import ( + Flask, + render_template, + request, + send_file, + Response, + flash, + redirect, + url_for, +) import logging import os import asyncio @@ -26,17 +35,19 @@ os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(REPORTS_FOLDER, exist_ok=True) + def setup_logger(log_level, name): logger = logging.getLogger(name) logger.setLevel(log_level) return logger + async def maigret_search(username, options): logger = setup_logger(logging.WARNING, 'maigret') try: db = MaigretDatabase().load_from_path(MAIGRET_DB_FILE) sites = db.ranked_sites_dict(top=int(options.get('top_sites', 500))) - + results = await maigret.search( username=username, site_dict=sites, @@ -50,6 +61,7 @@ async def maigret_search(username, options): logger.error(f"Error during search: {str(e)}") raise + async def search_multiple_usernames(usernames, options): results = [] for username in usernames: @@ -60,109 +72,140 @@ async def search_multiple_usernames(usernames, options): logging.error(f"Error searching username {username}: {str(e)}") return results + def process_search_task(usernames, options, timestamp): try: # Setup event loop for async operations loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + # Run the search - general_results = loop.run_until_complete(search_multiple_usernames(usernames, options)) - + general_results = loop.run_until_complete( + search_multiple_usernames(usernames, options) + ) + # Create session folder session_folder = os.path.join(REPORTS_FOLDER, f"search_{timestamp}") os.makedirs(session_folder, exist_ok=True) - + # Save the combined graph graph_path = os.path.join(session_folder, "combined_graph.html") - maigret.report.save_graph_report(graph_path, general_results, MaigretDatabase().load_from_path(MAIGRET_DB_FILE)) - + maigret.report.save_graph_report( + graph_path, + general_results, + MaigretDatabase().load_from_path(MAIGRET_DB_FILE), + ) + # Save individual reports individual_reports = [] for username, id_type, results in general_results: report_base = os.path.join(session_folder, f"report_{username}") - + csv_path = f"{report_base}.csv" json_path = f"{report_base}.json" pdf_path = f"{report_base}.pdf" html_path = f"{report_base}.html" - + context = generate_report_context(general_results) - + maigret.report.save_csv_report(csv_path, username, results) - maigret.report.save_json_report(json_path, username, results, report_type='ndjson') + maigret.report.save_json_report( + json_path, username, results, report_type='ndjson' + ) maigret.report.save_pdf_report(pdf_path, context) maigret.report.save_html_report(html_path, context) claimed_profiles = [] for site_name, site_data in results.items(): - if (site_data.get('status') and - site_data['status'].status == maigret.result.MaigretCheckStatus.CLAIMED): - claimed_profiles.append({ - 'site_name': site_name, - 'url': site_data.get('url_user', ''), - 'tags': site_data.get('status').tags if site_data.get('status') else [] - }) - - individual_reports.append({ - 'username': username, - 'csv_file': os.path.join(f"search_{timestamp}", f"report_{username}.csv"), - 'json_file': os.path.join(f"search_{timestamp}", f"report_{username}.json"), - 'pdf_file': os.path.join(f"search_{timestamp}", f"report_{username}.pdf"), - 'html_file': os.path.join(f"search_{timestamp}", f"report_{username}.html"), - 'claimed_profiles': claimed_profiles, - }) - + if ( + site_data.get('status') + and site_data['status'].status + == maigret.result.MaigretCheckStatus.CLAIMED + ): + claimed_profiles.append( + { + 'site_name': site_name, + 'url': site_data.get('url_user', ''), + 'tags': ( + site_data.get('status').tags + if site_data.get('status') + else [] + ), + } + ) + + individual_reports.append( + { + 'username': username, + 'csv_file': os.path.join( + f"search_{timestamp}", f"report_{username}.csv" + ), + 'json_file': os.path.join( + f"search_{timestamp}", f"report_{username}.json" + ), + 'pdf_file': os.path.join( + f"search_{timestamp}", f"report_{username}.pdf" + ), + 'html_file': os.path.join( + f"search_{timestamp}", f"report_{username}.html" + ), + 'claimed_profiles': claimed_profiles, + } + ) + # Save results and mark job as complete job_results[timestamp] = { 'status': 'completed', 'session_folder': f"search_{timestamp}", 'graph_file': os.path.join(f"search_{timestamp}", "combined_graph.html"), 'usernames': usernames, - 'individual_reports': individual_reports + 'individual_reports': individual_reports, } except Exception as e: - job_results[timestamp] = { - 'status': 'failed', - 'error': str(e) - } + job_results[timestamp] = {'status': 'failed', 'error': str(e)} finally: background_jobs[timestamp]['completed'] = True + @app.route('/') def index(): return render_template('index.html') + @app.route('/search', methods=['POST']) def search(): usernames_input = request.form.get('usernames', '').strip() if not usernames_input: flash('At least one username is required', 'danger') return redirect(url_for('index')) - - usernames = [u.strip() for u in usernames_input.replace(',', ' ').split() if u.strip()] - + + usernames = [ + u.strip() for u in usernames_input.replace(',', ' ').split() if u.strip() + ] + # Create timestamp for this search session timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - + logging.info(f"Starting search for usernames: {usernames}") - + options = { 'top_sites': request.form.get('top_sites', '500'), 'timeout': request.form.get('timeout', '30'), 'id_type': 'username', # fixed as username 'use_cookies': 'use_cookies' in request.form, } - + # Start background job background_jobs[timestamp] = { 'completed': False, - 'thread': Thread(target=process_search_task, args=(usernames, options, timestamp)) + 'thread': Thread( + target=process_search_task, args=(usernames, options, timestamp) + ), } background_jobs[timestamp]['thread'].start() - + logging.info(f"Search job started with timestamp: {timestamp}") - + # Redirect to status page return redirect(url_for('status', timestamp=timestamp)) @@ -170,19 +213,19 @@ def search(): @app.route('/status/') def status(timestamp): logging.info(f"Status check for timestamp: {timestamp}") - + # Validate timestamp if timestamp not in background_jobs: flash('Invalid search session', 'danger') return redirect(url_for('index')) - + # Check if job is completed if background_jobs[timestamp]['completed']: result = job_results.get(timestamp) if not result: flash('No results found for this search session', 'warning') return redirect(url_for('index')) - + if result['status'] == 'completed': # Redirect to results page once done return redirect(url_for('results', session_id=result['session_folder'])) @@ -190,7 +233,7 @@ def status(timestamp): error_msg = result.get('error', 'Unknown error occurred') flash(f'Search failed: {error_msg}', 'danger') return redirect(url_for('index')) - + # If job is still running, show status page with a simple spinner return render_template('status.html', timestamp=timestamp) @@ -200,23 +243,25 @@ def results(session_id): if not session_id.startswith('search_'): flash('Invalid results session format', 'danger') return redirect(url_for('index')) - + result_data = next( - (r for r in job_results.values() - if r.get('status') == 'completed' and r['session_folder'] == session_id), - None + ( + r + for r in job_results.values() + if r.get('status') == 'completed' and r['session_folder'] == session_id + ), + None, ) - - return render_template( 'results.html', usernames=result_data['usernames'], graph_file=result_data['graph_file'], individual_reports=result_data['individual_reports'], - timestamp=session_id.replace('search_', '') + timestamp=session_id.replace('search_', ''), ) + @app.route('/reports/') def download_report(filename): try: @@ -226,9 +271,10 @@ def download_report(filename): logging.error(f"Error serving file {filename}: {str(e)}") return "File not found", 404 + if __name__ == '__main__': logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', ) app.run(debug=True) diff --git a/tests/test_cli.py b/tests/test_cli.py index 16d78a25..60e0d9eb 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -42,6 +42,7 @@ 'use_disabled_sites': False, 'username': [], 'verbose': False, + 'web': 5000, 'with_domains': False, 'xmind': False, } @@ -55,7 +56,8 @@ def test_args_search_mode(argparser): want_args = dict(DEFAULT_ARGS) want_args.update({'username': ['username']}) - assert args == Namespace(**want_args) + for arg in vars(args): + assert getattr(args, arg) == want_args[arg] def test_args_search_mode_several_usernames(argparser): @@ -66,7 +68,8 @@ def test_args_search_mode_several_usernames(argparser): want_args = dict(DEFAULT_ARGS) want_args.update({'username': ['username1', 'username2']}) - assert args == Namespace(**want_args) + for arg in vars(args): + assert getattr(args, arg) == want_args[arg] def test_args_self_check_mode(argparser): @@ -81,7 +84,8 @@ def test_args_self_check_mode(argparser): } ) - assert args == Namespace(**want_args) + for arg in vars(args): + assert getattr(args, arg) == want_args[arg] def test_args_multiple_sites(argparser): @@ -97,4 +101,5 @@ def test_args_multiple_sites(argparser): } ) - assert args == Namespace(**want_args) + for arg in vars(args): + assert getattr(args, arg) == want_args[arg]