From 8a164f5eeaf1d667c39a49b3b0421dc92ca6a215 Mon Sep 17 00:00:00 2001 From: Milosz Muszynski Date: Mon, 27 Nov 2023 21:02:34 +0100 Subject: [PATCH] Moved pushes to callbacks --- .../src/citadel_queries/citadel_inquirer.rs | 21 ++++----- moat-core/src/contract_queries/stream_aux.rs | 47 ++++--------------- 2 files changed, 18 insertions(+), 50 deletions(-) diff --git a/moat-core/src/citadel_queries/citadel_inquirer.rs b/moat-core/src/citadel_queries/citadel_inquirer.rs index 4f0dd01..3c7b390 100644 --- a/moat-core/src/citadel_queries/citadel_inquirer.rs +++ b/moat-core/src/citadel_queries/citadel_inquirer.rs @@ -106,16 +106,15 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( - |(_, lic_vec)| { + StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |(pos, lic_vec)| { let license = Self::deserialise_license(lic_vec); - Ok(ssk_user.view_key().owns(&license.lsa)) + if ssk_user.view_key().owns(&license.lsa) { + pairs.push((*pos, license)); + }; }, stream, )?; - for (pos, lic_ser) in v.iter() { - pairs.push((*pos, Self::deserialise_license(lic_ser))) - } Ok(pairs) } @@ -127,13 +126,13 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( - |_| Ok(true), + StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |(pos, lic_vec)| { + let license = Self::deserialise_license(lic_vec); + pairs.push((*pos, license)); + }, stream, )?; - for (pos, lic_ser) in v.iter() { - pairs.push((*pos, Self::deserialise_license(lic_ser))) - } Ok(pairs) } } diff --git a/moat-core/src/contract_queries/stream_aux.rs b/moat-core/src/contract_queries/stream_aux.rs index 0bf7f7e..cdef03f 100644 --- a/moat-core/src/contract_queries/stream_aux.rs +++ b/moat-core/src/contract_queries/stream_aux.rs @@ -19,10 +19,10 @@ impl StreamAux { /// Finds and returns items for which /// the given filter returns true, pub fn find_items( - filter: impl Fn(&R) -> Result, + mut filter_collect: impl FnMut(&R), stream: &mut (impl futures_core::Stream> + std::marker::Unpin), - ) -> Result, Error> + ) -> Result<(), Error> where R: Archive + Clone, R::Archived: Deserialize @@ -30,53 +30,22 @@ impl StreamAux { + Deserialize, { let mut remainder = Vec::::new(); - let mut items = vec![]; - loop { - let (v, stop) = - Self::do_find_items::(&filter, stream, &mut remainder)?; - items.extend_from_slice(v.as_slice()); - if stop { - break; - } - } - Ok(items) - } - - fn do_find_items( - filter: &impl Fn(&R) -> Result, - stream: &mut (impl futures_core::Stream> - + std::marker::Unpin), - remainder: &mut Vec, - ) -> Result<(Vec, bool), Error> - where - R: Archive, - R::Archived: Deserialize - + for<'b> CheckBytes> - + Deserialize, - { - let mut output = vec![]; - let mut stream_finished = false; - if let Some(http_chunk) = stream.next().wait() { - let mut buffer = remainder.clone(); + while let Some(chunk) = stream.next().wait() { + let mut buffer = vec![]; + buffer.append(&mut remainder); buffer.extend_from_slice( - &http_chunk - .map_err(|_| Error::Stream("chunking error".into()))?, + &chunk.map_err(|_| Error::Stream("chunking error".into()))?, ); let mut iter = buffer.chunks_exact(L); for bytes in iter.by_ref() { let item: R = rkyv::from_bytes(bytes).map_err(|_| { Error::Stream("deserialization error".into()) })?; - if filter(&item)? { - output.push(item); - } + filter_collect(&item); } - remainder.clear(); remainder.extend_from_slice(iter.remainder()); - } else { - stream_finished = true; } - Ok((output, stream_finished)) + Ok(()) } /// Collects all items and returns them in a vector,