Skip to content

Commit

Permalink
chore: use env to use control multiple thread runtime or not (#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Aug 27, 2024
1 parent 5badffc commit ab14568
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 11 deletions.
3 changes: 3 additions & 0 deletions deploy.env
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,6 @@ APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgre
# AppFlowy Indexer
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

# AppFlowy Collaborate
APPFLOWY_COLLABORATE_MULTI_THREAD=false
5 changes: 4 additions & 1 deletion dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,7 @@ APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgre

# AppFlowy Indexer
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

# AppFlowy Collaborate
APPFLOWY_COLLABORATE_MULTI_THREAD=false
3 changes: 0 additions & 3 deletions services/appflowy-collaborate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,3 @@ workspace-access.workspace = true
[dev-dependencies]
rand = "0.8.5"
workspace-template.workspace = true

[features]
collab-rt-multi-thread = []
5 changes: 2 additions & 3 deletions services/appflowy-collaborate/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::fmt::Display;
use std::str::FromStr;

use anyhow::Context;
use secrecy::Secret;
use semver::Version;
use serde::Deserialize;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use std::fmt::Display;
use std::str::FromStr;

#[derive(Clone, Debug)]
pub struct Config {
Expand Down
22 changes: 19 additions & 3 deletions services/appflowy-collaborate/src/rt_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::group::manager::GroupManager;
use crate::indexer::IndexerProvider;
use crate::metrics::CollabMetricsCalculate;

use crate::config::get_env_var;
use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME;
use crate::state::RedisConnectionManager;
use crate::{spawn_metrics, CollabRealtimeMetrics, RealtimeClientWebsocketSink};

Expand All @@ -37,6 +39,7 @@ pub struct CollaborationServer<S, AC> {
#[allow(dead_code)]
metrics: Arc<CollabRealtimeMetrics>,
metrics_calculate: CollabMetricsCalculate,
enable_custom_runtime: bool,
}

impl<S, AC> CollaborationServer<S, AC>
Expand All @@ -56,8 +59,14 @@ where
edit_state_max_secs: i64,
indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> {
if cfg!(feature = "collab-rt-multi-thread") {
info!("CollaborationServer with multi-thread feature enabled");
let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false")
.parse::<bool>()
.unwrap_or(false);

if enable_custom_runtime {
info!("CollaborationServer with custom runtime");
} else {
info!("CollaborationServer with actix-web runtime");
}

let metrics_calculate = CollabMetricsCalculate::default();
Expand Down Expand Up @@ -95,6 +104,7 @@ where
group_sender_by_object_id,
metrics,
metrics_calculate,
enable_custom_runtime,
})
}

Expand Down Expand Up @@ -178,6 +188,7 @@ where
let group_sender_by_object_id = self.group_sender_by_object_id.clone();
let client_msg_router_by_user = self.connect_state.client_message_routers.clone();
let group_manager = self.group_manager.clone();
let enable_custom_runtime = self.enable_custom_runtime;

Box::pin(async move {
for (object_id, collab_messages) in message_by_oid {
Expand All @@ -200,7 +211,12 @@ where

let object_id = entry.key().clone();
let clone_notify = notify.clone();
tokio::spawn(runner.run(object_id, clone_notify));
if enable_custom_runtime {
COLLAB_RUNTIME.spawn(runner.run(object_id, clone_notify));
} else {
tokio::spawn(runner.run(object_id, clone_notify));
}

entry.insert(new_sender.clone());

// wait for the runner to be ready to handle the message.
Expand Down
4 changes: 3 additions & 1 deletion services/appflowy-history/src/core/open_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ fn apply_updates(messages: &[StreamMessage], collab: &mut Collab) -> Result<(),
let CollabUpdateEvent::UpdateV1 { encode_update } = CollabUpdateEvent::decode(&message.data)?;
let update = Update::decode_v1(&encode_update)
.map_err(|e| CollabError::YrsEncodeStateError(e.to_string()))?;
txn.apply_update(update);
txn
.apply_update(update)
.map_err(|err| HistoryError::Internal(err.into()))?;
}
Ok(())
}
Expand Down

0 comments on commit ab14568

Please sign in to comment.