Skip to content

Commit

Permalink
feat: update retry methods and queries on sql repository
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoandre-avaiga committed May 26, 2024
1 parent 4d3e5d6 commit fcb17da
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 21 deletions.
45 changes: 33 additions & 12 deletions taipy/core/_repository/_sql_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

import json
import pathlib
import sqlite3
from datetime import datetime
from sqlite3 import DatabaseError
from typing import Any, Dict, Iterable, List, Optional, Type, Union

from sqlalchemy import text
from sqlalchemy.dialects import sqlite
from sqlalchemy.exc import NoResultFound, OperationalError

Expand All @@ -26,7 +29,14 @@


class _SQLRepository(_AbstractRepository[ModelType, Entity]):
__EXCEPTIONS_TO_RETRY = (OperationalError,)
__EXCEPTIONS_TO_RETRY = (
OperationalError,
sqlite3.OperationalError,
sqlite3.InterfaceError,
IndexError,
StopIteration,
)
MAX_MODEL_NOT_FOUND_RETRY = 10
_logger = _TaipyLogger._get_logger()

def __init__(self, model_type: Type[ModelType], converter: Type[Converter]):
Expand Down Expand Up @@ -73,19 +83,24 @@ def _exists(self, entity_id: str):

@_retry_repository_operation(__EXCEPTIONS_TO_RETRY)
def _load(self, entity_id: str) -> Entity:
query = self.table.select().filter_by(id=entity_id)

if entry := self.db.execute(str(query.compile(dialect=sqlite.dialect())), [entity_id]).fetchone():
entry = self.model_type.from_dict(entry)
return self.converter._model_to_entity(entry)
query = self.table.select().filter_by(id=entity_id).order_by(text("updated_at DESC"))

for _ in range(self.MAX_MODEL_NOT_FOUND_RETRY):
if entry := self.db.execute(str(query.compile(dialect=sqlite.dialect())), [entity_id]).fetchall():
entry = sorted(
entry, key=lambda x: x["updated_at"] if x["updated_at"] is not None else "", reverse=True
)
entry = entry[0]
entry = self.model_type.from_dict(entry)
return self.converter._model_to_entity(entry)
raise ModelNotFound(str(self.model_type.__name__), entity_id)

def _load_all(self, filters: Optional[List[Dict]] = None) -> List[Entity]:
query = self.table.select()
entities: List[Entity] = []

for f in filters or [{}]:
filtered_query = query.filter_by(**f)
filtered_query = query.filter_by(**f).order_by(text("updated_at DESC"))
try:
entries = self.db.execute(
str(filtered_query.compile(dialect=sqlite.dialect())),
Expand All @@ -100,25 +115,26 @@ def _load_all(self, filters: Optional[List[Dict]] = None) -> List[Entity]:
def _delete(self, entity_id: str):
delete_query = self.table.delete().filter_by(id=entity_id)
cursor = self.db.execute(str(delete_query.compile(dialect=sqlite.dialect())), [entity_id])

if cursor.rowcount == 0:
raise ModelNotFound(str(self.model_type.__name__), entity_id)

self.db.commit()
cursor.close()

def _delete_all(self):
self.db.execute(str(self.table.delete().compile(dialect=sqlite.dialect())))
cursor = self.db.execute(str(self.table.delete().compile(dialect=sqlite.dialect())))
self.db.commit()
cursor.close()

def _delete_many(self, ids: Iterable[str]):
for entity_id in ids:
self._delete(entity_id)

def _delete_by(self, attribute: str, value: str):
delete_by_query = self.table.delete().filter_by(**{attribute: value})

self.db.execute(str(delete_by_query.compile(dialect=sqlite.dialect())), [value])
cursor = self.db.execute(str(delete_by_query.compile(dialect=sqlite.dialect())), [value])
self.db.commit()
cursor.close()

def _search(self, attribute: str, value: Any, filters: Optional[List[Dict]] = None) -> List[Entity]:
query = self.table.select().filter_by(**{attribute: value})
Expand Down Expand Up @@ -219,12 +235,17 @@ def __get_entities_by_config_and_owner(
@_retry_repository_operation(__EXCEPTIONS_TO_RETRY)
def __insert_model(self, model: ModelType):
query = self.table.insert()
self.db.execute(str(query.compile(dialect=sqlite.dialect())), model.to_list())
query_str = str(query.compile(dialect=sqlite.dialect()))
model.updated_at = datetime.now() # type: ignore

cursor = self.db.execute(query_str, model.to_list())
self.db.commit()
cursor.close()

@_retry_repository_operation(__EXCEPTIONS_TO_RETRY)
def _update_entry(self, model):
query = self.table.update().filter_by(id=model.id)
model.updated_at = datetime.now() # type: ignore
cursor = self.db.execute(str(query.compile(dialect=sqlite.dialect())), model.to_list() + [model.id])
self.db.commit()
cursor.close()
Expand Down
4 changes: 3 additions & 1 deletion taipy/core/_version/_version_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict

from sqlalchemy import Boolean, Column, String, Table
from sqlalchemy import TIMESTAMP, Boolean, Column, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand All @@ -30,6 +30,7 @@ class _VersionModel(_BaseModel):
Column("is_production", Boolean),
Column("is_development", Boolean),
Column("is_latest", Boolean),
Column("updated_at", TIMESTAMP),
)
id: str
config: str
Expand All @@ -55,4 +56,5 @@ def to_list(self):
self.is_production,
self.is_development,
self.is_latest,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/cycle/_cycle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict

from sqlalchemy import JSON, Column, Enum, String, Table
from sqlalchemy import JSON, TIMESTAMP, Column, Enum, String, Table

from taipy.config.common.frequency import Frequency

Expand All @@ -34,6 +34,7 @@ class _CycleModel(_BaseModel):
Column("creation_date", String),
Column("start_date", String),
Column("end_date", String),
Column("updated_at", TIMESTAMP),
)
id: CycleId
name: str
Expand Down Expand Up @@ -64,4 +65,5 @@ def to_list(self):
self.creation_date,
self.start_date,
self.end_date,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/data/_data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import JSON, Boolean, Column, Enum, Float, String, Table, UniqueConstraint
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, Float, String, Table, UniqueConstraint

from taipy.config.common.scope import Scope

Expand Down Expand Up @@ -42,6 +42,7 @@ class _DataNodeModel(_BaseModel):
Column("editor_id", String),
Column("editor_expiration_date", String),
Column("data_node_properties", JSON),
Column("updated_at", TIMESTAMP),
)
__table_args__ = (UniqueConstraint("config_id", "owner_id", name="_config_owner_uc"),)

Expand Down Expand Up @@ -98,4 +99,5 @@ def to_list(self):
self.editor_id,
self.editor_expiration_date,
_BaseModel._serialize_attribute(self.data_node_properties),
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/job/_job_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List

from sqlalchemy import JSON, Boolean, Column, Enum, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand All @@ -36,6 +36,7 @@ class _JobModel(_BaseModel):
Column("subscribers", JSON),
Column("stacktrace", JSON),
Column("version", String),
Column("updated_at", TIMESTAMP),
)
id: JobId
task_id: str
Expand Down Expand Up @@ -75,4 +76,5 @@ def to_list(self):
_BaseModel._serialize_attribute(self.subscribers),
_BaseModel._serialize_attribute(self.stacktrace),
self.version,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/scenario/_scenario_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import JSON, Boolean, Column, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand Down Expand Up @@ -40,6 +40,7 @@ class _ScenarioModel(_BaseModel):
Column("version", String),
Column("sequences", JSON),
Column("cycle", String),
Column("updated_at", TIMESTAMP),
)
id: ScenarioId
config_id: str
Expand Down Expand Up @@ -85,4 +86,5 @@ def to_list(self):
self.version,
_BaseModel._serialize_attribute(self.sequences),
self.cycle,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/submission/_submission_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import JSON, Boolean, Column, Enum, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand Down Expand Up @@ -41,6 +41,7 @@ class _SubmissionModel(_BaseModel):
Column("running_jobs", JSON),
Column("blocked_jobs", JSON),
Column("pending_jobs", JSON),
Column("updated_at", TIMESTAMP),
)
id: str
entity_id: str
Expand Down Expand Up @@ -95,4 +96,5 @@ def to_list(self):
_BaseModel._serialize_attribute(self.running_jobs),
_BaseModel._serialize_attribute(self.blocked_jobs),
_BaseModel._serialize_attribute(self.pending_jobs),
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/task/_task_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import JSON, Boolean, Column, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand All @@ -35,6 +35,7 @@ class _TaskModel(_BaseModel):
Column("version", String),
Column("skippable", Boolean),
Column("properties", JSON),
Column("updated_at", TIMESTAMP),
)
id: str
owner_id: Optional[str]
Expand Down Expand Up @@ -77,4 +78,5 @@ def to_list(self):
self.version,
self.skippable,
_BaseModel._serialize_attribute(self.properties),
self.updated_at,
]
5 changes: 3 additions & 2 deletions tests/core/repository/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

from sqlalchemy import Column, String, Table
from sqlalchemy import TIMESTAMP, Column, String, Table
from sqlalchemy.dialects import sqlite
from sqlalchemy.orm import declarative_base, registry
from sqlalchemy.schema import CreateTable
Expand Down Expand Up @@ -53,6 +53,7 @@ class MockModel(Base): # type: ignore
Column("id", String(200), primary_key=True),
Column("name", String(200)),
Column("version", String(200)),
Column("updated_at", TIMESTAMP),
)
id: str
name: str
Expand All @@ -73,7 +74,7 @@ def _from_entity(cls, entity: MockObj):
return MockModel(id=entity.id, name=entity.name, version=entity._version)

def to_list(self):
return [self.id, self.name, self.version]
return [self.id, self.name, self.version, self.updated_at]


class MockConverter(_AbstractConverter):
Expand Down

0 comments on commit fcb17da

Please sign in to comment.