
Add migration manager #255

Open · wants to merge 4 commits into main
Conversation

amirRamirfatahi
Collaborator

@amirRamirfatahi amirRamirfatahi commented Dec 17, 2024

Fixes #143

Pre-submission Checklist

For tests to work, you need working Neo4j and Redis instances with the example dataset in docker/db-graph.

  • Testing: Implement and pass new tests for the new features/fixes, cargo test.
  • Performance: Ensure new code has relevant performance benchmarks, cargo bench

@SHAcollision
Collaborator

Is this intended only for Redis keys/cache migration?

If so, can we first create an MVP "migration" with the minimal amount of complexity and code keeping it extremely simple?

What's the overall flow and usage? I'd say we can do something straightforward, e.g.:
I'd say we need to version the Redis schemas. The current version will always be available as a Redis value. On service/watcher setup (start-up), we check whether the current version is the latest. If it's not, apply migrations.
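The start-up flow proposed above can be sketched as follows. This is a minimal illustrative model, not the PR's actual code: `VersionStore` stands in for Redis, and `LATEST_SCHEMA_VERSION`, `apply_pending_migrations`, and all other names are hypothetical.

```rust
// Latest schema version known to this build of the service.
const LATEST_SCHEMA_VERSION: u32 = 3;

// Stand-in for the Redis value holding the current schema version.
struct VersionStore {
    current: u32,
}

// On service/watcher start-up: if the stored version lags behind the latest,
// apply each pending migration in order and bump the version.
// Returns how many migrations ran.
fn apply_pending_migrations(store: &mut VersionStore) -> u32 {
    let mut applied = 0;
    while store.current < LATEST_SCHEMA_VERSION {
        // ...run the migration that upgrades `current` to `current + 1` here...
        store.current += 1;
        applied += 1;
    }
    applied
}
```

A store already at the latest version is a no-op, so the check is safe to run on every start-up.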

@amirRamirfatahi amirRamirfatahi marked this pull request as ready for review December 19, 2024 09:38
README.md Outdated
```
cargo run --bin migrations new MigrationNAME
```

This will generate a new migration file in the `migrations/` directory.
Collaborator

Maybe be more specific about the migration path? `src/db/migrations/migrations_list`

README.md Outdated

The Migration Manager uses a phased approach to handle data migrations safely and systematically. Each phase serves a distinct purpose in transitioning data from the old source to the new source, ensuring consistency and minimal disruption. Here's an overview of the phases:

- **Dual Write**: During this phase, all writes to the old source are mirrored to the new source. This ensures that both sources remain synchronized during normal application operations. Developers invoke MigrationManager::dual_write in the application logic for this purpose. Once dual writes are stable and verified, the migration can progress to the next phase.
Collaborator

@tipogi tipogi Jan 2, 2025

application logic

application data layer logic?
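The dual-write phase the quoted README describes, where every write to the old source is mirrored to the new one, can be modeled with a small sketch. The `DualWriter` type and its in-memory `HashMap` stores are illustrative stand-ins, not the PR's `MigrationManager::dual_write` implementation.

```rust
use std::collections::HashMap;

// Illustrative model of the dual-write phase: both sources receive every write,
// so they stay synchronized during normal application operations.
struct DualWriter {
    old_store: HashMap<String, String>,
    new_store: HashMap<String, String>,
}

impl DualWriter {
    fn write(&mut self, key: &str, value: &str) {
        // Normal write to the old source...
        self.old_store.insert(key.to_string(), value.to_string());
        // ...mirrored to the new source.
        self.new_store.insert(key.to_string(), value.to_string());
    }
}
```

Once both stores are verified to agree, the migration can safely move on to the backfill phase.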

README.md Outdated
The Migration Manager uses a phased approach to handle data migrations safely and systematically. Each phase serves a distinct purpose in transitioning data from the old source to the new source, ensuring consistency and minimal disruption. Here's an overview of the phases:

- **Dual Write**: During this phase, all writes to the old source are mirrored to the new source. This ensures that both sources remain synchronized during normal application operations. Developers invoke MigrationManager::dual_write in the application logic for this purpose. Once dual writes are stable and verified, the migration can progress to the next phase.
**Note**: Mark a migration as ready for backfill phase using the `MIGRATIONS_BACKFILL_READY` env var.
Collaborator

The migration has to be referenced by migration id

README.md Outdated

This will generate a new migration file in the `migrations/` directory.
Next, register your migration in the `get_migration_manager` function, which ensures it is included in the migration lifecycle. Once registered, implement the required phases (dual_write, backfill, cutover, and cleanup) in the generated file. Each phase serves a specific purpose in safely transitioning data between the old and new sources. After implementing your migration, run the migrations binary to execute pending migrations:
Collaborator

@tipogi tipogi Jan 2, 2025

get_migration_manager

get_migration_manager in src/db/migrations/mod.rs file
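The register-then-run lifecycle the quoted README describes can be modeled in miniature. `Migration`, `MigrationManager`, and `AddUserField` below are simplified hypothetical stand-ins for the PR's actual trait and types in `src/db/migrations/`.

```rust
// Simplified model of the migration trait; the real one also has
// dual_write, backfill, cutover, and cleanup phases.
trait Migration {
    fn id(&self) -> &'static str;
}

struct AddUserField; // hypothetical migration

impl Migration for AddUserField {
    fn id(&self) -> &'static str {
        "AddUserField"
    }
}

struct MigrationManager {
    migrations: Vec<Box<dyn Migration>>,
}

impl MigrationManager {
    fn new() -> Self {
        MigrationManager { migrations: Vec::new() }
    }

    // Registration makes the migration part of the lifecycle,
    // as get_migration_manager does in the PR.
    fn register(&mut self, migration: Box<dyn Migration>) {
        self.migrations.push(migration);
    }

    fn pending_ids(&self) -> Vec<&'static str> {
        self.migrations.iter().map(|m| m.id()).collect()
    }
}
```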

Comment on lines +30 to +39
```rust
    fn to_string(&self) -> &str {
        match self {
            MigrationPhase::DualWrite => "dual_write",
            MigrationPhase::Backfill => "backfill",
            MigrationPhase::Cutover => "cutover",
            MigrationPhase::Cleanup => "cleanup",
            MigrationPhase::Done => "done",
        }
    }
}
```
Collaborator

Do we really need that implementation when we already have the Serialize trait from serde?

```rust
serde_json::to_string(&MigrationPhase::Backfill).unwrap()
```
Collaborator Author

isn't this much heavier than just implementing Display?
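The `Display` alternative the author mentions would look like this, a sketch matching the phase strings shown in the quoted diff (the enum definition itself is reconstructed from those variants):

```rust
use std::fmt;

enum MigrationPhase {
    DualWrite,
    Backfill,
    Cutover,
    Cleanup,
    Done,
}

// Implementing Display gives `.to_string()` for free via the blanket
// ToString impl, without pulling serde_json into the hot path.
impl fmt::Display for MigrationPhase {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            MigrationPhase::DualWrite => "dual_write",
            MigrationPhase::Backfill => "backfill",
            MigrationPhase::Cutover => "cutover",
            MigrationPhase::Cleanup => "cleanup",
            MigrationPhase::Done => "done",
        };
        f.write_str(s)
    }
}
```

Unlike `serde_json::to_string`, this returns the bare string with no surrounding quotes and cannot fail.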

```rust
}

pub async fn new_migration(name: &str) -> Result<(), DynError> {
    let migration_name = format!("{}{}", name, Utc::now().timestamp());
```
Collaborator

We could compute the timestamp once and append it to both names, instead of generating it twice.
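The reviewer's suggestion can be sketched as below. Note the PR uses chrono's `Utc::now().timestamp()`; `SystemTime` stands in here to keep the example dependency-free, and the lowercase conversion is a trivial stand-in for the PR's `to_snake_case`.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Compute the timestamp once and reuse it, so the struct name and the file
// name can never disagree across a second boundary.
fn migration_names(name: &str) -> (String, String) {
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_secs();
    let migration_name = format!("{name}{ts}");
    // Stand-in for to_snake_case(name):
    let migration_file_name = format!("{}_{ts}", name.to_lowercase());
    (migration_name, migration_file_name)
}
```

Beyond avoiding the duplicated call, this guarantees both names carry the same suffix.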

```rust
pub async fn new_migration(name: &str) -> Result<(), DynError> {
    let migration_name = format!("{}{}", name, Utc::now().timestamp());
    let migration_file_name = format!("{}_{}", to_snake_case(name), Utc::now().timestamp());
    let migration_template = get_migration_template(migration_name.as_str());
```
Collaborator

We could use a reference (`&`) instead of `as_str()`.

Collaborator Author

I don't think there's any difference at all. I took the convention to use .as_str() as it's much cleaner IMO.

```rust
let migration_template = get_migration_template(migration_name.as_str());
let file_path = format!(
    "src/db/migrations/migrations_list/{}.rs",
    migration_file_name.as_str()
```
Collaborator

No need for `as_str()` because `String` already implements the `Deref` trait.

Collaborator Author

I want to use the same string again. So instead of cloning it, I borrowed it here.

```rust
    "src/db/migrations/migrations_list/{}.rs",
    migration_file_name.as_str()
);
tokio::fs::write(file_path.clone(), migration_template).await?;
```
Collaborator

Maybe throw a more customised error:

```rust
.map_err(|e| format!("Failed to write migration file at {}: {}", file_path, e))?;
```

Comment on lines +114 to +118
```rust
let mut mod_file = tokio::fs::OpenOptions::new()
    .append(true)
    .open(mod_file_path)
    .await?;
tokio::io::AsyncWriteExt::write_all(&mut mod_file, mod_file_content.as_bytes()).await?;
```
Collaborator

Another option is to do it this way, and also throw some errors along the way:

```rust
OpenOptions::new()
    .append(true)
    .open(mod_file_path)
    .await
    .map_err(|e| format!("Failed to open mod.rs at {}: {}", mod_file_path, e))?
    .write_all(mod_file_content.as_bytes())
    .await
    .map_err(|e| format!("Failed to write to mod.rs at {}: {}", mod_file_path, e))?;
```

Here, since `File` (the return type of `open`) implements the `tokio::io::AsyncWriteExt` trait, we can chain all the operations into one.

Collaborator Author

I don't think this improves the readability at all either.

```rust
}

pub struct MigrationManager {
    graph: Arc<Mutex<Graph>>,
```
Collaborator

Is there a reason not to use the existing helper functions instead of holding a graph instance?

  • exec_single_row
  • retrieve_from_graph

That way we wouldn't need to create more boilerplate code. I think those two functions would be enough. A `Default` implementation would also be enough for `MigrationManager`.

Collaborator Author

We don't know what's gonna be the required functions to run inside a migration. Most frequently, you're gonna want to start transactions and commit them when you're done so migrations should have total freedom.

Collaborator

I don't see the sense in repeating code. We'll see whether it's useful to do it this way in the future...

Comment on lines +129 to +142
```rust
let stored_migrations = self.get_migrations().await?;
// update any migration marked as ready for backfill
for stored_migration in &stored_migrations {
    if config
        .migrations_backfill_ready
        .contains(&stored_migration.id)
    {
        self.update_migration_phase(&stored_migration.id, &MigrationPhase::Backfill)
            .await?;
    }
}

// get all migrations from the database
let stored_migrations = self.get_migrations().await?;
```
Collaborator

@tipogi tipogi Jan 2, 2025

Wouldn't it be better to edit the migration instance instead of calling the database again to get the migrations?

```rust
for stored_migration in &mut stored_migrations {
   ...
   stored_migration.phase = MigrationPhase::Backfill;
}
```

Collaborator Author

These two steps are separate from each other. There's no harm in updating the DB and getting from it again.

Collaborator

Unnecessary.

```rust
        stored_migration = Some(new_migration_node.clone());
    }
}
let stored_migration = stored_migration.unwrap();
```
Collaborator

⚠️ unsafe

Collaborator Author

This is not in the server. So we do actually want to panic if this ever fails. So it's not unsafe.

Collaborator

Do we really want to panic() and also stop other ongoing migrations? I don't think so...
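A non-panicking alternative to the `unwrap()` under discussion could look like this. The `MigrationRecord` type and the surrounding loop are illustrative stand-ins, not the PR's actual code:

```rust
struct MigrationRecord {
    id: String,
}

// Skip a missing migration record and keep processing the rest,
// instead of panicking and aborting the whole run.
fn process(migrations: Vec<Option<MigrationRecord>>) -> Vec<String> {
    let mut processed = Vec::new();
    for stored_migration in migrations {
        // `let ... else` (Rust 1.65+) replaces the panicking unwrap().
        let Some(migration) = stored_migration else {
            eprintln!("migration record missing, skipping");
            continue;
        };
        processed.push(migration.id);
    }
    processed
}
```

Whether to skip or abort is a policy choice; the sketch just shows that skipping needs no more code than panicking.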

```rust
    println!("Migration {} is already done", migration_id);
    continue;
}
println!(
```
Collaborator

Maybe use `info!` instead of `println!`?

Collaborator Author

Some of them need to stay, but I changed the rest.

```rust
        self.graph.lock().await.run(query).await?;
        Ok(())
    }
}
```
Collaborator

Using the helper functions already mentioned, the three functions above could be refactored into at least two.

Collaborator Author

I don't know what you mean? Are you talking about exec_single_row?
I'd rather not mix these two parts together. Think of it as more like a library you'd use.

Collaborator

I am referring to the content of the function. Lines 240-247 and 255-263 are the same code. It could go in the utils.rs file, as I mentioned above, as update_migration_phase.

Comment on lines +257 to +336
```rust
fn get_migration_template(name: &str) -> String {
    format!(
        "use axum::async_trait;

use crate::db::migrations::manager::Migration;
use crate::types::DynError;

pub struct {name};

#[async_trait]
impl Migration for {name} {{
    fn id(&self) -> &'static str {{
        \"{name}\"
    }}

    fn is_multi_staged(&self) -> bool {{
        true
    }}

    async fn dual_write(data: Box<dyn std::any::Any + Send + 'static>) -> Result<(), DynError> {{
        // Implement your dual write logic here. Downcast data to your struct type.
        Ok(())
    }}

    async fn backfill(&self) -> Result<(), DynError> {{
        // Your backfill logic here
        Ok(())
    }}

    async fn cutover(&self) -> Result<(), DynError> {{
        // Your cutover logic here
        Ok(())
    }}

    async fn cleanup(&self) -> Result<(), DynError> {{
        // Your cleanup logic here
        Ok(())
    }}
}}
",
        name = name
    )
}

fn to_snake_case(input: &str) -> String {
    let mut result = String::new();
    let mut prev_was_upper = false;

    for (i, c) in input.chars().enumerate() {
        if c.is_uppercase() {
            if i > 0 && !prev_was_upper {
                result.push('_');
            }
            result.push(c.to_ascii_lowercase());
            prev_was_upper = true;
        } else {
            result.push(c);
            prev_was_upper = false;
        }
    }

    result
}

#[cfg(test)]
mod tests {
    #[tokio_shared_rt::test(shared)]
    async fn test_to_snake_case() {
        assert_eq!(super::to_snake_case("CamelCase"), "camel_case");
        assert_eq!(super::to_snake_case("PascalCase"), "pascal_case");
        assert_eq!(super::to_snake_case("snake_case"), "snake_case");
        assert_eq!(super::to_snake_case("kebab-case"), "kebab-case");
        assert_eq!(super::to_snake_case("UPPERCASE"), "uppercase");
        assert_eq!(super::to_snake_case("lowercase"), "lowercase");
        assert_eq!(super::to_snake_case("12345"), "12345");
        assert_eq!(super::to_snake_case("snake_case_123"), "snake_case_123");
        assert_eq!(super::to_snake_case("UserNewField"), "user_new_field");
    }
}
```
Collaborator

I would create another file (utils.rs, generator.rs, ...) for that kind of utility function.

Collaborator Author

As I said, I want to keep the migrations separate from the server code, and this function would only be used here, so it's fine to keep it local to the file.

Collaborator

it would be under db/migrations/utils.rs

Comment on lines 11 to 15
```rust
pub fn get_migration_manager(graph: Arc<Mutex<Graph>>) -> MigrationManager {
    // let migration_manager = MigrationManager::new(graph);
    // migration_manager.register(Box::new(MigrationX));
    MigrationManager::new(graph)
}
```
Collaborator

Document that one a bit.

Collaborator Author

Added a line but it's already documented in the DB and there's also an example for you.

Collaborator

@tipogi tipogi left a comment

Some cleanup is needed, but the migration skeleton looks good.


Successfully merging this pull request may close these issues.

Discussion: add a reindex (or migrations) binary to run on breaking index changes
3 participants