Skip to content

Commit

Permalink
elasticapm/transport: specific shutdown handling for http transport (#…
Browse files Browse the repository at this point in the history
…2085)

We have a race condition at http transport shutdown where our atexit
handler is racing against urllib3 ConnectionPool weakref finalizer.
Having the urllib3 finalizer called before atexit would lead to
have our thread hang waiting for send any eventual queued data via
urllib3 pools that are closed.
Force the creation of a new PoolManager so that we are always able to flush.
  • Loading branch information
xrmx authored Jul 26, 2024
1 parent b219e91 commit ae65416
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 1 deletion.
3 changes: 2 additions & 1 deletion elasticapm/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ def close(self) -> None:
if not self._flushed.wait(timeout=self._max_flush_time_seconds):
logger.error("Closing the transport connection timed out.")

stop_thread = close
def stop_thread(self) -> None:
self.close()

def flush(self):
"""
Expand Down
18 changes: 18 additions & 0 deletions elasticapm/transport/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import hashlib
import json
import os
import re
import ssl
import urllib.parse
Expand Down Expand Up @@ -250,6 +251,23 @@ def ca_certs(self):
return self._server_ca_cert_file
return certifi.where() if (certifi and self.client.config.use_certifi) else None

def close(self):
"""
Take care of being able to shutdown cleanly
:return:
"""
if self._closed or (not self._thread or self._thread.pid != os.getpid()):
return

self._closed = True
# we are racing against urllib3 ConnectionPool weakref finalizer that would lead to having them closed
# and we hanging waiting for send any eventual queued data
# Force the creation of a new PoolManager so that we are always able to flush
self._http = None
self.queue("close", None)
if not self._flushed.wait(timeout=self._max_flush_time_seconds):
logger.error("Closing the transport connection timed out.")


def version_string_to_tuple(version):
if version:
Expand Down
88 changes: 88 additions & 0 deletions tests/transports/test_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@


import os
import time

import certifi
import mock
Expand Down Expand Up @@ -517,3 +518,90 @@ def test_fetch_server_info_flat_string(waiting_httpserver, caplog, elasticapm_cl
transport.fetch_server_info()
assert elasticapm_client.server_version is None
assert_any_record_contains(caplog.records, "No version key found in server response")


def test_close(waiting_httpserver, elasticapm_client):
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
transport = Transport(
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
)
transport.start_thread()

transport.close()

assert transport._closed is True
assert transport._flushed.is_set() is True


def test_close_does_nothing_if_called_from_another_pid(waiting_httpserver, caplog, elasticapm_client):
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
transport = Transport(
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
)
transport.start_thread()

with mock.patch("os.getpid") as getpid_mock:
getpid_mock.return_value = 0
transport.close()

assert transport._closed is False

transport.close()


def test_close_can_be_called_multiple_times(waiting_httpserver, caplog, elasticapm_client):
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
transport = Transport(
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
)
transport.start_thread()

with caplog.at_level("INFO", logger="elasticapm.transport.http"):
transport.close()

assert transport._closed is True

transport.close()


def test_close_timeout_error_without_flushing(waiting_httpserver, caplog, elasticapm_client):
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})

with caplog.at_level("INFO", logger="elasticapm.transport.http"):
with mock.patch.object(Transport, "_max_flush_time_seconds", 0):
with mock.patch.object(Transport, "_flush") as flush_mock:
# sleep more that the timeout
flush_mock.side_effect = lambda x: time.sleep(0.1)
transport = Transport(
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
)
transport.start_thread()
# need to write something to the buffer to have _flush() called
transport.queue("error", {"an": "error"})
transport.close()

assert transport._flushed.is_set() is False
assert transport._closed is True
record = caplog.records[-1]
assert "Closing the transport connection timed out." in record.msg


def test_http_pool_manager_is_recycled_at_stop_thread(waiting_httpserver, caplog, elasticapm_client):
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
transport = Transport(
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
)
transport.start_thread()
pool_manager = transport.http

with caplog.at_level("INFO", logger="elasticapm.transport.http"):
transport.stop_thread()

assert transport._flushed.is_set() is True
assert pool_manager != transport._http
assert not caplog.records

0 comments on commit ae65416

Please sign in to comment.