Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add discard_logs_on_reconnect_error in asyncsender #176

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
/build
/dist
.idea/
venv/
env/
32 changes: 26 additions & 6 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-

import socket
import threading
from queue import Queue, Full, Empty

Expand Down Expand Up @@ -50,9 +51,11 @@ def __init__(self,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
queue_overflow_handler=None,
discard_logs_on_reconnect_error=False,
**kwargs):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
:discard_logs_on_reconnect_error: When set to true, will discard logs when reconnect error is reported.
"""
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
Expand All @@ -66,6 +69,8 @@ def __init__(self,
else:
self._queue_overflow_handler = self._queue_overflow_handler_default

self._discard_logs_on_reconnect_error = discard_logs_on_reconnect_error

self._thread_guard = threading.Event() # This ensures visibility across all variables
self._closed = False

Expand All @@ -81,11 +86,7 @@ def close(self, flush=True):
return
self._closed = True
if not flush:
while True:
try:
self._queue.get(block=False)
except Empty:
break
self._clear_queue()
self._queue.put(_TOMBSTONE)
self._send_thread.join()

Expand All @@ -101,6 +102,13 @@ def queue_blocking(self):
def queue_circular(self):
return self._queue_circular

def _clear_queue(self):
while True:
try:
self._queue.get(block=False)
except Empty:
break

def _send(self, bytes_):
with self.lock:
if self._closed:
Expand All @@ -120,8 +128,20 @@ def _send(self, bytes_):

return True

def _send_internal(self, bytes_):
send_internal_result = super(FluentSender, self)._send_internal(bytes_)
if send_internal_result is False:
# when send_result is False, super() caught socket.error
# and assigned the error to self.last_error
if self._discard_logs_on_reconnect_error is True and isinstance(self.last_error, socket.gaierror):
# clear the queue to avoid blocking and print the log
self._clear_queue()
print("%s. Please check address: (%s, %s)" % (str(self.last_error), self.host, self.port))

return send_internal_result

def _send_loop(self):
send_internal = super(FluentSender, self)._send_internal
send_internal = self._send_internal

try:
while True:
Expand Down
2 changes: 0 additions & 2 deletions tests/test_asynchandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
except ImportError:
from mock import patch



import fluent.asynchandler
import fluent.handler
from tests import mockserver
Expand Down
21 changes: 20 additions & 1 deletion tests/test_asyncsender.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ def test_simple(self):


class TestSenderUnlimitedSize(unittest.TestCase):
Q_SIZE = 3

def setUp(self):
super(TestSenderUnlimitedSize, self).setUp()
Expand Down Expand Up @@ -374,3 +373,23 @@ def test_simple(self):
eq(3, len(el))
eq("test.foo{}".format(NUM), el[0])
eq({'bar': "baz{}".format(NUM)}, el[2])


class TestSenderSocketGaierror(unittest.TestCase):
def setUp(self):
super(TestSenderSocketGaierror, self).setUp()
self._sender = fluent.asyncsender.FluentSender(host="localhost_error",
tag="test",
discard_logs_on_reconnect_error=True)

def tearDown(self):
self._sender.close()

def test_simple(self):
with self._sender as sender:
for _ in range(100):
sender._queue.put(b"1")

sender._queue.put(fluent.asyncsender._TOMBSTONE)

self.assertEqual(self._sender._queue.qsize(), 0)