[DPE-2366][DPE-2365] HA tests (#129)
* add two cluster ha test

* restructure tests pipeline

* add continuous writes structure

* add delay to restart
zmraul committed Oct 17, 2023
1 parent db2a2b4 commit b775ea4
Showing 13 changed files with 503 additions and 190 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -104,7 +104,7 @@ jobs:
fail-fast: false
matrix:
tox-environments:
-  - integration-ha-ha
+  - integration-ha
name: ${{ matrix.tox-environments }}
needs:
- lint
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
@@ -7,4 +7,5 @@ __pycache__/
*.py[cod]
.vscode
.idea
-.python-version
+.python-version
+last_written_value
1 change: 1 addition & 0 deletions last_written_value
@@ -0,0 +1 @@
20052,0
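Note: this file is a test artifact rather than source code. ContinuousWrites (added below) writes "<last written value>,<lost messages>" to last_written_value on shutdown, so 20052,0 records a last expected write of 20052 with zero lost messages; the same commit adds the file to .gitignore above.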
30 changes: 25 additions & 5 deletions lib/charms/kafka/v0/client.py
@@ -81,6 +81,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):

from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.errors import KafkaError

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@@ -93,7 +94,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
-LIBPATCH = 1
+LIBPATCH = 2


class KafkaClient:
@@ -126,6 +127,7 @@ def __init__(
self.mtls = self.security_protocol == "SSL"

self._subscription = None
self._consumer_group_prefix = None

@cached_property
def _admin_client(self) -> KafkaAdminClient:
@@ -174,7 +176,7 @@ def _consumer_client(self) -> KafkaConsumer:
ssl_certfile=self.certfile_path if self.ssl else None,
ssl_keyfile=self.keyfile_path if self.mtls else None,
api_version=KafkaClient.API_VERSION if self.mtls else None,
-group_id=self._consumer_group_prefix or None,
+group_id=self._consumer_group_prefix,
enable_auto_commit=True,
auto_offset_reset="earliest",
consumer_timeout_ms=15000,
@@ -188,10 +190,17 @@ def create_topic(self, topic: NewTopic) -> None:
Args:
topic: the configuration of the topic to create
"""
self._admin_client.create_topics(new_topics=[topic], validate_only=False)

def delete_topics(self, topics: list[str]) -> None:
"""Deletes a topic.
Args:
topics: list of topics to delete
"""
self._admin_client.delete_topics(topics=topics)

def subscribe_to_topic(
self, topic_name: str, consumer_group_prefix: Optional[str] = None
) -> None:
@@ -240,8 +249,19 @@ def produce_message(self, topic_name: str, message_content: str) -> None:
"""
item_content = f"Message #{message_content}"
future = self._producer_client.send(topic_name, str.encode(item_content))
-future.get(timeout=60)
-logger.info(f"Message published to topic={topic_name}, message content: {item_content}")
try:
future.get(timeout=60)
logger.info(
f"Message published to topic={topic_name}, message content: {item_content}"
)
except KafkaError as e:
logger.error(f"Error producing message {message_content} to topic {topic_name}: {e}")

def close(self) -> None:
"""Close the connection to the client."""
self._admin_client.close()
self._producer_client.close()
self._consumer_client.close()


if __name__ == "__main__":
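For orientation, a minimal sketch of how the updated client API fits together. The connection values below are placeholders for illustration, not taken from this diff; only the method names and signatures come from the code above.

from charms.kafka.v0.client import KafkaClient
from kafka.admin import NewTopic

# Placeholder connection details -- in the charm these come from relation data.
client = KafkaClient(
    servers=["10.0.0.1:9092"],
    username="admin",
    password="secret",
    security_protocol="SASL_PLAINTEXT",
)

client.create_topic(topic=NewTopic(name="demo", num_partitions=1, replication_factor=3))

# As of LIBPATCH 2, produce_message() catches KafkaError and logs instead of raising.
client.produce_message(topic_name="demo", message_content="1")

client.delete_topics(topics=["demo"])
client.close()  # closes the admin, producer, and consumer clients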
2 changes: 2 additions & 0 deletions tests/integration/ha/__init__.py
@@ -0,0 +1,2 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
220 changes: 220 additions & 0 deletions tests/integration/ha/continuous_writes.py
@@ -0,0 +1,220 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import asyncio
import logging
import os
from multiprocessing import Event, Process, Queue
from types import SimpleNamespace

from charms.kafka.v0.client import KafkaClient
from kafka.admin import NewTopic
from kafka.errors import KafkaTimeoutError
from pytest_operator.plugin import OpsTest
from tenacity import (
RetryError,
Retrying,
retry,
stop_after_attempt,
stop_after_delay,
wait_fixed,
wait_random,
)

from integration.helpers import DUMMY_NAME, get_provider_data

logger = logging.getLogger(__name__)


class ContinuousWrites:
"""Utility class for managing continuous writes."""

TOPIC_NAME = "ha-test-topic"
LAST_WRITTEN_VAL_PATH = "last_written_value"

def __init__(self, ops_test: OpsTest, app: str):
self._ops_test = ops_test
self._app = app
self._is_stopped = True
self._event = None
self._queue = None
self._process = None

@retry(
wait=wait_fixed(wait=5) + wait_random(0, 5),
stop=stop_after_attempt(5),
)
def start(self) -> None:
"""Run continuous writes in the background."""
if not self._is_stopped:
self.clear()

# create topic
self._create_replicated_topic()

# create process
self._create_process()

# pass the model full name to the process once it starts
self.update()

# start writes
self._process.start()

def update(self):
"""Update cluster related conf. Useful in cases such as scaling, pwd change etc."""
self._queue.put(SimpleNamespace(model_full_name=self._ops_test.model_full_name))

@retry(
wait=wait_fixed(wait=5) + wait_random(0, 5),
stop=stop_after_attempt(5),
)
def clear(self) -> None:
"""Stop writes and delete the topic."""
if not self._is_stopped:
self.stop()

client = self._client()
try:
client.delete_topics(topics=[self.TOPIC_NAME])
finally:
client.close()

def consumed_messages(self) -> list:
"""Consume the messages in the topic."""
client = self._client()
try:
for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)):
with attempt:
client.subscribe_to_topic(topic_name=self.TOPIC_NAME)
# FIXME: loading whole list of consumed messages into memory might not be the best idea
return list(client.messages())
except RetryError:
return []
finally:
client.close()

def _create_replicated_topic(self):
"""Create topic with replication_factor = 3."""
client = self._client()
topic_config = NewTopic(
name=self.TOPIC_NAME,
num_partitions=1,
replication_factor=3,
)
client.create_topic(topic=topic_config)

@retry(
wait=wait_fixed(wait=5) + wait_random(0, 5),
stop=stop_after_attempt(5),
)
def stop(self) -> SimpleNamespace:
"""Stop the continuous writes process and return max inserted ID."""
if not self._is_stopped:
self._stop_process()

result = SimpleNamespace()

# messages count
consumed_messages = self.consumed_messages()
result.count = len(consumed_messages)
result.last_message = consumed_messages[-1] if consumed_messages else None

# last expected message stored on disk
try:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)):
with attempt:
with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f:
result.last_expected_message, result.lost_messages = (
f.read().rstrip().split(",", maxsplit=2)
)
except RetryError:
result.last_expected_message = result.lost_messages = -1

return result

def _create_process(self):
self._is_stopped = False
self._event = Event()
self._queue = Queue()
self._process = Process(
target=ContinuousWrites._run_async,
name="continuous_writes",
args=(self._event, self._queue, 0),
)

def _stop_process(self):
self._event.set()
self._process.join()
self._queue.close()
self._is_stopped = True

def _client(self):
"""Build a Kafka client."""
relation_data = get_provider_data(
unit_name=f"{DUMMY_NAME}/0",
model_full_name=self._ops_test.model_full_name,
endpoint="kafka-client-admin",
)
return KafkaClient(
servers=relation_data["endpoints"].split(","),
username=relation_data["username"],
password=relation_data["password"],
security_protocol="SASL_PLAINTEXT",
)

@staticmethod
async def _run(event: Event, data_queue: Queue, starting_number: int) -> None: # noqa: C901
"""Continuous writing."""
initial_data = data_queue.get(True)

def _client():
"""Build a Kafka client."""
relation_data = get_provider_data(
unit_name=f"{DUMMY_NAME}/0",
model_full_name=initial_data.model_full_name,
endpoint="kafka-client-admin",
)
return KafkaClient(
servers=relation_data["endpoints"].split(","),
username=relation_data["username"],
password=relation_data["password"],
security_protocol="SASL_PLAINTEXT",
)

write_value = starting_number
lost_messages = 0
client = _client()

while True:
if not data_queue.empty(): # currently evaluates to false as we don't make updates
data_queue.get(False)
client.close()
client = _client()

try:
client.produce_message(
topic_name=ContinuousWrites.TOPIC_NAME, message_content=str(write_value)
)
except KafkaTimeoutError:
client.close()
client = _client()
lost_messages += 1
finally:
# process termination requested
if event.is_set():
break

write_value += 1

# write last expected written value on disk
with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "w") as f:
    f.write(f"{write_value},{lost_messages}")
    # flush Python's buffer before fsync, otherwise fsync may persist nothing
    f.flush()
    os.fsync(f.fileno())

client.close()

@staticmethod
def _run_async(event: Event, data_queue: Queue, starting_number: int):
"""Run async code."""
asyncio.run(ContinuousWrites._run(event, data_queue, starting_number))
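For context, a hedged sketch of how an HA test might drive this helper. The test name, the restart step, and the import path are illustrative assumptions, not part of this commit; the ContinuousWrites calls and result fields match the class above.

import pytest

from integration.ha.continuous_writes import ContinuousWrites  # assumed import path


@pytest.mark.asyncio
async def test_no_lost_messages_on_restart(ops_test):
    """Illustrative flow: writes should survive a broker restart."""
    cw = ContinuousWrites(ops_test, app="kafka")
    cw.start()  # creates the replicated topic and launches the writer process

    # ... restart or kill a broker unit here (out of scope for this sketch) ...

    result = cw.stop()
    assert result.count > 0
    assert int(result.lost_messages) == 0
    cw.clear()  # safe to call after stop(); deletes the test topic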