From 85f5e05baa99881d20bf6418a881f92c03116dfb Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 6 Dec 2024 20:56:03 +0800 Subject: [PATCH 1/2] chore: chunk rows --- collab-database/src/database.rs | 17 +++++++++++------ collab-database/tests/database_test/helper.rs | 2 +- .../tests/database_test/view_test.rs | 2 +- .../tests/template_test/create_template_test.rs | 2 +- .../tests/template_test/import_csv_test.rs | 2 +- .../tests/user_test/database_test.rs | 12 ++++++------ 6 files changed, 21 insertions(+), 16 deletions(-) diff --git a/collab-database/src/database.rs b/collab-database/src/database.rs index ef30c99e8..1e5f61a47 100644 --- a/collab-database/src/database.rs +++ b/collab-database/src/database.rs @@ -529,6 +529,7 @@ impl Database { pub fn init_database_rows<'a, T: Into + Send + 'a>( &'a self, row_ids: Vec, + chunk_size: usize, cancel_token: Option, ) -> impl Stream>, DatabaseError>> + 'a { let row_ids_chunk_stream = stream::iter( @@ -536,7 +537,7 @@ impl Database { .into_iter() .map(Into::into) .collect::>() - .chunks(50) + .chunks(chunk_size) .map(|chunk| chunk.to_vec()) .collect::>>(), ); @@ -552,6 +553,7 @@ impl Database { } } + trace!("Initializing chunked database rows: {:?}", chunk); self.body.block.init_database_rows(chunk).await } }) @@ -595,11 +597,12 @@ impl Database { pub async fn get_rows_for_view( &self, view_id: &str, + chunk_size: usize, cancel_token: Option, ) -> impl Stream> + '_ { let row_orders = self.get_row_orders_for_view(view_id); self - .get_rows_from_row_orders(&row_orders, cancel_token) + .get_rows_from_row_orders(&row_orders, chunk_size, cancel_token) .await } @@ -623,10 +626,11 @@ impl Database { pub async fn get_rows_from_row_orders<'a>( &'a self, row_orders: &[RowOrder], + chunk_size: usize, cancel_token: Option, ) -> impl Stream> + 'a { let row_ids = row_orders.iter().map(|order| order.id.clone()).collect(); - let rows_stream = self.init_database_rows(row_ids, cancel_token); + let rows_stream = self.init_database_rows(row_ids, chunk_size, cancel_token); let database_id = self.get_database_id(); rows_stream.then(move |result| { let database_id = database_id.clone(); @@ -1341,7 +1345,7 @@ impl Database { let inline_view_id = self.body.get_inline_view_id(&txn); let views = self.get_all_views(); let fields = self.body.get_fields_in_view(&txn, &inline_view_id, None); - let rows_stream = self.get_all_rows(None).await; + let rows_stream = self.get_all_rows(20, None).await; let rows: Vec = rows_stream .filter_map(|result| async move { result.ok() }) .collect() @@ -1372,6 +1376,7 @@ impl Database { pub async fn get_all_rows( &self, + chunk_size: usize, cancel_token: Option, ) -> impl Stream> + '_ { let row_orders = { @@ -1381,12 +1386,12 @@ impl Database { }; self - .get_rows_from_row_orders(&row_orders, cancel_token) + .get_rows_from_row_orders(&row_orders, chunk_size, cancel_token) .await } pub async fn collect_all_rows(&self) -> Vec> { - let rows_stream = self.get_all_rows(None).await; + let rows_stream = self.get_all_rows(20, None).await; rows_stream.collect::>().await } diff --git a/collab-database/tests/database_test/helper.rs b/collab-database/tests/database_test/helper.rs index 264127807..52e6556b8 100644 --- a/collab-database/tests/database_test/helper.rs +++ b/collab-database/tests/database_test/helper.rs @@ -32,7 +32,7 @@ pub struct DatabaseTest { impl DatabaseTest { pub async fn get_rows_for_view(&self, view_id: &str) -> Vec { - let rows_stream = self.database.get_rows_for_view(view_id, None).await; + let rows_stream = self.database.get_rows_for_view(view_id, 10, None).await; let rows: Vec = rows_stream .filter_map(|result| async move { result.ok() }) .collect() diff --git a/collab-database/tests/database_test/view_test.rs b/collab-database/tests/database_test/view_test.rs index e5bfa477f..a1ce94194 100644 --- a/collab-database/tests/database_test/view_test.rs +++ b/collab-database/tests/database_test/view_test.rs @@ -23,7 +23,7 @@ async fn create_initial_database_test() { let database_test = create_database(1, &database_id); let all_rows: Vec = database_test - .get_all_rows(None) + .get_all_rows(20, None) .await .filter_map(|result| async move { result.ok() }) .collect() diff --git a/collab-database/tests/template_test/create_template_test.rs b/collab-database/tests/template_test/create_template_test.rs index 0c5ad987a..f3aa767f9 100644 --- a/collab-database/tests/template_test/create_template_test.rs +++ b/collab-database/tests/template_test/create_template_test.rs @@ -130,7 +130,7 @@ async fn create_template_test() { // Assert num of rows let rows: Vec = database - .get_all_rows(None) + .get_all_rows(10, None) .await .filter_map(|result| async move { result.ok() }) .collect() diff --git a/collab-database/tests/template_test/import_csv_test.rs b/collab-database/tests/template_test/import_csv_test.rs index 36920534e..1eedbdfa5 100644 --- a/collab-database/tests/template_test/import_csv_test.rs +++ b/collab-database/tests/template_test/import_csv_test.rs @@ -16,7 +16,7 @@ async fn import_csv_test() { let fields = database.get_fields_in_view(&database.get_inline_view_id(), None); let rows: Vec = database - .get_all_rows(None) + .get_all_rows(20, None) .await .filter_map(|result| async move { result.ok() }) .collect() diff --git a/collab-database/tests/user_test/database_test.rs b/collab-database/tests/user_test/database_test.rs index f09781483..1a2935bfc 100644 --- a/collab-database/tests/user_test/database_test.rs +++ b/collab-database/tests/user_test/database_test.rs @@ -130,7 +130,7 @@ async fn duplicate_database_inline_view_test() { .unwrap(); assert_eq!( - db.get_rows_for_view(&duplicated_view_id, None) + db.get_rows_for_view(&duplicated_view_id, 20, None) .await .count() .await, @@ -140,7 +140,7 @@ async fn duplicate_database_inline_view_test() { database .read() .await - .get_rows_for_view("v1", None) + .get_rows_for_view("v1", 10, None) .await .count() .await, @@ -185,13 +185,13 @@ async fn duplicate_database_view_test() { // Duplicated database should have the same rows as the original database assert_eq!( - db.get_rows_for_view(&duplicated_view.id, None) + db.get_rows_for_view(&duplicated_view.id, 10, None) .await .count() .await, 1 ); - assert_eq!(db.get_rows_for_view("v1", None).await.count().await, 1); + assert_eq!(db.get_rows_for_view("v1", 10, None).await.count().await, 1); } #[tokio::test] @@ -283,14 +283,14 @@ async fn duplicate_database_data_test() { // compare rows let original_rows: Vec = original - .get_rows_for_view("v1", None) + .get_rows_for_view("v1", 10, None) .await .filter_map(|result| async { result.ok() }) .collect() .await; let duplicate_rows: Vec = duplicate - .get_rows_for_view(duplicated_view_id, None) + .get_rows_for_view(duplicated_view_id, 10, None) .await .filter_map(|result| async { result.ok() }) .collect() From 29e27063d94162d6c32d8bb95ea7e8abc0cf85bc Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 6 Dec 2024 23:29:36 +0800 Subject: [PATCH 2/2] chore: log --- collab-database/src/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collab-database/src/database.rs b/collab-database/src/database.rs index 1e5f61a47..a6e798415 100644 --- a/collab-database/src/database.rs +++ b/collab-database/src/database.rs @@ -553,7 +553,7 @@ impl Database { } } - trace!("Initializing chunked database rows: {:?}", chunk); + trace!("Initializing chunked database rows: {}", chunk.len()); self.body.block.init_database_rows(chunk).await } })