
Commit

Merge pull request #221 from Progress1/collectors_logs
Collectors update (Limit for article links, logs, fixes)
Progress1 authored Mar 7, 2024
2 parents 5fce1bf + ef6cf16 commit 8fd28c9
Showing 4 changed files with 148 additions and 118 deletions.
88 changes: 48 additions & 40 deletions src/collectors/collectors/atom_collector.py
@@ -19,7 +19,9 @@ class AtomCollector(BaseCollector):
description = "Collector for gathering data from Atom feeds"

parameters = [Parameter(0, "ATOM_FEED_URL", "Atom feed URL", "Full url for Atom feed", ParameterType.STRING),
Parameter(0, "USER_AGENT", "User agent", "Type of user agent", ParameterType.STRING)
Parameter(0, "USER_AGENT", "User agent", "Type of user agent", ParameterType.STRING),
Parameter(0, "LINKS_LIMIT", "Limit for article links",
"OPTIONAL: Maximum number of article links to process. Default: all", ParameterType.NUMBER)
]

parameters.extend(BaseCollector.parameters)
@@ -32,6 +34,8 @@ def collect(self, source):
feed_url = source.parameter_values['ATOM_FEED_URL']
user_agent = source.parameter_values['USER_AGENT']
interval = source.parameter_values['REFRESH_INTERVAL']
links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source)

log_manager.log_collector_activity("atom", source.name, "Starting collector for url: {}".format(feed_url))

proxies = {}
@@ -55,48 +59,52 @@ def collect(self, source):

news_items = []

limit = BaseCollector.history(interval)
count = 0
for feed_entry in feed['entries']:
published = feed_entry['updated']
published = parse(published, tzinfos=BaseCollector.timezone_info())
# comment this out at the beginning of testing to get some initial data
if str(published) > str(limit):
link_for_article = feed_entry['link']
log_manager.log_collector_activity("atom", source.name, "Processing entry [{}]".format(link_for_article))
if proxies:
page = requests.get(link_for_article, headers={'User-Agent': user_agent}, proxies=proxies)
else:
page = requests.get(link_for_article, headers={'User-Agent': user_agent})

html_content = page.text

if html_content:
content = BeautifulSoup(html_content, features='html.parser').text
else:
content = ''

description = feed_entry['summary'][:500].replace('<p>', ' ')

for_hash = feed_entry['author'] + feed_entry['title'] + feed_entry['link']

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
feed_entry['title'],
description,
feed_url,
feed_entry['link'],
feed_entry['updated'],
feed_entry['author'],
datetime.datetime.now(),
content,
source.id,
[]
)

news_items.append(news_item)
count += 1
link_for_article = feed_entry['link']
log_manager.log_collector_activity("atom", source.name, "Visiting article {}/{}: {}".format(count, len(feed["entries"]), link_for_article))
if proxies:
page = requests.get(link_for_article, headers={'User-Agent': user_agent}, proxies=proxies)
else:
page = requests.get(link_for_article, headers={'User-Agent': user_agent})

html_content = page.text

if html_content:
content = BeautifulSoup(html_content, features='html.parser').text
else:
content = ''

description = feed_entry['summary'][:500].replace('<p>', ' ')

# the author field may be present or missing in the feed header/entry
author = feed_entry['author'] if "author" in feed_entry else ""
for_hash = author + feed_entry['title'] + feed_entry['link']

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
feed_entry['title'],
description,
feed_url,
feed_entry['link'],
feed_entry['updated'],
author,
datetime.datetime.now(),
content,
source.id,
[]
)

news_items.append(news_item)

if links_limit > 0 and count >= links_limit:
log_manager.log_collector_activity('atom', source.name, 'Limit for article links reached ({})'.format(links_limit))
break

BaseCollector.publish(news_items, source)

except Exception as error:
log_manager.log_collector_activity("atom", source.name, "ATOM collection exceptionally failed")
BaseCollector.print_exception(source, error)
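For reference, the new per-entry limit boils down to the pattern below — a minimal standalone sketch, assuming a plain list of entries and a hypothetical process_entry() helper in place of the real fetch-and-parse logic:

entries = [{"link": "https://example.org/a"},
           {"link": "https://example.org/b"},
           {"link": "https://example.org/c"}]

links_limit = 2  # 0 (the default) means "process every entry"

def process_entry(entry):
    # placeholder for the real work: fetching the page with requests,
    # parsing it with BeautifulSoup and building a NewsItemData object
    print("visiting", entry["link"])

count = 0
for entry in entries:
    count += 1
    process_entry(entry)
    if links_limit > 0 and count >= links_limit:
        print("Limit for article links reached ({})".format(links_limit))
        break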
17 changes: 13 additions & 4 deletions src/collectors/collectors/base_collector.py
@@ -49,12 +49,11 @@ def wrapper(self, source):

@staticmethod
def print_exception(source, error):
log_info('OSINTSource ID: ' + source.id)
log_info('OSINTSource name: ' + source.name)
log_warning('OSINTSource name: ' + source.name)
if str(error).startswith('b'):
log_info('ERROR: ' + str(error)[2:-1])
log_warning('ERROR: ' + str(error)[2:-1])
else:
log_info('ERROR: ' + str(error))
log_warning('ERROR: ' + str(error))

@staticmethod
def timezone_info():
@@ -306,3 +305,13 @@ def refresh(self):

def initialize(self):
self.refresh()

@staticmethod
def read_int_parameter(name, default_value, source):
try:
val = int(source.parameter_values[name])
if val <= 0:
val = default_value
except Exception:
val = default_value
return val
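
The helper's behaviour is easiest to see in isolation. The sketch below copies the function body from the diff above and exercises it with a hypothetical FakeSource object standing in for the real OSINT source, whose parameter_values is a dict of strings:

def read_int_parameter(name, default_value, source):
    try:
        val = int(source.parameter_values[name])
        if val <= 0:
            val = default_value
    except Exception:
        val = default_value
    return val

class FakeSource:
    # made-up stand-in for the real source object
    parameter_values = {"LINKS_LIMIT": "5", "BROKEN": "abc"}

src = FakeSource()
print(read_int_parameter("LINKS_LIMIT", 0, src))  # 5 -> a valid positive integer is used as-is
print(read_int_parameter("BROKEN", 0, src))       # 0 -> a non-numeric value falls back to the default
print(read_int_parameter("MISSING", 0, src))      # 0 -> a missing key falls back to the default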
100 changes: 52 additions & 48 deletions src/collectors/collectors/rss_collector.py
@@ -31,6 +31,8 @@ class RSSCollector(BaseCollector):
parameters = [
Parameter(0, "FEED_URL", "Feed URL", "Full url for RSS feed", ParameterType.STRING),
Parameter(0, "USER_AGENT", "User agent", "Type of user agent", ParameterType.STRING),
Parameter(0, "LINKS_LIMIT", "Limit for article links",
"OPTIONAL: Maximum number of article links to process. Default: all", ParameterType.NUMBER),
]

parameters.extend(BaseCollector.parameters)
@@ -46,6 +48,7 @@ def collect(self, source):
"""
feed_url = source.parameter_values["FEED_URL"]
interval = source.parameter_values["REFRESH_INTERVAL"]
links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source)

log_manager.log_collector_activity("rss", source.name, "Starting collector for url: {}".format(feed_url))

@@ -101,59 +104,60 @@ def collect(self, source):

news_items = []

limit = BaseCollector.history(interval)
count = 0
for feed_entry in feed["entries"]:
count += 1
for key in ["author", "published", "title", "description", "link"]:
if key not in feed_entry.keys():
feed_entry[key] = ""

published = feed_entry["published"]
published = dateparser.parse(published, settings={"DATE_ORDER": "DMY"})
# comment this out at the beginning of testing to get some initial data
if str(published) > str(limit):
link_for_article = feed_entry["link"]
if not link_for_article:
log_manager.log_collector_activity("rss", source.name, "Skipping (empty link)")
continue

log_manager.log_collector_activity("rss", source.name, "Processing entry [{}]".format(link_for_article))

html_content = ""
request = urllib.request.Request(link_for_article)
request.add_header("User-Agent", user_agent)

with opener(request) as response:
html_content = response.read()

soup = BeautifulSoup(html_content, features="html.parser")

content = ""

if html_content:
content_text = [p.text.strip() for p in soup.findAll("p")]
replaced_str = "\xa0"
if replaced_str:
content = [w.replace(replaced_str, " ") for w in content_text]
content = " ".join(content)

for_hash = feed_entry["author"] + feed_entry["title"] + feed_entry["link"]

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
feed_entry["title"],
feed_entry["description"],
feed_url,
feed_entry["link"],
feed_entry["published"],
feed_entry["author"],
datetime.datetime.now(),
content,
source.id,
[],
)

news_items.append(news_item)
link_for_article = feed_entry["link"]
if not link_for_article:
log_manager.log_collector_activity("rss", source.name, "Skipping (empty link)")
continue

log_manager.log_collector_activity("rss", source.name, "Visiting article {}/{}: {}".format(count, len(feed["entries"]), link_for_article))

html_content = ""
request = urllib.request.Request(link_for_article)
request.add_header("User-Agent", user_agent)

with opener(request) as response:
html_content = response.read()

soup = BeautifulSoup(html_content, features="html.parser")

content = ""

if html_content:
content_text = [p.text.strip() for p in soup.findAll("p")]
replaced_str = "\xa0"
if replaced_str:
content = [w.replace(replaced_str, " ") for w in content_text]
content = " ".join(content)

for_hash = feed_entry["author"] + feed_entry["title"] + feed_entry["link"]

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
feed_entry["title"],
feed_entry["description"],
feed_url,
feed_entry["link"],
feed_entry["published"],
feed_entry["author"],
datetime.datetime.now(),
content,
source.id,
[],
)

news_items.append(news_item)

if links_limit > 0 and count >= links_limit:
log_manager.log_collector_activity('rss', source.name, 'Limit for article links reached ({})'.format(links_limit))
break

BaseCollector.publish(news_items, source)

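Both collectors now tolerate feeds that omit fields such as the author; the deduplication hash is still built from author + title + link, with empty strings filling any gaps. A minimal sketch with an invented entry:

import hashlib

feed_entry = {"title": "Example advisory", "link": "https://example.org/advisory"}

# fill the fields the feed did not provide, as the RSS collector now does
for key in ["author", "published", "title", "description", "link"]:
    if key not in feed_entry:
        feed_entry[key] = ""

for_hash = feed_entry["author"] + feed_entry["title"] + feed_entry["link"]
print(hashlib.sha256(for_hash.encode()).hexdigest())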
(diff for the fourth changed file did not load)
