Commit

Merge pull request #4 from soxoj/main

merge upstream

overcuriousity authored Dec 17, 2024
2 parents 0685396 + 97e5f60 commit 7d2c265
Showing 8 changed files with 212 additions and 102 deletions.
37 changes: 16 additions & 21 deletions maigret/checking.py
@@ -26,11 +26,7 @@
 from . import errors
 from .activation import ParsingActivator, import_aiohttp_cookies
 from .errors import CheckError
-from .executors import (
-    AsyncExecutor,
-    AsyncioSimpleExecutor,
-    AsyncioProgressbarQueueExecutor,
-)
+from .executors import AsyncioQueueGeneratorExecutor
 from .result import MaigretCheckResult, MaigretCheckStatus
 from .sites import MaigretDatabase, MaigretSite
 from .types import QueryOptions, QueryResultWrapper
@@ -670,18 +666,13 @@ async def maigret(
     await debug_ip_request(clearweb_checker, logger)

     # setup parallel executor
-    executor: Optional[AsyncExecutor] = None
-    if no_progressbar:
-        # TODO: switch to AsyncioProgressbarQueueExecutor with progress object mock
-        executor = AsyncioSimpleExecutor(logger=logger)
-    else:
-        executor = AsyncioProgressbarQueueExecutor(
-            logger=logger,
-            in_parallel=max_connections,
-            timeout=timeout + 0.5,
-            *args,
-            **kwargs,
-        )
+    executor = AsyncioQueueGeneratorExecutor(
+        logger=logger,
+        in_parallel=max_connections,
+        timeout=timeout + 0.5,
+        *args,
+        **kwargs,
+    )

     # make options objects for all the requests
     options: QueryOptions = {}
@@ -728,13 +719,17 @@
         },
     )

-    cur_results = await executor.run(tasks_dict.values())
-
-    # wait for executor timeout errors
-    await asyncio.sleep(1)
+    cur_results = []
+    with alive_bar(
+        len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar
+    ) as progress:
+        async for result in executor.run(tasks_dict.values()):
+            cur_results.append(result)
+            progress()

     all_results.update(cur_results)

     # rerun for failed sites
     sites = get_failed_sites(dict(cur_results))
     attempts -= 1
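Note on the hunk above: the old code awaited executor.run() once and received the whole batch, while the new AsyncioQueueGeneratorExecutor is consumed with async for, so each result arrives as soon as its check finishes and the alive_bar progress bar can tick per site. A minimal self-contained sketch of the same consume-as-completed pattern; fake_check and the site names are illustrative, not Maigret APIs, and asyncio.as_completed stands in for the queue-based executor:

```python
import asyncio
from alive_progress import alive_bar

async def fake_check(site):
    # stand-in for a single Maigret site check
    await asyncio.sleep(0.1)
    return site, "CLAIMED"

async def search(sites):
    results = []
    with alive_bar(len(sites), title="Searching") as progress:
        # consume results incrementally instead of awaiting one big batch,
        # so the bar advances as each coroutine completes
        for coro in asyncio.as_completed([fake_check(s) for s in sites]):
            results.append(await coro)
            progress()
    return results

print(asyncio.run(search(["GitHub", "GitLab", "Reddit"])))
```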
70 changes: 69 additions & 1 deletion maigret/executors.py
@@ -1,7 +1,7 @@
 import asyncio
 import sys
 import time
-from typing import Any, Iterable, List
+from typing import Any, Iterable, List, Callable

 import alive_progress
 from alive_progress import alive_bar
@@ -19,6 +19,7 @@ def create_task_func():


 class AsyncExecutor:
+    # Deprecated: will be removed soon, don't use it
     def __init__(self, *args, **kwargs):
         self.logger = kwargs['logger']

@@ -34,6 +35,7 @@ async def _run(self, tasks: Iterable[QueryDraft]):


 class AsyncioSimpleExecutor(AsyncExecutor):
+    # Deprecated: will be removed soon, don't use it
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 100))
@@ -48,6 +50,7 @@ async def sem_task(f, args, kwargs):


 class AsyncioProgressbarExecutor(AsyncExecutor):
+    # Deprecated: will be removed soon, don't use it
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)

@@ -71,6 +74,7 @@ async def track_task(task):


 class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
+    # Deprecated: will be removed soon, don't use it
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1))
@@ -174,3 +178,67 @@ async def _run(self, queries: Iterable[QueryDraft]):
             w.cancel()

         return self.results
+
+
+class AsyncioQueueGeneratorExecutor:
+    # Deprecated: will be removed soon, don't use it
+    def __init__(self, *args, **kwargs):
+        self.workers_count = kwargs.get('in_parallel', 10)
+        self.queue = asyncio.Queue()
+        self.timeout = kwargs.get('timeout')
+        self.logger = kwargs['logger']
+        self._results = asyncio.Queue()
+        self._stop_signal = object()
+
+    async def worker(self):
+        """Process tasks from the queue and put results into the results queue."""
+        while True:
+            task = await self.queue.get()
+            if task is self._stop_signal:
+                self.queue.task_done()
+                break
+
+            try:
+                f, args, kwargs = task
+                query_future = f(*args, **kwargs)
+                query_task = create_task_func()(query_future)
+
+                try:
+                    result = await asyncio.wait_for(query_task, timeout=self.timeout)
+                except asyncio.TimeoutError:
+                    result = kwargs.get('default')
+                await self._results.put(result)
+            except Exception as e:
+                self.logger.error(f"Error in worker: {e}")
+            finally:
+                self.queue.task_done()
+
+    async def run(self, queries: Iterable[Callable[..., Any]]):
+        """Run workers to process queries in parallel."""
+        start_time = time.time()
+
+        # Add tasks to the queue
+        for t in queries:
+            await self.queue.put(t)
+
+        # Create workers
+        workers = [
+            asyncio.create_task(self.worker()) for _ in range(self.workers_count)
+        ]
+
+        # Add stop signals
+        for _ in range(self.workers_count):
+            await self.queue.put(self._stop_signal)
+
+        try:
+            while any(w.done() is False for w in workers) or not self._results.empty():
+                try:
+                    result = await asyncio.wait_for(self._results.get(), timeout=1)
+                    yield result
+                except asyncio.TimeoutError:
+                    pass
+        finally:
+            # Ensure all workers are awaited
+            await asyncio.gather(*workers)
+            self.execution_time = time.time() - start_time
+            self.logger.debug(f"Spent time: {self.execution_time}")
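Because run() contains a yield, it is an async generator: callers iterate it with async for while the worker pool drains the shared queue, receiving results as they are produced rather than at the end. A rough usage sketch, assuming tasks are (function, args, kwargs) triples exactly as worker() unpacks them; the check coroutine below is hypothetical:

```python
import asyncio
import logging

from maigret.executors import AsyncioQueueGeneratorExecutor

async def check(site, default=None):
    # hypothetical check coroutine; 'default' is what the executor
    # falls back to when the task exceeds its timeout
    await asyncio.sleep(0.2)
    return f"{site}: found"

async def main():
    executor = AsyncioQueueGeneratorExecutor(
        logger=logging.getLogger("demo"), in_parallel=5, timeout=1.0
    )
    # each task mirrors the (f, args, kwargs) unpacking in worker()
    tasks = [(check, (site,), {"default": None}) for site in ("a", "b", "c")]
    async for result in executor.run(tasks):
        print(result)

asyncio.run(main())
```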
4 changes: 3 additions & 1 deletion maigret/maigret.py
@@ -496,7 +496,9 @@ async def main():
     if args.web is not None:
         from maigret.web.app import app

-        port = args.web if args.web else 5000  # args.web is either the specified port or 5000 by default
+        port = (
+            args.web if args.web else 5000
+        )  # args.web is either the specified port or 5000 by default
         app.run(port=port)
         return
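For context on the reformatted line: the surrounding check distinguishes "flag absent" (args.web is None) from "flag present without a port" (a falsy value), which is why the truthiness fallback to 5000 exists. A hedged sketch of how such a flag is typically declared with argparse; Maigret's actual parser definition may differ:

```python
import argparse

# hypothetical reconstruction of a --web flag with an optional port value
parser = argparse.ArgumentParser()
parser.add_argument("--web", nargs="?", type=int, const=0, default=None)

# --web absent     -> args.web is None  -> web UI not started
# --web (no value) -> args.web == 0     -> falsy, so port falls back to 5000
# --web 8080       -> args.web == 8080  -> used as the port
for argv, expected in ([], None), (["--web"], 0), (["--web", "8080"], 8080):
    args = parser.parse_args(argv)
    assert args.web == expected
    if args.web is not None:
        port = args.web if args.web else 5000
        print(f"{argv} -> port {port}")
```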
159 changes: 85 additions & 74 deletions maigret/report.py
@@ -98,118 +98,129 @@ class MaigretGraph:
     def __init__(self, graph):
         self.G = graph

-    def add_node(self, key, value):
+    def add_node(self, key, value, color=None):
         node_name = f'{key}: {value}'

-        params = self.other_params
+        params = dict(self.other_params)
         if key in SUPPORTED_IDS:
-            params = self.username_params
+            params = dict(self.username_params)
         elif value.startswith('http'):
-            params = self.site_params
-
-        self.G.add_node(node_name, title=node_name, **params)
-
-        if value != value.lower():
-            normalized_node_name = self.add_node(key, value.lower())
-            self.link(node_name, normalized_node_name)
+            params = dict(self.site_params)
+
+        params['title'] = node_name
+        if color:
+            params['color'] = color
+
+        self.G.add_node(node_name, **params)
         return node_name

     def link(self, node1_name, node2_name):
         self.G.add_edge(node1_name, node2_name, weight=2)


 def save_graph_report(filename: str, username_results: list, db: MaigretDatabase):
     # moved here to speed up the launch of Maigret
     import networkx as nx

     G = nx.Graph()
     graph = MaigretGraph(G)

-    for username, id_type, results in username_results:
-        username_node_name = graph.add_node(id_type, username)
+    base_site_nodes = {}
+    site_account_nodes = {}
+    processed_values = {}  # Track processed values to avoid duplicates

-        for website_name in results:
-            dictionary = results[website_name]
-            # TODO: fix no site data issue
-            if not dictionary:
-                continue
+    for username, id_type, results in username_results:
+        # Add username node, using normalized version directly if different
+        norm_username = username.lower()
+        username_node_name = graph.add_node(id_type, norm_username)

-            if dictionary.get("is_similar"):
+        for website_name, dictionary in results.items():
+            if not dictionary or dictionary.get("is_similar"):
                 continue

             status = dictionary.get("status")
-            if not status:  # FIXME: currently in case of timeout
+            if not status or status.status != MaigretCheckStatus.CLAIMED:
                 continue

-            if dictionary["status"].status != MaigretCheckStatus.CLAIMED:
-                continue
+            # base site node
+            site_base_url = website_name
+            if site_base_url not in base_site_nodes:
+                base_site_nodes[site_base_url] = graph.add_node('site', site_base_url, color='#28a745')  # Green color

-            site_fallback_name = dictionary.get(
-                'url_user', f'{website_name}: {username.lower()}'
-            )
-            # site_node_name = dictionary.get('url_user', f'{website_name}: {username.lower()}')
-            site_node_name = graph.add_node('site', site_fallback_name)
-            graph.link(username_node_name, site_node_name)
+            site_base_node_name = base_site_nodes[site_base_url]
+
+            # account node
+            account_url = dictionary.get('url_user', f'{site_base_url}/{norm_username}')
+            account_node_id = f"{site_base_url}: {account_url}"
+            if account_node_id not in site_account_nodes:
+                site_account_nodes[account_node_id] = graph.add_node('account', account_url)
+
+            account_node_name = site_account_nodes[account_node_id]
+
+            # link username → account → site
+            graph.link(username_node_name, account_node_name)
+            graph.link(account_node_name, site_base_node_name)

             def process_ids(parent_node, ids):
                 for k, v in ids.items():
-                    if k.endswith('_count') or k.startswith('is_') or k.endswith('_at'):
-                        continue
-                    if k in 'image':
+                    if k.endswith('_count') or k.startswith('is_') or k.endswith('_at') or k in 'image':
                         continue

-                    v_data = v
-                    if v.startswith('['):
-                        try:
-                            v_data = ast.literal_eval(v)
-                        except Exception as e:
-                            logging.error(e)
-
-                    # value is a list
-                    if isinstance(v_data, list):
-                        list_node_name = graph.add_node(k, site_fallback_name)
-                        for vv in v_data:
-                            data_node_name = graph.add_node(vv, site_fallback_name)
-                            graph.link(list_node_name, data_node_name)
-
-                            add_ids = {
-                                a: b for b, a in db.extract_ids_from_url(vv).items()
-                            }
-                            if add_ids:
-                                process_ids(data_node_name, add_ids)
+                    # Normalize value if string
+                    norm_v = v.lower() if isinstance(v, str) else v
+                    value_key = f"{k}:{norm_v}"
+
+                    if value_key in processed_values:
+                        ids_data_name = processed_values[value_key]
                     else:
-                        # value is just a string
-                        # ids_data_name = f'{k}: {v}'
-                        # if ids_data_name == parent_node:
-                        #     continue
-
-                        ids_data_name = graph.add_node(k, v)
-                        # G.add_node(ids_data_name, size=10, title=ids_data_name, group=3)
-                        graph.link(parent_node, ids_data_name)
-
-                        # check for username
-                        if 'username' in k or k in SUPPORTED_IDS:
-                            new_username_node_name = graph.add_node('username', v)
-                            graph.link(ids_data_name, new_username_node_name)
-
-                        add_ids = {k: v for v, k in db.extract_ids_from_url(v).items()}
-                        if add_ids:
-                            process_ids(ids_data_name, add_ids)
+                        v_data = v
+                        if isinstance(v, str) and v.startswith('['):
+                            try:
+                                v_data = ast.literal_eval(v)
+                            except Exception as e:
+                                logging.error(e)
+                                continue
+
+                        if isinstance(v_data, list):
+                            list_node_name = graph.add_node(k, site_base_url)
+                            processed_values[value_key] = list_node_name
+                            for vv in v_data:
+                                data_node_name = graph.add_node(vv, site_base_url)
+                                graph.link(list_node_name, data_node_name)
+
+                                add_ids = {a: b for b, a in db.extract_ids_from_url(vv).items()}
+                                if add_ids:
+                                    process_ids(data_node_name, add_ids)
+                            ids_data_name = list_node_name
+                        else:
+                            ids_data_name = graph.add_node(k, norm_v)
+                            processed_values[value_key] = ids_data_name
+
+                            if 'username' in k or k in SUPPORTED_IDS:
+                                new_username_key = f"username:{norm_v}"
+                                if new_username_key not in processed_values:
+                                    new_username_node_name = graph.add_node('username', norm_v)
+                                    processed_values[new_username_key] = new_username_node_name
+                                    graph.link(ids_data_name, new_username_node_name)
+
+                            add_ids = {k: v for v, k in db.extract_ids_from_url(v).items()}
+                            if add_ids:
+                                process_ids(ids_data_name, add_ids)
+
+                    graph.link(parent_node, ids_data_name)

             if status.ids_data:
-                process_ids(site_node_name, status.ids_data)
+                process_ids(account_node_name, status.ids_data)

-    nodes_to_remove = []
-    for node in G.nodes:
-        if len(str(node)) > 100:
-            nodes_to_remove.append(node)
-
-    [G.remove_node(node) for node in nodes_to_remove]
+    # Remove overly long nodes
+    nodes_to_remove = [node for node in G.nodes if len(str(node)) > 100]
+    G.remove_nodes_from(nodes_to_remove)
+
+    # Remove site nodes with only one connection
+    single_degree_sites = [n for n, deg in G.degree() if n.startswith("site:") and deg <= 1]
+    G.remove_nodes_from(single_degree_sites)

-    # moved here to speed up the launch of Maigret
+    # Generate interactive visualization
     from pyvis.network import Network

     nt = Network(notebook=True, height="750px", width="100%")
     nt.from_nx(G)
     nt.show(filename)
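The reworked report builds a three-tier graph (username -> account -> site) with deduplicated, lowercased nodes, then prunes oversized nodes and single-connection site nodes before rendering. A toy sketch of that structure and the pruning passes, using illustrative node names rather than real Maigret output:

```python
import networkx as nx

G = nx.Graph()

# two usernames resolving to accounts on the same site keep 'site: GitHub'
# above the degree <= 1 pruning threshold applied below
for user in ("alice", "bob"):
    username_node = f"username: {user}"
    account_node = f"account: https://github.com/{user}"
    site_node = "site: GitHub"

    G.add_node(username_node, title=username_node)
    G.add_node(account_node, title=account_node)
    G.add_node(site_node, title=site_node, color="#28a745")  # sites drawn green

    # link username -> account -> site, as save_graph_report now does
    G.add_edge(username_node, account_node, weight=2)
    G.add_edge(account_node, site_node, weight=2)

# pruning passes mirroring the end of save_graph_report()
G.remove_nodes_from([n for n in G.nodes if len(str(n)) > 100])
G.remove_nodes_from([n for n, deg in G.degree() if n.startswith("site:") and deg <= 1])

print(sorted(G.nodes))
```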