Skip to content

Commit

Permalink
Fixed remainder processing
Browse files Browse the repository at this point in the history
  • Loading branch information
miloszm committed Nov 27, 2023
1 parent cf7dd10 commit 34ed15b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 28 deletions.
40 changes: 15 additions & 25 deletions moat-core/src/citadel_queries/citadel_inquirer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,15 @@ impl CitadelInquirer {
) -> Result<Vec<(u64, License)>, Error> {
const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN;
let mut pairs = vec![];
loop {
let v = StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|(_, lic_vec)| {
let license = Self::deserialise_license(lic_vec);
Ok(ssk_user.view_key().owns(&license.lsa))
},
stream,
)?;
if v.is_empty() {
break;
}
for (pos, lic_ser) in v.iter() {
pairs.push((*pos, Self::deserialise_license(lic_ser)))
}
let v = StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|(_, lic_vec)| {
let license = Self::deserialise_license(lic_vec);
Ok(ssk_user.view_key().owns(&license.lsa))
},
stream,
)?;
for (pos, lic_ser) in v.iter() {
pairs.push((*pos, Self::deserialise_license(lic_ser)))
}
Ok(pairs)
}
Expand All @@ -132,17 +127,12 @@ impl CitadelInquirer {
) -> Result<Vec<(u64, License)>, Error> {
const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN;
let mut pairs = vec![];
loop {
let v = StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|_| Ok(true),
stream,
)?;
if v.is_empty() {
break;
}
for (pos, lic_ser) in v.iter() {
pairs.push((*pos, Self::deserialise_license(lic_ser)))
}
let v = StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|_| Ok(true),
stream,
)?;
for (pos, lic_ser) in v.iter() {
pairs.push((*pos, Self::deserialise_license(lic_ser)))
}
Ok(pairs)
}
Expand Down
37 changes: 34 additions & 3 deletions moat-core/src/contract_queries/stream_aux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,60 @@ impl StreamAux {
stream: &mut (impl futures_core::Stream<Item = Result<Bytes, reqwest::Error>>
+ std::marker::Unpin),
) -> Result<Vec<R>, Error>
where
R: Archive + Clone,
R::Archived: Deserialize<R, Infallible>
+ for<'b> CheckBytes<DefaultValidator<'b>>
+ Deserialize<R, SharedDeserializeMap>,
{
let mut remainder = Vec::<u8>::new();
let mut items = vec![];
loop {
let (v, stop) =
Self::do_find_items::<R, L>(&filter, stream, &mut remainder)?;
items.extend_from_slice(v.as_slice());
if stop {
break;
}
}
Ok(items)
}

fn do_find_items<R, const L: usize>(
filter: &impl Fn(&R) -> Result<bool, Error>,
stream: &mut (impl futures_core::Stream<Item = Result<Bytes, reqwest::Error>>
+ std::marker::Unpin),
remainder: &mut Vec<u8>,
) -> Result<(Vec<R>, bool), Error>
where
R: Archive,
R::Archived: Deserialize<R, Infallible>
+ for<'b> CheckBytes<DefaultValidator<'b>>
+ Deserialize<R, SharedDeserializeMap>,
{
let mut output = vec![];
let mut buffer = vec![];
let mut stream_finished = false;
if let Some(http_chunk) = stream.next().wait() {
let mut buffer = remainder.clone();
buffer.extend_from_slice(
&http_chunk
.map_err(|_| Error::Stream("chunking error".into()))?,
);
for bytes in buffer.chunks_exact(L) {
let mut iter = buffer.chunks_exact(L);
for bytes in iter.by_ref() {
let item: R = rkyv::from_bytes(bytes).map_err(|_| {
Error::Stream("deserialization error".into())
})?;
if filter(&item)? {
output.push(item);
}
}
remainder.clear();
remainder.extend_from_slice(iter.remainder());
} else {
stream_finished = true;
}
Ok(output)
Ok((output, stream_finished))
}

/// Collects all items and returns them in a vector,
Expand Down

0 comments on commit 34ed15b

Please sign in to comment.