diff --git a/Cargo.lock b/Cargo.lock
index 5c9851d77..0a056442c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4

 [[package]]
 name = "access-control"
@@ -539,9 +539,9 @@ checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1"

 [[package]]
 name = "anyhow"
-version = "1.0.89"
+version = "1.0.94"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
+checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7"

 [[package]]
 name = "app-error"
@@ -2133,7 +2133,7 @@ dependencies = [
 [[package]]
 name = "collab"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4e23b6d6b1ba320c74b40e557069e99dd0377e7#c4e23b6d6b1ba320c74b40e557069e99dd0377e7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2158,7 +2158,7 @@ dependencies = [
 [[package]]
 name = "collab-database"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4e23b6d6b1ba320c74b40e557069e99dd0377e7#c4e23b6d6b1ba320c74b40e557069e99dd0377e7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2197,7 +2197,7 @@ dependencies = [
 [[package]]
 name = "collab-document"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4e23b6d6b1ba320c74b40e557069e99dd0377e7#c4e23b6d6b1ba320c74b40e557069e99dd0377e7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2218,7 +2218,7 @@ dependencies = [
 [[package]]
 name = "collab-entity"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4e23b6d6b1ba320c74b40e557069e99dd0377e7#c4e23b6d6b1ba320c74b40e557069e99dd0377e7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886"
 dependencies = [
  "anyhow",
  "bytes",
@@ -2238,7 +2238,7 @@ dependencies = [
 [[package]]
 name = "collab-folder"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4e23b6d6b1ba320c74b40e557069e99dd0377e7#c4e23b6d6b1ba320c74b40e557069e99dd0377e7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2260,7 +2260,7 @@ dependencies = [
 [[package]]
 name = "collab-importer"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4e23b6d6b1ba320c74b40e557069e99dd0377e7#c4e23b6d6b1ba320c74b40e557069e99dd0377e7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886"
 dependencies = [
  "anyhow",
  "async-recursion",
@@ -2363,7 +2363,7 @@ dependencies = [
 [[package]]
 name = "collab-user"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4e23b6d6b1ba320c74b40e557069e99dd0377e7#c4e23b6d6b1ba320c74b40e557069e99dd0377e7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886"
"git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=128f3a81ea86a58e355615b1c00edbf861867886#128f3a81ea86a58e355615b1c00edbf861867886" dependencies = [ "anyhow", "collab", diff --git a/Cargo.toml b/Cargo.toml index a51288b96..674c07ce5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ chrono = { version = "0.4.37", features = [ derive_more = { version = "0.99" } secrecy.workspace = true rand = { version = "0.8", features = ["std_rng"] } -anyhow = "1.0.79" +anyhow.workspace = true thiserror = "1.0.56" reqwest = { workspace = true, features = [ "json", @@ -248,7 +248,7 @@ serde = { version = "1.0.195", features = ["derive"] } bytes = "1.5.0" workspace-template = { path = "libs/workspace-template" } uuid = { version = "1.6.1", features = ["v4", "v5"] } -anyhow = "1.0.79" +anyhow = "1.0.94" actix = "0.13.3" actix-web = { version = "4.5.1", default-features = false, features = [ "openssl", @@ -312,13 +312,13 @@ lto = false # Disable Link-Time Optimization [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4e23b6d6b1ba320c74b40e557069e99dd0377e7" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4e23b6d6b1ba320c74b40e557069e99dd0377e7" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4e23b6d6b1ba320c74b40e557069e99dd0377e7" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4e23b6d6b1ba320c74b40e557069e99dd0377e7" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4e23b6d6b1ba320c74b40e557069e99dd0377e7" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4e23b6d6b1ba320c74b40e557069e99dd0377e7" } -collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4e23b6d6b1ba320c74b40e557069e99dd0377e7" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "128f3a81ea86a58e355615b1c00edbf861867886" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "128f3a81ea86a58e355615b1c00edbf861867886" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "128f3a81ea86a58e355615b1c00edbf861867886" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "128f3a81ea86a58e355615b1c00edbf861867886" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "128f3a81ea86a58e355615b1c00edbf861867886" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "128f3a81ea86a58e355615b1c00edbf861867886" } +collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "128f3a81ea86a58e355615b1c00edbf861867886" } [features] history = [] diff --git a/admin_frontend/Cargo.toml b/admin_frontend/Cargo.toml index 3d1cd4079..e292ff1c1 100644 --- a/admin_frontend/Cargo.toml +++ b/admin_frontend/Cargo.toml @@ -12,7 +12,7 @@ gotrue-entity = { path = "../libs/gotrue-entity" } database-entity = { path = "../libs/database-entity" } shared-entity = { path = "../libs/shared-entity" } -anyhow = "1.0.79" +anyhow.workspace = true axum = { version = "0.7", features = ["json"] } tokio = { version = "1.36", features = ["rt-multi-thread", "macros"] } askama = "0.12" diff --git a/libs/client-api/src/http_chat.rs b/libs/client-api/src/http_chat.rs index 
diff --git a/libs/client-api/src/http_chat.rs b/libs/client-api/src/http_chat.rs
index cc5ea5834..36f015a8b 100644
--- a/libs/client-api/src/http_chat.rs
+++ b/libs/client-api/src/http_chat.rs
@@ -320,7 +320,7 @@ impl Stream for QuestionStream {

   fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     let this = self.project();

-    return match ready!(this.stream.as_mut().poll_next(cx)) {
+    match ready!(this.stream.as_mut().poll_next(cx)) {
       Some(Ok(value)) => match value {
         Value::Object(mut value) => {
           if let Some(metadata) = value.remove(STREAM_METADATA_KEY) {
@@ -344,6 +344,6 @@
       },
       Some(Err(err)) => Poll::Ready(Some(Err(err))),
       None => Poll::Ready(None),
-    };
+    }
   }
 }
diff --git a/libs/client-api/src/http_collab.rs b/libs/client-api/src/http_collab.rs
index d1841167a..2214ad160 100644
--- a/libs/client-api/src/http_collab.rs
+++ b/libs/client-api/src/http_collab.rs
@@ -4,8 +4,8 @@ use app_error::AppError;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use client_api_entity::workspace_dto::{
-  AFDatabase, AFDatabaseField, AFDatabaseRow, AFDatabaseRowDetail, DatabaseRowUpdatedItem,
-  ListDatabaseRowDetailParam, ListDatabaseRowUpdatedParam,
+  AFDatabase, AFDatabaseField, AFDatabaseRow, AFDatabaseRowDetail, AFInsertDatabaseField,
+  DatabaseRowUpdatedItem, ListDatabaseRowDetailParam, ListDatabaseRowUpdatedParam,
 };
 use client_api_entity::{
   BatchQueryCollabParams, BatchQueryCollabResult, CollabParams, CreateCollabParams,
@@ -210,6 +210,28 @@ impl Client {
     AppResponse::from_response(resp).await?.into_data()
   }

+  // Adds a database field to the specified database.
+  // Returns the field id of the newly created field.
+  pub async fn add_database_field(
+    &self,
+    workspace_id: &str,
+    database_id: &str,
+    insert_field: &AFInsertDatabaseField,
+  ) -> Result<String, AppResponseError> {
+    let url = format!(
+      "{}/api/workspace/{}/database/{}/fields",
+      self.base_url, workspace_id, database_id
+    );
+    let resp = self
+      .http_client_with_auth(Method::POST, &url)
+      .await?
+      .json(insert_field)
+      .send()
+      .await?;
+    log_request_id(&resp);
+    AppResponse::from_response(resp).await?.into_data()
+  }
+
   pub async fn list_database_row_ids_updated(
     &self,
     workspace_id: &str,
@@ -250,6 +272,32 @@
     AppResponse::from_response(resp).await?.into_data()
   }

+  /// Example payload:
+  /// {
+  ///   "Name": "some_data",       # using column name
+  ///   "_pIkG": "some other data" # using field_id (can be obtained from [get_database_fields])
+  /// }
+  /// Upon success, returns the row id for the newly created row.
+  pub async fn add_database_item(
+    &self,
+    workspace_id: &str,
+    database_id: &str,
+    payload: &serde_json::Value,
+  ) -> Result<String, AppResponseError> {
+    let url = format!(
+      "{}/api/workspace/{}/database/{}/row",
+      self.base_url, workspace_id, database_id
+    );
+    let resp = self
+      .http_client_with_auth(Method::POST, &url)
+      .await?
+      .json(&payload)
+      .send()
+      .await?;
+    log_request_id(&resp);
+    AppResponse::from_response(resp).await?.into_data()
+  }
+
   #[instrument(level = "debug", skip_all, err)]
   pub async fn post_realtime_msg(
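For orientation, the two new client methods compose like this; a minimal sketch assuming an authenticated `Client` named `c` and an existing workspace and database (it mirrors the integration tests added at the end of this patch):

    // Sketch only; `c`, `workspace_id`, and `db_id` are placeholders.
    let field_id = c
      .add_database_field(
        &workspace_id,
        &db_id,
        &AFInsertDatabaseField {
          name: "MyNumberColumn".to_string(),
          field_type: FieldType::Number.into(),
          ..Default::default()
        },
      )
      .await?;

    // Cell keys may be column names or the field id returned above.
    let row_id = c
      .add_database_item(
        &workspace_id,
        &db_id,
        &serde_json::json!({ "Description": "my task 123", field_id: 123 }),
      )
      .await?;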
diff --git a/libs/client-api/src/http_file.rs b/libs/client-api/src/http_file.rs
index ad4fe5546..1ef9a7312 100644
--- a/libs/client-api/src/http_file.rs
+++ b/libs/client-api/src/http_file.rs
@@ -290,7 +290,7 @@ impl WSClientConnectURLProvider for Client {
 ///
 /// # Returns
 /// A `Result` containing the base64-encoded MD5 hash on success, or an error if the file cannot be read.
-
+///
 /// Asynchronously calculates the MD5 hash of a file using efficient buffer handling and returns it as a base64-encoded string.
 ///
 /// # Arguments
diff --git a/libs/collab-stream/src/pubsub.rs b/libs/collab-stream/src/pubsub.rs
index abe88038f..1bbe479ab 100644
--- a/libs/collab-stream/src/pubsub.rs
+++ b/libs/collab-stream/src/pubsub.rs
@@ -50,7 +50,7 @@ impl CollabStreamPub {

   #[instrument(level = "debug", skip_all, err)]
   pub async fn publish(&mut self, message: PubSubMessage) -> Result<(), StreamError> {
-    self.conn.publish(ACTIVE_COLLAB_CHANNEL, message).await?;
+    let () = self.conn.publish(ACTIVE_COLLAB_CHANNEL, message).await?;
     Ok(())
   }
 }
diff --git a/libs/collab-stream/src/stream.rs b/libs/collab-stream/src/stream.rs
index a69222dc5..148bceb24 100644
--- a/libs/collab-stream/src/stream.rs
+++ b/libs/collab-stream/src/stream.rs
@@ -2,7 +2,7 @@ use crate::error::StreamError;
 use crate::model::{MessageId, StreamBinary, StreamMessage, StreamMessageByStreamKey};
 use redis::aio::ConnectionManager;
 use redis::streams::{StreamMaxlen, StreamReadOptions};
-use redis::{pipe, AsyncCommands, RedisError};
+use redis::{pipe, AsyncCommands, Pipeline, RedisError};

 pub struct CollabStream {
   connection_manager: ConnectionManager,
@@ -34,9 +34,9 @@ impl CollabStream {
     let mut pipe = pipe();
     for message in messages {
       let tuple = message.into_tuple_array();
-      pipe.xadd(&self.stream_key, "*", tuple.as_slice());
+      let _: &mut Pipeline = pipe.xadd(&self.stream_key, "*", tuple.as_slice());
     }
-    pipe.query_async(&mut self.connection_manager).await?;
+    let () = pipe.query_async(&mut self.connection_manager).await?;
     Ok(())
   }

@@ -90,7 +90,7 @@
   }

   pub async fn clear(&mut self) -> Result<(), RedisError> {
-    self
+    let () = self
       .connection_manager
       .xtrim(&self.stream_key, StreamMaxlen::Equals(0))
       .await?;
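A pattern that recurs in this file and the ones below: redis-rs commands and pipelines are generic over their return type, and when the result is only checked with `?` and then discarded, the compiler appears to have nothing left to infer it from under the Rust 1.83 toolchain this patch moves to. Binding the result with `let () = ...` pins the return type to unit. A minimal sketch, assuming a connection value `conn` with `redis::AsyncCommands` in scope:

    // Without the binding, the command's generic return value RV is ambiguous
    // once the result is dropped; `let () = ...` fixes RV to `()`.
    let () = conn.set_ex("key", "value", 3600).await?;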
diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs
index 55fe754ec..8361f9eb7 100644
--- a/libs/collab-stream/src/stream_group.rs
+++ b/libs/collab-stream/src/stream_group.rs
@@ -7,7 +7,7 @@
 use redis::streams::{
   StreamClaimOptions, StreamClaimReply, StreamMaxlen, StreamPendingData, StreamPendingReply,
   StreamReadOptions,
 };
-use redis::{pipe, AsyncCommands, ErrorKind, RedisResult};
+use redis::{pipe, AsyncCommands, ErrorKind, Pipeline, RedisResult};
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, trace, warn};

@@ -119,7 +119,7 @@ impl StreamGroup {
       .into_iter()
       .map(|m| m.to_string())
       .collect::<Vec<String>>();
-    self
+    let () = self
       .connection_manager
       .xack(&self.stream_key, &self.group_name, &message_ids)
       .await?;
@@ -164,7 +164,7 @@
       let message = message.into();
       let tuple = message.into_tuple_array();
       if let Some(len) = self.config.max_len {
-        pipe
+        let _: &mut Pipeline = pipe
           .cmd("XADD")
           .arg(&self.stream_key)
           .arg("MAXLEN")
@@ -177,7 +177,7 @@
       }
     }

-    pipe.query_async(&mut self.connection_manager).await?;
+    let () = pipe.query_async(&mut self.connection_manager).await?;
     if let Err(err) = self.set_expiration().await {
       error!("set expiration fail: {:?}", err);
     }
@@ -197,13 +197,13 @@
     let tuple = message.into_tuple_array();
     match self.config.max_len {
       Some(max_len) => {
-        self
+        let () = self
           .connection_manager
           .xadd_maxlen(&self.stream_key, StreamMaxlen::Approx(max_len), "*", &tuple)
          .await?;
       },
       None => {
-        self
+        let () = self
           .connection_manager
           .xadd(&self.stream_key, "*", tuple.as_slice())
           .await?;
@@ -369,7 +369,7 @@
   /// Use the `XTRIM` command to truncate the Redis stream to a maximum length of zero, effectively
   /// removing all entries from the stream.
   pub async fn clear(&mut self) -> Result<(), StreamError> {
-    self
+    let () = self
      .connection_manager
      .xtrim(&self.stream_key, StreamMaxlen::Equals(0))
      .await?;
@@ -393,7 +393,7 @@
     };

     if should_set_expiration {
-      self
+      let () = self
         .connection_manager
         .expire(&self.stream_key, expire_time)
         .await?;
diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs
index 4fdc0653f..41013164a 100644
--- a/libs/database/src/collab/collab_db_ops.rs
+++ b/libs/database/src/collab/collab_db_ops.rs
@@ -43,7 +43,6 @@ use uuid::Uuid;
 /// * There's a database operation failure.
 /// * There's an attempt to insert a row with an existing `object_id` but a different `workspace_id`.
 ///
-
 #[inline]
 #[instrument(level = "trace", skip(tx, params), fields(oid=%params.object_id), err)]
 pub async fn insert_into_af_collab(
diff --git a/libs/shared-entity/src/dto/workspace_dto.rs b/libs/shared-entity/src/dto/workspace_dto.rs
index 84ca4708f..0d80fe58a 100644
--- a/libs/shared-entity/src/dto/workspace_dto.rs
+++ b/libs/shared-entity/src/dto/workspace_dto.rs
@@ -376,3 +376,10 @@ pub struct AFDatabaseField {
   pub type_option: HashMap<String, serde_json::Value>,
   pub is_primary: bool,
 }
+
+#[derive(Default, Debug, Clone, Serialize, Deserialize)]
+pub struct AFInsertDatabaseField {
+  pub name: String,
+  pub field_type: i64,                          // FieldType ID
+  pub type_option_data: Option<serde_json::Value>, // TypeOptionData
+}
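For reference, a sketch of the JSON body the new `AFInsertDatabaseField` struct deserializes from, built here with `serde_json::json!`; the numeric `field_type` comes from collab-database's `FieldType`-to-`i64` conversion, and `type_option_data` may be omitted:

    // Sketch only; the FieldType numeric mapping lives in collab-database.
    let body = serde_json::json!({
      "name": "MyNumberColumn",
      "field_type": i64::from(FieldType::Number),
      "type_option_data": null, // optional; parsed into TypeOptionData when present
    });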
- redis::cmd("WATCH") + let () = redis::cmd("WATCH") .arg(&cache_object_id) .query_async::<_, ()>(&mut conn) .await?; @@ -228,7 +228,7 @@ impl CollabMemCache { .ignore() .expire(&cache_object_id, expiration_seconds.unwrap_or(SEVEN_DAYS) as i64) // Setting the expiration to 7 days .ignore(); - pipeline.query_async(&mut conn).await?; + let () = pipeline.query_async(&mut conn).await?; } Ok::<(), redis::RedisError>(()) } diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index fbaa95afa..e1032dd04 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -179,7 +179,7 @@ where self.edit_state_max_secs, indexer, )?); - self.state.insert_group(object_id, group.clone()); + self.state.insert_group(object_id, group); Ok(()) } } diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 7c966beae..3cdf3b844 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -7,11 +7,13 @@ use anyhow::{anyhow, Context}; use bytes::BytesMut; use chrono::{DateTime, Duration, Utc}; use collab::entity::EncodedCollab; +use collab_database::entity::FieldType; use collab_entity::CollabType; use futures_util::future::try_join_all; use prost::Message as ProstMessage; use rayon::prelude::*; use sqlx::types::uuid; +use std::collections::HashMap; use std::time::Instant; use tokio_stream::StreamExt; @@ -259,11 +261,13 @@ pub fn workspace_scope() -> Scope { .service(web::resource("/{workspace_id}/database").route(web::get().to(list_database_handler))) .service( web::resource("/{workspace_id}/database/{database_id}/row") - .route(web::get().to(list_database_row_id_handler)), + .route(web::get().to(list_database_row_id_handler)) + .route(web::post().to(post_database_row_handler)), ) .service( web::resource("/{workspace_id}/database/{database_id}/fields") - .route(web::get().to(get_database_fields_handler)), + .route(web::get().to(get_database_fields_handler)) + .route(web::post().to(post_database_fields_handler)), ) .service( web::resource("/{workspace_id}/database/{database_id}/row/updated") @@ -1922,6 +1926,31 @@ async fn list_database_row_id_handler( Ok(Json(AppResponse::Ok().with_data(db_rows))) } +async fn post_database_row_handler( + user_uuid: UserUuid, + path_param: web::Path<(String, String)>, + state: Data, + cells_by_id: Json>, +) -> Result>> { + let (workspace_id, db_id) = path_param.into_inner(); + let uid = state.user_cache.get_user_uid(&user_uuid).await?; + state + .workspace_access_control + .enforce_action(&uid, &workspace_id, Action::Write) + .await?; + + let new_db_row_id = biz::collab::ops::insert_database_row( + &state.collab_access_control_storage, + &state.pg_pool, + &workspace_id, + &db_id, + uid, + cells_by_id.into_inner(), + ) + .await?; + Ok(Json(AppResponse::Ok().with_data(new_db_row_id))) +} + async fn get_database_fields_handler( user_uuid: UserUuid, path_param: web::Path<(String, String)>, @@ -1944,6 +1973,32 @@ async fn get_database_fields_handler( Ok(Json(AppResponse::Ok().with_data(db_fields))) } +async fn post_database_fields_handler( + user_uuid: UserUuid, + path_param: web::Path<(String, String)>, + state: Data, + field: Json, +) -> Result>> { + let (workspace_id, db_id) = path_param.into_inner(); + let uid = state.user_cache.get_user_uid(&user_uuid).await?; + state + .workspace_access_control + .enforce_action(&uid, &workspace_id, Action::Write) + .await?; + + let field_id = biz::collab::ops::add_database_field( + uid, + 
+  let field_id = biz::collab::ops::add_database_field(
+    uid,
+    &state.collab_access_control_storage,
+    &state.pg_pool,
+    &workspace_id,
+    &db_id,
+    field.into_inner(),
+  )
+  .await?;
+
+  Ok(Json(AppResponse::Ok().with_data(field_id)))
+}
+
 async fn list_database_row_id_updated_handler(
   user_uuid: UserUuid,
   path_param: web::Path<(String, String)>,
@@ -2005,12 +2060,15 @@ async fn list_database_row_details_handler(
     .enforce_action(&uid, &workspace_id, Action::Read)
     .await?;

+  static UNSUPPORTED_FIELD_TYPES: &[FieldType] = &[FieldType::Relation];
+
   let db_rows = biz::collab::ops::list_database_row_details(
     &state.collab_access_control_storage,
     uid,
     workspace_id,
     db_id,
     &row_ids,
+    UNSUPPORTED_FIELD_TYPES,
   )
   .await?;
   Ok(Json(AppResponse::Ok().with_data(db_rows)))
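The row handler above accepts any flat JSON object (it deserializes into `HashMap<String, serde_json::Value>`); keys may be column names or raw field ids, matching the client-side doc comment earlier in this patch. A sketch with invented values:

    // Accepted body for POST /api/workspace/{workspace_id}/database/{database_id}/row;
    // keys are column names ("Description") or raw field ids ("_pIkG").
    let cells_by_id = serde_json::json!({
      "Description": "my task 123",
      "_pIkG": "some other data",
    });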
diff --git a/src/biz/collab/mod.rs b/src/biz/collab/mod.rs
index dc877de24..8378f840f 100644
--- a/src/biz/collab/mod.rs
+++ b/src/biz/collab/mod.rs
@@ -1,3 +1,4 @@
 pub mod folder_view;
 pub mod ops;
 pub mod publish_outline;
+pub mod utils;
diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs
index b27805660..56703bc88 100644
--- a/src/biz/collab/ops.rs
+++ b/src/biz/collab/ops.rs
@@ -5,14 +5,18 @@ use app_error::AppError;
 use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
 use chrono::DateTime;
 use chrono::Utc;
-use collab::core::collab::DataSource;
 use collab::preclude::Collab;
-use collab_database::database::DatabaseBody;
+use collab_database::database::gen_field_id;
+use collab_database::database::gen_row_id;
 use collab_database::entity::FieldType;
 use collab_database::fields::Field;
 use collab_database::fields::TypeOptions;
+use collab_database::rows::Cell;
+use collab_database::rows::CreateRowParams;
+use collab_database::rows::DatabaseRowBody;
+use collab_database::rows::Row;
 use collab_database::rows::RowDetail;
-use collab_database::workspace_database::NoPersistenceDatabaseCollabService;
+use collab_database::views::OrderObjectPosition;
 use collab_database::workspace_database::WorkspaceDatabase;
 use collab_database::workspace_database::WorkspaceDatabaseBody;
 use collab_entity::CollabType;
@@ -24,12 +28,14 @@ use database::collab::select_workspace_database_oid;
 use database::collab::{CollabStorage, GetCollabOrigin};
 use database::publish::select_published_view_ids_for_workspace;
 use database::publish::select_workspace_id_for_publish_namespace;
+use database_entity::dto::CollabParams;
+use database_entity::dto::QueryCollab;
 use database_entity::dto::QueryCollabResult;
-use database_entity::dto::{QueryCollab, QueryCollabParams};
 use shared_entity::dto::workspace_dto::AFDatabase;
 use shared_entity::dto::workspace_dto::AFDatabaseField;
 use shared_entity::dto::workspace_dto::AFDatabaseRow;
 use shared_entity::dto::workspace_dto::AFDatabaseRowDetail;
+use shared_entity::dto::workspace_dto::AFInsertDatabaseField;
 use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem;
 use shared_entity::dto::workspace_dto::FavoriteFolderView;
 use shared_entity::dto::workspace_dto::FolderViewMinimal;
@@ -52,12 +58,25 @@ use database_entity::dto::{
   UpdateCollabMemberParams,
 };

+use crate::biz::collab::utils::field_by_name_uniq;
+use crate::biz::workspace::ops::broadcast_update;
+
 use super::folder_view::collab_folder_to_folder_view;
 use super::folder_view::section_items_to_favorite_folder_view;
 use super::folder_view::section_items_to_recent_folder_view;
 use super::folder_view::section_items_to_trash_folder_view;
 use super::folder_view::to_dto_folder_view_miminal;
 use super::publish_outline::collab_folder_to_published_outline;
+use super::utils::collab_from_doc_state;
+use super::utils::collab_to_bin;
+use super::utils::field_by_id_name_uniq;
+use super::utils::get_database_body;
+use super::utils::get_latest_collab;
+use super::utils::get_latest_collab_encoded;
+use super::utils::get_row_details_serde;
+use super::utils::type_option_reader_by_id;
+use super::utils::type_option_writer_by_id;
+use super::utils::type_options_serde;

 /// Create a new collab member
 /// If the collab member already exists, return [AppError::RecordAlreadyExists]
@@ -358,46 +377,6 @@ pub async fn get_latest_collab_folder(
   Ok(folder)
 }

-pub async fn get_latest_collab_encoded(
-  collab_storage: &CollabAccessControlStorage,
-  collab_origin: GetCollabOrigin,
-  workspace_id: &str,
-  oid: &str,
-  collab_type: CollabType,
-) -> Result<EncodedCollab, AppError> {
-  collab_storage
-    .get_encode_collab(
-      collab_origin,
-      QueryCollabParams {
-        workspace_id: workspace_id.to_string(),
-        inner: QueryCollab {
-          object_id: oid.to_string(),
-          collab_type,
-        },
-      },
-      true,
-    )
-    .await
-}
-
-pub async fn get_latest_collab(
-  storage: &CollabAccessControlStorage,
-  origin: GetCollabOrigin,
-  workspace_id: &str,
-  oid: &str,
-  collab_type: CollabType,
-) -> Result<Collab, AppError> {
-  let ec = get_latest_collab_encoded(storage, origin, workspace_id, oid, collab_type).await?;
-  let collab: Collab = Collab::new_with_source(CollabOrigin::Server, oid, ec.into(), vec![], false)
-    .map_err(|e| {
-      AppError::Internal(anyhow::anyhow!(
-        "Failed to create collab from encoded collab: {:?}",
-        e
-      ))
-    })?;
-  Ok(collab)
-}
-
 pub async fn get_published_view(
   collab_storage: &CollabAccessControlStorage,
   publish_namespace: String,
@@ -504,6 +483,153 @@ pub async fn list_database_row_ids(
   Ok(db_rows)
 }

+pub async fn insert_database_row(
+  collab_storage: &CollabAccessControlStorage,
+  pg_pool: &PgPool,
+  workspace_uuid_str: &str,
+  database_uuid_str: &str,
+  uid: i64,
+  cell_value_by_id: HashMap<String, serde_json::Value>,
+) -> Result<String, AppError> {
+  // get database types and type options
+  let (mut db_collab, db_body) =
+    get_database_body(collab_storage, workspace_uuid_str, database_uuid_str).await?;
+
+  let all_fields = db_body.fields.get_all_fields(&db_collab.transact());
+  let field_by_id = all_fields.iter().fold(HashMap::new(), |mut acc, field| {
+    acc.insert(field.id.clone(), field.clone());
+    acc
+  });
+  let type_option_reader_by_id = type_option_writer_by_id(&all_fields);
+  let field_by_name = field_by_name_uniq(all_fields);
+
+  let new_db_row_id = gen_row_id();
+  let mut new_db_row_collab =
+    Collab::new_with_origin(CollabOrigin::Empty, new_db_row_id.clone(), vec![], false);
+
+  let new_db_row_body = {
+    let db_row_body = DatabaseRowBody::create(
+      new_db_row_id.clone(),
+      &mut new_db_row_collab,
+      Row::empty(new_db_row_id.clone(), database_uuid_str),
+    );
+    let mut txn = new_db_row_collab.transact_mut();
+
+    // set last_modified and created_at
+    db_row_body.update(&mut txn, |row_update| {
+      row_update
+        .set_last_modified(Utc::now().timestamp())
+        .set_created_at(Utc::now().timestamp());
+    });
+
+    for (id, serde_val) in cell_value_by_id {
+      let field = match field_by_id.get(&id) {
+        Some(f) => f,
+        // try use field name if id not found
+        None => match field_by_name.get(&id) {
+          Some(f) => f,
+          None => {
+            tracing::warn!(
+              "field not found: {} for database: {}",
+              id,
+              database_uuid_str
+            );
+            continue;
+          },
+        },
+      };
+      let cell_writer = match type_option_reader_by_id.get(&field.id) {
+        Some(cell_writer) => cell_writer,
+        None => {
+          tracing::error!("Failed to get type option writer for field: {}", field.id);
+          continue;
+        },
+      };
+      let new_cell: Cell = cell_writer.convert_json_to_cell(serde_val);
+      db_row_body.update(&mut txn, |row_update| {
+        row_update.update_cells(|cells_update| {
+          cells_update.insert_cell(&field.id, new_cell);
+        });
+      });
+    }
+    db_row_body
+  };
+
+  // Create new row order
+  let ts_now = chrono::Utc::now().timestamp();
+  let row_order = db_body
+    .create_row(CreateRowParams {
+      id: new_db_row_id.clone(),
+      database_id: database_uuid_str.to_string(),
+      cells: new_db_row_body
+        .cells(&new_db_row_collab.transact())
+        .unwrap_or_default(),
+      height: 30,
+      visibility: true,
+      row_position: OrderObjectPosition::End,
+      created_at: ts_now,
+      modified_at: ts_now,
+    })
+    .await
+    .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to create row: {:?}", e)))?;
+
+  // Prepare new row collab binary to store in postgres
+  let db_row_ec_v1 = collab_to_bin(new_db_row_collab, CollabType::DatabaseRow).await?;
+
+  // For each database view, add the new row order
+  let db_collab_update = {
+    let mut txn = db_collab.transact_mut();
+    let mut db_views = db_body.views.get_all_views(&txn);
+    for db_view in db_views.iter_mut() {
+      db_view.row_orders.push(row_order.clone());
+    }
+    db_body.views.clear(&mut txn);
+    for view in db_views {
+      db_body.views.insert_view(&mut txn, view);
+    }
+
+    txn.encode_update_v1()
+  };
+  let updated_db_collab = collab_to_bin(db_collab, CollabType::Database).await?;
+
+  let mut db_txn = pg_pool.begin().await?;
+  // insert row
+  collab_storage
+    .upsert_new_collab_with_transaction(
+      workspace_uuid_str,
+      &uid,
+      CollabParams {
+        object_id: new_db_row_id.to_string(),
+        encoded_collab_v1: db_row_ec_v1.into(),
+        collab_type: CollabType::DatabaseRow,
+        embeddings: None,
+      },
+      &mut db_txn,
+      "inserting new database row from server",
+    )
+    .await?;
+
+  // update database
+  collab_storage
+    .upsert_new_collab_with_transaction(
+      workspace_uuid_str,
+      &uid,
+      CollabParams {
+        object_id: database_uuid_str.to_string(),
+        encoded_collab_v1: updated_db_collab.into(),
+        collab_type: CollabType::Database,
+        embeddings: None,
+      },
+      &mut db_txn,
+      "inserting updated database from server",
+    )
+    .await?;
+
+  db_txn.commit().await?;
+  broadcast_update(collab_storage, database_uuid_str, db_collab_update).await?;
+  Ok(new_db_row_id.to_string())
+}
+
 pub async fn get_database_fields(
   collab_storage: &CollabAccessControlStorage,
   workspace_uuid_str: &str,
@@ -527,6 +653,77 @@ pub async fn get_database_fields(
   Ok(acc)
 }

+// inserts a new field into the database
+// returns the id of the field created
+pub async fn add_database_field(
+  uid: i64,
+  collab_storage: &CollabAccessControlStorage,
+  pg_pool: &PgPool,
+  workspace_id: &str,
+  database_id: &str,
+  insert_field: AFInsertDatabaseField,
+) -> Result<String, AppError> {
+  let (mut db_collab, db_body) =
+    get_database_body(collab_storage, workspace_id, database_id).await?;
+
+  let new_id = gen_field_id();
+  let mut type_options = TypeOptions::new();
+  let type_option_data = insert_field
+    .type_option_data
+    .unwrap_or(serde_json::json!({}));
+  match serde_json::from_value(type_option_data) {
+    Ok(tod) => type_options.insert(insert_field.field_type.to_string(), tod),
+    Err(err) => {
+      return Err(AppError::InvalidRequest(format!(
+        "Failed to parse type option: {:?}",
+        err
+      )));
+    },
+  };
+
+  let new_field = Field {
+    id: new_id.clone(),
+    name: insert_field.name,
+    field_type: insert_field.field_type,
+    type_options,
+    ..Default::default()
+  };
+
+  let db_collab_update = {
+    let mut yrs_txn = db_collab.transact_mut();
+    db_body.create_field(
+      &mut yrs_txn,
+      None,
+      new_field,
+      &OrderObjectPosition::End,
+      &HashMap::new(),
+    );
+    yrs_txn.encode_update_v1()
+  };
+  let updated_db_collab = collab_to_bin(db_collab, CollabType::Database).await?;
+
+  let mut pg_txn = pg_pool.begin().await?;
+  collab_storage
+    .upsert_new_collab_with_transaction(
+      workspace_id,
+      &uid,
+      CollabParams {
+        object_id: database_id.to_string(),
+        encoded_collab_v1: updated_db_collab.into(),
+        collab_type: CollabType::Database,
+        embeddings: None,
+      },
+      &mut pg_txn,
+      "inserting updated database from server",
+    )
+    .await?;
+
+  pg_txn.commit().await?;
+  broadcast_update(collab_storage, database_id, db_collab_update).await?;
+
+  Ok(new_id)
+}
+
 pub async fn list_database_row_ids_updated(
   collab_storage: &CollabAccessControlStorage,
   pg_pool: &PgPool,
@@ -552,7 +749,23 @@ pub async fn list_database_row_details(
   workspace_uuid_str: String,
   database_uuid_str: String,
   row_ids: &[&str],
+  unsupported_field_types: &[FieldType],
 ) -> Result<Vec<AFDatabaseRowDetail>, AppError> {
+  let (database_collab, db_body) =
+    get_database_body(collab_storage, &workspace_uuid_str, &database_uuid_str).await?;
+
+  let all_fields: Vec<Field> = db_body
+    .fields
+    .get_all_fields(&database_collab.transact())
+    .into_iter()
+    .filter(|field| !unsupported_field_types.contains(&FieldType::from(field.field_type)))
+    .collect();
+  if all_fields.is_empty() {
+    return Ok(vec![]);
+  }
+
+  let type_option_reader_by_id = type_option_reader_by_id(&all_fields);
+  let field_by_id = field_by_id_name_uniq(all_fields);
   let query_collabs: Vec<QueryCollab> = row_ids
     .iter()
     .map(|id| QueryCollab {
       object_id: id.to_string(),
       collab_type: CollabType::DatabaseRow,
     })
     .collect();
-
-  let database_collab = get_latest_collab(
-    collab_storage,
-    GetCollabOrigin::User { uid },
-    &workspace_uuid_str,
-    &database_uuid_str,
-    CollabType::Database,
-  )
-  .await?;
-  let db_body = DatabaseBody::from_collab(
-    &database_collab,
-    Arc::new(NoPersistenceDatabaseCollabService),
-    None,
-  )
-  .ok_or_else(|| {
-    AppError::Internal(anyhow::anyhow!(
-      "Failed to create database body from collab, db_collab_id: {}",
-      database_uuid_str,
-    ))
-  })?;
-
-  // create a map of field id to field.
-  // ensure that the field name is unique.
-  // if the field name is repeated, it will be appended with the field id,
-  // under practical usage circumstances, no other collision should occur
-  let field_by_id: HashMap<String, Field> = {
-    let all_fields = db_body.fields.get_all_fields(&database_collab.transact());
-
-    let mut uniq_name_set: HashSet<String> = HashSet::with_capacity(all_fields.len());
-    let mut field_by_id: HashMap<String, Field> = HashMap::with_capacity(all_fields.len());
-
-    for mut field in all_fields {
-      // if the name already exists, append the field id to the name
-      if uniq_name_set.contains(&field.name) {
-        let new_name = format!("{}-{}", field.name, field.id);
-        field.name.clone_from(&new_name);
-      }
-      uniq_name_set.insert(field.name.clone());
-      field_by_id.insert(field.id.clone(), field);
-    }
-    field_by_id
-  };
-
-  let mut selection_name_by_id: HashMap<String, String> = HashMap::new();
-  for field in field_by_id.values() {
-    add_to_selection_from_field(&mut selection_name_by_id, field);
-  }
-
   let database_row_details = collab_storage
     .batch_get_collab(&uid, &workspace_uuid_str, query_collabs, true)
     .await
     .into_iter()
     .flat_map(|(id, result)| match result {
       QueryCollabResult::Success { encode_collab_v1 } => {
-        let ec = EncodedCollab::decode_from_bytes(&encode_collab_v1).unwrap();
+        let ec = match EncodedCollab::decode_from_bytes(&encode_collab_v1) {
+          Ok(ec) => ec,
+          Err(err) => {
+            tracing::error!("Failed to decode encoded collab: {:?}", err);
+            return None;
+          },
+        };
         let collab =
-          Collab::new_with_source(CollabOrigin::Server, &id, ec.into(), vec![], false).unwrap();
-        let row_detail = RowDetail::from_collab(&collab).unwrap();
-        let cells = convert_database_cells_human_readable(
-          row_detail.row.cells,
-          &field_by_id,
-          &selection_name_by_id,
-        );
+          match Collab::new_with_source(CollabOrigin::Server, &id, ec.into(), vec![], false) {
+            Ok(collab) => collab,
+            Err(err) => {
+              tracing::error!("Failed to create collab: {:?}", err);
+              return None;
+            },
+          };
+        let row_detail = match RowDetail::from_collab(&collab) {
+          Some(row_detail) => row_detail,
+          None => {
+            tracing::error!("Failed to get row detail from collab: {:?}", collab);
+            return None;
+          },
+        };
+        let cells = get_row_details_serde(row_detail, &field_by_id, &type_option_reader_by_id);
         Some(AFDatabaseRowDetail { id, cells })
       },
       QueryCollabResult::Failed { error } => {
@@ -634,216 +813,3 @@

   Ok(database_row_details)
 }
-
-fn convert_database_cells_human_readable(
-  db_cells: HashMap<String, Cell>,
-  field_by_id: &HashMap<String, Field>,
-  selection_name_by_id: &HashMap<String, String>,
-) -> HashMap<String, HashMap<String, serde_json::Value>> {
-  let mut human_readable_records: HashMap<String, HashMap<String, serde_json::Value>> =
-    HashMap::with_capacity(db_cells.len());
-
-  for (field_id, cell) in db_cells {
-    let field = match field_by_id.get(&field_id) {
-      Some(field) => field,
-      None => {
-        tracing::error!("Failed to get field by id: {}", field_id);
-        continue;
-      },
-    };
-    let field_type = FieldType::from(field.field_type);
-
-    let mut human_readable_cell: HashMap<String, serde_json::Value> =
-      HashMap::with_capacity(cell.len());
-    for (key, value) in cell {
-      let serde_value: serde_json::Value = match key.as_str() {
-        "created_at" | "last_modified" => match value.cast::<i64>() {
-          Ok(timestamp) => chrono::DateTime::from_timestamp(timestamp, 0)
-            .unwrap_or_default()
-            .to_rfc3339()
-            .into(),
-          Err(err) => {
-            tracing::error!("Failed to cast timestamp: {:?}", err);
-            serde_json::Value::Null
-          },
-        },
-        "field_type" => format!("{:?}", field_type).into(),
-        "data" => {
-          match field_type {
-            FieldType::DateTime => {
-              if let yrs::any::Any::String(value_str) = value {
-                let int_value = value_str.parse::<i64>().unwrap_or_default();
-                chrono::DateTime::from_timestamp(int_value, 0)
-                  .unwrap_or_default()
-                  .to_rfc3339()
-                  .into()
-              } else {
-                serde_json::to_value(value).unwrap_or_default()
-              }
-            },
-            FieldType::Checklist => {
-              if let yrs::any::Any::String(value_str) = value {
-                serde_json::from_str(&value_str).unwrap_or_default()
-              } else {
-                serde_json::to_value(value).unwrap_or_default()
-              }
-            },
-            FieldType::Media => {
-              if let yrs::any::Any::Array(arr) = value {
-                let mut acc = Vec::with_capacity(arr.len());
-                for v in arr.as_ref() {
-                  if let yrs::any::Any::String(value_str) = v {
-                    let serde_value = serde_json::from_str(value_str).unwrap_or_default();
-                    acc.push(serde_value);
-                  }
-                }
-                serde_json::Value::Array(acc)
-              } else {
-                serde_json::to_value(value).unwrap_or_default()
-              }
-            },
-            FieldType::SingleSelect => {
-              if let yrs::any::Any::String(ref value_str) = value {
-                selection_name_by_id
-                  .get(value_str.as_ref())
-                  .map(|v| v.to_string())
-                  .map(serde_json::Value::String)
-                  .unwrap_or_else(|| value.to_string().into())
-              } else {
-                serde_json::to_value(value).unwrap_or_default()
-              }
-            },
-            FieldType::MultiSelect => {
-              if let yrs::any::Any::String(value_str) = value {
-                value_str
-                  .split(',')
-                  .filter_map(|v| selection_name_by_id.get(v).map(|v| v.to_string()))
-                  .fold(String::new(), |mut acc, s| {
-                    if !acc.is_empty() {
-                      acc.push(',');
-                    }
-                    acc.push_str(&s);
-                    acc
-                  })
-                  .into()
-              } else {
-                serde_json::to_value(value).unwrap_or_default()
-              }
-            },
-            // Handle different field types formatting as needed
-            _ => serde_json::to_value(value).unwrap_or_default(),
-          }
-        },
-        _ => serde_json::to_value(value).unwrap_or_default(),
-      };
-      human_readable_cell.insert(key, serde_value);
-    }
-    human_readable_records.insert(field.name.clone(), human_readable_cell);
-  }
-  human_readable_records
-}
-
-fn add_to_selection_from_field(name_by_id: &mut HashMap<String, String>, field: &Field) {
-  let field_type = FieldType::from(field.field_type);
-  match field_type {
-    FieldType::SingleSelect => {
-      add_to_selection_from_type_options(name_by_id, &field.type_options, &field_type);
-    },
-    FieldType::MultiSelect => {
-      add_to_selection_from_type_options(name_by_id, &field.type_options, &field_type)
-    },
-    _ => (),
-  }
-}
-
-fn add_to_selection_from_type_options(
-  name_by_id: &mut HashMap<String, String>,
-  type_options: &TypeOptions,
-  field_type: &FieldType,
-) {
-  if let Some(type_opt) = type_options.get(&field_type.type_id()) {
-    if let Some(yrs::Any::String(arc_str)) = type_opt.get("content") {
-      if let Ok(serde_value) = serde_json::from_str::<serde_json::Value>(arc_str) {
-        if let Some(selections) = serde_value.get("options").and_then(|v| v.as_array()) {
-          for selection in selections {
-            if let serde_json::Value::Object(selection) = selection {
-              if let (Some(id), Some(name)) = (
-                selection.get("id").and_then(|v| v.as_str()),
-                selection.get("name").and_then(|v| v.as_str()),
-              ) {
-                name_by_id.insert(id.to_owned(), name.to_owned());
-              }
-            }
-          }
-        }
-      }
-    }
-  };
-}
-
-async fn get_database_body(
-  collab_storage: &CollabAccessControlStorage,
-  workspace_uuid_str: &str,
-  database_uuid_str: &str,
-) -> Result<(Collab, DatabaseBody), AppError> {
-  let db_collab = get_latest_collab(
-    collab_storage,
-    GetCollabOrigin::Server,
-    workspace_uuid_str,
-    database_uuid_str,
-    CollabType::Database,
-  )
-  .await?;
-  let db_body = DatabaseBody::from_collab(
-    &db_collab,
-    Arc::new(NoPersistenceDatabaseCollabService),
-    None,
-  )
-  .ok_or_else(|| {
-    AppError::Internal(anyhow::anyhow!(
-      "Failed to create database body from collab, db_collab_id: {}",
{}", - database_uuid_str, - )) - })?; - Ok((db_collab, db_body)) -} - -pub fn collab_from_doc_state(doc_state: Vec, object_id: &str) -> Result { - let collab = Collab::new_with_source( - CollabOrigin::Server, - object_id, - DataSource::DocStateV1(doc_state), - vec![], - false, - ) - .map_err(|e| AppError::Unhandled(e.to_string()))?; - Ok(collab) -} - -fn type_options_serde( - type_options: &TypeOptions, - field_type: &FieldType, -) -> HashMap { - let type_option = match type_options.get(&field_type.type_id()) { - Some(type_option) => type_option, - None => return HashMap::new(), - }; - - let mut result = HashMap::with_capacity(type_option.len()); - for (key, value) in type_option { - match field_type { - FieldType::SingleSelect | FieldType::MultiSelect | FieldType::Media => { - if let yrs::Any::String(arc_str) = value { - if let Ok(serde_value) = serde_json::from_str::(arc_str) { - result.insert(key.clone(), serde_value); - } - } - }, - _ => { - result.insert(key.clone(), serde_json::to_value(value).unwrap_or_default()); - }, - } - } - - result -} diff --git a/src/biz/collab/utils.rs b/src/biz/collab/utils.rs new file mode 100644 index 000000000..c89bceb7d --- /dev/null +++ b/src/biz/collab/utils.rs @@ -0,0 +1,271 @@ +use app_error::AppError; +use appflowy_collaborate::collab::storage::CollabAccessControlStorage; +use collab::core::collab::DataSource; +use collab::preclude::Collab; +use collab_database::database::DatabaseBody; +use collab_database::entity::FieldType; +use collab_database::fields::type_option_cell_reader; +use collab_database::fields::type_option_cell_writer; +use collab_database::fields::Field; +use collab_database::fields::TypeOptionCellReader; +use collab_database::fields::TypeOptionCellWriter; +use collab_database::fields::TypeOptionData; +use collab_database::fields::TypeOptions; +use collab_database::rows::Cell; +use collab_database::rows::RowDetail; +use collab_database::template::entity::CELL_DATA; +use collab_database::template::timestamp_parse::TimestampCellData; +use collab_database::workspace_database::NoPersistenceDatabaseCollabService; +use collab_entity::CollabType; +use collab_entity::EncodedCollab; +use collab_folder::CollabOrigin; +use database::collab::CollabStorage; +use database::collab::GetCollabOrigin; +use database_entity::dto::QueryCollab; +use database_entity::dto::QueryCollabParams; +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; + +pub fn get_row_details_serde( + row_detail: RowDetail, + field_by_id_name_uniq: &HashMap, + type_option_reader_by_id: &HashMap>, +) -> HashMap> { + let mut cells = row_detail.row.cells; + let mut row_details_serde: HashMap> = + HashMap::with_capacity(cells.len()); + for (field_id, field) in field_by_id_name_uniq { + let cell: Cell = match cells.remove(field_id) { + Some(cell) => cell.clone(), + None => { + let field_type = FieldType::from(field.field_type); + match field_type { + FieldType::CreatedTime => { + TimestampCellData::new(Some(row_detail.row.created_at)).to_cell(field_type) + }, + FieldType::LastEditedTime => { + TimestampCellData::new(Some(row_detail.row.modified_at)).to_cell(field_type) + }, + _ => Cell::new(), + } + }, + }; + let cell_value = match type_option_reader_by_id.get(&field.id) { + Some(tor) => tor.json_cell(&cell), + None => { + tracing::error!("Failed to get type option reader by id: {}", field.id); + serde_json::Value::Null + }, + }; + row_details_serde.insert( + field.name.clone(), + HashMap::from([(CELL_DATA.to_string(), cell_value)]), + ); + } + + 
+  row_details_serde
+}
+
+/// create a map of field name to field;
+/// if a field name is repeated, it is suffixed with the field id
+pub fn field_by_name_uniq(mut fields: Vec<Field>) -> HashMap<String, Field> {
+  fields.sort_by_key(|a| a.id.clone());
+  let mut uniq_name_set: HashSet<String> = HashSet::with_capacity(fields.len());
+  let mut field_by_name: HashMap<String, Field> = HashMap::with_capacity(fields.len());
+
+  for field in fields {
+    // if the name already exists, append the field id to the name
+    let name = if uniq_name_set.contains(&field.name) {
+      format!("{}-{}", field.name, field.id)
+    } else {
+      field.name.clone()
+    };
+    uniq_name_set.insert(name.clone());
+    field_by_name.insert(name, field);
+  }
+  field_by_name
+}
+
+/// create a map of field id to field, ensuring that the field name is unique.
+/// if a field name is repeated, it is suffixed with the field id;
+/// under practical usage circumstances, no other collision should occur
+pub fn field_by_id_name_uniq(mut fields: Vec<Field>) -> HashMap<String, Field> {
+  fields.sort_by_key(|a| a.id.clone());
+  let mut uniq_name_set: HashSet<String> = HashSet::with_capacity(fields.len());
+  let mut field_by_id: HashMap<String, Field> = HashMap::with_capacity(fields.len());
+
+  for mut field in fields {
+    // if the name already exists, append the field id to the name
+    if uniq_name_set.contains(&field.name) {
+      let new_name = format!("{}-{}", field.name, field.id);
+      field.name.clone_from(&new_name);
+    }
+    uniq_name_set.insert(field.name.clone());
+    field_by_id.insert(field.id.clone(), field);
+  }
+  field_by_id
+}
+
+/// create a map of type option writers by field id
+pub fn type_option_writer_by_id(
+  fields: &[Field],
+) -> HashMap<String, Box<dyn TypeOptionCellWriter>> {
+  let mut type_option_reader_by_id: HashMap<String, Box<dyn TypeOptionCellWriter>> =
+    HashMap::with_capacity(fields.len());
+  for field in fields {
+    let field_id: String = field.id.clone();
+    let type_option_reader: Box<dyn TypeOptionCellWriter> = {
+      let field_type: &FieldType = &FieldType::from(field.field_type);
+      let type_option_data: TypeOptionData = match field.get_any_type_option(field_type.type_id()) {
+        Some(tod) => tod.clone(),
+        None => HashMap::new(),
+      };
+      type_option_cell_writer(type_option_data, field_type)
+    };
+    type_option_reader_by_id.insert(field_id, type_option_reader);
+  }
+  type_option_reader_by_id
+}
+
+/// create a map of type option readers by field id
+pub fn type_option_reader_by_id(
+  fields: &[Field],
+) -> HashMap<String, Box<dyn TypeOptionCellReader>> {
+  let mut type_option_reader_by_id: HashMap<String, Box<dyn TypeOptionCellReader>> =
+    HashMap::with_capacity(fields.len());
+  for field in fields {
+    let field_id: String = field.id.clone();
+    let type_option_reader: Box<dyn TypeOptionCellReader> = {
+      let field_type: &FieldType = &FieldType::from(field.field_type);
+      let type_option_data: TypeOptionData = match field.get_any_type_option(field_type.type_id()) {
+        Some(tod) => tod.clone(),
+        None => HashMap::new(),
+      };
+      type_option_cell_reader(type_option_data, field_type)
+    };
+    type_option_reader_by_id.insert(field_id, type_option_reader);
+  }
+  type_option_reader_by_id
+}
+
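To make the dedup rule of the two `*_uniq` helpers above concrete: fields are sorted by id, the first occurrence of a display name keeps it, and later duplicates are suffixed with their field id. A hypothetical illustration (ids and names invented):

    // Two fields share the display name "Tags".
    let fields = vec![
      Field { id: "fld_a".into(), name: "Tags".into(), ..Default::default() },
      Field { id: "fld_b".into(), name: "Tags".into(), ..Default::default() },
    ];
    let by_name = field_by_name_uniq(fields);
    // "fld_a" sorts first and keeps the plain name; the duplicate is suffixed.
    assert!(by_name.contains_key("Tags"));
    assert!(by_name.contains_key("Tags-fld_b"));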
"{ \"key\": \"value\" }" -> { "key": "value" } + if let yrs::Any::String(arc_str) = value { + if let Ok(serde_value) = serde_json::from_str::(arc_str) { + result.insert(key.clone(), serde_value); + } + } + }, + _ => { + result.insert(key.clone(), serde_json::to_value(value).unwrap_or_default()); + }, + } + } + + result +} + +pub async fn get_database_body( + collab_storage: &CollabAccessControlStorage, + workspace_uuid_str: &str, + database_uuid_str: &str, +) -> Result<(Collab, DatabaseBody), AppError> { + let db_collab = get_latest_collab( + collab_storage, + GetCollabOrigin::Server, + workspace_uuid_str, + database_uuid_str, + CollabType::Database, + ) + .await?; + let db_body = DatabaseBody::from_collab( + &db_collab, + Arc::new(NoPersistenceDatabaseCollabService), + None, + ) + .ok_or_else(|| { + AppError::Internal(anyhow::anyhow!( + "Failed to create database body from collab, db_collab_id: {}", + database_uuid_str, + )) + })?; + Ok((db_collab, db_body)) +} + +pub async fn get_latest_collab_encoded( + collab_storage: &CollabAccessControlStorage, + collab_origin: GetCollabOrigin, + workspace_id: &str, + oid: &str, + collab_type: CollabType, +) -> Result { + collab_storage + .get_encode_collab( + collab_origin, + QueryCollabParams { + workspace_id: workspace_id.to_string(), + inner: QueryCollab { + object_id: oid.to_string(), + collab_type, + }, + }, + true, + ) + .await +} + +pub async fn get_latest_collab( + storage: &CollabAccessControlStorage, + origin: GetCollabOrigin, + workspace_id: &str, + oid: &str, + collab_type: CollabType, +) -> Result { + let ec = get_latest_collab_encoded(storage, origin, workspace_id, oid, collab_type).await?; + let collab: Collab = Collab::new_with_source(CollabOrigin::Server, oid, ec.into(), vec![], false) + .map_err(|e| { + AppError::Internal(anyhow::anyhow!( + "Failed to create collab from encoded collab: {:?}", + e + )) + })?; + Ok(collab) +} + +pub async fn collab_to_bin(collab: Collab, collab_type: CollabType) -> Result, AppError> { + tokio::task::spawn_blocking(move || { + let bin = collab + .encode_collab_v1(|collab| collab_type.validate_require_data(collab)) + .map_err(|e| AppError::Unhandled(e.to_string()))? + .encode_to_bytes()?; + Ok(bin) + }) + .await? 
+}
+
+pub fn collab_from_doc_state(doc_state: Vec<u8>, object_id: &str) -> Result<Collab, AppError> {
+  let collab = Collab::new_with_source(
+    CollabOrigin::Server,
+    object_id,
+    DataSource::DocStateV1(doc_state),
+    vec![],
+    false,
+  )
+  .map_err(|e| AppError::Unhandled(e.to_string()))?;
+  Ok(collab)
+}
diff --git a/src/biz/workspace/page_view.rs b/src/biz/workspace/page_view.rs
index 3fadeb2b3..739a88d40 100644
--- a/src/biz/workspace/page_view.rs
+++ b/src/biz/workspace/page_view.rs
@@ -47,11 +47,9 @@ use crate::biz::collab::folder_view::{
   parse_extra_field_as_json, to_dto_view_icon, to_dto_view_layout, to_folder_view_icon,
   to_space_permission,
 };
-use crate::biz::collab::ops::{collab_from_doc_state, get_latest_workspace_database};
-use crate::biz::collab::{
-  folder_view::check_if_view_is_space,
-  ops::{get_latest_collab_encoded, get_latest_collab_folder},
-};
+use crate::biz::collab::ops::get_latest_workspace_database;
+use crate::biz::collab::utils::{collab_from_doc_state, get_latest_collab_encoded};
+use crate::biz::collab::{folder_view::check_if_view_is_space, ops::get_latest_collab_folder};

 use super::ops::broadcast_update;

@@ -1253,6 +1251,7 @@
   let encode_collab = collab_access_control_storage
     .get_encode_collab(GetCollabOrigin::User { uid }, param, true)
     .await?;
+
   let mut collab = collab_from_doc_state(encode_collab.doc_state.to_vec(), &object_id.to_string())?;
   appflowy_web_metrics.record_update_size_bytes(doc_state.len());
   let update = Update::decode_v1(doc_state).map_err(|e| {
diff --git a/src/biz/workspace/publish_dup.rs b/src/biz/workspace/publish_dup.rs
index f0929c866..2ec2ce3dc 100644
--- a/src/biz/workspace/publish_dup.rs
+++ b/src/biz/workspace/publish_dup.rs
@@ -3,7 +3,6 @@ use appflowy_collaborate::collab::storage::CollabAccessControlStorage;

 use anyhow::anyhow;
 use bytes::Bytes;
-use collab::preclude::Collab;
 use collab_database::database::gen_row_id;
 use collab_database::database::DatabaseBody;
 use collab_database::entity::FieldType;
@@ -35,8 +34,7 @@ use std::{collections::HashMap, sync::Arc};

 use crate::biz::collab::folder_view::to_folder_view_icon;
 use crate::biz::collab::folder_view::to_folder_view_layout;
-use crate::biz::collab::ops::collab_from_doc_state;
-use crate::biz::collab::ops::get_latest_collab_encoded;
+use crate::biz::collab::utils::collab_from_doc_state;
 use tracing::error;
 use workspace_template::gen_view_id;
 use yrs::Any;
@@ -45,6 +43,9 @@
 use yrs::ArrayRef;
 use yrs::Out;
 use yrs::{Map, MapRef};

+use crate::biz::collab::utils::collab_to_bin;
+use crate::biz::collab::utils::get_latest_collab_encoded;
+
 use super::ops::broadcast_update;

 #[allow(clippy::too_many_arguments)]
@@ -1174,14 +1175,3 @@ fn add_to_view_info(acc: &mut HashMap<String, PublishViewInfo>, view_infos: &[Pu
     }
   }
 }
-
-async fn collab_to_bin(collab: Collab, collab_type: CollabType) -> Result<Vec<u8>, AppError> {
-  tokio::task::spawn_blocking(move || {
-    let bin = collab
-      .encode_collab_v1(|collab| collab_type.validate_require_data(collab))
-      .map_err(|e| AppError::Unhandled(e.to_string()))?
-      .encode_to_bytes()?;
-    Ok(bin)
-  })
-  .await?
-}
diff --git a/tests/collab/database_crud.rs b/tests/collab/database_crud.rs
new file mode 100644
index 000000000..cfaffd3c0
--- /dev/null
+++ b/tests/collab/database_crud.rs
@@ -0,0 +1,154 @@
+use client_api_test::{generate_unique_registered_user_client, workspace_id_from_client};
+use collab_database::entity::FieldType;
+use shared_entity::dto::workspace_dto::AFInsertDatabaseField;
+
+#[tokio::test]
+async fn database_fields_crud() {
+  let (c, _user) = generate_unique_registered_user_client().await;
+  let workspace_id = workspace_id_from_client(&c).await;
+  let databases = c.list_databases(&workspace_id).await.unwrap();
+  assert_eq!(databases.len(), 1);
+  let todo_db = &databases[0];
+
+  let my_num_field_id = {
+    c.add_database_field(
+      &workspace_id,
+      &todo_db.id,
+      &AFInsertDatabaseField {
+        name: "MyNumberColumn".to_string(),
+        field_type: FieldType::Number.into(),
+        ..Default::default()
+      },
+    )
+    .await
+    .unwrap()
+  };
+  let my_datetime_field_id = {
+    c.add_database_field(
+      &workspace_id,
+      &todo_db.id,
+      &AFInsertDatabaseField {
+        name: "MyDateTimeColumn".to_string(),
+        field_type: FieldType::DateTime.into(),
+        ..Default::default()
+      },
+    )
+    .await
+    .unwrap()
+  };
+  let my_url_field_id = {
+    c.add_database_field(
+      &workspace_id,
+      &todo_db.id,
+      &AFInsertDatabaseField {
+        name: "MyUrlField".to_string(),
+        field_type: FieldType::URL.into(),
+        ..Default::default()
+      },
+    )
+    .await
+    .unwrap()
+  };
+  let my_checkbox_field_id = {
+    c.add_database_field(
+      &workspace_id,
+      &todo_db.id,
+      &AFInsertDatabaseField {
+        name: "MyCheckboxColumn".to_string(),
+        field_type: FieldType::Checkbox.into(),
+        ..Default::default()
+      },
+    )
+    .await
+    .unwrap()
+  };
+  {
+    let my_description = "my task 123";
+    let my_status = "To Do";
+    let new_row_id = c
+      .add_database_item(
+        &workspace_id,
+        &todo_db.id,
+        &serde_json::json!({
+          "Description": my_description,
+          "Status": my_status,
+          "Multiselect": ["social", "news"],
+          my_num_field_id: 123,
+          my_datetime_field_id: 1733210221,
+          my_url_field_id: "https://appflowy.io",
+          my_checkbox_field_id: true,
+        }),
+      )
+      .await
+      .unwrap();
+
+    let row_details = c
+      .list_database_row_details(&workspace_id, &todo_db.id, &[&new_row_id])
+      .await
+      .unwrap();
+    assert_eq!(row_details.len(), 1);
+    let new_row_detail = &row_details[0];
+    assert_eq!(new_row_detail.cells["Description"]["data"], my_description);
+    assert_eq!(new_row_detail.cells["Status"]["data"], my_status);
+    assert_eq!(new_row_detail.cells["Multiselect"]["data"][0], "social");
+    assert_eq!(new_row_detail.cells["Multiselect"]["data"][1], "news");
+    assert_eq!(new_row_detail.cells["MyNumberColumn"]["data"], "123");
+    assert_eq!(
+      new_row_detail.cells["MyDateTimeColumn"]["data"]["timestamp"],
+      1733210221
+    );
+    assert_eq!(
+      new_row_detail.cells["MyUrlField"]["data"],
+      "https://appflowy.io"
+    );
+    assert_eq!(new_row_detail.cells["MyCheckboxColumn"]["data"], true);
+  }
+}
+
+#[tokio::test]
+async fn database_fields_unsupported_field_type() {
+  let (c, _user) = generate_unique_registered_user_client().await;
+  let workspace_id = workspace_id_from_client(&c).await;
+  let databases = c.list_databases(&workspace_id).await.unwrap();
+  assert_eq!(databases.len(), 1);
+  let todo_db = &databases[0];
+
+  let my_rel_field_id = {
+    c.add_database_field(
+      &workspace_id,
+      &todo_db.id,
+      &AFInsertDatabaseField {
+        name: "MyRelationCol".to_string(),
+        field_type: FieldType::Relation.into(),
+        ..Default::default()
+      },
+    )
+    .await
+    .unwrap()
+  };
+  {
+    let my_description = "my task 123";
+    let my_status = "To Do";
= "To Do"; + let new_row_id = c + .add_database_item( + &workspace_id, + &todo_db.id, + &serde_json::json!({ + "Description": my_description, + "Status": my_status, + "Multiselect": ["social", "news"], + my_rel_field_id: "relation_data" + }), + ) + .await + .unwrap(); + + let row_details = c + .list_database_row_details(&workspace_id, &todo_db.id, &[&new_row_id]) + .await + .unwrap(); + assert_eq!(row_details.len(), 1); + let new_row_detail = &row_details[0]; + assert!(!new_row_detail.cells.contains_key("MyRelationCol")); + } +} diff --git a/tests/collab/mod.rs b/tests/collab/mod.rs index b7b380152..c37e150a8 100644 --- a/tests/collab/mod.rs +++ b/tests/collab/mod.rs @@ -1,5 +1,6 @@ mod awareness_test; mod collab_curd_test; +mod database_crud; mod member_crud; mod missing_update_test; mod multi_devices_edit; diff --git a/tests/workspace/publish.rs b/tests/workspace/publish.rs index 56957b955..36489094a 100644 --- a/tests/workspace/publish.rs +++ b/tests/workspace/publish.rs @@ -1,6 +1,6 @@ use app_error::ErrorCode; use appflowy_cloud::biz::collab::folder_view::collab_folder_to_folder_view; -use appflowy_cloud::biz::collab::ops::collab_from_doc_state; +use appflowy_cloud::biz::collab::utils::collab_from_doc_state; use client_api::entity::{ AFRole, GlobalComment, PatchPublishedCollab, PublishCollabItem, PublishCollabMetadata, PublishInfoMeta, diff --git a/tests/workspace/workspace_crud.rs b/tests/workspace/workspace_crud.rs index 822bc85b5..fd41d2778 100644 --- a/tests/workspace/workspace_crud.rs +++ b/tests/workspace/workspace_crud.rs @@ -135,51 +135,6 @@ async fn workspace_list_database() { .await .unwrap(); assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids); - { - let db_row_ids: Vec<&str> = db_row_ids.iter().map(|s| s.id.as_str()).collect(); - let db_row_ids: &[&str] = &db_row_ids; - let db_row_details = c - .list_database_row_details(&workspace_id, &todos_db.id, db_row_ids) - .await - .unwrap(); - assert_eq!(db_row_details.len(), 5, "{:#?}", db_row_details); - - // cells: { - // "Multiselect": { - // "field_type": "MultiSelect", - // "last_modified": "2024-08-16T07:23:57+00:00", - // "created_at": "2024-08-16T07:23:35+00:00", - // "data": "looks great,fast", - // }, - // "Description": { - // "field_type": "RichText", - // "last_modified": "2024-08-16T07:17:03+00:00", - // "created_at": "2024-08-16T07:16:51+00:00", - // "data": "Install AppFlowy Mobile", - // }, - // "Status": { - // "data": "To Do", - // "field_type": "SingleSelect", - // }, - // }, - let _ = db_row_details - .into_iter() - .find(|row| { - row.cells["Multiselect"]["field_type"] == "MultiSelect" - && row.cells["Multiselect"]["last_modified"] == "2024-08-16T07:23:57+00:00" - && row.cells["Multiselect"]["created_at"] == "2024-08-16T07:23:35+00:00" - && row.cells["Multiselect"]["data"] == "looks great,fast" - // Description - && row.cells["Description"]["field_type"] == "RichText" - && row.cells["Description"]["last_modified"] == "2024-08-16T07:17:03+00:00" - && row.cells["Description"]["created_at"] == "2024-08-16T07:16:51+00:00" - && row.cells["Description"]["data"] == "Install AppFlowy Mobile" - // Status - && row.cells["Status"]["data"] == "To Do" - && row.cells["Status"]["field_type"] == "SingleSelect" - }) - .unwrap(); - } } } }