Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Database row fetch #345

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions collab-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,14 +529,15 @@ impl Database {
pub fn init_database_rows<'a, T: Into<RowId> + Send + 'a>(
&'a self,
row_ids: Vec<T>,
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Arc<RwLock<DatabaseRow>>, DatabaseError>> + 'a {
let row_ids_chunk_stream = stream::iter(
row_ids
.into_iter()
.map(Into::into)
.collect::<Vec<RowId>>()
.chunks(50)
.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
.collect::<Vec<Vec<RowId>>>(),
);
Expand All @@ -552,6 +553,7 @@ impl Database {
}
}

trace!("Initializing chunked database rows: {}", chunk.len());
self.body.block.init_database_rows(chunk).await
}
})
Expand Down Expand Up @@ -595,11 +597,12 @@ impl Database {
pub async fn get_rows_for_view(
&self,
view_id: &str,
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Row, DatabaseError>> + '_ {
let row_orders = self.get_row_orders_for_view(view_id);
self
.get_rows_from_row_orders(&row_orders, cancel_token)
.get_rows_from_row_orders(&row_orders, chunk_size, cancel_token)
.await
}

Expand All @@ -623,10 +626,11 @@ impl Database {
pub async fn get_rows_from_row_orders<'a>(
&'a self,
row_orders: &[RowOrder],
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Row, DatabaseError>> + 'a {
let row_ids = row_orders.iter().map(|order| order.id.clone()).collect();
let rows_stream = self.init_database_rows(row_ids, cancel_token);
let rows_stream = self.init_database_rows(row_ids, chunk_size, cancel_token);
let database_id = self.get_database_id();
rows_stream.then(move |result| {
let database_id = database_id.clone();
Expand Down Expand Up @@ -1341,7 +1345,7 @@ impl Database {
let inline_view_id = self.body.get_inline_view_id(&txn);
let views = self.get_all_views();
let fields = self.body.get_fields_in_view(&txn, &inline_view_id, None);
let rows_stream = self.get_all_rows(None).await;
let rows_stream = self.get_all_rows(20, None).await;
let rows: Vec<Row> = rows_stream
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down Expand Up @@ -1372,6 +1376,7 @@ impl Database {

pub async fn get_all_rows(
&self,
chunk_size: usize,
cancel_token: Option<CancellationToken>,
) -> impl Stream<Item = Result<Row, DatabaseError>> + '_ {
let row_orders = {
Expand All @@ -1381,12 +1386,12 @@ impl Database {
};

self
.get_rows_from_row_orders(&row_orders, cancel_token)
.get_rows_from_row_orders(&row_orders, chunk_size, cancel_token)
.await
}

pub async fn collect_all_rows(&self) -> Vec<Result<Row, DatabaseError>> {
let rows_stream = self.get_all_rows(None).await;
let rows_stream = self.get_all_rows(20, None).await;
rows_stream.collect::<Vec<_>>().await
}

Expand Down
2 changes: 1 addition & 1 deletion collab-database/tests/database_test/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct DatabaseTest {

impl DatabaseTest {
pub async fn get_rows_for_view(&self, view_id: &str) -> Vec<Row> {
let rows_stream = self.database.get_rows_for_view(view_id, None).await;
let rows_stream = self.database.get_rows_for_view(view_id, 10, None).await;
let rows: Vec<Row> = rows_stream
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
2 changes: 1 addition & 1 deletion collab-database/tests/database_test/view_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn create_initial_database_test() {
let database_test = create_database(1, &database_id);

let all_rows: Vec<Row> = database_test
.get_all_rows(None)
.get_all_rows(20, None)
.await
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn create_template_test() {

// Assert num of rows
let rows: Vec<Row> = database
.get_all_rows(None)
.get_all_rows(10, None)
.await
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
2 changes: 1 addition & 1 deletion collab-database/tests/template_test/import_csv_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn import_csv_test() {

let fields = database.get_fields_in_view(&database.get_inline_view_id(), None);
let rows: Vec<Row> = database
.get_all_rows(None)
.get_all_rows(20, None)
.await
.filter_map(|result| async move { result.ok() })
.collect()
Expand Down
12 changes: 6 additions & 6 deletions collab-database/tests/user_test/database_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn duplicate_database_inline_view_test() {
.unwrap();

assert_eq!(
db.get_rows_for_view(&duplicated_view_id, None)
db.get_rows_for_view(&duplicated_view_id, 20, None)
.await
.count()
.await,
Expand All @@ -140,7 +140,7 @@ async fn duplicate_database_inline_view_test() {
database
.read()
.await
.get_rows_for_view("v1", None)
.get_rows_for_view("v1", 10, None)
.await
.count()
.await,
Expand Down Expand Up @@ -185,13 +185,13 @@ async fn duplicate_database_view_test() {

// Duplicated database should have the same rows as the original database
assert_eq!(
db.get_rows_for_view(&duplicated_view.id, None)
db.get_rows_for_view(&duplicated_view.id, 10, None)
.await
.count()
.await,
1
);
assert_eq!(db.get_rows_for_view("v1", None).await.count().await, 1);
assert_eq!(db.get_rows_for_view("v1", 10, None).await.count().await, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -283,14 +283,14 @@ async fn duplicate_database_data_test() {

// compare rows
let original_rows: Vec<Row> = original
.get_rows_for_view("v1", None)
.get_rows_for_view("v1", 10, None)
.await
.filter_map(|result| async { result.ok() })
.collect()
.await;

let duplicate_rows: Vec<Row> = duplicate
.get_rows_for_view(duplicated_view_id, None)
.get_rows_for_view(duplicated_view_id, 10, None)
.await
.filter_map(|result| async { result.ok() })
.collect()
Expand Down
Loading