Skip to content

Commit

Permalink
Fixed multi-item chunk problem
Browse files Browse the repository at this point in the history
  • Loading branch information
miloszm committed Nov 24, 2023
1 parent 2e1407c commit cf7dd10
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
22 changes: 12 additions & 10 deletions moat-core/src/citadel_queries/citadel_inquirer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,19 @@ impl CitadelInquirer {
const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN;
let mut pairs = vec![];
loop {
let r = StreamAux::find_item::<(u64, Vec<u8>), ITEM_LEN>(
let v = StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|(_, lic_vec)| {
let license = Self::deserialise_license(lic_vec);
Ok(ssk_user.view_key().owns(&license.lsa))
},
stream,
);
if r.is_err() {
)?;
if v.is_empty() {
break;
}
let (pos, lic_ser) = r?;
pairs.push((pos, Self::deserialise_license(&lic_ser)))
for (pos, lic_ser) in v.iter() {
pairs.push((*pos, Self::deserialise_license(lic_ser)))
}
}
Ok(pairs)
}
Expand All @@ -132,15 +133,16 @@ impl CitadelInquirer {
const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN;
let mut pairs = vec![];
loop {
let r = StreamAux::find_item::<(u64, Vec<u8>), ITEM_LEN>(
let v = StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|_| Ok(true),
stream,
);
if r.is_err() {
)?;
if v.is_empty() {
break;
}
let (pos, lic_ser) = r?;
pairs.push((pos, Self::deserialise_license(&lic_ser)))
for (pos, lic_ser) in v.iter() {
pairs.push((*pos, Self::deserialise_license(lic_ser)))
}
}
Ok(pairs)
}
Expand Down
18 changes: 8 additions & 10 deletions moat-core/src/contract_queries/stream_aux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,36 @@ use rkyv::{Archive, Deserialize, Infallible};
pub struct StreamAux;

impl StreamAux {
/// Finds and returns first item for which
/// Finds and returns items for which
/// the given filter returns true,
/// returns error if no such item was found
pub fn find_item<R, const L: usize>(
pub fn find_items<R, const L: usize>(
filter: impl Fn(&R) -> Result<bool, Error>,
stream: &mut (impl futures_core::Stream<Item = Result<Bytes, reqwest::Error>>
+ std::marker::Unpin),
) -> Result<R, Error>
) -> Result<Vec<R>, Error>
where
R: Archive,
R::Archived: Deserialize<R, Infallible>
+ for<'b> CheckBytes<DefaultValidator<'b>>
+ Deserialize<R, SharedDeserializeMap>,
{
let mut output = vec![];
let mut buffer = vec![];
while let Some(http_chunk) = stream.next().wait() {
if let Some(http_chunk) = stream.next().wait() {
buffer.extend_from_slice(
&http_chunk
.map_err(|_| Error::Stream("chunking error".into()))?,
);
let mut chunk = buffer.chunks_exact(L);
for bytes in chunk.by_ref() {
for bytes in buffer.chunks_exact(L) {
let item: R = rkyv::from_bytes(bytes).map_err(|_| {
Error::Stream("deserialization error".into())
})?;
if filter(&item)? {
return Ok(item);
output.push(item);
}
}
buffer = chunk.remainder().to_vec();
}
Err(Error::Stream("item not found".into()))
Ok(output)
}

/// Collects all items and returns them in a vector,
Expand Down

0 comments on commit cf7dd10

Please sign in to comment.