Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elasticapm/transport: specific shutdown handling for http transport #2085

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion elasticapm/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ def close(self) -> None:
if not self._flushed.wait(timeout=self._max_flush_time_seconds):
logger.error("Closing the transport connection timed out.")

stop_thread = close
def stop_thread(self) -> None:
    """
    Stop the transport by delegating to ``close()``.

    Written as a real method rather than a class-level alias, so the call
    is dispatched through ``self`` and reaches whichever ``close()`` the
    instance's class provides.
    """
    self.close()

def flush(self):
"""
Expand Down
18 changes: 18 additions & 0 deletions elasticapm/transport/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import hashlib
import json
import os
import re
import ssl
import urllib.parse
Expand Down Expand Up @@ -250,6 +251,23 @@ def ca_certs(self):
return self._server_ca_cert_file
return certifi.where() if (certifi and self.client.config.use_certifi) else None

def close(self):
    """
    Shut the transport down, giving queued data a chance to be flushed.

    Nothing happens when the transport is already closed, when it has no
    worker thread, or when the worker thread's recorded pid differs from
    the pid of the current process.
    :return:
    """
    if self._closed:
        return
    worker = self._thread
    if not worker or worker.pid != os.getpid():
        return

    self._closed = True
    # urllib3's ConnectionPool weakref finalizer may close the pooled
    # connections while we are shutting down, which would leave us waiting
    # on a flush that cannot be sent. Dropping the cached pool manager
    # forces a new one to be created so the final flush can still go out.
    self._http = None
    self.queue("close", None)
    flushed_in_time = self._flushed.wait(timeout=self._max_flush_time_seconds)
    if not flushed_in_time:
        logger.error("Closing the transport connection timed out.")


def version_string_to_tuple(version):
if version:
Expand Down
88 changes: 88 additions & 0 deletions tests/transports/test_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@


import os
import time

import certifi
import mock
Expand Down Expand Up @@ -517,3 +518,90 @@ def test_fetch_server_info_flat_string(waiting_httpserver, caplog, elasticapm_cl
transport.fetch_server_info()
assert elasticapm_client.server_version is None
assert_any_record_contains(caplog.records, "No version key found in server response")


def test_close(waiting_httpserver, elasticapm_client):
    """A started transport ends up closed, with its flushed event set, after close()."""
    # A preset server version avoids the server_info request.
    elasticapm_client.server_version = (8, 0, 0)
    waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
    http_transport = Transport(
        waiting_httpserver.url,
        client=elasticapm_client,
        headers=elasticapm_client._transport._headers,
    )
    http_transport.start_thread()

    http_transport.close()

    assert http_transport._closed is True
    assert http_transport._flushed.is_set() is True


def test_close_does_nothing_if_called_from_another_pid(waiting_httpserver, caplog, elasticapm_client):
    """close() leaves the transport open while os.getpid() reports a different pid."""
    # A preset server version avoids the server_info request.
    elasticapm_client.server_version = (8, 0, 0)
    waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
    http_transport = Transport(
        waiting_httpserver.url,
        client=elasticapm_client,
        headers=elasticapm_client._transport._headers,
    )
    http_transport.start_thread()

    # Pretend to run in another process by making getpid() return 0.
    with mock.patch("os.getpid") as fake_getpid:
        fake_getpid.return_value = 0
        http_transport.close()

    assert http_transport._closed is False

    # With the real pid restored, close the transport for real.
    http_transport.close()


def test_close_can_be_called_multiple_times(waiting_httpserver, caplog, elasticapm_client):
    """Calling close() again on an already closed transport does not raise."""
    # A preset server version avoids the server_info request.
    elasticapm_client.server_version = (8, 0, 0)
    waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
    http_transport = Transport(
        waiting_httpserver.url,
        client=elasticapm_client,
        headers=elasticapm_client._transport._headers,
    )
    http_transport.start_thread()

    with caplog.at_level("INFO", logger="elasticapm.transport.http"):
        http_transport.close()

    assert http_transport._closed is True

    # The second call must go through without raising.
    http_transport.close()


def test_close_timeout_error_without_flushing(waiting_httpserver, caplog, elasticapm_client):
    """close() logs a timeout error when the flush outlasts the maximum flush time."""
    # A preset server version avoids the server_info request.
    elasticapm_client.server_version = (8, 0, 0)
    waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})

    with caplog.at_level("INFO", logger="elasticapm.transport.http"), mock.patch.object(
        Transport, "_max_flush_time_seconds", 0
    ), mock.patch.object(Transport, "_flush") as slow_flush:
        # Make the flush take longer than the (zero) timeout.
        slow_flush.side_effect = lambda x: time.sleep(0.1)
        http_transport = Transport(
            waiting_httpserver.url,
            client=elasticapm_client,
            headers=elasticapm_client._transport._headers,
        )
        http_transport.start_thread()
        # _flush() is only called when there is something in the buffer.
        http_transport.queue("error", {"an": "error"})
        http_transport.close()

    assert http_transport._flushed.is_set() is False
    assert http_transport._closed is True
    last_record = caplog.records[-1]
    assert "Closing the transport connection timed out." in last_record.msg


def test_http_pool_manager_is_recycled_at_stop_thread(waiting_httpserver, caplog, elasticapm_client):
    """stop_thread() flushes and leaves ``_http`` different from the earlier pool manager."""
    # A preset server version avoids the server_info request.
    elasticapm_client.server_version = (8, 0, 0)
    waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
    http_transport = Transport(
        waiting_httpserver.url,
        client=elasticapm_client,
        headers=elasticapm_client._transport._headers,
    )
    http_transport.start_thread()
    # Keep the pool manager that existed before shutdown for comparison.
    original_pool = http_transport.http

    with caplog.at_level("INFO", logger="elasticapm.transport.http"):
        http_transport.stop_thread()

    assert http_transport._flushed.is_set() is True
    assert original_pool != http_transport._http
    # Nothing may be logged at INFO or above during shutdown.
    assert not caplog.records
Loading