Skip to content

Commit

Permalink
clear queue when socket.gaierror occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
enjoy-binbin committed Apr 24, 2021
1 parent ace80f4 commit 5a23fd5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
/build
/dist
.idea/
venv/
env/
28 changes: 22 additions & 6 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-

import socket
import threading
from queue import Queue, Full, Empty

Expand Down Expand Up @@ -81,11 +82,7 @@ def close(self, flush=True):
return
self._closed = True
if not flush:
while True:
try:
self._queue.get(block=False)
except Empty:
break
self._clear_queue()
self._queue.put(_TOMBSTONE)
self._send_thread.join()

Expand All @@ -101,6 +98,13 @@ def queue_blocking(self):
def queue_circular(self):
return self._queue_circular

def _clear_queue(self):
while True:
try:
self._queue.get(block=False)
except Empty:
break

def _send(self, bytes_):
with self.lock:
if self._closed:
Expand All @@ -120,8 +124,20 @@ def _send(self, bytes_):

return True

def _send_internal(self, bytes_):
send_internal_result = super(FluentSender, self)._send_internal(bytes_)
if send_internal_result is False:
# when send_result is False, super() caught socket.error
# and assigned the error to self.last_error
if isinstance(self.last_error, socket.gaierror):
# clear the queue to avoid blocking and print the log
self._clear_queue()
print("%s. Please check address: (%s, %s)" % (str(self.last_error), self.host, self.port))

return send_internal_result

def _send_loop(self):
send_internal = super(FluentSender, self)._send_internal
send_internal = self._send_internal

try:
while True:
Expand Down
19 changes: 18 additions & 1 deletion tests/test_asyncsender.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ def test_simple(self):


class TestSenderUnlimitedSize(unittest.TestCase):
Q_SIZE = 3

def setUp(self):
super(TestSenderUnlimitedSize, self).setUp()
Expand Down Expand Up @@ -374,3 +373,21 @@ def test_simple(self):
eq(3, len(el))
eq("test.foo{}".format(NUM), el[0])
eq({'bar': "baz{}".format(NUM)}, el[2])


class TestSenderSocketGaierror(unittest.TestCase):
def setUp(self):
super(TestSenderSocketGaierror, self).setUp()
self._sender = fluent.asyncsender.FluentSender(host='localhost_error', tag='test')

def tearDown(self):
self._sender.close()

def test_simple(self):
with self._sender as sender:
for _ in range(100):
sender._queue.put(b"1")

sender._queue.put(fluent.asyncsender._TOMBSTONE)

self.assertEqual(self._sender._queue.qsize(), 0)

0 comments on commit 5a23fd5

Please sign in to comment.