iterator: typed API for new deserialization framework
This commit finishes the work related to adjusting the iterators module
to the new deserialization framework.

The previous commit brought RawRowsLendingStream, which can deserialize
ColumnIterators. This commit introduces the new TypedRowLendingStream, which
type-checks once and then deserializes from ColumnIterators into rows.
RawRowsLendingStream can be converted to TypedRowLendingStream by
calling the `into_typed()` method.

Unfortunately, due to a limitation of the Stream trait (it has no
support for lending streams, the analogue of lending iterators such as
RawRowLendingIterator), a Stream cannot be used to deserialize borrowed
types (i.e. those that borrow from the serialized contents of the frame).
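
The limitation can be illustrated with a synchronous analogue. The
sketch below is not code from this commit: `LendingIterator`, `Pager`
and `OwnedRows` are hypothetical names, and plain `Iterator` stands in
for `Stream`. It shows that an item borrowing from the iterator itself
needs a lifetime-generic associated type, which `Iterator` and `Stream`
do not have, so the adapter has to hand out owned values instead.

```rust
/// A lending iterator: each item may borrow from the iterator itself,
/// so an item has to be dropped before `next()` can be called again.
trait LendingIterator {
    type Item<'a>
    where
        Self: 'a;

    fn next(&mut self) -> Option<Self::Item<'_>>;
}

/// Hypothetical pager that keeps only the current page in memory.
struct Pager {
    pages: std::vec::IntoIter<String>,
    current: String,
}

impl Pager {
    fn new(pages: Vec<String>) -> Self {
        Pager { pages: pages.into_iter(), current: String::new() }
    }
}

impl LendingIterator for Pager {
    type Item<'a> = &'a str;

    fn next(&mut self) -> Option<&str> {
        // Overwrite the buffer; any previously lent `&str` must be gone by now.
        self.current = self.pages.next()?;
        Some(self.current.as_str())
    }
}

/// Adapter implementing the ordinary `Iterator` trait. `Iterator::Item`
/// cannot name the borrow of `self`, so every item is copied into a `String`.
struct OwnedRows(Pager);

impl Iterator for OwnedRows {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        LendingIterator::next(&mut self.0).map(str::to_owned)
    }
}

/// Borrowed access: no allocation per item.
fn total_len(pages: Vec<String>) -> usize {
    let mut pager = Pager::new(pages);
    let mut total = 0;
    while let Some(row) = pager.next() {
        total += row.len();
    }
    total
}

/// Owned access: works with all the usual iterator adapters.
fn collect_owned(pages: Vec<String>) -> Vec<String> {
    OwnedRows(Pager::new(pages)).collect()
}
```

The same trade-off motivates the split described next: the lending
variant avoids copies, while the owned variant plugs into the standard
combinators.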

In order to give users both capabilities:
1) deserializing borrowed types (for efficiency),
2) deserializing using Stream (for convenience),
two distinct types are used: TypedRowLendingStream and TypedRowStream.
The first supports borrowed types and the second implements Stream.

To sum up, instead of `RowIterator` (returning `Row`s) and
`TypedRowIterator` (returning instances of the target type) both
implementing `Stream`, now we have the following:
- `RawRowsLendingStream`
  - cannot implement `Stream`, because it returns `ColumnIterator`s that
    borrow from it,
  - provides `type_check()` and `next()` methods that can be used for
    low-level, manual deserialization (not recommended for ordinary
    users),
  - supports manually deserializing borrowed types (such as `&str`).
- `TypedRowLendingStream`
  - created by calling `into_typed::<TargetType>()` on `RawRowsLendingStream`,
  - type checks upon creation,
  - supports deserializing borrowed types (such as `&str`),
  - does not implement `Stream` in order to support borrowed types,
  - provides basic Stream-like methods (`next()`, `try_next()`).
- `TypedRowStream`
  - created by calling `into_stream()` on `TypedRowLendingStream`,
  - implements `Stream` and hence does not support borrowed types.
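
The three layers can be sketched with a small synchronous model. This
is only an illustration under stated assumptions, not the driver's
implementation: `ColType`, `Value`, `RawRows`, `TypedLending` and
`TypedOwned` are hypothetical stand-ins, the row type is fixed to
`(i32, &str)` instead of being generic, and `Iterator` plays the role
of `Stream`.

```rust
/// Hypothetical column types, standing in for the column specs of a result.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColType {
    Int,
    Text,
}

/// Hypothetical cell value.
#[derive(Debug)]
enum Value {
    Int(i32),
    Text(String),
}

/// Raw layer: hands out untyped rows that borrow from it.
struct RawRows {
    specs: Vec<ColType>,
    rows: Vec<Vec<Value>>,
    pos: usize,
}

impl RawRows {
    fn next(&mut self) -> Option<&[Value]> {
        let row = self.rows.get(self.pos)?;
        self.pos += 1;
        Some(row.as_slice())
    }

    /// Type-checks once, up front, and only then returns the typed layer.
    fn into_typed(self) -> Result<TypedLending, String> {
        if self.specs != [ColType::Int, ColType::Text] {
            return Err(format!("expected [Int, Text], got {:?}", self.specs));
        }
        Ok(TypedLending { raw: self })
    }
}

/// Typed lending layer: rows may borrow (`&str`), so it is not an `Iterator`.
struct TypedLending {
    raw: RawRows,
}

impl TypedLending {
    fn next(&mut self) -> Option<(i32, &str)> {
        match self.raw.next()? {
            [Value::Int(id), Value::Text(name)] => Some((*id, name.as_str())),
            // This sketch simply stops on a row that contradicts the checked specs.
            _ => None,
        }
    }

    fn into_stream(self) -> TypedOwned {
        TypedOwned { lending: self }
    }
}

/// Owned layer: implements `Iterator`, at the cost of copying borrowed data.
struct TypedOwned {
    lending: TypedLending,
}

impl Iterator for TypedOwned {
    type Item = (i32, String);

    fn next(&mut self) -> Option<Self::Item> {
        self.lending.next().map(|(id, name)| (id, name.to_owned()))
    }
}

/// Two sample rows matching the `[Int, Text]` specs.
fn sample() -> RawRows {
    RawRows {
        specs: vec![ColType::Int, ColType::Text],
        rows: vec![
            vec![Value::Int(1), Value::Text("a".to_string())],
            vec![Value::Int(2), Value::Text("b".to_string())],
        ],
        pos: 0,
    }
}
```

Because the check happens in `into_typed()`, a mismatch between the
requested row type and the result metadata surfaces before any row is
read, rather than on every call to `next()`.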

Co-authored-by: Piotr Dulikowski <[email protected]>
wprzytula and piodul committed Nov 6, 2024
1 parent ec5af4a commit 0fe6890
Showing 1 changed file with 153 additions and 1 deletion.
154 changes: 153 additions & 1 deletion scylla/src/transport/iterator.rs
@@ -13,6 +13,7 @@ use scylla_cql::frame::response::result::RawMetadataAndRawRows;
use scylla_cql::frame::response::NonErrorResponse;
use scylla_cql::types::deserialize::result::RawRowLendingIterator;
use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow};
use scylla_cql::types::deserialize::TypeCheckError;
use scylla_cql::types::serialize::row::SerializedValues;
use std::result::Result;
use thiserror::Error;
@@ -540,7 +541,10 @@ where
/// An intermediate object that allows to construct an iterator over a query
/// that is asynchronously paged in the background.
///
/// TODO: implement and describe the new API
/// Before the results can be processed in a convenient way, the QueryPager
/// needs to be cast into a typed iterator. This is done by use of `into_typed()` method.
/// As the method is generic over the target type, the turbofish syntax
/// can come in handy there, e.g. `raw_iter.into_typed::<(i32, &str, Uuid)>()`.
///
/// A pre-0.15.0 interface is also available, although deprecated:
/// `into_legacy()` method converts QueryPager to LegacyRowIterator,
@@ -625,6 +629,48 @@ impl QueryPager {
        Poll::Ready(Some(Ok(())))
    }

    /// Type-checks the iterator against given type.
    ///
    /// This is automatically called upon transforming [QueryPager] into [TypedRowLendingStream].
    /// Can be used with `next()` for manual deserialization. See `next()` for an example.
    #[inline]
    pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
        &self,
    ) -> Result<(), TypeCheckError> {
        RowT::type_check(self.column_specs().inner())
    }

    /// Casts the iterator to a given row type, enabling Stream'ed operations
    /// on rows, which deserialize them on-the-fly to that given type.
    /// It allows deserializing borrowed types, but hence cannot implement [Stream]
    /// (because [Stream] is not lending).
    /// Begins with performing type check.
    #[inline]
    pub fn rows_lending_stream<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
        self,
    ) -> Result<TypedRowLendingStream<RowT>, TypeCheckError>
    where
        'frame: 'metadata,
    {
        TypedRowLendingStream::<RowT>::new(self)
    }

    /// Casts the iterator to a given row type, enabling [Stream]'ed operations
    /// on rows, which deserialize them on-the-fly to that given type.
    /// It only allows deserializing owned types, because [Stream] is not lending.
    /// Begins with performing type check.
    #[inline]
    pub fn rows_stream<'frame, 'metadata, RowT: 'static + DeserializeRow<'frame, 'metadata>>(
        self,
    ) -> Result<TypedRowStream<RowT>, TypeCheckError>
    where
        'frame: 'metadata,
    {
        TypedRowLendingStream::<RowT>::new(self).map(|typed_row_lending_stream| TypedRowStream {
            typed_row_lending_stream,
        })
    }

    /// Converts this iterator into an iterator over rows parsed as given type,
    /// using the legacy deserialization framework.
    /// This is inefficient, because all rows are being eagerly deserialized
@@ -936,6 +982,112 @@ impl QueryPager {
    }
}

/// Returned by [QueryPager::rows_lending_stream].
///
/// Does not implement [Stream], but permits deserialization of borrowed types.
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
pub struct TypedRowLendingStream<RowT> {
    raw_row_lending_stream: QueryPager,
    _phantom: std::marker::PhantomData<RowT>,
}

impl<RowT> Unpin for TypedRowLendingStream<RowT> {}

impl<RowT> TypedRowLendingStream<RowT> {
    /// If tracing was enabled, returns tracing ids of all finished page queries.
    #[inline]
    pub fn tracing_ids(&self) -> &[Uuid] {
        self.raw_row_lending_stream.tracing_ids()
    }

    /// Returns specification of row columns
    #[inline]
    pub fn column_specs(&self) -> ColumnSpecs {
        self.raw_row_lending_stream.column_specs()
    }
}

impl<'frame, 'metadata, RowT> TypedRowLendingStream<RowT>
where
    'frame: 'metadata,
    RowT: DeserializeRow<'frame, 'metadata>,
{
    fn new(raw_stream: QueryPager) -> Result<Self, TypeCheckError> {
        raw_stream.type_check::<RowT>()?;

        Ok(Self {
            raw_row_lending_stream: raw_stream,
            _phantom: Default::default(),
        })
    }

    /// Stream-like next() implementation for TypedRowLendingStream.
    ///
    /// It also works with borrowed types! For example, &str is supported.
    /// However, this is not a Stream. To create a Stream, use `into_stream()`.
    #[inline]
    pub async fn next(&'frame mut self) -> Option<Result<RowT, QueryError>> {
        self.raw_row_lending_stream.next().await.map(|res| {
            res.and_then(|column_iterator| {
                <RowT as DeserializeRow>::deserialize(column_iterator)
                    .map_err(|err| RowsParseError::from(err).into())
            })
        })
    }

    /// Stream-like try_next() implementation for TypedRowLendingStream.
    ///
    /// It also works with borrowed types! For example, &str is supported.
    /// However, this is not a Stream. To create a Stream, use `into_stream()`.
    #[inline]
    pub async fn try_next(&'frame mut self) -> Result<Option<RowT>, QueryError> {
        self.next().await.transpose()
    }
}

/// Returned by [QueryPager::rows_stream].
///
/// Implements [Stream], but only permits deserialization of owned types.
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
pub struct TypedRowStream<RowT: 'static> {
    typed_row_lending_stream: TypedRowLendingStream<RowT>,
}

impl<RowT> Unpin for TypedRowStream<RowT> {}

impl<RowT> TypedRowStream<RowT> {
    /// If tracing was enabled, returns tracing ids of all finished page queries.
    #[inline]
    pub fn tracing_ids(&self) -> &[Uuid] {
        self.typed_row_lending_stream.tracing_ids()
    }

    /// Returns specification of row columns
    #[inline]
    pub fn column_specs(&self) -> ColumnSpecs {
        self.typed_row_lending_stream.column_specs()
    }
}

/// Stream implementation for TypedRowStream.
///
/// It only works with owned types! For example, &str is not supported.
impl<RowT> Stream for TypedRowStream<RowT>
where
    RowT: for<'r> DeserializeRow<'r, 'r>,
{
    type Item = Result<RowT, QueryError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut s = self.as_mut();

        let next_fut = s.typed_row_lending_stream.next();
        futures::pin_mut!(next_fut);
        let value = ready_some_ok!(next_fut.poll(cx));
        Poll::Ready(Some(Ok(value)))
    }
}

mod legacy {
    use super::*;
