diff --git a/moat-core/src/citadel_queries/citadel_inquirer.rs b/moat-core/src/citadel_queries/citadel_inquirer.rs index cc2b384..3c7b390 100644 --- a/moat-core/src/citadel_queries/citadel_inquirer.rs +++ b/moat-core/src/citadel_queries/citadel_inquirer.rs @@ -106,20 +106,15 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - loop { - let r = StreamAux::find_item::<(u64, Vec), ITEM_LEN>( - |(_, lic_vec)| { - let license = Self::deserialise_license(lic_vec); - Ok(ssk_user.view_key().owns(&license.lsa)) - }, - stream, - ); - if r.is_err() { - break; - } - let (pos, lic_ser) = r?; - pairs.push((pos, Self::deserialise_license(&lic_ser))) - } + StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |(pos, lic_vec)| { + let license = Self::deserialise_license(lic_vec); + if ssk_user.view_key().owns(&license.lsa) { + pairs.push((*pos, license)); + }; + }, + stream, + )?; Ok(pairs) } @@ -131,17 +126,13 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - loop { - let r = StreamAux::find_item::<(u64, Vec), ITEM_LEN>( - |_| Ok(true), - stream, - ); - if r.is_err() { - break; - } - let (pos, lic_ser) = r?; - pairs.push((pos, Self::deserialise_license(&lic_ser))) - } + StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |(pos, lic_vec)| { + let license = Self::deserialise_license(lic_vec); + pairs.push((*pos, license)); + }, + stream, + )?; Ok(pairs) } } diff --git a/moat-core/src/contract_queries/stream_aux.rs b/moat-core/src/contract_queries/stream_aux.rs index a55bbbc..cdef03f 100644 --- a/moat-core/src/contract_queries/stream_aux.rs +++ b/moat-core/src/contract_queries/stream_aux.rs @@ -16,38 +16,36 @@ use rkyv::{Archive, Deserialize, Infallible}; pub struct StreamAux; impl StreamAux { - /// Finds and returns first item for which + /// Finds and returns items for which /// the given filter returns true, - /// returns error if no such item was found - pub fn find_item( - filter: impl Fn(&R) -> Result, + pub fn find_items( + mut filter_collect: impl FnMut(&R), stream: &mut (impl futures_core::Stream> + std::marker::Unpin), - ) -> Result + ) -> Result<(), Error> where - R: Archive, + R: Archive + Clone, R::Archived: Deserialize + for<'b> CheckBytes> + Deserialize, { - let mut buffer = vec![]; - while let Some(http_chunk) = stream.next().wait() { + let mut remainder = Vec::::new(); + while let Some(chunk) = stream.next().wait() { + let mut buffer = vec![]; + buffer.append(&mut remainder); buffer.extend_from_slice( - &http_chunk - .map_err(|_| Error::Stream("chunking error".into()))?, + &chunk.map_err(|_| Error::Stream("chunking error".into()))?, ); - let mut chunk = buffer.chunks_exact(L); - for bytes in chunk.by_ref() { + let mut iter = buffer.chunks_exact(L); + for bytes in iter.by_ref() { let item: R = rkyv::from_bytes(bytes).map_err(|_| { Error::Stream("deserialization error".into()) })?; - if filter(&item)? { - return Ok(item); - } + filter_collect(&item); } - buffer = chunk.remainder().to_vec(); + remainder.extend_from_slice(iter.remainder()); } - Err(Error::Stream("item not found".into())) + Ok(()) } /// Collects all items and returns them in a vector,