Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lamr02n committed Nov 23, 2024
1 parent f6fbabe commit da31bfb
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 45 deletions.
6 changes: 3 additions & 3 deletions src/base/kafka_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def produce(self, *args, **kwargs):
"""
Encodes the given data for transport and sends it on the specified topic.
"""
pass
raise NotImplementedError

Check warning on line 77 in src/base/kafka_handler.py

View check run for this annotation

Codecov / codecov/patch

src/base/kafka_handler.py#L77

Added line #L77 was not covered by tests

def __del__(self) -> None:
self.producer.flush()
Expand Down Expand Up @@ -237,7 +237,7 @@ def consume(self, *args, **kwargs):
"""
Consumes available messages on the specified topic and decodes it.
"""
pass
raise NotImplementedError

def consume_as_json(self) -> tuple[None | str, dict]:
"""
Expand Down Expand Up @@ -324,7 +324,7 @@ def consume(self) -> tuple[str | None, str | None, str | None]:
except KeyboardInterrupt:
logger.info("Stopping KafkaConsumeHandler...")

Check warning on line 325 in src/base/kafka_handler.py

View check run for this annotation

Codecov / codecov/patch

src/base/kafka_handler.py#L325

Added line #L325 was not covered by tests
raise KeyboardInterrupt
except Exception as e:
except Exception:

Check warning on line 327 in src/base/kafka_handler.py

View check run for this annotation

Codecov / codecov/patch

src/base/kafka_handler.py#L327

Added line #L327 was not covered by tests
raise


Expand Down
10 changes: 8 additions & 2 deletions src/logcollector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,18 @@ async def start(self) -> None:
f" ⤷ receiving on Kafka topic '{CONSUME_TOPIC}'"
)

task_fetch = asyncio.Task(self.fetch())
task_send = asyncio.Task(self.send())

try:
await asyncio.gather(
self.fetch(),
self.send(),
task_fetch,
task_send,
)
except KeyboardInterrupt:
task_fetch.cancel()
task_send.cancel()

logger.info("LogCollector stopped.")

async def fetch(self) -> None:
Expand Down
13 changes: 10 additions & 3 deletions src/logserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,19 @@ async def start(self) -> None:
f" ⤷ sending on Kafka topic '{PRODUCE_TOPIC}'"
)

task_fetch_kafka = asyncio.Task(self.fetch_from_kafka())
task_fetch_file = asyncio.Task(self.fetch_from_file())

try:
await asyncio.gather(
self.fetch_from_kafka(),
self.fetch_from_file(),
task = asyncio.gather(
task_fetch_kafka,
task_fetch_file,
)
await task
except KeyboardInterrupt:
task_fetch_kafka.cancel()
task_fetch_file.cancel()

logger.info("LogServer stopped.")

def send(self, message: str) -> None:
Expand Down
35 changes: 18 additions & 17 deletions tests/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,24 @@ async def mock_gather(*args, **kwargs):
self.sut.fetch.assert_called_once()
self.sut.send.assert_called_once()

# TODO: Update
# async def test_start_handles_keyboard_interrupt(self):
# # Arrange
# self.sut.fetch = AsyncMock()
# self.sut.send = AsyncMock()
#
# async def mock_gather(*args, **kwargs):
# raise KeyboardInterrupt
#
# with (patch('src.logcollector.collector.asyncio.gather', side_effect=mock_gather) as mock):
# # Act
# await self.sut.start()
#
# # Assert
# mock.assert_called_once()
# self.sut.fetch.assert_called_once()
# self.sut.send.assert_called_once()
async def test_start_handles_keyboard_interrupt(self):
# Arrange
self.sut.fetch = AsyncMock()
self.sut.send = AsyncMock()

async def mock_gather(*args, **kwargs):
raise KeyboardInterrupt

with patch(
"src.logcollector.collector.asyncio.gather", side_effect=mock_gather
) as mock:
# Act
await self.sut.start()

# Assert
mock.assert_called_once()
self.sut.fetch.assert_called_once()
self.sut.send.assert_called_once()


class TestFetch(unittest.IsolatedAsyncioTestCase):
Expand Down
121 changes: 121 additions & 0 deletions tests/test_kafka_consume_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import unittest
from unittest.mock import patch, MagicMock

from src.base.kafka_handler import KafkaConsumeHandler


class TestInit(unittest.TestCase):
@patch("src.base.kafka_handler.CONSUMER_GROUP_ID", "test_group_id")
@patch(
"src.base.kafka_handler.KAFKA_BROKERS",
[
{
"hostname": "127.0.0.1",
"port": 9999,
},
{
"hostname": "127.0.0.2",
"port": 9998,
},
{
"hostname": "127.0.0.3",
"port": 9997,
},
],
)
@patch("src.base.kafka_handler.Consumer")
def test_init_successful(self, mock_consumer):
# Arrange
mock_consumer_instance = MagicMock()
mock_consumer.return_value = mock_consumer_instance

expected_conf = {
"bootstrap.servers": "127.0.0.1:9999,127.0.0.2:9998,127.0.0.3:9997",
"group.id": "test_group_id",
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
"enable.partition.eof": True,
}

# Act
sut = KafkaConsumeHandler(topics="test_topic")

# Assert
self.assertEqual(mock_consumer_instance, sut.consumer)

mock_consumer.assert_called_once_with(expected_conf)
mock_consumer_instance.assign.assert_called_once()

@patch("src.base.kafka_handler.CONSUMER_GROUP_ID", "test_group_id")
@patch(
"src.base.kafka_handler.KAFKA_BROKERS",
[
{
"hostname": "127.0.0.1",
"port": 9999,
},
{
"hostname": "127.0.0.2",
"port": 9998,
},
{
"hostname": "127.0.0.3",
"port": 9997,
},
],
)
@patch("src.base.kafka_handler.Consumer")
def test_init_successful_with_list(self, mock_consumer):
# Arrange
mock_consumer_instance = MagicMock()
mock_consumer.return_value = mock_consumer_instance

expected_conf = {
"bootstrap.servers": "127.0.0.1:9999,127.0.0.2:9998,127.0.0.3:9997",
"group.id": "test_group_id",
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
"enable.partition.eof": True,
}

# Act
sut = KafkaConsumeHandler(topics=["test_topic_1", "test_topic_2"])

# Assert
self.assertEqual(mock_consumer_instance, sut.consumer)

mock_consumer.assert_called_once_with(expected_conf)
mock_consumer_instance.assign.assert_called_once()


class TestConsume(unittest.TestCase):
@patch("src.base.kafka_handler.CONSUMER_GROUP_ID", "test_group_id")
@patch(
"src.base.kafka_handler.KAFKA_BROKERS",
[
{
"hostname": "127.0.0.1",
"port": 9999,
},
{
"hostname": "127.0.0.2",
"port": 9998,
},
{
"hostname": "127.0.0.3",
"port": 9997,
},
],
)
@patch("src.base.kafka_handler.Consumer")
def test_not_implemented(self, mock_consumer):
# Arrange
sut = KafkaConsumeHandler(topics="test_topic")

# Act and Assert
with self.assertRaises(NotImplementedError):
sut.consume()


if __name__ == "__main__":
unittest.main()
41 changes: 21 additions & 20 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,27 @@ async def test_start(
mock_fetch_from_kafka.assert_called_once()
mock_fetch_from_file.assert_called_once()

# TODO: Update
# @patch("src.logserver.server.LogServer.fetch_from_kafka")
# @patch("src.logserver.server.LogServer.fetch_from_file")
# async def test_start_handles_keyboard_interrupt(
# self,
# mock_fetch_from_file,
# mock_fetch_from_kafka,
# ):
# # Arrange
# async def mock_gather(*args, **kwargs):
# raise KeyboardInterrupt
#
# with (patch('src.logserver.server.asyncio.gather', side_effect=mock_gather) as mock):
# # Act
# await self.sut.start()
#
# # Assert
# mock.assert_called_once()
# mock_fetch_from_kafka.assert_called_once()
# mock_fetch_from_file.assert_called_once()
@patch("src.logserver.server.LogServer.fetch_from_kafka")
@patch("src.logserver.server.LogServer.fetch_from_file")
async def test_start_handles_keyboard_interrupt(
self,
mock_fetch_from_file,
mock_fetch_from_kafka,
):
# Arrange
async def mock_gather(*args, **kwargs):
raise KeyboardInterrupt

with patch(
"src.logserver.server.asyncio.gather", side_effect=mock_gather
) as mock:
# Act
await self.sut.start()

# Assert
mock.assert_called_once()
mock_fetch_from_kafka.assert_called_once()
mock_fetch_from_file.assert_called_once()


class TestSend(unittest.TestCase):
Expand Down

0 comments on commit da31bfb

Please sign in to comment.