Skip to content

Commit

Permalink
Refactor process_own_message to not mutate intent state directly (#985)
Browse files Browse the repository at this point in the history
## tl;dr

- Simplifies `process_own_message` to return the desired intent state, instead of having a bunch of DB operations happen in the method
  • Loading branch information
neekolas authored Aug 23, 2024
1 parent 846b635 commit 68d90b2
Showing 1 changed file with 36 additions and 28 deletions.
64 changes: 36 additions & 28 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ impl MlsGroup {
message: ProtocolMessage,
envelope_timestamp_ns: u64,
allow_epoch_increment: bool,
) -> Result<(), MessageProcessingError> {
) -> Result<IntentState, MessageProcessingError> {
if intent.state == IntentState::Committed {
return Ok(());
return Ok(IntentState::Committed);
}
let message_epoch = message.epoch();
let group_epoch = openmls_group.epoch();
Expand Down Expand Up @@ -298,13 +298,12 @@ impl MlsGroup {
let group_epoch_u64 = group_epoch.as_u64();

if published_in_epoch_u64 != group_epoch_u64 {
conn.set_group_intent_to_publish(intent.id)?;
log::warn!(
"Intent was published in epoch {} but group is currently in epoch {}",
published_in_epoch_u64,
group_epoch_u64
);
return Ok(());
return Ok(IntentState::ToPublish);
}
}

Expand All @@ -330,10 +329,9 @@ impl MlsGroup {
.await;

if maybe_validated_commit.is_err() {
conn.set_group_intent_error(intent.id)?;
// Return before merging commit since it does not pass validation
// Return OK so that the group intent update is still written to the DB
return Ok(());
return Ok(IntentState::Error);
}

let validated_commit = maybe_validated_commit.expect("Checked for error");
Expand All @@ -345,7 +343,7 @@ impl MlsGroup {
);
if let Err(err) = openmls_group.merge_staged_commit(&provider, pending_commit) {
log::error!("error merging commit: {}", err);
conn.set_group_intent_to_publish(intent.id)?;
return Ok(IntentState::ToPublish);
} else {
// If no error committing the change, write a transcript message
self.save_transcript_message(conn, validated_commit, envelope_timestamp_ns)?;
Expand All @@ -359,18 +357,15 @@ impl MlsGroup {
message_epoch,
MAX_PAST_EPOCHS,
) {
conn.set_group_intent_to_publish(intent.id)?;
return Ok(());
return Ok(IntentState::ToPublish);
}
if let Some(id) = intent.message_id()? {
conn.set_delivery_status_to_published(&id, envelope_timestamp_ns)?;
}
}
};

conn.set_group_intent_committed(intent.id)?;

Ok(())
Ok(IntentState::Committed)
}

#[tracing::instrument(level = "trace", skip_all)]
Expand Down Expand Up @@ -612,26 +607,39 @@ impl MlsGroup {
match intent {
// Intent with the payload hash matches
Ok(Some(intent)) => {
let intent_id = intent.id;
log::info!(
"client [{}] is about to process own envelope [{}]",
"client [{}] is about to process own envelope [{}] for intent [{}]",
client.inbox_id(),
envelope.id
);
log::info!(
"envelope [{}] is equal to intent [{}]",
envelope.id,
intent.id
intent_id
);
self.process_own_message(
client,
intent,
openmls_group,
provider,
message.into(),
envelope.created_ns,
allow_epoch_increment,
)
.await
match self
.process_own_message(
client,
intent,
openmls_group,
provider,
message.into(),
envelope.created_ns,
allow_epoch_increment,
)
.await?
{
IntentState::ToPublish => {
Ok(provider.conn_ref().set_group_intent_to_publish(intent_id)?)
}
IntentState::Committed => {
Ok(provider.conn_ref().set_group_intent_committed(intent_id)?)
}
IntentState::Published => {
log::warn!("Unexpected behaviour: returned intent state published from process_own_message");
Ok(())
}
IntentState::Error => {
Ok(provider.conn_ref().set_group_intent_error(intent_id)?)
}
}
}
// No matching intent found
Ok(None) => {
Expand Down

0 comments on commit 68d90b2

Please sign in to comment.