From 44ba7a6975c7c1a64f1fd3e89dbbb7978ac97971 Mon Sep 17 00:00:00 2001 From: enjoy-binbin Date: Sat, 24 Apr 2021 14:36:30 +0800 Subject: [PATCH] add discard_logs_on_reconnect_error in asyncsender Signed-off-by: enjoy-binbin --- .gitignore | 2 ++ fluent/asyncsender.py | 32 ++++++++++++++++++++++++++------ tests/test_asyncsender.py | 21 ++++++++++++++++++++- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index fd4bc6c..09ad4d2 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ /build /dist .idea/ +venv/ +env/ \ No newline at end of file diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 24c6924..25623c7 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import socket import threading from queue import Queue, Full, Empty @@ -50,9 +51,11 @@ def __init__(self, queue_maxsize=DEFAULT_QUEUE_MAXSIZE, queue_circular=DEFAULT_QUEUE_CIRCULAR, queue_overflow_handler=None, + discard_logs_on_reconnect_error=False, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. + :discard_logs_on_reconnect_error: When set to true, will discard logs when reconnect error is reported. """ super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, @@ -66,6 +69,8 @@ def __init__(self, else: self._queue_overflow_handler = self._queue_overflow_handler_default + self._discard_logs_on_reconnect_error = discard_logs_on_reconnect_error + self._thread_guard = threading.Event() # This ensures visibility across all variables self._closed = False @@ -81,11 +86,7 @@ def close(self, flush=True): return self._closed = True if not flush: - while True: - try: - self._queue.get(block=False) - except Empty: - break + self._clear_queue() self._queue.put(_TOMBSTONE) self._send_thread.join() @@ -101,6 +102,13 @@ def queue_blocking(self): def queue_circular(self): return self._queue_circular + def _clear_queue(self): + while True: + try: + self._queue.get(block=False) + except Empty: + break + def _send(self, bytes_): with self.lock: if self._closed: @@ -120,8 +128,20 @@ def _send(self, bytes_): return True + def _send_internal(self, bytes_): + send_internal_result = super(FluentSender, self)._send_internal(bytes_) + if send_internal_result is False: + # when send_result is False, super() caught socket.error + # and assigned the error to self.last_error + if self._discard_logs_on_reconnect_error is True and isinstance(self.last_error, socket.gaierror): + # clear the queue to avoid blocking and print the log + self._clear_queue() + print("%s. Please check address: (%s, %s)" % (str(self.last_error), self.host, self.port)) + + return send_internal_result + def _send_loop(self): - send_internal = super(FluentSender, self)._send_internal + send_internal = self._send_internal try: while True: diff --git a/tests/test_asyncsender.py b/tests/test_asyncsender.py index eb36f96..3c0650a 100644 --- a/tests/test_asyncsender.py +++ b/tests/test_asyncsender.py @@ -330,7 +330,6 @@ def test_simple(self): class TestSenderUnlimitedSize(unittest.TestCase): - Q_SIZE = 3 def setUp(self): super(TestSenderUnlimitedSize, self).setUp() @@ -374,3 +373,23 @@ def test_simple(self): eq(3, len(el)) eq("test.foo{}".format(NUM), el[0]) eq({'bar': "baz{}".format(NUM)}, el[2]) + + +class TestSenderSocketGaierror(unittest.TestCase): + def setUp(self): + super(TestSenderSocketGaierror, self).setUp() + self._sender = fluent.asyncsender.FluentSender(host="localhost_error", + tag="test", + discard_logs_on_reconnect_error=True) + + def tearDown(self): + self._sender.close() + + def test_simple(self): + with self._sender as sender: + for _ in range(100): + sender._queue.put(b"1") + + sender._queue.put(fluent.asyncsender._TOMBSTONE) + + self.assertEqual(self._sender._queue.qsize(), 0)