Skip to content

Commit

Permalink
update tests for batch_handler.py
Browse files Browse the repository at this point in the history
  • Loading branch information
lamr02n committed Jun 12, 2024
1 parent a335786 commit 1800ffc
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 51 deletions.
2 changes: 0 additions & 2 deletions heidgaf_core/batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@


class KafkaBatchSender:
# TODO: Test
def __init__(self, topic: str):
self.topic = topic
self.messages = []
self.lock = Lock()
self.timer = None
self.kafka_produce_handler = KafkaProduceHandler()

# TODO: Test
def _send_batch(self):
with self.lock:
if self.messages:
Expand Down
98 changes: 49 additions & 49 deletions tests/test_batch_handler.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
import json
import unittest
from unittest.mock import patch, MagicMock

from heidgaf_core.batch_handler import KafkaBatchSender
from heidgaf_core.config import *


# class TestInit(unittest.TestCase):
# def test_init(self):
# sender_instance = KafkaBatchSender(topic="test_topic")
#
# self.assertEqual(sender_instance.topic, "test_topic")
# self.assertEqual(sender_instance.messages, [])
# self.assertIsInstance(sender_instance.lock, type(Lock()))
# self.assertIsNone(sender_instance.timer)
# self.assertEqual(
# {'bootstrap.servers': f"{KAFKA_BROKER_HOST}:{KAFKA_BROKER_PORT}"},
# sender_instance.conf,
# )


# class TestSendBatch(unittest.TestCase):
# @patch('heidgaf_log_collector.batch_handler.Producer')
# def test_send_batch(self, mock_producer):
# mock_producer_instance = MagicMock()
# mock_producer.return_value = mock_producer_instance
# sender_instance = KafkaBatchSender(topic="test_topic")
# sender_instance._reset_timer = MagicMock()
# sender_instance.kafka_producer = mock_producer_instance
#
# sender_instance.messages = ["message1", "message2"]
#
# sender_instance._send_batch()
#
# mock_producer_instance.produce.assert_called_once_with(
# topic="test_topic",
# key=None,
# value=b'["message1", "message2"]',
# callback=utils.kafka_delivery_report,
# )
#
# mock_producer_instance.flush.assert_called_once()
# sender_instance._reset_timer.assert_called_once()
# self.assertEqual(sender_instance.messages, [])
#
# def test_send_batch_no_producer(self):
# sender_instance = KafkaBatchSender(topic="test_topic")
# sender_instance._reset_timer = MagicMock()
# sender_instance.kafka_producer = None
#
# sender_instance.messages = ["message1", "message2"]
#
# sender_instance._send_batch()
#
# sender_instance._reset_timer.assert_not_called()
# self.assertEqual(sender_instance.messages, ["message1", "message2"])
class TestInit(unittest.TestCase):
@patch('heidgaf_core.batch_handler.KafkaProduceHandler')
@patch('heidgaf_core.batch_handler.Lock')
def test_init(self, mock_lock, mock_kafka_produce_handler):
mock_lock_instance = MagicMock()
mock_lock.return_value = mock_lock_instance
mock_handler_instance = MagicMock()
mock_kafka_produce_handler.return_value = mock_handler_instance
sender_instance = KafkaBatchSender(topic="test_topic")

self.assertEqual(sender_instance.topic, "test_topic")
self.assertEqual(sender_instance.messages, [])
self.assertIsNone(sender_instance.timer)
mock_lock.assert_called_once()
mock_kafka_produce_handler.assert_called_once()
self.assertEqual(mock_handler_instance, sender_instance.kafka_produce_handler)
self.assertEqual(mock_lock_instance, sender_instance.lock)


class TestSendBatch(unittest.TestCase):
@patch('heidgaf_core.batch_handler.KafkaProduceHandler')
def test_send_batch_with_messages(self, mock_kafka_produce_handler):
sender_instance = KafkaBatchSender(topic="test_topic")
sender_instance._reset_timer = MagicMock()
mock_handler_instance = mock_kafka_produce_handler.return_value
mock_send = mock_handler_instance.send

sender_instance.messages = ["message1", "message2"]

sender_instance._send_batch()

sender_instance._reset_timer.assert_called_once()
self.assertEqual(sender_instance.messages, [])
mock_send.assert_called_once_with(
topic="test_topic",
data=json.dumps(["message1", "message2"])
)

def test_send_batch_without_messages(self):
sender_instance = KafkaBatchSender(topic="test_topic")
sender_instance._reset_timer = MagicMock()

sender_instance.messages = []

sender_instance._send_batch()

sender_instance._reset_timer.assert_called_once()
self.assertEqual(sender_instance.messages, [])


class TestClose(unittest.TestCase):
Expand Down

0 comments on commit 1800ffc

Please sign in to comment.