Skip to content

Commit

Permalink
chore: Update client protocol sync interval (AppFlowy-IO#761)
Browse files Browse the repository at this point in the history
* chore: separate server/client miss update

* chore: short pull timeout

* chore: short tick interval

* chore: fix clippy
  • Loading branch information
appflowy authored Aug 28, 2024
1 parent 0462d97 commit 76fe082
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 55 deletions.
22 changes: 11 additions & 11 deletions libs/client-api/src/collab_sync/collab_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,20 @@ where
let cloned_object = object.clone();
let collab = collab.clone();
let sink = sink.clone();
let sync_reason = match state_vector_v1 {
None => SyncReason::ClientMissUpdates { reason },
Some(sv) => SyncReason::ServerMissUpdates {
state_vector_v1: sv,
reason,
},
};
tokio::spawn(async move {
select! {
_ = new_cancel_token.cancelled() => {
if cfg!(feature = "sync_verbose_log") {
trace!("{} receive cancel signal, cancel pull missing updates", cloned_object.object_id);
}
trace!("{} cancel pull missing updates", cloned_object.object_id);
},
_ = tokio::time::sleep(tokio::time::Duration::from_secs(3)) => {
Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, state_vector_v1, reason)
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, sync_reason)
.await;
}
}
Expand Down Expand Up @@ -289,14 +294,9 @@ where
object: &SyncObject,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
sink: &Arc<CollabSink<Sink>>,
state_vector_v1: Option<Vec<u8>>,
reason: MissUpdateReason,
reason: SyncReason,
) {
let lock = collab.read().await;
let reason = SyncReason::MissUpdates {
state_vector_v1,
reason,
};
if let Err(err) = start_sync(origin.clone(), object, (*lock).borrow(), sink, reason) {
error!("Error while start sync: {}", err);
}
Expand Down
71 changes: 33 additions & 38 deletions libs/client-api/src/collab_sync/sync_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ where

pub enum SyncReason {
CollabInitialize,
MissUpdates {
state_vector_v1: Option<Vec<u8>>,
ServerMissUpdates {
state_vector_v1: Vec<u8>,
reason: MissUpdateReason,
},
ClientMissUpdates {
reason: MissUpdateReason,
},
ServerCannotApplyUpdate,
Expand All @@ -147,7 +150,8 @@ impl Display for SyncReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SyncReason::CollabInitialize => write!(f, "CollabInitialize"),
SyncReason::MissUpdates { reason, .. } => write!(f, "MissUpdates: {}", reason),
SyncReason::ServerMissUpdates { reason, .. } => write!(f, "ServerMissUpdates: {}", reason),
SyncReason::ClientMissUpdates { reason } => write!(f, "ClientMissUpdates: {}", reason),
SyncReason::ServerCannotApplyUpdate => write!(f, "ServerCannotApplyUpdate"),
SyncReason::NetworkResume => write!(f, "NetworkResume"),
}
Expand Down Expand Up @@ -186,46 +190,37 @@ where
E: Into<anyhow::Error> + Send + Sync + 'static,
Sink: SinkExt<Vec<ClientCollabMessage>, Error = E> + Send + Sync + Unpin + 'static,
{
if !sink.should_queue_init_sync() {
return Ok(false);
}

if let Err(err) = sync_object.collab_type.validate_require_data(collab) {
error!("start init sync :{}:{}", sync_object.object_id, err);
return Err(SyncError::Internal(err));
}

match reason {
SyncReason::MissUpdates {
SyncReason::ClientMissUpdates { reason } => {
if !sink.should_queue_init_sync() {
return Ok(false);
}

trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
sync_object.object_id.clone(),
sync_object.collab_type.clone(),
sync_object.workspace_id.clone(),
msg_id,
payload,
);
ClientCollabMessage::new_init_sync(init_sync)
});
},
SyncReason::ServerMissUpdates {
state_vector_v1,
reason,
} => match state_vector_v1.and_then(|sv| StateVector::decode_v1(&sv).ok()) {
None => {
trace!(
"🔥{} start init sync, reason:{}",
&sync_object.object_id,
reason
);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
sync_object.object_id.clone(),
sync_object.collab_type.clone(),
sync_object.workspace_id.clone(),
msg_id,
payload,
);
ClientCollabMessage::new_init_sync(init_sync)
});
},
Some(sv) => {
trace!(
"🔥{} start init sync with state vector, reason:{}",
&sync_object.object_id,
reason
);
} => match StateVector::decode_v1(&state_vector_v1) {
Ok(sv) => {
trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
let update = gen_missing_updates(collab, sv)?;
sink.queue_msg(|msg_id| {
let update_sync = UpdateSync::new(
Expand All @@ -237,18 +232,18 @@ where
ClientCollabMessage::new_update_sync(update_sync)
});
},
Err(err) => error!("fail to decode server state vector: {}", err),
},
SyncReason::CollabInitialize
| SyncReason::ServerCannotApplyUpdate
| SyncReason::NetworkResume => {
trace!(
"🔥{} start init sync, reason: {}",
"🔥{} start sync, reason: {}",
&sync_object.object_id,
reason
);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;

sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
Expand Down
8 changes: 4 additions & 4 deletions libs/client-api/src/ws/msg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn handle_tick(
maximum_payload_size: usize,
weak_seen_ids: Weak<Mutex<HashSet<SeenId>>>,
) -> (usize, usize) {
let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, queue).await;
let (did_sent_seen_ids, messages_map) = next_batch_message(20, maximum_payload_size, queue).await;
if messages_map.is_empty() {
return (0, 0);
}
Expand Down Expand Up @@ -227,9 +227,9 @@ fn calculate_next_tick_duration(
Duration::from_secs(1)
} else {
match num_init_sync {
0..=3 => default_interval,
4..=7 => Duration::from_secs(4),
_ => Duration::from_secs(6),
0..=10 => default_interval,
11..=20 => Duration::from_secs(2),
_ => Duration::from_secs(4),
}
}
}
6 changes: 4 additions & 2 deletions libs/collab-rt-protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ impl CollabSyncProtocol for ClientSyncProtocol {
update.missing.is_empty()
);
}
let state_vector_v1 = txn.state_vector().encode_v1();

// when client handle sync step 2 and found missing updates, just return MissUpdates Error.
// the state vector should be none that will trigger a client init sync
Err(RTProtocolError::MissUpdates {
state_vector_v1: Some(state_vector_v1),
state_vector_v1: None,
reason: "client miss updates".to_string(),
})
},
Expand Down

0 comments on commit 76fe082

Please sign in to comment.