diff --git a/Cargo.lock b/Cargo.lock index 968a29880..52cfc5ffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2365,6 +2365,7 @@ dependencies = [ "collab-entity", "futures", "loole", + "prometheus-client", "prost 0.13.3", "rand 0.8.5", "redis 0.25.4", diff --git a/libs/collab-stream/Cargo.toml b/libs/collab-stream/Cargo.toml index 12bcb1d2e..1384ef914 100644 --- a/libs/collab-stream/Cargo.toml +++ b/libs/collab-stream/Cargo.toml @@ -24,6 +24,7 @@ tokio-util = { version = "0.7" } prost.workspace = true async-stream.workspace = true async-trait.workspace = true +prometheus-client.workspace = true zstd = "0.13" loole = "0.4.0" diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 2ad9d79ed..027aae82f 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -1,6 +1,7 @@ use crate::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink}; use crate::error::{internal, StreamError}; use crate::lease::{Lease, LeaseAcquisition}; +use crate::metrics::CollabStreamMetrics; use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId}; use crate::stream_group::{StreamConfig, StreamGroup}; use crate::stream_router::{StreamRouter, StreamRouterOptions}; @@ -21,14 +22,21 @@ pub struct CollabRedisStream { impl CollabRedisStream { pub const LEASE_TTL: Duration = Duration::from_secs(60); - pub async fn new(redis_client: redis::Client) -> Result { + pub async fn new( + redis_client: redis::Client, + metrics: Arc, + ) -> Result { let router_options = StreamRouterOptions { worker_count: 60, xread_streams: 100, xread_block_millis: Some(5000), xread_count: None, }; - let stream_router = Arc::new(StreamRouter::with_options(&redis_client, router_options)?); + let stream_router = Arc::new(StreamRouter::with_options( + &redis_client, + metrics, + router_options, + )?); let connection_manager = redis_client.get_connection_manager().await?; Ok(Self::new_with_connection_manager( connection_manager, diff --git a/libs/collab-stream/src/lib.rs b/libs/collab-stream/src/lib.rs index ff2c0cad6..efa0fb524 100644 --- a/libs/collab-stream/src/lib.rs +++ b/libs/collab-stream/src/lib.rs @@ -2,6 +2,7 @@ pub mod client; pub mod collab_update_sink; pub mod error; pub mod lease; +pub mod metrics; pub mod model; pub mod pubsub; pub mod stream_group; diff --git a/libs/collab-stream/src/metrics.rs b/libs/collab-stream/src/metrics.rs new file mode 100644 index 000000000..825b9f9f2 --- /dev/null +++ b/libs/collab-stream/src/metrics.rs @@ -0,0 +1,28 @@ +use prometheus_client::metrics::counter::Counter; +use prometheus_client::registry::Registry; + +#[derive(Default)] +pub struct CollabStreamMetrics { + /// Incremented each time a new collab stream read task is set (including recurring tasks). + pub reads_enqueued: Counter, + /// Incremented each time an existing task is consumed (including recurring tasks). + pub reads_dequeued: Counter, +} + +impl CollabStreamMetrics { + pub fn register(registry: &mut Registry) -> Self { + let metrics = Self::default(); + let realtime_registry = registry.sub_registry_with_prefix("collab_stream"); + realtime_registry.register( + "reads_enqueued", + "Incremented each time a new collab stream read task is set (including recurring tasks).", + metrics.reads_enqueued.clone(), + ); + realtime_registry.register( + "reads_dequeued", + "Incremented each time an existing task is consumed (including recurring tasks).", + metrics.reads_dequeued.clone(), + ); + metrics + } +} diff --git a/libs/collab-stream/src/stream_router.rs b/libs/collab-stream/src/stream_router.rs index fb048c1a9..dcbcb7698 100644 --- a/libs/collab-stream/src/stream_router.rs +++ b/libs/collab-stream/src/stream_router.rs @@ -1,3 +1,4 @@ +use crate::metrics::CollabStreamMetrics; use loole::{Receiver, Sender}; use redis::streams::{StreamReadOptions, StreamReadReply}; use redis::Client; @@ -27,14 +28,19 @@ pub struct StreamRouter { alive: Arc, #[allow(dead_code)] workers: Vec, + metrics: Arc, } impl StreamRouter { - pub fn new(client: &Client) -> Result { - Self::with_options(client, Default::default()) + pub fn new(client: &Client, metrics: Arc) -> Result { + Self::with_options(client, metrics, Default::default()) } - pub fn with_options(client: &Client, options: StreamRouterOptions) -> Result { + pub fn with_options( + client: &Client, + metrics: Arc, + options: StreamRouterOptions, + ) -> Result { let alive = Arc::new(AtomicBool::new(true)); let (tx, rx) = loole::unbounded(); let mut workers = Vec::with_capacity(options.worker_count); @@ -47,6 +53,7 @@ impl StreamRouter { rx.clone(), alive.clone(), &options, + metrics.clone(), ); workers.push(worker); } @@ -55,6 +62,7 @@ impl StreamRouter { buf: tx, workers, alive, + metrics, }) } @@ -63,6 +71,7 @@ impl StreamRouter { let last_id = last_id.unwrap_or_else(|| "0".to_string()); let h = StreamHandle::new(stream_key, last_id, tx); self.buf.send(h).unwrap(); + self.metrics.reads_enqueued.inc(); rx } } @@ -118,6 +127,7 @@ impl Worker { rx: Receiver, alive: Arc, options: &StreamRouterOptions, + metrics: Arc, ) -> Self { let mut xread_options = StreamReadOptions::default(); if let Some(block_millis) = options.xread_block_millis { @@ -128,7 +138,7 @@ impl Worker { } let count = options.xread_streams; let handle = std::thread::spawn(move || { - if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count) { + if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count, metrics) { tracing::error!("worker {} failed: {}", worker_id, err); } }); @@ -142,11 +152,13 @@ impl Worker { alive: Arc, options: StreamReadOptions, count: usize, + metrics: Arc, ) -> RedisResult<()> { let mut stream_keys = Vec::with_capacity(count); let mut message_ids = Vec::with_capacity(count); let mut senders = HashMap::with_capacity(count); while alive.load(SeqCst) { + // receive next `count` of stream read requests if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) { break; // rx channel has closed } @@ -158,10 +170,12 @@ impl Worker { continue; } + metrics.reads_dequeued.inc_by(key_count as u64); let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?; let mut msgs = 0; for stream in result.keys { + // for each stream returned from Redis, resolve corresponding subscriber and send messages let mut remove_sender = false; if let Some((sender, idx)) = senders.get(stream.key.as_str()) { for id in stream.ids { @@ -170,7 +184,7 @@ impl Worker { message_ids[*idx].clone_from(&message_id); //TODO: optimize msgs += 1; if let Err(err) = sender.send((message_id, value)) { - tracing::warn!("failed to send: {}", err); + tracing::debug!("failed to send: {}", err); remove_sender = true; } } @@ -188,7 +202,8 @@ impl Worker { key_count ); } - Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders); + let scheduled = Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders); + metrics.reads_enqueued.inc_by(scheduled as u64); } Ok(()) } @@ -198,21 +213,27 @@ impl Worker { keys: &mut Vec, ids: &mut Vec, senders: &mut HashMap<&str, (StreamSender, usize)>, - ) { + ) -> usize { let keys = keys.drain(..); let mut ids = ids.drain(..); + let mut scheduled = 0; for key in keys { if let Some(last_id) = ids.next() { if let Some((sender, _)) = senders.remove(key.as_str()) { + if sender.is_closed() { + continue; // sender is already closed + } let h = StreamHandle::new(key, last_id, sender); if let Err(err) = tx.send(h) { - tracing::warn!("failed to reschedule: {}", err); + tracing::error!("failed to reschedule: {}", err); break; } + scheduled += 1; } } } senders.clear(); + scheduled } fn read_buf( @@ -276,19 +297,23 @@ impl StreamHandle { #[cfg(test)] mod test { - use crate::stream_router::StreamRouter; + use crate::metrics::CollabStreamMetrics; + use crate::stream_router::{StreamRouter, StreamRouterOptions}; use rand::random; use redis::{Client, Commands, FromRedisValue}; + use std::sync::Arc; use tokio::task::JoinSet; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn multi_worker_preexisting_messages() { const ROUTES_COUNT: usize = 200; const MSG_PER_ROUTE: usize = 10; + let mut client = Client::open("redis://127.0.0.1/").unwrap(); let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE); + let metrics = Arc::new(CollabStreamMetrics::default()); - let router = StreamRouter::new(&client).unwrap(); + let router = StreamRouter::new(&client, metrics).unwrap(); let mut join_set = JoinSet::new(); for key in keys { @@ -313,8 +338,9 @@ mod test { const MSG_PER_ROUTE: usize = 10; let mut client = Client::open("redis://127.0.0.1/").unwrap(); let keys = init_streams(&mut client, ROUTES_COUNT, 0); + let metrics = Arc::new(CollabStreamMetrics::default()); - let router = StreamRouter::new(&client).unwrap(); + let router = StreamRouter::new(&client, metrics).unwrap(); let mut join_set = JoinSet::new(); for key in keys.iter() { @@ -348,8 +374,9 @@ mod test { let _: String = client.xadd(&key, "*", &[("data", 1)]).unwrap(); let m2: String = client.xadd(&key, "*", &[("data", 2)]).unwrap(); let m3: String = client.xadd(&key, "*", &[("data", 3)]).unwrap(); + let metrics = Arc::new(CollabStreamMetrics::default()); - let router = StreamRouter::new(&client).unwrap(); + let router = StreamRouter::new(&client, metrics).unwrap(); let mut observer = router.observe(key, Some(m2)); let (msg_id, m) = observer.recv().await.unwrap(); @@ -357,6 +384,51 @@ mod test { assert_eq!(u32::from_redis_value(&m["data"]).unwrap(), 3); } + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn drop_subscription() { + const ROUTES_COUNT: usize = 1; + const MSG_PER_ROUTE: usize = 10; + + let mut client = Client::open("redis://127.0.0.1/").unwrap(); + let mut keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE); + let metrics = Arc::new(CollabStreamMetrics::default()); + + let router = StreamRouter::with_options( + &client, + metrics.clone(), + StreamRouterOptions { + worker_count: 2, + xread_streams: 100, + xread_block_millis: Some(50), + xread_count: None, + }, + ) + .unwrap(); + + let key = keys.pop().unwrap(); + let mut observer = router.observe(key.clone(), None); + for i in 0..MSG_PER_ROUTE { + let (_msg_id, map) = observer.recv().await.unwrap(); + let value = String::from_redis_value(&map["data"]).unwrap(); + assert_eq!(value, format!("{}-{}", key, i)); + } + // drop observer and wait for worker to release + drop(observer); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let enqueued = metrics.reads_enqueued.get(); + let dequeued = metrics.reads_dequeued.get(); + assert_eq!(enqueued, dequeued, "dropped observer state"); + + // after dropping observer, no new polling task should be rescheduled + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues"); + assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues"); + + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues"); + assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues"); + } + fn init_streams(client: &mut Client, stream_count: usize, msgs_per_stream: usize) -> Vec { let test_prefix: u32 = random(); let mut keys = Vec::with_capacity(stream_count); diff --git a/libs/collab-stream/tests/collab_stream_test/test_util.rs b/libs/collab-stream/tests/collab_stream_test/test_util.rs index f3b03fa51..d50fe7441 100644 --- a/libs/collab-stream/tests/collab_stream_test/test_util.rs +++ b/libs/collab-stream/tests/collab_stream_test/test_util.rs @@ -1,6 +1,8 @@ use anyhow::Context; use collab_stream::client::CollabRedisStream; +use collab_stream::metrics::CollabStreamMetrics; use rand::{thread_rng, Rng}; +use std::sync::Arc; pub async fn redis_client() -> redis::Client { let redis_uri = "redis://localhost:6379"; @@ -11,7 +13,7 @@ pub async fn redis_client() -> redis::Client { pub async fn stream_client() -> CollabRedisStream { let redis_client = redis_client().await; - CollabRedisStream::new(redis_client) + CollabRedisStream::new(redis_client, Arc::new(CollabStreamMetrics::default())) .await .context("failed to create stream client") .unwrap() diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index 7f1e3aa43..0d9cc9bdb 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -24,6 +24,7 @@ use crate::actix_ws::server::RealtimeServerActor; use crate::api::{collab_scope, ws_scope}; use crate::collab::access_control::CollabStorageAccessControlImpl; use access_control::casbin::access::AccessControl; +use collab_stream::metrics::CollabStreamMetrics; use collab_stream::stream_router::{StreamRouter, StreamRouterOptions}; use database::file::s3_client_impl::AwsS3BucketClientImpl; @@ -110,8 +111,12 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result, ) -> Result<(redis::aio::ConnectionManager, Arc), Error> { info!("Connecting to redis with uri: {}", redis_uri); let client = redis::Client::open(redis_uri).context("failed to connect to redis")?; let router = StreamRouter::with_options( &client, + metrics, StreamRouterOptions { worker_count, xread_streams: 100, diff --git a/services/appflowy-collaborate/src/state.rs b/services/appflowy-collaborate/src/state.rs index 58eca5d22..fe9b58da2 100644 --- a/services/appflowy-collaborate/src/state.rs +++ b/services/appflowy-collaborate/src/state.rs @@ -13,6 +13,7 @@ use crate::pg_listener::PgListeners; use crate::CollabRealtimeMetrics; use access_control::metrics::AccessControlMetrics; use app_error::AppError; +use collab_stream::metrics::CollabStreamMetrics; use collab_stream::stream_router::StreamRouter; use database::user::{select_all_uid_uuid, select_uid_from_uuid}; use indexer::metrics::EmbeddingMetrics; @@ -40,6 +41,7 @@ pub struct AppMetrics { pub access_control_metrics: Arc, pub realtime_metrics: Arc, pub collab_metrics: Arc, + pub collab_stream_metrics: Arc, pub embedding_metrics: Arc, } @@ -55,12 +57,14 @@ impl AppMetrics { let access_control_metrics = Arc::new(AccessControlMetrics::register(&mut registry)); let realtime_metrics = Arc::new(CollabRealtimeMetrics::register(&mut registry)); let collab_metrics = Arc::new(CollabMetrics::register(&mut registry)); + let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry)); let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry)); Self { registry: Arc::new(registry), access_control_metrics, realtime_metrics, collab_metrics, + collab_stream_metrics, embedding_metrics, } } diff --git a/src/application.rs b/src/application.rs index 7f96a97b3..2196f3670 100644 --- a/src/application.rs +++ b/src/application.rs @@ -42,6 +42,7 @@ use appflowy_collaborate::collab::storage::CollabStorageImpl; use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender}; use appflowy_collaborate::snapshot::SnapshotControl; use appflowy_collaborate::CollaborationServer; +use collab_stream::metrics::CollabStreamMetrics; use collab_stream::stream_router::{StreamRouter, StreamRouterOptions}; use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; use indexer::collab_indexer::IndexerProvider; @@ -248,8 +249,12 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result, ) -> Result<(redis::aio::ConnectionManager, Arc), Error> { info!("Connecting to redis with uri: {}", redis_uri); let client = redis::Client::open(redis_uri).context("failed to connect to redis")?; let router = StreamRouter::with_options( &client, + metrics, StreamRouterOptions { worker_count, xread_streams: 100, diff --git a/src/state.rs b/src/state.rs index 1659d43de..661caa914 100644 --- a/src/state.rs +++ b/src/state.rs @@ -17,6 +17,7 @@ use appflowy_collaborate::collab::cache::CollabCache; use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use appflowy_collaborate::metrics::CollabMetrics; use appflowy_collaborate::CollabRealtimeMetrics; +use collab_stream::metrics::CollabStreamMetrics; use collab_stream::stream_router::StreamRouter; use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; use database::user::{select_all_uid_uuid, select_uid_from_uuid}; @@ -128,6 +129,7 @@ pub struct AppMetrics { pub published_collab_metrics: Arc, pub appflowy_web_metrics: Arc, pub embedding_metrics: Arc, + pub collab_stream_metrics: Arc, } impl Default for AppMetrics { @@ -146,6 +148,7 @@ impl AppMetrics { let published_collab_metrics = Arc::new(PublishedCollabMetrics::register(&mut registry)); let appflowy_web_metrics = Arc::new(AppFlowyWebMetrics::register(&mut registry)); let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry)); + let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry)); Self { registry: Arc::new(registry), request_metrics, @@ -155,6 +158,7 @@ impl AppMetrics { published_collab_metrics, appflowy_web_metrics, embedding_metrics, + collab_stream_metrics, } } }