Add ClickHouse connection #57

Merged
merged 60 commits into from
Dec 30, 2024
Changes from 50 commits
9bd5bdf
Add monitoring related files
lamr02n Nov 20, 2024
0baa73b
Merge branch 'refs/heads/main' into add-clickhouse-connection
lamr02n Nov 25, 2024
1e25801
Make LogServer write to ClickHouse
lamr02n Nov 28, 2024
2986ea9
Add requirements.monitoring.txt
lamr02n Nov 28, 2024
04199b1
Make Collector write to ClickHouse
lamr02n Nov 28, 2024
150fe7d
Add forbidden field names to logline_handler.py
lamr02n Nov 30, 2024
a501ba2
Fix dns_loglines insertion
lamr02n Nov 30, 2024
63746d7
Add dns_loglines insertion in collector.py
lamr02n Nov 30, 2024
393cc01
Add logline_status insertion in collector.py
lamr02n Nov 30, 2024
db16775
Add logline_timestamps insertion in collector.py
lamr02n Nov 30, 2024
61776ff
Add development query script
lamr02n Nov 30, 2024
a321ebf
Update logline_timestamps insertion in collector.py
lamr02n Nov 30, 2024
9203bc2
Bug fixing
lamr02n Dec 3, 2024
6b0e27c
Add batch_id to Batch object and make Prefilter use Batch
lamr02n Dec 3, 2024
386494a
Add logline_timestamps insertion in batch_handler.py
lamr02n Dec 3, 2024
56787c5
Move Batch datatype to its own file
lamr02n Dec 4, 2024
a333511
Rename clickhouse_batch.py to clickhouse_batch_sender.py
lamr02n Dec 4, 2024
e20f441
Use Marshmallow to transmit data to MonitoringAgent
lamr02n Dec 4, 2024
360ff50
Make BufferedBatch store Batch IDs
lamr02n Dec 5, 2024
0318a92
Add database insertion in BufferedBatch
lamr02n Dec 5, 2024
63dacf9
Switch status field in connectors by is_active
lamr02n Dec 5, 2024
13aab6b
Use correct transactional_id in batch_handler.py
lamr02n Dec 5, 2024
eabde81
Update tests for batch_handler.py
lamr02n Dec 5, 2024
133320f
Update tests and fix bugs
lamr02n Dec 6, 2024
abadc20
Add batch_timestamps insertion in batch_handler.py
lamr02n Dec 6, 2024
468803d
Update test_inspector.py
lamr02n Dec 6, 2024
b181280
Update status field for Collector database insertion
lamr02n Dec 6, 2024
7f9655c
Update Prefilter and add database insertion
lamr02n Dec 6, 2024
e6711db
Bug fix
lamr02n Dec 6, 2024
1a2f2b9
Update Inspector and add database insertion
lamr02n Dec 6, 2024
51188e5
Update Detector and add database insertion
lamr02n Dec 6, 2024
d250383
Update tests and some bug fixing
lamr02n Dec 9, 2024
8d4b345
Add a dev-query module for quick database checks
lamr02n Dec 9, 2024
649641b
Remove batch_status table
lamr02n Dec 10, 2024
a5147bd
Remove logline_status table
lamr02n Dec 10, 2024
5c07ab2
Add alerts and suspicious_batch_timestamps tables
lamr02n Dec 10, 2024
4c2b547
Add alerts connectors
lamr02n Dec 11, 2024
da9528a
Add suspicious_batches_to_batch connectors
lamr02n Dec 11, 2024
2d503eb
Update suspicious_batch_timestamps connectors
lamr02n Dec 11, 2024
4f560c7
Add overall_score and result to Alerts
lamr02n Dec 11, 2024
4b2f508
Add overall_score and result to Alerts
lamr02n Dec 11, 2024
a311d27
Add detector database insertion
lamr02n Dec 11, 2024
27a6e32
Update prefilter debug messages
lamr02n Dec 11, 2024
964e8fe
Update inspector database insertion
lamr02n Dec 11, 2024
1ea7ece
Bug fixing and update all tests
lamr02n Dec 12, 2024
bbaa722
Update requirements
lamr02n Dec 12, 2024
270df8b
Move message_text field in ServerLogs table
lamr02n Dec 13, 2024
fe5f79c
Bug fixes
lamr02n Dec 13, 2024
20de508
Restructure and update tests
lamr02n Dec 13, 2024
ad612eb
Fix test_coverage.yml workflow to discover all tests
lamr02n Dec 13, 2024
2c3f602
Updating MINGW64 install
stefanDeveloper Dec 13, 2024
c74ed70
Fix packages of mingw
stefanDeveloper Dec 13, 2024
9510e5c
Add Dockerfile.monitoring
lamr02n Dec 13, 2024
68a7a74
Rename dev-query.py to query.dev.py
lamr02n Dec 13, 2024
07cb41f
Rename dev-query.py to query.dev.py
lamr02n Dec 13, 2024
d0ee5f9
Fix bugs
lamr02n Dec 13, 2024
1d66567
Fix test for Prefilter
lamr02n Dec 13, 2024
27c061f
Update test for Inspector
lamr02n Dec 15, 2024
18b8e30
Add docstrings for clickhouse_kafka_sender.py
lamr02n Dec 17, 2024
35aa1ea
Move monitoring related docker compose entries to own file
lamr02n Dec 17, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test_coverage.yml
@@ -56,7 +56,7 @@ jobs:

- name: Test
run: |
python -m coverage run -m unittest
python -m coverage run -m unittest discover tests
python -m coverage xml

- name: Get Coverage
9 changes: 8 additions & 1 deletion config.yaml
@@ -19,7 +19,6 @@ pipeline:
log_storage:
logserver:
input_file: "/opt/file.txt"
max_number_of_connections: 1000

log_collection:
collector:
@@ -64,6 +63,11 @@ pipeline:
base_url: https://heibox.uni-heidelberg.de/d/0d5cbcbe16cd46a58021/
threshold: 0.5

monitoring:
clickhouse_connector:
batch_size: 10000
batch_timeout: 2.0

environment:
timestamp_format: "%Y-%m-%dT%H:%M:%S.%fZ"
kafka_brokers:
@@ -80,3 +84,6 @@ environment:
batch_sender_to_prefilter: "pipeline.batch_sender_to_prefilter"
prefilter_to_inspector: "pipeline.prefilter_to_inspector"
inspector_to_detector: "pipeline.inspector_to_detector"
monitoring:
clickhouse_server:
hostname: 172.27.0.11
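The new `clickhouse_connector` settings above (`batch_size: 10000`, `batch_timeout: 2.0`) suggest size-or-timeout batching: rows are buffered and flushed either when the buffer fills or when the timeout elapses. A minimal stand-alone sketch of that pattern (the `Batcher` class here is a hypothetical illustration, not the PR's actual sender):

```python
import time


class Batcher:
    """Buffer rows; flush when batch_size is reached or batch_timeout elapses."""

    def __init__(self, batch_size: int = 10000, batch_timeout: float = 2.0):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.rows = []
        self.last_flush = time.monotonic()

    def add(self, row):
        """Buffer a row; return the flushed rows if a flush was triggered, else None."""
        self.rows.append(row)
        size_hit = len(self.rows) >= self.batch_size
        timeout_hit = time.monotonic() - self.last_flush >= self.batch_timeout
        if size_hit or timeout_hit:
            return self.flush()
        return None

    def flush(self):
        """Hand back the buffered rows and reset the buffer and timer."""
        flushed, self.rows = self.rows, []
        self.last_flush = time.monotonic()
        return flushed
```

In a real sender, the flushed rows would go to a ClickHouse `INSERT`; the two knobs trade insert latency against per-insert overhead.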
51 changes: 51 additions & 0 deletions docker/dev-query.py
@@ -0,0 +1,51 @@
import datetime
import os
import sys
import uuid

import clickhouse_connect

sys.path.append(os.getcwd())
from src.base.data_classes.clickhouse_connectors import TABLE_NAME_TO_TYPE


def get_tables():
tables = {}

for table_name in TABLE_NAME_TO_TYPE:
tables[table_name] = []

return tables


def query_once(client, tables):
for table_name in tables.keys():
tables[table_name] = client.query(f"SELECT * FROM {table_name};")

return tables


def main():
client = clickhouse_connect.get_client(host="172.27.0.11", port=8123)
tables = get_tables()

client.insert(
"server_logs",
[[uuid.uuid4(), datetime.datetime.now(), "This is a logline"]],
["message_id", "timestamp_in", "message_text"],
)

results = query_once(client, tables)

for key in results:
print(f"'{key}':")

if results[key].result_rows:
for row in results[key].result_rows:
print("\t", row)
else:
print("\t -")


if __name__ == "__main__":
main()
56 changes: 56 additions & 0 deletions docker/docker-compose.dev-query.yml
@@ -0,0 +1,56 @@
include:
- "docker-compose.kafka.yml"

services:
sandbox:
build:
context: ..
dockerfile: docker/dockerfiles/Dockerfile.dev-query
network: host
depends_on:
kafka1:
condition: service_healthy
kafka2:
condition: service_healthy
kafka3:
condition: service_healthy
networks:
heidgaf:
ipv4_address: 172.27.0.100
memswap_limit: 768m
deploy:
resources:
limits:
cpus: '2'
memory: 512m
reservations:
cpus: '1'
memory: 256m
volumes:
- "${MOUNT_PATH:?MOUNT_PATH not set}:/opt/file.txt"

clickhouse-server:
image: clickhouse/clickhouse-server:24.3.12.75-alpine
container_name: clickhouse-server
networks:
heidgaf:
ipv4_address: 172.27.0.11
restart: "unless-stopped"
ports:
- "8123:8123"
- "9000:9000"
healthcheck:
test: [ "CMD-SHELL", "nc -z 127.0.0.1 8123" ]
interval: 10s
timeout: 5s
retries: 3


networks:
heidgaf:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.27.0.0/16
gateway: 172.27.0.1
19 changes: 19 additions & 0 deletions docker/docker-compose.yml
@@ -184,6 +184,25 @@ services:
timeout: 5s
retries: 3

monitoring_agent:
build:
context: ..
dockerfile: docker/dockerfiles/Dockerfile.monitoring
network: host
restart: "unless-stopped"
depends_on:
kafka1:
condition: service_healthy
kafka2:
condition: service_healthy
kafka3:
condition: service_healthy
clickhouse-server:
condition: service_healthy
networks:
heidgaf:
ipv4_address: 172.27.0.12

networks:
heidgaf:
driver: bridge
15 changes: 15 additions & 0 deletions docker/dockerfiles/Dockerfile.dev-query
@@ -0,0 +1,15 @@
FROM python:3.11-slim-bookworm

ENV PYTHONDONTWRITEBYTECODE=1

WORKDIR /usr/src/app

RUN pip --disable-pip-version-check install --no-cache-dir --no-compile clickhouse_connect marshmallow_dataclass colorlog pyYAML confluent_kafka

COPY src/base ./src/base
COPY config.yaml .
COPY docker/dev-query.py .

RUN rm -rf /root/.cache

CMD [ "python", "dev-query.py"]
1 change: 1 addition & 0 deletions requirements/requirements.detector.txt
@@ -5,3 +5,4 @@ colorlog~=6.8.2
PyYAML~=6.0.1
confluent-kafka~=2.4.0
marshmallow_dataclass~=8.7.1
clickhouse_connect~=0.8.3
1 change: 1 addition & 0 deletions requirements/requirements.inspector.txt
@@ -4,3 +4,4 @@ colorlog~=6.8.2
streamad~=0.3.1
numpy~=1.26.4
marshmallow_dataclass~=8.7.1
clickhouse_connect~=0.8.3
1 change: 1 addition & 0 deletions requirements/requirements.logcollector.txt
@@ -2,3 +2,4 @@ PyYAML~=6.0.1
colorlog~=6.8.2
confluent-kafka~=2.4.0
marshmallow_dataclass~=8.7.1
clickhouse_connect~=0.8.3
1 change: 1 addition & 0 deletions requirements/requirements.logserver.txt
@@ -3,3 +3,4 @@ colorlog~=6.8.2
confluent-kafka~=2.4.0
marshmallow_dataclass~=8.7.1
aiofiles~=24.1.0
clickhouse_connect~=0.8.3
5 changes: 5 additions & 0 deletions requirements/requirements.monitoring.txt
@@ -0,0 +1,5 @@
clickhouse_connect~=0.8.3
confluent-kafka~=2.4.0
marshmallow_dataclass~=8.7.1
colorlog~=6.8.2
PyYAML~=6.0.1
1 change: 1 addition & 0 deletions requirements/requirements.prefilter.txt
@@ -2,3 +2,4 @@ PyYAML~=6.0.1
colorlog~=6.8.2
confluent-kafka~=2.4.0
marshmallow_dataclass~=8.7.1
clickhouse_connect~=0.8.3
19 changes: 0 additions & 19 deletions src/base/__init__.py
@@ -1,19 +0,0 @@
from typing import List
from dataclasses import dataclass, field
import marshmallow.validate
import datetime


@dataclass
class Batch:
begin_timestamp: datetime.datetime = field(
metadata={
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
}
)
end_timestamp: datetime.datetime = field(
metadata={
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
}
)
data: List[dict] = field(default_factory=list)
26 changes: 26 additions & 0 deletions src/base/clickhouse_kafka_sender.py
@@ -0,0 +1,26 @@
import os
import sys

import marshmallow_dataclass

sys.path.append(os.getcwd())
from src.base.data_classes.clickhouse_connectors import TABLE_NAME_TO_TYPE
from src.base.kafka_handler import SimpleKafkaProduceHandler
from src.base.log_config import get_logger

logger = get_logger()


class ClickHouseKafkaSender:
def __init__(self, table_name: str):
self.table_name = table_name
self.kafka_producer = SimpleKafkaProduceHandler()
self.data_schema = marshmallow_dataclass.class_schema(
TABLE_NAME_TO_TYPE.get(table_name)
)()

def insert(self, data: dict):
self.kafka_producer.produce(
topic=f"clickhouse_{self.table_name}",
data=self.data_schema.dumps(data),
)
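`ClickHouseKafkaSender` follows a topic-per-table pattern: each insert is serialized and produced to `clickhouse_<table_name>`, so the monitoring agent can route each topic into its table. A self-contained sketch of that routing with a stub producer and plain JSON (both `StubProducer` and `TableSender` are illustrative stand-ins; the real class serializes with `marshmallow_dataclass` and sends via `SimpleKafkaProduceHandler`):

```python
import json


class StubProducer:
    """Stand-in for a Kafka producer; records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def produce(self, topic: str, data: str):
        self.sent.append((topic, data))


class TableSender:
    """Hypothetical mirror of ClickHouseKafkaSender: one Kafka topic per table."""

    def __init__(self, table_name: str, producer):
        self.table_name = table_name
        self.producer = producer

    def insert(self, data: dict):
        # The real sender dumps via a marshmallow schema; JSON stands in here.
        self.producer.produce(
            topic=f"clickhouse_{self.table_name}",
            data=json.dumps(data),
        )


producer = StubProducer()
TableSender("server_logs", producer).insert({"message_text": "hello"})
print(producer.sent[0][0])  # clickhouse_server_logs
```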
24 changes: 24 additions & 0 deletions src/base/data_classes/batch.py
@@ -0,0 +1,24 @@
import datetime
import uuid
from dataclasses import dataclass, field
from typing import List

import marshmallow.validate


@dataclass
class Batch:
batch_id: uuid.UUID = field(
metadata={"marshmallow_field": marshmallow.fields.UUID()}
)
begin_timestamp: datetime.datetime = field(
metadata={
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
}
)
end_timestamp: datetime.datetime = field(
metadata={
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
}
)
data: List = field(default_factory=list)
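The `Batch` dataclass pins its timestamps to the `"%Y-%m-%dT%H:%M:%S.%fZ"` format (matching `timestamp_format` in config.yaml). A stdlib-only sketch of what that serialization produces, using a hypothetical `MiniBatch` and a hand-rolled `dump` in place of the real `marshmallow_dataclass` schema:

```python
import datetime
import json
import uuid
from dataclasses import asdict, dataclass, field

# Same format string as the Batch fields in src/base/data_classes/batch.py.
TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


@dataclass
class MiniBatch:
    batch_id: uuid.UUID
    begin_timestamp: datetime.datetime
    end_timestamp: datetime.datetime
    data: list = field(default_factory=list)


def dump(batch: MiniBatch) -> str:
    """Serialize roughly as the marshmallow schema would: UUID and timestamps as strings."""
    d = asdict(batch)
    d["batch_id"] = str(d["batch_id"])
    d["begin_timestamp"] = d["begin_timestamp"].strftime(TS_FORMAT)
    d["end_timestamp"] = d["end_timestamp"].strftime(TS_FORMAT)
    return json.dumps(d)
```

Keeping the format in one shared dataclass, rather than in each service, is what lets Collector, Prefilter, Inspector, and Detector exchange batches without re-parsing ambiguity.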