Skip to content

Commit

Permalink
chore: add preliminary check for enabled indexing to batch collab insert
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Dec 19, 2024
1 parent d5252f4 commit c1d94e3
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 11 deletions.
1 change: 0 additions & 1 deletion services/appflowy-collaborate/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ async fn post_realtime_message_stream_handler(
bytes.extend_from_slice(&item?);
}

event!(tracing::Level::INFO, "message len: {}", bytes.len());
let device_id = device_id.to_string();

let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl IndexerScheduler {
Ok(())
}

pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_provider.is_indexing_enabled(collab_type)
}

pub fn index_encoded_collabs(
&self,
workspace_id: &str,
Expand Down
4 changes: 4 additions & 0 deletions services/appflowy-collaborate/src/indexer/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ impl IndexerProvider {
pub fn indexer_for(&self, collab_type: &CollabType) -> Option<Arc<dyn Indexer>> {
self.indexer_cache.get(collab_type).cloned()
}

pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_cache.contains_key(collab_type)
}
}
23 changes: 14 additions & 9 deletions src/api/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,7 @@ async fn batch_create_collab_handler(
let total_size = collab_params_list
.iter()
.fold(0, |acc, x| acc + x.encoded_collab_v1.len());
event!(
tracing::Level::INFO,
tracing::info!(
"decompressed {} collab objects in {:?}",
collab_params_list.len(),
start.elapsed()
Expand All @@ -828,10 +827,18 @@ async fn batch_create_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
state.indexer_scheduler.index_encoded_collabs(
&workspace_id,
collab_params_list.iter().map(IndexedCollab::from).collect(),
)?;
let indexed_collabs: Vec<_> = collab_params_list
.iter()
.filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type))
.map(IndexedCollab::from)
.collect();

let len = indexed_collabs.len();
state
.indexer_scheduler
.index_encoded_collabs(&workspace_id, indexed_collabs)?;

tracing::info!("scheduled indexing for {} collabs", len);
}

let start = Instant::now();
Expand All @@ -840,8 +847,7 @@ async fn batch_create_collab_handler(
.batch_insert_new_collab(&workspace_id, &uid, collab_params_list)
.await?;

event!(
tracing::Level::INFO,
tracing::info!(
"inserted collab objects to disk in {:?}, total size:{}",
start.elapsed(),
total_size
Expand Down Expand Up @@ -1825,7 +1831,6 @@ async fn post_realtime_message_stream_handler(
bytes.extend_from_slice(&item?);
}

event!(tracing::Level::INFO, "message len: {}", bytes.len());
let device_id = device_id.to_string();

let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/biz/workspace/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ pub async fn broadcast_update(
oid: &str,
encoded_update: Vec<u8>,
) -> Result<(), AppError> {
tracing::info!("broadcasting update to group: {}", oid);
tracing::trace!("broadcasting update to group: {}", oid);
let payload = Message::Sync(SyncMessage::Update(encoded_update)).encode_v1();
let msg = ClientCollabMessage::ClientUpdateSync {
data: UpdateSync {
Expand Down

0 comments on commit c1d94e3

Please sign in to comment.