From fcb17da85efd5a1100b8d14301bf91ac654d9359 Mon Sep 17 00:00:00 2001 From: Joao Andre Date: Sat, 25 May 2024 19:42:01 -0300 Subject: [PATCH] feat: update retry methods and queries on sql repository --- taipy/core/_repository/_sql_repository.py | 45 ++++++++++++++++------ taipy/core/_version/_version_model.py | 4 +- taipy/core/cycle/_cycle_model.py | 4 +- taipy/core/data/_data_model.py | 4 +- taipy/core/job/_job_model.py | 4 +- taipy/core/scenario/_scenario_model.py | 4 +- taipy/core/submission/_submission_model.py | 4 +- taipy/core/task/_task_model.py | 4 +- tests/core/repository/mocks.py | 5 ++- 9 files changed, 57 insertions(+), 21 deletions(-) diff --git a/taipy/core/_repository/_sql_repository.py b/taipy/core/_repository/_sql_repository.py index 13d630bd35..137a9d0bd0 100644 --- a/taipy/core/_repository/_sql_repository.py +++ b/taipy/core/_repository/_sql_repository.py @@ -11,9 +11,12 @@ import json import pathlib +import sqlite3 +from datetime import datetime from sqlite3 import DatabaseError from typing import Any, Dict, Iterable, List, Optional, Type, Union +from sqlalchemy import text from sqlalchemy.dialects import sqlite from sqlalchemy.exc import NoResultFound, OperationalError @@ -26,7 +29,14 @@ class _SQLRepository(_AbstractRepository[ModelType, Entity]): - __EXCEPTIONS_TO_RETRY = (OperationalError,) + __EXCEPTIONS_TO_RETRY = ( + OperationalError, + sqlite3.OperationalError, + sqlite3.InterfaceError, + IndexError, + StopIteration, + ) + MAX_MODEL_NOT_FOUND_RETRY = 10 _logger = _TaipyLogger._get_logger() def __init__(self, model_type: Type[ModelType], converter: Type[Converter]): @@ -73,11 +83,16 @@ def _exists(self, entity_id: str): @_retry_repository_operation(__EXCEPTIONS_TO_RETRY) def _load(self, entity_id: str) -> Entity: - query = self.table.select().filter_by(id=entity_id) - - if entry := self.db.execute(str(query.compile(dialect=sqlite.dialect())), [entity_id]).fetchone(): - entry = self.model_type.from_dict(entry) - return self.converter._model_to_entity(entry) + query = self.table.select().filter_by(id=entity_id).order_by(text("updated_at DESC")) + + for _ in range(self.MAX_MODEL_NOT_FOUND_RETRY): + if entry := self.db.execute(str(query.compile(dialect=sqlite.dialect())), [entity_id]).fetchall(): + entry = sorted( + entry, key=lambda x: x["updated_at"] if x["updated_at"] is not None else "", reverse=True + ) + entry = entry[0] + entry = self.model_type.from_dict(entry) + return self.converter._model_to_entity(entry) raise ModelNotFound(str(self.model_type.__name__), entity_id) def _load_all(self, filters: Optional[List[Dict]] = None) -> List[Entity]: @@ -85,7 +100,7 @@ def _load_all(self, filters: Optional[List[Dict]] = None) -> List[Entity]: entities: List[Entity] = [] for f in filters or [{}]: - filtered_query = query.filter_by(**f) + filtered_query = query.filter_by(**f).order_by(text("updated_at DESC")) try: entries = self.db.execute( str(filtered_query.compile(dialect=sqlite.dialect())), @@ -100,15 +115,16 @@ def _load_all(self, filters: Optional[List[Dict]] = None) -> List[Entity]: def _delete(self, entity_id: str): delete_query = self.table.delete().filter_by(id=entity_id) cursor = self.db.execute(str(delete_query.compile(dialect=sqlite.dialect())), [entity_id]) - if cursor.rowcount == 0: raise ModelNotFound(str(self.model_type.__name__), entity_id) self.db.commit() + cursor.close() def _delete_all(self): - self.db.execute(str(self.table.delete().compile(dialect=sqlite.dialect()))) + cursor = self.db.execute(str(self.table.delete().compile(dialect=sqlite.dialect()))) self.db.commit() + cursor.close() def _delete_many(self, ids: Iterable[str]): for entity_id in ids: @@ -116,9 +132,9 @@ def _delete_many(self, ids: Iterable[str]): def _delete_by(self, attribute: str, value: str): delete_by_query = self.table.delete().filter_by(**{attribute: value}) - - self.db.execute(str(delete_by_query.compile(dialect=sqlite.dialect())), [value]) + cursor = self.db.execute(str(delete_by_query.compile(dialect=sqlite.dialect())), [value]) self.db.commit() + cursor.close() def _search(self, attribute: str, value: Any, filters: Optional[List[Dict]] = None) -> List[Entity]: query = self.table.select().filter_by(**{attribute: value}) @@ -219,12 +235,17 @@ def __get_entities_by_config_and_owner( @_retry_repository_operation(__EXCEPTIONS_TO_RETRY) def __insert_model(self, model: ModelType): query = self.table.insert() - self.db.execute(str(query.compile(dialect=sqlite.dialect())), model.to_list()) + query_str = str(query.compile(dialect=sqlite.dialect())) + model.updated_at = datetime.now() # type: ignore + + cursor = self.db.execute(query_str, model.to_list()) self.db.commit() + cursor.close() @_retry_repository_operation(__EXCEPTIONS_TO_RETRY) def _update_entry(self, model): query = self.table.update().filter_by(id=model.id) + model.updated_at = datetime.now() # type: ignore cursor = self.db.execute(str(query.compile(dialect=sqlite.dialect())), model.to_list() + [model.id]) self.db.commit() cursor.close() diff --git a/taipy/core/_version/_version_model.py b/taipy/core/_version/_version_model.py index 51288126ef..e6fa4c2132 100644 --- a/taipy/core/_version/_version_model.py +++ b/taipy/core/_version/_version_model.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, Dict -from sqlalchemy import Boolean, Column, String, Table +from sqlalchemy import TIMESTAMP, Boolean, Column, String, Table from .._repository._base_taipy_model import _BaseModel from .._repository.db._sql_base_model import mapper_registry @@ -30,6 +30,7 @@ class _VersionModel(_BaseModel): Column("is_production", Boolean), Column("is_development", Boolean), Column("is_latest", Boolean), + Column("updated_at", TIMESTAMP), ) id: str config: str @@ -55,4 +56,5 @@ def to_list(self): self.is_production, self.is_development, self.is_latest, + self.updated_at, ] diff --git a/taipy/core/cycle/_cycle_model.py b/taipy/core/cycle/_cycle_model.py index 57a28ab389..e32ae9ebaf 100644 --- a/taipy/core/cycle/_cycle_model.py +++ b/taipy/core/cycle/_cycle_model.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, Dict -from sqlalchemy import JSON, Column, Enum, String, Table +from sqlalchemy import JSON, TIMESTAMP, Column, Enum, String, Table from taipy.config.common.frequency import Frequency @@ -34,6 +34,7 @@ class _CycleModel(_BaseModel): Column("creation_date", String), Column("start_date", String), Column("end_date", String), + Column("updated_at", TIMESTAMP), ) id: CycleId name: str @@ -64,4 +65,5 @@ def to_list(self): self.creation_date, self.start_date, self.end_date, + self.updated_at, ] diff --git a/taipy/core/data/_data_model.py b/taipy/core/data/_data_model.py index eafb5d6580..babaa509ed 100644 --- a/taipy/core/data/_data_model.py +++ b/taipy/core/data/_data_model.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional -from sqlalchemy import JSON, Boolean, Column, Enum, Float, String, Table, UniqueConstraint +from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, Float, String, Table, UniqueConstraint from taipy.config.common.scope import Scope @@ -42,6 +42,7 @@ class _DataNodeModel(_BaseModel): Column("editor_id", String), Column("editor_expiration_date", String), Column("data_node_properties", JSON), + Column("updated_at", TIMESTAMP), ) __table_args__ = (UniqueConstraint("config_id", "owner_id", name="_config_owner_uc"),) @@ -98,4 +99,5 @@ def to_list(self): self.editor_id, self.editor_expiration_date, _BaseModel._serialize_attribute(self.data_node_properties), + self.updated_at, ] diff --git a/taipy/core/job/_job_model.py b/taipy/core/job/_job_model.py index b542c6982a..20edcbf0ae 100644 --- a/taipy/core/job/_job_model.py +++ b/taipy/core/job/_job_model.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, Dict, List -from sqlalchemy import JSON, Boolean, Column, Enum, String, Table +from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, String, Table from .._repository._base_taipy_model import _BaseModel from .._repository.db._sql_base_model import mapper_registry @@ -36,6 +36,7 @@ class _JobModel(_BaseModel): Column("subscribers", JSON), Column("stacktrace", JSON), Column("version", String), + Column("updated_at", TIMESTAMP), ) id: JobId task_id: str @@ -75,4 +76,5 @@ def to_list(self): _BaseModel._serialize_attribute(self.subscribers), _BaseModel._serialize_attribute(self.stacktrace), self.version, + self.updated_at, ] diff --git a/taipy/core/scenario/_scenario_model.py b/taipy/core/scenario/_scenario_model.py index 844ae6abe1..3a5bf4f40b 100644 --- a/taipy/core/scenario/_scenario_model.py +++ b/taipy/core/scenario/_scenario_model.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional -from sqlalchemy import JSON, Boolean, Column, String, Table +from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, String, Table from .._repository._base_taipy_model import _BaseModel from .._repository.db._sql_base_model import mapper_registry @@ -40,6 +40,7 @@ class _ScenarioModel(_BaseModel): Column("version", String), Column("sequences", JSON), Column("cycle", String), + Column("updated_at", TIMESTAMP), ) id: ScenarioId config_id: str @@ -85,4 +86,5 @@ def to_list(self): self.version, _BaseModel._serialize_attribute(self.sequences), self.cycle, + self.updated_at, ] diff --git a/taipy/core/submission/_submission_model.py b/taipy/core/submission/_submission_model.py index b770716384..7111e17053 100644 --- a/taipy/core/submission/_submission_model.py +++ b/taipy/core/submission/_submission_model.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional, Union -from sqlalchemy import JSON, Boolean, Column, Enum, String, Table +from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, String, Table from .._repository._base_taipy_model import _BaseModel from .._repository.db._sql_base_model import mapper_registry @@ -41,6 +41,7 @@ class _SubmissionModel(_BaseModel): Column("running_jobs", JSON), Column("blocked_jobs", JSON), Column("pending_jobs", JSON), + Column("updated_at", TIMESTAMP), ) id: str entity_id: str @@ -95,4 +96,5 @@ def to_list(self): _BaseModel._serialize_attribute(self.running_jobs), _BaseModel._serialize_attribute(self.blocked_jobs), _BaseModel._serialize_attribute(self.pending_jobs), + self.updated_at, ] diff --git a/taipy/core/task/_task_model.py b/taipy/core/task/_task_model.py index 14c256d23b..482fec7291 100644 --- a/taipy/core/task/_task_model.py +++ b/taipy/core/task/_task_model.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional -from sqlalchemy import JSON, Boolean, Column, String, Table +from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, String, Table from .._repository._base_taipy_model import _BaseModel from .._repository.db._sql_base_model import mapper_registry @@ -35,6 +35,7 @@ class _TaskModel(_BaseModel): Column("version", String), Column("skippable", Boolean), Column("properties", JSON), + Column("updated_at", TIMESTAMP), ) id: str owner_id: Optional[str] @@ -77,4 +78,5 @@ def to_list(self): self.version, self.skippable, _BaseModel._serialize_attribute(self.properties), + self.updated_at, ] diff --git a/tests/core/repository/mocks.py b/tests/core/repository/mocks.py index ac55a300c9..93c59b7b12 100644 --- a/tests/core/repository/mocks.py +++ b/tests/core/repository/mocks.py @@ -14,7 +14,7 @@ from dataclasses import dataclass from typing import Any, Dict, Optional -from sqlalchemy import Column, String, Table +from sqlalchemy import TIMESTAMP, Column, String, Table from sqlalchemy.dialects import sqlite from sqlalchemy.orm import declarative_base, registry from sqlalchemy.schema import CreateTable @@ -53,6 +53,7 @@ class MockModel(Base): # type: ignore Column("id", String(200), primary_key=True), Column("name", String(200)), Column("version", String(200)), + Column("updated_at", TIMESTAMP), ) id: str name: str @@ -73,7 +74,7 @@ def _from_entity(cls, entity: MockObj): return MockModel(id=entity.id, name=entity.name, version=entity._version) def to_list(self): - return [self.id, self.name, self.version] + return [self.id, self.name, self.version, self.updated_at] class MockConverter(_AbstractConverter):