Skip to content

Commit

Permalink
feat: add standardized debug logging (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwotherspoon authored Jul 24, 2024
1 parent c9ef40d commit 14d6b1c
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 10 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,28 @@ pool = sqlalchemy.create_engine(
connector.close()
```

### Debug Logging

The AlloyDB Python Connector uses the standard [Python logging module][python-logging]
for debug logging support.

Add the below code to your application to enable debug logging with the AlloyDB
Python Connector:

```python
import logging

logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logger = logging.getLogger(name="google.cloud.alloydb.connector")
logger.setLevel(logging.DEBUG)
```

For more details on configuring logging, please refer to the
[Python logging docs][configure-logging].

[python-logging]: https://docs.python.org/3/library/logging.html
[configure-logging]: https://docs.python.org/3/howto/logging.html#configuring-logging

## Support policy

### Major version lifecycle
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/alloydb/connector/async_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import annotations

import asyncio
import logging
from types import TracebackType
from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union

Expand All @@ -33,6 +34,8 @@
if TYPE_CHECKING:
from google.auth.credentials import Credentials

logger = logging.getLogger(name=__name__)


class AsyncConnector:
"""A class to configure and create connections to Cloud SQL instances
Expand Down Expand Up @@ -141,10 +144,18 @@ async def connect(
cache = self._cache[instance_uri]
else:
if self._refresh_strategy == RefreshStrategy.LAZY:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to lazy refresh"
)
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to background"
" refresh"
)
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache
logger.debug(f"['{instance_uri}']: Connection info added to cache")

connect_func = {
"asyncpg": asyncpg.connect,
Expand All @@ -168,6 +179,7 @@ async def connect(
ip_type = IPTypes(ip_type.upper())
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")

# callable to be used for auto IAM authn
def get_authentication_token() -> str:
Expand Down
6 changes: 2 additions & 4 deletions google/cloud/alloydb/connector/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ async def _get_metadata(
Returns:
dict: IP addresses of the AlloyDB instance.
"""
logger.debug(f"['{project}/{region}/{cluster}/{name}']: Requesting metadata")

headers = {
"Authorization": f"Bearer {self._credentials.token}",
}
Expand Down Expand Up @@ -165,8 +163,6 @@ async def _get_client_certificate(
Tuple[str, list[str]]: Tuple containing the CA certificate
and certificate chain for the AlloyDB instance.
"""
logger.debug(f"['{project}/{region}/{cluster}']: Requesting client certificate")

headers = {
"Authorization": f"Bearer {self._credentials.token}",
}
Expand Down Expand Up @@ -252,4 +248,6 @@ async def get_connection_info(

async def close(self) -> None:
"""Close AlloyDBClient gracefully."""
logger.debug("Waiting for connector's http client to close")
await self._client.close()
logger.debug("Closed connector's http client")
12 changes: 12 additions & 0 deletions google/cloud/alloydb/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import asyncio
from functools import partial
import logging
import socket
import struct
from threading import Thread
Expand All @@ -41,6 +42,8 @@

from google.auth.credentials import Credentials

logger = logging.getLogger(name=__name__)

# the port the AlloyDB server-side proxy receives connections on
SERVER_PROXY_PORT = 5433
# the maximum amount of time to wait before aborting a metadata exchange
Expand Down Expand Up @@ -170,10 +173,18 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
cache = self._cache[instance_uri]
else:
if self._refresh_strategy == RefreshStrategy.LAZY:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to lazy refresh"
)
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to background"
" refresh"
)
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache
logger.debug(f"['{instance_uri}']: Connection info added to cache")

connect_func = {
"pg8000": pg8000.connect,
Expand All @@ -197,6 +208,7 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
ip_type = IPTypes(ip_type.upper())
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")

# synchronous drivers are blocking and run using executor
try:
Expand Down
38 changes: 32 additions & 6 deletions google/cloud/alloydb/connector/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from __future__ import annotations

import asyncio
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import logging
import re
from typing import Tuple, TYPE_CHECKING
Expand Down Expand Up @@ -104,7 +107,9 @@ async def _perform_refresh(self) -> ConnectionInfo:
ConnectionInfo: Result of the refresh operation.
"""
self._refresh_in_progress.set()
logger.debug(f"['{self._instance_uri}']: Entered _perform_refresh")
logger.debug(
f"['{self._instance_uri}']: Connection info refresh operation started"
)

try:
await self._refresh_rate_limiter.acquire()
Expand All @@ -115,10 +120,19 @@ async def _perform_refresh(self) -> ConnectionInfo:
self._name,
self._keys,
)
logger.debug(
f"['{self._instance_uri}']: Connection info refresh operation"
" complete"
)
logger.debug(
f"['{self._instance_uri}']: Current certificate expiration = "
f"{connection_info.expiration.isoformat()}"
)

except Exception:
except Exception as e:
logger.debug(
f"['{self._instance_uri}']: Error occurred during _perform_refresh."
f"['{self._instance_uri}']: Connection info refresh operation"
f" failed: {str(e)}"
)
raise

Expand Down Expand Up @@ -153,7 +167,6 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo:
refresh_task: asyncio.Task
try:
if delay > 0:
logger.debug(f"['{self._instance_uri}']: Entering sleep")
await asyncio.sleep(delay)
refresh_task = asyncio.create_task(self._perform_refresh())
refresh_result = await refresh_task
Expand All @@ -162,6 +175,11 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo:
raise RefreshError(
f"['{self._instance_uri}']: Invalid refresh operation. Certficate appears to be expired."
)
except asyncio.CancelledError:
logger.debug(
f"['{self._instance_uri}']: Scheduled refresh operation cancelled"
)
raise
# bad refresh attempt
except Exception:
logger.info(
Expand All @@ -180,6 +198,12 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo:
self._current = refresh_task
# calculate refresh delay based on certificate expiration
delay = _seconds_until_refresh(refresh_result.expiration)
logger.debug(
f"['{self._instance_uri}']: Connection info refresh operation"
" scheduled for "
f"{(datetime.now(timezone.utc) + timedelta(seconds=delay)).isoformat(timespec='seconds')} "
f"(now + {timedelta(seconds=delay)})"
)
self._next = self._schedule_refresh(delay)

return refresh_result
Expand Down Expand Up @@ -207,9 +231,11 @@ async def close(self) -> None:
"""
Cancel refresh tasks.
"""
logger.debug(f"['{self._instance_uri}']: Waiting for _current to be cancelled")
logger.debug(
f"['{self._instance_uri}']: Canceling connection info refresh"
" operation tasks"
)
self._current.cancel()
logger.debug(f"['{self._instance_uri}']: Waiting for _next to be cancelled")
self._next.cancel()
# gracefully wait for tasks to cancel
tasks = asyncio.gather(self._current, self._next, return_exceptions=True)
Expand Down

0 comments on commit 14d6b1c

Please sign in to comment.