diff --git a/pylitterbot/session.py b/pylitterbot/session.py index da9fe10..543a20a 100644 --- a/pylitterbot/session.py +++ b/pylitterbot/session.py @@ -10,7 +10,7 @@ from aiohttp import ClientSession from .exceptions import InvalidCommandException -from .utils import decode, utcnow +from .utils import decode, redact, utcnow T = TypeVar("T", bound="Session") @@ -101,7 +101,7 @@ async def request( resp.raise_for_status() data = await resp.json() - _LOGGER.debug("Received %s response: %s", resp.status, data) + _LOGGER.debug("Received %s response: %s", resp.status, redact(data)) return data # type: ignore async def __aenter__(self: T) -> T: diff --git a/pylitterbot/utils.py b/pylitterbot/utils.py index 1d7e5d1..17a3b2e 100644 --- a/pylitterbot/utils.py +++ b/pylitterbot/utils.py @@ -5,13 +5,32 @@ import logging import re from base64 import b64decode, b64encode +from collections.abc import Mapping from datetime import datetime, time, timezone +from typing import TypeVar, cast, overload from urllib.parse import urljoin as _urljoin from warnings import warn _LOGGER = logging.getLogger(__name__) +_T = TypeVar("_T") ENCODING = "utf-8" +REDACTED = "**REDACTED**" +REDACT_FIELDS = [ + "token", + "idToken", + "refreshToken", + "userId", + "userEmail", + "sessionId", + "oneSignalPlayerId", + "deviceId", + "id", + "litterRobotId", + "unitId", + "litterRobotSerial", + "serial", +] def decode(value: str) -> str: @@ -79,3 +98,38 @@ def send_deprecation_warning( message = f"{old_name} has been deprecated{'' if new_name is None else f' in favor of {new_name}'} and will be removed in a future release" warn(message, DeprecationWarning, stacklevel=2) _LOGGER.warning(message) + + +@overload +def redact(data: Mapping) -> dict: # type: ignore[misc] + ... + + +@overload +def redact(data: _T) -> _T: + ... + + +def redact(data: _T) -> _T: + """Redact sensitive data in a dict.""" + if not isinstance(data, (Mapping, list)): + return data + + if isinstance(data, list): + return cast(_T, [redact(val) for val in data]) + + redacted = {**data} + + for key, value in redacted.items(): + if value is None: + continue + if isinstance(value, str) and not value: + continue + if key in REDACT_FIELDS: + redacted[key] = REDACTED + elif isinstance(value, Mapping): + redacted[key] = redact(value) + elif isinstance(value, list): + redacted[key] = [redact(item) for item in value] + + return cast(_T, redacted)