Skip to content

Commit

Permalink
Move staged commit to intents (#984)
Browse files Browse the repository at this point in the history
## tl;dr

- Moves `staged_commits` from being handled by OpenMLS to being stored on each intent
- Adds a mutex for each group's sync operations. Only one sync may happen in parallel at any time per group
- Creates database columns for storing the staged commit and the epoch the commit was published
- Updates a test for message conflicts to better check for forked group states
- I found some cases where we were calling `publish_intents` and then immediately calling `sync`. That should not be necessary, since we call `publish_intents` from inside the sync method
- Adds tests for parallel  and reentrant syncs

## More Info
[https://github.com/xmtp/libxmtp/issues/979](https://github.com/xmtp/libxmtp/issues/979#issuecomment-2303541848)
  • Loading branch information
neekolas authored Aug 23, 2024
1 parent c4d3a6d commit 846b635
Show file tree
Hide file tree
Showing 14 changed files with 466 additions and 103 deletions.
10 changes: 9 additions & 1 deletion bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2465,13 +2465,14 @@ mod tests {
let alix_group = alix.group(group.id()).unwrap();
let bo_group = bo.group(group.id()).unwrap();
let caro_group = caro.group(group.id()).unwrap();

log::info!("Alix sending first message");
// Alix sends a message in the group
alix_group
.send("First message".as_bytes().to_vec())
.await
.unwrap();

log::info!("Caro sending second message");
// Caro sends a message in the group
caro_group
.send("Second message".as_bytes().to_vec())
Expand All @@ -2489,6 +2490,7 @@ mod tests {
.await;
bo_stream_messages.wait_for_ready().await;

log::info!("Alix sending third message after Bo's second installation added");
// Alix sends a message to the group
alix_group
.send("Third message".as_bytes().to_vec())
Expand All @@ -2499,21 +2501,27 @@ mod tests {
bo2.conversations().sync().await.unwrap();
let bo2_group = bo2.group(group.id()).unwrap();

log::info!("Bo sending fourth message");
// Bo sends a message to the group
bo2_group
.send("Fourth message".as_bytes().to_vec())
.await
.unwrap();

log::info!("Caro sending fifth message");
// Caro sends a message in the group
caro_group
.send("Fifth message".as_bytes().to_vec())
.await
.unwrap();

log::info!("Syncing alix");
alix_group.sync().await.unwrap();
log::info!("Syncing bo 1");
bo_group.sync().await.unwrap();
log::info!("Syncing bo 2");
bo2_group.sync().await.unwrap();
log::info!("Syncing caro");
caro_group.sync().await.unwrap();

// Get the message count for all the clients
Expand Down
2 changes: 1 addition & 1 deletion dev/docker/compose
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
set -eou pipefail

docker-compose -f dev/docker/docker-compose.yml -p "libxmtp" "$@"
docker compose -f dev/docker/docker-compose.yml -p "libxmtp" "$@"
55 changes: 33 additions & 22 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,24 @@ name = "update-schema"
path = "src/bin/update-schema.rs"

[features]
bench = [
"test-utils",
"indicatif",
"tracing-subscriber",
"anyhow",
"tracing-flame",
"once_cell",
"xmtp_api_grpc",
]
default = ["native"]
grpc = ["xmtp_proto/grpc"]
http-api = ["xmtp_api_http"]
native = ["libsqlite3-sys/bundled-sqlcipher-vendored-openssl"]
test-utils = []
bench = ["test-utils", "indicatif", "tracing-subscriber", "anyhow", "tracing-flame", "once_cell", "xmtp_api_grpc"]
http-api = ["xmtp_api_http"]

[dependencies]
aes-gcm = { version = "0.10.3", features = ["std"] }
async-stream.workspace = true
async-trait.workspace = true
bincode = "1.3.3"
chrono = { workspace = true }
Expand All @@ -28,68 +37,70 @@ diesel = { version = "2.2.2", features = [
] }
diesel_migrations = { version = "2.2.0", features = ["sqlite"] }
ed25519-dalek = "2.1.1"
ethers.workspace = true
ethers-core.workspace = true
ethers.workspace = true
futures.workspace = true
parking_lot = "0.12.3"
hex.workspace = true
libsqlite3-sys = { version = "0.29.0", optional = true }
log.workspace = true
tracing.workspace = true
openmls = { workspace = true, features = ["test-utils"] }
openmls_basic_credential = { workspace = true }
openmls_rust_crypto = { workspace = true }
openmls_traits = { workspace = true }
parking_lot = "0.12.3"
prost = { workspace = true, features = ["prost-derive"] }
rand = { workspace = true }
reqwest = { version = "0.12.4", features = ["stream"] }
ring = "0.17.8"
scoped-futures = "0.1"
serde = { workspace = true }
serde_json.workspace = true
sha2.workspace = true
smart-default = "0.7.1"
thiserror = { workspace = true }
tls_codec = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "tracing"] }
tokio-stream = { version = "0.1", features = ["sync"] }
async-stream.workspace = true
tokio = { workspace = true, features = [
"macros",
"rt-multi-thread",
"tracing",
] }
tokio-stream = { version = "0.1", features = ["sync"] }
toml = "0.8.4"
tracing.workspace = true
xmtp_cryptography = { workspace = true }
xmtp_id = { path = "../xmtp_id" }
xmtp_proto = { workspace = true, features = ["proto_full", "convert"] }
xmtp_v2 = { path = "../xmtp_v2" }
scoped-futures = "0.1"

# Test/Bench Utils
xmtp_api_grpc = { path = "../xmtp_api_grpc", optional = true }
xmtp_api_http = { path = "../xmtp_api_http", optional = true }
tracing-subscriber = { workspace = true, optional = true }
indicatif = { version = "0.17", optional = true }
anyhow = { workspace = true, optional = true }
tracing-flame = { version = "0.2", optional = true }
indicatif = { version = "0.17", optional = true }
once_cell = { version = "1.19", optional = true }
tracing-flame = { version = "0.2", optional = true }
tracing-subscriber = { workspace = true, optional = true }
xmtp_api_grpc = { path = "../xmtp_api_grpc", optional = true }
xmtp_api_http = { path = "../xmtp_api_http", optional = true }

[dev-dependencies]
anyhow.workspace = true
async-barrier = "1.1"
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
ctor.workspace = true
flume = "0.11"
mockall = "0.13.0"
mockito = "1.4.0"
tempfile = "3.5.0"
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-log = "0.2.0"
tracing-subscriber.workspace = true
tracing-test = "0.2.4"
tracing.workspace = true
xmtp_api_grpc = { path = "../xmtp_api_grpc" }
xmtp_id = { path = "../xmtp_id", features = ["test-utils"] }
async-barrier = "1.1"
anyhow.workspace = true
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }

[[bench]]
name = "group_limit"
harness = false
name = "group_limit"

[[bench]]
name = "crypto"
harness = false

name = "crypto"
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file should undo anything in `up.sql`
ALTER TABLE group_intents
DROP COLUMN staged_commit;

ALTER TABLE group_intents
DROP COLUMN published_in_epoch;

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Your SQL goes here
ALTER TABLE group_intents
ADD COLUMN staged_commit BLOB;

ALTER TABLE group_intents
ADD COLUMN published_in_epoch BIGINT;

15 changes: 9 additions & 6 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::{
},
identity::{parse_credential, Identity, IdentityError},
identity_updates::{load_identity_updates, IdentityUpdateError},
mutex_registry::MutexRegistry,
retry::Retry,
retry_async, retryable,
storage::{
Expand Down Expand Up @@ -137,10 +138,6 @@ pub enum MessageProcessingError {
Identity(#[from] IdentityError),
#[error("openmls process message error: {0}")]
OpenMlsProcessMessage(#[from] openmls::prelude::ProcessMessageError),
#[error("merge pending commit: {0}")]
MergePendingCommit(
#[from] openmls::group::MergePendingCommitError<sql_key_store::SqlKeyStoreError>,
),
#[error("merge staged commit: {0}")]
MergeStagedCommit(#[from] openmls::group::MergeCommitError<sql_key_store::SqlKeyStoreError>),
#[error(
Expand Down Expand Up @@ -178,6 +175,8 @@ pub enum MessageProcessingError {
Group(#[from] Box<GroupError>),
#[error("generic:{0}")]
Generic(String),
#[error("intent is missing staged_commit field")]
IntentMissingStagedCommit,
}

impl crate::retry::RetryableError for MessageProcessingError {
Expand All @@ -186,7 +185,6 @@ impl crate::retry::RetryableError for MessageProcessingError {
Self::Group(group_error) => retryable!(group_error),
Self::Identity(identity_error) => retryable!(identity_error),
Self::OpenMlsProcessMessage(err) => retryable!(err),
Self::MergePendingCommit(err) => retryable!(err),
Self::MergeStagedCommit(err) => retryable!(err),
Self::Diesel(diesel_error) => retryable!(diesel_error),
Self::Storage(s) => retryable!(s),
Expand Down Expand Up @@ -226,6 +224,7 @@ pub struct XmtpMlsLocalContext {
pub(crate) identity: Identity,
/// XMTP Local Storage
pub(crate) store: EncryptedMessageStore,
pub(crate) mutexes: MutexRegistry,
}

impl XmtpMlsLocalContext {
Expand Down Expand Up @@ -269,7 +268,11 @@ where
store: EncryptedMessageStore,
history_sync_url: Option<String>,
) -> Self {
let context = XmtpMlsLocalContext { identity, store };
let context = XmtpMlsLocalContext {
identity,
store,
mutexes: MutexRegistry::new(),
};
let (tx, _) = broadcast::channel(10);
Self {
api_client,
Expand Down
Loading

0 comments on commit 846b635

Please sign in to comment.