Fix all conflicts of pre-commit
stefanDeveloper committed Oct 2, 2024
1 parent 37a6ac2 commit 47e688e
Showing 27 changed files with 397 additions and 238 deletions.
2 changes: 1 addition & 1 deletion docs/api/index.rst
@@ -12,4 +12,4 @@ API
logserver
prefilter
train
version
4 changes: 2 additions & 2 deletions docs/conf.py
@@ -43,7 +43,7 @@
"sphinx.ext.autosummary",
"sphinx.ext.autosectionlabel",
"sphinxcontrib.bibtex",
"sphinx.ext.mathjax"
"sphinx.ext.mathjax",
]

# Napoleon settings
@@ -83,4 +83,4 @@
# -- Bibliography ------------------------------------------------------------
bibtex_bibfiles = ["refs.bib"]
bibtex_default_style = "unsrt"
bibtex_reference_style = "author_year"
2 changes: 1 addition & 1 deletion docs/index.rst
@@ -22,4 +22,4 @@ Contents
pipeline
training
api/index
references
9 changes: 4 additions & 5 deletions docs/pipeline.rst
@@ -373,9 +373,9 @@ Stage 4: Inspection
Overview
--------

The `Inspector` stage is responsible for running time-series-based anomaly detection on prefiltered batches. This stage is essential to reduce
the load on the `Detection` stage.
Otherwise, resource complexity would increase disproportionately.

Main Class
----------
@@ -390,7 +390,7 @@ Usage
-----

The :class:`Inspector` loads the StreamAD model to perform anomaly detection.
It consumes batches on the topic ``inspect``, usually produced by the ``Prefilter``.
For a new batch, it derives the timestamps ``begin_timestamp`` and ``end_timestamp``.
Based on the time type (e.g. ``s``, ``ms``) and the time range (e.g. ``5``), the sliding non-overlapping window is created.
For univariate time-series, it counts the number of occurrences, whereas for multivariate time-series, it considers the packet size. :cite:`schuppen_fanci_2018`
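
The windowing described above amounts to bucketing timestamps into fixed, non-overlapping intervals and counting (or summing packet sizes) per bucket. Below is a minimal, self-contained sketch of that idea; the function name and signature are illustrative assumptions, not the project's actual ``Inspector`` code.

.. code-block:: python

    from datetime import datetime, timedelta

    def count_per_window(
        timestamps: list[datetime],
        begin: datetime,
        end: datetime,
        window: timedelta,
    ) -> list[int]:
        """Count occurrences per non-overlapping window between begin and end."""
        counts = []
        window_start = begin
        while window_start < end:
            window_end = window_start + window
            # Count all timestamps falling into [window_start, window_end).
            counts.append(sum(window_start <= ts < window_end for ts in timestamps))
            window_start = window_end
        return counts

    # Example: 5-second windows over a batch, yielding a univariate series
    # that an anomaly-detection model can consume.
    # counts = count_per_window(batch_timestamps, begin_timestamp, end_timestamp,
    #                           timedelta(seconds=5))
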
@@ -462,4 +462,3 @@ Configuration
-------------

In case you want to load self-trained models, the :class:`Detector` needs a URL path, model name, and SHA256 checksum to download the model during start-up.
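
A hedged sketch of what such a checksum-verified download could look like at start-up; the helper name and argument handling below are assumptions for illustration, not the project's actual configuration or code.

.. code-block:: python

    import hashlib
    import urllib.request

    def download_and_verify_model(url: str, model_name: str, expected_sha256: str) -> str:
        """Download a model file and verify its SHA256 checksum (illustrative only)."""
        target_path = f"/tmp/{model_name}"
        urllib.request.urlretrieve(url, target_path)

        sha256 = hashlib.sha256()
        with open(target_path, "rb") as file:
            # Hash in chunks so large model files do not have to fit in memory.
            for chunk in iter(lambda: file.read(8192), b""):
                sha256.update(chunk)

        if sha256.hexdigest() != expected_sha256:
            raise ValueError(f"Checksum mismatch for model {model_name}")
        return target_path
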

2 changes: 1 addition & 1 deletion docs/requirements.txt
@@ -1,3 +1,3 @@
sphinx==8.0.2
sphinx-book-theme
sphinxcontrib.bibtex
2 changes: 1 addition & 1 deletion docs/training.rst
@@ -2,4 +2,4 @@ Training
~~~~~~~~

Overview
========
4 changes: 2 additions & 2 deletions docs/usage.rst
@@ -27,9 +27,9 @@ Install all Python requirements.
$ python -m venv .venv
.. code-block:: console
$ source .venv/bin/activate
.. code-block:: console
(.venv) $ pip install -r requirements/requirements-dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt
2 changes: 1 addition & 1 deletion requirements/requirements.train.txt
@@ -2,4 +2,4 @@ numpy
polars
torch
xgboost
scikit-learn
15 changes: 10 additions & 5 deletions src/base/logline_handler.py
@@ -94,7 +94,7 @@ def __init__(self, name: str, allowed_list: list, relevant_list: list):
self.allowed_list = allowed_list

if relevant_list and not all(e in allowed_list for e in relevant_list):
raise ValueError('Relevant types are not allowed types')
raise ValueError("Relevant types are not allowed types")

self.relevant_list = relevant_list

@@ -137,7 +137,7 @@ def __init__(self):
self.instances_by_position = {}
self.number_of_fields = 0

for field in CONFIG['loglines']['fields']:
for field in CONFIG["loglines"]["fields"]:
instance = self._create_instance_from_list_entry(field)

if self.instances_by_name.get(instance.name):
@@ -171,7 +171,8 @@ def validate_logline(self, logline: str) -> bool:
# check number of entries
if len(parts) != self.number_of_fields:
logger.warning(
f"Logline contains {len(parts)} value(s), not {self.number_of_fields}.")
f"Logline contains {len(parts)} value(s), not {self.number_of_fields}."
)
return False

valid_values = []
@@ -245,7 +246,9 @@ def check_relevance(self, logline_dict: dict) -> bool:
for i in self.instances_by_position:
current_instance = self.instances_by_position[i]
if isinstance(current_instance, ListItem):
if not current_instance.check_relevance(logline_dict[current_instance.name]):
if not current_instance.check_relevance(
logline_dict[current_instance.name]
):
relevant = False
break

@@ -284,7 +287,9 @@ def _create_instance_from_list_entry(field_list: list):
raise ValueError("Invalid ListItem parameters")

relevant_list = field_list[3] if len_of_field_list == 4 else None
instance = cls(name=name, allowed_list=field_list[2], relevant_list=relevant_list)
instance = cls(
name=name, allowed_list=field_list[2], relevant_list=relevant_list
)

elif cls_name == "IpAddress":
if len_of_field_list != 2:
10 changes: 6 additions & 4 deletions src/base/utils.py
@@ -38,7 +38,7 @@ def setup_config():


def validate_host(
host: int | str | bytes | ipaddress.IPv4Address | ipaddress.IPv6Address
host: int | str | bytes | ipaddress.IPv4Address | ipaddress.IPv6Address,
) -> ipaddress.IPv4Address | ipaddress.IPv6Address:
"""
Checks if the given host is a valid IP address. If it is, the IP address is returned with IP address type.
@@ -101,7 +101,9 @@ def kafka_delivery_report(err: None | KafkaError, msg: None | Message):
)


def get_first_part_of_ipv4_address(address: ipaddress.IPv4Address, length: int) -> ipaddress.IPv4Address:
def get_first_part_of_ipv4_address(
address: ipaddress.IPv4Address, length: int
) -> ipaddress.IPv4Address:
"""
Returns the first part of an IPv4 address, the rest is filled with 0. For example:
>>> get_first_part_of_ipv4_address(ipaddress.IPv4Address("255.255.255.255"), 23)
@@ -120,9 +122,9 @@ def get_first_part_of_ipv4_address(address: ipaddress.IPv4Address, length: int)
raise ValueError("Invalid length. Must be between 0 and 32.")

if isinstance(address, ipaddress.IPv4Address):
binary_string = ''.join(format(byte, '08b') for byte in address.packed)
binary_string = "".join(format(byte, "08b") for byte in address.packed)
first_part_binary = binary_string[:length]
first_part_binary_padded = first_part_binary.ljust(32, '0')
first_part_binary_padded = first_part_binary.ljust(32, "0")
first_part_address = ipaddress.IPv4Address(int(first_part_binary_padded, 2))
else:
raise ValueError("Invalid IP address format")
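
As a usage note for the helper above: the same prefix masking can also be expressed with the standard library's ``ipaddress`` module, which may help when reasoning about the function's output. A small illustrative sketch, not part of the diff itself:

.. code-block:: python

    import ipaddress

    address = ipaddress.IPv4Address("255.255.255.255")

    # Keep the first 23 bits and zero the rest, i.e. take the network address
    # of the /23 network containing the host address.
    masked = ipaddress.ip_network(f"{address}/23", strict=False).network_address
    print(masked)  # 255.255.254.0
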
1 change: 1 addition & 0 deletions src/detector/detector.py
@@ -40,6 +40,7 @@ class Detector:
"""Logs detection with probability score of requests. It runs the provided machine learning model.
In addition, it returns all individual probabilities of the anomalous batch.
"""

def __init__(self) -> None:
self.topic = "detector"
self.messages = []
4 changes: 2 additions & 2 deletions src/inspector/inspector.py
@@ -60,8 +60,8 @@ class EnsembleModels(str, Enum):


class Inspector:
"""Finds anomalies in a batch of requests and produces it to the ``Detector``.
"""
"""Finds anomalies in a batch of requests and produces it to the ``Detector``."""

def __init__(self) -> None:
self.key = None
self.topic = "Collector"
62 changes: 42 additions & 20 deletions src/logcollector/batch_handler.py
@@ -76,7 +76,9 @@ def get_number_of_buffered_messages(self, key: str) -> int:
return 0

@staticmethod
def sort_messages(data: list[tuple[str, str]], timestamp_format: str = "%Y-%m-%dT%H:%M:%S.%fZ") -> list[str]:
def sort_messages(
data: list[tuple[str, str]], timestamp_format: str = "%Y-%m-%dT%H:%M:%S.%fZ"
) -> list[str]:
"""
Sorts the given list of loglines by their respective timestamps, in ascending order.
@@ -89,13 +91,17 @@ def sort_messages(data: list[tuple[str, str]], timestamp_format: str = "%Y-%m-%d
Returns:
List of log lines as strings sorted by timestamps (ascending)
"""
sorted_data = sorted(data, key=lambda x: datetime.strptime(x[0], timestamp_format))
sorted_data = sorted(
data, key=lambda x: datetime.strptime(x[0], timestamp_format)
)
loglines = [message for _, message in sorted_data]

return loglines

@staticmethod
def extract_tuples_from_json_formatted_strings(data: list[str]) -> list[tuple[str, str]]:
def extract_tuples_from_json_formatted_strings(
data: list[str],
) -> list[tuple[str, str]]:
"""
Args:
data (list[str]): Input list of strings to be prepared
@@ -165,7 +171,9 @@ def sort_buffer(self, key: str):
key (str): Key for which to sort entries
"""
if key in self.buffer:
self.buffer[key] = self.sort_messages(self.extract_tuples_from_json_formatted_strings(self.buffer[key]))
self.buffer[key] = self.sort_messages(
self.extract_tuples_from_json_formatted_strings(self.buffer[key])
)

def sort_batch(self, key: str):
"""
@@ -175,7 +183,9 @@ def sort_batch(self, key: str):
key (str): Key for which to sort entries
"""
if key in self.batch:
self.batch[key] = self.sort_messages(self.extract_tuples_from_json_formatted_strings(self.batch[key]))
self.batch[key] = self.sort_messages(
self.extract_tuples_from_json_formatted_strings(self.batch[key])
)

def complete_batch(self, key: str) -> dict:
"""
@@ -219,7 +229,9 @@ def complete_batch(self, key: str) -> dict:

if self.buffer: # Variant 3: Only buffer has entries
logger.debug("Variant 3: Only buffer has entries.")
logger.debug("Deleting buffer data (has no influence on analysis since it was too long ago)...")
logger.debug(
"Deleting buffer data (has no influence on analysis since it was too long ago)..."
)
del self.buffer[key]
else: # Variant 4: No data exists
logger.debug("Variant 4: No data exists. Nothing to send.")
@@ -286,10 +298,14 @@ def add_message(self, key: str, message: str) -> None:
number_of_messages_for_key = self.batch.get_number_of_messages(key)

if number_of_messages_for_key >= BATCH_SIZE:
logger.debug(f"Batch for {key=} is full. Calling _send_batch_for_key({key})...")
logger.debug(
f"Batch for {key=} is full. Calling _send_batch_for_key({key})..."
)
self._send_batch_for_key(key)
logger.info(f"Full batch: Successfully sent batch messages for subnet_id {key}.\n"
f" ⤷ {number_of_messages_for_key} messages sent.")
logger.info(
f"Full batch: Successfully sent batch messages for subnet_id {key}.\n"
f" ⤷ {number_of_messages_for_key} messages sent."
)
elif not self.timer: # First time setting the timer
logger.debug("Timer not set yet. Calling _reset_timer()...")
self._reset_timer()
@@ -304,7 +320,9 @@ def _send_all_batches(self, reset_timer: bool = True) -> None:
for key in self.batch.get_stored_keys():
number_of_keys += 1
total_number_of_batch_messages += self.batch.get_number_of_messages(key)
total_number_of_buffer_messages += self.batch.get_number_of_buffered_messages(key)
total_number_of_buffer_messages += (
self.batch.get_number_of_buffered_messages(key)
)
self._send_batch_for_key(key)

if reset_timer:
@@ -314,17 +332,21 @@ def _send_all_batches(self, reset_timer: bool = True) -> None:
return

if number_of_keys == 1:
logger.info("Successfully sent all batches.\n"
f" ⤷ Batch for one subnet_id with "
f"{total_number_of_batch_messages + total_number_of_buffer_messages} message(s) sent ("
f"{total_number_of_batch_messages} batch message(s), {total_number_of_buffer_messages} "
f"buffer message(s)).")
logger.info(
"Successfully sent all batches.\n"
f" ⤷ Batch for one subnet_id with "
f"{total_number_of_batch_messages + total_number_of_buffer_messages} message(s) sent ("
f"{total_number_of_batch_messages} batch message(s), {total_number_of_buffer_messages} "
f"buffer message(s))."
)
else: # if number_of_keys > 1
logger.info("Successfully sent all batches.\n"
f" ⤷ Batches for {number_of_keys} subnet_ids sent. "
f"In total: {total_number_of_batch_messages + total_number_of_buffer_messages} messages ("
f"{total_number_of_batch_messages} batch message(s), {total_number_of_buffer_messages} "
f"buffer message(s))")
logger.info(
"Successfully sent all batches.\n"
f" ⤷ Batches for {number_of_keys} subnet_ids sent. "
f"In total: {total_number_of_batch_messages + total_number_of_buffer_messages} messages ("
f"{total_number_of_batch_messages} batch message(s), {total_number_of_buffer_messages} "
f"buffer message(s))"
)

def _send_batch_for_key(self, key: str) -> None:
logger.debug(f"Starting to send the batch for {key=}...")
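
The ``add_message`` and timer handling above follow a size-or-timeout batching pattern: a key's batch is sent as soon as it reaches ``BATCH_SIZE``, otherwise a timer eventually flushes everything. A minimal, self-contained sketch of that pattern follows; class and parameter names are illustrative assumptions and not the actual classes in ``src/logcollector/batch_handler.py``.

.. code-block:: python

    import threading

    class SizeOrTimeoutBatcher:
        """Illustrative sketch of size-or-timeout batching (not the project's code)."""

        def __init__(self, batch_size: int, timeout_s: float, flush) -> None:
            self.batch_size = batch_size
            self.timeout_s = timeout_s
            self.flush = flush  # callable receiving the list of buffered messages
            self.messages: list[str] = []
            self.timer: threading.Timer | None = None

        def add_message(self, message: str) -> None:
            self.messages.append(message)
            if len(self.messages) >= self.batch_size:
                self._send_all()  # full batch: send immediately
            elif self.timer is None:
                self._reset_timer()  # first message: start the timeout

        def _reset_timer(self) -> None:
            if self.timer:
                self.timer.cancel()
            self.timer = threading.Timer(self.timeout_s, self._send_all)
            self.timer.start()

        def _send_all(self) -> None:
            # Note: the timer callback runs in a separate thread, so a real
            # implementation would guard this state with a lock.
            if self.timer:
                self.timer.cancel()
                self.timer = None
            if self.messages:
                self.flush(self.messages)
                self.messages = []
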
2 changes: 1 addition & 1 deletion src/logcollector/collector.py
@@ -56,7 +56,7 @@ def fetch_logline(self) -> None:
logger.debug("Fetching new logline from LogServer...")
try:
with socket.socket(
socket.AF_INET, socket.SOCK_STREAM
socket.AF_INET, socket.SOCK_STREAM
) as self.client_socket:
logger.debug(
f"Trying to connect to LogServer ({self.log_server['host']}:{self.log_server['port']})..."
16 changes: 11 additions & 5 deletions src/logserver/server.py
@@ -79,15 +79,19 @@ async def open(self):
async def handle_connection(self, reader, writer, sending: bool):
logger.debug(f"Handling connection with {sending=}...")
if self.number_of_connections < MAX_NUMBER_OF_CONNECTIONS:
logger.debug(f"Adding connection to {self.number_of_connections}/{MAX_NUMBER_OF_CONNECTIONS}) open "
f"connections...")
logger.debug(
f"Adding connection to {self.number_of_connections}/{MAX_NUMBER_OF_CONNECTIONS}) open "
f"connections..."
)
self.number_of_connections += 1
client_address = writer.get_extra_info("peername")
logger.debug(f"Connection from {client_address} accepted")

try:
if sending:
logger.debug("Sending active: Calling send_logline for next available logline...")
logger.debug(
"Sending active: Calling send_logline for next available logline..."
)
await self.send_logline(writer, self.get_next_logline())
else:
logger.debug("Receiving: Calling receive_logline...")
@@ -112,12 +116,14 @@ async def handle_kafka_inputs(self):
loop = asyncio.get_running_loop()

while True:
key, value = await loop.run_in_executor(None, self.kafka_consume_handler.consume)
key, value = await loop.run_in_executor(
None, self.kafka_consume_handler.consume
)
logger.info(f"Received message via Kafka:\n{value}")
self.data_queue.put(value)

async def async_follow(self, file: str = READ_FROM_FILE):
async with aiofiles.open(file, mode='r') as file:
async with aiofiles.open(file, mode="r") as file:
# jump to end of file
await file.seek(0, 2)
