Merge dev-manuel into main #1

Merged · 43 commits from dev-manuel into main · May 28, 2024
Commits (43)
092b804
update requirements.txt
lamr02n May 15, 2024
e48c1eb
introduce first sandbox code (not usable)
lamr02n May 16, 2024
f91d9f8
change producer sandbox code (not usable)
lamr02n May 17, 2024
6bcf7a9
first working sandbox code
lamr02n May 17, 2024
4a0495e
add draft for pipeline figure
lamr02n May 17, 2024
6e00bff
add draft for pipeline figure
lamr02n May 17, 2024
271fab6
add second sandbox try
lamr02n May 17, 2024
18bee11
add tests and prototype server
lamr02n May 17, 2024
0633ed8
update draft
lamr02n May 17, 2024
6613906
update server methods
lamr02n May 18, 2024
bea0cbd
add collector
lamr02n May 18, 2024
d5c58fc
make server asynchronous
lamr02n May 18, 2024
61c5546
add first working version
lamr02n May 18, 2024
de3fcb8
add logging messages in receive_logline
lamr02n May 18, 2024
42d0d1b
rename mock_generator.py
lamr02n May 18, 2024
5425e47
add logger
lamr02n May 18, 2024
14422c4
add logger
lamr02n May 18, 2024
ced02a9
move receive_logline method up
lamr02n May 18, 2024
5588fe4
clean up server.py
lamr02n May 18, 2024
a45dd13
make files Terminal executable
lamr02n May 19, 2024
2eff790
make files Terminal executable
lamr02n May 19, 2024
143ae32
rename variables in server.py
lamr02n May 19, 2024
0292176
update docs
lamr02n May 20, 2024
a390abb
add log generation
lamr02n May 21, 2024
cb32aa4
add extraction including tests
lamr02n May 21, 2024
4e4d129
add all tests for collector.py
lamr02n May 21, 2024
6bec1d3
add all tests for collector.py
lamr02n May 21, 2024
bb6d35c
update test_utils.py
lamr02n May 21, 2024
b7292d3
update test_server.py
lamr02n May 21, 2024
78bbe85
add remaining tests for server.py
lamr02n May 22, 2024
c29ef08
update tests
lamr02n May 22, 2024
b857576
delete old_tests directory
lamr02n May 22, 2024
0a3d18a
update collector tests
lamr02n May 22, 2024
5903eb3
update error messages in collector.py
lamr02n May 23, 2024
3570fa4
update drafts and docs
lamr02n May 24, 2024
cc18387
change collector.py variables to lists
lamr02n May 26, 2024
b90facc
add Kafka producer to collector.py
lamr02n May 26, 2024
7611cc8
add Kafka producer to collector.py
lamr02n May 26, 2024
1d9134e
update get topic name method
lamr02n May 27, 2024
2dcb172
prepare testing environment
lamr02n May 28, 2024
54f94a9
prepare testing environment
lamr02n May 28, 2024
d62fd40
prepare testing environment
lamr02n May 28, 2024
9fda7dd
move files to top directory
lamr02n May 28, 2024
178 changes: 178 additions & 0 deletions heidgaf_log_collector/collector.py
@@ -0,0 +1,178 @@
import ipaddress
import json
import logging
import os # needed for Terminal execution
import re
import socket
import sys # needed for Terminal execution

from confluent_kafka import Producer

sys.path.append(os.getcwd()) # needed for Terminal execution
from heidgaf_log_collector.utils import validate_host
from heidgaf_log_collector.utils import kafka_delivery_report
from heidgaf_log_collector import utils
from pipeline_prototype.logging_config import setup_logging

setup_logging()
logger = logging.getLogger(__name__)

# LOG FORMAT:
# TIMESTAMP STATUS CLIENT_IP DNS_IP HOST_DOMAIN_NAME RECORD_TYPE RESPONSE_IP SIZE
# EXAMPLE:
# 2024-05-21T08:31:28.119Z NOERROR 192.168.0.105 8.8.8.8 www.heidelberg-botanik.de A
# b937:2f2e:2c1c:82a:33ad:9e59:ceb9:8e1 150b

KAFKA_BROKER_HOST = "localhost" # TODO: Move to config file
KAFKA_BROKER_PORT = 9092 # TODO: Move to config file
SUBNET_CUTOFF_LENGTH = 24

valid_statuses = [
"NOERROR",
"NXDOMAIN",
]

valid_record_types = [
"AAAA",
"A",
]


class LogCollector:
    def __init__(self, server_host, server_port):
        # Keep all state on the instance; class-level mutable dicts would
        # be shared across every LogCollector instance.
        self.log_server = {
            "host": utils.validate_host(server_host),
            "port": utils.validate_port(server_port),
        }
        self.logline = None
        self.log_data = {
            # Filled by validate_and_extract_logline(): "timestamp",
            # "status", "client_ip", "dns_ip", "host_domain_name",
            # "record_type", "response_ip", "size"
        }

        # Kafka setup
        conf = {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"}
        self.kafka_producer = Producer(conf)

    def fetch_logline(self):
        # The socket only lives for the duration of this call, so keep it
        # as a local variable instead of an instance attribute.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
            client_socket.connect((str(self.log_server.get("host")), self.log_server.get("port")))
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break
                self.logline = data.decode('utf-8')
                logger.info(f"Received logline: {self.logline}")

def validate_and_extract_logline(self):
parts = self.logline.split()

try:
if not self._check_length(parts):
raise ValueError(f"Logline does not contain exactly 8 values, but {len(parts)} values were found.")
            if not self._check_timestamp(parts[0]):
                raise ValueError("Invalid timestamp")
            if not self._check_status(parts[1]):
                raise ValueError("Invalid status")
            if not self._check_domain_name(parts[4]):
                raise ValueError("Invalid domain name")
            if not self._check_record_type(parts[5]):
                raise ValueError("Invalid record type")
            if not self._check_size(parts[7]):
                raise ValueError("Invalid size value")
except ValueError as e:
raise ValueError(f"Incorrect logline: {e}")

try:
self.log_data["client_ip"] = validate_host(parts[2])
self.log_data["dns_ip"] = validate_host(parts[3])
self.log_data["response_ip"] = validate_host(parts[6])
except ValueError as e:
self.log_data["client_ip"] = None
self.log_data["dns_ip"] = None
self.log_data["response_ip"] = None
raise ValueError(f"Incorrect logline: {e}")

self.log_data["timestamp"] = parts[0]
self.log_data["status"] = parts[1]
self.log_data["host_domain_name"] = parts[4]
self.log_data["record_type"] = parts[5]
self.log_data["size"] = parts[7]

def produce(self):
log_entry = self.log_data.copy()
log_entry["client_ip"] = str(self.log_data["client_ip"])
log_entry["dns_ip"] = str(self.log_data["dns_ip"])
log_entry["response_ip"] = str(self.log_data["response_ip"])

self.kafka_producer.produce(
topic=self._get_topic_name(),
key=log_entry["client_ip"],
value=json.dumps(log_entry),
callback=kafka_delivery_report,
)

self.kafka_producer.flush()

def _get_topic_name(self, length: int = SUBNET_CUTOFF_LENGTH) -> str:
try:
address = ipaddress.IPv4Address(self.log_data.get("client_ip"))
except ValueError as e:
raise ValueError(f"Invalid IP address format: {e}")

cutoff_address = utils.get_first_part_of_ipv4_address(address, length)

return f"{cutoff_address}_{length}"

@staticmethod
def _check_length(parts: list[str]) -> bool:
return len(parts) == 8

    @staticmethod
    def _check_timestamp(timestamp: str) -> bool:
        pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$"
        return bool(re.match(pattern, timestamp))

    @staticmethod
    def _check_status(status: str) -> bool:
        return status in valid_statuses

    @staticmethod
    def _check_domain_name(domain_name: str) -> bool:
        pattern = r"^(?=.{1,253}$)((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,63}$"
        return bool(re.match(pattern, domain_name))

    @staticmethod
    def _check_record_type(record_type: str) -> bool:
        return record_type in valid_record_types

    @staticmethod
    def _check_size(size: str) -> bool:
        pattern = r"^\d+b$"
        return bool(re.match(pattern, size))


if __name__ == '__main__':
collector = LogCollector("127.0.0.1", 9998)
collector.fetch_logline()
collector.validate_and_extract_logline()
collector.produce()
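
Note: collector.py (and the other modules in this PR) imports several helpers from heidgaf_log_collector/utils.py that are not part of this diff. The following is a hypothetical sketch of those helpers, inferred only from their call sites here; the repository's actual implementations may differ.

# Hypothetical sketch of heidgaf_log_collector/utils.py, inferred from the
# call sites in this PR; not the repository's actual file.
import ipaddress
import logging

logger = logging.getLogger(__name__)


def validate_host(host) -> ipaddress.IPv4Address | ipaddress.IPv6Address:
    # Used on both IPv4 and IPv6 inputs (response_ip may be IPv6);
    # ip_address() raises ValueError on anything invalid.
    return ipaddress.ip_address(host)


def validate_port(port: int) -> int:
    # TCP ports must fit in an unsigned 16-bit integer.
    if not 1 <= port <= 65535:
        raise ValueError(f"Invalid port: {port}")
    return port


def get_first_part_of_ipv4_address(address: ipaddress.IPv4Address, length: int) -> ipaddress.IPv4Address:
    # Keep the first `length` bits and zero the host bits, e.g.
    # 192.168.0.160 with length 24 -> 192.168.0.0, so _get_topic_name()
    # would yield the Kafka topic "192.168.0.0_24".
    if not 0 <= length <= 32:
        raise ValueError("length must be between 0 and 32")
    mask = (0xFFFFFFFF << (32 - length)) & 0xFFFFFFFF
    return ipaddress.IPv4Address(int(address) & mask)


def kafka_delivery_report(err, msg):
    # Standard confluent_kafka delivery callback signature: (err, msg).
    if err is not None:
        logger.warning(f"Delivery failed: {err}")
    else:
        logger.info(f"Delivered to topic {msg.topic()} [{msg.partition()}]")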
55 changes: 55 additions & 0 deletions heidgaf_log_collector/log_generator.py
@@ -0,0 +1,55 @@
import datetime
import ipaddress
import random


def generate_dns_log_line():
timestamp = datetime.datetime.now().replace(hour=random.randint(0, 23),
minute=random.randint(0, 59),
second=random.randint(0, 59),
microsecond=random.randint(0, 999999)
).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
statuses = ["NOERROR"] * 4 + ["NXDOMAIN"]
status = random.choice(statuses)
client_ip = f'192.168.0.{random.randint(1, 255)}'
domain_suffixes = ["com", "org", "de"]
domain_names = ['example', 'testsite', 'mydomain', 'google', 'website',
'uni-heidelberg', 'hei', 'urz', 'mathematikon', 'bioquant',
'klinikum-heidelberg', 'hochschulsport', 'studentenwerk-heidelberg',
'heidelberg-research', 'campuslife-hei', 'philfak-uni-hd',
'heidelberg-alumni', 'studium-generale-hei', 'hei-bibliothek',
'physik-uni-heidelberg', 'chemie-heidelberg', 'historischesmuseum-hd',
'heidelberg-sportclub', 'uni-hd-jura', 'medizin-forschung-hei',
'heidelberg-studentenleben', 'altstadt-heidelberg', 'biofak-hei',
'physiklabor-uni-hd', 'philosophie-heidelberg', 'heidelberger-schloss',
'uni-hd-musik', 'mathematik-hei', 'heidelberg-botanik',
'altphilologie-hei', 'neurowissenschaft-hd', 'hei-campusradio',
'medieninformatik-uni-hd', 'heidelberg-geowissenschaften',
'studienberatung-hei', 'heidelberg-sprachenzentrum', 'uni-hd-vwl',
'heidelberger-theater', 'physiologie-uni-hd', 'ethnologie-heidelberg',
'biotech-hei', 'uni-hd-psychologie', 'heidelberg-stadtbibliothek',
'geographie-uni-heidelberg', 'soziologie-hei', 'heidelberg-mensa',
'uni-hd-philosophenweg', 'astronomie-heidelberg', 'heidelberger-kunst',
'uni-hd-bwl', 'altstadt-campus-hd', 'heidelberg-law', 'uni-hd-theologie',
'heidelberg-biologie', 'heimatmuseum-hd', 'uni-heidelberg-chemie',
'heidelberg-studentenrat', 'campus-kunst-heidelberg']

hostname = f"www.{random.choice(domain_names)}.{random.choice(domain_suffixes)}"
record_type = random.choice(["AAAA", "A"])
response = str(ipaddress.IPv6Address(random.randint(0, 2 ** 128 - 1)))
size = f"{random.randint(50, 150)}b"

# ONLY FOR TESTING # TODO: Remove
client_ip = '192.168.0.160'
#################################

return f"{timestamp} {status} {client_ip} 8.8.8.8 {hostname} {record_type} {response} {size}"


if __name__ == "__main__":
dns_log_lines = [generate_dns_log_line() for _ in range(100000)]

# Writing to a file
with open("../pipeline_prototype/sandbox/dns_log.txt", "w") as file:
for line in dns_log_lines:
file.write(line + "\n")
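
For illustration, each call returns one whitespace-separated line in exactly the eight-field format that collector.py validates (with the client IP currently pinned by the testing override above); the values shown are examples only:

# Illustrative usage; all field values are randomly generated per call.
from heidgaf_log_collector.log_generator import generate_dns_log_line

print(generate_dns_log_line())
# e.g.: 2024-05-21T08:31:28.119Z NOERROR 192.168.0.160 8.8.8.8
# www.heidelberg-botanik.de A b937:2f2e:2c1c:82a:33ad:9e59:ceb9:8e1 103b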
36 changes: 36 additions & 0 deletions heidgaf_log_collector/mock_generator.py
@@ -0,0 +1,36 @@
import logging
import os # needed for Terminal execution
import socket
import sys # needed for Terminal execution

sys.path.append(os.getcwd()) # needed for Terminal execution
from heidgaf_log_collector import utils
from pipeline_prototype.logging_config import setup_logging
from heidgaf_log_collector.log_generator import generate_dns_log_line

setup_logging()
logger = logging.getLogger(__name__)


def generate_random_logline():
return generate_dns_log_line()


class LogGenerator:
    def __init__(self, server_host, server_port):
        self.server_host = utils.validate_host(server_host)
        self.server_port = utils.validate_port(server_port)

    def send_logline(self, logline: str):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
            client_socket.connect((str(self.server_host), self.server_port))
            # sendall() retries until the whole logline is written, unlike
            # send(), which may write only part of the buffer.
            client_socket.sendall(logline.encode('utf-8'))
            logger.info(f"Sent {logline} to server")


if __name__ == "__main__":
generator = LogGenerator("127.0.0.1", 9999)
generator.send_logline(generate_random_logline())
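
All three executable modules call setup_logging() from pipeline_prototype/logging_config.py, which is not included in this diff. A minimal sketch of what such a module might contain (an assumption, not the actual file):

# Hypothetical sketch of pipeline_prototype/logging_config.py; the real
# module is not part of this PR and may differ.
import logging


def setup_logging(level: int = logging.INFO) -> None:
    # Configure the root logger once; each module then obtains its own
    # child logger via logging.getLogger(__name__).
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
    )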
120 changes: 120 additions & 0 deletions heidgaf_log_collector/server.py
@@ -0,0 +1,120 @@
import asyncio
import logging
import os # needed for Terminal execution
import queue
import sys # needed for Terminal execution

sys.path.append(os.getcwd()) # needed for Terminal execution
from heidgaf_log_collector import utils
from pipeline_prototype.logging_config import setup_logging

MAX_NUMBER_OF_CONNECTIONS = 5

setup_logging()
logger = logging.getLogger(__name__)


class LogServer:
    def __init__(self, host: str, port_in: int, port_out: int) -> None:
        self.host = utils.validate_host(host)
        self.port_in = utils.validate_port(port_in)
        self.port_out = utils.validate_port(port_out)
        self.number_of_connections = 0
        self.data_queue = queue.Queue()

async def open(self):
send_server = await asyncio.start_server(
self.handle_send_logline,
str(self.host),
self.port_out
)
receive_server = await asyncio.start_server(
self.handle_receive_logline,
str(self.host),
self.port_in
)
logger.info(
f"LogServer running on {self.host}:{self.port_out} for sending, " +
f"and on {self.host}:{self.port_in} for receiving"
)

try:
await asyncio.gather(
send_server.serve_forever(),
receive_server.serve_forever()
)
except KeyboardInterrupt:
pass

send_server.close()
receive_server.close()
await asyncio.gather(
send_server.wait_closed(),
receive_server.wait_closed()
)

    async def handle_connection(self, reader, writer, sending: bool):
        # Strict < caps concurrently served clients at
        # MAX_NUMBER_OF_CONNECTIONS; <= would admit one extra connection.
        if self.number_of_connections < MAX_NUMBER_OF_CONNECTIONS:
self.number_of_connections += 1
client_address = writer.get_extra_info('peername')
logger.debug(f"Connection from {client_address} accepted")

try:
if sending:
await self.send_logline(writer, self.get_next_logline())
else:
await self.receive_logline(reader)
except asyncio.CancelledError:
pass
finally:
logger.debug(f"Connection to {client_address} closed")
writer.close()
await writer.wait_closed()
self.number_of_connections -= 1
else:
client_address = writer.get_extra_info('peername')
logger.warning(
f"Client connection to {client_address} denied. Max number of connections reached!"
)
writer.close()
await writer.wait_closed()

async def handle_send_logline(self, reader, writer):
await self.handle_connection(reader, writer, True)

async def handle_receive_logline(self, reader, writer):
await self.handle_connection(reader, writer, False)

@staticmethod
async def send_logline(writer, logline):
if logline:
writer.write(logline.encode('utf-8'))
await writer.drain()
logger.info(f"Logline sent: {logline}")
return

logger.info("No logline available")

async def receive_logline(self, reader):
while True:
data = await reader.read(1024)
if not data:
break
received_message = data.decode()
logger.info(f"Received message: {received_message}")
self.data_queue.put(received_message)

def get_next_logline(self) -> str | None:
if not self.data_queue.empty():
return self.data_queue.get()
return None


if __name__ == '__main__':
server = LogServer("127.0.0.1", 9999, 9998)
asyncio.run(server.open())
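
Taken together, the modules form a small pipeline: LogServer receives loglines on port 9999 and serves them back out on port 9998, mock_generator.py pushes generated lines in, and collector.py pulls them out, validates them, and produces them to Kafka. A hypothetical local smoke test (assuming a Kafka broker on localhost:9092, as hard-coded in collector.py) might look like this:

# Hypothetical smoke test of the prototype pipeline; assumes a reachable
# Kafka broker on localhost:9092 and the modules shown in this PR.
import asyncio
import threading
import time

from heidgaf_log_collector.collector import LogCollector
from heidgaf_log_collector.mock_generator import LogGenerator, generate_random_logline
from heidgaf_log_collector.server import LogServer


def run_server():
    # Receiving side on 9999, sending side on 9998 (see __main__ above).
    asyncio.run(LogServer("127.0.0.1", 9999, 9998).open())


if __name__ == "__main__":
    threading.Thread(target=run_server, daemon=True).start()
    time.sleep(1)  # crude wait for the server sockets to come up

    # Push one generated logline into the server's receiving port...
    LogGenerator("127.0.0.1", 9999).send_logline(generate_random_logline())
    time.sleep(1)  # give the server time to queue the line

    # ...then pull it from the sending port, validate, and produce to Kafka.
    collector = LogCollector("127.0.0.1", 9998)
    collector.fetch_logline()
    collector.validate_and_extract_logline()
    collector.produce()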