Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update client protocol sync interval #761

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions libs/client-api/src/collab_sync/collab_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,20 @@ where
let cloned_object = object.clone();
let collab = collab.clone();
let sink = sink.clone();
let sync_reason = match state_vector_v1 {
None => SyncReason::ClientMissUpdates { reason },
Some(sv) => SyncReason::ServerMissUpdates {
state_vector_v1: sv,
reason,
},
};
tokio::spawn(async move {
select! {
_ = new_cancel_token.cancelled() => {
if cfg!(feature = "sync_verbose_log") {
trace!("{} receive cancel signal, cancel pull missing updates", cloned_object.object_id);
}
trace!("{} cancel pull missing updates", cloned_object.object_id);
},
_ = tokio::time::sleep(tokio::time::Duration::from_secs(3)) => {
Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, state_vector_v1, reason)
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, sync_reason)
.await;
}
}
Expand Down Expand Up @@ -289,14 +294,9 @@ where
object: &SyncObject,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
sink: &Arc<CollabSink<Sink>>,
state_vector_v1: Option<Vec<u8>>,
reason: MissUpdateReason,
reason: SyncReason,
) {
let lock = collab.read().await;
let reason = SyncReason::MissUpdates {
state_vector_v1,
reason,
};
if let Err(err) = start_sync(origin.clone(), object, (*lock).borrow(), sink, reason) {
error!("Error while start sync: {}", err);
}
Expand Down
71 changes: 33 additions & 38 deletions libs/client-api/src/collab_sync/sync_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ where

pub enum SyncReason {
CollabInitialize,
MissUpdates {
state_vector_v1: Option<Vec<u8>>,
ServerMissUpdates {
state_vector_v1: Vec<u8>,
reason: MissUpdateReason,
},
ClientMissUpdates {
reason: MissUpdateReason,
},
ServerCannotApplyUpdate,
Expand All @@ -147,7 +150,8 @@ impl Display for SyncReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SyncReason::CollabInitialize => write!(f, "CollabInitialize"),
SyncReason::MissUpdates { reason, .. } => write!(f, "MissUpdates: {}", reason),
SyncReason::ServerMissUpdates { reason, .. } => write!(f, "ServerMissUpdates: {}", reason),
SyncReason::ClientMissUpdates { reason } => write!(f, "ClientMissUpdates: {}", reason),
SyncReason::ServerCannotApplyUpdate => write!(f, "ServerCannotApplyUpdate"),
SyncReason::NetworkResume => write!(f, "NetworkResume"),
}
Expand Down Expand Up @@ -186,46 +190,37 @@ where
E: Into<anyhow::Error> + Send + Sync + 'static,
Sink: SinkExt<Vec<ClientCollabMessage>, Error = E> + Send + Sync + Unpin + 'static,
{
if !sink.should_queue_init_sync() {
return Ok(false);
}

if let Err(err) = sync_object.collab_type.validate_require_data(collab) {
error!("start init sync :{}:{}", sync_object.object_id, err);
return Err(SyncError::Internal(err));
}

match reason {
SyncReason::MissUpdates {
SyncReason::ClientMissUpdates { reason } => {
if !sink.should_queue_init_sync() {
return Ok(false);
}

trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
sync_object.object_id.clone(),
sync_object.collab_type.clone(),
sync_object.workspace_id.clone(),
msg_id,
payload,
);
ClientCollabMessage::new_init_sync(init_sync)
});
},
SyncReason::ServerMissUpdates {
state_vector_v1,
reason,
} => match state_vector_v1.and_then(|sv| StateVector::decode_v1(&sv).ok()) {
None => {
trace!(
"🔥{} start init sync, reason:{}",
&sync_object.object_id,
reason
);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
sync_object.object_id.clone(),
sync_object.collab_type.clone(),
sync_object.workspace_id.clone(),
msg_id,
payload,
);
ClientCollabMessage::new_init_sync(init_sync)
});
},
Some(sv) => {
trace!(
"🔥{} start init sync with state vector, reason:{}",
&sync_object.object_id,
reason
);
} => match StateVector::decode_v1(&state_vector_v1) {
Ok(sv) => {
trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
let update = gen_missing_updates(collab, sv)?;
sink.queue_msg(|msg_id| {
let update_sync = UpdateSync::new(
Expand All @@ -237,18 +232,18 @@ where
ClientCollabMessage::new_update_sync(update_sync)
});
},
Err(err) => error!("fail to decode server state vector: {}", err),
},
SyncReason::CollabInitialize
| SyncReason::ServerCannotApplyUpdate
| SyncReason::NetworkResume => {
trace!(
"🔥{} start init sync, reason: {}",
"🔥{} start sync, reason: {}",
&sync_object.object_id,
reason
);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;

sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
Expand Down
8 changes: 4 additions & 4 deletions libs/client-api/src/ws/msg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn handle_tick(
maximum_payload_size: usize,
weak_seen_ids: Weak<Mutex<HashSet<SeenId>>>,
) -> (usize, usize) {
let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, queue).await;
let (did_sent_seen_ids, messages_map) = next_batch_message(20, maximum_payload_size, queue).await;
if messages_map.is_empty() {
return (0, 0);
}
Expand Down Expand Up @@ -227,9 +227,9 @@ fn calculate_next_tick_duration(
Duration::from_secs(1)
} else {
match num_init_sync {
0..=3 => default_interval,
4..=7 => Duration::from_secs(4),
_ => Duration::from_secs(6),
0..=10 => default_interval,
11..=20 => Duration::from_secs(2),
_ => Duration::from_secs(4),
}
}
}
6 changes: 4 additions & 2 deletions libs/collab-rt-protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ impl CollabSyncProtocol for ClientSyncProtocol {
update.missing.is_empty()
);
}
let state_vector_v1 = txn.state_vector().encode_v1();

// when client handle sync step 2 and found missing updates, just return MissUpdates Error.
// the state vector should be none that will trigger a client init sync
Err(RTProtocolError::MissUpdates {
state_vector_v1: Some(state_vector_v1),
state_vector_v1: None,
reason: "client miss updates".to_string(),
})
},
Expand Down
Loading