Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Run ruff check . #420

Merged
merged 1 commit into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 46 additions & 44 deletions app/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,30 @@
import sys
import types
from collections import defaultdict
from collections.abc import Callable, Generator
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Any, Callable, Generator
from typing import TYPE_CHECKING, Any

import click
import minify_html
import typer.cli
from fastapi.params import Depends, Header
from fastapi.routing import APIRoute
from rich.console import Console
from rich.table import Table
from sqlalchemy.orm import Session, joinedload
from sqlmodel import col
from starlette.routing import Route

from app import aws, mail, main, models, util
from app.db import get_db, handle_skipped_award, rollback_on_error
from app.exceptions import SkippedAwardError, SourceFormatError
from app.settings import app_settings
from app.sources import colombia as data_access

if TYPE_CHECKING:
from fastapi.routing import APIRoute
from starlette.routing import Route

state = {"quiet": False}


Expand Down Expand Up @@ -203,16 +206,15 @@ def update_applications_to_lapsed() -> None:
"""
Lapse applications that have been waiting for the borrower to respond for some time.
"""
with contextmanager(get_db)() as session:
with rollback_on_error(session):
for application in models.Application.lapseable(session).options(
joinedload(models.Application.borrower),
joinedload(models.Application.borrower_documents),
):
application.status = models.ApplicationStatus.LAPSED
application.application_lapsed_at = datetime.utcnow()
with contextmanager(get_db)() as session, rollback_on_error(session):
for application in models.Application.lapseable(session).options(
joinedload(models.Application.borrower),
joinedload(models.Application.borrower_documents),
):
application.status = models.ApplicationStatus.LAPSED
application.application_lapsed_at = datetime.utcnow()

session.commit()
session.commit()


@app.command()
Expand Down Expand Up @@ -258,40 +260,39 @@ def remove_dated_application_data() -> None:
Clear personal data and delete borrower documents from applications that have been in a final state for some time.
If the borrower has no other active applications, clear the borrower's personal data.
"""
with contextmanager(get_db)() as session:
with rollback_on_error(session):
for application in models.Application.archivable(session).options(
joinedload(models.Application.borrower),
joinedload(models.Application.borrower_documents),
):
application.award.previous = True
application.primary_email = ""
application.archived_at = datetime.utcnow()

for document in application.borrower_documents:
session.delete(document)

# Clear the associated borrower's personal data if they have no other active applications.
if not session.query(
models.Application.unarchived(session)
.filter(
models.Application.borrower_id == application.borrower_id,
models.Application.id != application.id,
)
.exists()
).scalar():
application.borrower.legal_name = ""
application.borrower.email = ""
application.borrower.address = ""
application.borrower.legal_identifier = ""
application.borrower.source_data = {}
with contextmanager(get_db)() as session, rollback_on_error(session):
for application in models.Application.archivable(session).options(
joinedload(models.Application.borrower),
joinedload(models.Application.borrower_documents),
):
application.award.previous = True
application.primary_email = ""
application.archived_at = datetime.utcnow()

for document in application.borrower_documents:
session.delete(document)

# Clear the associated borrower's personal data if they have no other active applications.
if not session.query(
models.Application.unarchived(session)
.filter(
models.Application.borrower_id == application.borrower_id,
models.Application.id != application.id,
)
.exists()
).scalar():
application.borrower.legal_name = ""
application.borrower.email = ""
application.borrower.address = ""
application.borrower.legal_identifier = ""
application.borrower.source_data = {}

session.commit()
session.commit()


# The openapi.json file can't be used, because it doesn't track Python modules.
@dev.command()
def routes(csv_format: bool = False) -> None:
def routes(*, csv_format: bool = False) -> None:
"""
Print a table of routes.
"""
Expand All @@ -313,7 +314,8 @@ def _pretty(model: Any, expected: str) -> str:

rows = []
for route in main.app.routes:
assert isinstance(route, (APIRoute, Route))
if TYPE_CHECKING:
assert isinstance(route, APIRoute | Route)

# Skip default OpenAPI routes.
if route.endpoint.__module__.startswith("fastapi."):
Expand All @@ -327,7 +329,7 @@ def _pretty(model: Any, expected: str) -> str:
arg
for arg, default in itertools.zip_longest(reversed(spec.args), reversed(spec.defaults or []))
# Note: Depends() can contain application `id` and `uuid` args, under many layers.
if not isinstance(default, (Depends, Header))
if not isinstance(default, Depends | Header)
)

response = _pretty(getattr(route, "response_model", None), "app.serializers")
Expand Down Expand Up @@ -373,7 +375,7 @@ def cli_input_json(name: str, file: typer.FileText) -> None:

# https://typer.tiangolo.com/tutorial/commands/callback/
@app.callback()
def cli(quiet: bool = typer.Option(False, "--quiet", "-q")) -> None:
def cli(*, quiet: bool = typer.Option(False, "--quiet", "-q")) -> None: # noqa: FBT003
if quiet:
state["quiet"] = True

Expand Down
23 changes: 12 additions & 11 deletions app/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import functools
from typing import Any
from typing import TYPE_CHECKING, Any

import jwt
import requests # moto intercepts only requests, not httpx: https://github.com/getmoto/moto/issues/4197
Expand Down Expand Up @@ -33,7 +33,8 @@ def get_keys() -> dict[str, JWK]:
for jwk in JWKS.model_validate(
requests.get(
f"https://cognito-idp.{app_settings.aws_region}.amazonaws.com/"
f"{app_settings.cognito_pool_id}/.well-known/jwks.json"
f"{app_settings.cognito_pool_id}/.well-known/jwks.json",
timeout=10,
).json()
).keys
}
Expand All @@ -48,7 +49,7 @@ class JWTAuthorization(HTTPBearer):
:param auto_error: If set to True, automatic error responses will be sent when request authentication fails.
"""

def __init__(self, auto_error: bool = True):
def __init__(self, *, auto_error: bool = True):
super().__init__(auto_error=auto_error)
self.kid_to_jwk = get_keys()

Expand All @@ -73,14 +74,15 @@ def verify_jwk_token(self, jwt_credentials: JWTAuthorizationCredentials) -> bool
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=_("JWK public key not found"),
)
) from None

msg = jwt_credentials.message.encode()
sig = base64url_decode(jwt_credentials.signature.encode())

obj = jwt.PyJWK(public_key)
alg_obj = obj.Algorithm
assert alg_obj
if TYPE_CHECKING:
assert alg_obj
prepared_key = alg_obj.prepare_key(obj.key)

return alg_obj.verify(msg, prepared_key, sig)
Expand Down Expand Up @@ -123,7 +125,7 @@ async def __call__(self, request: Request) -> JWTAuthorizationCredentials: # ty
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=_("JWK invalid"),
)
) from None

if not self.verify_jwk_token(jwt_credentials):
raise HTTPException(
Expand All @@ -132,8 +134,7 @@ async def __call__(self, request: Request) -> JWTAuthorizationCredentials: # ty
)

return jwt_credentials
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=_("Not authenticated"),
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=_("Not authenticated"),
)
26 changes: 9 additions & 17 deletions app/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import random
import string
from typing import Callable
from collections.abc import Callable

import boto3
from fastapi import HTTPException, status
Expand All @@ -16,31 +16,23 @@

logger = logging.getLogger(__name__)

PASSWORD_LENGTH = 14
PASSWORD_CHARACTERS = list(
set(string.ascii_letters) | set(string.digits) | set(string.punctuation) - set('"/\\|_-#@%&*(){}[]<>~`')
)


def generate_password_fn() -> str:
"""
Generates a random password of length at least 14 characters.
The generated password includes characters from ASCII letters, digits and punctuation,
but it excludes the following characters: '"/\\|_-#@%&*(){}[]<>~`'.

:return: The randomly generated password.
Return a random password of 14 ASCII letter, digit and punctuation characters.
"""
excluded_chars = '"/\\|_-#@%&*(){}[]<>~`'
characters = f"{string.ascii_letters}{string.digits}{string.punctuation}"
password = ""

while len(password) < 14:
char = random.choice(characters)
if char not in excluded_chars:
password += char

return password
return "".join(random.choice(PASSWORD_CHARACTERS) for _ in range(PASSWORD_LENGTH)) # noqa: S311


# https://docs.aws.amazon.com/cognito/latest/developerguide/signing-up-users-in-your-app.html#cognito-user-pools-computing-secret-hash
def get_secret_hash(username: str) -> str:
"""
Generates a secret hash for the given username using Cognito client secret and Cognito client id.
Generate a secret hash for the given username using Cognito client secret and Cognito client id.

:param username: The username
:return: A base64 encoded string containing the generated secret hash.
Expand Down
2 changes: 1 addition & 1 deletion app/db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import traceback
from collections.abc import Generator
from contextlib import contextmanager
from typing import Generator

from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
Expand Down
19 changes: 10 additions & 9 deletions app/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections.abc import Callable, Generator
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Generator
from typing import TYPE_CHECKING, Any

from fastapi import Depends, Form, HTTPException, Request, status
from sqlalchemy.orm import Session, defaultload, joinedload
Expand Down Expand Up @@ -42,7 +43,7 @@ async def get_current_user(credentials: auth.JWTAuthorizationCredentials = Depen
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=_("Username missing"),
)
) from None


async def get_user(username: str = Depends(get_current_user), session: Session = Depends(get_db)) -> models.User:
Expand Down Expand Up @@ -81,7 +82,8 @@ def raise_if_unauthorized(
statuses: tuple[models.ApplicationStatus, ...] = (),
) -> None:
if roles:
assert user is not None
if TYPE_CHECKING:
assert user is not None
for role in roles:
match role:
case models.UserType.OCP:
Expand Down Expand Up @@ -109,12 +111,11 @@ def raise_if_unauthorized(
detail=_("Application expired"),
)

if statuses:
if application.status not in statuses:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=_("Application status should not be %(status)s", status=_(application.status)),
)
if statuses and application.status not in statuses:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=_("Application status should not be %(status)s", status=_(application.status)),
)


def get_application_as_user(id: int, session: Session = Depends(get_db)) -> models.Application:
Expand Down
2 changes: 1 addition & 1 deletion app/mail.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def _send_email(
parameters.setdefault("IMAGES_BASE_URL", app_settings.images_base_url)
content = (BASE_TEMPLATES_PATH / f"{template_name}.{app_settings.email_template_lang}.html").read_text()
for key, value in parameters.items():
content = content.replace("{{%s}}" % key, value)
content = content.replace("{{%s}}" % key, value) # noqa: UP031

logger.info("%s - Email to: %s sent to %s", app_settings.environment, original_addresses, to_addresses)
return ses.send_templated_email(
Expand Down
19 changes: 6 additions & 13 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def get_order_by(sort_field: str, sort_order: str, model: type[SQLModel] | None
#
# The session.flush() calls are not strictly necessary. However, they can avoid errors like:
#
# instance.related_id = related.id # related_id is set to None
# >>> instance.related_id = related.id
# (related_id is set to None)
class ActiveRecordMixin:
@classmethod
def filter_by(cls, session: Session, field: str, value: Any) -> "Query[Self]":
Expand Down Expand Up @@ -956,24 +957,16 @@ def days_waiting_for_lender(self, session: Session) -> int:
ApplicationAction.type == ApplicationActionType.FI_REQUEST_INFORMATION
).all()

if lender_requests:
# Days between the lender starting and making a first request.
end_time = lender_requests.pop(0).created_at
else:
# Days between the lender starting and now.
end_time = datetime.now(self.tz)
# Days between the lender starting and making a first request. / Days between the lender starting and now.
end_time = lender_requests.pop(0).created_at if lender_requests else datetime.now(self.tz)
days += (end_time - self.lender_started_at).days # type: ignore[operator]

# A lender can have only one unresponded request at a time.
for borrower_response in base_query.filter(
ApplicationAction.type == ApplicationActionType.MSME_UPLOAD_ADDITIONAL_DOCUMENT_COMPLETED
):
if lender_requests:
# Days between the next request and the next response.
end_time = lender_requests.pop(0).created_at
else:
# Days between the last request and now.
end_time = datetime.now(self.tz)
# Days between the next request and the next response. / Days between the last request and now.
end_time = lender_requests.pop(0).created_at if lender_requests else datetime.now(self.tz)
days += (end_time - borrower_response.created_at).days

if not lender_requests:
Expand Down
Loading