From 5bb45ab125596ce7ed2dd7f7441bfa99a1fbcfdd Mon Sep 17 00:00:00 2001 From: Jan Polonsky Date: Wed, 23 Oct 2024 22:24:21 +0200 Subject: [PATCH] Logging improvement part 2. (Collectors) Fixed: AttributeError: 'WebDriver' object has no attribute 'dispose' Tidy up log types Same logs for each collector (start, finish) moved to one place --- src/collectors/collectors/atom_collector.py | 274 ++- src/collectors/collectors/base_collector.py | 692 ++++--- src/collectors/collectors/email_collector.py | 368 ++-- src/collectors/collectors/rss_collector.py | 426 ++-- .../collectors/scheduled_tasks_collector.py | 127 +- src/collectors/collectors/slack_collector.py | 272 ++- .../collectors/twitter_collector.py | 272 ++- src/collectors/collectors/web_collector.py | 1780 ++++++++--------- src/collectors/managers/collectors_manager.py | 258 +-- 9 files changed, 2216 insertions(+), 2253 deletions(-) diff --git a/src/collectors/collectors/atom_collector.py b/src/collectors/collectors/atom_collector.py index 7925f3a5..06c25b7a 100644 --- a/src/collectors/collectors/atom_collector.py +++ b/src/collectors/collectors/atom_collector.py @@ -1,140 +1,134 @@ -"""Module for Atom collector.""" - -import datetime -import hashlib -import uuid -import traceback -import feedparser -import requests -from bs4 import BeautifulSoup - -from .base_collector import BaseCollector -from managers.log_manager import logger -from shared.schema.news_item import NewsItemData -from shared.schema.parameter import Parameter, ParameterType - - -class AtomCollector(BaseCollector): - """Collector for gathering data from Atom. - - Attributes: - type (str): Type of the collector. - name (str): Name of the collector. - description (str): Description of the collector. - parameters (list): List of parameters required for the collector. - Methods: - collect(source): Collect data from an Atom feed. - - Raises: - Exception: If an error occurs during the collection process. - """ - - type = "ATOM_COLLECTOR" - name = "Atom Collector" - description = "Collector for gathering data from Atom feeds" - - parameters = [ - Parameter(0, "ATOM_FEED_URL", "Atom feed URL", "Full url for Atom feed", ParameterType.STRING), - Parameter(0, "USER_AGENT", "User agent", "Type of user agent", ParameterType.STRING), - Parameter( - 0, - "LINKS_LIMIT", - "Limit for article links", - "OPTIONAL: Maximum number of article links to process. Default: all", - ParameterType.NUMBER, - ), - ] - - parameters.extend(BaseCollector.parameters) - - news_items = [] - - @BaseCollector.ignore_exceptions - def collect(self, source): - """Collect data from Atom feed. - - Parameters: - source -- Source object. 
- """ - self.collector_source = f"{self.name} '{source.name}':" - BaseCollector.update_last_attempt(source) - feed_url = source.parameter_values["ATOM_FEED_URL"] - user_agent = source.parameter_values["USER_AGENT"] - interval = source.parameter_values["REFRESH_INTERVAL"] # noqa: F841 - links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source) - - logger.info(f"{self.collector_source} Starting collector for {feed_url}") - - proxies = {} - if "PROXY_SERVER" in source.parameter_values: - proxy_server = source.parameter_values["PROXY_SERVER"] - if proxy_server.startswith("https://"): - proxies["https"] = proxy_server - elif proxy_server.startswith("http://"): - proxies["http"] = proxy_server - else: - proxies["http"] = "http://" + proxy_server - - try: - if proxies: - atom_xml = requests.get(feed_url, headers={"User-Agent": user_agent}, proxies=proxies) - feed = feedparser.parse(atom_xml.text) - else: - feed = feedparser.parse(feed_url) - - logger.info(f"{self.collector_source} Atom returned feed with {len(feed['entries'])} entries") - - news_items = [] - - count = 0 - for feed_entry in feed["entries"]: - count += 1 - link_for_article = feed_entry["link"] - logger.info(f"{self.collector_source} Visiting article {count}/{len(feed['entries'])}: {link_for_article}") - if proxies: - page = requests.get(link_for_article, headers={"User-Agent": user_agent}, proxies=proxies) - else: - page = requests.get(link_for_article, headers={"User-Agent": user_agent}) - - html_content = page.text - - if html_content: - content = BeautifulSoup(html_content, features="html.parser").text - else: - content = "" - - description = feed_entry["summary"][:500].replace("
<p>
", " ") - - # author can exist/miss in header/entry - author = feed_entry["author"] if "author" in feed_entry else "" - for_hash = author + feed_entry["title"] + feed_entry["link"] - - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - feed_entry["title"], - description, - feed_url, - feed_entry["link"], - feed_entry["updated"], - author, - datetime.datetime.now(), - content, - source.id, - [], - ) - - news_items.append(news_item) - - if count >= links_limit & links_limit > 0: - logger.info(f"{self.collector_source} Limit for article links reached ({links_limit})") - break - - BaseCollector.publish(news_items, source) - - except Exception as error: - logger.info(f"{self.collector_source} Atom collection exceptionally failed") - BaseCollector.print_exception(source, error) - logger.debug(traceback.format_exc()) - - logger.debug(f"{self.type} collection finished.") +"""Module for Atom collector.""" + +import datetime +import hashlib +import uuid +import traceback +import feedparser +import requests +from bs4 import BeautifulSoup + +from .base_collector import BaseCollector +from managers.log_manager import logger +from shared.schema.news_item import NewsItemData +from shared.schema.parameter import Parameter, ParameterType + + +class AtomCollector(BaseCollector): + """Collector for gathering data from Atom. + + Attributes: + type (str): Type of the collector. + name (str): Name of the collector. + description (str): Description of the collector. + parameters (list): List of parameters required for the collector. + Methods: + collect(source): Collect data from an Atom feed. + + Raises: + Exception: If an error occurs during the collection process. + """ + + type = "ATOM_COLLECTOR" + name = "Atom Collector" + description = "Collector for gathering data from Atom feeds" + + parameters = [ + Parameter(0, "ATOM_FEED_URL", "Atom feed URL", "Full url for Atom feed", ParameterType.STRING), + Parameter(0, "USER_AGENT", "User agent", "Type of user agent", ParameterType.STRING), + Parameter( + 0, + "LINKS_LIMIT", + "Limit for article links", + "OPTIONAL: Maximum number of article links to process. Default: all", + ParameterType.NUMBER, + ), + ] + + parameters.extend(BaseCollector.parameters) + + news_items = [] + + @BaseCollector.ignore_exceptions + def collect(self, source): + """Collect data from Atom feed. + + Parameters: + source -- Source object. 
+ """ + feed_url = source.parameter_values["ATOM_FEED_URL"] + user_agent = source.parameter_values["USER_AGENT"] + interval = source.parameter_values["REFRESH_INTERVAL"] # noqa: F841 + links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source) + + logger.info(f"{self.collector_source} Requesting feed URL {feed_url}") + + proxies = {} + if "PROXY_SERVER" in source.parameter_values: + proxy_server = source.parameter_values["PROXY_SERVER"] + if proxy_server.startswith("https://"): + proxies["https"] = proxy_server + elif proxy_server.startswith("http://"): + proxies["http"] = proxy_server + else: + proxies["http"] = "http://" + proxy_server + + try: + if proxies: + atom_xml = requests.get(feed_url, headers={"User-Agent": user_agent}, proxies=proxies) + feed = feedparser.parse(atom_xml.text) + else: + feed = feedparser.parse(feed_url) + + logger.debug(f"{self.collector_source} Atom returned feed with {len(feed['entries'])} entries") + + news_items = [] + + count = 0 + for feed_entry in feed["entries"]: + count += 1 + link_for_article = feed_entry["link"] + logger.info(f"{self.collector_source} Visiting article {count}/{len(feed['entries'])}: {link_for_article}") + if proxies: + page = requests.get(link_for_article, headers={"User-Agent": user_agent}, proxies=proxies) + else: + page = requests.get(link_for_article, headers={"User-Agent": user_agent}) + + html_content = page.text + + if html_content: + content = BeautifulSoup(html_content, features="html.parser").text + else: + content = "" + + description = feed_entry["summary"][:500].replace("
<p>
", " ") + + # author can exist/miss in header/entry + author = feed_entry["author"] if "author" in feed_entry else "" + for_hash = author + feed_entry["title"] + feed_entry["link"] + + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + feed_entry["title"], + description, + feed_url, + feed_entry["link"], + feed_entry["updated"], + author, + datetime.datetime.now(), + content, + source.id, + [], + ) + + news_items.append(news_item) + + if count >= links_limit & links_limit > 0: + logger.debug(f"{self.collector_source} Limit for article links reached ({links_limit})") + break + + BaseCollector.publish(news_items, source, self.collector_source) + + except Exception as error: + logger.exception(f"{self.collector_source} Collection failed: {error}") diff --git a/src/collectors/collectors/base_collector.py b/src/collectors/collectors/base_collector.py index 49d9d273..786d0cfd 100644 --- a/src/collectors/collectors/base_collector.py +++ b/src/collectors/collectors/base_collector.py @@ -1,350 +1,342 @@ -"""Module for Base collector.""" - -import datetime -import hashlib -import uuid -import bleach -import re -import time -from functools import wraps - -from managers import time_manager -from managers.log_manager import logger -from remote.core_api import CoreApi -from shared.schema import collector, osint_source, news_item -from shared.schema.parameter import Parameter, ParameterType - - -class BaseCollector: - """Base abstract type for all collectors. - - Attributes: - type (str): The type of the collector. - name (str): The name of the collector. - description (str): The description of the collector. - parameters (list): A list of parameters for the collector. - Methods: - __init__(): Initializes the BaseCollector object. - get_info(): Get information about the collector. - update_last_attempt(source): Update the last attempt for a collector. - ignore_exceptions(func): Decorator to wrap scheduled action with exception handling. - print_exception(source, error): Print the exception details. - history(interval): Calculate the limit for retrieving historical data based on the given interval. - filter_by_word_list(news_items, source): Filter news items based on word lists defined in the source. - presanitize_html(html): Clean and sanitize the given HTML by removing certain tags and entities. - sanitize_news_items(news_items, source): Sanitize news items by setting default values for any missing attributes. - publish(news_items, source): Publish the collected news items to the CoreApi. - refresh(): Refresh the OSINT sources for the collector. - """ - - type = "BASE_COLLECTOR" - name = "Base Collector" - description = "Base abstract type for all collectors" - - parameters = [ - Parameter( - 0, "PROXY_SERVER", "Proxy server", "Type SOCKS5 proxy server as username:password@ip:port or ip:port", ParameterType.STRING - ), - Parameter( - 0, - "REFRESH_INTERVAL", - "Refresh interval in minutes (0 to disable)", - "How often is this collector queried for new data", - ParameterType.NUMBER, - ), - ] - - def __init__(self): - """Initialize the BaseCollector object.""" - self.osint_sources = [] - - def get_info(self): - """Get information about the collector. - - Returns: - (dict): A dictionary containing information about the collector. - """ - info_schema = collector.CollectorSchema() - return info_schema.dump(self) - - @staticmethod - def update_last_attempt(source): - """Update the last attempt for a collector. 
- - Parameters: - source: The source object representing the collector. - """ - response, status_code = CoreApi.update_collector_last_attepmt(source.id) - if status_code != 200: - logger.critical(f"Update last attempt: HTTP {status_code}, response: {response}") - - @staticmethod - def ignore_exceptions(func): - """Wrap scheduled action with exception handling.""" - - @wraps(func) - def wrapper(self, source): - """Handle exceptions during scheduled collector runs. - - Parameters: - source: The source of the collector. - Raises: - Exception: If an unhandled exception occurs during the collector run. - """ - try: - func(self, source) - except Exception as ex: - logger.exception(f"An unhandled exception occurred during scheduled collector run: {ex}") - - return wrapper - - @staticmethod - def print_exception(source, error): - """Print the exception details. - - Parameters: - source (OSINTSource): The OSINT source. - error (Exception): The exception object. - """ - logger.warning(f"OSINTSource name: {source.name}") - if str(error).startswith("b"): - logger.warning(f"ERROR: {error[2:-1]}") - else: - logger.warning(f"ERROR: {error}") - - @staticmethod - def history(interval): - """Calculate the limit for retrieving historical data based on the given interval. - - Parameters: - interval (str or int): The interval for retrieving historical data. It can be a string representing a time unit - (e.g., '1d' for 1 day, '1w' for 1 week) or an integer representing the number of minutes. - Returns: - limit (datetime.datetime): The limit for retrieving historical data. - """ - if interval[0].isdigit() and ":" in interval: - limit = datetime.datetime.now() - datetime.timedelta(days=1) - elif interval[0].isalpha(): - limit = datetime.datetime.now() - datetime.timedelta(weeks=1) - else: - if int(interval) > 60: - hours = int(interval) // 60 - minutes = int(interval) - hours * 60 - limit = datetime.datetime.now() - datetime.timedelta(days=0, hours=hours, minutes=minutes) - else: - limit = datetime.datetime.now() - datetime.timedelta(days=0, hours=0, minutes=int(interval)) - - return limit - - @staticmethod - def filter_by_word_list(news_items, source): - """Filter the given news_items based on the word lists defined in the source. - - Parameters: - news_items (list): A list of news items to be filtered. - source (object): The source object containing word lists. - Returns: - news_items (list): A filtered list of news items based on the word lists. If no word lists are defined, - the original list is returned. - """ - if source.word_lists: - one_word_list = set() - - for word_list in source.word_lists: - if word_list.use_for_stop_words is False: - for category in word_list.categories: - for entry in category.entries: - one_word_list.add(entry.value.lower()) - - filtered_news_items = [] - if one_word_list: - for item in news_items: - for word in one_word_list: - if word in item.title.lower() or word in item.review.lower() or word in item.content.lower(): - filtered_news_items.append(item) - break - - return filtered_news_items - else: - return news_items - else: - return news_items - - @staticmethod - def presanitize_html(html): - """Clean and sanitize the given HTML by removing certain tags and entities. - - Parameters: - html (str): The HTML string to be sanitized. - Returns: - clean (str): The sanitized HTML string. 
- """ - # these re.sub are not security sensitive ; bleach is supposed to fix the remaining stuff - html = re.sub(r"(?i)( |\xa0)", " ", html, re.DOTALL) - html = re.sub(r"(?i)/]*>.*?/]*>", "", html, re.DOTALL) - html = re.sub(r"(?i)/]*>.*?/]*>", "", html, re.DOTALL) - html = re.sub(r"(?i)/]*>.*?/]*>", "", html, re.DOTALL) - - clean = bleach.clean(html, tags=["p", "b", "i", "b", "u", "pre"], strip=True) - return clean - - @staticmethod - def sanitize_news_items(news_items, source): - """Sanitize the given news_items by setting default values for any missing attributes. - - Parameters: - news_items (list): A list of news items to be sanitized. - source: The source of the news items. - """ - for item in news_items: - if item.id is None: - item.id = uuid.uuid4() - if item.title is None: - item.title = "" - if item.review is None: - item.review = "" - if item.source is None: - item.source = "" - if item.link is None: - item.link = "" - if item.author is None: - item.author = "" - if item.content is None: - item.content = "" - if item.published is None: - item.published = datetime.datetime.now() - if item.collected is None: - item.collected = datetime.datetime.now() - if item.hash is None: - for_hash = item.author + item.title + item.link - item.hash = hashlib.sha256(for_hash.encode()).hexdigest() - if item.osint_source_id is None: - item.osint_source_id = source.id - if item.attributes is None: - item.attributes = [] - item.title = BaseCollector.presanitize_html(item.title) - item.review = BaseCollector.presanitize_html(item.review) - item.content = BaseCollector.presanitize_html(item.content) - item.author = BaseCollector.presanitize_html(item.author) - item.source = BaseCollector.presanitize_html(item.source) # TODO: replace with link sanitizer - item.link = BaseCollector.presanitize_html(item.link) # TODO: replace with link sanitizer - - @staticmethod - def publish(news_items, source): - """Publish the collected news items to the CoreApi. - - Parameters: - news_items (list): A list of news items to be published. - source (object): The source object from which the news items were collected. 
- """ - logger.debug(f"{source.name}: Collected {len(news_items)} news items") - BaseCollector.sanitize_news_items(news_items, source) - filtered_news_items = BaseCollector.filter_by_word_list(news_items, source) - news_items_schema = news_item.NewsItemDataSchema(many=True) - CoreApi.add_news_items(news_items_schema.dump(filtered_news_items)) - - def refresh(self): - """Refresh the OSINT sources for the collector.""" - time.sleep(30) - logger.info(f"Core API requested a refresh of OSINT sources for {self.type}...") - - # cancel all existing jobs - # TODO: cannot cancel jobs that are running and are scheduled for further in time than 60 seconds - # updating of the configuration needs to be done more gracefully - for source in self.osint_sources: - try: - time_manager.cancel_job(source.scheduler_job) - except Exception: - pass - self.osint_sources = [] - - # get new node configuration - response, code = CoreApi.get_osint_sources(self.type) - logger.debug(f"HTTP {code}: Got the following reply: {response}") - - try: - # if configuration was successfully received - if code == 200 and response is not None: - source_schema = osint_source.OSINTSourceSchemaBase(many=True) - self.osint_sources = source_schema.load(response) - - logger.debug(f"{len(self.osint_sources)} sources loaded for {self.type}") - - # start collection - for source in self.osint_sources: - interval = source.parameter_values["REFRESH_INTERVAL"] - # do not schedule if no interval is set - if interval == "" or interval == "0": - logger.debug(f"disabled '{source.name}'") - continue - - self.run_collector(source) - - # run task every day at XY - if interval[0].isdigit() and ":" in interval: - logger.debug(f"scheduling '{source.name}' at: {interval}") - source.scheduler_job = time_manager.schedule_job_every_day(interval, self.run_collector, source) - # run task at a specific day (XY, ZZ:ZZ:ZZ) - elif interval[0].isalpha(): - interval = interval.split(",") - day = interval[0].strip() - at = interval[1].strip() - logger.debug(f"scheduling '{source.name}' at: {day} {at}") - if day == "Monday": - source.scheduler_job = time_manager.schedule_job_on_monday(at, self.run_collector, source) - elif day == "Tuesday": - source.scheduler_job = time_manager.schedule_job_on_tuesday(at, self.run_collector, source) - elif day == "Wednesday": - source.scheduler_job = time_manager.schedule_job_on_wednesday(at, self.run_collector, source) - elif day == "Thursday": - source.scheduler_job = time_manager.schedule_job_on_thursday(at, self.run_collector, source) - elif day == "Friday": - source.scheduler_job = time_manager.schedule_job_on_friday(at, self.run_collector, source) - elif day == "Saturday": - source.scheduler_job = time_manager.schedule_job_on_saturday(at, self.run_collector, source) - elif day == "Sunday": - source.scheduler_job = time_manager.schedule_job_on_sunday(at, self.run_collector, source) - # run task every XY minutes - else: - logger.debug(f"scheduling '{source.name}' for {interval}") - source.scheduler_job = time_manager.schedule_job_minutes(int(interval), self.run_collector, source) - else: - # TODO: send update to core with the error message - logger.warning(f"configuration not received, code: {code}, response: {response}") - pass - except Exception as error: - logger.exception(f"Refreshing of sources failed: {error}") - pass - - def run_collector(self, source): - """Run the collector on the given source. - - Parameters: - source: The source to collect data from. 
- """ - runner = self.__class__() # get right type of collector - runner.collect(source) - - def initialize(self): - """Initialize the collector.""" - self.refresh() - - @staticmethod - def read_int_parameter(name, default_value, source): - """Read an integer parameter from a source dictionary. - - Parameters: - name (str): The name of the parameter to read. - default_value (int): The default value to return if the parameter is not found or is not a valid integer. - source (dict): The dictionary containing the parameter values. - Returns: - val (int): The value of the parameter, or the default value if the parameter is not found or is not a valid integer. - """ - try: - val = int(source.parameter_values[name]) - if val <= 0: - val = default_value - except Exception as error: - logger.exception(f"Reading of int parameter failed: {error}") - val = default_value - return val +"""Module for Base collector.""" + +import datetime +import hashlib +import uuid +import bleach +import re +import time +from functools import wraps + +from managers import time_manager +from managers.log_manager import logger +from remote.core_api import CoreApi +from shared.schema import collector, osint_source, news_item +from shared.schema.parameter import Parameter, ParameterType + + +class BaseCollector: + """Base abstract type for all collectors. + + Attributes: + type (str): The type of the collector. + name (str): The name of the collector. + description (str): The description of the collector. + parameters (list): A list of parameters for the collector. + Methods: + __init__(): Initializes the BaseCollector object. + get_info(): Get information about the collector. + update_last_attempt(source): Update the last attempt for a collector. + ignore_exceptions(func): Decorator to wrap scheduled action with exception handling. + history(interval): Calculate the limit for retrieving historical data based on the given interval. + filter_by_word_list(news_items, source): Filter news items based on word lists defined in the source. + presanitize_html(html): Clean and sanitize the given HTML by removing certain tags and entities. + sanitize_news_items(news_items, source): Sanitize news items by setting default values for any missing attributes. + publish(news_items, source): Publish the collected news items to the CoreApi. + refresh(): Refresh the OSINT sources for the collector. + """ + + type = "BASE_COLLECTOR" + name = "Base Collector" + description = "Base abstract type for all collectors" + + parameters = [ + Parameter( + 0, "PROXY_SERVER", "Proxy server", "Type SOCKS5 proxy server as username:password@ip:port or ip:port", ParameterType.STRING + ), + Parameter( + 0, + "REFRESH_INTERVAL", + "Refresh interval in minutes (0 to disable)", + "How often is this collector queried for new data", + ParameterType.NUMBER, + ), + ] + + def __init__(self): + """Initialize the BaseCollector object.""" + self.osint_sources = [] + + def get_info(self): + """Get information about the collector. + + Returns: + (dict): A dictionary containing information about the collector. + """ + info_schema = collector.CollectorSchema() + return info_schema.dump(self) + + @staticmethod + def update_last_attempt(source): + """Update the last attempt for a collector. + + Parameters: + source: The source object representing the collector. 
+ """ + response, status_code = CoreApi.update_collector_last_attepmt(source.id) + if status_code != 200: + logger.critical(f"Update last attempt: HTTP {status_code}, response: {response}") + + @staticmethod + def ignore_exceptions(func): + """Wrap scheduled action with exception handling.""" + + @wraps(func) + def wrapper(self, source): + """Handle exceptions during scheduled collector runs. + + Parameters: + source: The source of the collector. + Raises: + Exception: If an unhandled exception occurs during the collector run. + """ + try: + func(self, source) + except Exception as error: + logger.exception(f"An unhandled exception occurred during scheduled collector run: {error}") + + return wrapper + + @staticmethod + def history(interval): + """Calculate the limit for retrieving historical data based on the given interval. + + Parameters: + interval (str or int): The interval for retrieving historical data. It can be a string representing a time unit + (e.g., '1d' for 1 day, '1w' for 1 week) or an integer representing the number of minutes. + Returns: + limit (datetime.datetime): The limit for retrieving historical data. + """ + if interval[0].isdigit() and ":" in interval: + limit = datetime.datetime.now() - datetime.timedelta(days=1) + elif interval[0].isalpha(): + limit = datetime.datetime.now() - datetime.timedelta(weeks=1) + else: + if int(interval) > 60: + hours = int(interval) // 60 + minutes = int(interval) - hours * 60 + limit = datetime.datetime.now() - datetime.timedelta(days=0, hours=hours, minutes=minutes) + else: + limit = datetime.datetime.now() - datetime.timedelta(days=0, hours=0, minutes=int(interval)) + + return limit + + @staticmethod + def filter_by_word_list(news_items, source): + """Filter the given news_items based on the word lists defined in the source. + + Parameters: + news_items (list): A list of news items to be filtered. + source (object): The source object containing word lists. + Returns: + news_items (list): A filtered list of news items based on the word lists. If no word lists are defined, + the original list is returned. + """ + if source.word_lists: + one_word_list = set() + + for word_list in source.word_lists: + if word_list.use_for_stop_words is False: + for category in word_list.categories: + for entry in category.entries: + one_word_list.add(entry.value.lower()) + + filtered_news_items = [] + if one_word_list: + for item in news_items: + for word in one_word_list: + if word in item.title.lower() or word in item.review.lower() or word in item.content.lower(): + filtered_news_items.append(item) + break + + return filtered_news_items + else: + return news_items + else: + return news_items + + @staticmethod + def presanitize_html(html): + """Clean and sanitize the given HTML by removing certain tags and entities. + + Parameters: + html (str): The HTML string to be sanitized. + Returns: + clean (str): The sanitized HTML string. + """ + # these re.sub are not security sensitive ; bleach is supposed to fix the remaining stuff + html = re.sub(r"(?i)( |\xa0)", " ", html, re.DOTALL) + html = re.sub(r"(?i)/]*>.*?/]*>", "", html, re.DOTALL) + html = re.sub(r"(?i)/]*>.*?/]*>", "", html, re.DOTALL) + html = re.sub(r"(?i)/]*>.*?/]*>", "", html, re.DOTALL) + + clean = bleach.clean(html, tags=["p", "b", "i", "b", "u", "pre"], strip=True) + return clean + + @staticmethod + def sanitize_news_items(news_items, source): + """Sanitize the given news_items by setting default values for any missing attributes. 
+ + Parameters: + news_items (list): A list of news items to be sanitized. + source: The source of the news items. + """ + for item in news_items: + if item.id is None: + item.id = uuid.uuid4() + if item.title is None: + item.title = "" + if item.review is None: + item.review = "" + if item.source is None: + item.source = "" + if item.link is None: + item.link = "" + if item.author is None: + item.author = "" + if item.content is None: + item.content = "" + if item.published is None: + item.published = datetime.datetime.now() + if item.collected is None: + item.collected = datetime.datetime.now() + if item.hash is None: + for_hash = item.author + item.title + item.link + item.hash = hashlib.sha256(for_hash.encode()).hexdigest() + if item.osint_source_id is None: + item.osint_source_id = source.id + if item.attributes is None: + item.attributes = [] + item.title = BaseCollector.presanitize_html(item.title) + item.review = BaseCollector.presanitize_html(item.review) + item.content = BaseCollector.presanitize_html(item.content) + item.author = BaseCollector.presanitize_html(item.author) + item.source = BaseCollector.presanitize_html(item.source) # TODO: replace with link sanitizer + item.link = BaseCollector.presanitize_html(item.link) # TODO: replace with link sanitizer + + @staticmethod + def publish(news_items, source, collector_source): + """Publish the collected news items to the CoreApi. + + Parameters: + news_items (list): A list of news items to be published. + source (object): The source object from which the news items were collected. + collector_source (string): Collector readbale name + """ + logger.debug(f"{collector_source} Collected {len(news_items)} news items") + BaseCollector.sanitize_news_items(news_items, source) + filtered_news_items = BaseCollector.filter_by_word_list(news_items, source) + news_items_schema = news_item.NewsItemDataSchema(many=True) + CoreApi.add_news_items(news_items_schema.dump(filtered_news_items)) + + def refresh(self): + """Refresh the OSINT sources for the collector.""" + time.sleep(30) + logger.info(f"Core API requested a refresh of OSINT sources for {self.name}...") + + # cancel all existing jobs + # TODO: cannot cancel jobs that are running and are scheduled for further in time than 60 seconds + # updating of the configuration needs to be done more gracefully + for source in self.osint_sources: + try: + time_manager.cancel_job(source.scheduler_job) + except Exception: + pass + self.osint_sources = [] + + # get new node configuration + response, code = CoreApi.get_osint_sources(self.type) + # logger.debug(f"HTTP {code}: Got the following reply: {response}") + + try: + # if configuration was successfully received + if code == 200 and response is not None: + source_schema = osint_source.OSINTSourceSchemaBase(many=True) + self.osint_sources = source_schema.load(response) + + logger.debug(f"{self.name}: {len(self.osint_sources)} sources loaded") + + # start collection + for source in self.osint_sources: + interval = source.parameter_values["REFRESH_INTERVAL"] + # do not schedule if no interval is set + if interval == "" or interval == "0": + logger.info(f"{self.name} '{source.name}': Disabled") + continue + + self.run_collector(source) + + # run task every day at XY + if interval[0].isdigit() and ":" in interval: + logger.debug(f"{self.name} '{source.name}': Scheduling at: {interval}") + source.scheduler_job = time_manager.schedule_job_every_day(interval, self.run_collector, source) + # run task at a specific day (XY, ZZ:ZZ:ZZ) + elif 
interval[0].isalpha(): + interval = interval.split(",") + day = interval[0].strip() + at = interval[1].strip() + logger.debug(f"{self.name} '{source.name}': Scheduling at: {day} {at}") + if day == "Monday": + source.scheduler_job = time_manager.schedule_job_on_monday(at, self.run_collector, source) + elif day == "Tuesday": + source.scheduler_job = time_manager.schedule_job_on_tuesday(at, self.run_collector, source) + elif day == "Wednesday": + source.scheduler_job = time_manager.schedule_job_on_wednesday(at, self.run_collector, source) + elif day == "Thursday": + source.scheduler_job = time_manager.schedule_job_on_thursday(at, self.run_collector, source) + elif day == "Friday": + source.scheduler_job = time_manager.schedule_job_on_friday(at, self.run_collector, source) + elif day == "Saturday": + source.scheduler_job = time_manager.schedule_job_on_saturday(at, self.run_collector, source) + elif day == "Sunday": + source.scheduler_job = time_manager.schedule_job_on_sunday(at, self.run_collector, source) + # run task every XY minutes + else: + logger.debug(f"{self.name} '{source.name}': Scheduling for {interval}") + source.scheduler_job = time_manager.schedule_job_minutes(int(interval), self.run_collector, source) + else: + # TODO: send update to core with the error message + logger.warning(f"configuration not received, code: {code}, response: {response}") + pass + except Exception as error: + logger.exception(f"Refreshing of sources failed: {error}") + pass + + def run_collector(self, source): + """Run the collector on the given source. + + Parameters: + source: The source to collect data from. + """ + runner = self.__class__() # get right type of collector + runner.collector_source = f"{self.name} '{source.name}':" + logger.info(f"{runner.collector_source} Starting collector") + BaseCollector.update_last_attempt(source) + runner.collect(source) + logger.info(f"{runner.collector_source} Collection finished") + + def initialize(self): + """Initialize the collector.""" + self.refresh() + + @staticmethod + def read_int_parameter(name, default_value, source): + """Read an integer parameter from a source dictionary. + + Parameters: + name (str): The name of the parameter to read. + default_value (int): The default value to return if the parameter is not found or is not a valid integer. + source (dict): The dictionary containing the parameter values. + Returns: + val (int): The value of the parameter, or the default value if the parameter is not found or is not a valid integer. + """ + val = default_value + try: + par_val = source.parameter_values[name] + if par_val != "": + val = int(par_val) + if val <= 0: + val = default_value + except Exception as error: + logger.exception(f"Reading of int parameter failed: {error}") + return val diff --git a/src/collectors/collectors/email_collector.py b/src/collectors/collectors/email_collector.py index 538182a4..b9eb2310 100644 --- a/src/collectors/collectors/email_collector.py +++ b/src/collectors/collectors/email_collector.py @@ -1,186 +1,182 @@ -"""Module for email collector.""" - -import datetime -import hashlib -import uuid -import imaplib -import poplib -from email import policy -import email.header -import email.utils -import socket - -from .base_collector import BaseCollector -from managers.log_manager import logger -from shared.schema.news_item import NewsItemData, NewsItemAttribute -from shared.schema.parameter import Parameter, ParameterType - - -class EmailCollector(BaseCollector): - """Collector for gathering data from emails. 
- - Attributes: - type (str): Type of the collector. - name (str): Name of the collector. - description (str): Description of the collector. - parameters (list): List of parameters required for the collector. - Methods: - collect(source): Collect data from email source. - """ - - type = "EMAIL_COLLECTOR" - name = "EMAIL Collector" - description = "Collector for gathering data from emails" - - parameters = [ - Parameter(0, "EMAIL_SERVER_TYPE", "Email server type", "Server type parameter means IMAP or POP3 email server", ParameterType.STRING), - Parameter(0, "EMAIL_SERVER_HOSTNAME", "Email server hostname", "Hostname of email server", ParameterType.STRING), - Parameter(0, "EMAIL_SERVER_PORT", "Email server port", "Port of email server", ParameterType.NUMBER), - Parameter(0, "EMAIL_USERNAME", "Username", "Username of email account", ParameterType.STRING), - Parameter(0, "EMAIL_PASSWORD", "Password", "Password of email account", ParameterType.STRING), - ] - - parameters.extend(BaseCollector.parameters) - - @BaseCollector.ignore_exceptions - def collect(self, source): - """Collect data from email source. - - Parameters: - source -- Source object. - """ - BaseCollector.update_last_attempt(source) - self.collector_source = f"{self.name} '{source.name}':" - logger.info(f"{self.collector_source} Collecting data from email") - news_items = [] - email_server_type = source.parameter_values["EMAIL_SERVER_TYPE"] - email_server_hostname = source.parameter_values["EMAIL_SERVER_HOSTNAME"] - email_server_port = source.parameter_values["EMAIL_SERVER_PORT"] - email_username = source.parameter_values["EMAIL_USERNAME"] - email_password = source.parameter_values["EMAIL_PASSWORD"] - proxy_server = source.parameter_values["PROXY_SERVER"] - - def proxy_tunnel(): - server = f"{email_server_type.lower()}.{email_server_hostname.lower()}" - port = email_server_port - - server_proxy = proxy_server.rsplit(":", 1)[0] - server_proxy_port = proxy_server.rsplit(":", 1)[-1] - - proxy = (str(server_proxy), int(server_proxy_port)) - con = f"CONNECT {server}:{port} HTTP/1.0\r\nConnection: close\r\n\r\n" - - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(proxy) - s.send(str.encode(con)) - s.recv(4096) - - def get_data(): - review = "" - content = "" - url = "" - link = "" - key = "" - value = "" - - date_tuple = email.utils.parsedate_tz(email_message["Date"]) - local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)) - published = f'{str(local_date.strftime("%a, %d %b %Y %H:%M:%S"))}' - - author = str(email.header.make_header(email.header.decode_header(email_message["From"]))) - title = str(email.header.make_header(email.header.decode_header(email_message["Subject"]))) - message_id = str(email.header.make_header(email.header.decode_header(email_message["Message-ID"]))) - - for part in email_message.walk(): - if part.get_content_type() == "text/plain": - content = part.get_payload(decode=True) - review = content[:500].decode("utf-8") - content = content.decode("utf-8") - - for_hash = author + title + message_id - - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - title, - review, - url, - link, - published, - author, - datetime.datetime.now(), - content, - source.id, - attributes, - ) - - if part.get_content_maintype() == "multipart": - pass - if part.get("Content-Disposition") is None: - pass - - file_name = part.get_filename() - - if file_name: - - binary_mime_type = part.get_content_type() - binary_value = part.get_payload() - - news_attribute 
= NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value) - news_item.attributes.append(news_attribute) - - news_items.append(news_item) - - if email_server_type.lower() == "imap": - try: - if proxy_server: - proxy_tunnel() - - connection = imaplib.IMAP4_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port) - connection.login(email_username, email_password) - connection.select("inbox") - - result, data = connection.uid("search", None, "UNSEEN") - i = len(data[0].split()) - - for x in range(i): - attributes = [] - latest_email_uid = data[0].split()[x] - result, email_data = connection.uid("fetch", latest_email_uid, "(RFC822)") - raw_email = email_data[0][1] - raw_email_string = raw_email.decode("utf-8") - email_message = email.message_from_string(raw_email_string, policy=policy.default) - - get_data() - - connection.close() - connection.logout() - except Exception as error: - logger.exception(f"{self.collector_source} Failed to fetch emails using IMAP: {error}") - elif email_server_type.lower() == "pop3": - try: - if proxy_server: - proxy_tunnel() - - connection = poplib.POP3_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port) - connection.user(email_username) - connection.pass_(email_password) - - num_messages = len(connection.list()[1]) - - for i in range(num_messages): - attributes = [] - - raw_email = b"\n".join(connection.retr(i + 1)[1]) - email_message = email.message_from_bytes(raw_email) - - get_data() - - connection.quit() - except Exception as error: - logger.exception(f"{self.collector_source} Failed to fetch emails using POP3: {error}") - else: - logger.error(f"{self.collector_source} Email server connection type is not supported: {email_server_type}") - - BaseCollector.publish(news_items, source) - logger.info(f"{self.collector_source} Collection finished.") +"""Module for email collector.""" + +import datetime +import hashlib +import uuid +import imaplib +import poplib +from email import policy +import email.header +import email.utils +import socket + +from .base_collector import BaseCollector +from managers.log_manager import logger +from shared.schema.news_item import NewsItemData, NewsItemAttribute +from shared.schema.parameter import Parameter, ParameterType + + +class EmailCollector(BaseCollector): + """Collector for gathering data from emails. + + Attributes: + type (str): Type of the collector. + name (str): Name of the collector. + description (str): Description of the collector. + parameters (list): List of parameters required for the collector. + Methods: + collect(source): Collect data from email source. + """ + + type = "EMAIL_COLLECTOR" + name = "EMAIL Collector" + description = "Collector for gathering data from emails" + + parameters = [ + Parameter(0, "EMAIL_SERVER_TYPE", "Email server type", "Server type parameter means IMAP or POP3 email server", ParameterType.STRING), + Parameter(0, "EMAIL_SERVER_HOSTNAME", "Email server hostname", "Hostname of email server", ParameterType.STRING), + Parameter(0, "EMAIL_SERVER_PORT", "Email server port", "Port of email server", ParameterType.NUMBER), + Parameter(0, "EMAIL_USERNAME", "Username", "Username of email account", ParameterType.STRING), + Parameter(0, "EMAIL_PASSWORD", "Password", "Password of email account", ParameterType.STRING), + ] + + parameters.extend(BaseCollector.parameters) + + @BaseCollector.ignore_exceptions + def collect(self, source): + """Collect data from email source. + + Parameters: + source -- Source object. 
+ """ + news_items = [] + email_server_type = source.parameter_values["EMAIL_SERVER_TYPE"] + email_server_hostname = source.parameter_values["EMAIL_SERVER_HOSTNAME"] + email_server_port = source.parameter_values["EMAIL_SERVER_PORT"] + email_username = source.parameter_values["EMAIL_USERNAME"] + email_password = source.parameter_values["EMAIL_PASSWORD"] + proxy_server = source.parameter_values["PROXY_SERVER"] + + def proxy_tunnel(): + server = f"{email_server_type.lower()}.{email_server_hostname.lower()}" + port = email_server_port + + server_proxy = proxy_server.rsplit(":", 1)[0] + server_proxy_port = proxy_server.rsplit(":", 1)[-1] + + proxy = (str(server_proxy), int(server_proxy_port)) + con = f"CONNECT {server}:{port} HTTP/1.0\r\nConnection: close\r\n\r\n" + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(proxy) + s.send(str.encode(con)) + s.recv(4096) + + def get_data(): + review = "" + content = "" + url = "" + link = "" + key = "" + value = "" + + date_tuple = email.utils.parsedate_tz(email_message["Date"]) + local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)) + published = f'{str(local_date.strftime("%a, %d %b %Y %H:%M:%S"))}' + + author = str(email.header.make_header(email.header.decode_header(email_message["From"]))) + title = str(email.header.make_header(email.header.decode_header(email_message["Subject"]))) + message_id = str(email.header.make_header(email.header.decode_header(email_message["Message-ID"]))) + + for part in email_message.walk(): + if part.get_content_type() == "text/plain": + content = part.get_payload(decode=True) + review = content[:500].decode("utf-8") + content = content.decode("utf-8") + + for_hash = author + title + message_id + + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + title, + review, + url, + link, + published, + author, + datetime.datetime.now(), + content, + source.id, + attributes, + ) + + if part.get_content_maintype() == "multipart": + pass + if part.get("Content-Disposition") is None: + pass + + file_name = part.get_filename() + + if file_name: + + binary_mime_type = part.get_content_type() + binary_value = part.get_payload() + + news_attribute = NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value) + news_item.attributes.append(news_attribute) + + news_items.append(news_item) + + if email_server_type.lower() == "imap": + try: + if proxy_server: + proxy_tunnel() + + connection = imaplib.IMAP4_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port) + connection.login(email_username, email_password) + connection.select("inbox") + + result, data = connection.uid("search", None, "UNSEEN") + i = len(data[0].split()) + + for x in range(i): + attributes = [] + latest_email_uid = data[0].split()[x] + result, email_data = connection.uid("fetch", latest_email_uid, "(RFC822)") + raw_email = email_data[0][1] + raw_email_string = raw_email.decode("utf-8") + email_message = email.message_from_string(raw_email_string, policy=policy.default) + + get_data() + + connection.close() + connection.logout() + except Exception as error: + logger.exception(f"{self.collector_source} Fetch emails using IMAP failed: {error}") + elif email_server_type.lower() == "pop3": + try: + if proxy_server: + proxy_tunnel() + + connection = poplib.POP3_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port) + connection.user(email_username) + connection.pass_(email_password) + + num_messages = 
len(connection.list()[1]) + + for i in range(num_messages): + attributes = [] + + raw_email = b"\n".join(connection.retr(i + 1)[1]) + email_message = email.message_from_bytes(raw_email) + + get_data() + + connection.quit() + except Exception as error: + logger.exception(f"{self.collector_source} Fetch emails using POP3 failed: {error}") + else: + logger.error(f"{self.collector_source} Email server connection type is not supported: {email_server_type}") + + BaseCollector.publish(news_items, source, self.collector_source) diff --git a/src/collectors/collectors/rss_collector.py b/src/collectors/collectors/rss_collector.py index 33a973fc..43f52783 100644 --- a/src/collectors/collectors/rss_collector.py +++ b/src/collectors/collectors/rss_collector.py @@ -1,213 +1,213 @@ -"""RSS collector module.""" - -import datetime -import hashlib -import uuid -import re -import socks -import feedparser -import urllib.request -from sockshandler import SocksiPyHandler -from bs4 import BeautifulSoup - -from .base_collector import BaseCollector -from managers.log_manager import logger -from shared.schema.news_item import NewsItemData -from shared.schema.parameter import Parameter, ParameterType - - -class RSSCollector(BaseCollector): - """RSS collector class. - - Arguments: - BaseCollector -- Base collector class. - """ - - type = "RSS_COLLECTOR" - name = "RSS Collector" - description = "Collector for gathering data from RSS and Atom feeds" - - parameters = [ - Parameter(0, "FEED_URL", "Feed URL", "Full URL for RSS or Atom feed", ParameterType.STRING), - Parameter(0, "USER_AGENT", "User agent", "Type of user agent", ParameterType.STRING), - Parameter( - 0, - "LINKS_LIMIT", - "Limit for article links", - "OPTIONAL: Maximum number of article links to process. Default: all", - ParameterType.NUMBER, - ), - ] - - parameters.extend(BaseCollector.parameters) - - news_items = [] - - @BaseCollector.ignore_exceptions - def collect(self, source): - """Collect data from RSS or Atom feed. - - Arguments: - source -- Source object. - """ - self.collector_source = f"{self.name} '{source.name}':" - - def strip_html_tags(html_string): - soup = BeautifulSoup(html_string, "html.parser") - return soup.get_text(separator=" ", strip=True) - - BaseCollector.update_last_attempt(source) - feed_url = source.parameter_values["FEED_URL"] - links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source) - - logger.info(f"{self.collector_source} Starting collector for URL: {feed_url}") - - user_agent = source.parameter_values["USER_AGENT"] - if user_agent: - logger.debug(f"{self.collector_source} Using user agent: {user_agent}") - feedparser.USER_AGENT = user_agent - - # use system proxy - proxy_handler = None - opener = urllib.request.urlopen - - if "PROXY_SERVER" in source.parameter_values: - proxy_server = source.parameter_values["PROXY_SERVER"] - - # disable proxy - do not use system proxy - if proxy_server == "none": # WTF? 
- proxy_handler = urllib.request.ProxyHandler({}) - else: - proxy = re.search(r"^(http|https|socks4|socks5|ftp)://([a-zA-Z0-9\-\.\_]+):(\d+)/?$", proxy_server) - if proxy: - scheme, host, port = proxy.groups() - if scheme in ["http", "https", "ftp"]: - logger.debug(f"{self.collector_source} Using {scheme} proxy: {host}:{port}") - proxy_handler = urllib.request.ProxyHandler( - { - "http": f"{scheme}://{host}:{port}", - "https": f"{scheme}://{host}:{port}", - "ftp": f"{scheme}://{host}:{port}", - } - ) - elif scheme == "socks4": - logger.debug(f"{self.collector_source} Using socks4 proxy: {host}:{port}") - proxy_handler = SocksiPyHandler(socks.SOCKS4, host, int(port)) - elif scheme == "socks5": - logger.debug(f"{self.collector_source} Using socks5 proxy: {host}:{port}") - proxy_handler = SocksiPyHandler(socks.SOCKS5, host, int(port)) - - # use proxy in urllib - if proxy_handler: - opener = urllib.request.build_opener(proxy_handler).open - - try: - if proxy_handler: - logger.debug(f"{self.collector_source} Using proxy for feed collection: {proxy_server}") - feed = feedparser.parse(feed_url, handlers=[proxy_handler]) - else: - feed = feedparser.parse(feed_url) - - logger.debug(f"{self.collector_source} Feed returned {len(feed['entries'])} entries.") - - news_items = [] - - count = 0 - for feed_entry in feed["entries"]: - count += 1 - author = feed_entry.get("author", "") - title = feed_entry.get("title", "") - published = feed_entry.get("published", "") - published_parsed = feed_entry.get("published_parsed", "") - updated = feed_entry.get("updated", "") - updated_parsed = feed_entry.get("updated_parsed", "") - summary = feed_entry.get("summary", "") - content = feed_entry.get("content", "") - date = "" - review = "" - article = "" - link_for_article = feed_entry.get("link", "") - if summary: - review = strip_html_tags(summary[:500]) - if content: - article = strip_html_tags(content[0].get("value", "")) - - if not link_for_article: - logger.debug(f"{self.collector_source} Skipping an empty link in feed entry '{title}'.") - continue - elif not article: - logger.debug(f"{self.collector_source} Visiting an article {count}/{len(feed['entries'])}: {link_for_article}") - html_article = "" - try: - request = urllib.request.Request(link_for_article) - request.add_header("User-Agent", user_agent) - - with opener(request) as response: - html_article = response.read() - - soup = BeautifulSoup(html_article, features="html.parser") - - if html_article: - article_text = [p.text.strip() for p in soup.findAll("p")] - replaced_str = "\xa0" - article_sanit = [w.replace(replaced_str, " ") for w in article_text] - article_sanit = " ".join(article_sanit) - # use HTML article if it is longer than summary - if len(article_sanit) > len(summary): - article = article_sanit - logger.debug(f"{self.collector_source} Got an article: {link_for_article}") - except Exception as error: - logger.exception(f"{self.collector_source} Failed to fetch article: {error}") - - # use summary if article is empty - if summary and not article: - article = strip_html_tags(summary) - logger.debug(f"{self.collector_source} Using summary for article: {article}") - # use first 500 characters of article if summary is empty - elif not summary and article: - review = article[:500] - logger.debug(f"{self.collector_source} Using first 500 characters of article for summary: {review}") - - # use published date if available, otherwise use updated date - if published_parsed: - date = datetime.datetime(*published_parsed[:6]).strftime("%d.%m.%Y - %H:%M") - 
logger.debug(f"{self.collector_source} Using parsed 'published' date: {date}") - elif updated_parsed: - date = datetime.datetime(*updated_parsed[:6]).strftime("%d.%m.%Y - %H:%M") - logger.debug(f"{self.collector_source} Using parsed 'updated' date: {date}") - elif published: - date = published - logger.debug(f"{self.collector_source} Using 'published' date: {date}") - elif updated: - date = updated - logger.debug(f"{self.collector_source} Using 'updated' date: {date}") - - for_hash = author + title + link_for_article - - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - title, - review, - feed_url, - link_for_article, - date, - author, - datetime.datetime.now(), - article, - source.id, - [], - ) - - news_items.append(news_item) - - if count >= links_limit & links_limit > 0: - logger.info(f"{self.collector_source} Limit for article links ({links_limit}) has been reached.") - break - - BaseCollector.publish(news_items, source) - - except Exception as error: - logger.exception(f"{self.collector_source} RSS collection exceptionally failed: {error}") - - logger.info(f"{self.collector_source} Collection finished.") +"""RSS collector module.""" + +import datetime +import hashlib +import uuid +import re +import socks +import feedparser +import urllib.request +from sockshandler import SocksiPyHandler +from bs4 import BeautifulSoup + +from .base_collector import BaseCollector +from managers.log_manager import logger +from shared.schema.news_item import NewsItemData +from shared.schema.parameter import Parameter, ParameterType + + +class RSSCollector(BaseCollector): + """RSS collector class. + + Arguments: + BaseCollector -- Base collector class. + """ + + type = "RSS_COLLECTOR" + name = "RSS Collector" + description = "Collector for gathering data from RSS and Atom feeds" + + parameters = [ + Parameter(0, "FEED_URL", "Feed URL", "Full URL for RSS or Atom feed", ParameterType.STRING), + Parameter(0, "USER_AGENT", "User agent", "Type of user agent", ParameterType.STRING), + Parameter( + 0, + "LINKS_LIMIT", + "Limit for article links", + "OPTIONAL: Maximum number of article links to process. Default: all", + ParameterType.NUMBER, + ), + ] + + parameters.extend(BaseCollector.parameters) + + news_items = [] + + @BaseCollector.ignore_exceptions + def collect(self, source): + """Collect data from RSS or Atom feed. + + Arguments: + source -- Source object. + """ + def strip_html_tags(html_string): + soup = BeautifulSoup(html_string, "html.parser") + return soup.get_text(separator=" ", strip=True) + + feed_url = source.parameter_values["FEED_URL"] + links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source) + + logger.info(f"{self.collector_source} Requesting feed URL: {feed_url}") + + user_agent = source.parameter_values["USER_AGENT"] + if user_agent: + logger.debug(f"{self.collector_source} Using user agent: {user_agent}") + feedparser.USER_AGENT = user_agent + + # use system proxy + proxy_handler = None + opener = urllib.request.urlopen + + if "PROXY_SERVER" in source.parameter_values: + proxy_server = source.parameter_values["PROXY_SERVER"] + + # disable proxy - do not use system proxy + if proxy_server == "none": # WTF? 
+ proxy_handler = urllib.request.ProxyHandler({}) + else: + proxy = re.search(r"^(http|https|socks4|socks5|ftp)://([a-zA-Z0-9\-\.\_]+):(\d+)/?$", proxy_server) + if proxy: + scheme, host, port = proxy.groups() + if scheme in ["http", "https", "ftp"]: + logger.debug(f"{self.collector_source} Using {scheme} proxy: {host}:{port}") + proxy_handler = urllib.request.ProxyHandler( + { + "http": f"{scheme}://{host}:{port}", + "https": f"{scheme}://{host}:{port}", + "ftp": f"{scheme}://{host}:{port}", + } + ) + elif scheme == "socks4": + logger.debug(f"{self.collector_source} Using socks4 proxy: {host}:{port}") + proxy_handler = SocksiPyHandler(socks.SOCKS4, host, int(port)) + elif scheme == "socks5": + logger.debug(f"{self.collector_source} Using socks5 proxy: {host}:{port}") + proxy_handler = SocksiPyHandler(socks.SOCKS5, host, int(port)) + + # use proxy in urllib + if proxy_handler: + opener = urllib.request.build_opener(proxy_handler).open + + try: + if proxy_handler: + logger.debug(f"{self.collector_source} Using proxy for feed collection: {proxy_server}") + feed = feedparser.parse(feed_url, handlers=[proxy_handler]) + else: + feed = feedparser.parse(feed_url) + + logger.debug(f"{self.collector_source} Feed returned {len(feed['entries'])} entries.") + + news_items = [] + + count = 0 + for feed_entry in feed["entries"]: + count += 1 + author = feed_entry.get("author", "") + title = feed_entry.get("title", "") + published = feed_entry.get("published", "") + published_parsed = feed_entry.get("published_parsed", "") + updated = feed_entry.get("updated", "") + updated_parsed = feed_entry.get("updated_parsed", "") + summary = feed_entry.get("summary", "") + content = feed_entry.get("content", "") + date = "" + review = "" + article = "" + link_for_article = feed_entry.get("link", "") + if summary: + review = strip_html_tags(summary[:500]) + if content: + article = strip_html_tags(content[0].get("value", "")) + + if not link_for_article: + logger.debug(f"{self.collector_source} Skipping an empty link in feed entry '{title}'.") + continue + elif not article: + logger.info(f"{self.collector_source} Visiting an article {count}/{len(feed['entries'])}: {link_for_article}") + html_article = "" + try: + request = urllib.request.Request(link_for_article) + request.add_header("User-Agent", user_agent) + + with opener(request) as response: + html_article = response.read() + + soup = BeautifulSoup(html_article, features="html.parser") + + if html_article: + article_text = [p.text.strip() for p in soup.findAll("p")] + replaced_str = "\xa0" + article_sanit = [w.replace(replaced_str, " ") for w in article_text] + article_sanit = " ".join(article_sanit) + # use HTML article if it is longer than summary + if len(article_sanit) > len(summary): + article = article_sanit + logger.debug(f"{self.collector_source} Got an article: {link_for_article}") + except Exception as error: + logger.exception(f"{self.collector_source} Fetch article failed: {error}") + + # use summary if article is empty + if summary and not article: + article = strip_html_tags(summary) + logger.debug(f"{self.collector_source} Using summary for article: {article}") + # use first 500 characters of article if summary is empty + elif not summary and article: + review = article[:500] + logger.debug(f"{self.collector_source} Using first 500 characters of article for summary: {review}") + + # use published date if available, otherwise use updated date + if published_parsed: + date = datetime.datetime(*published_parsed[:6]).strftime("%d.%m.%Y - %H:%M") + 
logger.debug(f"{self.collector_source} Using parsed 'published' date") + elif updated_parsed: + date = datetime.datetime(*updated_parsed[:6]).strftime("%d.%m.%Y - %H:%M") + logger.debug(f"{self.collector_source} Using parsed 'updated' date") + elif published: + date = published + logger.debug(f"{self.collector_source} Using 'published' date") + elif updated: + date = updated + logger.debug(f"{self.collector_source} Using 'updated' date") + + logger.debug(f"{self.collector_source} ... Title : {title}") + logger.debug(f"{self.collector_source} ... Review : {review.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Content : {article.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Published: {date}") + + for_hash = author + title + link_for_article + + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + title, + review, + feed_url, + link_for_article, + date, + author, + datetime.datetime.now(), + article, + source.id, + [], + ) + + news_items.append(news_item) + + if count >= links_limit & links_limit > 0: + logger.debug(f"{self.collector_source} Limit for article links ({links_limit}) has been reached.") + break + + BaseCollector.publish(news_items, source, self.collector_source) + + except Exception as error: + logger.exception(f"{self.collector_source} Collection failed: {error}") diff --git a/src/collectors/collectors/scheduled_tasks_collector.py b/src/collectors/collectors/scheduled_tasks_collector.py index 61c87b3d..40864b3f 100644 --- a/src/collectors/collectors/scheduled_tasks_collector.py +++ b/src/collectors/collectors/scheduled_tasks_collector.py @@ -1,63 +1,64 @@ -import datetime -import hashlib -import os -import random -import string -import uuid - -from .base_collector import BaseCollector -from shared.schema.news_item import NewsItemData -from shared.schema.parameter import Parameter, ParameterType - - -class ScheduledTasksCollector(BaseCollector): - type = "SCHEDULED_TASKS_COLLECTOR" - name = "Scheduled tasks Collector" - description = "Collector for collecting scheduled tasks" - - parameters = [Parameter(0, "TASK_TITLE", "Task title", "Title of scheduled task", ParameterType.STRING), - Parameter(0, "TASK_COMMAND", "Task command", "Command which will be executed", ParameterType.STRING), - Parameter(0, "TASK_DESCRIPTION", "Task description", "Description of scheduled task", - ParameterType.STRING) - ] - - parameters.extend(BaseCollector.parameters) - - @BaseCollector.ignore_exceptions - def collect(self, source): - """Collect data from scheduled tasks. - - Arguments: - source -- Source object. - """ - - BaseCollector.update_last_attempt(source) - news_items = [] - head, tail = os.path.split(source.parameter_values['TASK_COMMAND']) - task_title = source.parameter_values['TASK_TITLE'] - - try: - if head == '': - task_command = source.parameter_values['TASK_COMMAND'] - else: - task_command = os.popen('.' 
+ source.parameter_values['TASK_COMMAND']).read() - - preview = source.parameter_values['TASK_DESCRIPTION'] - author = '' - osint_source = 'TaranisNG System' - link = '' - content = task_command - collected = datetime.datetime.now() - published = datetime.datetime.now() - - letters = string.ascii_lowercase - random_string = ''.join(random.choice(letters) for i in range(10)) - - news_item = NewsItemData(uuid.uuid4(), hashlib.sha256(random_string.encode()).hexdigest(), task_title, - preview, osint_source, link, published, author, collected, content, source.id, []) - - news_items.append(news_item) - - BaseCollector.publish(news_items, source) - except Exception as error: - BaseCollector.print_exception(source, error) +import datetime +import hashlib +import os +import random +import string +import uuid + +from .base_collector import BaseCollector +from managers.log_manager import logger +from shared.schema.news_item import NewsItemData +from shared.schema.parameter import Parameter, ParameterType + + +class ScheduledTasksCollector(BaseCollector): + type = "SCHEDULED_TASKS_COLLECTOR" + name = "Scheduled tasks Collector" + description = "Collector for collecting scheduled tasks" + + parameters = [Parameter(0, "TASK_TITLE", "Task title", "Title of scheduled task", ParameterType.STRING), + Parameter(0, "TASK_COMMAND", "Task command", "Command which will be executed", ParameterType.STRING), + Parameter(0, "TASK_DESCRIPTION", "Task description", "Description of scheduled task", + ParameterType.STRING) + ] + + parameters.extend(BaseCollector.parameters) + + @BaseCollector.ignore_exceptions + def collect(self, source): + """Collect data from scheduled tasks. + + Arguments: + source -- Source object. + """ + + news_items = [] + head, tail = os.path.split(source.parameter_values['TASK_COMMAND']) + task_title = source.parameter_values['TASK_TITLE'] + + try: + if head == '': + task_command = source.parameter_values['TASK_COMMAND'] + else: + task_command = os.popen('.' 
+ source.parameter_values['TASK_COMMAND']).read() + + preview = source.parameter_values['TASK_DESCRIPTION'] + author = '' + osint_source = 'TaranisNG System' + link = '' + content = task_command + collected = datetime.datetime.now() + published = datetime.datetime.now() + + letters = string.ascii_lowercase + random_string = ''.join(random.choice(letters) for i in range(10)) + + news_item = NewsItemData(uuid.uuid4(), hashlib.sha256(random_string.encode()).hexdigest(), task_title, + preview, osint_source, link, published, author, collected, content, source.id, []) + + news_items.append(news_item) + + BaseCollector.publish(news_items, source, self.collector_source) + + except Exception as error: + logger.exception(f"{self.collector_source} Collection failed: {error}") diff --git a/src/collectors/collectors/slack_collector.py b/src/collectors/collectors/slack_collector.py index def3b1a6..1b167c26 100644 --- a/src/collectors/collectors/slack_collector.py +++ b/src/collectors/collectors/slack_collector.py @@ -1,137 +1,135 @@ -"""Module for Slack collector.""" - -import datetime -import hashlib -import uuid -import time -import traceback -import socket - -from slack import WebClient -from .base_collector import BaseCollector -from managers.log_manager import logger -from shared.schema.news_item import NewsItemData -from shared.schema.parameter import Parameter, ParameterType - - -# the slackclient project is in maintenance mode now, "slack_sdk" is successor: https://pypi.org/project/slack-sdk/ -class SlackCollector(BaseCollector): - """Collector for gathering data from Slack. - - Attributes: - type (str): Type of the collector. - name (str): Name of the collector. - description (str): Description of the collector. - parameters (list): List of parameters required for the collector. - Methods: - collect(source): Collects data from Slack source. - """ - - type = "SLACK_COLLECTOR" - name = "Slack Collector" - description = "Collector for gathering data from Slack" - - parameters = [ - Parameter(0, "SLACK_API_TOKEN", "Slack API token", "API token for Slack authentication.", ParameterType.STRING), - Parameter(0, "WORKSPACE_CHANNELS_ID", "Collected workspace's channels ID", "Channels which will be collected.", ParameterType.STRING), - ] - - parameters.extend(BaseCollector.parameters) - - @BaseCollector.ignore_exceptions - def collect(self, source): - """Collect data from Slack source. - - Arguments: - source: Source object. 
- """ - self.collector_source = f"{self.name} '{source.name}':" - BaseCollector.update_last_attempt(source) - news_items = [] - proxy_server = source.parameter_values["PROXY_SERVER"] - - if proxy_server: - - server = "https://slack.com" - port = 443 - - server_proxy = proxy_server.rsplit(":", 1)[0] - server_proxy_port = proxy_server.rsplit(":", 1)[-1] - - try: - proxy = (str(server_proxy), int(server_proxy_port)) - connection = f"CONNECT {server}:{port} HTTP/1.0\r\nConnection: close\r\n\r\n" - - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(proxy) - s.send(str.encode(connection)) - s.recv(4096) - except Exception: - print(f"OSINTSource ID: {source.id}") - print(f"OSINTSource name: {source.name}") - print("Proxy connection failed") - - ids = source.parameter_values["WORKSPACE_CHANNELS_ID"].replace(" ", "") - channels_list = ids.split(",") - - slack_client = WebClient(source.parameter_values["SLACK_API_TOKEN"]) - - try: - for channel_id in channels_list: - logger.info(f"{self.collector_source} Channel: {channel_id}") - channel_info = slack_client.conversations_info(channel=channel_id) - channel_name = channel_info["channel"]["name"] - - # in future we can use parameter "oldest" - Only messages after this Unix timestamp will be included in results - data = slack_client.conversations_history(channel=channel_id, limit=30) - count = 0 - for message in data["messages"]: - count += 1 - logger.debug(f"{self.collector_source} Message: {0}".format(count)) - published = time.ctime(float(message["ts"])) - content = message["text"] - preview = content[:500] - - user_id = message["user"] - user_name = slack_client.users_profile_get(user=user_id) - author = user_name["profile"]["real_name"] - - team_id = message.get("team", "") - if team_id: - team_info = slack_client.team_info(team=team_id) - team_name = team_info["team"]["name"] - else: - team_name = "" - - title = f"Slack post from channel {channel_name}" - if team_name: - title += f" ({team_name})" - link = "" - url = "" - for_hash = user_id + channel_id + content - - logger.debug(f"{self.collector_source} ... Title : {title}") - logger.debug(f"{self.collector_source} ... Content : {content.replace('\r', '').replace('\n', ' ').strip()[:100]}") - logger.debug(f"{self.collector_source} ... Author : {author}") - logger.debug(f"{self.collector_source} ... Published: {published}") - - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - title, - preview, - url, - link, - published, - author, - datetime.datetime.now(), - content, - source.id, - [], - ) - news_items.append(news_item) - - BaseCollector.publish(news_items, source) - - except Exception as ex: # noqa F841 - logger.info(f"{self.collector_source} Error: {traceback.format_exc()}") +"""Module for Slack collector.""" + +import datetime +import hashlib +import uuid +import time +import traceback +import socket + +from slack import WebClient +from .base_collector import BaseCollector +from managers.log_manager import logger +from shared.schema.news_item import NewsItemData +from shared.schema.parameter import Parameter, ParameterType + + +# the slackclient project is in maintenance mode now, "slack_sdk" is successor: https://pypi.org/project/slack-sdk/ +class SlackCollector(BaseCollector): + """Collector for gathering data from Slack. + + Attributes: + type (str): Type of the collector. + name (str): Name of the collector. + description (str): Description of the collector. + parameters (list): List of parameters required for the collector. 
+ Methods: + collect(source): Collects data from Slack source. + """ + + type = "SLACK_COLLECTOR" + name = "Slack Collector" + description = "Collector for gathering data from Slack" + + parameters = [ + Parameter(0, "SLACK_API_TOKEN", "Slack API token", "API token for Slack authentication.", ParameterType.STRING), + Parameter(0, "WORKSPACE_CHANNELS_ID", "Collected workspace's channels ID", "Channels which will be collected.", ParameterType.STRING), + ] + + parameters.extend(BaseCollector.parameters) + + @BaseCollector.ignore_exceptions + def collect(self, source): + """Collect data from Slack source. + + Arguments: + source: Source object. + """ + news_items = [] + proxy_server = source.parameter_values["PROXY_SERVER"] + + if proxy_server: + + server = "https://slack.com" + port = 443 + + server_proxy = proxy_server.rsplit(":", 1)[0] + server_proxy_port = proxy_server.rsplit(":", 1)[-1] + + try: + proxy = (str(server_proxy), int(server_proxy_port)) + connection = f"CONNECT {server}:{port} HTTP/1.0\r\nConnection: close\r\n\r\n" + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(proxy) + s.send(str.encode(connection)) + s.recv(4096) + except Exception: + print(f"OSINTSource ID: {source.id}") + print(f"OSINTSource name: {source.name}") + print("Proxy connection failed") + + ids = source.parameter_values["WORKSPACE_CHANNELS_ID"].replace(" ", "") + channels_list = ids.split(",") + + slack_client = WebClient(source.parameter_values["SLACK_API_TOKEN"]) + + try: + for channel_id in channels_list: + logger.info(f"{self.collector_source} Channel: {channel_id}") + channel_info = slack_client.conversations_info(channel=channel_id) + channel_name = channel_info["channel"]["name"] + + # in future we can use parameter "oldest" - Only messages after this Unix timestamp will be included in results + data = slack_client.conversations_history(channel=channel_id, limit=30) + count = 0 + for message in data["messages"]: + count += 1 + logger.debug(f"{self.collector_source} Message: {0}".format(count)) + published = time.ctime(float(message["ts"])) + content = message["text"] + preview = content[:500] + + user_id = message["user"] + user_name = slack_client.users_profile_get(user=user_id) + author = user_name["profile"]["real_name"] + + team_id = message.get("team", "") + if team_id: + team_info = slack_client.team_info(team=team_id) + team_name = team_info["team"]["name"] + else: + team_name = "" + + title = f"Slack post from channel {channel_name}" + if team_name: + title += f" ({team_name})" + link = "" + url = "" + for_hash = user_id + channel_id + content + + logger.debug(f"{self.collector_source} ... Title : {title}") + logger.debug(f"{self.collector_source} ... Content : {content.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Author : {author}") + logger.debug(f"{self.collector_source} ... 
Published: {published}") + + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + title, + preview, + url, + link, + published, + author, + datetime.datetime.now(), + content, + source.id, + [], + ) + news_items.append(news_item) + + BaseCollector.publish(news_items, source, self.collector_source) + + except Exception as error: + logger.exception(f"{self.collector_source} Collection failed: {error}") diff --git a/src/collectors/collectors/twitter_collector.py b/src/collectors/collectors/twitter_collector.py index a5552262..8ac156a3 100644 --- a/src/collectors/collectors/twitter_collector.py +++ b/src/collectors/collectors/twitter_collector.py @@ -1,137 +1,135 @@ -"""Module for X collector.""" - -import datetime -import hashlib -import uuid -import tweepy - -from .base_collector import BaseCollector -from managers.log_manager import logger -from shared.schema.news_item import NewsItemData -from shared.schema.parameter import Parameter, ParameterType - - -class TwitterCollector(BaseCollector): - """Collector for gathering data from Twitter. - - Attributes: - type (str): Type of the collector. - name (str): Name of the collector. - description (str): Description of the collector. - parameters (list): List of parameters required for the collector. - Methods: - collect(source): Collect data from a Twitter source. - Raises: - Exception: If an error occurs during the collection process. - """ - - type = "TWITTER_COLLECTOR" - name = "Twitter Collector" - description = "Collector for gathering data from Twitter" - - parameters = [ - Parameter(0, "TWITTER_API_KEY", "Twitter API key", "API key of Twitter account", ParameterType.STRING), - Parameter(0, "TWITTER_API_KEY_SECRET", "Twitter API key secret", "API key secret of Twitter account", ParameterType.STRING), - Parameter(0, "TWITTER_ACCESS_TOKEN", "Twitter access token", "Twitter access token of Twitter account", ParameterType.STRING), - Parameter( - 0, - "TWITTER_ACCESS_TOKEN_SECRET", - "Twitter access token secret", - "Twitter access token secret of Twitter account", - ParameterType.STRING, - ), - Parameter(0, "SEARCH_KEYWORDS", "Search by keywords", "Search tweets by keywords", ParameterType.STRING), - Parameter(0, "SEARCH_HASHTAGS", "Search by hashtags", "Search tweets by hashtags", ParameterType.STRING), - Parameter(0, "NUMBER_OF_TWEETS", "Number of tweets", "How many tweets will be provided", ParameterType.NUMBER), - ] - - parameters.extend(BaseCollector.parameters) - - @BaseCollector.ignore_exceptions - def collect(self, source): - """Collect data from X source. - - Parameters: - source -- Source object. 
- """ - try: - BaseCollector.update_last_attempt(source) - self.collector_source = f"{self.name} '{source.name}':" - logger.info(f"{self.collector_source} Collecting data from Twitter") - news_items = [] - attributes = [] - - search_keywords = source.parameter_values["SEARCH_KEYWORDS"].replace(" ", "") - keywords_list = search_keywords.split(",") - - search_hashtags = source.parameter_values["SEARCH_HASHTAGS"].replace(" ", "") - hashtags_list = search_hashtags.split(",") - - number_of_tweets = source.parameter_values["NUMBER_OF_TWEETS"] - - twitter_api_key = source.parameter_values["TWITTER_API_KEY"] - twitter_api_key_secret = source.parameter_values["TWITTER_API_KEY_SECRET"] - twitter_access_token = source.parameter_values["TWITTER_ACCESS_TOKEN"] - twitter_access_token_secret = source.parameter_values["TWITTER_ACCESS_TOKEN_SECRET"] - - proxy_server = source.parameter_values["PROXY_SERVER"] - - auth = tweepy.OAuthHandler(twitter_api_key, twitter_api_key_secret) - auth.set_access_token(twitter_access_token, twitter_access_token_secret) - - if proxy_server: - proxy = "socks5://" + proxy_server - api = tweepy.API(auth, proxy=str(proxy), wait_on_rate_limit=True) - else: - api = tweepy.API(auth, wait_on_rate_limit=True) - - if number_of_tweets == "": - number_of_tweets = 100 - - if search_keywords: - public_tweets = tweepy.Cursor(api.search, q=keywords_list).items(int(number_of_tweets)) - elif search_hashtags: - public_tweets = tweepy.Cursor(api.search, q=hashtags_list).items(int(number_of_tweets)) - else: - public_tweets = api.home_timeline(count=number_of_tweets) - - interval = source.parameter_values["REFRESH_INTERVAL"] - - limit = BaseCollector.history(interval) - - for tweet in public_tweets: - - time_to_collect = tweet.created_at - - if time_to_collect > limit: - tweet_id = tweet.id_str - link = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}" - author = tweet.author.name - preview = tweet.text.encode("utf-8") - published = tweet.created_at - title = "Twitter post from " + "@" + author - content = "" - url = "" - - for_hash = author + tweet_id + str(preview) - - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - title, - preview, - url, - link, - published, - author, - datetime.datetime.now(), - content, - source.id, - attributes, - ) - - news_items.append(news_item) - - BaseCollector.publish(news_items, source) - except Exception as error: - BaseCollector.print_exception(source, error) +"""Module for X collector.""" + +import datetime +import hashlib +import uuid +import tweepy + +from .base_collector import BaseCollector +from managers.log_manager import logger +from shared.schema.news_item import NewsItemData +from shared.schema.parameter import Parameter, ParameterType + + +class TwitterCollector(BaseCollector): + """Collector for gathering data from Twitter. + + Attributes: + type (str): Type of the collector. + name (str): Name of the collector. + description (str): Description of the collector. + parameters (list): List of parameters required for the collector. + Methods: + collect(source): Collect data from a Twitter source. + Raises: + Exception: If an error occurs during the collection process. 
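+    Note:
+        The search below relies on tweepy's API.search, a tweepy 3.x method; tweepy 4.x
+        renamed it to API.search_tweets.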
+ """ + + type = "TWITTER_COLLECTOR" + name = "Twitter Collector" + description = "Collector for gathering data from Twitter" + + parameters = [ + Parameter(0, "TWITTER_API_KEY", "Twitter API key", "API key of Twitter account", ParameterType.STRING), + Parameter(0, "TWITTER_API_KEY_SECRET", "Twitter API key secret", "API key secret of Twitter account", ParameterType.STRING), + Parameter(0, "TWITTER_ACCESS_TOKEN", "Twitter access token", "Twitter access token of Twitter account", ParameterType.STRING), + Parameter( + 0, + "TWITTER_ACCESS_TOKEN_SECRET", + "Twitter access token secret", + "Twitter access token secret of Twitter account", + ParameterType.STRING, + ), + Parameter(0, "SEARCH_KEYWORDS", "Search by keywords", "Search tweets by keywords", ParameterType.STRING), + Parameter(0, "SEARCH_HASHTAGS", "Search by hashtags", "Search tweets by hashtags", ParameterType.STRING), + Parameter(0, "NUMBER_OF_TWEETS", "Number of tweets", "How many tweets will be provided", ParameterType.NUMBER), + ] + + parameters.extend(BaseCollector.parameters) + + @BaseCollector.ignore_exceptions + def collect(self, source): + """Collect data from X source. + + Parameters: + source -- Source object. + """ + try: + news_items = [] + attributes = [] + + search_keywords = source.parameter_values["SEARCH_KEYWORDS"].replace(" ", "") + keywords_list = search_keywords.split(",") + + search_hashtags = source.parameter_values["SEARCH_HASHTAGS"].replace(" ", "") + hashtags_list = search_hashtags.split(",") + + number_of_tweets = source.parameter_values["NUMBER_OF_TWEETS"] + + twitter_api_key = source.parameter_values["TWITTER_API_KEY"] + twitter_api_key_secret = source.parameter_values["TWITTER_API_KEY_SECRET"] + twitter_access_token = source.parameter_values["TWITTER_ACCESS_TOKEN"] + twitter_access_token_secret = source.parameter_values["TWITTER_ACCESS_TOKEN_SECRET"] + + proxy_server = source.parameter_values["PROXY_SERVER"] + + auth = tweepy.OAuthHandler(twitter_api_key, twitter_api_key_secret) + auth.set_access_token(twitter_access_token, twitter_access_token_secret) + + if proxy_server: + proxy = "socks5://" + proxy_server + api = tweepy.API(auth, proxy=str(proxy), wait_on_rate_limit=True) + else: + api = tweepy.API(auth, wait_on_rate_limit=True) + + if number_of_tweets == "": + number_of_tweets = 100 + + if search_keywords: + public_tweets = tweepy.Cursor(api.search, q=keywords_list).items(int(number_of_tweets)) + elif search_hashtags: + public_tweets = tweepy.Cursor(api.search, q=hashtags_list).items(int(number_of_tweets)) + else: + public_tweets = api.home_timeline(count=number_of_tweets) + + interval = source.parameter_values["REFRESH_INTERVAL"] + + limit = BaseCollector.history(interval) + + for tweet in public_tweets: + + time_to_collect = tweet.created_at + + if time_to_collect > limit: + tweet_id = tweet.id_str + link = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}" + author = tweet.author.name + preview = tweet.text.encode("utf-8") + published = tweet.created_at + title = "Twitter post from " + "@" + author + content = "" + url = "" + + for_hash = author + tweet_id + str(preview) + + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + title, + preview, + url, + link, + published, + author, + datetime.datetime.now(), + content, + source.id, + attributes, + ) + + news_items.append(news_item) + + BaseCollector.publish(news_items, source, self.collector_source) + + except Exception as error: + logger.exception(f"{self.collector_source} Collection failed: 
{error}") diff --git a/src/collectors/collectors/web_collector.py b/src/collectors/collectors/web_collector.py index f4b99e32..6a67dabc 100644 --- a/src/collectors/collectors/web_collector.py +++ b/src/collectors/collectors/web_collector.py @@ -1,898 +1,882 @@ -"""Module for Web collector.""" - -import datetime -import hashlib -import subprocess -import uuid -import time -import copy -from selenium import webdriver -from selenium.webdriver.common.keys import Keys -from selenium.webdriver.common.action_chains import ActionChains -from selenium.webdriver.support.ui import WebDriverWait -from selenium.common.exceptions import NoSuchElementException -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.common.by import By -from selenium.webdriver.chrome.options import Options as ChromeOptions -from selenium.webdriver.chrome.service import Service as ChromeService -from selenium.webdriver.firefox.options import Options as FirefoxOptions -from selenium.webdriver.firefox.service import Service as FirefoxService -from urllib.parse import urlparse -import os -import dateparser -import re - -from .base_collector import BaseCollector -from managers.log_manager import logger -from shared.schema.news_item import NewsItemData, NewsItemAttribute -from shared.schema.parameter import Parameter, ParameterType - - -class WebCollector(BaseCollector): - """Collector for gathering data from web page. - - Attributes: - type (str): The type of the collector. - name (str): The name of the collector. - description (str): The description of the collector. - parameters (list): The list of parameters for the collector. - auth_username (str): The username for web page authentication. - auth_password (str): The password for web page authentication. - web_url (str): The URL of the web page. - interpret_as (str): The type of the URL (uri or directory). - user_agent (str): The user agent for the web page. - tor_service (str): The Tor service status. - pagination_limit (int): The maximum number of pages to visit. - links_limit (int): The maximum number of article links to process. - word_limit (int): The limit for the article body. - selectors (dict): The dictionary of selectors for the web page. - web_driver_type (str): The type of the web driver. - client_cert_directory (str): The directory with client's certificates. - proxy (str): The proxy server. - proxy_port (int): The proxy port. - proxy_proto (str): The proxy protocol. - proxy_host (str): The proxy - """ - - type = "WEB_COLLECTOR" - name = "Web Collector" - description = "Collector for gathering data from web page" - - parameters = [ - # base parameters - Parameter(0, "WEB_URL", "Web URL", "Full url for web page or folder of html file", ParameterType.STRING), - # browser options - # TODO: implement ENUM - Parameter(0, "WEBDRIVER", "Name of Webdriver", "Name of webdriver for Selenium (chrome|firefox)", ParameterType.STRING), - # TODO: change to BOOLEAN, implement defaults, default False - Parameter(0, "TOR", "Do you want to use Tor service? 
Enter Yes or No", "Using Tor service (yes|no)", ParameterType.STRING), - Parameter(0, "USER_AGENT", "User agent", "Set user agent", ParameterType.STRING), - # authentication options - Parameter( - 0, - "AUTH_USERNAME", - "Username for web page authentication", - "Username for authentication with basic auth header", - ParameterType.STRING, - ), - Parameter( - 0, - "AUTH_PASSWORD", - "Password for web page authentication", - "Password for authentication with basic auth header", - ParameterType.STRING, - ), - # TODO reimplement for new web collector - Parameter( - 0, - "CLIENT_CERT_DIR", - "PATH to directory with client's certificates", - "PATH to client's certificates directory", - ParameterType.STRING, - ), - # web page parsing options - # removing popups - Parameter( - 0, - "POPUP_CLOSE_SELECTOR", - "SELECTOR at TITLE PAGE: Popup removal", - "OPTIONAL: For sites with popups, this is a selector of the clickable element (button or a link) for the popup removal button", - ParameterType.STRING, - ), - # navigating the list of articles page by page - Parameter( - 0, - "NEXT_BUTTON_SELECTOR", - "SELECTOR at TITLE PAGE: Next page", - "OPTIONAL: For sites with pagination, this is a selector of the clickable element (button or a link) for the 'next page'", - ParameterType.STRING, - ), - Parameter( - 0, - "LOAD_MORE_BUTTON_SELECTOR", - "SELECTOR at TITLE PAGE: Load more", - "OPTIONAL: For sites with progressive loading, this is a selector of the clickable element" - " (button or a link) for the 'load more'", - ParameterType.STRING, - ), - Parameter( - 0, - "PAGINATION_LIMIT", - "Pagination limit", - "OPTIONAL: For sites with pagination or progressive loading, maximum number of pages to visit." - " Default: 1 (stay on the first page only)", - ParameterType.NUMBER, - ), - # obtaining links to articles (optional) - Parameter( - 0, - "SINGLE_ARTICLE_LINK_SELECTOR", - "SELECTOR at TITLE PAGE: Links to articles", - "Selector that matches the link to the article. Matching results should contain a 'href' attribute.", - ParameterType.STRING, - ), - Parameter( - 0, - "LINKS_LIMIT", - "Limit for article links", - "OPTIONAL: Maximum number of article links to process. 
Default: all", - ParameterType.NUMBER, - ), - # parsing a single article - Parameter(0, "TITLE_SELECTOR", "SELECTOR at ARTICLE: Article title", "Selector for article title", ParameterType.STRING), - Parameter( - 0, - "ARTICLE_DESCRIPTION_SELECTOR", - "SELECTOR at ARTICLE: short summary", - "OPTIONAL: Selector of article description or summary", - ParameterType.STRING, - ), - Parameter( - 0, - "ARTICLE_FULL_TEXT_SELECTOR", - "SELECTOR at ARTICLE: Article content", - "Selector for the article content / text of the article", - ParameterType.STRING, - ), - Parameter( - 0, "AUTHOR_SELECTOR", "SELECTOR at ARTICLE: Author", "OPTIONAL: Selector to find the author of the post", ParameterType.STRING - ), - Parameter( - 0, "PUBLISHED_SELECTOR", "SELECTOR at ARTICLE: Date published", "OPTIONAL: Selector of the 'published' date", ParameterType.STRING - ), - # TODO reimplement for new web collector - Parameter( - 0, - "ATTACHMENT_SELECTOR", - "SELECTOR at ARTICLE: Attachment selector", - "OPTIONAL: Selector for links to article attachments", - ParameterType.STRING, - ), - Parameter( - 0, - "WORD_LIMIT", - "Limit article body to this many words", - "Collect only first few words of the article (perhaps for legal reasons)", - ParameterType.STRING, - ), - # legacy options, to be studied in more detail or removed - Parameter( - 0, - "ADDITIONAL_ID_SELECTOR", - "SELECTOR at ARTICLE: Additional ID selector", - "OPTIONAL: Selector of an additional article ID", - ParameterType.STRING, - ), - ] - parameters.extend(BaseCollector.parameters) - - @staticmethod - def __get_prefix_and_selector(element_selector): - """Extract the prefix and selector from the given element_selector. - - Parameters: - element_selector (str): The element selector in the format "prefix: selector". - Returns: - tuple: A tuple containing the prefix and selector as separate strings. - """ - selector_split = element_selector.split(":", 1) - if len(selector_split) != 2: - return "", "" - prefix = selector_split[0].strip().lower() - selector = selector_split[1].lstrip() - return prefix, selector - - @staticmethod - def __get_element_locator(element_selector): - """Extract a single element from the headless browser by selector. - - Parameters: - element_selector (str): The selector used to locate the element. - Returns: - locator (tuple): A tuple containing the locator type and the selector. - """ - prefix, selector = WebCollector.__get_prefix_and_selector(element_selector) - - locator = None - if prefix == "id": - locator = (By.ID, selector) - if prefix == "name": - locator = (By.NAME, selector) - elif prefix == "xpath": - locator = (By.XPATH, selector) - elif prefix in ["tag_name", "tag"]: - locator = (By.TAG_NAME, selector) - elif prefix in ["class_name", "class"]: - locator = (By.CLASS_NAME, selector) - elif prefix in ["css_selector", "css"]: - locator = (By.CSS_SELECTOR, selector) - - return locator - - @staticmethod - def __find_element_by(driver, element_selector): - """Extract single element from the headless browser by selector. - - Parameters: - driver: The headless browser driver. - element_selector: The selector used to locate the element. - Returns: - The extracted element. 
- """ - prefix, selector = WebCollector.__get_prefix_and_selector(element_selector) - - element = None - if prefix == "id": - element = driver.find_element(By.ID, selector) - if prefix == "name": - element = driver.find_element(By.NAME, selector) - elif prefix == "xpath": - element = driver.find_element(By.XPATH, selector) - elif prefix in ["tag_name", "tag"]: - element = driver.find_element(By.TAG_NAME, selector) - elif prefix in ["class_name", "class"]: - element = driver.find_element(By.CLASS_NAME, selector) - elif prefix in ["css_selector", "css"]: - element = driver.find_element(By.CSS_SELECTOR, selector) - - return element - - @staticmethod - def __find_element_text_by(driver, element_selector, return_none=False): - """Find the text of an element identified by the given selector using the provided driver. - - Parameters: - driver: The driver object used to interact with the web page. - element_selector: The selector used to locate the element. - return_none (bool): A boolean indicating whether to return None if the element is not found. - If set to False, an empty string will be returned instead. - Defaults to False. - Returns: - The text of the element if found, otherwise None or an empty string based on the value of return_none. - """ - if return_none: - failure_retval = None - else: - failure_retval = "" - - try: - ret = WebCollector.__find_element_by(driver, element_selector) - if not ret: - return failure_retval - return ret.text - except NoSuchElementException as e: # noqa F841 - return failure_retval - - @staticmethod - def __find_elements_by(driver, element_selector): - """Extract list of elements from the headless browser by selector. - - Parameters: - driver: The headless browser driver. - element_selector: The selector used to locate the elements. - Returns: - A list of elements found using the given selector. - """ - prefix, selector = WebCollector.__get_prefix_and_selector(element_selector) - logger.debug(f"Prefix: {prefix}") - logger.debug(f"Selector: {selector}") - logger.debug(f"Page source code: {driver.page_source}") - - elements = None - if prefix == "id": - elements = [driver.find_element(By.ID, selector)] - if prefix == "name": - elements = driver.find_elements(By.NAME, selector) - elif prefix == "xpath": - elements = driver.find_elements(By.XPATH, selector) - elif prefix in ["tag_name", "tag"]: - elements = driver.find_elements(By.TAG_NAME, selector) - elif prefix in ["class_name", "class"]: - elements = driver.find_elements(By.CLASS_NAME, selector) - elif prefix in ["css_selector", "css"]: - elements = driver.find_elements(By.CSS_SELECTOR, selector) - return elements - - @staticmethod - def __safe_find_elements_by(driver, element_selector): - """Safely find elements by the given element selector using the provided driver. - - Parameters: - driver: The driver object used to interact with the web page. - element_selector: The selector used to locate the elements. - Returns: - A list of elements matching the given selector, or None if no elements are found. - """ - try: - ret = WebCollector.__find_elements_by(driver, element_selector) - if not ret: - return None - return ret - except NoSuchElementException as e: # noqa F841 - return None - - @staticmethod - def __smart_truncate(content, length=500, suffix="..."): - """Truncate the given content to a specified length and adds a suffix if necessary. - - Parameters: - content (str): The content to be truncated. - length (int): The maximum length of the truncated content. Default is 500. 
- suffix (str): The suffix to be added at the end of the truncated content. Default is "...". - Returns: - (str): The truncated content. - """ - if len(content) <= length: - return content - else: - return " ".join(re.compile(r"\s+").split(content[: length + 1])[0:-1]) + suffix - - @staticmethod - def __wait_for_new_tab(browser, timeout, current_tab): - """Wait for a new tab to open in the browser. - - Parameters: - browser (WebDriver): The browser instance. - timeout (int): The maximum time to wait for a new tab to open, in seconds. - current_tab (str): The current tab handle. - Raises: - TimeoutException: If a new tab does not open within the specified timeout. - """ - yield - WebDriverWait(browser, timeout).until(lambda browser: len(browser.window_handles) != 1) - for tab in browser.window_handles: - if tab != current_tab: - browser.switch_to.window(tab) - return - - def __close_other_tabs(self, browser, handle_to_keep, fallback_url): - """Close all browser tabs except for the specified handle. - - Parameters: - browser (WebDriver): The browser instance. - handle_to_keep (str): The handle of the tab to keep open. - fallback_url (str): The URL to load if tab restoration fails. - Returns: - (bool): True if the tab restoration is successful and the current window handle matches the handle_to_keep, False otherwise. - """ - try: - handles_to_close = copy.copy(browser.window_handles) - for handle_to_close in handles_to_close: - if handle_to_close != handle_to_keep: - browser.switch_to.window(handle_to_close) - browser.close() - # time.sleep(1) - if len(browser.window_handles) == 1: - break - browser.switch_to.window(handle_to_keep) - except Exception as error: - logger.exception(f"{self.collector_source} Browser tab restoration failed, reloading the title page: {error}") - try: - # last resort - at least try to reopen the original page - browser.get(fallback_url) - return True - except Exception as error: - logger.exception(f"{self.collector_source} Fallback to the original page failed: {error}") - return False - return browser.current_window_handle == handle_to_keep - - def __parse_settings(self): - """Load the collector settings to instance variables. - - Returns: - bool: True if the settings were successfully loaded, False otherwise. 
- """ - self.auth_username = self.source.parameter_values["AUTH_USERNAME"] - self.auth_password = self.source.parameter_values["AUTH_PASSWORD"] - - # parse the URL - web_url = self.source.parameter_values["WEB_URL"] - - if web_url.lower().startswith("file://"): - file_part = web_url[7:] - if os.path.isfile(file_part): - self.interpret_as = "uri" - self.web_url = "file://" + file_part - elif os.path.isdir(file_part): - self.interpret_as = "directory" - self.web_url = file_part - else: - logger.info(f"{self.collector_source} Missing file {web_url}") - return False - - elif re.search(r"^[a-z0-9]+://", web_url.lower()): - self.interpret_as = "uri" - self.web_url = web_url - elif os.path.isfile(web_url): - self.interpret_as = "uri" - self.web_url = f"file://{web_url}" - elif os.path.isdir(web_url): - self.interpret_as = "directory" - self.web_url = web_url - else: - self.interpret_as = "uri" - self.web_url = f"https://{web_url}" - - if self.interpret_as == "uri" and self.auth_username and self.auth_password: - parsed_url = urlparse(self.web_url) - self.web_url = f"{parsed_url.scheme}://{self.auth_username}:{self.auth_password}@{parsed_url.netloc}{parsed_url.path}" - - # parse other arguments - self.user_agent = self.source.parameter_values["USER_AGENT"] - self.tor_service = self.source.parameter_values["TOR"] - - # self.interval = self.source.parameter_values['REFRESH_INTERVAL'] - - self.pagination_limit = BaseCollector.read_int_parameter("PAGINATION_LIMIT", 1, self.source) - self.links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, self.source) - self.word_limit = BaseCollector.read_int_parameter("WORD_LIMIT", 0, self.source) - - self.selectors = {} - - self.selectors["popup_close"] = self.source.parameter_values["POPUP_CLOSE_SELECTOR"] - self.selectors["next_page"] = self.source.parameter_values["NEXT_BUTTON_SELECTOR"] - self.selectors["load_more"] = self.source.parameter_values["LOAD_MORE_BUTTON_SELECTOR"] - self.selectors["single_article_link"] = self.source.parameter_values["SINGLE_ARTICLE_LINK_SELECTOR"] - - self.selectors["title"] = self.source.parameter_values["TITLE_SELECTOR"] - self.selectors["article_description"] = self.source.parameter_values["ARTICLE_DESCRIPTION_SELECTOR"] - self.selectors["article_full_text"] = self.source.parameter_values["ARTICLE_FULL_TEXT_SELECTOR"] - self.selectors["published"] = self.source.parameter_values["PUBLISHED_SELECTOR"] - self.selectors["author"] = self.source.parameter_values["AUTHOR_SELECTOR"] - self.selectors["attachment"] = self.source.parameter_values["ATTACHMENT_SELECTOR"] - self.selectors["additional_id"] = self.source.parameter_values["ADDITIONAL_ID_SELECTOR"] - - self.web_driver_type = self.source.parameter_values["WEBDRIVER"] - self.client_cert_directory = self.source.parameter_values["CLIENT_CERT_DIR"] - - set_proxy = False - self.proxy = "" - param_proxy = self.source.parameter_values["PROXY_SERVER"] - if re.search(r"^(https?|socks[45])://", param_proxy.lower()): - set_proxy = True - self.proxy = param_proxy - elif re.search(r"^.*:\d+$/", param_proxy.lower()): - set_proxy = True - self.proxy = f"http://{param_proxy}" - - if set_proxy: - results = re.match(r"(https?)://([^/]+)/?$", self.proxy) - if results: - self.proxy_port = 8080 - self.proxy_proto = results.group(1) - self.proxy_host = results.group(2) - results = re.match(r"(https?)://([^/]+):(\d+)/?$", self.proxy) - if results: - self.proxy_proto = results.group(1) - self.proxy_host = results.group(2) - self.proxy_port = results.group(3) - - return True - - def 
__get_headless_driver_chrome(self): - """Initialize and return Chrome driver. - - Returns: - WebDriver: The initialized Chrome driver. - """ - logger.debug("Initializing Chrome driver...") - - chrome_driver_executable = os.environ.get("SELENIUM_CHROME_DRIVER_PATH", "/usr/bin/chromedriver") - - chrome_options = ChromeOptions() - chrome_options.page_load_strategy = "normal" # .get() returns on document ready - chrome_options.add_argument("start-maximized") - chrome_options.add_argument("disable-infobars") - chrome_options.add_argument("--disable-extensions") - chrome_options.add_argument("--disable-gpu") - chrome_options.add_argument("--disable-dev-shm-usage") - chrome_options.add_argument("--no-sandbox") - chrome_options.add_argument("--headless") - chrome_options.add_argument("--ignore-certificate-errors") - chrome_options.add_argument("--incognito") - chrome_service = ChromeService(executable_path=chrome_driver_executable) - if self.user_agent: - chrome_options.add_argument("user-agent=" + self.user_agent) - if self.tor_service.lower() == "yes": - socks_proxy = "socks5://127.0.0.1:9050" - chrome_options.add_argument(f"--proxy-server={socks_proxy}") - elif self.proxy: - webdriver.DesiredCapabilities.CHROME["proxy"] = { - "proxyType": "manual", - "httpProxy": self.proxy, - "ftpProxy": self.proxy, - "sslProxy": self.proxy, - } - - driver = webdriver.Chrome(service=chrome_service, options=chrome_options) - logger.debug("Chrome driver initialized.") - return driver - - def __get_headless_driver_firefox(self): - """Initialize and return Firefox driver. - - Returns: - WebDriver: The initialized Firefox driver. - """ - logger.debug("Initializing Firefox driver...") - - firefox_driver_executable = os.environ.get("SELENIUM_FIREFOX_DRIVER_PATH", "/usr/local/bin/geckodriver") - - core_url = os.environ.get("TARANIS_NG_CORE_URL", "http://core") - core_url_host = urlparse(core_url).hostname # get only the hostname from URL - - firefox_options = FirefoxOptions() - firefox_options.page_load_strategy = "normal" # .get() returns on document ready - firefox_options.add_argument("--headless") - firefox_options.add_argument("--ignore-certificate-errors") - firefox_options.add_argument("--incognito") - - if self.user_agent: - firefox_options.add_argument(f"user-agent={self.user_agent}") - - if self.tor_service.lower() == "yes": - firefox_options.set_preference("network.proxy.type", 1) # manual proxy config - firefox_options.set_preference("network.proxy.socks", "127.0.0.1") - firefox_options.set_preference("network.proxy.socks_port", 9050) - firefox_options.set_preference("network.proxy.no_proxies_on", f"localhost, ::1, 127.0.0.1, {core_url_host}, 127.0.0.0/8") - - elif self.proxy: - firefox_options.set_preference("network.proxy.type", 1) # manual proxy config - firefox_options.set_preference("network.proxy.http", self.proxy_host) - firefox_options.set_preference("network.proxy.http_port", int(self.proxy_port)) - firefox_options.set_preference("network.proxy.ssl", self.proxy_host) - firefox_options.set_preference("network.proxy.ssl_port", int(self.proxy_port)) - firefox_options.set_preference("network.proxy.ftp", self.proxy) - firefox_options.set_preference("network.proxy.ftp_port", int(self.proxy_port)) - firefox_options.set_preference("network.proxy.no_proxies_on", f"localhost, ::1, 127.0.0.1, {core_url_host}, 127.0.0.0/8") - else: - firefox_options.set_preference("network.proxy.type", 0) # no proxy - - firefox_service = FirefoxService(executable_path=firefox_driver_executable) - driver = 
webdriver.Firefox(service=firefox_service, options=firefox_options) - - logger.debug("Firefox driver initialized.") - return driver - - def __get_headless_driver(self): - """Initialize and return a headless browser driver. - - Returns: - browser: The headless browser driver - """ - try: - if self.web_driver_type.lower() == "firefox": - browser = self.__get_headless_driver_firefox() - else: - browser = self.__get_headless_driver_chrome() - browser.implicitly_wait(15) # how long to wait for elements when selector doesn't match - return browser - except Exception as error: - logger.exception(f"{self.collector_source} Failed to get headless driver: {error}") - return None - - def __dispose_of_headless_driver(self, driver): - """Destroy the headless browser driver, and its browser. - - Parameters: - driver: The headless browser driver to be disposed of. - """ - try: - driver.close() - except Exception as error: - logger.exception(f"{self.collector_source} Could not close the headless browser driver: {error}") - pass - try: - driver.quit() - except Exception as error: - logger.exception(f"{self.collector_source} Could not quit the headless browser driver: {error}") - pass - try: - driver.dispose() - except Exception as error: - logger.exception(f"{self.collector_source} Could not dispose the headless browser driver: {error}") - pass - - def __run_tor(self): - """Run The Onion Router service in a subprocess.""" - logger.info(f"{self.collector_source} Initializing TOR") - subprocess.Popen(["tor"]) - time.sleep(3) - - @BaseCollector.ignore_exceptions - def collect(self, source): - """Collect news items from this source (main function). - - Parameters: - source (Source): The source to collect news items from. - """ - self.collector_source = f"{self.name} '{source.name}':" - BaseCollector.update_last_attempt(source) - self.source = source - logger.info(f"{self.collector_source} Starting collector") - - self.__parse_settings() - self.news_items = [] - - if self.tor_service.lower() == "yes": - self.__run_tor() - - if self.interpret_as == "uri": - result, message, total_processed_articles, total_failed_articles = self.__browse_title_page(self.web_url) - - elif self.interpret_as == "directory": - logger.info(f"{self.collector_source} Searching for html files in {self.web_url}") - for file_name in os.listdir(self.web_url): - if file_name.lower().endswith(".html"): - html_file = f"file://{self.web_url}/{file_name}" - result, message = self.__browse_title_page(html_file) - - def __browse_title_page(self, index_url): - """Spawn a browser, download the title page for parsing, call parser. - - Parameters: - index_url (str): The URL of the title page. - """ - browser = self.__get_headless_driver() - if browser is None: - logger.info(f"{self.collector_source} Error initializing the headless browser") - return False, "Error initializing the headless browser", 0, 0 - - logger.info(f"{self.collector_source} Requesting title page: {self.web_url}") - try: - browser.get(index_url) - except Exception as error: - logger.exception(f"{self.collector_source} Error obtaining title page: {error}") - self.__dispose_of_headless_driver(browser) - return False, "Error obtaining title page", 0, 0 - - # if there is a popup selector, click on it! 
- if self.selectors["popup_close"]: - popup = None - try: - popup = WebDriverWait(browser, 10).until( - EC.presence_of_element_located(self.__get_element_locator(self.selectors["popup_close"])) - ) - except Exception as error: - logger.exception(f"{self.collector_source} Popup find error: {error}") - if popup is not None: - try: - popup.click() - except Exception as error: - logger.exception(f"{self.collector_source} Popup click error: {error}") - - # if there is a "load more" selector, click on it! - page = 1 - while self.selectors["load_more"] and page < self.pagination_limit: - try: - load_more = WebDriverWait(browser, 5).until( - EC.element_to_be_clickable(self.__get_element_locator(self.selectors["load_more"])) - ) - # TODO: check for None - - try: - action = ActionChains(browser) - action.move_to_element(load_more) - load_more.click() - except Exception: - browser.execute_script("arguments[0].scrollIntoView(true);", load_more) - load_more.click() - - try: - WebDriverWait(browser, 5).until(EC.staleness_of(load_more)) - except Exception: - pass - - except Exception: - break - page += 1 - - title_page_handle = browser.current_window_handle - total_processed_articles, total_failed_articles = 0, 0 - while True: - try: - processed_articles, failed_articles = self.__process_title_page_articles(browser, title_page_handle, index_url) - - total_processed_articles += processed_articles - total_failed_articles += failed_articles - - # safety cleanup - if not self.__close_other_tabs(browser, title_page_handle, fallback_url=index_url): - logger.error(f"{self.collector_source} Error during page crawl (after-crawl clean up)") - break - except Exception as error: - logger.exception() - logger.error(f"{self.collector_source} Error during page crawl (exception): {error}") - break - - if page >= self.pagination_limit or not self.selectors["next_page"]: - if self.pagination_limit > 1: - logger.info(f"{self.collector_source} Page limit reached") - break - - # visit next page of results - page += 1 - logger.info(f"{self.collector_source} Clicking 'next page'") - try: - next_page = self.__find_element_by(browser, self.selectors["next_page"]) - # TODO: check for None - ActionChains(browser).move_to_element(next_page).click(next_page).perform() - except Exception: - logger.info(f"{self.collector_source} This was the last page") - break - - self.__dispose_of_headless_driver(browser) - BaseCollector.publish(self.news_items, self.source) - - return True, "", total_processed_articles, total_failed_articles - - def __process_title_page_articles(self, browser, title_page_handle, index_url): - """Parse the title page for articles. - - Parameters: - browser (WebDriver): The browser instance. - title_page_handle (str): The handle of the title page tab. - index_url (str): The URL of the title page. - Returns: - (processed_articles, failed_articles) (tuple): A tuple containing the number of processed articles and - the number of failed articles. 
- """ - processed_articles, failed_articles = 0, 0 - article_items = self.__safe_find_elements_by(browser, self.selectors["single_article_link"]) - if article_items is None: - logger.info(f"{self.collector_source} Invalid page or incorrect selector for article items") - return 0, 0 - - index_url_just_before_click = browser.current_url - - count = 0 - # print(browser.page_source, flush=True) - for item in article_items: - count += 1 - # try: - # print("H: {0} {1:.200}".format(count, item.get_attribute('outerHTML')), flush=True) - # except Exception as ex: - # pass - # if first item works but next items have problems - it's because this: - # https://www.selenium.dev/documentation/webdriver/troubleshooting/errors/#stale-element-reference-exception - link = None - try: - link = item.get_attribute("href") - if link is None: # don't continue, it will crash in current situation - logger.info(f"{self.collector_source} Warning: no link for article {count}/{len(article_items)}") - continue - logger.info(f"{self.collector_source} Visiting article {count}/{len(article_items)}: {link}") - except Exception: - logger.info(f"{self.collector_source} Failed to get link for article {count}/{len(article_items)}") - continue - - click_method = 1 # TODO: some day, make this user-configurable with tri-state enum - if click_method == 1: - browser.switch_to.new_window("tab") - browser.get(link) - elif click_method == 2: - browser.move_to_element(item) - ActionChains(browser).key_down(Keys.CONTROL).click(item).key_up(Keys.CONTROL).perform() - self.__wait_for_new_tab(browser, 15, title_page_handle) - elif click_method == 3: - browser.move_to_element(item) - item.send_keys(Keys.CONTROL + Keys.RETURN) - self.__wait_for_new_tab(browser, 15, title_page_handle) - time.sleep(1) - - try: - news_item = self.__process_article_page(index_url, browser) - if news_item: - logger.debug(f"{self.collector_source} ... Title : {news_item.title}") - logger.debug( - f"{self.collector_source} ... Review : {news_item.review.replace('\r', '').replace('\n', ' ').strip()[:100]}" - ) - logger.debug( - f"{self.collector_source} ... Content : {news_item.content.replace('\r', '').replace('\n', ' ').strip()[:100]}" - ) - logger.debug(f"{self.collector_source} ... Author : {news_item.author}") - logger.debug(f"{self.collector_source} ... Published: {news_item.published}") - self.news_items.append(news_item) - else: - logger.info(f"{self.collector_source} Failed to parse an article") - except Exception as error: - logger.exception() - logger.error(f"{self.collector_source} Failed to parse an article (exception): {error}") - - if len(browser.window_handles) == 1: - back_clicks = 1 - while browser.current_url != index_url_just_before_click: - browser.back() - back_clicks += 1 - if back_clicks > 3: - logger.info(f"{self.collector_source} Error during page crawl (cannot restore window after crawl)") - elif not self.__close_other_tabs(browser, title_page_handle, fallback_url=index_url): - logger.info(f"{self.collector_source} Error during page crawl (after-crawl clean up)") - break - if count >= self.links_limit & self.links_limit > 0: - logger.info(f"{self.collector_source} Limit for article links reached ({self.links_limit})") - break - - return processed_articles, failed_articles - - def __process_article_page(self, index_url, browser): - """Parse a single article. - - Parameters: - index_url (str): The URL of the title page. - browser (WebDriver): The browser instance. - Returns: - news_item (NewsItemData): The parsed news item. 
- """ - current_url = browser.current_url - - title = self.__find_element_text_by(browser, self.selectors["title"]) - - article_full_text = self.__find_element_text_by(browser, self.selectors["article_full_text"]) - if self.word_limit > 0: - article_full_text = " ".join(re.compile(r"\s+").split(article_full_text)[: self.word_limit]) - - if self.selectors["article_description"]: - article_description = self.__find_element_text_by(browser, self.selectors["article_description"]) - else: - article_description = "" - if self.word_limit > 0: - article_description = " ".join(re.compile(r"\s+").split(article_description)[: self.word_limit]) - if not article_description: - article_description = self.__smart_truncate(article_full_text) - - published_str = self.__find_element_text_by(browser, self.selectors["published"]) - if not published_str: - published_str = "today" - published = dateparser.parse(published_str, settings={"DATE_ORDER": "DMY"}) - published_str = published.strftime("%Y-%m-%d %H:%M") # remove microseconds/seconds from the screen, looks ugly - - link = current_url - - author = self.__find_element_text_by(browser, self.selectors["author"]) - - for_hash = author + title + article_description - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - title, - article_description, - self.web_url, - link, - published_str, - author, - datetime.datetime.now(), - article_full_text, - self.source.id, - [], - ) - - if self.selectors["additional_id"]: - value = self.__find_element_text_by(browser, self.selectors["additional_id"]) - if value: - key = "Additional_ID" - binary_mime_type = "" - binary_value = "" - attribute = NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value) - news_item.attributes.append(attribute) - return news_item +"""Module for Web collector.""" + +import datetime +import hashlib +import subprocess +import uuid +import time +import copy +from selenium import webdriver +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.common.action_chains import ActionChains +from selenium.webdriver.support.ui import WebDriverWait +from selenium.common.exceptions import NoSuchElementException +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.common.by import By +from selenium.webdriver.chrome.options import Options as ChromeOptions +from selenium.webdriver.chrome.service import Service as ChromeService +from selenium.webdriver.firefox.options import Options as FirefoxOptions +from selenium.webdriver.firefox.service import Service as FirefoxService +from urllib.parse import urlparse +import os +import dateparser +import re + +from .base_collector import BaseCollector +from managers.log_manager import logger +from shared.schema.news_item import NewsItemData, NewsItemAttribute +from shared.schema.parameter import Parameter, ParameterType + + +class WebCollector(BaseCollector): + """Collector for gathering data from web page. + + Attributes: + type (str): The type of the collector. + name (str): The name of the collector. + description (str): The description of the collector. + parameters (list): The list of parameters for the collector. + auth_username (str): The username for web page authentication. + auth_password (str): The password for web page authentication. + web_url (str): The URL of the web page. + interpret_as (str): The type of the URL (uri or directory). + user_agent (str): The user agent for the web page. + tor_service (str): The Tor service status. 
+ pagination_limit (int): The maximum number of pages to visit. + links_limit (int): The maximum number of article links to process. + word_limit (int): The limit for the article body. + selectors (dict): The dictionary of selectors for the web page. + web_driver_type (str): The type of the web driver. + client_cert_directory (str): The directory with client's certificates. + proxy (str): The proxy server. + proxy_port (int): The proxy port. + proxy_proto (str): The proxy protocol. + proxy_host (str): The proxy + """ + + type = "WEB_COLLECTOR" + name = "Web Collector" + description = "Collector for gathering data from web page" + + parameters = [ + # base parameters + Parameter(0, "WEB_URL", "Web URL", "Full url for web page or folder of html file", ParameterType.STRING), + # browser options + # TODO: implement ENUM + Parameter(0, "WEBDRIVER", "Name of Webdriver", "Name of webdriver for Selenium (chrome|firefox)", ParameterType.STRING), + # TODO: change to BOOLEAN, implement defaults, default False + Parameter(0, "TOR", "Do you want to use Tor service? Enter Yes or No", "Using Tor service (yes|no)", ParameterType.STRING), + Parameter(0, "USER_AGENT", "User agent", "Set user agent", ParameterType.STRING), + # authentication options + Parameter( + 0, + "AUTH_USERNAME", + "Username for web page authentication", + "Username for authentication with basic auth header", + ParameterType.STRING, + ), + Parameter( + 0, + "AUTH_PASSWORD", + "Password for web page authentication", + "Password for authentication with basic auth header", + ParameterType.STRING, + ), + # TODO reimplement for new web collector + Parameter( + 0, + "CLIENT_CERT_DIR", + "PATH to directory with client's certificates", + "PATH to client's certificates directory", + ParameterType.STRING, + ), + # web page parsing options + # removing popups + Parameter( + 0, + "POPUP_CLOSE_SELECTOR", + "SELECTOR at TITLE PAGE: Popup removal", + "OPTIONAL: For sites with popups, this is a selector of the clickable element (button or a link) for the popup removal button", + ParameterType.STRING, + ), + # navigating the list of articles page by page + Parameter( + 0, + "NEXT_BUTTON_SELECTOR", + "SELECTOR at TITLE PAGE: Next page", + "OPTIONAL: For sites with pagination, this is a selector of the clickable element (button or a link) for the 'next page'", + ParameterType.STRING, + ), + Parameter( + 0, + "LOAD_MORE_BUTTON_SELECTOR", + "SELECTOR at TITLE PAGE: Load more", + "OPTIONAL: For sites with progressive loading, this is a selector of the clickable element" + " (button or a link) for the 'load more'", + ParameterType.STRING, + ), + Parameter( + 0, + "PAGINATION_LIMIT", + "Pagination limit", + "OPTIONAL: For sites with pagination or progressive loading, maximum number of pages to visit." + " Default: 1 (stay on the first page only)", + ParameterType.NUMBER, + ), + # obtaining links to articles (optional) + Parameter( + 0, + "SINGLE_ARTICLE_LINK_SELECTOR", + "SELECTOR at TITLE PAGE: Links to articles", + "Selector that matches the link to the article. Matching results should contain a 'href' attribute.", + ParameterType.STRING, + ), + Parameter( + 0, + "LINKS_LIMIT", + "Limit for article links", + "OPTIONAL: Maximum number of article links to process. 
Default: all", + ParameterType.NUMBER, + ), + # parsing a single article + Parameter(0, "TITLE_SELECTOR", "SELECTOR at ARTICLE: Article title", "Selector for article title", ParameterType.STRING), + Parameter( + 0, + "ARTICLE_DESCRIPTION_SELECTOR", + "SELECTOR at ARTICLE: short summary", + "OPTIONAL: Selector of article description or summary", + ParameterType.STRING, + ), + Parameter( + 0, + "ARTICLE_FULL_TEXT_SELECTOR", + "SELECTOR at ARTICLE: Article content", + "Selector for the article content / text of the article", + ParameterType.STRING, + ), + Parameter( + 0, "AUTHOR_SELECTOR", "SELECTOR at ARTICLE: Author", "OPTIONAL: Selector to find the author of the post", ParameterType.STRING + ), + Parameter( + 0, "PUBLISHED_SELECTOR", "SELECTOR at ARTICLE: Date published", "OPTIONAL: Selector of the 'published' date", ParameterType.STRING + ), + # TODO reimplement for new web collector + Parameter( + 0, + "ATTACHMENT_SELECTOR", + "SELECTOR at ARTICLE: Attachment selector", + "OPTIONAL: Selector for links to article attachments", + ParameterType.STRING, + ), + Parameter( + 0, + "WORD_LIMIT", + "Limit article body to this many words", + "Collect only first few words of the article (perhaps for legal reasons)", + ParameterType.STRING, + ), + # legacy options, to be studied in more detail or removed + Parameter( + 0, + "ADDITIONAL_ID_SELECTOR", + "SELECTOR at ARTICLE: Additional ID selector", + "OPTIONAL: Selector of an additional article ID", + ParameterType.STRING, + ), + ] + parameters.extend(BaseCollector.parameters) + + @staticmethod + def __get_prefix_and_selector(element_selector): + """Extract the prefix and selector from the given element_selector. + + Parameters: + element_selector (str): The element selector in the format "prefix: selector". + Returns: + tuple: A tuple containing the prefix and selector as separate strings. + """ + selector_split = element_selector.split(":", 1) + if len(selector_split) != 2: + return "", "" + prefix = selector_split[0].strip().lower() + selector = selector_split[1].lstrip() + return prefix, selector + + @staticmethod + def __get_element_locator(element_selector): + """Extract a single element from the headless browser by selector. + + Parameters: + element_selector (str): The selector used to locate the element. + Returns: + locator (tuple): A tuple containing the locator type and the selector. + """ + prefix, selector = WebCollector.__get_prefix_and_selector(element_selector) + + locator = None + if prefix == "id": + locator = (By.ID, selector) + if prefix == "name": + locator = (By.NAME, selector) + elif prefix == "xpath": + locator = (By.XPATH, selector) + elif prefix in ["tag_name", "tag"]: + locator = (By.TAG_NAME, selector) + elif prefix in ["class_name", "class"]: + locator = (By.CLASS_NAME, selector) + elif prefix in ["css_selector", "css"]: + locator = (By.CSS_SELECTOR, selector) + + return locator + + @staticmethod + def __find_element_by(driver, element_selector): + """Extract single element from the headless browser by selector. + + Parameters: + driver: The headless browser driver. + element_selector: The selector used to locate the element. + Returns: + The extracted element. 
+ """ + prefix, selector = WebCollector.__get_prefix_and_selector(element_selector) + + element = None + if prefix == "id": + element = driver.find_element(By.ID, selector) + if prefix == "name": + element = driver.find_element(By.NAME, selector) + elif prefix == "xpath": + element = driver.find_element(By.XPATH, selector) + elif prefix in ["tag_name", "tag"]: + element = driver.find_element(By.TAG_NAME, selector) + elif prefix in ["class_name", "class"]: + element = driver.find_element(By.CLASS_NAME, selector) + elif prefix in ["css_selector", "css"]: + element = driver.find_element(By.CSS_SELECTOR, selector) + + return element + + @staticmethod + def __find_element_text_by(driver, element_selector, return_none=False): + """Find the text of an element identified by the given selector using the provided driver. + + Parameters: + driver: The driver object used to interact with the web page. + element_selector: The selector used to locate the element. + return_none (bool): A boolean indicating whether to return None if the element is not found. + If set to False, an empty string will be returned instead. + Defaults to False. + Returns: + The text of the element if found, otherwise None or an empty string based on the value of return_none. + """ + if return_none: + failure_retval = None + else: + failure_retval = "" + + try: + ret = WebCollector.__find_element_by(driver, element_selector) + if not ret: + return failure_retval + return ret.text + except NoSuchElementException as e: # noqa F841 + return failure_retval + + @staticmethod + def __find_elements_by(driver, element_selector): + """Extract list of elements from the headless browser by selector. + + Parameters: + driver: The headless browser driver. + element_selector: The selector used to locate the elements. + Returns: + A list of elements found using the given selector. + """ + prefix, selector = WebCollector.__get_prefix_and_selector(element_selector) + # logger.debug(f"Prefix: {prefix}") + # logger.debug(f"Selector: {selector}") + # logger.debug(f"Page source code: {driver.page_source}") + + elements = None + if prefix == "id": + elements = [driver.find_element(By.ID, selector)] + if prefix == "name": + elements = driver.find_elements(By.NAME, selector) + elif prefix == "xpath": + elements = driver.find_elements(By.XPATH, selector) + elif prefix in ["tag_name", "tag"]: + elements = driver.find_elements(By.TAG_NAME, selector) + elif prefix in ["class_name", "class"]: + elements = driver.find_elements(By.CLASS_NAME, selector) + elif prefix in ["css_selector", "css"]: + elements = driver.find_elements(By.CSS_SELECTOR, selector) + return elements + + @staticmethod + def __safe_find_elements_by(driver, element_selector): + """Safely find elements by the given element selector using the provided driver. + + Parameters: + driver: The driver object used to interact with the web page. + element_selector: The selector used to locate the elements. + Returns: + A list of elements matching the given selector, or None if no elements are found. + """ + try: + ret = WebCollector.__find_elements_by(driver, element_selector) + if not ret: + return None + return ret + except NoSuchElementException as e: # noqa F841 + return None + + @staticmethod + def __smart_truncate(content, length=500, suffix="..."): + """Truncate the given content to a specified length and adds a suffix if necessary. + + Parameters: + content (str): The content to be truncated. + length (int): The maximum length of the truncated content. Default is 500. 
+            suffix (str): The suffix to be added at the end of the truncated content. Default is "...".
+        Returns:
+            (str): The truncated content.
+        """
+        if len(content) <= length:
+            return content
+        else:
+            return " ".join(re.compile(r"\s+").split(content[: length + 1])[0:-1]) + suffix
+
+    @staticmethod
+    def __wait_for_new_tab(browser, timeout, current_tab):
+        """Wait for a new tab to open in the browser.
+
+        Parameters:
+            browser (WebDriver): The browser instance.
+            timeout (int): The maximum time to wait for a new tab to open, in seconds.
+            current_tab (str): The current tab handle.
+        Raises:
+            TimeoutException: If a new tab does not open within the specified timeout.
+        """
+        # block until a second window handle appears, then switch to it
+        WebDriverWait(browser, timeout).until(lambda browser: len(browser.window_handles) != 1)
+        for tab in browser.window_handles:
+            if tab != current_tab:
+                browser.switch_to.window(tab)
+                return
+
+    def __close_other_tabs(self, browser, handle_to_keep, fallback_url):
+        """Close all browser tabs except for the specified handle.
+
+        Parameters:
+            browser (WebDriver): The browser instance.
+            handle_to_keep (str): The handle of the tab to keep open.
+            fallback_url (str): The URL to load if tab restoration fails.
+        Returns:
+            (bool): True if the tab restoration is successful and the current window handle matches the handle_to_keep, False otherwise.
+        """
+        try:
+            handles_to_close = copy.copy(browser.window_handles)
+            for handle_to_close in handles_to_close:
+                if handle_to_close != handle_to_keep:
+                    browser.switch_to.window(handle_to_close)
+                    browser.close()
+                    # time.sleep(1)
+                if len(browser.window_handles) == 1:
+                    break
+            browser.switch_to.window(handle_to_keep)
+        except Exception as error:
+            logger.exception(f"{self.collector_source} Browser tab restoration failed, reloading the title page: {error}")
+            try:
+                # last resort - at least try to reopen the original page
+                browser.get(fallback_url)
+                return True
+            except Exception as error:
+                logger.exception(f"{self.collector_source} Fallback to the original page failed: {error}")
+                return False
+        return browser.current_window_handle == handle_to_keep
+
+    def __parse_settings(self):
+        """Load the collector settings to instance variables.
+
+        Returns:
+            bool: True if the settings were successfully loaded, False otherwise.
+ """ + self.auth_username = self.source.parameter_values["AUTH_USERNAME"] + self.auth_password = self.source.parameter_values["AUTH_PASSWORD"] + + # parse the URL + web_url = self.source.parameter_values["WEB_URL"] + + if web_url.lower().startswith("file://"): + file_part = web_url[7:] + if os.path.isfile(file_part): + self.interpret_as = "uri" + self.web_url = "file://" + file_part + elif os.path.isdir(file_part): + self.interpret_as = "directory" + self.web_url = file_part + else: + logger.warning(f"{self.collector_source} Missing file {web_url}") + return False + + elif re.search(r"^[a-z0-9]+://", web_url.lower()): + self.interpret_as = "uri" + self.web_url = web_url + elif os.path.isfile(web_url): + self.interpret_as = "uri" + self.web_url = f"file://{web_url}" + elif os.path.isdir(web_url): + self.interpret_as = "directory" + self.web_url = web_url + else: + self.interpret_as = "uri" + self.web_url = f"https://{web_url}" + + if self.interpret_as == "uri" and self.auth_username and self.auth_password: + parsed_url = urlparse(self.web_url) + self.web_url = f"{parsed_url.scheme}://{self.auth_username}:{self.auth_password}@{parsed_url.netloc}{parsed_url.path}" + + # parse other arguments + self.user_agent = self.source.parameter_values["USER_AGENT"] + self.tor_service = self.source.parameter_values["TOR"] + + # self.interval = self.source.parameter_values['REFRESH_INTERVAL'] + + self.pagination_limit = BaseCollector.read_int_parameter("PAGINATION_LIMIT", 1, self.source) + self.links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, self.source) + self.word_limit = BaseCollector.read_int_parameter("WORD_LIMIT", 0, self.source) + + self.selectors = {} + + self.selectors["popup_close"] = self.source.parameter_values["POPUP_CLOSE_SELECTOR"] + self.selectors["next_page"] = self.source.parameter_values["NEXT_BUTTON_SELECTOR"] + self.selectors["load_more"] = self.source.parameter_values["LOAD_MORE_BUTTON_SELECTOR"] + self.selectors["single_article_link"] = self.source.parameter_values["SINGLE_ARTICLE_LINK_SELECTOR"] + + self.selectors["title"] = self.source.parameter_values["TITLE_SELECTOR"] + self.selectors["article_description"] = self.source.parameter_values["ARTICLE_DESCRIPTION_SELECTOR"] + self.selectors["article_full_text"] = self.source.parameter_values["ARTICLE_FULL_TEXT_SELECTOR"] + self.selectors["published"] = self.source.parameter_values["PUBLISHED_SELECTOR"] + self.selectors["author"] = self.source.parameter_values["AUTHOR_SELECTOR"] + self.selectors["attachment"] = self.source.parameter_values["ATTACHMENT_SELECTOR"] + self.selectors["additional_id"] = self.source.parameter_values["ADDITIONAL_ID_SELECTOR"] + + self.web_driver_type = self.source.parameter_values["WEBDRIVER"] + self.client_cert_directory = self.source.parameter_values["CLIENT_CERT_DIR"] + + set_proxy = False + self.proxy = "" + param_proxy = self.source.parameter_values["PROXY_SERVER"] + if re.search(r"^(https?|socks[45])://", param_proxy.lower()): + set_proxy = True + self.proxy = param_proxy + elif re.search(r"^.*:\d+$/", param_proxy.lower()): + set_proxy = True + self.proxy = f"http://{param_proxy}" + + if set_proxy: + results = re.match(r"(https?)://([^/]+)/?$", self.proxy) + if results: + self.proxy_port = 8080 + self.proxy_proto = results.group(1) + self.proxy_host = results.group(2) + results = re.match(r"(https?)://([^/]+):(\d+)/?$", self.proxy) + if results: + self.proxy_proto = results.group(1) + self.proxy_host = results.group(2) + self.proxy_port = results.group(3) + + return True + + def 
__get_headless_driver_chrome(self): + """Initialize and return Chrome driver. + + Returns: + WebDriver: The initialized Chrome driver. + """ + logger.debug("Initializing Chrome driver...") + + chrome_driver_executable = os.environ.get("SELENIUM_CHROME_DRIVER_PATH", "/usr/bin/chromedriver") + + chrome_options = ChromeOptions() + chrome_options.page_load_strategy = "normal" # .get() returns on document ready + chrome_options.add_argument("start-maximized") + chrome_options.add_argument("disable-infobars") + chrome_options.add_argument("--disable-extensions") + chrome_options.add_argument("--disable-gpu") + chrome_options.add_argument("--disable-dev-shm-usage") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--headless") + chrome_options.add_argument("--ignore-certificate-errors") + chrome_options.add_argument("--incognito") + chrome_service = ChromeService(executable_path=chrome_driver_executable) + if self.user_agent: + chrome_options.add_argument("user-agent=" + self.user_agent) + if self.tor_service.lower() == "yes": + socks_proxy = "socks5://127.0.0.1:9050" + chrome_options.add_argument(f"--proxy-server={socks_proxy}") + elif self.proxy: + webdriver.DesiredCapabilities.CHROME["proxy"] = { + "proxyType": "manual", + "httpProxy": self.proxy, + "ftpProxy": self.proxy, + "sslProxy": self.proxy, + } + + driver = webdriver.Chrome(service=chrome_service, options=chrome_options) + logger.debug("Chrome driver initialized.") + return driver + + def __get_headless_driver_firefox(self): + """Initialize and return Firefox driver. + + Returns: + WebDriver: The initialized Firefox driver. + """ + logger.debug("Initializing Firefox driver...") + + firefox_driver_executable = os.environ.get("SELENIUM_FIREFOX_DRIVER_PATH", "/usr/local/bin/geckodriver") + + core_url = os.environ.get("TARANIS_NG_CORE_URL", "http://core") + core_url_host = urlparse(core_url).hostname # get only the hostname from URL + + firefox_options = FirefoxOptions() + firefox_options.page_load_strategy = "normal" # .get() returns on document ready + firefox_options.add_argument("--headless") + firefox_options.add_argument("--ignore-certificate-errors") + firefox_options.add_argument("--incognito") + + if self.user_agent: + firefox_options.add_argument(f"user-agent={self.user_agent}") + + if self.tor_service.lower() == "yes": + firefox_options.set_preference("network.proxy.type", 1) # manual proxy config + firefox_options.set_preference("network.proxy.socks", "127.0.0.1") + firefox_options.set_preference("network.proxy.socks_port", 9050) + firefox_options.set_preference("network.proxy.no_proxies_on", f"localhost, ::1, 127.0.0.1, {core_url_host}, 127.0.0.0/8") + + elif self.proxy: + firefox_options.set_preference("network.proxy.type", 1) # manual proxy config + firefox_options.set_preference("network.proxy.http", self.proxy_host) + firefox_options.set_preference("network.proxy.http_port", int(self.proxy_port)) + firefox_options.set_preference("network.proxy.ssl", self.proxy_host) + firefox_options.set_preference("network.proxy.ssl_port", int(self.proxy_port)) + firefox_options.set_preference("network.proxy.ftp", self.proxy) + firefox_options.set_preference("network.proxy.ftp_port", int(self.proxy_port)) + firefox_options.set_preference("network.proxy.no_proxies_on", f"localhost, ::1, 127.0.0.1, {core_url_host}, 127.0.0.0/8") + else: + firefox_options.set_preference("network.proxy.type", 0) # no proxy + + firefox_service = FirefoxService(executable_path=firefox_driver_executable) + driver = 
webdriver.Firefox(service=firefox_service, options=firefox_options) + + logger.debug("Firefox driver initialized.") + return driver + + def __get_headless_driver(self): + """Initialize and return a headless browser driver. + + Returns: + browser: The headless browser driver + """ + try: + if self.web_driver_type.lower() == "firefox": + browser = self.__get_headless_driver_firefox() + else: + browser = self.__get_headless_driver_chrome() + browser.implicitly_wait(15) # how long to wait for elements when selector doesn't match + return browser + except Exception as error: + logger.exception(f"{self.collector_source} Get headless driver failed: {error}") + return None + + def __dispose_of_headless_driver(self, driver): + """Destroy the headless browser driver, and its browser. + + Parameters: + driver: The headless browser driver to be disposed of. + """ + try: + driver.close() + except Exception as error: + logger.exception(f"{self.collector_source} Could not close the headless browser driver: {error}") + pass + try: + driver.quit() + except Exception as error: + logger.exception(f"{self.collector_source} Could not quit the headless browser driver: {error}") + pass + + def __run_tor(self): + """Run The Onion Router service in a subprocess.""" + logger.info(f"{self.collector_source} Initializing TOR") + subprocess.Popen(["tor"]) + time.sleep(3) + + @BaseCollector.ignore_exceptions + def collect(self, source): + """Collect news items from this source (main function). + + Parameters: + source (Source): The source to collect news items from. + """ + self.source = source + self.__parse_settings() + self.news_items = [] + + if self.tor_service.lower() == "yes": + self.__run_tor() + + if self.interpret_as == "uri": + result, message, total_processed_articles, total_failed_articles = self.__browse_title_page(self.web_url) + + elif self.interpret_as == "directory": + logger.info(f"{self.collector_source} Searching for html files in {self.web_url}") + for file_name in os.listdir(self.web_url): + if file_name.lower().endswith(".html"): + html_file = f"file://{self.web_url}/{file_name}" + result, message = self.__browse_title_page(html_file) + + def __browse_title_page(self, index_url): + """Spawn a browser, download the title page for parsing, call parser. + + Parameters: + index_url (str): The URL of the title page. + """ + browser = self.__get_headless_driver() + if browser is None: + logger.error(f"{self.collector_source} Error initializing the headless browser") + return False, "Error initializing the headless browser", 0, 0 + + logger.info(f"{self.collector_source} Requesting title page: {self.web_url}") + try: + browser.get(index_url) + except Exception as error: + logger.exception(f"{self.collector_source} Obtaining title page failed: {error}") + self.__dispose_of_headless_driver(browser) + return False, "Error obtaining title page", 0, 0 + + # if there is a popup selector, click on it! + if self.selectors["popup_close"]: + popup = None + try: + popup = WebDriverWait(browser, 10).until( + EC.presence_of_element_located(self.__get_element_locator(self.selectors["popup_close"])) + ) + except Exception as error: + logger.exception(f"{self.collector_source} Popup find failed: {error}") + if popup is not None: + try: + popup.click() + except Exception as error: + logger.exception(f"{self.collector_source} Popup click failed: {error}") + + # if there is a "load more" selector, click on it! 
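+        # Progressive-loading pages: keep clicking the LOAD_MORE_BUTTON_SELECTOR element
+        # (falling back to scrollIntoView plus click if a plain click fails) and wait for
+        # it to go stale, until it can no longer be found or PAGINATION_LIMIT pages are loaded.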
+ page = 1 + while self.selectors["load_more"] and page < self.pagination_limit: + try: + load_more = WebDriverWait(browser, 5).until( + EC.element_to_be_clickable(self.__get_element_locator(self.selectors["load_more"])) + ) + # TODO: check for None + + try: + action = ActionChains(browser) + action.move_to_element(load_more) + load_more.click() + except Exception: + browser.execute_script("arguments[0].scrollIntoView(true);", load_more) + load_more.click() + + try: + WebDriverWait(browser, 5).until(EC.staleness_of(load_more)) + except Exception: + pass + + except Exception: + break + page += 1 + + title_page_handle = browser.current_window_handle + total_processed_articles, total_failed_articles = 0, 0 + while True: + try: + processed_articles, failed_articles = self.__process_title_page_articles(browser, title_page_handle, index_url) + + total_processed_articles += processed_articles + total_failed_articles += failed_articles + + # safety cleanup + if not self.__close_other_tabs(browser, title_page_handle, fallback_url=index_url): + logger.error(f"{self.collector_source} Page crawl failed (after-crawl clean up)") + break + except Exception as error: + logger.exception(f"{self.collector_source} Page crawl failed: {error}") + break + + if page >= self.pagination_limit or not self.selectors["next_page"]: + if self.pagination_limit > 1: + logger.info(f"{self.collector_source} Page limit reached") + break + + # visit next page of results + page += 1 + logger.info(f"{self.collector_source} Clicking 'next page'") + try: + next_page = self.__find_element_by(browser, self.selectors["next_page"]) + # TODO: check for None + ActionChains(browser).move_to_element(next_page).click(next_page).perform() + except Exception: + logger.info(f"{self.collector_source} This was the last page") + break + + self.__dispose_of_headless_driver(browser) + BaseCollector.publish(self.news_items, self.source, self.collector_source) + + return True, "", total_processed_articles, total_failed_articles + + def __process_title_page_articles(self, browser, title_page_handle, index_url): + """Parse the title page for articles. + + Parameters: + browser (WebDriver): The browser instance. + title_page_handle (str): The handle of the title page tab. + index_url (str): The URL of the title page. + Returns: + (processed_articles, failed_articles) (tuple): A tuple containing the number of processed articles and + the number of failed articles. 
+ """ + processed_articles, failed_articles = 0, 0 + article_items = self.__safe_find_elements_by(browser, self.selectors["single_article_link"]) + if article_items is None: + logger.warning(f"{self.collector_source} Invalid page or incorrect selector for article items") + return 0, 0 + + index_url_just_before_click = browser.current_url + + count = 0 + # print(browser.page_source, flush=True) + for item in article_items: + count += 1 + # try: + # print("H: {0} {1:.200}".format(count, item.get_attribute('outerHTML')), flush=True) + # except Exception as ex: + # pass + # if first item works but next items have problems - it's because this: + # https://www.selenium.dev/documentation/webdriver/troubleshooting/errors/#stale-element-reference-exception + link = None + try: + link = item.get_attribute("href") + if link is None: # don't continue, it will crash in current situation + logger.warning(f"{self.collector_source} No link for article {count}/{len(article_items)}") + continue + logger.info(f"{self.collector_source} Visiting article {count}/{len(article_items)}: {link}") + except Exception: + logger.warning(f"{self.collector_source} Failed to get link for article {count}/{len(article_items)}") + continue + + click_method = 1 # TODO: some day, make this user-configurable with tri-state enum + if click_method == 1: + browser.switch_to.new_window("tab") + browser.get(link) + elif click_method == 2: + browser.move_to_element(item) + ActionChains(browser).key_down(Keys.CONTROL).click(item).key_up(Keys.CONTROL).perform() + self.__wait_for_new_tab(browser, 15, title_page_handle) + elif click_method == 3: + browser.move_to_element(item) + item.send_keys(Keys.CONTROL + Keys.RETURN) + self.__wait_for_new_tab(browser, 15, title_page_handle) + time.sleep(1) + + try: + news_item = self.__process_article_page(index_url, browser) + if news_item: + logger.debug(f"{self.collector_source} ... Title : {news_item.title}") + logger.debug(f"{self.collector_source} ... Review : {news_item.review.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Content : {news_item.content.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Published: {news_item.published}") + self.news_items.append(news_item) + else: + logger.warning(f"{self.collector_source} Parsing an article failed") + except Exception as error: + logger.exception(f"{self.collector_source} Parsing an article failed: {error}") + + if len(browser.window_handles) == 1: + back_clicks = 1 + while browser.current_url != index_url_just_before_click: + browser.back() + back_clicks += 1 + if back_clicks > 3: + logger.warning(f"{self.collector_source} Error during page crawl (cannot restore window after crawl)") + elif not self.__close_other_tabs(browser, title_page_handle, fallback_url=index_url): + logger.warning(f"{self.collector_source} Error during page crawl (after-crawl clean up)") + break + if count >= self.links_limit & self.links_limit > 0: + logger.debug(f"{self.collector_source} Limit for article links reached ({self.links_limit})") + break + + return processed_articles, failed_articles + + def __process_article_page(self, index_url, browser): + """Parse a single article. + + Parameters: + index_url (str): The URL of the title page. + browser (WebDriver): The browser instance. + Returns: + news_item (NewsItemData): The parsed news item. 
+ """ + current_url = browser.current_url + + title = self.__find_element_text_by(browser, self.selectors["title"]) + + article_full_text = self.__find_element_text_by(browser, self.selectors["article_full_text"]) + if self.word_limit > 0: + article_full_text = " ".join(re.compile(r"\s+").split(article_full_text)[: self.word_limit]) + + if self.selectors["article_description"]: + article_description = self.__find_element_text_by(browser, self.selectors["article_description"]) + else: + article_description = "" + if self.word_limit > 0: + article_description = " ".join(re.compile(r"\s+").split(article_description)[: self.word_limit]) + if not article_description: + article_description = self.__smart_truncate(article_full_text) + + published_str = self.__find_element_text_by(browser, self.selectors["published"]) + if not published_str: + published_str = "today" + published = dateparser.parse(published_str, settings={"DATE_ORDER": "DMY"}) + published_str = published.strftime("%Y-%m-%d %H:%M") # remove microseconds/seconds from the screen, looks ugly + + link = current_url + + author = self.__find_element_text_by(browser, self.selectors["author"]) + + for_hash = author + title + article_description + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + title, + article_description, + self.web_url, + link, + published_str, + author, + datetime.datetime.now(), + article_full_text, + self.source.id, + [], + ) + + if self.selectors["additional_id"]: + value = self.__find_element_text_by(browser, self.selectors["additional_id"]) + if value: + key = "Additional_ID" + binary_mime_type = "" + binary_value = "" + attribute = NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value) + news_item.attributes.append(attribute) + return news_item diff --git a/src/collectors/managers/collectors_manager.py b/src/collectors/managers/collectors_manager.py index a95a7af7..565edd9f 100644 --- a/src/collectors/managers/collectors_manager.py +++ b/src/collectors/managers/collectors_manager.py @@ -1,129 +1,129 @@ -"""Contains the CollectorsManager class, which manages the collectors.""" - -import os -import threading -import time - -from managers.log_manager import logger -from collectors.atom_collector import AtomCollector -from collectors.email_collector import EmailCollector -from collectors.manual_collector import ManualCollector -from collectors.rss_collector import RSSCollector -from collectors.scheduled_tasks_collector import ScheduledTasksCollector -from collectors.slack_collector import SlackCollector -from collectors.twitter_collector import TwitterCollector -from collectors.web_collector import WebCollector -from remote.core_api import CoreApi - -collectors = {} -status_report_thread = None - - -def reportStatus(): - """Continuously send status updates to the Core API.""" - while True: - logger.debug(f"[{__name__}] Sending status update...") - response, status_code = CoreApi.update_collector_status() - if status_code != 200: - logger.warning(f"[{__name__}] Core status update response: HTTP {status_code}, {response}") - # for debugging scheduler tasks - # for key in collectors: - # for source in collectors[key].osint_sources: - # if hasattr(source, "scheduler_job"): - # logger.debug("Last run: {}, Next run: {}, {}".format(source.scheduler_job.last_run or "never", - # source.scheduler_job.next_run or "never", source.name)) - time.sleep(55) - - -def initialize(): - """Initialize the collectors.""" - logger.info(f"{__name__}: Initializing collector...") 
- - # inform core that this collector node is alive - status_report_thread = threading.Thread(target=reportStatus) - status_report_thread.daemon = True - status_report_thread.start() - - register_collector(RSSCollector()) - register_collector(WebCollector()) - register_collector(TwitterCollector()) - register_collector(EmailCollector()) - register_collector(SlackCollector()) - register_collector(AtomCollector()) - register_collector(ManualCollector()) - register_collector(ScheduledTasksCollector()) - - logger.info(f"{__name__}: Collector initialized.") - - -def register_collector(collector): - """Register a collector. - - Parameters: - collector: The collector object to register. - """ - collectors[collector.type] = collector - - class InitializeThread(threading.Thread): - """A thread class for initializing the collector.""" - - @classmethod - def run(cls): - """Run method for the collectors manager. - - Parameters: - cls: The class object. - """ - collector.initialize() - - initialize_thread = InitializeThread() - initialize_thread.start() - - -def refresh_collector(collector_type): - """Refresh the specified collector. - - Parameters: - collector_type (str): The type of the collector to refresh. - Returns: - (int): The HTTP status code indicating the result of the refresh operation. Returns 200 if the collector was refreshed successfully, - or 403 if the collector type is not found in the collectors dictionary. - """ - if collector_type in collectors: - - class RefreshThread(threading.Thread): - """A thread class for refreshing the collector.""" - - @classmethod - def run(cls): - """Run method for the collectors manager. - - Parameters: - cls: The class object. - """ - collectors[collector_type].refresh() - - refresh_thread = RefreshThread() - refresh_thread.start() - return 200 - else: - return 403 - - -def get_registered_collectors_info(id): - """Retrieve information about registered collectors. - - Parameters: - id (str): The ID of the collector. - Returns: - collectors_info (list): A list of collector information. 
- """ - config_file = os.getenv("COLLECTOR_CONFIG_FILE") - with open(config_file, "w") as file: - file.write(id) - - collectors_info = [] - for key in collectors: - collectors_info.append(collectors[key].get_info()) - - return collectors_info +"""Contains the CollectorsManager class, which manages the collectors.""" + +import os +import threading +import time + +from managers.log_manager import logger +from collectors.atom_collector import AtomCollector +from collectors.email_collector import EmailCollector +from collectors.manual_collector import ManualCollector +from collectors.rss_collector import RSSCollector +from collectors.scheduled_tasks_collector import ScheduledTasksCollector +from collectors.slack_collector import SlackCollector +from collectors.twitter_collector import TwitterCollector +from collectors.web_collector import WebCollector +from remote.core_api import CoreApi + +collectors = {} +status_report_thread = None + + +def reportStatus(): + """Continuously send status updates to the Core API.""" + while True: + logger.debug(f"Sending status update...") + response, status_code = CoreApi.update_collector_status() + if status_code != 200: + logger.warning(f"Core status update response: HTTP {status_code}, {response}") + # for debugging scheduler tasks + # for key in collectors: + # for source in collectors[key].osint_sources: + # if hasattr(source, "scheduler_job"): + # logger.debug("Last run: {}, Next run: {}, {}".format(source.scheduler_job.last_run or "never", + # source.scheduler_job.next_run or "never", source.name)) + time.sleep(55) + + +def initialize(): + """Initialize the collectors.""" + logger.info(f"{__name__}: Initializing collector...") + + # inform core that this collector node is alive + status_report_thread = threading.Thread(target=reportStatus) + status_report_thread.daemon = True + status_report_thread.start() + + register_collector(RSSCollector()) + register_collector(WebCollector()) + register_collector(TwitterCollector()) + register_collector(EmailCollector()) + register_collector(SlackCollector()) + register_collector(AtomCollector()) + register_collector(ManualCollector()) + register_collector(ScheduledTasksCollector()) + + logger.info(f"{__name__}: Collector initialized.") + + +def register_collector(collector): + """Register a collector. + + Parameters: + collector: The collector object to register. + """ + collectors[collector.type] = collector + + class InitializeThread(threading.Thread): + """A thread class for initializing the collector.""" + + @classmethod + def run(cls): + """Run method for the collectors manager. + + Parameters: + cls: The class object. + """ + collector.initialize() + + initialize_thread = InitializeThread() + initialize_thread.start() + + +def refresh_collector(collector_type): + """Refresh the specified collector. + + Parameters: + collector_type (str): The type of the collector to refresh. + Returns: + (int): The HTTP status code indicating the result of the refresh operation. Returns 200 if the collector was refreshed successfully, + or 403 if the collector type is not found in the collectors dictionary. + """ + if collector_type in collectors: + + class RefreshThread(threading.Thread): + """A thread class for refreshing the collector.""" + + @classmethod + def run(cls): + """Run method for the collectors manager. + + Parameters: + cls: The class object. 
+ """ + collectors[collector_type].refresh() + + refresh_thread = RefreshThread() + refresh_thread.start() + return 200 + else: + return 403 + + +def get_registered_collectors_info(id): + """Retrieve information about registered collectors. + + Parameters: + id (str): The ID of the collector. + Returns: + collectors_info (list): A list of collector information. + """ + config_file = os.getenv("COLLECTOR_CONFIG_FILE") + with open(config_file, "w") as file: + file.write(id) + + collectors_info = [] + for key in collectors: + collectors_info.append(collectors[key].get_info()) + + return collectors_info
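
The *_SELECTOR parameters of the Web Collector all use the same "prefix: selector" convention handled by __get_prefix_and_selector and __get_element_locator: the text before the first colon picks the Selenium locator strategy (id, name, xpath, tag/tag_name, class/class_name, css/css_selector) and the remainder is the selector itself. Below is a minimal standalone sketch of that mapping; the example parameter values are hypothetical and only illustrate how a source could be configured, they are not taken from this patch.

from selenium.webdriver.common.by import By

# Mirrors the prefix handling in WebCollector.__get_element_locator:
# "prefix: selector" -> (By.<STRATEGY>, "selector")
PREFIX_TO_BY = {
    "id": By.ID,
    "name": By.NAME,
    "xpath": By.XPATH,
    "tag_name": By.TAG_NAME,
    "tag": By.TAG_NAME,
    "class_name": By.CLASS_NAME,
    "class": By.CLASS_NAME,
    "css_selector": By.CSS_SELECTOR,
    "css": By.CSS_SELECTOR,
}


def to_locator(element_selector):
    """Turn a configured selector string into a Selenium locator tuple, or None if it cannot be parsed."""
    prefix, _, selector = element_selector.partition(":")
    by = PREFIX_TO_BY.get(prefix.strip().lower())
    if by is None or not selector:
        return None
    return by, selector.lstrip()


# Hypothetical source configuration (illustrative values only):
example_parameters = {
    "SINGLE_ARTICLE_LINK_SELECTOR": "css: article h2 a",
    "TITLE_SELECTOR": "xpath: //h1",
    "ARTICLE_FULL_TEXT_SELECTOR": "class: article-body",
    "PUBLISHED_SELECTOR": "css: time.published",
}

for name, value in example_parameters.items():
    print(name, "->", to_locator(value))

Two other small idioms used by __process_article_page are the WORD_LIMIT truncation (keep only the first N whitespace-separated words) and parsing of the published date with dateparser using day-month-year ordering. A rough, self-contained illustration, assuming dateparser is installed (the collector already imports it); the sample text and date are made up:

import re

import dateparser


def limit_words(text, word_limit):
    """Keep only the first word_limit whitespace-separated words; 0 means no limit."""
    if word_limit <= 0:
        return text
    return " ".join(re.split(r"\s+", text)[:word_limit])


print(limit_words("one  two\nthree four five", 3))  # -> one two three

published = dateparser.parse("23 Oct 2024 22:24", settings={"DATE_ORDER": "DMY"})
print(published.strftime("%Y-%m-%d %H:%M"))         # -> 2024-10-23 22:24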