Skip to content

Commit

Permalink
Implementa rotas para acesso ao dados de arquivos com diários agregad…
Browse files Browse the repository at this point in the history
…os (#73)

Implementação da rota para consulta de meta dados dos arquivos
agregados. A rota devolve todas as informações relevantes mantidas na
tabela 'aggregates', que são:

- territory_id,
- state_code,
- url_zip,
- year,
- last_updated,
- hash_info,
- file_size
  • Loading branch information
ogecece authored Aug 19, 2024
2 parents 5f10af5 + b8b09cf commit f07e32b
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 38 deletions.
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ POD_NAME ?= querido-diario
DATABASE_CONTAINER_NAME ?= $(POD_NAME)-db
OPENSEARCH_CONTAINER_NAME ?= $(POD_NAME)-opensearch
# Database info user to run the tests
POSTGRES_USER ?= companies
POSTGRES_PASSWORD ?= companies
POSTGRES_DB ?= companiesdb
POSTGRES_HOST ?= localhost
POSTGRES_PORT ?= 5432
POSTGRES_IMAGE ?= docker.io/postgres:10
POSTGRES_COMPANIES_USER ?= companies
POSTGRES_COMPANIES_PASSWORD ?= companies
POSTGRES_COMPANIES_DB ?= companiesdb
POSTGRES_COMPANIES_HOST ?= localhost
POSTGRES_COMPANIES_PORT ?= 5432
POSTGRES_COMPANIES_IMAGE ?= docker.io/postgres:10
DATABASE_RESTORE_FILE ?= contrib/data/queridodiariodb.tar
# Run integration tests. Run local opensearch to validate the iteration
RUN_INTEGRATION_TESTS ?= 0
Expand Down Expand Up @@ -73,7 +73,7 @@ destroy-pod:

create-pod: setup-environment destroy-pod
podman pod create --publish $(API_PORT):$(API_PORT) \
--publish $(POSTGRES_PORT):$(POSTGRES_PORT) \
--publish $(POSTGRES_COMPANIES_PORT):$(POSTGRES_COMPANIES_PORT) \
--publish $(OPENSEARCH_PORT1):$(OPENSEARCH_PORT1) \
--publish $(OPENSEARCH_PORT2):$(OPENSEARCH_PORT2) \
--name $(POD_NAME)
Expand Down Expand Up @@ -166,18 +166,18 @@ start-database:
podman run -d --rm -ti \
--name $(DATABASE_CONTAINER_NAME) \
--pod $(POD_NAME) \
-e POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) \
-e POSTGRES_USER=$(POSTGRES_USER) \
-e POSTGRES_DB=$(POSTGRES_DB) \
$(POSTGRES_IMAGE)
-e POSTGRES_COMPANIES_PASSWORD=$(POSTGRES_COMPANIES_PASSWORD) \
-e POSTGRES_COMPANIES_USER=$(POSTGRES_COMPANIES_USER) \
-e POSTGRES_COMPANIES_DB=$(POSTGRES_COMPANIES_DB) \
$(POSTGRES_COMPANIES_IMAGE)

wait-database:
$(call wait-for, localhost:5432)

load-database:
ifneq ("$(wildcard $(DATABASE_RESTORE_FILE))","")
podman cp $(DATABASE_RESTORE_FILE) $(DATABASE_CONTAINER_NAME):/mnt/dump_file
podman exec $(DATABASE_CONTAINER_NAME) bash -c "pg_restore -v -c -h localhost -U $(POSTGRES_USER) -d $(POSTGRES_DB) /mnt/dump_file || true"
podman exec $(DATABASE_CONTAINER_NAME) bash -c "pg_restore -v -c -h localhost -U $(POSTGRES_COMPANIES_USER) -d $(POSTGRES_COMPANIES_DB) /mnt/dump_file || true"
else
@echo "cannot restore because file does not exists '$(DATABASE_RESTORE_FILE)'"
@exit 1
Expand Down
7 changes: 7 additions & 0 deletions aggregates/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .aggregates_access import (
AggregatesAccess,
AggregatesAccessInterface,
AggregatesDatabaseInterface,
create_aggregates_interface,
Aggregates
)
67 changes: 67 additions & 0 deletions aggregates/aggregates_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import abc
from typing import Optional, Dict

class AggregatesDatabaseInterface(abc.ABC):
"""
Interface to access data from aggregates.
"""

@abc.abstractmethod
def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""):
"""
Get information about a aggregate.
"""

class AggregatesAccessInterface(abc.ABC):
"""
Interface to access data from aggregates.
"""

@abc.abstractmethod
def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""):
"""
Get information about a aggregate.
"""

class AggregatesAccess(AggregatesAccessInterface):
_database_gateway = None

def __init__(self, database_gateway=None):
self._database_gateway = database_gateway

def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""):
aggregate_info = self._database_gateway.get_aggregates(territory_id, state_code)
return aggregate_info

class Aggregates:
"""
Item to represente a aggregate in memory inside the module
"""

def __init__(
self,
territory_id,
state_code,
file_path,
year,
last_updated,
hash_info,
file_size_mb,
):
self.territory_id = territory_id
self.state_code = state_code
self.file_path = file_path
self.year = year
self.last_updated = last_updated
self.hash_info = hash_info
self.file_size_mb = file_size_mb

def __repr__(self):
return f"Aggregates(territory_id={self.territory_id}, state_code={self.state_code}, file_path={self.file_path}, year={self.year}, last_updated={self.last_updated}, hash_info={self.hash_info}, file_size_mb={self.file_size_mb})"

def create_aggregates_interface(database_gateway: AggregatesDatabaseInterface) -> AggregatesAccessInterface:
if not isinstance(database_gateway, AggregatesDatabaseInterface):
raise Exception(
"Database gateway should implement the AggregatesDatabaseInterface interface"
)
return AggregatesAccess(database_gateway)
40 changes: 40 additions & 0 deletions api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from config.config import load_configuration
from themed_excerpts import ThemedExcerptAccessInterface, ThemedExcerptAccessInterface
from themed_excerpts.themed_excerpt_access import ThemedExcerptRequest
from aggregates import AggregatesAccessInterface

config = load_configuration()

Expand Down Expand Up @@ -87,6 +88,17 @@ class Entity(BaseModel):
class EntitiesSearchResponse(BaseModel):
entities: List[Entity]

class Aggregates(BaseModel):
territory_id: str
state_code: str
file_path: str
year: str
last_updated: datetime
hash_info: str
file_size_mb: str

class AggregatesSearchResponse(BaseModel):
aggregates: List[Aggregates]

@unique
class CityLevel(str, Enum):
Expand Down Expand Up @@ -554,13 +566,36 @@ async def get_partners(

return {"total_partners": total_partners, "partners": partners}

@app.get(
"/aggregates/{state_code}",
name="Get aggregated data files by state code and optionally territory ID",
response_model=AggregatesSearchResponse,
description="Get information about a aggregate by state code and territory ID.",
responses={
404: {"model": HTTPExceptionMessage, "description": "State and/or city not found."},
},)
async def get_aggregates(territory_id: Optional[str] = Query(None, description="City's 7-digit IBGE ID."),
state_code: str = Path(..., description="City's state code.")):

aggregates = app.aggregates.get_aggregates(territory_id, state_code.upper())

if not aggregates:
return JSONResponse(status_code=404, content={"detail":"No aggregate file was found for the data reported."})

return JSONResponse(status_code=200,
content={
"state_code":state_code.upper(),
"territory_id":territory_id,
"aggregates":aggregates}
)

def configure_api_app(
gazettes: GazetteAccessInterface,
themed_excerpts: ThemedExcerptAccessInterface,
cities: CityAccessInterface,
suggestion_service: SuggestionServiceInterface,
companies: CompaniesAccessInterface,
aggregates: AggregatesAccessInterface,
api_root_path=None,
):
if not isinstance(gazettes, GazetteAccessInterface):
Expand All @@ -583,11 +618,16 @@ def configure_api_app(
raise Exception(
"Only CompaniesAccessInterface object are accepted for companies parameter"
)
if not isinstance(aggregates, AggregatesAccessInterface):
raise Exception(
"Only AggregatesAccessInterface object are accepted for aggregates parameter"
)
if api_root_path is not None and type(api_root_path) != str:
raise Exception("Invalid api_root_path")
app.gazettes = gazettes
app.themed_excerpts = themed_excerpts
app.cities = cities
app.suggestion_service = suggestion_service
app.companies = companies
app.aggregates = aggregates
app.root_path = api_root_path
15 changes: 10 additions & 5 deletions config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,18 @@ def __init__(self):
self.themed_excerpt_number_of_fragments = int(
os.environ.get("THEMED_EXCERPT_NUMBER_OF_FRAGMENTS", 1)
)
self.companies_database_host = os.environ.get("POSTGRES_HOST", "")
self.companies_database_db = os.environ.get("POSTGRES_DB", "")
self.companies_database_user = os.environ.get("POSTGRES_USER", "")
self.companies_database_pass = os.environ.get("POSTGRES_PASSWORD", "")
self.companies_database_port = os.environ.get("POSTGRES_PORT", "")
self.companies_database_host = os.environ.get("POSTGRES_COMPANIES_HOST", "")
self.companies_database_db = os.environ.get("POSTGRES_COMPANIES_DB", "")
self.companies_database_user = os.environ.get("POSTGRES_COMPANIES_USER", "")
self.companies_database_pass = os.environ.get("POSTGRES_COMPANIES_PASSWORD", "")
self.companies_database_port = os.environ.get("POSTGRES_COMPANIES_PORT", "")
self.opensearch_user = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_USER", "")
self.opensearch_pswd = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_PASSWORD", "")
self.aggregates_database_host = os.environ.get("POSTGRES_AGGREGATES_HOST", "")
self.aggregates_database_db = os.environ.get("POSTGRES_AGGREGATES_DB", "")
self.aggregates_database_user = os.environ.get("POSTGRES_AGGREGATES_USER", "")
self.aggregates_database_pass = os.environ.get("POSTGRES_AGGREGATES_PASSWORD", "")
self.aggregates_database_port = os.environ.get("POSTGRES_AGGREGATES_PORT", "")
@classmethod
def _load_list(cls, key, default=[]):
value = os.environ.get(key, default)
Expand Down
16 changes: 11 additions & 5 deletions config/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ [email protected]
QUERIDO_DIARIO_SUGGESTION_RECIPIENT_NAME=Recipient Name
QUERIDO_DIARIO_SUGGESTION_RECIPIENT_EMAIL=[email protected]
QUERIDO_DIARIO_SUGGESTION_MAILJET_CUSTOM_ID=AppCustomID
POSTGRES_USER=companies
POSTGRES_PASSWORD=companies
POSTGRES_DB=companiesdb
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
QUERIDO_DIARIO_FILES_ENDPOINT=http://localhost:9000/queridodiariobucket/
POSTGRES_COMPANIES_USER=companies
POSTGRES_COMPANIES_PASSWORD=companies
POSTGRES_COMPANIES_DB=companiesdb
POSTGRES_COMPANIES_HOST=localhost
POSTGRES_COMPANIES_PORT=5432
POSTGRES_AGGREGATES_USER=queridodiario
POSTGRES_AGGREGATES_PASSWORD=queridodiario
POSTGRES_AGGREGATES_DB=queridodiariodb
POSTGRES_AGGREGATES_HOST=localhost
POSTGRES_AGGREGATES_PORT=5432
CITY_DATABASE_CSV=censo.csv
GAZETTE_OPENSEARCH_INDEX=querido-diario
GAZETTE_CONTENT_FIELD=source_text
Expand Down
11 changes: 9 additions & 2 deletions database/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from companies import CompaniesDatabaseInterface
from aggregates import AggregatesDatabaseInterface

from .postgresql import PostgreSQLDatabase
from .postgresql import PostgreSQLDatabaseCompanies, PostgreSQLDatabaseAggregates


def create_companies_database_interface(
db_host, db_name, db_user, db_pass, db_port
) -> CompaniesDatabaseInterface:
return PostgreSQLDatabase(db_host, db_name, db_user, db_pass, db_port)
return PostgreSQLDatabaseCompanies(db_host, db_name, db_user, db_pass, db_port)


def create_aggregates_database_interface(
db_host, db_name, db_user, db_pass, db_port
) -> AggregatesDatabaseInterface:
return PostgreSQLDatabaseAggregates(db_host, db_name, db_user, db_pass, db_port)
75 changes: 63 additions & 12 deletions database/postgresql.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import logging
import re
from typing import Any, Dict, Iterable, List, Tuple, Union
import os
from typing import Any, Dict, Iterable, List, Tuple, Union, Optional

import psycopg2

from companies import Company, InvalidCNPJException, Partner, CompaniesDatabaseInterface
from aggregates import AggregatesDatabaseInterface, Aggregates


class PostgreSQLDatabase(CompaniesDatabaseInterface):
class PostgreSQLDatabase:
def __init__(self, host, database, user, password, port):
self.host = host
self.database = database
self.user = user
self.password = password
self.port = port

def _select(self, command: str, data: Dict = {}) -> Iterable[Tuple]:
connection = psycopg2.connect(
dbname=self.database,
Expand All @@ -30,6 +31,16 @@ def _select(self, command: str, data: Dict = {}) -> Iterable[Tuple]:
logging.debug(entry)
yield entry
logging.debug(f"Finished query: {cursor.query}")

def _always_str_or_none(self, data: Any) -> Union[str, None]:
if data == "None" or data == "" or data is None:
return None
elif not isinstance(data, str):
return str(data)
else:
return data

class PostgreSQLDatabaseCompanies(PostgreSQLDatabase, CompaniesDatabaseInterface):

def get_company(self, cnpj: str = "") -> Union[Company, None]:
command = """
Expand Down Expand Up @@ -153,14 +164,6 @@ def _format_partner_data(self, data: Tuple, cnpj: str) -> Partner:
faixa_etaria=formatted_data[11],
)

def _always_str_or_none(self, data: Any) -> Union[str, None]:
if data == "None" or data == "" or data is None:
return None
elif not isinstance(data, str):
return str(data)
else:
return data

def _is_valid_cnpj(self, cnpj: str) -> bool:
cnpj_only_digits = self._cnpj_only_digits(cnpj)
if cnpj_only_digits == "" or len(cnpj_only_digits) > 14:
Expand Down Expand Up @@ -224,3 +227,51 @@ def _unsplit_cnpj(self, cnpj_basico: str, cnpj_ordem: str, cnpj_dv: str) -> str:
cnpj_ordem=str(cnpj_ordem).zfill(4),
cnpj_dv=str(cnpj_dv).zfill(2),
)

class PostgreSQLDatabaseAggregates(PostgreSQLDatabase, AggregatesDatabaseInterface):

def _format_aggregates_data(self, data: Tuple) -> Aggregates:
formatted_data = [self._always_str_or_none(value) for value in data]
return Aggregates(
territory_id=formatted_data[1],
state_code=formatted_data[2],
file_path=os.environ.get("QUERIDO_DIARIO_FILES_ENDPOINT","")+formatted_data[4],
year=formatted_data[3],
hash_info=formatted_data[6],
file_size_mb=formatted_data[5],
last_updated=formatted_data[7]
)

def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = "") -> Union[List[Aggregates], None]:
command = """
SELECT
*
FROM
aggregates
WHERE
state_code = %(state_code)s
AND
territory_id {territory_id_query_statement}
ORDER BY year DESC
"""

data = {
"state_code": state_code
}

if territory_id is None:
command = command.format(territory_id_query_statement="IS NULL")
else:
data["territory_id"] = territory_id
command = command.format(territory_id_query_statement="= %(territory_id)s")

results = list(self._select(command, data))

if not results:
return []

return (
[vars(self._format_aggregates_data(result)) for result in results]
)


Loading

0 comments on commit f07e32b

Please sign in to comment.