Skip to content

Commit

Permalink
add discard_logs_on_reconnect_error in asyncsender
Browse files Browse the repository at this point in the history
Signed-off-by: enjoy-binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Apr 24, 2021
1 parent ace80f4 commit 44ba7a6
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
/build
/dist
.idea/
venv/
env/
32 changes: 26 additions & 6 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-

import socket
import threading
from queue import Queue, Full, Empty

Expand Down Expand Up @@ -50,9 +51,11 @@ def __init__(self,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
queue_overflow_handler=None,
discard_logs_on_reconnect_error=False,
**kwargs):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
:discard_logs_on_reconnect_error: When set to true, will discard logs when reconnect error is reported.
"""
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
Expand All @@ -66,6 +69,8 @@ def __init__(self,
else:
self._queue_overflow_handler = self._queue_overflow_handler_default

self._discard_logs_on_reconnect_error = discard_logs_on_reconnect_error

self._thread_guard = threading.Event() # This ensures visibility across all variables
self._closed = False

Expand All @@ -81,11 +86,7 @@ def close(self, flush=True):
return
self._closed = True
if not flush:
while True:
try:
self._queue.get(block=False)
except Empty:
break
self._clear_queue()
self._queue.put(_TOMBSTONE)
self._send_thread.join()

Expand All @@ -101,6 +102,13 @@ def queue_blocking(self):
def queue_circular(self):
return self._queue_circular

def _clear_queue(self):
while True:
try:
self._queue.get(block=False)
except Empty:
break

def _send(self, bytes_):
with self.lock:
if self._closed:
Expand All @@ -120,8 +128,20 @@ def _send(self, bytes_):

return True

def _send_internal(self, bytes_):
send_internal_result = super(FluentSender, self)._send_internal(bytes_)
if send_internal_result is False:
# when send_result is False, super() caught socket.error
# and assigned the error to self.last_error
if self._discard_logs_on_reconnect_error is True and isinstance(self.last_error, socket.gaierror):
# clear the queue to avoid blocking and print the log
self._clear_queue()
print("%s. Please check address: (%s, %s)" % (str(self.last_error), self.host, self.port))

return send_internal_result

def _send_loop(self):
send_internal = super(FluentSender, self)._send_internal
send_internal = self._send_internal

try:
while True:
Expand Down
21 changes: 20 additions & 1 deletion tests/test_asyncsender.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ def test_simple(self):


class TestSenderUnlimitedSize(unittest.TestCase):
Q_SIZE = 3

def setUp(self):
super(TestSenderUnlimitedSize, self).setUp()
Expand Down Expand Up @@ -374,3 +373,23 @@ def test_simple(self):
eq(3, len(el))
eq("test.foo{}".format(NUM), el[0])
eq({'bar': "baz{}".format(NUM)}, el[2])


class TestSenderSocketGaierror(unittest.TestCase):
def setUp(self):
super(TestSenderSocketGaierror, self).setUp()
self._sender = fluent.asyncsender.FluentSender(host="localhost_error",
tag="test",
discard_logs_on_reconnect_error=True)

def tearDown(self):
self._sender.close()

def test_simple(self):
with self._sender as sender:
for _ in range(100):
sender._queue.put(b"1")

sender._queue.put(fluent.asyncsender._TOMBSTONE)

self.assertEqual(self._sender._queue.qsize(), 0)

0 comments on commit 44ba7a6

Please sign in to comment.