From 28b038a195afd4faa0caa0b73969a8958233679d Mon Sep 17 00:00:00 2001 From: Augusto Herrmann Date: Wed, 24 Jan 2024 10:38:04 -0300 Subject: [PATCH] Reimplement truncate_users without using delete using instead the TRUNCATE TABLE sql command --- src/api.py | 3 +-- src/create_admin_user.py | 28 ---------------------------- src/crud.py | 8 ++++++++ src/crud_auth.py | 24 +++++++++++++++++++++++- tests/conftest.py | 15 +++++---------- 5 files changed, 37 insertions(+), 41 deletions(-) delete mode 100644 src/create_admin_user.py diff --git a/src/api.py b/src/api.py index df1ed81..2ec5b10 100644 --- a/src/api.py +++ b/src/api.py @@ -17,7 +17,6 @@ import crud from db_config import DbContextManager, create_db_and_tables import crud_auth -from create_admin_user import init_user_admin import email_config ACCESS_TOKEN_EXPIRE_MINUTES = int(os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES")) @@ -43,7 +42,7 @@ @app.on_event("startup") async def on_startup(): await create_db_and_tables() - await init_user_admin() + await crud_auth.init_user_admin() @app.get("/", include_in_schema=False) diff --git a/src/create_admin_user.py b/src/create_admin_user.py deleted file mode 100644 index 5df14a0..0000000 --- a/src/create_admin_user.py +++ /dev/null @@ -1,28 +0,0 @@ -import os - -from db_config import async_session_maker -from crud_auth import get_password_hash, get_user -from models import Users - -API_PGD_ADMIN_USER = os.environ.get("API_PGD_ADMIN_USER") -API_PGD_ADMIN_PASSWORD = os.environ.get("API_PGD_ADMIN_PASSWORD") - - -async def init_user_admin(): - db_session = async_session_maker() - - if not await get_user(db_session=db_session, email=API_PGD_ADMIN_USER): - new_user = Users( - email=API_PGD_ADMIN_USER, - # b-crypt - password=get_password_hash(API_PGD_ADMIN_PASSWORD), - is_admin=True, - cod_SIAPE_instituidora=1, - ) - - async with db_session as session: - session.add(new_user) - await session.commit() - print(f"API_PGD_ADMIN: Usuário administrador `{API_PGD_ADMIN_USER}` criado") - else: - print(f"API_PGD_ADMIN: Usuário administrador `{API_PGD_ADMIN_USER}` já existe") diff --git a/src/crud.py b/src/crud.py index 0793a9e..9bb75e3 100644 --- a/src/crud.py +++ b/src/crud.py @@ -440,3 +440,11 @@ def truncate_status_participante(): with SyncSession.begin() as session: result = session.execute(text("TRUNCATE status_participante CASCADE;")) return result + +def truncate_user(): + """Apaga a tabela users. + Usado no ambiente de testes de integração contínua. + """ + with SyncSession.begin() as session: + result = session.execute(text("TRUNCATE users CASCADE;")) + return result diff --git a/src/crud_auth.py b/src/crud_auth.py index 680c031..069f882 100644 --- a/src/crud_auth.py +++ b/src/crud_auth.py @@ -9,11 +9,13 @@ from passlib.context import CryptContext import models, schemas -from db_config import DbContextManager +from db_config import DbContextManager, async_session_maker SECRET_KEY = str(os.environ.get("SECRET_KEY")) ALGORITHM = str(os.environ.get("ALGORITHM")) +API_PGD_ADMIN_USER = os.environ.get("API_PGD_ADMIN_USER") +API_PGD_ADMIN_PASSWORD = os.environ.get("API_PGD_ADMIN_PASSWORD") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") @@ -147,6 +149,26 @@ async def get_current_active_user( return current_user +async def init_user_admin(): + db_session = async_session_maker() + + if not await get_user(db_session=db_session, email=API_PGD_ADMIN_USER): + new_user = models.Users( + email=API_PGD_ADMIN_USER, + # b-crypt + password=get_password_hash(API_PGD_ADMIN_PASSWORD), + is_admin=True, + cod_SIAPE_instituidora=1, + ) + + async with db_session as session: + session.add(new_user) + await session.commit() + print(f"API_PGD_ADMIN: Usuário administrador `{API_PGD_ADMIN_USER}` criado") + else: + print(f"API_PGD_ADMIN: Usuário administrador `{API_PGD_ADMIN_USER}` já existe") + + async def get_current_admin_user( current_user: Annotated[schemas.UsersSchema, Depends(get_current_user)] ): diff --git a/tests/conftest.py b/tests/conftest.py index 9405a36..a051394 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,7 @@ import sys import json from typing import Generator, Optional +import asyncio import httpx from fastapi.testclient import TestClient @@ -17,7 +18,9 @@ truncate_plano_entregas, truncate_plano_trabalho, truncate_status_participante, + truncate_user, ) +from crud_auth import init_user_admin from api import app USERS_CREDENTIALS = [ @@ -267,16 +270,8 @@ def truncate_participantes(): @pytest.fixture(scope="module", name="truncate_users") def fixture_truncate_users(admin_credentials: dict): - for del_user_email in get_all_users( - admin_credentials["username"], admin_credentials["password"] - ): - if del_user_email != admin_credentials["username"]: - response = delete_user( - admin_credentials["username"], - admin_credentials["password"], - del_user_email, - ) - response.raise_for_status() + truncate_user() + asyncio.get_event_loop().run_until_complete(init_user_admin()) @pytest.fixture(scope="module", name="register_user_1")