Skip to content

Commit

Permalink
Merge pull request #59 from dusk-network/multi-item-chunk-fix
Browse files Browse the repository at this point in the history
Fixed multi-item chunk problem
  • Loading branch information
miloszm authored Nov 28, 2023
2 parents 2e1407c + 8a164f5 commit 5df23dc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 42 deletions.
41 changes: 16 additions & 25 deletions moat-core/src/citadel_queries/citadel_inquirer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,15 @@ impl CitadelInquirer {
) -> Result<Vec<(u64, License)>, Error> {
const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN;
let mut pairs = vec![];
loop {
let r = StreamAux::find_item::<(u64, Vec<u8>), ITEM_LEN>(
|(_, lic_vec)| {
let license = Self::deserialise_license(lic_vec);
Ok(ssk_user.view_key().owns(&license.lsa))
},
stream,
);
if r.is_err() {
break;
}
let (pos, lic_ser) = r?;
pairs.push((pos, Self::deserialise_license(&lic_ser)))
}
StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|(pos, lic_vec)| {
let license = Self::deserialise_license(lic_vec);
if ssk_user.view_key().owns(&license.lsa) {
pairs.push((*pos, license));
};
},
stream,
)?;
Ok(pairs)
}

Expand All @@ -131,17 +126,13 @@ impl CitadelInquirer {
) -> Result<Vec<(u64, License)>, Error> {
const ITEM_LEN: usize = CitadelInquirer::GET_LICENSES_ITEM_LEN;
let mut pairs = vec![];
loop {
let r = StreamAux::find_item::<(u64, Vec<u8>), ITEM_LEN>(
|_| Ok(true),
stream,
);
if r.is_err() {
break;
}
let (pos, lic_ser) = r?;
pairs.push((pos, Self::deserialise_license(&lic_ser)))
}
StreamAux::find_items::<(u64, Vec<u8>), ITEM_LEN>(
|(pos, lic_vec)| {
let license = Self::deserialise_license(lic_vec);
pairs.push((*pos, license));
},
stream,
)?;
Ok(pairs)
}
}
32 changes: 15 additions & 17 deletions moat-core/src/contract_queries/stream_aux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,36 @@ use rkyv::{Archive, Deserialize, Infallible};
pub struct StreamAux;

impl StreamAux {
/// Finds and returns first item for which
/// Finds and returns items for which
/// the given filter returns true,
/// returns error if no such item was found
pub fn find_item<R, const L: usize>(
filter: impl Fn(&R) -> Result<bool, Error>,
pub fn find_items<R, const L: usize>(
mut filter_collect: impl FnMut(&R),
stream: &mut (impl futures_core::Stream<Item = Result<Bytes, reqwest::Error>>
+ std::marker::Unpin),
) -> Result<R, Error>
) -> Result<(), Error>
where
R: Archive,
R: Archive + Clone,
R::Archived: Deserialize<R, Infallible>
+ for<'b> CheckBytes<DefaultValidator<'b>>
+ Deserialize<R, SharedDeserializeMap>,
{
let mut buffer = vec![];
while let Some(http_chunk) = stream.next().wait() {
let mut remainder = Vec::<u8>::new();
while let Some(chunk) = stream.next().wait() {
let mut buffer = vec![];
buffer.append(&mut remainder);
buffer.extend_from_slice(
&http_chunk
.map_err(|_| Error::Stream("chunking error".into()))?,
&chunk.map_err(|_| Error::Stream("chunking error".into()))?,
);
let mut chunk = buffer.chunks_exact(L);
for bytes in chunk.by_ref() {
let mut iter = buffer.chunks_exact(L);
for bytes in iter.by_ref() {
let item: R = rkyv::from_bytes(bytes).map_err(|_| {
Error::Stream("deserialization error".into())
})?;
if filter(&item)? {
return Ok(item);
}
filter_collect(&item);
}
buffer = chunk.remainder().to_vec();
remainder.extend_from_slice(iter.remainder());
}
Err(Error::Stream("item not found".into()))
Ok(())
}

/// Collects all items and returns them in a vector,
Expand Down

0 comments on commit 5df23dc

Please sign in to comment.