From cf7dd10b8a7d95c047bf9a7cb2af6f23477b843e Mon Sep 17 00:00:00 2001 From: Milosz Muszynski Date: Fri, 24 Nov 2023 17:06:55 +0100 Subject: [PATCH 1/3] Fixed multi-item chunk problem --- .../src/citadel_queries/citadel_inquirer.rs | 22 ++++++++++--------- moat-core/src/contract_queries/stream_aux.rs | 18 +++++++-------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/moat-core/src/citadel_queries/citadel_inquirer.rs b/moat-core/src/citadel_queries/citadel_inquirer.rs index cc2b384..2d58ad2 100644 --- a/moat-core/src/citadel_queries/citadel_inquirer.rs +++ b/moat-core/src/citadel_queries/citadel_inquirer.rs @@ -107,18 +107,19 @@ impl CitadelInquirer { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; loop { - let r = StreamAux::find_item::<(u64, Vec), ITEM_LEN>( + let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( |(_, lic_vec)| { let license = Self::deserialise_license(lic_vec); Ok(ssk_user.view_key().owns(&license.lsa)) }, stream, - ); - if r.is_err() { + )?; + if v.is_empty() { break; } - let (pos, lic_ser) = r?; - pairs.push((pos, Self::deserialise_license(&lic_ser))) + for (pos, lic_ser) in v.iter() { + pairs.push((*pos, Self::deserialise_license(lic_ser))) + } } Ok(pairs) } @@ -132,15 +133,16 @@ impl CitadelInquirer { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; loop { - let r = StreamAux::find_item::<(u64, Vec), ITEM_LEN>( + let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( |_| Ok(true), stream, - ); - if r.is_err() { + )?; + if v.is_empty() { break; } - let (pos, lic_ser) = r?; - pairs.push((pos, Self::deserialise_license(&lic_ser))) + for (pos, lic_ser) in v.iter() { + pairs.push((*pos, Self::deserialise_license(lic_ser))) + } } Ok(pairs) } diff --git a/moat-core/src/contract_queries/stream_aux.rs b/moat-core/src/contract_queries/stream_aux.rs index a55bbbc..69b9795 100644 --- a/moat-core/src/contract_queries/stream_aux.rs +++ b/moat-core/src/contract_queries/stream_aux.rs @@ -16,38 +16,36 @@ use rkyv::{Archive, Deserialize, Infallible}; pub struct StreamAux; impl StreamAux { - /// Finds and returns first item for which + /// Finds and returns items for which /// the given filter returns true, - /// returns error if no such item was found - pub fn find_item( + pub fn find_items( filter: impl Fn(&R) -> Result, stream: &mut (impl futures_core::Stream> + std::marker::Unpin), - ) -> Result + ) -> Result, Error> where R: Archive, R::Archived: Deserialize + for<'b> CheckBytes> + Deserialize, { + let mut output = vec![]; let mut buffer = vec![]; - while let Some(http_chunk) = stream.next().wait() { + if let Some(http_chunk) = stream.next().wait() { buffer.extend_from_slice( &http_chunk .map_err(|_| Error::Stream("chunking error".into()))?, ); - let mut chunk = buffer.chunks_exact(L); - for bytes in chunk.by_ref() { + for bytes in buffer.chunks_exact(L) { let item: R = rkyv::from_bytes(bytes).map_err(|_| { Error::Stream("deserialization error".into()) })?; if filter(&item)? { - return Ok(item); + output.push(item); } } - buffer = chunk.remainder().to_vec(); } - Err(Error::Stream("item not found".into())) + Ok(output) } /// Collects all items and returns them in a vector, From 34ed15b53f28c6b5faa139470daa0daed8c7ddef Mon Sep 17 00:00:00 2001 From: Milosz Muszynski Date: Mon, 27 Nov 2023 10:14:23 +0100 Subject: [PATCH 2/3] Fixed remainder processing --- .../src/citadel_queries/citadel_inquirer.rs | 40 +++++++------------ moat-core/src/contract_queries/stream_aux.rs | 37 +++++++++++++++-- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/moat-core/src/citadel_queries/citadel_inquirer.rs b/moat-core/src/citadel_queries/citadel_inquirer.rs index 2d58ad2..4f0dd01 100644 --- a/moat-core/src/citadel_queries/citadel_inquirer.rs +++ b/moat-core/src/citadel_queries/citadel_inquirer.rs @@ -106,20 +106,15 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - loop { - let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( - |(_, lic_vec)| { - let license = Self::deserialise_license(lic_vec); - Ok(ssk_user.view_key().owns(&license.lsa)) - }, - stream, - )?; - if v.is_empty() { - break; - } - for (pos, lic_ser) in v.iter() { - pairs.push((*pos, Self::deserialise_license(lic_ser))) - } + let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |(_, lic_vec)| { + let license = Self::deserialise_license(lic_vec); + Ok(ssk_user.view_key().owns(&license.lsa)) + }, + stream, + )?; + for (pos, lic_ser) in v.iter() { + pairs.push((*pos, Self::deserialise_license(lic_ser))) } Ok(pairs) } @@ -132,17 +127,12 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - loop { - let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( - |_| Ok(true), - stream, - )?; - if v.is_empty() { - break; - } - for (pos, lic_ser) in v.iter() { - pairs.push((*pos, Self::deserialise_license(lic_ser))) - } + let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |_| Ok(true), + stream, + )?; + for (pos, lic_ser) in v.iter() { + pairs.push((*pos, Self::deserialise_license(lic_ser))) } Ok(pairs) } diff --git a/moat-core/src/contract_queries/stream_aux.rs b/moat-core/src/contract_queries/stream_aux.rs index 69b9795..0bf7f7e 100644 --- a/moat-core/src/contract_queries/stream_aux.rs +++ b/moat-core/src/contract_queries/stream_aux.rs @@ -23,6 +23,31 @@ impl StreamAux { stream: &mut (impl futures_core::Stream> + std::marker::Unpin), ) -> Result, Error> + where + R: Archive + Clone, + R::Archived: Deserialize + + for<'b> CheckBytes> + + Deserialize, + { + let mut remainder = Vec::::new(); + let mut items = vec![]; + loop { + let (v, stop) = + Self::do_find_items::(&filter, stream, &mut remainder)?; + items.extend_from_slice(v.as_slice()); + if stop { + break; + } + } + Ok(items) + } + + fn do_find_items( + filter: &impl Fn(&R) -> Result, + stream: &mut (impl futures_core::Stream> + + std::marker::Unpin), + remainder: &mut Vec, + ) -> Result<(Vec, bool), Error> where R: Archive, R::Archived: Deserialize @@ -30,13 +55,15 @@ impl StreamAux { + Deserialize, { let mut output = vec![]; - let mut buffer = vec![]; + let mut stream_finished = false; if let Some(http_chunk) = stream.next().wait() { + let mut buffer = remainder.clone(); buffer.extend_from_slice( &http_chunk .map_err(|_| Error::Stream("chunking error".into()))?, ); - for bytes in buffer.chunks_exact(L) { + let mut iter = buffer.chunks_exact(L); + for bytes in iter.by_ref() { let item: R = rkyv::from_bytes(bytes).map_err(|_| { Error::Stream("deserialization error".into()) })?; @@ -44,8 +71,12 @@ impl StreamAux { output.push(item); } } + remainder.clear(); + remainder.extend_from_slice(iter.remainder()); + } else { + stream_finished = true; } - Ok(output) + Ok((output, stream_finished)) } /// Collects all items and returns them in a vector, From 8a164f5eeaf1d667c39a49b3b0421dc92ca6a215 Mon Sep 17 00:00:00 2001 From: Milosz Muszynski Date: Mon, 27 Nov 2023 21:02:34 +0100 Subject: [PATCH 3/3] Moved pushes to callbacks --- .../src/citadel_queries/citadel_inquirer.rs | 21 ++++----- moat-core/src/contract_queries/stream_aux.rs | 47 ++++--------------- 2 files changed, 18 insertions(+), 50 deletions(-) diff --git a/moat-core/src/citadel_queries/citadel_inquirer.rs b/moat-core/src/citadel_queries/citadel_inquirer.rs index 4f0dd01..3c7b390 100644 --- a/moat-core/src/citadel_queries/citadel_inquirer.rs +++ b/moat-core/src/citadel_queries/citadel_inquirer.rs @@ -106,16 +106,15 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( - |(_, lic_vec)| { + StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |(pos, lic_vec)| { let license = Self::deserialise_license(lic_vec); - Ok(ssk_user.view_key().owns(&license.lsa)) + if ssk_user.view_key().owns(&license.lsa) { + pairs.push((*pos, license)); + }; }, stream, )?; - for (pos, lic_ser) in v.iter() { - pairs.push((*pos, Self::deserialise_license(lic_ser))) - } Ok(pairs) } @@ -127,13 +126,13 @@ impl CitadelInquirer { ) -> Result, Error> { const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN; let mut pairs = vec![]; - let v = StreamAux::find_items::<(u64, Vec), ITEM_LEN>( - |_| Ok(true), + StreamAux::find_items::<(u64, Vec), ITEM_LEN>( + |(pos, lic_vec)| { + let license = Self::deserialise_license(lic_vec); + pairs.push((*pos, license)); + }, stream, )?; - for (pos, lic_ser) in v.iter() { - pairs.push((*pos, Self::deserialise_license(lic_ser))) - } Ok(pairs) } } diff --git a/moat-core/src/contract_queries/stream_aux.rs b/moat-core/src/contract_queries/stream_aux.rs index 0bf7f7e..cdef03f 100644 --- a/moat-core/src/contract_queries/stream_aux.rs +++ b/moat-core/src/contract_queries/stream_aux.rs @@ -19,10 +19,10 @@ impl StreamAux { /// Finds and returns items for which /// the given filter returns true, pub fn find_items( - filter: impl Fn(&R) -> Result, + mut filter_collect: impl FnMut(&R), stream: &mut (impl futures_core::Stream> + std::marker::Unpin), - ) -> Result, Error> + ) -> Result<(), Error> where R: Archive + Clone, R::Archived: Deserialize @@ -30,53 +30,22 @@ impl StreamAux { + Deserialize, { let mut remainder = Vec::::new(); - let mut items = vec![]; - loop { - let (v, stop) = - Self::do_find_items::(&filter, stream, &mut remainder)?; - items.extend_from_slice(v.as_slice()); - if stop { - break; - } - } - Ok(items) - } - - fn do_find_items( - filter: &impl Fn(&R) -> Result, - stream: &mut (impl futures_core::Stream> - + std::marker::Unpin), - remainder: &mut Vec, - ) -> Result<(Vec, bool), Error> - where - R: Archive, - R::Archived: Deserialize - + for<'b> CheckBytes> - + Deserialize, - { - let mut output = vec![]; - let mut stream_finished = false; - if let Some(http_chunk) = stream.next().wait() { - let mut buffer = remainder.clone(); + while let Some(chunk) = stream.next().wait() { + let mut buffer = vec![]; + buffer.append(&mut remainder); buffer.extend_from_slice( - &http_chunk - .map_err(|_| Error::Stream("chunking error".into()))?, + &chunk.map_err(|_| Error::Stream("chunking error".into()))?, ); let mut iter = buffer.chunks_exact(L); for bytes in iter.by_ref() { let item: R = rkyv::from_bytes(bytes).map_err(|_| { Error::Stream("deserialization error".into()) })?; - if filter(&item)? { - output.push(item); - } + filter_collect(&item); } - remainder.clear(); remainder.extend_from_slice(iter.remainder()); - } else { - stream_finished = true; } - Ok((output, stream_finished)) + Ok(()) } /// Collects all items and returns them in a vector,