-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Prevent charger tasks from running concurrently
- Loading branch information
1 parent
bec330e
commit 1cbe341
Showing
38 changed files
with
559 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
"""empty message | ||
Revision ID: 1c15c81ce8a8 | ||
Revises: b56e853bb310 | ||
Create Date: 2024-09-09 15:40:14.071295 | ||
""" | ||
|
||
from collections.abc import Sequence | ||
|
||
import sqlalchemy as sa | ||
from sqlalchemy.dialects import postgresql | ||
|
||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision: str = "1c15c81ce8a8" | ||
down_revision: str | None = "b56e853bb310" | ||
branch_labels: str | Sequence[str] | None = None | ||
depends_on: str | Sequence[str] | None = None | ||
|
||
|
||
def upgrade() -> None: | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
op.create_table( | ||
"task_registry", | ||
sa.Column("task_name", sa.String(), nullable=False), | ||
sa.Column("last_run", sa.DateTime(timezone=True), nullable=True), | ||
sa.Column("last_duration", sa.Float(), server_default=sa.text("0"), nullable=False), | ||
sa.Column("last_error", sa.String(), nullable=True), | ||
sa.PrimaryKeyConstraint("task_name", name=op.f("pk_task_registry")), | ||
) | ||
op.drop_column("job", "updated_at") | ||
# ### end Alembic commands ### | ||
|
||
|
||
def downgrade() -> None: | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
op.add_column( | ||
"job", | ||
sa.Column( | ||
"updated_at", | ||
postgresql.TIMESTAMP(timezone=True), | ||
server_default=sa.text("now()"), | ||
autoincrement=False, | ||
nullable=False, | ||
), | ||
) | ||
op.drop_table("task_registry") | ||
# ### end Alembic commands ### |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
"""Task registry module.""" | ||
|
||
from datetime import datetime | ||
|
||
import sqlalchemy as sa | ||
from sqlalchemy.dialects import postgresql as pg | ||
from sqlalchemy.exc import DBAPIError | ||
|
||
from app.db.model import TaskRegistry | ||
from app.repository.base import BaseRepository | ||
|
||
|
||
class TaskRegistryRepository(BaseRepository): | ||
"""TaskRegistryRepository.""" | ||
|
||
async def populate_task(self, task_name: str) -> TaskRegistry | None: | ||
"""Insert a task record if it doesn't exist already. | ||
Args: | ||
task_name: name of the task. | ||
""" | ||
insert_query = ( | ||
pg.insert(TaskRegistry) | ||
.values(task_name=task_name) | ||
.on_conflict_do_nothing() | ||
.returning(TaskRegistry) | ||
) | ||
return (await self.db.execute(insert_query)).scalar_one_or_none() | ||
|
||
async def get_locked_task(self, task_name: str) -> TaskRegistry | None: | ||
"""Lock and return a record from the task registry, or None if already locked. | ||
Args: | ||
task_name: name of the task. | ||
""" | ||
select_query = ( | ||
sa.select(TaskRegistry) | ||
.where(TaskRegistry.task_name == task_name) | ||
.with_for_update(nowait=True) | ||
) | ||
try: | ||
# ensure that the record exists and that it can be locked | ||
return (await self.db.execute(select_query)).scalar_one() | ||
except DBAPIError as ex: | ||
if getattr(ex.orig, "pgcode", None) == "55P03": | ||
# Lock Not Available: the record exists, but it cannot be locked | ||
return None | ||
raise | ||
|
||
async def update_task( | ||
self, | ||
task_name: str, | ||
*, | ||
last_run: datetime, | ||
last_duration: float, | ||
last_error: str | None, | ||
) -> TaskRegistry: | ||
"""Update an existing task in the registry. | ||
Args: | ||
task_name: name of the task. | ||
last_run: last start time. | ||
last_duration: last duration in seconds. | ||
last_error: traceback from the last task execution, or None. | ||
""" | ||
query = ( | ||
sa.update(TaskRegistry) | ||
.values( | ||
last_run=last_run, | ||
last_duration=last_duration, | ||
last_error=last_error, | ||
) | ||
.where(TaskRegistry.task_name == task_name) | ||
.returning(TaskRegistry) | ||
) | ||
return (await self.db.execute(query)).scalar_one() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.