diff --git a/docker-compose.yml b/docker-compose.yml index d12b360..d8df889 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,7 +20,6 @@ services: api-pgd: image: ghcr.io/gestaogovbr/api-pgd:latest - pull_policy: always container_name: api-pgd depends_on: db: @@ -40,8 +39,24 @@ services: # to new `SECRET` run openssl rand -hex 32 SECRET: b8a3054ba3457614e95a88cc0807384430c1b338a54e95e4245f41e060da68bc ACCESS_TOKEN_EXPIRE_MINUTES: 30 + MAIL_USERNAME: '' + MAIL_FROM: admin@api-pgd.gov.br + MAIL_PORT: 25 + MAIL_SERVER: smtp4dev + MAIL_FROM_NAME: admin@api-pgd.gov.br + MAIL_PASSWORD: '' healthcheck: test: ["CMD", "curl", "-f", "http://0.0.0.0:5057/docs"] interval: 5s timeout: 5s retries: 20 + + smtp4dev: + image: rnwood/smtp4dev:v3 + restart: always + ports: + - '5000:80' + - '25:25' # Change the number before : to the port the SMTP server should be accessible on + - '143:143' # Change the number before : to the port the IMAP server should be accessible on + environment: + - ServerOptions__HostName=smtp4dev diff --git a/requirements.txt b/requirements.txt index 5776e03..24137b0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ httpx==0.24.1 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4 python-multipart==0.0.6 +fastapi-mail==1.4.1 \ No newline at end of file diff --git a/src/api.py b/src/api.py index 4751803..596244a 100644 --- a/src/api.py +++ b/src/api.py @@ -12,11 +12,13 @@ from fastapi.responses import RedirectResponse from sqlalchemy.exc import IntegrityError + import schemas import crud from db_config import DbContextManager, create_db_and_tables import crud_auth from create_admin_user import init_user_admin +import email_config ACCESS_TOKEN_EXPIRE_MINUTES = int(os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES")) @@ -104,13 +106,12 @@ async def login_for_access_token( tags=["Auth"], ) async def get_users( - user_logged: Annotated[ + user_logged: Annotated[ # pylint: disable=unused-argument schemas.UsersSchema, Depends(crud_auth.get_current_admin_user), ], db: DbContextManager = Depends(DbContextManager), ) -> list[schemas.UsersGetSchema]: - return await crud_auth.get_all_users(db) @@ -120,9 +121,8 @@ async def get_users( tags=["Auth"], ) async def create_or_update_user( - user_logged: Annotated[ - schemas.UsersSchema, - Depends(crud_auth.get_current_admin_user) + user_logged: Annotated[ # pylint: disable=unused-argument + schemas.UsersSchema, Depends(crud_auth.get_current_admin_user) ], user: schemas.UsersSchema, email: str, @@ -171,7 +171,7 @@ async def create_or_update_user( tags=["Auth"], ) async def get_user( - user_logged: Annotated[ + user_logged: Annotated[ # pylint: disable=unused-argument schemas.UsersSchema, Depends(crud_auth.get_current_admin_user), ], @@ -218,6 +218,50 @@ async def delete_user( ) from exception +@app.post( + "/user/forgot_password/{email}", + summary="Recuperação de Acesso", + tags=["Auth"], +) +async def forgot_password( + email: str, + db: DbContextManager = Depends(DbContextManager), +) -> schemas.UsersInputSchema: + user = await crud_auth.get_user(db, email) + + if user: + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = crud_auth.create_access_token( + data={"sub": user.email}, expires_delta=access_token_expires + ) + + return await email_config.send_reset_password_mail(email, access_token) + + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail=f"Usuário `{email}` não existe" + ) + + +@app.get( + "/user/reset_password/", + summary="Criar nova senha a partir do token de acesso", + tags=["Auth"], +) +async def reset_password( + access_token: str, + password: str, + db: DbContextManager = Depends(DbContextManager), +): + """ + Gera uma nova senha através do token fornecido por email. + """ + try: + return await crud_auth.user_reset_password(db, access_token, password) + + except ValueError as e: + raise HTTPException(status_code=400, detail=f"{e}") from e + + # ## DATA -------------------------------------------------- @@ -260,11 +304,6 @@ async def create_or_update_plano_entregas( plano_entregas: schemas.PlanoEntregasSchema, response: Response, db: DbContextManager = Depends(DbContextManager), - # TODO: Obter meios de verificar permissão opcional. O código abaixo - # bloqueia o acesso, mesmo informando que é opcional. - # access_token_info: Optional[FiefAccessTokenInfo] = Depends( - # auth_backend.authenticated(permissions=["all:read"], optional=True) - # ), ): """Cria um novo plano de entregas ou, se existente, substitui um plano de entregas por um novo com os dados informados.""" @@ -388,11 +427,6 @@ async def create_or_update_plano_trabalho( plano_trabalho: schemas.PlanoTrabalhoSchema, response: Response, db: DbContextManager = Depends(DbContextManager), - # TODO: Obter meios de verificar permissão opcional. O código abaixo - # bloqueia o acesso, mesmo informando que é opcional. - # access_token_info: Optional[FiefAccessTokenInfo] = Depends( - # auth_backend.authenticated(permissions=["all:read"], optional=True) - # ), ): """Cria um novo plano de trabalho ou, se existente, substitui um plano de trabalho por um novo com os dados informados.""" diff --git a/src/crud_auth.py b/src/crud_auth.py index fb31ed8..90fedb2 100644 --- a/src/crud_auth.py +++ b/src/crud_auth.py @@ -93,11 +93,7 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None): return encoded_jwt - -async def get_current_user( - token: Annotated[str, Depends(oauth2_scheme)], - db: DbContextManager = Depends(DbContextManager), -): +async def verify_token(token: str, db: DbContextManager): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Credenciais não podem ser validadas", @@ -120,6 +116,18 @@ async def get_current_user( return user +async def get_current_user( + token: Annotated[str, Depends(oauth2_scheme)], + db: DbContextManager = Depends(DbContextManager), +): + return await verify_token(token, db) + +async def get_user_by_token( + token: str, + db: DbContextManager = Depends(DbContextManager), +): + return await verify_token(token, db) + async def get_current_active_user( current_user: Annotated[schemas.UsersSchema, Depends(get_current_user)] @@ -234,3 +242,30 @@ async def delete_user( await session.commit() return f"Usuário `{email}` deletado" + +async def user_reset_password(db_session: DbContextManager, + token: str, + new_password: str) -> str: + """Reset password of a user by passing a access token. + + Args: + db_session (DbContextManager): Session with api database + token (str): access token sended by email + new_password (str): the new password for encryption + + Returns: + str: Message about updated password + """ + + + user = await get_user_by_token(token, db_session) + + user.password = get_password_hash(new_password) + + async with db_session as session: + await session.execute( + update(models.Users).filter_by(email=user.email).values(**user.model_dump()) + ) + await session.commit() + + return f"Senha do Usuário {user.email} atualizada" \ No newline at end of file diff --git a/src/email_config.py b/src/email_config.py new file mode 100644 index 0000000..883732e --- /dev/null +++ b/src/email_config.py @@ -0,0 +1,69 @@ +import os +import logging +from fastapi_mail import FastMail, MessageSchema, ConnectionConfig, MessageType +from fastapi_mail.errors import DBProvaiderError, ConnectionErrors, ApiError +from starlette.responses import JSONResponse + +conf = ConnectionConfig( + MAIL_USERNAME=os.environ["MAIL_USERNAME"], + MAIL_FROM=os.environ["MAIL_FROM"], + MAIL_PORT=os.environ["MAIL_PORT"], + MAIL_SERVER=os.environ["MAIL_SERVER"], + MAIL_FROM_NAME=os.environ["MAIL_FROM_NAME"], + MAIL_STARTTLS=False, + MAIL_SSL_TLS=False, + MAIL_PASSWORD=os.environ["MAIL_PASSWORD"], + USE_CREDENTIALS=False, + VALIDATE_CERTS=False +) + +async def send_reset_password_mail(email: str, + token: str, + ) -> JSONResponse: + """Envia o e-mail contendo token para redefinir a senha. + + Args: + email (str): o email do usuário. + token (str): o token para redefinir a senha. + + Raises: + e: exceção gerada pelo FastAPI Mail. + + Returns: + JSONResponse: resposta retornada para o endpoint. + """ + token_expiration_minutes = os.environ["ACCESS_TOKEN_EXPIRE_MINUTES"] + body = f""" + +
+Olá, {email}.
+Foi solicitada a recuperação de sua senha da API PGD. + Caso essa solicitação não tenha sido feita por você, por + favor ignore esta mensagem.
+Foi gerado o seguinte token para geração de uma nova + senha.
+{token}
Utilize o endpoint /user/reset_password
com
+ este token para redefinir a senha.
([^<]+)
")
+
# post /token
def test_authenticate(header_usr_1: dict):
@@ -65,6 +72,7 @@ def test_get_all_users_logged_in_admin(client: Client, header_usr_1: dict):
response = client.get("/users", headers=header_usr_1)
assert response.status_code == status.HTTP_200_OK
+
# get /user
def test_get_user_not_logged_in(client: Client, header_not_logged_in: dict):
response = client.get("/user/foo@oi.com", headers=header_not_logged_in)
@@ -72,32 +80,26 @@ def test_get_user_not_logged_in(client: Client, header_not_logged_in: dict):
def test_get_user_logged_in_not_admin(
- client: Client, user2_credentials: dict, header_usr_2: dict # user is_admin=False
+ client: Client, user2_credentials: dict, header_usr_2: dict # user is_admin=False
):
response = client.get(f"/user/{user2_credentials['email']}", headers=header_usr_2)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_get_user_as_admin(
- client: Client,
- user1_credentials: dict,
- header_usr_1: dict # user is_admin=True
+ client: Client, user1_credentials: dict, header_usr_1: dict # user is_admin=True
):
response = client.get(f"/user/{user1_credentials['email']}", headers=header_usr_1)
assert response.status_code == status.HTTP_200_OK
-def test_get_user_not_exists(
- client: Client, header_usr_1: dict # user is_admin=True
-):
+def test_get_user_not_exists(client: Client, header_usr_1: dict): # user is_admin=True
response = client.get(f"/user/{USERS_TEST[1]['email']}", headers=header_usr_1)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_get_user_self_logged_in(
- client: Client,
- user1_credentials: dict,
- header_usr_1: dict # user is_admin=True
+ client: Client, user1_credentials: dict, header_usr_1: dict # user is_admin=True
):
response = client.get(f"/user/{user1_credentials['email']}", headers=header_usr_1)
assert response.status_code == status.HTTP_200_OK
@@ -116,15 +118,16 @@ def test_get_user_self_logged_in(
# create /user
def test_create_user_not_logged_in(client: Client, header_not_logged_in: dict):
response = client.put(
- f"/user/{USERS_TEST[0]['email']}", headers=header_not_logged_in, json=USERS_TEST[0]
+ f"/user/{USERS_TEST[0]['email']}",
+ headers=header_not_logged_in,
+ json=USERS_TEST[0],
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_create_user_logged_in_not_admin(
- client: Client,
- header_usr_2: dict # user is_admin=False
- ):
+ client: Client, header_usr_2: dict # user is_admin=False
+):
response = client.put(
f"/user/{USERS_TEST[0]['email']}", headers=header_usr_2, json=USERS_TEST[0]
)
@@ -132,9 +135,8 @@ def test_create_user_logged_in_not_admin(
def test_create_user_logged_in_admin(
- client: Client,
- header_usr_1: dict # user is_admin=True
- ):
+ client: Client, header_usr_1: dict # user is_admin=True
+):
response = client.put(
f"/user/{USERS_TEST[0]['email']}", headers=header_usr_1, json=USERS_TEST[0]
)
@@ -142,9 +144,8 @@ def test_create_user_logged_in_admin(
def test_create_user_without_required_fields(
- client: Client,
- header_usr_1: dict # user is_admin=True
- ):
+ client: Client, header_usr_1: dict # user is_admin=True
+):
response = client.put(
f"/user/{USERS_TEST[2]['email']}", headers=header_usr_1, json=USERS_TEST[2]
)
@@ -152,8 +153,7 @@ def test_create_user_without_required_fields(
def test_create_user_bad_email_format(
- client: Client,
- header_usr_1: dict # user is_admin=True
+ client: Client, header_usr_1: dict # user is_admin=True
):
response = client.put(
f"/user/{USERS_TEST[3]['email']}", headers=header_usr_1, json=USERS_TEST[3]
@@ -162,10 +162,7 @@ def test_create_user_bad_email_format(
# update /user
-def test_update_user(
- client: Client,
- header_usr_1: dict # user is_admin=True
- ):
+def test_update_user(client: Client, header_usr_1: dict): # user is_admin=True
# get
r_1 = client.get(f"/user/{USERS_TEST[0]['email']}", headers=header_usr_1)
data_before = r_1.json()
@@ -183,44 +180,213 @@ def test_update_user(
r_3 = client.get(f"/user/{USERS_TEST[0]['email']}", headers=header_usr_1)
data_after = r_3.json()
- assert data_before.get("cod_SIAPE_instituidora", None) == data_after.get("cod_SIAPE_instituidora", None)
+ assert data_before.get("cod_SIAPE_instituidora", None) == data_after.get(
+ "cod_SIAPE_instituidora", None
+ )
# delete /user
def test_delete_user_not_logged_in(client: Client, header_not_logged_in: dict):
response = client.delete(
- f"/user/{USERS_TEST[0]['email']}", headers=header_not_logged_in)
+ f"/user/{USERS_TEST[0]['email']}", headers=header_not_logged_in
+ )
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_delete_user_logged_in_not_admin(
- client: Client,
- header_usr_2: dict # user is_admin=False
- ):
- response = client.delete(
- f"/user/{USERS_TEST[0]['email']}", headers=header_usr_2)
+ client: Client, header_usr_2: dict # user is_admin=False
+):
+ response = client.delete(f"/user/{USERS_TEST[0]['email']}", headers=header_usr_2)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_delete_user_logged_in_admin(
- client: Client,
- header_usr_1: dict # user is_admin=True
- ):
- response = client.delete(
- f"/user/{USERS_TEST[0]['email']}", headers=header_usr_1)
+ client: Client, header_usr_1: dict # user is_admin=True
+):
+ response = client.delete(f"/user/{USERS_TEST[0]['email']}", headers=header_usr_1)
assert response.status_code == status.HTTP_200_OK
def test_delete_user_not_exists_logged_in_admin(
- client: Client,
- header_usr_1: dict # user is_admin=True
- ):
- response = client.delete(
- f"/user/{USERS_TEST[1]['email']}", headers=header_usr_1)
+ client: Client, header_usr_1: dict # user is_admin=True
+):
+ response = client.delete(f"/user/{USERS_TEST[1]['email']}", headers=header_usr_1)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_delete_yourself(client: Client, user1_credentials: dict, header_usr_1: dict):
response = client.delete(
- f"/user/{user1_credentials['email']}", headers=header_usr_1)
- assert response.status_code == status.HTTP_401_UNAUTHORIZED
\ No newline at end of file
+ f"/user/{user1_credentials['email']}", headers=header_usr_1
+ )
+ assert response.status_code == status.HTTP_401_UNAUTHORIZED
+
+
+# forgot/reset password
+
+
+def get_all_mailbox_messages(
+ host: str = "smtp4dev",
+ user: str = "smtp4dev",
+ password: str = "",
+) -> Generator[email.message.Message, None, None]:
+ """Get messages from the mailbox.
+
+ Args:
+ host (str, optional): IMAP connection host. Defaults to "localhost".
+ user (str, optional): IMAP connection user name. Defaults to "smtp4dev".
+ password (str, optional): IMAP connection password. Defaults to "".
+
+ Raises:
+ ValueError: if the mailbox can't be accessed.
+
+ Yields:
+ Iterator[email.message.Message]: each message found in the mailbox.
+ """
+ with IMAP4(host=host, port=143) as connection:
+ connection.login(user, password)
+ connection.enable("UTF8=ACCEPT")
+ query_status, inbox = connection.select(readonly=True)
+ if query_status != "OK":
+ raise ValueError("Access to email inbox failed.")
+ message_count = int(inbox[0])
+ for message_index in range(message_count + 1):
+ query_status, response = connection.fetch(str(message_index), "(RFC822)")
+ for item in response:
+ if isinstance(item, tuple):
+ yield email.message_from_bytes(item[1])
+
+
+def get_latest_message_uid(messages: dict[int, email.message.Message]) -> str:
+ """Get the latest message uid from the mailbox.
+
+ Args:
+ messages (dict[int,email.message.Message]): a dict containing
+ the message uids for keys and the message object for values,
+ representing all messages in the mailbox.
+
+ Returns:
+ str: the latest message's uid.
+ """
+ return max(
+ (
+ (datetime.strptime(message["date"], "%a, %d %b %Y %H:%M:%S %z"), index)
+ for index, message in messages.items()
+ )
+ )[1]
+
+
+def get_message_body(
+ uid: int,
+ host: str = "smtp4dev",
+ user: str = "smtp4dev",
+ password: str = "",
+) -> str:
+ """Given an uid, get the message's body from the IMAP server.
+
+ Args:
+ uid (int): the uid of the message.
+ host (str, optional): IMAP connection host. Defaults to "localhost".
+ user (str, optional): IMAP connection user name. Defaults to "smtp4dev".
+ password (str, optional): IMAP connection password. Defaults to "".
+
+ Raises:
+ ValueError: _description_
+
+ Returns:
+ str: _description_
+ """
+ with IMAP4(host=host, port=143) as connection:
+ connection.login(user, password)
+ connection.enable("UTF8=ACCEPT")
+ query_status, _ = connection.select(readonly=True)
+ if query_status != "OK":
+ raise ValueError("Access to email inbox failed.")
+ query_status, response = connection.fetch(str(uid), "(RFC822)")
+ message = email.message_from_bytes(response[0][1])
+ content = [
+ part for part in message.walk() if part.get_content_type() == "text/html"
+ ][0].get_payload(decode=True)
+ return content.decode("utf-8")
+
+
+def get_token_from_content(content: str) -> str:
+ """Get the token from the email content.
+
+ Args:
+ content (str): content of email.
+
+ Raises:
+ ValueError: if no token is found in the email.
+
+ Returns:
+ str: the access token.
+ """
+ match = TOKEN_IN_MESSAGE.search(content)
+ if match:
+ return match.group(1)
+ raise ValueError("Message contains no token.")
+
+
+def get_token_from_email(
+ host: str = "smtp4dev",
+ user: str = "smtp4dev",
+ password: str = "",
+) -> str:
+ """Access the mailbox and get the token from the email.
+
+ Args:
+ host (str, optional): IMAP connection host. Defaults to "localhost".
+ user (str, optional): IMAP connection user name. Defaults to "smtp4dev".
+ password (str, optional): IMAP connection password. Defaults to "".
+
+ Returns:
+ str: the access token.
+ """
+ messages = dict(enumerate(get_all_mailbox_messages(host, user, password), start=1))
+ latest_message = get_latest_message_uid(messages)
+ return get_token_from_content(get_message_body(uid=latest_message, host=host))
+
+
+def test_forgot_password(
+ register_user_1, # pylint: disable=unused-argument
+ client: Client,
+ user1_credentials: dict,
+ header_usr_1: dict,
+):
+ """Tests the forgot and reset password functionality."""
+ # use the forgot_password endpoint to send an email
+ response = client.post(
+ f"/user/forgot_password/{user1_credentials['email']}", headers=header_usr_1
+ )
+ assert response.status_code == status.HTTP_200_OK
+
+ # get the token from the email
+ access_token = get_token_from_email(host="smtp4dev")
+
+ # reset the password to a new password using the received token
+ new_password = "new_password_for_test"
+ response = client.get(
+ "/user/reset_password/",
+ params={"access_token": access_token, "password": new_password},
+ )
+ assert response.status_code == status.HTTP_200_OK
+
+ # test if the old credentials no longer work
+ response = client.post(
+ "/token",
+ data={
+ "username": user1_credentials["email"],
+ "password": user1_credentials["password"],
+ },
+ )
+ assert response.status_code == status.HTTP_401_UNAUTHORIZED
+
+ # test if the new credentials work
+ response = client.post(
+ "/token",
+ data={
+ "username": user1_credentials["email"],
+ "password": new_password,
+ },
+ )
+ assert response.status_code == status.HTTP_200_OK