# Start the PostgreSQL container that backs the companies database inside the pod.
#
# The variable names on the left-hand side of each `-e` are read by the
# postgres image itself to create the initial role and database, so they must
# stay POSTGRES_PASSWORD / POSTGRES_USER / POSTGRES_DB. Only the make-side
# variables carry the POSTGRES_COMPANIES_ prefix introduced by the rename.
# Passing POSTGRES_COMPANIES_* into the container would leave it without the
# role/database that `load-database` later connects to with pg_restore.
start-database:
	podman run -d --rm -ti \
		--name $(DATABASE_CONTAINER_NAME) \
		--pod $(POD_NAME) \
		-e POSTGRES_PASSWORD=$(POSTGRES_COMPANIES_PASSWORD) \
		-e POSTGRES_USER=$(POSTGRES_COMPANIES_USER) \
		-e POSTGRES_DB=$(POSTGRES_COMPANIES_DB) \
		$(POSTGRES_COMPANIES_IMAGE)
import abc
from typing import Dict, Optional


class AggregatesDatabaseInterface(abc.ABC):
    """
    Storage-side contract of the aggregates module.

    A database gateway must implement this interface before it can be handed
    to ``create_aggregates_interface``.
    """

    @abc.abstractmethod
    def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""):
        """
        Return the aggregate records stored for ``state_code``.

        ``territory_id`` optionally narrows the lookup to one territory; how a
        missing ``territory_id`` is interpreted is up to the implementation.
        """


class AggregatesAccessInterface(abc.ABC):
    """
    Consumer-side contract of the aggregates module.

    This is the type the API layer depends on to query aggregates.
    """

    @abc.abstractmethod
    def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""):
        """
        Return the aggregate records available for ``state_code``, optionally
        narrowed to ``territory_id``.
        """


class AggregatesAccess(AggregatesAccessInterface):
    """
    Default access object: forwards every lookup to the database gateway.
    """

    _database_gateway = None

    def __init__(self, database_gateway=None):
        self._database_gateway = database_gateway

    def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""):
        """
        Delegate the lookup to the gateway and return its result unchanged.
        """
        return self._database_gateway.get_aggregates(territory_id, state_code)


class Aggregates:
    """
    In-memory representation of one aggregate record inside the module.
    """

    def __init__(
        self,
        territory_id,
        state_code,
        file_path,
        year,
        last_updated,
        hash_info,
        file_size_mb,
    ):
        self.territory_id = territory_id
        self.state_code = state_code
        self.file_path = file_path
        self.year = year
        self.last_updated = last_updated
        self.hash_info = hash_info
        self.file_size_mb = file_size_mb

    def __repr__(self) -> str:
        return (
            f"Aggregates(territory_id={self.territory_id}, "
            f"state_code={self.state_code}, "
            f"file_path={self.file_path}, "
            f"year={self.year}, "
            f"last_updated={self.last_updated}, "
            f"hash_info={self.hash_info}, "
            f"file_size_mb={self.file_size_mb})"
        )


def create_aggregates_interface(
    database_gateway: AggregatesDatabaseInterface,
) -> AggregatesAccessInterface:
    """
    Build the aggregates access object on top of ``database_gateway``.

    Raises:
        TypeError: if ``database_gateway`` does not implement
            ``AggregatesDatabaseInterface``. ``TypeError`` is a subclass of
            ``Exception``, so callers that catch ``Exception`` are unaffected.
    """
    if not isinstance(database_gateway, AggregatesDatabaseInterface):
        raise TypeError(
            "Database gateway should implement the AggregatesDatabaseInterface interface"
        )
    return AggregatesAccess(database_gateway)
class AggregatesSearchResponse(BaseModel):
    # Documented shape of a successful /aggregates response. The handler below
    # echoes the normalized state code and the requested territory next to the
    # list, so both are declared here to keep the generated schema in line with
    # the real payload. They are optional so existing constructions that pass
    # only `aggregates` keep working.
    aggregates: List[Aggregates]
    state_code: Optional[str] = None
    territory_id: Optional[str] = None


@app.get(
    "/aggregates/{state_code}",
    name="Get aggregated data files by state code and optionally territory ID",
    response_model=AggregatesSearchResponse,
    description="Get information about a aggregate by state code and territory ID.",
    responses={
        404: {"model": HTTPExceptionMessage, "description": "State and/or city not found."},
    },
)
async def get_aggregates(
    territory_id: Optional[str] = Query(None, description="City's 7-digit IBGE ID."),
    state_code: str = Path(..., description="City's state code."),
):
    """
    List the aggregate files registered for a state and, optionally, a city.

    The state code is upper-cased before the lookup, and the same normalized
    value is echoed back in the response body. An empty lookup result is
    answered with a 404 carrying a ``detail`` message.
    """
    # Normalize once so the lookup and the echoed value cannot diverge.
    normalized_state_code = state_code.upper()
    aggregates = app.aggregates.get_aggregates(territory_id, normalized_state_code)

    if not aggregates:
        return JSONResponse(
            status_code=404,
            content={"detail": "No aggregate file was found for the data reported."},
        )

    # Returned as a ready-made JSONResponse, exactly as before; the payload
    # keys are unchanged.
    return JSONResponse(
        status_code=200,
        content={
            "state_code": normalized_state_code,
            "territory_id": territory_id,
            "aggregates": aggregates,
        },
    )
configure_api_app( app.cities = cities app.suggestion_service = suggestion_service app.companies = companies + app.aggregates = aggregates app.root_path = api_root_path diff --git a/config/config.py b/config/config.py index 1f796cd..684d744 100644 --- a/config/config.py +++ b/config/config.py @@ -91,13 +91,18 @@ def __init__(self): self.themed_excerpt_number_of_fragments = int( os.environ.get("THEMED_EXCERPT_NUMBER_OF_FRAGMENTS", 1) ) - self.companies_database_host = os.environ.get("POSTGRES_HOST", "") - self.companies_database_db = os.environ.get("POSTGRES_DB", "") - self.companies_database_user = os.environ.get("POSTGRES_USER", "") - self.companies_database_pass = os.environ.get("POSTGRES_PASSWORD", "") - self.companies_database_port = os.environ.get("POSTGRES_PORT", "") + self.companies_database_host = os.environ.get("POSTGRES_COMPANIES_HOST", "") + self.companies_database_db = os.environ.get("POSTGRES_COMPANIES_DB", "") + self.companies_database_user = os.environ.get("POSTGRES_COMPANIES_USER", "") + self.companies_database_pass = os.environ.get("POSTGRES_COMPANIES_PASSWORD", "") + self.companies_database_port = os.environ.get("POSTGRES_COMPANIES_PORT", "") self.opensearch_user = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_USER", "") self.opensearch_pswd = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_PASSWORD", "") + self.aggregates_database_host = os.environ.get("POSTGRES_AGGREGATES_HOST", "") + self.aggregates_database_db = os.environ.get("POSTGRES_AGGREGATES_DB", "") + self.aggregates_database_user = os.environ.get("POSTGRES_AGGREGATES_USER", "") + self.aggregates_database_pass = os.environ.get("POSTGRES_AGGREGATES_PASSWORD", "") + self.aggregates_database_port = os.environ.get("POSTGRES_AGGREGATES_PORT", "") @classmethod def _load_list(cls, key, default=[]): value = os.environ.get(key, default) diff --git a/config/sample.env b/config/sample.env index 8b4a0e2..41fb740 100644 --- a/config/sample.env +++ b/config/sample.env @@ -8,11 +8,17 @@ 
def create_companies_database_interface(
    db_host, db_name, db_user, db_pass, db_port
) -> CompaniesDatabaseInterface:
    """
    Build the PostgreSQL gateway used by the companies module.

    The five connection settings are forwarded positionally, in the order
    host, database name, user, password, port.
    """
    connection_settings = (db_host, db_name, db_user, db_pass, db_port)
    return PostgreSQLDatabaseCompanies(*connection_settings)


def create_aggregates_database_interface(
    db_host, db_name, db_user, db_pass, db_port
) -> AggregatesDatabaseInterface:
    """
    Build the PostgreSQL gateway used by the aggregates module.

    The five connection settings are forwarded positionally, in the order
    host, database name, user, password, port.
    """
    connection_settings = (db_host, db_name, db_user, db_pass, db_port)
    return PostgreSQLDatabaseAggregates(*connection_settings)
class PostgreSQLDatabaseAggregates(PostgreSQLDatabase, AggregatesDatabaseInterface):
    """
    PostgreSQL gateway that reads aggregate records from the ``aggregates`` table.
    """

    def _format_aggregates_data(self, data: Tuple) -> Aggregates:
        """
        Convert one ``aggregates`` row into an ``Aggregates`` item.

        Every value is first normalized to ``str`` or ``None``. The file path
        is prefixed with the ``QUERIDO_DIARIO_FILES_ENDPOINT`` environment
        variable (empty string when unset).

        NOTE(review): values are read by position because the query uses
        ``SELECT *``; positions 1-7 are presumably territory_id, state_code,
        year, file path, file size, hash and last update, in that order.
        Confirm against the table definition, or select the columns by name.
        """
        formatted_data = [self._always_str_or_none(value) for value in data]
        files_endpoint = os.environ.get("QUERIDO_DIARIO_FILES_ENDPOINT", "")
        stored_path = formatted_data[4]
        # The normalizer turns NULL/empty values into None; concatenating that
        # with the endpoint would raise TypeError, so a missing path stays None.
        file_path = None if stored_path is None else files_endpoint + stored_path
        return Aggregates(
            territory_id=formatted_data[1],
            state_code=formatted_data[2],
            file_path=file_path,
            year=formatted_data[3],
            hash_info=formatted_data[6],
            file_size_mb=formatted_data[5],
            last_updated=formatted_data[7],
        )

    def get_aggregates(
        self, territory_id: Optional[str] = None, state_code: str = ""
    ) -> List[Dict[str, Any]]:
        """
        Return the aggregates of ``state_code`` as a list of plain dicts,
        newest year first.

        When ``territory_id`` is ``None`` only rows whose ``territory_id`` is
        NULL are matched; otherwise only rows for that territory are. An empty
        list is returned when nothing matches.
        """
        parameters = {"state_code": state_code}
        if territory_id is None:
            territory_filter = "IS NULL"
        else:
            territory_filter = "= %(territory_id)s"
            parameters["territory_id"] = territory_id

        # Only one of the two constant fragments above is interpolated into
        # the SQL text; caller-supplied values always travel as bound
        # parameters.
        command = f"""
            SELECT
                *
            FROM
                aggregates
            WHERE
                state_code = %(state_code)s
                AND
                territory_id {territory_filter}
            ORDER BY year DESC
        """

        rows = list(self._select(command, parameters))
        return [vars(self._format_aggregates_data(row)) for row in rows]
db_user=configuration.aggregates_database_user, + db_pass=configuration.aggregates_database_pass, + db_port=configuration.aggregates_database_port, +) +aggregates_interface = create_aggregates_interface(aggregates_database) + configure_api_app( gazettes_interface, themed_excerpts_interface, cities_interface, suggestion_service, companies_interface, - configuration.root_path, + aggregates_interface, + configuration.root_path ) uvicorn.run(app, host="0.0.0.0", port=8080, root_path=configuration.root_path)