Skip to content

Commit

Permalink
Merge branch 'master' into remove-socket-support
Browse files Browse the repository at this point in the history
  • Loading branch information
kaplanelad authored Nov 25, 2024
2 parents eb78d7f + b618128 commit 3d1121f
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 19 deletions.
2 changes: 1 addition & 1 deletion loco-gen/src/templates/task.t
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ injections:
append: true
content: "pub mod {{ file_name }};"
- into: src/app.rs
above: "// tasks-inject"
before: "// tasks-inject"
content: " tasks.register(tasks::{{file_name}}::{{module_name}});"
---
use loco_rs::prelude::*;
Expand Down
22 changes: 13 additions & 9 deletions src/bgworker/sqlt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ pub async fn initialize_database(pool: &SqlitePool) -> Result<()> {
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS aquire_queue_write_lock (
CREATE TABLE IF NOT EXISTS sqlt_loco_queue_lock (
id INTEGER PRIMARY KEY CHECK (id = 1),
is_locked BOOLEAN NOT NULL DEFAULT FALSE,
locked_at TIMESTAMP NULL
);
INSERT OR IGNORE INTO aquire_queue_write_lock (id, is_locked) VALUES (1, FALSE);
INSERT OR IGNORE INTO sqlt_loco_queue_lock (id, is_locked) VALUES (1, FALSE);
CREATE INDEX IF NOT EXISTS idx_sqlt_queue_status_run_at ON sqlt_loco_queue(status, run_at);
",
Expand Down Expand Up @@ -245,7 +245,7 @@ async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> {
let mut tx = client.begin().await?;

let acquired_write_lock = sqlx::query(
"UPDATE aquire_queue_write_lock SET
"UPDATE sqlt_loco_queue_lock SET
is_locked = TRUE,
locked_at = CURRENT_TIMESTAMP
WHERE id = 1 AND is_locked = FALSE",
Expand Down Expand Up @@ -282,15 +282,16 @@ async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> {

if let Some(task) = row {
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP WHERE id = $1",
"UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP \
WHERE id = $1",
)
.bind(&task.id)
.execute(&mut *tx)
.await?;

// Release the write lock
sqlx::query(
"UPDATE aquire_queue_write_lock
"UPDATE sqlt_loco_queue_lock
SET is_locked = FALSE,
locked_at = NULL
WHERE id = 1",
Expand All @@ -304,7 +305,7 @@ async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> {
} else {
// Release the write lock, no task found
sqlx::query(
"UPDATE aquire_queue_write_lock
"UPDATE sqlt_loco_queue_lock
SET is_locked = FALSE,
locked_at = NULL
WHERE id = 1",
Expand All @@ -325,15 +326,17 @@ async fn complete_task(
if let Some(interval_ms) = interval_ms {
let next_run_at = Utc::now() + chrono::Duration::milliseconds(interval_ms);
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at = DATETIME($1) WHERE id = $2",
"UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at \
= DATETIME($1) WHERE id = $2",
)
.bind(next_run_at)
.bind(task_id)
.execute(pool)
.await?;
} else {
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP WHERE id = $1",
"UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP \
WHERE id = $1",
)
.bind(task_id)
.execute(pool)
Expand All @@ -347,7 +350,8 @@ async fn fail_task(pool: &SqlitePool, task_id: &TaskId, error: &crate::Error) ->
error!(err = msg, "failed task");
let error_json = serde_json::json!({ "error": msg });
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data = json_patch(task_data, $1) WHERE id = $2",
"UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data \
= json_patch(task_data, $1) WHERE id = $2",
)
.bind(error_json)
.bind(task_id)
Expand Down
79 changes: 70 additions & 9 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ use crate::{
};

pub static EXTRACT_DB_NAME: OnceLock<Regex> = OnceLock::new();
const IGNORED_TABLES: &[&str] = &[
"seaql_migrations",
"pg_loco_queue",
"sqlt_loco_queue",
"sqlt_loco_queue_lock",
];

fn get_extract_db_name() -> &'static Regex {
EXTRACT_DB_NAME.get_or_init(|| Regex::new(r"/([^/]+)$").unwrap())
Expand Down Expand Up @@ -229,6 +235,8 @@ pub async fn reset<M: MigratorTrait>(db: &DatabaseConnection) -> Result<(), sea_
migrate::<M>(db).await
}

use sea_orm::EntityName;
use serde_json::Value;
/// Seed the database with data from a specified file.
/// Seeds open the file path and insert all file content into the DB.
///
Expand All @@ -239,25 +247,76 @@ pub async fn reset<M: MigratorTrait>(db: &DatabaseConnection) -> Result<(), sea_
/// Returns an error if the file content could not be deserialized into
/// [`Vec<serde_json::Value>`] or could not be inserted into the DB.
#[allow(clippy::type_repetition_in_bounds)]
pub async fn seed<A>(db: &DatabaseConnection, path: &str) -> AppResult<()>
pub async fn seed<A>(db: &DatabaseConnection, path: &str) -> crate::Result<()>
where
<<A as ActiveModelTrait>::Entity as EntityTrait>::Model: IntoActiveModel<A>,
for<'de> <<A as ActiveModelTrait>::Entity as EntityTrait>::Model: serde::de::Deserialize<'de>,
A: sea_orm::ActiveModelTrait + Send + Sync,
sea_orm::Insert<A>: Send + Sync, // Add this Send bound
A: ActiveModelTrait + Send + Sync,
sea_orm::Insert<A>: Send + Sync,
<A as ActiveModelTrait>::Entity: EntityName,
{
let seed_data: Vec<serde_json::Value> = serde_yaml::from_reader(File::open(path)?)?;
// Deserialize YAML file into a vector of JSON values
let seed_data: Vec<Value> = serde_yaml::from_reader(File::open(path)?)?;

// Insert each row
for row in seed_data {
let model = <A as ActiveModelTrait>::from_json(row)?;
<A as ActiveModelTrait>::Entity::insert(model)
.exec(db)
.await?;
let model = A::from_json(row)?;
A::Entity::insert(model).exec(db).await?;
}

// Get the table name from the entity
let table_name = A::Entity::default().table_name().to_string();

// Get the database backend
let db_backend = db.get_database_backend();

// Reset auto-increment
reset_autoincrement(db_backend, &table_name, db).await?;

Ok(())
}

/// Resets the table's auto-increment counter so new rows continue after the highest existing `id`.
/// # Errors
/// Returns an error if the underlying query fails, or if the database backend is MySQL (unsupported).
pub async fn reset_autoincrement(
db_backend: DatabaseBackend,
table_name: &str,
db: &DatabaseConnection,
) -> crate::Result<()> {
match db_backend {
DatabaseBackend::Postgres => {
let query_str = format!(
"SELECT setval(pg_get_serial_sequence('{table_name}', 'id'), COALESCE(MAX(id), 0) \
+ 1, false) FROM {table_name}"
);
db.execute(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
&query_str,
vec![],
))
.await?;
}
DatabaseBackend::Sqlite => {
let query_str = format!(
"UPDATE sqlite_sequence SET seq = (SELECT MAX(id) FROM {table_name}) WHERE name = \
'{table_name}'"
);
db.execute(Statement::from_sql_and_values(
DatabaseBackend::Sqlite,
&query_str,
vec![],
))
.await?;
}
DatabaseBackend::MySql => {
return Err(Error::Message(
"Unsupported database backend: MySQL".to_string(),
))
}
}
Ok(())
}
/// Generate entity model.
/// This function using sea-orm-cli.
///
Expand All @@ -277,7 +336,9 @@ pub async fn entities<M: MigratorTrait>(ctx: &AppContext) -> AppResult<String> {
"--output-dir",
"src/models/_entities",
"--database-url",
&ctx.config.database.uri
&ctx.config.database.uri,
"--ignore-tables",
IGNORED_TABLES.join(","),
)
.stderr_to_stdout()
.run()
Expand Down

0 comments on commit 3d1121f

Please sign in to comment.