diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 622b83252..9d702cd7d 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -27,7 +27,7 @@ use crate::{ globals, rooms, rooms::{ short::{ShortEventId, ShortStateHash}, - state_compressor::CompressedStateEvent, + state_compressor::{parse_compressed_state_event, CompressedStateEvent}, }, Dep, }; @@ -98,12 +98,12 @@ impl Service { _statediffremoved: Arc>, state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex ) -> Result { - let event_ids = statediffnew.iter().stream().filter_map(|new| { - self.services - .state_compressor - .parse_compressed_state_event(*new) - .map_ok_or_else(|_| None, |(_, event_id)| Some(event_id)) - }); + let event_ids = statediffnew + .iter() + .stream() + .map(|&new| parse_compressed_state_event(new).1) + .then(|shorteventid| self.services.short.get_eventid_from_short(shorteventid)) + .ignore_err(); pin_mut!(event_ids); while let Some(event_id) = event_ids.next().await { @@ -428,17 +428,19 @@ impl Service { .full_state; let mut ret = HashMap::new(); - for compressed in full_state.iter() { - let Ok((shortstatekey, event_id)) = self - .services - .state_compressor - .parse_compressed_state_event(*compressed) - .await - else { + for &compressed in full_state.iter() { + let (shortstatekey, shorteventid) = parse_compressed_state_event(compressed); + + let Some((ty, state_key)) = sauthevents.remove(&shortstatekey) else { continue; }; - let Some((ty, state_key)) = sauthevents.remove(&shortstatekey) else { + let Ok(event_id) = self + .services + .short + .get_eventid_from_short(shorteventid) + .await + else { continue; }; diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 06cd648cf..8df0d8b0e 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,11 +1,22 @@ use std::{collections::HashMap, sync::Arc}; -use conduit::{err, PduEvent, Result}; +use conduit::{ + err, + utils::{future::TryExtExt, IterStream}, + PduEvent, Result, +}; use database::{Deserialized, Map}; -use futures::TryFutureExt; +use futures::{StreamExt, TryFutureExt}; use ruma::{events::StateEventType, EventId, RoomId}; -use crate::{rooms, rooms::short::ShortStateHash, Dep}; +use crate::{ + rooms, + rooms::{ + short::{ShortEventId, ShortStateHash, ShortStateKey}, + state_compressor::parse_compressed_state_event, + }, + Dep, +}; pub(super) struct Data { eventid_shorteventid: Arc, @@ -35,73 +46,67 @@ impl Data { } } - #[allow(unused_qualifications)] // async traits - pub(super) async fn state_full_ids(&self, shortstatehash: ShortStateHash) -> Result>> { - let full_state = self - .services - .state_compressor - .load_shortstatehash_info(shortstatehash) - .await - .map_err(|e| err!(Database("Missing state IDs: {e}")))? - .pop() - .expect("there is always one layer") - .full_state; + pub(super) async fn state_full( + &self, shortstatehash: ShortStateHash, + ) -> Result>> { + Ok(self + .state_full_pdus(shortstatehash) + .await? + .into_iter() + .filter_map(|pdu| Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu))) + .collect()) + } - let mut result = HashMap::new(); - let mut i: u8 = 0; - for compressed in full_state.iter() { - let parsed = self - .services - .state_compressor - .parse_compressed_state_event(*compressed) - .await?; - - result.insert(parsed.0, parsed.1); - - i = i.wrapping_add(1); - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } + pub(super) async fn state_full_pdus(&self, shortstatehash: ShortStateHash) -> Result>> { + Ok(self + .state_full_shortids(shortstatehash) + .await? + .iter() + .stream() + .filter_map(|(_, shorteventid)| { + self.services + .short + .get_eventid_from_short(*shorteventid) + .ok() + }) + .filter_map(|eventid| async move { self.services.timeline.get_pdu(&eventid).await.ok() }) + .collect() + .await) + } - Ok(result) + pub(super) async fn state_full_ids(&self, shortstatehash: ShortStateHash) -> Result>> { + Ok(self + .state_full_shortids(shortstatehash) + .await? + .iter() + .stream() + .filter_map(|(shortstatekey, shorteventid)| { + self.services + .short + .get_eventid_from_short(*shorteventid) + .map_ok(move |eventid| (*shortstatekey, eventid)) + .ok() + }) + .collect() + .await) } - #[allow(unused_qualifications)] // async traits - pub(super) async fn state_full( + pub(super) async fn state_full_shortids( &self, shortstatehash: ShortStateHash, - ) -> Result>> { - let full_state = self + ) -> Result> { + Ok(self .services .state_compressor .load_shortstatehash_info(shortstatehash) - .await? + .await + .map_err(|e| err!(Database("Missing state IDs: {e}")))? .pop() .expect("there is always one layer") - .full_state; - - let mut result = HashMap::new(); - let mut i: u8 = 0; - for compressed in full_state.iter() { - let (_, eventid) = self - .services - .state_compressor - .parse_compressed_state_event(*compressed) - .await?; - - if let Ok(pdu) = self.services.timeline.get_pdu(&eventid).await { - if let Some(state_key) = pdu.state_key.as_ref() { - result.insert((pdu.kind.to_string().into(), state_key.clone()), pdu); - } - } - - i = i.wrapping_add(1); - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - - Ok(result) + .full_state + .iter() + .copied() + .map(parse_compressed_state_event) + .collect()) } /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). @@ -130,18 +135,11 @@ impl Data { .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) .ok_or(err!(Database("No shortstatekey in compressed state")))?; + let (_, shorteventid) = parse_compressed_state_event(*compressed); + self.services - .state_compressor - .parse_compressed_state_event(*compressed) - .map_ok(|(_, id)| id) - .map_err(|e| { - err!(Database(error!( - ?event_type, - ?state_key, - ?shortstatekey, - "Failed to parse compressed: {e:?}" - ))) - }) + .short + .get_eventid_from_short(shorteventid) .await } @@ -176,6 +174,17 @@ impl Data { .await } + /// Returns the full room state's pdus. + #[allow(unused_qualifications)] // async traits + pub(super) async fn room_state_full_pdus(&self, room_id: &RoomId) -> Result>> { + self.services + .state + .get_room_shortstatehash(room_id) + .and_then(|shortstatehash| self.state_full_pdus(shortstatehash)) + .map_err(|e| err!(Database("Missing state pdus for {room_id:?}: {e:?}"))) + .await + } + /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). pub(super) async fn room_state_get_id( &self, room_id: &RoomId, event_type: &StateEventType, state_key: &str, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 4958c4eaf..697f7236c 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -301,6 +301,12 @@ impl Service { self.db.room_state_full(room_id).await } + /// Returns the full room state pdus + #[tracing::instrument(skip(self), level = "debug")] + pub async fn room_state_full_pdus(&self, room_id: &RoomId) -> Result>> { + self.db.room_state_full_pdus(room_id).await + } + /// Returns a single PDU from `room_id` with key (`event_type`, /// `state_key`). #[tracing::instrument(skip(self), level = "debug")] diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index f9db6f673..52ad5437f 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -17,7 +17,7 @@ use ruma::{EventId, RoomId}; use crate::{ rooms, - rooms::short::{ShortId, ShortStateHash, ShortStateKey}, + rooms::short::{ShortEventId, ShortId, ShortStateHash, ShortStateKey}, Dep, }; @@ -196,24 +196,6 @@ impl Service { .expect("failed to create CompressedStateEvent") } - /// Returns shortstatekey, event id - #[inline] - pub async fn parse_compressed_state_event( - &self, compressed_event: CompressedStateEvent, - ) -> Result<(ShortStateKey, Arc)> { - use utils::u64_from_u8; - - let shortstatekey = u64_from_u8(&compressed_event[0..size_of::()]); - let shorteventid = u64_from_u8(&compressed_event[size_of::()..]); - let event_id = self - .services - .short - .get_eventid_from_short(shorteventid) - .await?; - - Ok((shortstatekey, event_id)) - } - /// Creates a new shortstatehash that often is just a diff to an already /// existing shortstatehash and therefore very efficient. /// @@ -488,6 +470,17 @@ impl Service { } } +#[inline] +#[must_use] +pub fn parse_compressed_state_event(compressed_event: CompressedStateEvent) -> (ShortStateKey, ShortEventId) { + use utils::u64_from_u8; + + let shortstatekey = u64_from_u8(&compressed_event[0..size_of::()]); + let shorteventid = u64_from_u8(&compressed_event[size_of::()..]); + + (shortstatekey, shorteventid) +} + #[inline] fn compressed_state_size(compressed_state: &CompressedState) -> usize { compressed_state