diff --git a/README.md b/README.md index 6b6bb80..c9cceb8 100644 --- a/README.md +++ b/README.md @@ -438,6 +438,28 @@ pool = sqlalchemy.create_engine( connector.close() ``` +### Debug Logging + +The AlloyDB Python Connector uses the standard [Python logging module][python-logging] +for debug logging support. + +Add the below code to your application to enable debug logging with the AlloyDB +Python Connector: + +```python +import logging + +logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s") +logger = logging.getLogger(name="google.cloud.alloydb.connector") +logger.setLevel(logging.DEBUG) +``` + +For more details on configuring logging, please refer to the +[Python logging docs][configure-logging]. + +[python-logging]: https://docs.python.org/3/library/logging.html +[configure-logging]: https://docs.python.org/3/howto/logging.html#configuring-logging + ## Support policy ### Major version lifecycle diff --git a/google/cloud/alloydb/connector/async_connector.py b/google/cloud/alloydb/connector/async_connector.py index 349ad8e..5a2b9f5 100644 --- a/google/cloud/alloydb/connector/async_connector.py +++ b/google/cloud/alloydb/connector/async_connector.py @@ -15,6 +15,7 @@ from __future__ import annotations import asyncio +import logging from types import TracebackType from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union @@ -33,6 +34,8 @@ if TYPE_CHECKING: from google.auth.credentials import Credentials +logger = logging.getLogger(name=__name__) + class AsyncConnector: """A class to configure and create connections to Cloud SQL instances @@ -141,10 +144,18 @@ async def connect( cache = self._cache[instance_uri] else: if self._refresh_strategy == RefreshStrategy.LAZY: + logger.debug( + f"['{instance_uri}']: Refresh strategy is set to lazy refresh" + ) cache = LazyRefreshCache(instance_uri, self._client, self._keys) else: + logger.debug( + f"['{instance_uri}']: Refresh strategy is set to background" + " refresh" + ) cache = RefreshAheadCache(instance_uri, self._client, self._keys) self._cache[instance_uri] = cache + logger.debug(f"['{instance_uri}']: Connection info added to cache") connect_func = { "asyncpg": asyncpg.connect, @@ -168,6 +179,7 @@ async def connect( ip_type = IPTypes(ip_type.upper()) conn_info = await cache.connect_info() ip_address = conn_info.get_preferred_ip(ip_type) + logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433") # callable to be used for auto IAM authn def get_authentication_token() -> str: diff --git a/google/cloud/alloydb/connector/client.py b/google/cloud/alloydb/connector/client.py index 58498c1..6edb9ee 100644 --- a/google/cloud/alloydb/connector/client.py +++ b/google/cloud/alloydb/connector/client.py @@ -118,8 +118,6 @@ async def _get_metadata( Returns: dict: IP addresses of the AlloyDB instance. """ - logger.debug(f"['{project}/{region}/{cluster}/{name}']: Requesting metadata") - headers = { "Authorization": f"Bearer {self._credentials.token}", } @@ -165,8 +163,6 @@ async def _get_client_certificate( Tuple[str, list[str]]: Tuple containing the CA certificate and certificate chain for the AlloyDB instance. """ - logger.debug(f"['{project}/{region}/{cluster}']: Requesting client certificate") - headers = { "Authorization": f"Bearer {self._credentials.token}", } @@ -252,4 +248,6 @@ async def get_connection_info( async def close(self) -> None: """Close AlloyDBClient gracefully.""" + logger.debug("Waiting for connector's http client to close") await self._client.close() + logger.debug("Closed connector's http client") diff --git a/google/cloud/alloydb/connector/connector.py b/google/cloud/alloydb/connector/connector.py index 8b71426..c4e8bae 100644 --- a/google/cloud/alloydb/connector/connector.py +++ b/google/cloud/alloydb/connector/connector.py @@ -16,6 +16,7 @@ import asyncio from functools import partial +import logging import socket import struct from threading import Thread @@ -41,6 +42,8 @@ from google.auth.credentials import Credentials +logger = logging.getLogger(name=__name__) + # the port the AlloyDB server-side proxy receives connections on SERVER_PROXY_PORT = 5433 # the maximum amount of time to wait before aborting a metadata exchange @@ -170,10 +173,18 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) -> cache = self._cache[instance_uri] else: if self._refresh_strategy == RefreshStrategy.LAZY: + logger.debug( + f"['{instance_uri}']: Refresh strategy is set to lazy refresh" + ) cache = LazyRefreshCache(instance_uri, self._client, self._keys) else: + logger.debug( + f"['{instance_uri}']: Refresh strategy is set to background" + " refresh" + ) cache = RefreshAheadCache(instance_uri, self._client, self._keys) self._cache[instance_uri] = cache + logger.debug(f"['{instance_uri}']: Connection info added to cache") connect_func = { "pg8000": pg8000.connect, @@ -197,6 +208,7 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) -> ip_type = IPTypes(ip_type.upper()) conn_info = await cache.connect_info() ip_address = conn_info.get_preferred_ip(ip_type) + logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433") # synchronous drivers are blocking and run using executor try: diff --git a/google/cloud/alloydb/connector/instance.py b/google/cloud/alloydb/connector/instance.py index 5758d76..52adf8a 100644 --- a/google/cloud/alloydb/connector/instance.py +++ b/google/cloud/alloydb/connector/instance.py @@ -15,6 +15,9 @@ from __future__ import annotations import asyncio +from datetime import datetime +from datetime import timedelta +from datetime import timezone import logging import re from typing import Tuple, TYPE_CHECKING @@ -104,7 +107,9 @@ async def _perform_refresh(self) -> ConnectionInfo: ConnectionInfo: Result of the refresh operation. """ self._refresh_in_progress.set() - logger.debug(f"['{self._instance_uri}']: Entered _perform_refresh") + logger.debug( + f"['{self._instance_uri}']: Connection info refresh operation started" + ) try: await self._refresh_rate_limiter.acquire() @@ -115,10 +120,19 @@ async def _perform_refresh(self) -> ConnectionInfo: self._name, self._keys, ) + logger.debug( + f"['{self._instance_uri}']: Connection info refresh operation" + " complete" + ) + logger.debug( + f"['{self._instance_uri}']: Current certificate expiration = " + f"{connection_info.expiration.isoformat()}" + ) - except Exception: + except Exception as e: logger.debug( - f"['{self._instance_uri}']: Error occurred during _perform_refresh." + f"['{self._instance_uri}']: Connection info refresh operation" + f" failed: {str(e)}" ) raise @@ -153,7 +167,6 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo: refresh_task: asyncio.Task try: if delay > 0: - logger.debug(f"['{self._instance_uri}']: Entering sleep") await asyncio.sleep(delay) refresh_task = asyncio.create_task(self._perform_refresh()) refresh_result = await refresh_task @@ -162,6 +175,11 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo: raise RefreshError( f"['{self._instance_uri}']: Invalid refresh operation. Certficate appears to be expired." ) + except asyncio.CancelledError: + logger.debug( + f"['{self._instance_uri}']: Scheduled refresh operation cancelled" + ) + raise # bad refresh attempt except Exception: logger.info( @@ -180,6 +198,12 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo: self._current = refresh_task # calculate refresh delay based on certificate expiration delay = _seconds_until_refresh(refresh_result.expiration) + logger.debug( + f"['{self._instance_uri}']: Connection info refresh operation" + " scheduled for " + f"{(datetime.now(timezone.utc) + timedelta(seconds=delay)).isoformat(timespec='seconds')} " + f"(now + {timedelta(seconds=delay)})" + ) self._next = self._schedule_refresh(delay) return refresh_result @@ -207,9 +231,11 @@ async def close(self) -> None: """ Cancel refresh tasks. """ - logger.debug(f"['{self._instance_uri}']: Waiting for _current to be cancelled") + logger.debug( + f"['{self._instance_uri}']: Canceling connection info refresh" + " operation tasks" + ) self._current.cancel() - logger.debug(f"['{self._instance_uri}']: Waiting for _next to be cancelled") self._next.cancel() # gracefully wait for tasks to cancel tasks = asyncio.gather(self._current, self._next, return_exceptions=True)