From 7cd48e28de8084e719893511561cc0f16518a10b Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Wed, 18 Dec 2024 03:04:39 +0000 Subject: [PATCH] Send read reciept and typing indicator EDUs to appservices with receive_ephemeral --- Cargo.lock | 26 ++++++------- Cargo.toml | 4 +- src/api/client/read_marker.rs | 14 +++---- src/api/server/send.rs | 2 +- src/service/rooms/read_receipt/mod.rs | 18 +++++++-- src/service/rooms/typing/mod.rs | 54 +++++++++++++++++++++++---- src/service/sending/mod.rs | 48 +++++++++++++++++++++++- src/service/sending/sender.rs | 15 +++----- 8 files changed, 135 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c86904d7a..2c0ae75cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3162,7 +3162,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.10.1" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "assign", "js_int", @@ -3184,7 +3184,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.10.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "js_int", "ruma-common", @@ -3196,7 +3196,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.18.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "as_variant", "assign", @@ -3219,7 +3219,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.13.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "as_variant", "base64 0.22.1", @@ -3249,7 +3249,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.28.1" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "as_variant", "indexmap 2.7.0", @@ -3273,7 +3273,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "bytes", "http", @@ -3291,7 +3291,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.9.5" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "js_int", "thiserror 2.0.7", @@ -3300,7 +3300,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "js_int", "ruma-common", @@ -3310,7 +3310,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.13.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "cfg-if", "proc-macro-crate", @@ -3325,7 +3325,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "js_int", "ruma-common", @@ -3337,7 +3337,7 @@ dependencies = [ [[package]] name = "ruma-server-util" version = "0.3.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "headers", "http", @@ -3350,7 +3350,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.15.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "base64 0.22.1", "ed25519-dalek", @@ -3366,7 +3366,7 @@ dependencies = [ [[package]] name = "ruma-state-res" version = "0.11.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" dependencies = [ "futures-util", "js_int", diff --git a/Cargo.toml b/Cargo.toml index cb2ab9166..38d6d7291 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -334,7 +334,7 @@ version = "0.1.2" [workspace.dependencies.ruma] git = "https://github.com/girlbossceo/ruwuma" #branch = "conduwuit-changes" -rev = "a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2" +rev = "112ccc24cb14de26757715d611285d0806d5d91f" features = [ "compat", "rand", @@ -350,7 +350,6 @@ features = [ "compat-upload-signatures", "identifiers-validation", "unstable-unspecified", - "unstable-msc2409", "unstable-msc2448", "unstable-msc2666", "unstable-msc2867", @@ -366,6 +365,7 @@ features = [ "unstable-msc4121", "unstable-msc4125", "unstable-msc4186", + "unstable-msc4203", # sending to-device events to appservices "unstable-msc4210", # remove legacy mentions "unstable-extensible-events", ] diff --git a/src/api/client/read_marker.rs b/src/api/client/read_marker.rs index 89fe003a5..ab7cc6ad9 100644 --- a/src/api/client/read_marker.rs +++ b/src/api/client/read_marker.rs @@ -72,14 +72,10 @@ pub(crate) async fn set_read_marker_route( services .rooms .read_receipt - .readreceipt_update( - sender_user, - &body.room_id, - &ruma::events::receipt::ReceiptEvent { - content: ruma::events::receipt::ReceiptEventContent(receipt_content), - room_id: body.room_id.clone(), - }, - ) + .readreceipt_update(sender_user, &body.room_id, ruma::events::receipt::ReceiptEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + room_id: body.room_id.clone(), + }) .await; } @@ -171,7 +167,7 @@ pub(crate) async fn create_receipt_route( .readreceipt_update( sender_user, &body.room_id, - &ruma::events::receipt::ReceiptEvent { + ruma::events::receipt::ReceiptEvent { content: ruma::events::receipt::ReceiptEventContent(receipt_content), room_id: body.room_id.clone(), }, diff --git a/src/api/server/send.rs b/src/api/server/send.rs index c5fc7118a..db6fd7487 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -275,7 +275,7 @@ async fn handle_edu_receipt( services .rooms .read_receipt - .readreceipt_update(&user_id, &room_id, &event) + .readreceipt_update(&user_id, &room_id, event) .await; } } else { diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 53e64957d..4075c4475 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -2,9 +2,10 @@ mod data; use std::{collections::BTreeMap, sync::Arc}; -use conduwuit::{debug, err, warn, PduCount, PduId, RawPduId, Result}; +use conduwuit::{debug, err, result::LogErr, warn, PduCount, PduId, RawPduId, Result}; use futures::{try_join, Stream, TryFutureExt}; use ruma::{ + api::appservice::event::push_events::v1::EphemeralData, events::{ receipt::{ReceiptEvent, ReceiptEventContent, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, @@ -48,14 +49,25 @@ impl Service { &self, user_id: &UserId, room_id: &RoomId, - event: &ReceiptEvent, + event: ReceiptEvent, ) { - self.db.readreceipt_update(user_id, room_id, event).await; + self.db.readreceipt_update(user_id, room_id, &event).await; self.services .sending .flush_room(room_id) .await .expect("room flush failed"); + // update appservices + let edu = EphemeralData::Receipt(event); + let _ = self + .services + .sending + .send_edu_appservice_room( + room_id, + serde_json::to_vec(&edu).expect("Serialized EphemeralData::Receipt"), + ) + .await + .log_err(); } /// Gets the latest private read receipt from the user in the room diff --git a/src/service/rooms/typing/mod.rs b/src/service/rooms/typing/mod.rs index a61233220..31ea40aeb 100644 --- a/src/service/rooms/typing/mod.rs +++ b/src/service/rooms/typing/mod.rs @@ -7,8 +7,11 @@ use conduwuit::{ }; use futures::StreamExt; use ruma::{ - api::federation::transactions::edu::{Edu, TypingContent}, - events::SyncEphemeralRoomEvent, + api::{ + appservice::event::push_events::v1::EphemeralData, + federation::transactions::edu::{Edu, TypingContent}, + }, + events::{typing::TypingEventContent, EphemeralRoomEvent, SyncEphemeralRoomEvent}, OwnedRoomId, OwnedUserId, RoomId, UserId, }; use tokio::sync::{broadcast, RwLock}; @@ -76,6 +79,9 @@ impl Service { trace!("receiver found what it was looking for and is no longer interested"); } + // update appservices + self.appservice_send(room_id).await?; + // update federation if self.services.globals.user_is_local(user_id) { self.federation_send(room_id, user_id, true).await?; @@ -103,7 +109,8 @@ impl Service { if self.typing_update_sender.send(room_id.to_owned()).is_err() { trace!("receiver found what it was looking for and is no longer interested"); } - + // update appservices + self.appservice_send(room_id).await?; // update federation if self.services.globals.user_is_local(user_id) { self.federation_send(room_id, user_id, false).await?; @@ -157,6 +164,9 @@ impl Service { trace!("receiver found what it was looking for and is no longer interested"); } + // update appservices + self.appservice_send(room_id).await?; + // update federation for user in &removable { if self.services.globals.user_is_local(user) { @@ -180,17 +190,30 @@ impl Service { .unwrap_or(0)) } + /// Returns a new typing EDU. + pub async fn typings_content(&self, room_id: &RoomId) -> Result { + let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); + + let Some(typing_indicators) = room_typing_indicators else { + return Ok(TypingEventContent { user_ids: Vec::new() }); + }; + + let user_ids: Vec<_> = typing_indicators.into_keys().collect(); + + Ok(TypingEventContent { user_ids }) + } + /// Returns a new typing EDU. pub async fn typings_all( &self, room_id: &RoomId, sender_user: &UserId, - ) -> Result> { + ) -> Result> { let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); let Some(typing_indicators) = room_typing_indicators else { return Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() }, + content: TypingEventContent { user_ids: Vec::new() }, }); }; @@ -208,9 +231,7 @@ impl Service { .collect() .await; - Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { user_ids }, - }) + Ok(SyncEphemeralRoomEvent { content: TypingEventContent { user_ids } }) } async fn federation_send( @@ -237,4 +258,21 @@ impl Service { Ok(()) } + + async fn appservice_send(&self, room_id: &RoomId) -> Result<()> { + let edu = EphemeralData::Typing(EphemeralRoomEvent { + content: self.typings_content(room_id).await?, + room_id: room_id.into(), + }); + + self.services + .sending + .send_edu_appservice_room( + room_id, + serde_json::to_vec(&edu).expect("Serialized EphemeralData::Typing"), + ) + .await?; + + Ok(()) + } } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 2038f4eb5..2b5710340 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -25,7 +25,10 @@ pub use self::{ sender::{EDU_LIMIT, PDU_LIMIT}, }; use crate::{ - account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId, + account_data, + appservice::NamespaceRegex, + client, globals, presence, pusher, resolver, + rooms::{self, timeline::RawPduId}, server_keys, users, Dep, }; @@ -38,6 +41,7 @@ pub struct Service { } struct Services { + alias: Dep, client: Dep, globals: Dep, resolver: Dep, @@ -76,6 +80,7 @@ impl crate::Service for Service { Ok(Arc::new(Self { server: args.server.clone(), services: Services { + alias: args.depend::("rooms::alias"), client: args.depend::("client"), globals: args.depend::("globals"), resolver: args.depend::("resolver"), @@ -184,6 +189,47 @@ impl Service { }) } + #[tracing::instrument(skip(self, serialized), level = "debug")] + pub fn send_edu_appservice(&self, appservice_id: String, serialized: Vec) -> Result { + let dest = Destination::Appservice(appservice_id); + let event = SendingEvent::Edu(serialized); + let _cork = self.db.db.cork(); + let keys = self.db.queue_requests(once((&event, &dest))); + self.dispatch(Msg { + dest, + event, + queue_id: keys.into_iter().next().expect("request queue key"), + }) + } + + #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] + pub async fn send_edu_appservice_room( + &self, + room_id: &RoomId, + serialized: Vec, + ) -> Result<()> { + for appservice in self.services.appservice.read().await.values() { + let matching_aliases = |aliases: NamespaceRegex| { + self.services + .alias + .local_aliases_for_room(room_id) + .ready_any(move |room_alias| aliases.is_match(room_alias.as_str())) + }; + + if appservice.rooms.is_match(room_id.as_str()) + || matching_aliases(appservice.aliases.clone()).await + || self + .services + .state_cache + .appservice_in_room(room_id, appservice) + .await + { + self.send_edu_appservice(appservice.registration.id.clone(), serialized.clone())?; + } + } + Ok(()) + } + #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] pub async fn send_edu_room(&self, room_id: &RoomId, serialized: Vec) -> Result<()> { let servers = self diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 1f462f396..1589101b5 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -1,3 +1,4 @@ +use core::str; use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, @@ -21,7 +22,7 @@ use futures::{ }; use ruma::{ api::{ - appservice::event::push_events::v1::Edu as RumaEdu, + appservice::event::push_events::v1::EphemeralData, federation::transactions::{ edu::{ DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, @@ -587,7 +588,7 @@ impl Service { .filter(|event| matches!(event, SendingEvent::Pdu(_))) .count(), ); - let mut edu_jsons: Vec = Vec::with_capacity( + let mut edu_jsons: Vec = Vec::with_capacity( events .iter() .filter(|event| matches!(event, SendingEvent::Edu(_))) @@ -600,16 +601,12 @@ impl Service { pdu_jsons.push(pdu.to_room_event()); } }, - | SendingEvent::Edu(edu) => { - if appservice - .receive_ephemeral - .is_some_and(|receive_edus| receive_edus) - { + | SendingEvent::Edu(edu) => + if appservice.receive_ephemeral { if let Ok(edu) = serde_json::from_slice(edu) { edu_jsons.push(edu); } - } - }, + }, | SendingEvent::Flush => {}, // flush only; no new content } }