Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update retry methods and queries on sql repository #1320

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions taipy/core/_repository/_sql_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

import json
import pathlib
import sqlite3
from datetime import datetime
from sqlite3 import DatabaseError
from typing import Any, Dict, Iterable, List, Optional, Type, Union

from sqlalchemy import text
from sqlalchemy.dialects import sqlite
from sqlalchemy.exc import NoResultFound, OperationalError

Expand All @@ -26,7 +29,14 @@


class _SQLRepository(_AbstractRepository[ModelType, Entity]):
__EXCEPTIONS_TO_RETRY = (OperationalError,)
__EXCEPTIONS_TO_RETRY = (
OperationalError,
sqlite3.OperationalError,
sqlite3.InterfaceError,
IndexError,
StopIteration,
)
MAX_MODEL_NOT_FOUND_RETRY = 30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it too much? Or is it for debugging and investigation purpose?

_logger = _TaipyLogger._get_logger()

def __init__(self, model_type: Type[ModelType], converter: Type[Converter]):
Expand Down Expand Up @@ -73,19 +83,24 @@ def _exists(self, entity_id: str):

@_retry_repository_operation(__EXCEPTIONS_TO_RETRY)
def _load(self, entity_id: str) -> Entity:
query = self.table.select().filter_by(id=entity_id)

if entry := self.db.execute(str(query.compile(dialect=sqlite.dialect())), [entity_id]).fetchone():
entry = self.model_type.from_dict(entry)
return self.converter._model_to_entity(entry)
query = self.table.select().filter_by(id=entity_id).order_by(text("updated_at DESC"))

for _ in range(self.MAX_MODEL_NOT_FOUND_RETRY):
if entry := self.db.execute(str(query.compile(dialect=sqlite.dialect())), [entity_id]).fetchall():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior we want is to have at most one element returned by this query.
We should at least log a warning if it is not the case.
To investigate why, maybe we can exit as well.

entry = sorted(
entry, key=lambda x: x["updated_at"] if x["updated_at"] is not None else "", reverse=True
)
entry = entry[0]
entry = self.model_type.from_dict(entry)
return self.converter._model_to_entity(entry)
raise ModelNotFound(str(self.model_type.__name__), entity_id)

def _load_all(self, filters: Optional[List[Dict]] = None) -> List[Entity]:
query = self.table.select()
entities: List[Entity] = []

for f in filters or [{}]:
filtered_query = query.filter_by(**f)
filtered_query = query.filter_by(**f).order_by(text("updated_at DESC"))
try:
entries = self.db.execute(
str(filtered_query.compile(dialect=sqlite.dialect())),
Expand All @@ -100,25 +115,26 @@ def _load_all(self, filters: Optional[List[Dict]] = None) -> List[Entity]:
def _delete(self, entity_id: str):
delete_query = self.table.delete().filter_by(id=entity_id)
cursor = self.db.execute(str(delete_query.compile(dialect=sqlite.dialect())), [entity_id])

if cursor.rowcount == 0:
raise ModelNotFound(str(self.model_type.__name__), entity_id)

self.db.commit()
cursor.close()

def _delete_all(self):
self.db.execute(str(self.table.delete().compile(dialect=sqlite.dialect())))
cursor = self.db.execute(str(self.table.delete().compile(dialect=sqlite.dialect())))
self.db.commit()
cursor.close()

def _delete_many(self, ids: Iterable[str]):
for entity_id in ids:
self._delete(entity_id)

def _delete_by(self, attribute: str, value: str):
delete_by_query = self.table.delete().filter_by(**{attribute: value})

self.db.execute(str(delete_by_query.compile(dialect=sqlite.dialect())), [value])
cursor = self.db.execute(str(delete_by_query.compile(dialect=sqlite.dialect())), [value])
self.db.commit()
cursor.close()

def _search(self, attribute: str, value: Any, filters: Optional[List[Dict]] = None) -> List[Entity]:
query = self.table.select().filter_by(**{attribute: value})
Expand Down Expand Up @@ -216,15 +232,20 @@ def __get_entities_by_config_and_owner(
#############################
# ## Private methods ## #
#############################
@_retry_repository_operation(__EXCEPTIONS_TO_RETRY)
@_retry_repository_operation(__EXCEPTIONS_TO_RETRY, 0.5)
def __insert_model(self, model: ModelType):
query = self.table.insert()
jrobinAV marked this conversation as resolved.
Show resolved Hide resolved
self.db.execute(str(query.compile(dialect=sqlite.dialect())), model.to_list())
query_str = str(query.compile(dialect=sqlite.dialect()))
model.updated_at = datetime.now() # type: ignore

cursor = self.db.execute(query_str, model.to_list())
self.db.commit()
cursor.close()

@_retry_repository_operation(__EXCEPTIONS_TO_RETRY)
@_retry_repository_operation(__EXCEPTIONS_TO_RETRY, 0.5)
def _update_entry(self, model):
query = self.table.update().filter_by(id=model.id)
model.updated_at = datetime.now() # type: ignore
cursor = self.db.execute(str(query.compile(dialect=sqlite.dialect())), model.to_list() + [model.id])
self.db.commit()
cursor.close()
Expand Down
4 changes: 3 additions & 1 deletion taipy/core/_version/_version_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict

from sqlalchemy import Boolean, Column, String, Table
from sqlalchemy import TIMESTAMP, Boolean, Column, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand All @@ -30,6 +30,7 @@ class _VersionModel(_BaseModel):
Column("is_production", Boolean),
Column("is_development", Boolean),
Column("is_latest", Boolean),
Column("updated_at", TIMESTAMP),
)
id: str
config: str
Expand All @@ -55,4 +56,5 @@ def to_list(self):
self.is_production,
self.is_development,
self.is_latest,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/cycle/_cycle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict

from sqlalchemy import JSON, Column, Enum, String, Table
from sqlalchemy import JSON, TIMESTAMP, Column, Enum, String, Table

from taipy.config.common.frequency import Frequency

Expand All @@ -34,6 +34,7 @@ class _CycleModel(_BaseModel):
Column("creation_date", String),
Column("start_date", String),
Column("end_date", String),
Column("updated_at", TIMESTAMP),
)
id: CycleId
name: str
Expand Down Expand Up @@ -64,4 +65,5 @@ def to_list(self):
self.creation_date,
self.start_date,
self.end_date,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/data/_data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import JSON, Boolean, Column, Enum, Float, String, Table, UniqueConstraint
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, Float, String, Table, UniqueConstraint

from taipy.config.common.scope import Scope

Expand Down Expand Up @@ -42,6 +42,7 @@ class _DataNodeModel(_BaseModel):
Column("editor_id", String),
Column("editor_expiration_date", String),
Column("data_node_properties", JSON),
Column("updated_at", TIMESTAMP),
)
__table_args__ = (UniqueConstraint("config_id", "owner_id", name="_config_owner_uc"),)

Expand Down Expand Up @@ -98,4 +99,5 @@ def to_list(self):
self.editor_id,
self.editor_expiration_date,
_BaseModel._serialize_attribute(self.data_node_properties),
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/job/_job_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List

from sqlalchemy import JSON, Boolean, Column, Enum, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand All @@ -36,6 +36,7 @@ class _JobModel(_BaseModel):
Column("subscribers", JSON),
Column("stacktrace", JSON),
Column("version", String),
Column("updated_at", TIMESTAMP),
)
id: JobId
task_id: str
Expand Down Expand Up @@ -75,4 +76,5 @@ def to_list(self):
_BaseModel._serialize_attribute(self.subscribers),
_BaseModel._serialize_attribute(self.stacktrace),
self.version,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/scenario/_scenario_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import JSON, Boolean, Column, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand Down Expand Up @@ -40,6 +40,7 @@ class _ScenarioModel(_BaseModel):
Column("version", String),
Column("sequences", JSON),
Column("cycle", String),
Column("updated_at", TIMESTAMP),
)
id: ScenarioId
config_id: str
Expand Down Expand Up @@ -85,4 +86,5 @@ def to_list(self):
self.version,
_BaseModel._serialize_attribute(self.sequences),
self.cycle,
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/submission/_submission_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import JSON, Boolean, Column, Enum, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, Enum, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand Down Expand Up @@ -41,6 +41,7 @@ class _SubmissionModel(_BaseModel):
Column("running_jobs", JSON),
Column("blocked_jobs", JSON),
Column("pending_jobs", JSON),
Column("updated_at", TIMESTAMP),
)
id: str
entity_id: str
Expand Down Expand Up @@ -95,4 +96,5 @@ def to_list(self):
_BaseModel._serialize_attribute(self.running_jobs),
_BaseModel._serialize_attribute(self.blocked_jobs),
_BaseModel._serialize_attribute(self.pending_jobs),
self.updated_at,
]
4 changes: 3 additions & 1 deletion taipy/core/task/_task_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import JSON, Boolean, Column, String, Table
from sqlalchemy import JSON, TIMESTAMP, Boolean, Column, String, Table

from .._repository._base_taipy_model import _BaseModel
from .._repository.db._sql_base_model import mapper_registry
Expand All @@ -35,6 +35,7 @@ class _TaskModel(_BaseModel):
Column("version", String),
Column("skippable", Boolean),
Column("properties", JSON),
Column("updated_at", TIMESTAMP),
)
id: str
owner_id: Optional[str]
Expand Down Expand Up @@ -77,4 +78,5 @@ def to_list(self):
self.version,
self.skippable,
_BaseModel._serialize_attribute(self.properties),
self.updated_at,
]
5 changes: 3 additions & 2 deletions tests/core/repository/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

from sqlalchemy import Column, String, Table
from sqlalchemy import TIMESTAMP, Column, String, Table
from sqlalchemy.dialects import sqlite
from sqlalchemy.orm import declarative_base, registry
from sqlalchemy.schema import CreateTable
Expand Down Expand Up @@ -53,6 +53,7 @@ class MockModel(Base): # type: ignore
Column("id", String(200), primary_key=True),
Column("name", String(200)),
Column("version", String(200)),
Column("updated_at", TIMESTAMP),
)
id: str
name: str
Expand All @@ -73,7 +74,7 @@ def _from_entity(cls, entity: MockObj):
return MockModel(id=entity.id, name=entity.name, version=entity._version)

def to_list(self):
return [self.id, self.name, self.version]
return [self.id, self.name, self.version, self.updated_at]


class MockConverter(_AbstractConverter):
Expand Down
Loading