Skip to content

Commit

Permalink
feat: sync protocol (#421)
Browse files Browse the repository at this point in the history
* chore: send full update when it's first time to sync

* chore: update last sync at
  • Loading branch information
appflowy authored Mar 27, 2024
1 parent 3e36dae commit d30cf93
Show file tree
Hide file tree
Showing 18 changed files with 593 additions and 620 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ inherits = "release"
debug = true

[patch.crates-io]
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "d4c4ed1ed1eb115f3d578bf7cf65b5a769c18b0f" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "d4c4ed1ed1eb115f3d578bf7cf65b5a769c18b0f" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "d4c4ed1ed1eb115f3d578bf7cf65b5a769c18b0f" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "d4c4ed1ed1eb115f3d578bf7cf65b5a769c18b0f" }

[features]
custom_env= []
Expand Down
7 changes: 5 additions & 2 deletions libs/client-api/src/collab_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub enum SyncError {
#[error("Workspace id is not found")]
NoWorkspaceId,

#[error("Missing broadcast data:{0}")]
MissingBroadcast(String),
#[error("{0}")]
MissingUpdates(String),

#[error(transparent)]
Internal(#[from] anyhow::Error),
Expand All @@ -35,4 +35,7 @@ impl SyncError {
pub fn is_cannot_apply_update(&self) -> bool {
matches!(self, Self::CannotApplyUpdate(_))
}
pub fn is_missing_updates(&self) -> bool {
matches!(self, Self::MissingUpdates(_))
}
}
129 changes: 88 additions & 41 deletions libs/client-api/src/collab_sync/sink.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::af_spawn;
use crate::collab_sync::sink_config::SinkConfig;
use crate::collab_sync::sink_queue::{QueueItem, SinkQueue};
use crate::collab_sync::SyncObject;
use crate::collab_sync::{check_update_contiguous, SyncError, SyncObject};
use futures_util::SinkExt;

use collab_rt_entity::collab_msg::{CollabSinkMessage, MsgId, ServerCollabMessage};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::{watch, Mutex};
use tokio::time::{interval, sleep};
use tracing::{error, trace, warn};
Expand Down Expand Up @@ -64,6 +64,7 @@ pub struct CollabSink<Sink, Msg> {
pause: AtomicBool,
object: SyncObject,
flying_messages: Arc<parking_lot::Mutex<HashSet<MsgId>>>,
last_sync: Arc<LastSync>,
}

impl<Sink, Msg> Drop for CollabSink<Sink, Msg> {
Expand Down Expand Up @@ -97,9 +98,11 @@ where
let msg_id_counter = Arc::new(msg_id_counter);
let flying_messages = Arc::new(parking_lot::Mutex::new(HashSet::new()));

let last_sync = Arc::new(LastSync::new());
let mut interval = interval(SEND_INTERVAL);
let weak_notifier = Arc::downgrade(&notifier);
let weak_flying_messages = Arc::downgrade(&flying_messages);
let cloned_last_sync = last_sync.clone();
af_spawn(async move {
// Initial delay to make sure the first tick waits for SEND_INTERVAL
sleep(SEND_INTERVAL).await;
Expand All @@ -110,7 +113,9 @@ where
Some(notifier) => {
// Removing the flying messages allows for the re-sending of the top k messages in the message queue.
if let Some(flying_messages) = weak_flying_messages.upgrade() {
flying_messages.lock().clear();
if cloned_last_sync.should_clear().await {
flying_messages.lock().clear();
}
}

if notifier.send(SinkSignal::Proceed).is_err() {
Expand All @@ -133,6 +138,7 @@ where
pause: AtomicBool::new(pause),
object,
flying_messages,
last_sync,
}
}

Expand All @@ -153,12 +159,12 @@ where
msg_queue.push_msg(msg_id, new_msg);
// msg_queue.extend(requeue_items);
drop(msg_queue);
self.merge();

// Notify the sink to process the next message after 500ms.
let _ = self
.notifier
.send(SinkSignal::ProcessAfterMillis(COLLAB_SINK_DELAY_MILLIS));

self.merge();
}

/// When queue the init message, the sink will clear all the pending messages and send the init
Expand Down Expand Up @@ -216,19 +222,19 @@ where

/// Notify the sink to process the next message and mark the current message as done.
/// Returns bool value to indicate whether the message is valid.
pub async fn validate_response(&self, server_message: &ServerCollabMessage) -> bool {
if server_message.msg_id().is_none() {
// msg_id will be None for [ServerBroadcast] or [ServerAwareness], automatically valid.
return true;
}

pub async fn validate_response(
&self,
msg_id: MsgId,
server_message: &ServerCollabMessage,
seq_num_counter: &Arc<AtomicU32>,
) -> Result<bool, SyncError> {
// safety: msg_id is not None
let income_message_id = server_message.msg_id().unwrap();
let income_message_id = msg_id;
let mut flying_messages = self.flying_messages.lock();

// if the message id is not in the flying messages, it means the message is invalid.
if !flying_messages.contains(&income_message_id) {
return false;
return Ok(false);
}

let mut message_queue = self.message_queue.lock();
Expand All @@ -249,6 +255,17 @@ where
}
}

if is_valid {
if let ServerCollabMessage::ClientAck(ack) = server_message {
let prev_seq_num = seq_num_counter
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(ack.seq_num))
.unwrap();

// Check the seq_num is contiguous.
check_update_contiguous(&self.object.object_id, ack.seq_num, prev_seq_num)?;
}
}

trace!(
"{:?}: pending count:{} ids:{}",
self.object.object_id,
Expand All @@ -265,7 +282,7 @@ where
error!("send sink state failed: {}", e);
}
}
is_valid
Ok(is_valid)
}

async fn process_next_msg(&self) {
Expand Down Expand Up @@ -302,21 +319,24 @@ where
.map(|item| item.into_message())
.collect::<Vec<_>>();
match self.sender.try_lock() {
Ok(mut sender) => match sender.send(messages).await {
Ok(_) => {
trace!(
"🔥 sending {} messages {:?}",
self.object.object_id,
message_ids
);
},
Err(err) => {
error!("Failed to send error: {:?}", err.into());
self
.flying_messages
.lock()
.retain(|id| !message_ids.contains(id));
},
Ok(mut sender) => {
self.last_sync.tick().await;
match sender.send(messages).await {
Ok(_) => {
trace!(
"🔥 sending {} messages {:?}",
self.object.object_id,
message_ids
);
},
Err(err) => {
error!("Failed to send error: {:?}", err.into());
self
.flying_messages
.lock()
.retain(|id| !message_ids.contains(id));
},
}
},
Err(_) => {
warn!("failed to acquire the lock of the sink, retry later");
Expand Down Expand Up @@ -398,10 +418,10 @@ fn get_next_batch_item<Msg>(
where
Msg: CollabSinkMessage,
{
let mut items = vec![];
let mut next_sending_items = vec![];
let mut requeue_items = vec![];
while let Some(item) = msg_queue.pop() {
if items.len() > 20 {
if next_sending_items.len() > 20 {
requeue_items.push(item);
break;
}
Expand All @@ -417,14 +437,15 @@ where
// messages.
requeue_items.push(item);
break;
}

let is_init_sync = item.message().is_client_init_sync();
items.push(item.clone());
requeue_items.push(item);
} else {
let is_init_sync = item.message().is_client_init_sync();
next_sending_items.push(item.clone());
requeue_items.push(item);

if is_init_sync {
break;
// only send one message if the message is init sync message.
if is_init_sync {
break;
}
}
}

Expand All @@ -440,9 +461,12 @@ where
);
}
msg_queue.extend(requeue_items);
let message_ids = items.iter().map(|item| item.msg_id()).collect::<Vec<_>>();
let message_ids = next_sending_items
.iter()
.map(|item| item.msg_id())
.collect::<Vec<_>>();
flying_messages.extend(message_ids);
items
next_sending_items
}

fn retry_later(weak_notifier: Weak<watch::Sender<SinkSignal>>) {
Expand Down Expand Up @@ -506,3 +530,26 @@ impl DefaultMsgIdCounter {
self.0.fetch_add(1, Ordering::SeqCst)
}
}

struct LastSync {
last_sync: Mutex<Instant>,
}

impl LastSync {
fn new() -> Self {
let now = std::time::Instant::now();
LastSync {
last_sync: Mutex::new(now.checked_sub(Duration::from_secs(60)).unwrap_or(now)),
}
}

async fn should_clear(&self) -> bool {
let now = std::time::Instant::now();
now.duration_since(*self.last_sync.lock().await) > SEND_INTERVAL
}

async fn tick(&self) {
let mut last_sync_locked = self.last_sync.lock().await;
*last_sync_locked = std::time::Instant::now();
}
}
Loading

0 comments on commit d30cf93

Please sign in to comment.