Skip to content

Commit

Permalink
Merge pull request #1116 from AppFlowy-IO/collab-stream-metrics
Browse files Browse the repository at this point in the history
chore: add metrics to redis collab stream
  • Loading branch information
khorshuheng authored Jan 3, 2025
2 parents 9d63113 + 9bd6589 commit 424796e
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libs/collab-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tokio-util = { version = "0.7" }
prost.workspace = true
async-stream.workspace = true
async-trait.workspace = true
prometheus-client.workspace = true
zstd = "0.13"
loole = "0.4.0"

Expand Down
12 changes: 10 additions & 2 deletions libs/collab-stream/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
use crate::error::{internal, StreamError};
use crate::lease::{Lease, LeaseAcquisition};
use crate::metrics::CollabStreamMetrics;
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
use crate::stream_group::{StreamConfig, StreamGroup};
use crate::stream_router::{StreamRouter, StreamRouterOptions};
Expand All @@ -21,14 +22,21 @@ pub struct CollabRedisStream {
impl CollabRedisStream {
pub const LEASE_TTL: Duration = Duration::from_secs(60);

pub async fn new(redis_client: redis::Client) -> Result<Self, redis::RedisError> {
pub async fn new(
redis_client: redis::Client,
metrics: Arc<CollabStreamMetrics>,
) -> Result<Self, redis::RedisError> {
let router_options = StreamRouterOptions {
worker_count: 60,
xread_streams: 100,
xread_block_millis: Some(5000),
xread_count: None,
};
let stream_router = Arc::new(StreamRouter::with_options(&redis_client, router_options)?);
let stream_router = Arc::new(StreamRouter::with_options(
&redis_client,
metrics,
router_options,
)?);
let connection_manager = redis_client.get_connection_manager().await?;
Ok(Self::new_with_connection_manager(
connection_manager,
Expand Down
1 change: 1 addition & 0 deletions libs/collab-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod client;
pub mod collab_update_sink;
pub mod error;
pub mod lease;
pub mod metrics;
pub mod model;
pub mod pubsub;
pub mod stream_group;
Expand Down
28 changes: 28 additions & 0 deletions libs/collab-stream/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;

#[derive(Default)]
pub struct CollabStreamMetrics {
/// Incremented each time a new collab stream read task is set (including recurring tasks).
pub reads_enqueued: Counter,
/// Incremented each time an existing task is consumed (including recurring tasks).
pub reads_dequeued: Counter,
}

impl CollabStreamMetrics {
pub fn register(registry: &mut Registry) -> Self {
let metrics = Self::default();
let realtime_registry = registry.sub_registry_with_prefix("collab_stream");
realtime_registry.register(
"reads_enqueued",
"Incremented each time a new collab stream read task is set (including recurring tasks).",
metrics.reads_enqueued.clone(),
);
realtime_registry.register(
"reads_dequeued",
"Incremented each time an existing task is consumed (including recurring tasks).",
metrics.reads_dequeued.clone(),
);
metrics
}
}
96 changes: 84 additions & 12 deletions libs/collab-stream/src/stream_router.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::metrics::CollabStreamMetrics;
use loole::{Receiver, Sender};
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::Client;
Expand Down Expand Up @@ -27,14 +28,19 @@ pub struct StreamRouter {
alive: Arc<AtomicBool>,
#[allow(dead_code)]
workers: Vec<Worker>,
metrics: Arc<CollabStreamMetrics>,
}

impl StreamRouter {
pub fn new(client: &Client) -> Result<Self, RedisError> {
Self::with_options(client, Default::default())
pub fn new(client: &Client, metrics: Arc<CollabStreamMetrics>) -> Result<Self, RedisError> {
Self::with_options(client, metrics, Default::default())
}

pub fn with_options(client: &Client, options: StreamRouterOptions) -> Result<Self, RedisError> {
pub fn with_options(
client: &Client,
metrics: Arc<CollabStreamMetrics>,
options: StreamRouterOptions,
) -> Result<Self, RedisError> {
let alive = Arc::new(AtomicBool::new(true));
let (tx, rx) = loole::unbounded();
let mut workers = Vec::with_capacity(options.worker_count);
Expand All @@ -47,6 +53,7 @@ impl StreamRouter {
rx.clone(),
alive.clone(),
&options,
metrics.clone(),
);
workers.push(worker);
}
Expand All @@ -55,6 +62,7 @@ impl StreamRouter {
buf: tx,
workers,
alive,
metrics,
})
}

Expand All @@ -63,6 +71,7 @@ impl StreamRouter {
let last_id = last_id.unwrap_or_else(|| "0".to_string());
let h = StreamHandle::new(stream_key, last_id, tx);
self.buf.send(h).unwrap();
self.metrics.reads_enqueued.inc();
rx
}
}
Expand Down Expand Up @@ -118,6 +127,7 @@ impl Worker {
rx: Receiver<StreamHandle>,
alive: Arc<AtomicBool>,
options: &StreamRouterOptions,
metrics: Arc<CollabStreamMetrics>,
) -> Self {
let mut xread_options = StreamReadOptions::default();
if let Some(block_millis) = options.xread_block_millis {
Expand All @@ -128,7 +138,7 @@ impl Worker {
}
let count = options.xread_streams;
let handle = std::thread::spawn(move || {
if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count) {
if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count, metrics) {
tracing::error!("worker {} failed: {}", worker_id, err);
}
});
Expand All @@ -142,11 +152,13 @@ impl Worker {
alive: Arc<AtomicBool>,
options: StreamReadOptions,
count: usize,
metrics: Arc<CollabStreamMetrics>,
) -> RedisResult<()> {
let mut stream_keys = Vec::with_capacity(count);
let mut message_ids = Vec::with_capacity(count);
let mut senders = HashMap::with_capacity(count);
while alive.load(SeqCst) {
// receive next `count` of stream read requests
if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) {
break; // rx channel has closed
}
Expand All @@ -158,10 +170,12 @@ impl Worker {
continue;
}

metrics.reads_dequeued.inc_by(key_count as u64);
let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?;

let mut msgs = 0;
for stream in result.keys {
// for each stream returned from Redis, resolve corresponding subscriber and send messages
let mut remove_sender = false;
if let Some((sender, idx)) = senders.get(stream.key.as_str()) {
for id in stream.ids {
Expand All @@ -170,7 +184,7 @@ impl Worker {
message_ids[*idx].clone_from(&message_id); //TODO: optimize
msgs += 1;
if let Err(err) = sender.send((message_id, value)) {
tracing::warn!("failed to send: {}", err);
tracing::debug!("failed to send: {}", err);
remove_sender = true;
}
}
Expand All @@ -188,7 +202,8 @@ impl Worker {
key_count
);
}
Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
let scheduled = Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
metrics.reads_enqueued.inc_by(scheduled as u64);
}
Ok(())
}
Expand All @@ -198,21 +213,27 @@ impl Worker {
keys: &mut Vec<StreamKey>,
ids: &mut Vec<String>,
senders: &mut HashMap<&str, (StreamSender, usize)>,
) {
) -> usize {
let keys = keys.drain(..);
let mut ids = ids.drain(..);
let mut scheduled = 0;
for key in keys {
if let Some(last_id) = ids.next() {
if let Some((sender, _)) = senders.remove(key.as_str()) {
if sender.is_closed() {
continue; // sender is already closed
}
let h = StreamHandle::new(key, last_id, sender);
if let Err(err) = tx.send(h) {
tracing::warn!("failed to reschedule: {}", err);
tracing::error!("failed to reschedule: {}", err);
break;
}
scheduled += 1;
}
}
}
senders.clear();
scheduled
}

fn read_buf(
Expand Down Expand Up @@ -276,19 +297,23 @@ impl StreamHandle {

#[cfg(test)]
mod test {
use crate::stream_router::StreamRouter;
use crate::metrics::CollabStreamMetrics;
use crate::stream_router::{StreamRouter, StreamRouterOptions};
use rand::random;
use redis::{Client, Commands, FromRedisValue};
use std::sync::Arc;
use tokio::task::JoinSet;

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn multi_worker_preexisting_messages() {
const ROUTES_COUNT: usize = 200;
const MSG_PER_ROUTE: usize = 10;

let mut client = Client::open("redis://127.0.0.1/").unwrap();
let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::new(&client).unwrap();
let router = StreamRouter::new(&client, metrics).unwrap();

let mut join_set = JoinSet::new();
for key in keys {
Expand All @@ -313,8 +338,9 @@ mod test {
const MSG_PER_ROUTE: usize = 10;
let mut client = Client::open("redis://127.0.0.1/").unwrap();
let keys = init_streams(&mut client, ROUTES_COUNT, 0);
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::new(&client).unwrap();
let router = StreamRouter::new(&client, metrics).unwrap();

let mut join_set = JoinSet::new();
for key in keys.iter() {
Expand Down Expand Up @@ -348,15 +374,61 @@ mod test {
let _: String = client.xadd(&key, "*", &[("data", 1)]).unwrap();
let m2: String = client.xadd(&key, "*", &[("data", 2)]).unwrap();
let m3: String = client.xadd(&key, "*", &[("data", 3)]).unwrap();
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::new(&client).unwrap();
let router = StreamRouter::new(&client, metrics).unwrap();
let mut observer = router.observe(key, Some(m2));

let (msg_id, m) = observer.recv().await.unwrap();
assert_eq!(msg_id, m3);
assert_eq!(u32::from_redis_value(&m["data"]).unwrap(), 3);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn drop_subscription() {
const ROUTES_COUNT: usize = 1;
const MSG_PER_ROUTE: usize = 10;

let mut client = Client::open("redis://127.0.0.1/").unwrap();
let mut keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::with_options(
&client,
metrics.clone(),
StreamRouterOptions {
worker_count: 2,
xread_streams: 100,
xread_block_millis: Some(50),
xread_count: None,
},
)
.unwrap();

let key = keys.pop().unwrap();
let mut observer = router.observe(key.clone(), None);
for i in 0..MSG_PER_ROUTE {
let (_msg_id, map) = observer.recv().await.unwrap();
let value = String::from_redis_value(&map["data"]).unwrap();
assert_eq!(value, format!("{}-{}", key, i));
}
// drop observer and wait for worker to release
drop(observer);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let enqueued = metrics.reads_enqueued.get();
let dequeued = metrics.reads_dequeued.get();
assert_eq!(enqueued, dequeued, "dropped observer state");

// after dropping observer, no new polling task should be rescheduled
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues");
assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues");

tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues");
assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues");
}

fn init_streams(client: &mut Client, stream_count: usize, msgs_per_stream: usize) -> Vec<String> {
let test_prefix: u32 = random();
let mut keys = Vec::with_capacity(stream_count);
Expand Down
4 changes: 3 additions & 1 deletion libs/collab-stream/tests/collab_stream_test/test_util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::Context;
use collab_stream::client::CollabRedisStream;
use collab_stream::metrics::CollabStreamMetrics;
use rand::{thread_rng, Rng};
use std::sync::Arc;

pub async fn redis_client() -> redis::Client {
let redis_uri = "redis://localhost:6379";
Expand All @@ -11,7 +13,7 @@ pub async fn redis_client() -> redis::Client {

pub async fn stream_client() -> CollabRedisStream {
let redis_client = redis_client().await;
CollabRedisStream::new(redis_client)
CollabRedisStream::new(redis_client, Arc::new(CollabStreamMetrics::default()))
.await
.context("failed to create stream client")
.unwrap()
Expand Down
11 changes: 9 additions & 2 deletions services/appflowy-collaborate/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::actix_ws::server::RealtimeServerActor;
use crate::api::{collab_scope, ws_scope};
use crate::collab::access_control::CollabStorageAccessControlImpl;
use access_control::casbin::access::AccessControl;
use collab_stream::metrics::CollabStreamMetrics;
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
use database::file::s3_client_impl::AwsS3BucketClientImpl;

Expand Down Expand Up @@ -110,8 +111,12 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
let user_cache = UserCache::new(pg_pool.clone()).await;

info!("Connecting to Redis...");
let (redis_conn_manager, redis_stream_router) =
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
let (redis_conn_manager, redis_stream_router) = get_redis_client(
config.redis_uri.expose_secret(),
config.redis_worker_count,
metrics.collab_stream_metrics.clone(),
)
.await?;

// Pg listeners
info!("Setting up Pg listeners...");
Expand Down Expand Up @@ -189,12 +194,14 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
async fn get_redis_client(
redis_uri: &str,
worker_count: usize,
metrics: Arc<CollabStreamMetrics>,
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
info!("Connecting to redis with uri: {}", redis_uri);
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;

let router = StreamRouter::with_options(
&client,
metrics,
StreamRouterOptions {
worker_count,
xread_streams: 100,
Expand Down
Loading

0 comments on commit 424796e

Please sign in to comment.