Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nueva clase IoT #70

Merged
merged 8 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/best_practices.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ Las funciones que actualmente soporta la librería son:
- modulo `cb`: Módulo que incluye funciones relacionadas con la comunicación con el Context Broker.
- `send_batch`: Función que envía un lote de entidades al Context Broker. Recibe un listado con todos los tokens por subservicio y usa el correspondiente para realizar la llamada al Context Broker. Si no se dispone de token o ha caducado, se solicita o renueva el token según el caso y luego envía los datos.
- `get_entities_page`: Función que permite la recogida de datos del Context Broker. Permite el uso de ciertos parámetros como offset, limit, orderBy y type para filtrar la recogida de datos.
- modulo `iota`: Módulo que incluye funciones relacionadas con la comunicación con el agente IoT.
- `send_http`: Función que envía un JSON al agente IoT por una petición HTTP.
- `send_batch_http`: Función que envía un un conjunto de datos en formato JSON al agente IoT por una petición HTTP. Puede recibir una lista de diccionarios o un DataFrame. En el caso de DataFrames, convierte cada fila en un diccionario y los envía uno por uno cada cierto tiempo definido en `sleep_send_batch`.

Se puede encontrar más detalles de la librería en la documentación de esta. [Ref.](../python-lib/tc_etl_lib/README.md)

Expand Down
File renamed without changes.
26 changes: 26 additions & 0 deletions python-lib/tc_etl_lib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ entities = cbm.get_entities_page(subservice='/energia', auth=auth, type='myType'

# have a look to the retrieved entities
print (json.dumps(entities))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creo que sería mejor dejar los ejemplos que ya hay como están y usar los dos nuevos fragmentos para generar un ejemplo separado independiente.

#create an iota manager and use it
iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://<iota_endpoint>:<port>/iot/json', sensor_id='<sensor_id>', api_key='<api_key>')
iotam.send_http(data={"<key_1>": "<value_1>", "<key_2>": "<value_2>"})
```

Ejemplo de uso de Recogida de todos los datos de tipo SupplyPoint con y sin paginación:
Expand Down Expand Up @@ -225,6 +229,10 @@ with tc.orionStore(cb=cbm, auth=auth, subservice='/energia') as store:
# O un store sqlFile
with tc.sqlFileStore(path="inserts.sql", subservice="/energia", namespace="energy") as store:
store(entities)

# Envío de datos en ráfaga al Agente IoT.
iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://<iota_endpoint>:<port>/iot/json', sensor_id='<sensor_id>', api_key='<api_key>', sleep_send_batch='<time_sleep>')
iotam.send_batch_http(data=[{"<key_1>": "<value_1>", "<key_2>": "<value_2>"}, {"<key_3>": "<value_3>", "<key_4>": "<value_4>"}])
```

## Funciones disponibles en la librería
Expand Down Expand Up @@ -343,6 +351,24 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad
- Reemplaza todos los espacios en blanco consecutivos por el carácter de reemplazo.
- NOTA: Esta función no recorta la longitud de la cadena devuelta a 256 caracteres, porque el llamante puede querer conservar la cadena entera para por ejemplo guardarla en algún otro atributo, antes de truncarla.

- Clase `iotaManager`: En esta clase están las funciones relacionadas con el agente IoT.

- `__init__`: constructor de objetos de la clase.
- :param obligatorio `sensor_id`: El ID del sensor.
- :param obligatorio `api_key`: La API key correspondiente al sensor.
- :param obligatorio `endpoint`: La URL del servicio al que se le quiere enviar los datos.
- :param opcional `sleep_send_batch`: Es el tiempo de espera entre cada envío de datos en segundos.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Siendo un parámetro opcional, indicar el default cuando no se especifica nada.

- `send_http`: Función que envía un archivo en formato JSON al agente IoT por petición HTTP.
- :param obligatorio: `data`: Datos a enviar. La estructura debe tener pares de elementos clave-valor (diccionario).
- :raises [TypeError](https://docs.python.org/3/library/exceptions.html#TypeError): Se lanza cuando el tipo de dato es incorrecto.
- :raises [ConnectionError](https://docs.python.org/3/library/exceptions.html#ConnectionError): Se lanza cuando no puede conectarse al servidor.
- :raises FetchError: se lanza cuando se produce un error en en la solicitud HTTP.
- :return: True si el envío de datos es exitoso.
- `send_batch_http`: Función que envía un conjunto de datos en formato JSON al agente IoT por petición HTTP.
- :param obligatorio: `data`: Datos a enviar. Puede ser una lista de diccionarios o un DataFrame.
- :raises SendBatchError: Se levanta cuando se produce una excepción dentro de `send_http`. Atrapa la excepción original y se guarda y se imprime el índice donde se produjo el error.


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incluir en el changelog (justo por encima de la marca de versión 0.9.0) un par de entradas que describan los cambios incluidos en esta PR. Por ejemplo:

  • Add: new class iotaManager to deal with IOTAgent interactions, with methods send_http and send_batch_http

Algunos ejemplos de uso de `normalizer`:

```
Expand Down
3 changes: 2 additions & 1 deletion python-lib/tc_etl_lib/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
INSTALL_REQUIRES = [
'requests==2.21.0',
'urllib3==1.24.1',
'psycopg2-binary>=2.9.5'
'psycopg2-binary>=2.9.5',
'pandas==2.0.3'
]

setup(
Expand Down
4 changes: 3 additions & 1 deletion python-lib/tc_etl_lib/tc_etl_lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#

from .auth import authManager
from .cb import FetchError, cbManager
from .cb import cbManager
from .exceptions import FetchError
from .iota import iotaManager
from .store import Store, orionStore, sqlFileStore
from .normalizer import normalizer
29 changes: 2 additions & 27 deletions python-lib/tc_etl_lib/tc_etl_lib/cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,14 @@
import time
import json

from . import authManager
from . import authManager, exceptions

# control urllib3 post and get verify in false
import urllib3, urllib3.exceptions
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger(__name__)

class FetchError(Exception):
"""
FetchError encapsulates all parameters of an HTTP request and the erroneous response
"""

response: requests.Response
method: str
url: str
params: Optional[Any] = None
headers: Optional[Any] = None
body: Optional[Any] = None

def __init__(self, response: requests.Response, method: str, url: str, params: Optional[Any] = None, headers: Optional[Any] = None, body: Optional[Any] = None):
"""Constructor for FetchError class"""
self.response = response
self.method = method
self.url = url
self.params = params
self.headers = headers
self.body = body

def __str__(self) -> str:
return f"Failed to {self.method} {self.url} (headers: {self.headers}, params: {self.params}, body: {self.body}): [{self.response.status_code}] {self.response.text}"


class cbManager:
"""ContextBroker Manager

Expand Down Expand Up @@ -260,7 +235,7 @@ def get_entities_page(self, *, service: Optional[str] = None, subservice: Option
respjson = resp.json()
logger.error(f'{respjson["name"]}: {respjson["message"]}')
if resp.status_code < 200 or resp.status_code > 204:
raise FetchError(response=resp, method="GET", url=req_url, params=params, headers=headers)
raise exceptions.FetchError(response=resp, method="GET", url=req_url, params=params, headers=headers)

return resp.json()

Expand Down
50 changes: 50 additions & 0 deletions python-lib/tc_etl_lib/tc_etl_lib/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2022 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U.
#
# This file is part of tc_etl_lib
#
# tc_etl_lib is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# tc_etl_lib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/.

"""
Exceptions handling for Python.
"""

import requests
from typing import Any, Optional

class FetchError(Exception):
"""
FetchError encapsulates all parameters of an HTTP request and the erroneous response.
"""

response: requests.Response
method: str
url: str
params: Optional[Any] = None
headers: Optional[Any] = None
body: Optional[Any] = None

def __init__(self, response: requests.Response, method: str, url: str, params: Optional[Any] = None, headers: Optional[Any] = None, body: Optional[Any] = None):
"""Constructor for FetchError class."""
self.response = response
self.method = method
self.url = url
self.params = params
self.headers = headers
self.body = body

def __str__(self) -> str:
return f"Failed to {self.method} {self.url} (headers: {self.headers}, params: {self.params}, body: {self.body}): [{self.response.status_code}] {self.response.text}"
106 changes: 106 additions & 0 deletions python-lib/tc_etl_lib/tc_etl_lib/iota.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2022 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U.
#
# This file is part of tc_etl_lib
#
# tc_etl_lib is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# tc_etl_lib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/.

'''
IoT Agent routines for Python:
- iotaManager.send_http
- iotaManager.send_batch_http
'''

from . import exceptions
import pandas as pd
import requests
import tc_etl_lib as tc
import time
from typing import Iterable, Optional, Union

class SendBatchError(Exception):
"SendBatchError is a class that can handle exceptions."
def __init__(self, message, original_exception=None, index=None):
super().__init__(message)
self.original_exception = original_exception
self.index = index

class iotaManager:
"""IoT Agent Manager.

endpoint: define service endpoint iota (example: https://<service>:<port>).
sensor_id: sensor ID.
api_key: API key of the corresponding sensor.
sleep_send_batch: time sleep in seconds (default: 0).
"""

endpoint: str
sensor_id: str
api_key: str
sleep_send_batch: float

def __init__(self, endpoint: str, sensor_id: str, api_key: str, sleep_send_batch: float = 0):
self.endpoint = endpoint
self.sensor_id = sensor_id
self.api_key = api_key
self.sleep_send_batch = sleep_send_batch

def send_http(self,
data: dict) -> Union[None, bool]:

if not isinstance(data, dict):
raise TypeError("The 'data' parameter should be a dictionary with key-value pairs.")

params = {
'i': self.sensor_id,
'k': self.api_key
}
headers = {
"Content-Type": "application/json"
}

try:
resp = requests.post(url=self.endpoint, json=data, params=params, headers=headers)
if resp.status_code == 200:
return True
else:
raise exceptions.FetchError(
response=resp,
method="POST",
url=self.endpoint,
params=params,
headers=headers)
except requests.exceptions.ConnectionError as e:
raise e

def send_batch_http(self, data: Iterable) -> Union[None, bool]:

if isinstance(data, pd.DataFrame):
# Convert each row of the DataFrame to a dictionary.
for i, row in data.iterrows():
try:
self.send_http(row.to_dict())
time.sleep(self.sleep_send_batch)
except Exception as e:
raise SendBatchError(f"send_batch_http error. Row that caused the error: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e
else:
for i, dictionary in enumerate(data):
try:
self.send_http(dictionary)
time.sleep(self.sleep_send_batch)
except Exception as e:
raise SendBatchError(f"send_batch_http error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e
return True
Loading