From c0f7b1d3a9de1ea6a47bce0b2bdb03c5a4181e7b Mon Sep 17 00:00:00 2001 From: Zack Date: Wed, 18 Dec 2024 13:47:20 +0800 Subject: [PATCH 1/7] feat: upsert database row (#1067) * feat: upsert database row * feat: improve api to accept database row document payload * feat: test case for upsert * chore: refactor * feat: database row doc content impl * refactor: simplify insert row field names * feat: folder updates for database row document * refactor: simplify row detail return * feat: add doc contents for database row detail * feat: refactor database row creation * feat: upsert database row doc modification * feat: ignore empty string * feat: add document similarity check * chore: refactor to use broadcast with timeout * feat: wrap broadcast timeout in a tokio spawn --- Cargo.lock | 23 +- Cargo.toml | 23 +- libs/client-api/src/http_collab.rs | 44 +- libs/shared-entity/src/dto/workspace_dto.rs | 29 +- src/api/workspace.rs | 66 ++- src/biz/access_request/ops.rs | 6 +- src/biz/collab/ops.rs | 473 +++++++++++++++----- src/biz/collab/utils.rs | 293 +++++++++++- src/biz/workspace/ops.rs | 30 +- src/biz/workspace/page_view.rs | 6 +- src/biz/workspace/publish.rs | 2 +- src/biz/workspace/publish_dup.rs | 21 +- tests/collab/database_crud.rs | 236 ++++++++-- 13 files changed, 1050 insertions(+), 202 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54063c008..483b5d9e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1902,9 +1902,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", @@ -2137,7 +2137,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=33cca67554c29b1a3821df6faf5423cff2cd5db3#33cca67554c29b1a3821df6faf5423cff2cd5db3" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2443178e4249354c094867875f300cc924cbe0e2#2443178e4249354c094867875f300cc924cbe0e2" dependencies = [ "anyhow", "arc-swap", @@ -2162,7 +2162,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=33cca67554c29b1a3821df6faf5423cff2cd5db3#33cca67554c29b1a3821df6faf5423cff2cd5db3" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2443178e4249354c094867875f300cc924cbe0e2#2443178e4249354c094867875f300cc924cbe0e2" dependencies = [ "anyhow", "async-trait", @@ -2176,6 +2176,7 @@ dependencies = [ "fancy-regex 0.13.0", "futures", "getrandom 0.2.15", + "iana-time-zone", "js-sys", "lazy_static", "nanoid", @@ -2201,7 +2202,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=33cca67554c29b1a3821df6faf5423cff2cd5db3#33cca67554c29b1a3821df6faf5423cff2cd5db3" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2443178e4249354c094867875f300cc924cbe0e2#2443178e4249354c094867875f300cc924cbe0e2" dependencies = [ "anyhow", "arc-swap", @@ -2222,7 +2223,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=33cca67554c29b1a3821df6faf5423cff2cd5db3#33cca67554c29b1a3821df6faf5423cff2cd5db3" +source = 
"git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2443178e4249354c094867875f300cc924cbe0e2#2443178e4249354c094867875f300cc924cbe0e2" dependencies = [ "anyhow", "bytes", @@ -2242,7 +2243,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=33cca67554c29b1a3821df6faf5423cff2cd5db3#33cca67554c29b1a3821df6faf5423cff2cd5db3" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2443178e4249354c094867875f300cc924cbe0e2#2443178e4249354c094867875f300cc924cbe0e2" dependencies = [ "anyhow", "arc-swap", @@ -2264,7 +2265,7 @@ dependencies = [ [[package]] name = "collab-importer" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=33cca67554c29b1a3821df6faf5423cff2cd5db3#33cca67554c29b1a3821df6faf5423cff2cd5db3" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2443178e4249354c094867875f300cc924cbe0e2#2443178e4249354c094867875f300cc924cbe0e2" dependencies = [ "anyhow", "async-recursion", @@ -2367,7 +2368,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=33cca67554c29b1a3821df6faf5423cff2cd5db3#33cca67554c29b1a3821df6faf5423cff2cd5db3" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2443178e4249354c094867875f300cc924cbe0e2#2443178e4249354c094867875f300cc924cbe0e2" dependencies = [ "anyhow", "collab", @@ -4034,9 +4035,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.60" +version = "0.1.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" dependencies = [ "android_system_properties", "core-foundation-sys", diff --git a/Cargo.toml b/Cargo.toml index 702a78f99..dde1cd013 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,10 +40,7 @@ tokio-stream.workspace = true tokio-util = { version = "0.7.10", features = ["io"] } futures-util = { workspace = true, features = ["std", "io"] } once_cell = "1.19.0" -chrono = { version = "0.4.37", features = [ - "serde", - "clock", -], default-features = false } +chrono.workspace = true derive_more = { version = "0.99" } secrecy.workspace = true rand = { version = "0.8", features = ["std_rng"] } @@ -286,6 +283,10 @@ md5 = "0.7.0" pin-project = "1.1.5" validator = "0.19" zstd = { version = "0.13.2", features = [] } +chrono = { version = "0.4.39", features = [ + "serde", + "clock", +], default-features = false } # collaboration yrs = { version = "0.21.3", features = ["sync"] } @@ -315,13 +316,13 @@ lto = false [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. 
-collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "33cca67554c29b1a3821df6faf5423cff2cd5db3" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "33cca67554c29b1a3821df6faf5423cff2cd5db3" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "33cca67554c29b1a3821df6faf5423cff2cd5db3" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "33cca67554c29b1a3821df6faf5423cff2cd5db3" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "33cca67554c29b1a3821df6faf5423cff2cd5db3" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "33cca67554c29b1a3821df6faf5423cff2cd5db3" } -collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "33cca67554c29b1a3821df6faf5423cff2cd5db3" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" } +collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" } [features] history = [] diff --git a/libs/client-api/src/http_collab.rs b/libs/client-api/src/http_collab.rs index f4f20aa5b..cccb3e3ce 100644 --- a/libs/client-api/src/http_collab.rs +++ b/libs/client-api/src/http_collab.rs @@ -7,7 +7,8 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use client_api_entity::workspace_dto::{ AFDatabase, AFDatabaseField, AFDatabaseRow, AFDatabaseRowDetail, AFInsertDatabaseField, - DatabaseRowUpdatedItem, ListDatabaseRowDetailParam, ListDatabaseRowUpdatedParam, + AddDatatabaseRow, DatabaseRowUpdatedItem, ListDatabaseRowDetailParam, + ListDatabaseRowUpdatedParam, UpsertDatatabaseRow, }; use client_api_entity::{ AFCollabEmbedInfo, BatchQueryCollabParams, BatchQueryCollabResult, CollabParams, @@ -24,6 +25,7 @@ use reqwest::{Body, Method}; use serde::Serialize; use shared_entity::dto::workspace_dto::{CollabResponse, CollabTypeParam, EmbeddedCollabQuery}; use shared_entity::response::{AppResponse, AppResponseError}; +use std::collections::HashMap; use std::future::Future; use std::io::Cursor; use std::pin::Pin; @@ -262,6 +264,7 @@ impl Client { workspace_id: &str, database_id: &str, row_ids: &[&str], + with_doc: bool, ) -> Result, AppResponseError> { let url = format!( "{}/api/workspace/{}/database/{}/row/detail", @@ -270,7 +273,7 @@ impl Client { let resp = self .http_client_with_auth(Method::GET, &url) .await? 
-      .query(&ListDatabaseRowDetailParam::from(row_ids))
+      .query(&ListDatabaseRowDetailParam::new(row_ids, with_doc))
       .send()
       .await?;
     log_request_id(&resp);
@@ -287,7 +290,8 @@
     &self,
     workspace_id: &str,
     database_id: &str,
-    payload: &serde_json::Value,
+    cells_by_id: HashMap<String, serde_json::Value>,
+    row_doc_content: Option<String>,
   ) -> Result<String, AppResponseError> {
     let url = format!(
       "{}/api/workspace/{}/database/{}/row",
@@ -296,7 +300,39 @@
     let resp = self
       .http_client_with_auth(Method::POST, &url)
       .await?
-      .json(&payload)
+      .json(&AddDatatabaseRow {
+        cells: cells_by_id,
+        document: row_doc_content,
+      })
+      .send()
+      .await?;
+    log_request_id(&resp);
+    AppResponse::from_response(resp).await?.into_data()
+  }
+
+  /// Like [add_database_item], but uses `pre_hash` as the identifier of the row.
+  /// The same `pre_hash` value always maps to the same row.
+  /// Creates the row if it does not exist; otherwise the existing row is modified.
+  pub async fn upsert_database_item(
+    &self,
+    workspace_id: &str,
+    database_id: &str,
+    pre_hash: String,
+    cells_by_id: HashMap<String, serde_json::Value>,
+    row_doc_content: Option<String>,
+  ) -> Result<String, AppResponseError> {
+    let url = format!(
+      "{}/api/workspace/{}/database/{}/row",
+      self.base_url, workspace_id, database_id
+    );
+    let resp = self
+      .http_client_with_auth(Method::PUT, &url)
+      .await?
+      .json(&UpsertDatatabaseRow {
+        pre_hash,
+        cells: cells_by_id,
+        document: row_doc_content,
+      })
       .send()
       .await?;
     log_request_id(&resp);
diff --git a/libs/shared-entity/src/dto/workspace_dto.rs b/libs/shared-entity/src/dto/workspace_dto.rs
index f8becc835..0597fbf6a 100644
--- a/libs/shared-entity/src/dto/workspace_dto.rs
+++ b/libs/shared-entity/src/dto/workspace_dto.rs
@@ -342,6 +342,9 @@ pub struct ListDatabaseRowDetailParam {
   // Comma separated database row ids
   // e.g. "<row_id_1>,<row_id_2>,<row_id_3>"
   pub ids: String,
+  // if set to true, document data (if it exists) will also be fetched
+  // and returned as plain text
+  pub with_doc: Option<bool>,
 }

 #[derive(Default, Debug, Deserialize, Serialize)]
@@ -356,8 +359,11 @@ pub struct DatabaseRowUpdatedItem {
 }

 impl ListDatabaseRowDetailParam {
-  pub fn from(ids: &[&str]) -> Self {
-    Self { ids: ids.join(",") }
+  pub fn new(ids: &[&str], with_doc: bool) -> Self {
+    Self {
+      ids: ids.join(","),
+      with_doc: Some(with_doc),
+    }
   }
   pub fn into_ids(&self) -> Vec<&str> {
     self.ids.split(',').collect()
   }
@@ -396,7 +402,11 @@ pub struct AFDatabaseRow {

 #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct AFDatabaseRowDetail {
   pub id: String,
-  pub cells: HashMap<String, HashMap<String, serde_json::Value>>,
+  // database field id -> cell data
+  pub cells: HashMap<String, serde_json::Value>,
+  pub has_doc: bool,
+  /// available if the row has a doc and the client requested it in [ListDatabaseRowDetailParam]
+  pub doc: Option<String>,
 }

 #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -414,3 +424,16 @@ pub struct AFInsertDatabaseField {
   pub field_type: i64, // FieldType ID
   pub type_option_data: Option<serde_json::Value>, // TypeOptionData
 }
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct AddDatatabaseRow {
+  pub cells: HashMap<String, serde_json::Value>,
+  pub document: Option<String>,
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct UpsertDatatabaseRow {
+  pub pre_hash: String, // input which will be hashed into the database row id
+  pub cells: HashMap<String, serde_json::Value>,
+  pub document: Option<String>,
+}
diff --git a/src/api/workspace.rs b/src/api/workspace.rs
index c41e76a5b..1b7565e8e 100644
--- a/src/api/workspace.rs
+++ b/src/api/workspace.rs
@@ -48,11 +48,11 @@ use database_entity::dto::PublishInfo;
 use database_entity::dto::*;
 use prost::Message as ProstMessage;
 use rayon::prelude::*;
+use sha2::{Digest, Sha256};
 use shared_entity::dto::workspace_dto::*;
use shared_entity::response::AppResponseError; use shared_entity::response::{AppResponse, JsonAppResponse}; use sqlx::types::uuid; -use std::collections::HashMap; use std::io::Cursor; use std::time::Instant; use tokio_stream::StreamExt; @@ -284,7 +284,8 @@ pub fn workspace_scope() -> Scope { .service( web::resource("/{workspace_id}/database/{database_id}/row") .route(web::get().to(list_database_row_id_handler)) - .route(web::post().to(post_database_row_handler)), + .route(web::post().to(post_database_row_handler)) + .route(web::put().to(put_database_row_handler)), ) .service( web::resource("/{workspace_id}/database/{database_id}/fields") @@ -2017,7 +2018,7 @@ async fn post_database_row_handler( user_uuid: UserUuid, path_param: web::Path<(String, String)>, state: Data, - cells_by_id: Json>, + add_database_row: Json, ) -> Result>> { let (workspace_id, db_id) = path_param.into_inner(); let uid = state.user_cache.get_user_uid(&user_uuid).await?; @@ -2026,18 +2027,69 @@ async fn post_database_row_handler( .enforce_action(&uid, &workspace_id, Action::Write) .await?; + let AddDatatabaseRow { cells, document } = add_database_row.into_inner(); + let new_db_row_id = biz::collab::ops::insert_database_row( - &state.collab_access_control_storage, + state.collab_access_control_storage.clone(), &state.pg_pool, &workspace_id, &db_id, uid, - cells_by_id.into_inner(), + None, + cells, + document, ) .await?; Ok(Json(AppResponse::Ok().with_data(new_db_row_id))) } +async fn put_database_row_handler( + user_uuid: UserUuid, + path_param: web::Path<(String, String)>, + state: Data, + upsert_db_row: Json, +) -> Result>> { + let (workspace_id, db_id) = path_param.into_inner(); + let uid = state.user_cache.get_user_uid(&user_uuid).await?; + state + .workspace_access_control + .enforce_action(&uid, &workspace_id, Action::Write) + .await?; + + let UpsertDatatabaseRow { + pre_hash, + cells, + document, + } = upsert_db_row.into_inner(); + + let row_id = { + let mut hasher = Sha256::new(); + hasher.update(&workspace_id); + hasher.update(&db_id); + hasher.update(pre_hash); + let hash = hasher.finalize(); + Uuid::from_bytes([ + // take 16 out of 32 bytes + hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], hash[8], hash[9], + hash[10], hash[11], hash[12], hash[13], hash[14], hash[15], + ]) + }; + let row_id_str = row_id.to_string(); + + biz::collab::ops::upsert_database_row( + state.collab_access_control_storage.clone(), + &state.pg_pool, + &workspace_id, + &db_id, + uid, + &row_id_str, + cells, + document, + ) + .await?; + Ok(Json(AppResponse::Ok().with_data(row_id_str))) +} + async fn get_database_fields_handler( user_uuid: UserUuid, path_param: web::Path<(String, String)>, @@ -2075,7 +2127,7 @@ async fn post_database_fields_handler( let field_id = biz::collab::ops::add_database_field( uid, - &state.collab_access_control_storage, + state.collab_access_control_storage.clone(), &state.pg_pool, &workspace_id, &db_id, @@ -2125,6 +2177,7 @@ async fn list_database_row_details_handler( let (workspace_id, db_id) = path_param.into_inner(); let uid = state.user_cache.get_user_uid(&user_uuid).await?; let list_db_row_query = param.into_inner(); + let with_doc = list_db_row_query.with_doc.unwrap_or_default(); let row_ids = list_db_row_query.into_ids(); if let Err(e) = Uuid::parse_str(&workspace_id) { @@ -2156,6 +2209,7 @@ async fn list_database_row_details_handler( db_id, &row_ids, UNSUPPORTED_FIELD_TYPES, + with_doc, ) .await?; Ok(Json(AppResponse::Ok().with_data(db_rows))) diff --git 
a/src/biz/access_request/ops.rs b/src/biz/access_request/ops.rs index eaef98183..44041bc0d 100644 --- a/src/biz/access_request/ops.rs +++ b/src/biz/access_request/ops.rs @@ -1,12 +1,10 @@ use std::ops::DerefMut; use std::sync::Arc; +use crate::biz::collab::utils::get_latest_collab_folder; use crate::mailer::AFCloudMailer; use crate::{ - biz::collab::{ - folder_view::{to_dto_view_icon, to_dto_view_layout}, - ops::get_latest_collab_folder, - }, + biz::collab::folder_view::{to_dto_view_icon, to_dto_view_layout}, mailer::{WorkspaceAccessRequestApprovedMailerParam, WorkspaceAccessRequestMailerParam}, }; use access_control::workspace::WorkspaceAccessControl; diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index 58c92396b..851988bec 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -11,18 +11,21 @@ use collab_database::database::gen_row_id; use collab_database::entity::FieldType; use collab_database::fields::Field; use collab_database::fields::TypeOptions; -use collab_database::rows::Cell; +use collab_database::rows::meta_id_from_row_id; use collab_database::rows::CreateRowParams; use collab_database::rows::DatabaseRowBody; use collab_database::rows::Row; use collab_database::rows::RowDetail; +use collab_database::rows::RowId; +use collab_database::rows::RowMetaKey; use collab_database::views::OrderObjectPosition; use collab_database::workspace_database::WorkspaceDatabase; use collab_database::workspace_database::WorkspaceDatabaseBody; +use collab_document::document::Document; use collab_entity::CollabType; use collab_entity::EncodedCollab; +use collab_folder::CollabOrigin; use collab_folder::SectionItem; -use collab_folder::{CollabOrigin, Folder}; use database::collab::select_last_updated_database_row_ids; use database::collab::select_workspace_database_oid; use database::collab::{CollabStorage, GetCollabOrigin}; @@ -43,9 +46,10 @@ use shared_entity::dto::workspace_dto::RecentFolderView; use shared_entity::dto::workspace_dto::TrashFolderView; use sqlx::PgPool; use std::ops::DerefMut; +use yrs::Map; -use crate::biz::collab::utils::field_by_name_uniq; -use crate::biz::workspace::ops::broadcast_update; +use crate::biz::collab::utils::get_database_row_doc_changes; +use crate::biz::workspace::ops::broadcast_update_with_timeout; use access_control::collab::CollabAccessControl; use anyhow::Context; use database_entity::dto::{ @@ -65,14 +69,19 @@ use super::folder_view::to_dto_folder_view_miminal; use super::publish_outline::collab_folder_to_published_outline; use super::utils::collab_from_doc_state; use super::utils::collab_to_bin; +use super::utils::create_row_document; use super::utils::field_by_id_name_uniq; -use super::utils::get_database_body; use super::utils::get_latest_collab; +use super::utils::get_latest_collab_database_body; +use super::utils::get_latest_collab_database_row_body; use super::utils::get_latest_collab_encoded; +use super::utils::get_latest_collab_folder; use super::utils::get_row_details_serde; use super::utils::type_option_reader_by_id; -use super::utils::type_option_writer_by_id; use super::utils::type_options_serde; +use super::utils::write_to_database_row; +use super::utils::CreatedRowDocument; +use super::utils::DocChanges; /// Create a new collab member /// If the collab member already exists, return [AppError::RecordAlreadyExists] @@ -343,36 +352,6 @@ pub async fn get_latest_workspace_database( Ok((workspace_database_oid, workspace_database)) } -pub async fn get_latest_collab_folder( - collab_storage: &CollabAccessControlStorage, - 
collab_origin: GetCollabOrigin, - workspace_id: &str, -) -> Result { - let folder_uid = if let GetCollabOrigin::User { uid } = collab_origin { - uid - } else { - // Dummy uid to open the collab folder if the request does not originate from user - 0 - }; - let encoded_collab = get_latest_collab_encoded( - collab_storage, - collab_origin, - workspace_id, - workspace_id, - CollabType::Folder, - ) - .await?; - let folder = Folder::from_collab_doc_state( - folder_uid, - CollabOrigin::Server, - encoded_collab.into(), - workspace_id, - vec![], - ) - .map_err(|e| AppError::Unhandled(e.to_string()))?; - Ok(folder) -} - pub async fn get_published_view( collab_storage: &CollabAccessControlStorage, publish_namespace: String, @@ -459,7 +438,7 @@ pub async fn list_database_row_ids( database_uuid_str: &str, ) -> Result, AppError> { let (db_collab, db_body) = - get_database_body(collab_storage, workspace_uuid_str, database_uuid_str).await?; + get_latest_collab_database_body(collab_storage, workspace_uuid_str, database_uuid_str).await?; // get any view_id let txn = db_collab.transact(); let iid = db_body.get_inline_view_id(&txn); @@ -479,80 +458,77 @@ pub async fn list_database_row_ids( Ok(db_rows) } +#[allow(clippy::too_many_arguments)] pub async fn insert_database_row( - collab_storage: &CollabAccessControlStorage, + collab_storage: Arc, pg_pool: &PgPool, workspace_uuid_str: &str, database_uuid_str: &str, uid: i64, + new_db_row_id: Option<&str>, cell_value_by_id: HashMap, + row_doc_content: Option, ) -> Result { - // get database types and type options - let (mut db_collab, db_body) = - get_database_body(collab_storage, workspace_uuid_str, database_uuid_str).await?; + let new_db_row_id: RowId = new_db_row_id + .map(|id| RowId::from(id.to_string())) + .unwrap_or_else(gen_row_id); - let all_fields = db_body.fields.get_all_fields(&db_collab.transact()); - let field_by_id = all_fields.iter().fold(HashMap::new(), |mut acc, field| { - acc.insert(field.id.clone(), field.clone()); - acc - }); - let type_option_reader_by_id = type_option_writer_by_id(&all_fields); - let field_by_name = field_by_name_uniq(all_fields); + let creation_time = Utc::now(); - let new_db_row_id = gen_row_id(); let mut new_db_row_collab = Collab::new_with_origin(CollabOrigin::Empty, new_db_row_id.clone(), vec![], false); + let new_db_row_body = DatabaseRowBody::create( + new_db_row_id.clone(), + &mut new_db_row_collab, + Row::empty(new_db_row_id.clone(), database_uuid_str), + ); + new_db_row_body.update(&mut new_db_row_collab.transact_mut(), |row_update| { + row_update.set_created_at(Utc::now().timestamp()); + }); - let new_db_row_body = { - let db_row_body = DatabaseRowBody::create( - new_db_row_id.clone(), - &mut new_db_row_collab, - Row::empty(new_db_row_id.clone(), database_uuid_str), - ); - let mut txn = new_db_row_collab.transact_mut(); - - // set last_modified and created_at - db_row_body.update(&mut txn, |row_update| { - row_update - .set_last_modified(Utc::now().timestamp()) - .set_created_at(Utc::now().timestamp()); - }); - - for (id, serde_val) in cell_value_by_id { - let field = match field_by_id.get(&id) { - Some(f) => f, - // try use field name if id not found - None => match field_by_name.get(&id) { - Some(f) => f, - None => { - tracing::warn!( - "field not found: {} for database: {}", - id, - database_uuid_str - ); - continue; - }, - }, - }; - let cell_writer = match type_option_reader_by_id.get(&field.id) { - Some(cell_writer) => cell_writer, - None => { - tracing::error!("Failed to get type option writer for field: {}", 
field.id); - continue; - }, - }; - let new_cell: Cell = cell_writer.convert_json_to_cell(serde_val); - db_row_body.update(&mut txn, |row_update| { - row_update.update_cells(|cells_update| { - cells_update.insert_cell(&field.id, new_cell); - }); - }); - } - db_row_body + let new_row_doc_creation: Option<(String, CreatedRowDocument)> = match row_doc_content { + Some(row_doc_content) if !row_doc_content.is_empty() => { + // update row to indicate that the document is not empty + let is_document_empty_id = + meta_id_from_row_id(&new_db_row_id.parse()?, RowMetaKey::IsDocumentEmpty); + new_db_row_body.get_meta().insert( + &mut new_db_row_collab.transact_mut(), + is_document_empty_id, + false, + ); + + // get document id + let new_doc_id = new_db_row_body + .document_id(&new_db_row_collab.transact()) + .map_err(|err| AppError::Internal(anyhow::anyhow!("Failed to get document id: {:?}", err)))? + .ok_or_else(|| AppError::Internal(anyhow::anyhow!("Failed to get document id")))?; + + let created_row_doc = create_row_document( + workspace_uuid_str, + uid, + &new_doc_id, + &collab_storage, + row_doc_content, + ) + .await?; + Some((new_doc_id, created_row_doc)) + }, + _ => None, }; + let (mut db_collab, db_body) = + get_latest_collab_database_body(&collab_storage, workspace_uuid_str, database_uuid_str).await?; + write_to_database_row( + &db_body, + &mut new_db_row_collab.transact_mut(), + &new_db_row_body, + cell_value_by_id, + creation_time.timestamp(), + ) + .await?; + // Create new row order - let ts_now = chrono::Utc::now().timestamp(); + let ts_now = creation_time.timestamp(); let row_order = db_body .create_row(CreateRowParams { id: new_db_row_id.clone(), @@ -569,8 +545,7 @@ pub async fn insert_database_row( .await .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to create row: {:?}", e)))?; - // Prepare new row collab binary to store in postgres - let db_row_ec_v1 = collab_to_bin(new_db_row_collab, CollabType::DatabaseRow).await?; + let new_db_row_ec_v1 = collab_to_bin(new_db_row_collab, CollabType::DatabaseRow).await?; // For each database view, add the new row order let db_collab_update = { @@ -589,6 +564,46 @@ pub async fn insert_database_row( let updated_db_collab = collab_to_bin(db_collab, CollabType::Database).await?; let mut db_txn = pg_pool.begin().await?; + + // handle row document (if provided) + if let Some((doc_id, created_doc)) = new_row_doc_creation { + // insert document + collab_storage + .upsert_new_collab_with_transaction( + workspace_uuid_str, + &uid, + CollabParams { + object_id: doc_id, + encoded_collab_v1: created_doc.doc_ec_bytes.into(), + collab_type: CollabType::Document, + }, + &mut db_txn, + "inserting new database row document from server", + ) + .await?; + + // update folder and broadcast + collab_storage + .upsert_new_collab_with_transaction( + workspace_uuid_str, + &uid, + CollabParams { + object_id: workspace_uuid_str.to_string(), + encoded_collab_v1: created_doc.updated_folder.into(), + collab_type: CollabType::Folder, + }, + &mut db_txn, + "inserting updated folder from server", + ) + .await?; + broadcast_update_with_timeout( + collab_storage.clone(), + workspace_uuid_str.to_string(), + created_doc.folder_updates, + ) + .await; + }; + // insert row collab_storage .upsert_new_collab_with_transaction( @@ -596,7 +611,7 @@ pub async fn insert_database_row( &uid, CollabParams { object_id: new_db_row_id.to_string(), - encoded_collab_v1: db_row_ec_v1.into(), + encoded_collab_v1: new_db_row_ec_v1.into(), collab_type: CollabType::DatabaseRow, }, &mut db_txn, @@ 
-620,17 +635,177 @@ pub async fn insert_database_row( .await?; db_txn.commit().await?; - broadcast_update(collab_storage, database_uuid_str, db_collab_update).await?; + broadcast_update_with_timeout( + collab_storage, + database_uuid_str.to_string(), + db_collab_update, + ) + .await; Ok(new_db_row_id.to_string()) } +#[allow(clippy::too_many_arguments)] +pub async fn upsert_database_row( + collab_storage: Arc, + pg_pool: &PgPool, + workspace_uuid_str: &str, + database_uuid_str: &str, + uid: i64, + row_id: &str, + cell_value_by_id: HashMap, + row_doc_content: Option, +) -> Result<(), AppError> { + let (mut db_row_collab, db_row_body) = + match get_latest_collab_database_row_body(&collab_storage, workspace_uuid_str, row_id).await { + Ok(res) => res, + Err(err) => match err { + AppError::RecordNotFound(_) => { + return insert_database_row( + collab_storage, + pg_pool, + workspace_uuid_str, + database_uuid_str, + uid, + Some(row_id), + cell_value_by_id, + row_doc_content, + ) + .await + .map(|_id| {}); + }, + _ => return Err(err), + }, + }; + + // At this point, db row exists, + // so we modify it, put into storage and broadcast change + let (_db_collab, db_body) = + get_latest_collab_database_body(&collab_storage, workspace_uuid_str, database_uuid_str).await?; + let mut db_row_txn = db_row_collab.transact_mut(); + write_to_database_row( + &db_body, + &mut db_row_txn, + &db_row_body, + cell_value_by_id, + Utc::now().timestamp(), + ) + .await?; + + // determine if there are any document changes + let doc_changes: Option<(String, DocChanges)> = get_database_row_doc_changes( + &collab_storage, + workspace_uuid_str, + row_doc_content, + &db_row_body, + &mut db_row_txn, + row_id, + uid, + ) + .await?; + + // finalize update for database row + let db_row_collab_updates = db_row_txn.encode_update_v1(); + drop(db_row_txn); + let db_row_ec_v1 = collab_to_bin(db_row_collab, CollabType::DatabaseRow).await?; + + // write to disk and broadcast changes + let mut db_txn = pg_pool.begin().await?; + collab_storage + .upsert_new_collab_with_transaction( + workspace_uuid_str, + &uid, + CollabParams { + object_id: row_id.to_string(), + encoded_collab_v1: db_row_ec_v1.into(), + collab_type: CollabType::DatabaseRow, + }, + &mut db_txn, + "inserting new database row from server", + ) + .await?; + broadcast_update_with_timeout( + collab_storage.clone(), + row_id.to_string(), + db_row_collab_updates, + ) + .await; + + // handle document changes + if let Some((doc_id, doc_changes)) = doc_changes { + match doc_changes { + DocChanges::Update(updated_doc, doc_update) => { + collab_storage + .upsert_new_collab_with_transaction( + workspace_uuid_str, + &uid, + CollabParams { + object_id: doc_id.clone(), + encoded_collab_v1: updated_doc.into(), + collab_type: CollabType::Document, + }, + &mut db_txn, + "updating database row document from server", + ) + .await?; + broadcast_update_with_timeout(collab_storage, doc_id, doc_update).await; + }, + DocChanges::Insert(created_doc) => { + let CreatedRowDocument { + updated_folder, + folder_updates, + doc_ec_bytes, + } = created_doc; + + // insert document + collab_storage + .upsert_new_collab_with_transaction( + workspace_uuid_str, + &uid, + CollabParams { + object_id: doc_id, + encoded_collab_v1: doc_ec_bytes.into(), + collab_type: CollabType::Document, + }, + &mut db_txn, + "inserting new database row document from server", + ) + .await?; + + // update folder and broadcast + collab_storage + .upsert_new_collab_with_transaction( + workspace_uuid_str, + &uid, + CollabParams { + 
object_id: workspace_uuid_str.to_string(), + encoded_collab_v1: updated_folder.into(), + collab_type: CollabType::Folder, + }, + &mut db_txn, + "inserting updated folder from server", + ) + .await?; + broadcast_update_with_timeout( + collab_storage, + workspace_uuid_str.to_string(), + folder_updates, + ) + .await; + }, + } + } + + db_txn.commit().await?; + Ok(()) +} + pub async fn get_database_fields( collab_storage: &CollabAccessControlStorage, workspace_uuid_str: &str, database_uuid_str: &str, ) -> Result, AppError> { let (db_collab, db_body) = - get_database_body(collab_storage, workspace_uuid_str, database_uuid_str).await?; + get_latest_collab_database_body(collab_storage, workspace_uuid_str, database_uuid_str).await?; let all_fields = db_body.fields.get_all_fields(&db_collab.transact()); let mut acc = Vec::with_capacity(all_fields.len()); @@ -651,14 +826,14 @@ pub async fn get_database_fields( // returns the id of the field created pub async fn add_database_field( uid: i64, - collab_storage: &CollabAccessControlStorage, + collab_storage: Arc, pg_pool: &PgPool, workspace_id: &str, database_id: &str, insert_field: AFInsertDatabaseField, ) -> Result { let (mut db_collab, db_body) = - get_database_body(collab_storage, workspace_id, database_id).await?; + get_latest_collab_database_body(&collab_storage, workspace_id, database_id).await?; let new_id = gen_field_id(); let mut type_options = TypeOptions::new(); @@ -712,7 +887,7 @@ pub async fn add_database_field( .await?; pg_txn.commit().await?; - broadcast_update(collab_storage, database_id, db_collab_update).await?; + broadcast_update_with_timeout(collab_storage, database_id.to_string(), db_collab_update).await; Ok(new_id) } @@ -743,9 +918,11 @@ pub async fn list_database_row_details( database_uuid_str: String, row_ids: &[&str], unsupported_field_types: &[FieldType], + with_doc: bool, ) -> Result, AppError> { let (database_collab, db_body) = - get_database_body(collab_storage, &workspace_uuid_str, &database_uuid_str).await?; + get_latest_collab_database_body(collab_storage, &workspace_uuid_str, &database_uuid_str) + .await?; let all_fields: Vec = db_body .fields @@ -766,7 +943,7 @@ pub async fn list_database_row_details( collab_type: CollabType::DatabaseRow, }) .collect(); - let database_row_details = collab_storage + let mut db_row_details = collab_storage .batch_get_collab(&uid, &workspace_uuid_str, query_collabs, true) .await .into_iter() @@ -794,8 +971,15 @@ pub async fn list_database_row_details( return None; }, }; + + let has_doc = !row_detail.meta.is_document_empty; let cells = get_row_details_serde(row_detail, &field_by_id, &type_option_reader_by_id); - Some(AFDatabaseRowDetail { id, cells }) + Some(AFDatabaseRowDetail { + id, + cells, + has_doc, + doc: None, + }) }, QueryCollabResult::Failed { error } => { tracing::warn!("Failed to get collab: {:?}", error); @@ -804,5 +988,78 @@ pub async fn list_database_row_details( }) .collect::>(); - Ok(database_row_details) + // Fill in the document content if requested and exists + if with_doc { + let doc_id_by_row_id = db_row_details + .iter() + .filter(|row| row.has_doc) + .flat_map(|row| { + row.id.parse::().ok().map(|row_uuid| { + ( + row.id.clone(), + meta_id_from_row_id(&row_uuid, RowMetaKey::DocumentId), + ) + }) + }) + .collect::>(); + let query_db_docs = doc_id_by_row_id + .values() + .map(|doc_id| QueryCollab { + object_id: doc_id.to_string(), + collab_type: CollabType::Document, + }) + .collect::>(); + let mut query_res = collab_storage + .batch_get_collab(&uid, 
&workspace_uuid_str, query_db_docs, true) + .await; + for row_detail in &mut db_row_details { + if let Err(err) = fill_in_db_row_doc(row_detail, &doc_id_by_row_id, &mut query_res) { + tracing::error!("Failed to fill in document content: {:?}", err); + }; + } + } + + Ok(db_row_details) +} + +fn fill_in_db_row_doc( + row_detail: &mut AFDatabaseRowDetail, + doc_id_by_row_id: &HashMap, + query_res: &mut HashMap, +) -> Result<(), AppError> { + let doc_id = doc_id_by_row_id.get(&row_detail.id).ok_or_else(|| { + AppError::Internal(anyhow::anyhow!( + "Failed to get document id for row id: {}", + row_detail.id + )) + })?; + let res = query_res.remove(doc_id.as_str()).ok_or_else(|| { + AppError::Internal(anyhow::anyhow!( + "Failed to get document collab for row id: {}", + row_detail.id + )) + })?; + + let ec_bytes = match res { + QueryCollabResult::Success { encode_collab_v1 } => encode_collab_v1, + QueryCollabResult::Failed { error } => return Err(AppError::Internal(anyhow::anyhow!(error))), + }; + let ec = EncodedCollab::decode_from_bytes(&ec_bytes)?; + let doc_collab = Collab::new_with_source(CollabOrigin::Server, doc_id, ec.into(), vec![], false) + .map_err(|err| { + AppError::Internal(anyhow::anyhow!( + "Failed to create document collab: {:?}", + err + )) + })?; + let doc = Document::open(doc_collab) + .map_err(|err| AppError::Internal(anyhow::anyhow!("Failed to open document: {:?}", err)))?; + let plain_text = doc.to_plain_text(true, false).map_err(|err| { + AppError::Internal(anyhow::anyhow!( + "Failed to convert document to plain text: {:?}", + err + )) + })?; + row_detail.doc = Some(plain_text); + Ok(()) } diff --git a/src/biz/collab/utils.rs b/src/biz/collab/utils.rs index c89bceb7d..510432e22 100644 --- a/src/biz/collab/utils.rs +++ b/src/biz/collab/utils.rs @@ -11,14 +11,20 @@ use collab_database::fields::TypeOptionCellReader; use collab_database::fields::TypeOptionCellWriter; use collab_database::fields::TypeOptionData; use collab_database::fields::TypeOptions; +use collab_database::rows::meta_id_from_row_id; use collab_database::rows::Cell; +use collab_database::rows::DatabaseRowBody; use collab_database::rows::RowDetail; -use collab_database::template::entity::CELL_DATA; +use collab_database::rows::RowId; +use collab_database::rows::RowMetaKey; use collab_database::template::timestamp_parse::TimestampCellData; use collab_database::workspace_database::NoPersistenceDatabaseCollabService; +use collab_document::document::Document; +use collab_document::importer::md_importer::MDImporter; use collab_entity::CollabType; use collab_entity::EncodedCollab; use collab_folder::CollabOrigin; +use collab_folder::Folder; use database::collab::CollabStorage; use database::collab::GetCollabOrigin; use database_entity::dto::QueryCollab; @@ -26,14 +32,15 @@ use database_entity::dto::QueryCollabParams; use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use yrs::Map; pub fn get_row_details_serde( row_detail: RowDetail, field_by_id_name_uniq: &HashMap, type_option_reader_by_id: &HashMap>, -) -> HashMap> { +) -> HashMap { let mut cells = row_detail.row.cells; - let mut row_details_serde: HashMap> = + let mut row_details_serde: HashMap = HashMap::with_capacity(cells.len()); for (field_id, field) in field_by_id_name_uniq { let cell: Cell = match cells.remove(field_id) { @@ -58,10 +65,7 @@ pub fn get_row_details_serde( serde_json::Value::Null }, }; - row_details_serde.insert( - field.name.clone(), - HashMap::from([(CELL_DATA.to_string(), cell_value)]), - ); + 
row_details_serde.insert(field.name.clone(), cell_value); } row_details_serde @@ -180,7 +184,33 @@ pub fn type_options_serde( result } -pub async fn get_database_body( +pub async fn get_latest_collab_database_row_body( + collab_storage: &CollabAccessControlStorage, + workspace_uuid_str: &str, + db_row_uuid_str: &str, +) -> Result<(Collab, DatabaseRowBody), AppError> { + let mut db_row_collab = get_latest_collab( + collab_storage, + GetCollabOrigin::Server, + workspace_uuid_str, + db_row_uuid_str, + CollabType::DatabaseRow, + ) + .await?; + + let row_id: RowId = db_row_uuid_str.to_string().into(); + let db_row_body = DatabaseRowBody::open(row_id, &mut db_row_collab).map_err(|err| { + AppError::Internal(anyhow::anyhow!( + "Failed to create database row body from collab, db_row_id: {}, err: {}", + db_row_uuid_str, + err + )) + })?; + + Ok((db_row_collab, db_row_body)) +} + +pub async fn get_latest_collab_database_body( collab_storage: &CollabAccessControlStorage, workspace_uuid_str: &str, database_uuid_str: &str, @@ -247,6 +277,59 @@ pub async fn get_latest_collab( Ok(collab) } +pub async fn get_latest_collab_folder( + collab_storage: &CollabAccessControlStorage, + collab_origin: GetCollabOrigin, + workspace_id: &str, +) -> Result { + let folder_uid = if let GetCollabOrigin::User { uid } = collab_origin { + uid + } else { + // Dummy uid to open the collab folder if the request does not originate from user + 0 + }; + let encoded_collab = get_latest_collab_encoded( + collab_storage, + collab_origin, + workspace_id, + workspace_id, + CollabType::Folder, + ) + .await?; + let folder = Folder::from_collab_doc_state( + folder_uid, + CollabOrigin::Server, + encoded_collab.into(), + workspace_id, + vec![], + ) + .map_err(|e| AppError::Unhandled(e.to_string()))?; + Ok(folder) +} + +pub async fn get_latest_collab_document( + collab_storage: &CollabAccessControlStorage, + collab_origin: GetCollabOrigin, + workspace_id: &str, + doc_oid: &str, +) -> Result { + let doc_collab = get_latest_collab( + collab_storage, + collab_origin, + workspace_id, + doc_oid, + CollabType::Document, + ) + .await?; + Document::open(doc_collab).map_err(|e| { + AppError::Internal(anyhow::anyhow!( + "Failed to create document body from collab, doc_oid: {}, {}", + doc_oid, + e + )) + }) +} + pub async fn collab_to_bin(collab: Collab, collab_type: CollabType) -> Result, AppError> { tokio::task::spawn_blocking(move || { let bin = collab @@ -269,3 +352,197 @@ pub fn collab_from_doc_state(doc_state: Vec, object_id: &str) -> Result, + db_row_body: &DatabaseRowBody, + cell_value_by_id: HashMap, + modified_ts: i64, +) -> Result<(), AppError> { + let all_fields = db_body.fields.get_all_fields(db_row_txn); + let field_by_id = all_fields.iter().fold(HashMap::new(), |mut acc, field| { + acc.insert(field.id.clone(), field.clone()); + acc + }); + let type_option_reader_by_id = type_option_writer_by_id(&all_fields); + let field_by_name = field_by_name_uniq(all_fields); + + // set last_modified + db_row_body.update(db_row_txn, |row_update| { + row_update.set_last_modified(modified_ts); + }); + + // for each field given by user input, overwrite existing data + for (id, serde_val) in cell_value_by_id { + let field = match field_by_id.get(&id) { + Some(f) => f, + // try use field name if id not found + None => match field_by_name.get(&id) { + Some(f) => f, + None => { + tracing::warn!("Failed to get field by id or name for field: {}", id); + continue; + }, + }, + }; + let cell_writer = match type_option_reader_by_id.get(&field.id) { + 
Some(cell_writer) => cell_writer, + None => { + tracing::error!("Failed to get type option writer for field: {}", field.id); + continue; + }, + }; + let new_cell: Cell = cell_writer.convert_json_to_cell(serde_val); + db_row_body.update(db_row_txn, |row_update| { + row_update.update_cells(|cells_update| { + cells_update.insert_cell(&field.id, new_cell); + }); + }); + } + Ok(()) +} + +pub async fn create_row_document( + workspace_id: &str, + uid: i64, + new_doc_id: &str, + collab_storage: &CollabAccessControlStorage, + row_doc_content: String, +) -> Result { + let md_importer = MDImporter::new(None); + let doc_data = md_importer + .import(new_doc_id, row_doc_content) + .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to import markdown: {:?}", e)))?; + let doc = Document::create(new_doc_id, doc_data) + .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to create document: {:?}", e)))?; + let doc_ec = doc.encode_collab().map_err(|e| { + AppError::Internal(anyhow::anyhow!("Failed to encode document collab: {:?}", e)) + })?; + + let mut folder = + get_latest_collab_folder(collab_storage, GetCollabOrigin::Server, workspace_id).await?; + let folder_updates = { + let mut folder_txn = folder.collab.transact_mut(); + folder.body.views.insert( + &mut folder_txn, + collab_folder::View::orphan_view(new_doc_id, collab_folder::ViewLayout::Document, Some(uid)), + None, + ); + folder_txn.encode_update_v1() + }; + + let updated_folder = collab_to_bin(folder.collab, CollabType::Folder).await?; + + let doc_ec_bytes = doc_ec + .encode_to_bytes() + .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to encode db doc: {:?}", e)))?; + + Ok(CreatedRowDocument { + updated_folder, + folder_updates, + doc_ec_bytes, + }) +} + +pub enum DocChanges { + Update(Vec, Vec), // (updated_doc, doc_update) + Insert(CreatedRowDocument), +} + +pub async fn get_database_row_doc_changes( + collab_storage: &CollabAccessControlStorage, + workspace_uuid_str: &str, + row_doc_content: Option, + db_row_body: &DatabaseRowBody, + db_row_txn: &mut yrs::TransactionMut<'_>, + row_id: &str, + uid: i64, +) -> Result, AppError> { + let row_doc_content = match row_doc_content { + Some(row_doc_content) if !row_doc_content.is_empty() => row_doc_content, + _ => return Ok(None), + }; + + let doc_id = db_row_body + .document_id(db_row_txn) + .map_err(|err| AppError::Internal(anyhow::anyhow!("Failed to get document id: {:?}", err)))?; + + match doc_id { + Some(doc_id) => { + let cur_doc = get_latest_collab_document( + collab_storage, + GetCollabOrigin::Server, + workspace_uuid_str, + &doc_id, + ) + .await?; + + let md_importer = MDImporter::new(None); + let new_doc_data = md_importer + .import(&doc_id, row_doc_content) + .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to import markdown: {:?}", e)))?; + let new_doc = Document::create(&doc_id, new_doc_data) + .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to create document: {:?}", e)))?; + + // if the document content is the same, there is no need to update + if cur_doc.to_plain_text(false, false).unwrap_or_default() + == new_doc.to_plain_text(false, false).unwrap_or_default() + { + return Ok(None); + }; + + let (mut cur_doc_collab, mut cur_doc_body) = cur_doc.split(); + + let doc_update = { + let mut txn = cur_doc_collab.context.transact_mut(); + let new_doc_data = new_doc.get_document_data().map_err(|e| { + AppError::Internal(anyhow::anyhow!("Failed to get document data: {:?}", e)) + })?; + cur_doc_body + .reset_with_data(&mut txn, Some(new_doc_data)) + .map_err(|e| 
AppError::Internal(anyhow::anyhow!("Failed to reset document: {:?}", e)))?; + txn.encode_update_v1() + }; + + // Clear undo manager state to save space + if let Ok(undo_mgr) = cur_doc_collab.undo_manager_mut() { + undo_mgr.clear(); + } + + let updated_doc = collab_to_bin(cur_doc_collab, CollabType::Document).await?; + Ok(Some((doc_id, DocChanges::Update(updated_doc, doc_update)))) + }, + None => { + // update row to indicate that the document is not empty + let is_document_empty_id = meta_id_from_row_id(&row_id.parse()?, RowMetaKey::IsDocumentEmpty); + db_row_body + .get_meta() + .insert(db_row_txn, is_document_empty_id, false); + + // get document id + let new_doc_id = db_row_body + .document_id(db_row_txn) + .map_err(|err| AppError::Internal(anyhow::anyhow!("Failed to get document id: {:?}", err)))? + .ok_or_else(|| AppError::Internal(anyhow::anyhow!("Failed to get document id")))?; + + let created_row_doc: CreatedRowDocument = create_row_document( + workspace_uuid_str, + uid, + &new_doc_id, + collab_storage, + row_doc_content, + ) + .await?; + Ok(Some((new_doc_id, DocChanges::Insert(created_row_doc)))) + }, + } +} + +pub struct CreatedRowDocument { + pub updated_folder: Vec, + pub folder_updates: Vec, + pub doc_ec_bytes: Vec, +} diff --git a/src/biz/workspace/ops.rs b/src/biz/workspace/ops.rs index b20b1a02e..d93f51989 100644 --- a/src/biz/workspace/ops.rs +++ b/src/biz/workspace/ops.rs @@ -11,7 +11,7 @@ use serde_json::json; use sqlx::{types::uuid, PgPool}; use std::ops::DerefMut; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use tracing::instrument; use uuid::Uuid; use yrs::updates::encoder::Encode; @@ -758,3 +758,31 @@ pub async fn broadcast_update( Ok(()) } + +/// like [broadcast_update] but in separate tokio task +/// waits for a maximum of 30 seconds for the broadcast to complete +pub async fn broadcast_update_with_timeout( + collab_storage: Arc, + oid: String, + encoded_update: Vec, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + tracing::info!("broadcasting update to group: {}", oid); + let res = match tokio::time::timeout( + Duration::from_secs(30), + broadcast_update(&collab_storage, &oid, encoded_update), + ) + .await + { + Ok(res) => res, + Err(err) => { + tracing::error!("Error while broadcasting the updates: {:?}", err); + return; + }, + }; + match res { + Ok(()) => tracing::info!("broadcasted update to group: {}", oid), + Err(err) => tracing::error!("Error while broadcasting the updates: {:?}", err), + } + }) +} diff --git a/src/biz/workspace/page_view.rs b/src/biz/workspace/page_view.rs index 05f56c7f5..c74074e02 100644 --- a/src/biz/workspace/page_view.rs +++ b/src/biz/workspace/page_view.rs @@ -5,8 +5,10 @@ use crate::biz::collab::folder_view::{ check_if_view_is_space, parse_extra_field_as_json, to_dto_view_icon, to_dto_view_layout, to_folder_view_icon, to_space_permission, }; -use crate::biz::collab::ops::{get_latest_collab_folder, get_latest_workspace_database}; -use crate::biz::collab::utils::{collab_from_doc_state, get_latest_collab_encoded}; +use crate::biz::collab::ops::get_latest_workspace_database; +use crate::biz::collab::utils::{ + collab_from_doc_state, get_latest_collab_encoded, get_latest_collab_folder, +}; use actix_web::web::Data; use anyhow::anyhow; use app_error::AppError; diff --git a/src/biz/workspace/publish.rs b/src/biz/workspace/publish.rs index 954f0b8ec..f852c4358 100644 --- a/src/biz/workspace/publish.rs +++ b/src/biz/workspace/publish.rs @@ -39,7 +39,7 @@ use database::{ use crate::{ 
api::metrics::PublishedCollabMetrics, - biz::collab::{folder_view::to_dto_folder_view_miminal, ops::get_latest_collab_folder}, + biz::collab::{folder_view::to_dto_folder_view_miminal, utils::get_latest_collab_folder}, }; async fn check_workspace_owner_or_publisher( diff --git a/src/biz/workspace/publish_dup.rs b/src/biz/workspace/publish_dup.rs index cfed05aa4..78b0745bd 100644 --- a/src/biz/workspace/publish_dup.rs +++ b/src/biz/workspace/publish_dup.rs @@ -47,6 +47,7 @@ use crate::biz::collab::utils::collab_to_bin; use crate::biz::collab::utils::get_latest_collab_encoded; use super::ops::broadcast_update; +use super::ops::broadcast_update_with_timeout; #[allow(clippy::too_many_arguments)] pub async fn duplicate_published_collab_to_workspace( @@ -357,20 +358,12 @@ impl PublishCollabDuplicator { }?; // broadcast folder changes - match tokio::time::timeout( - Duration::from_secs(30), - broadcast_update(&collab_storage, &dest_workspace_id, encoded_update), - ) - .await - { - Ok(result) => result.map_err(AppError::from), - Err(_) => { - error!("Timeout waiting for broadcasting the updates"); - Err(AppError::RequestTimeout( - "timeout while duplicating".to_string(), - )) - }, - } + tokio::spawn(broadcast_update_with_timeout( + collab_storage, + dest_workspace_id, + encoded_update, + )); + Ok(()) } /// Deep copy a published collab to the destination workspace. diff --git a/tests/collab/database_crud.rs b/tests/collab/database_crud.rs index cfaffd3c0..80966fcff 100644 --- a/tests/collab/database_crud.rs +++ b/tests/collab/database_crud.rs @@ -1,7 +1,145 @@ +use std::collections::HashMap; + use client_api_test::{generate_unique_registered_user_client, workspace_id_from_client}; use collab_database::entity::FieldType; +use serde_json::json; use shared_entity::dto::workspace_dto::AFInsertDatabaseField; +#[tokio::test] +async fn database_row_upsert_with_doc() { + let (c, _user) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c).await; + let databases = c.list_databases(&workspace_id).await.unwrap(); + assert_eq!(databases.len(), 1); + + let todo_db = &databases[0]; + + // Upsert row + let row_id = c + .upsert_database_item( + &workspace_id, + &todo_db.id, + "my_pre_hash_123".to_string(), + HashMap::from([]), + Some("This is a document of a database row".to_string()), + ) + .await + .unwrap(); + + { + // Get row and check data + let row_detail = &c + .list_database_row_details(&workspace_id, &todo_db.id, &[&row_id], true) + .await + .unwrap()[0]; + assert!(row_detail.has_doc); + assert_eq!( + row_detail.doc, + Some(String::from("\nThis is a document of a database row")) + ); + } + // Upsert row with another doc + let _ = c + .upsert_database_item( + &workspace_id, + &todo_db.id, + "my_pre_hash_123".to_string(), + HashMap::from([]), + Some("This is a another document".to_string()), + ) + .await + .unwrap(); + { + // Get row and check that doc has been modified + let row_detail = &c + .list_database_row_details(&workspace_id, &todo_db.id, &[&row_id], true) + .await + .unwrap()[0]; + assert_eq!( + row_detail.doc, + Some(String::from("\nThis is a another document")) + ); + } +} + +#[tokio::test] +async fn database_row_upsert() { + let (c, _user) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c).await; + let databases = c.list_databases(&workspace_id).await.unwrap(); + assert_eq!(databases.len(), 1); + + let todo_db = &databases[0]; + + // predefined string to be used to identify the row + let pre_hash = 
String::from("my_id_123"); + + // Upsert row + let row_id = c + .upsert_database_item( + &workspace_id, + &todo_db.id, + pre_hash.clone(), + HashMap::from([ + (String::from("Description"), json!("description_1")), + (String::from("Status"), json!("To Do")), + (String::from("Multiselect"), json!(["social", "news"])), + ]), + Some("".to_string()), + ) + .await + .unwrap(); + + { + // Get row and check data + let row_detail = &c + .list_database_row_details(&workspace_id, &todo_db.id, &[&row_id], false) + .await + .unwrap()[0]; + assert_eq!(row_detail.cells["Description"], "description_1"); + assert_eq!(row_detail.cells["Status"], "To Do"); + assert_eq!(row_detail.cells["Multiselect"][0], "social"); + assert_eq!(row_detail.cells["Multiselect"][1], "news"); + assert!(!row_detail.has_doc); + } + + { + // Upsert row again with different data, using same pre_hash + // row_id return should be the same as previous + let row_id_2 = c + .upsert_database_item( + &workspace_id, + &todo_db.id, + pre_hash, + HashMap::from([ + (String::from("Description"), json!("description_2")), + (String::from("Status"), json!("Doing")), + (String::from("Multiselect"), json!(["fast", "self-host"])), + ]), + Some("This is a document of a database row".to_string()), + ) + .await + .unwrap(); + assert_eq!(row_id, row_id_2); + } + { + // Get row and check data, it should be modified + let row_detail = &c + .list_database_row_details(&workspace_id, &todo_db.id, &[&row_id], true) + .await + .unwrap()[0]; + assert_eq!(row_detail.cells["Description"], "description_2"); + assert_eq!(row_detail.cells["Status"], "Doing"); + assert_eq!(row_detail.cells["Multiselect"][0], "fast"); + assert_eq!(row_detail.cells["Multiselect"][1], "self-host"); + assert!(row_detail.has_doc); + assert_eq!( + row_detail.doc, + Some("\nThis is a document of a database row".to_string()) + ); + } +} + #[tokio::test] async fn database_fields_crud() { let (c, _user) = generate_unique_registered_user_client().await; @@ -69,39 +207,47 @@ async fn database_fields_crud() { .add_database_item( &workspace_id, &todo_db.id, - &serde_json::json!({ - "Description": my_description, - "Status": my_status, - "Multiselect": ["social", "news"], - my_num_field_id: 123, - my_datetime_field_id: 1733210221, - my_url_field_id: "https://appflowy.io", - my_checkbox_field_id: true, - }), + HashMap::from([ + (String::from("Description"), json!(my_description)), + (String::from("Status"), json!(my_status)), + (String::from("Multiselect"), json!(["social", "news"])), + (my_num_field_id, json!(123)), + (my_datetime_field_id, json!(1733210221)), + (my_url_field_id, json!("https://appflowy.io")), + (my_checkbox_field_id, json!(true)), + ]), + None, ) .await .unwrap(); let row_details = c - .list_database_row_details(&workspace_id, &todo_db.id, &[&new_row_id]) + .list_database_row_details(&workspace_id, &todo_db.id, &[&new_row_id], false) .await .unwrap(); assert_eq!(row_details.len(), 1); let new_row_detail = &row_details[0]; - assert_eq!(new_row_detail.cells["Description"]["data"], my_description); - assert_eq!(new_row_detail.cells["Status"]["data"], my_status); - assert_eq!(new_row_detail.cells["Multiselect"]["data"][0], "social"); - assert_eq!(new_row_detail.cells["Multiselect"]["data"][1], "news"); - assert_eq!(new_row_detail.cells["MyNumberColumn"]["data"], "123"); + assert_eq!(new_row_detail.cells["Description"], my_description); + assert_eq!(new_row_detail.cells["Status"], my_status); + assert_eq!(new_row_detail.cells["Multiselect"][0], "social"); + 
assert_eq!(new_row_detail.cells["Multiselect"][1], "news"); + assert_eq!(new_row_detail.cells["MyNumberColumn"], "123"); assert_eq!( - new_row_detail.cells["MyDateTimeColumn"]["data"]["timestamp"], - 1733210221 + new_row_detail.cells["MyDateTimeColumn"], + json!({ + "end": serde_json::Value::Null, + "pretty_end_date": serde_json::Value::Null, + "pretty_end_datetime": serde_json::Value::Null, + "pretty_end_time": serde_json::Value::Null, + "pretty_start_date": "2024-12-03", + "pretty_start_datetime": "2024-12-03 07:17:01 UTC", + "pretty_start_time": "07:17:01", + "start": "2024-12-03T07:17:01+00:00", + "timezone": "UTC", + }), ); - assert_eq!( - new_row_detail.cells["MyUrlField"]["data"], - "https://appflowy.io" - ); - assert_eq!(new_row_detail.cells["MyCheckboxColumn"]["data"], true); + assert_eq!(new_row_detail.cells["MyUrlField"], "https://appflowy.io"); + assert_eq!(new_row_detail.cells["MyCheckboxColumn"], true); } } @@ -133,18 +279,19 @@ async fn database_fields_unsupported_field_type() { .add_database_item( &workspace_id, &todo_db.id, - &serde_json::json!({ - "Description": my_description, - "Status": my_status, - "Multiselect": ["social", "news"], - my_rel_field_id: "relation_data" - }), + HashMap::from([ + (String::from("Description"), json!(my_description)), + (String::from("Status"), json!(my_status)), + (String::from("Multiselect"), json!(["social", "news"])), + (my_rel_field_id, json!("relation_data")), + ]), + None, ) .await .unwrap(); let row_details = c - .list_database_row_details(&workspace_id, &todo_db.id, &[&new_row_id]) + .list_database_row_details(&workspace_id, &todo_db.id, &[&new_row_id], false) .await .unwrap(); assert_eq!(row_details.len(), 1); @@ -152,3 +299,34 @@ async fn database_fields_unsupported_field_type() { assert!(!new_row_detail.cells.contains_key("MyRelationCol")); } } + +#[tokio::test] +async fn database_insert_row_with_doc() { + let (c, _user) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c).await; + let databases = c.list_databases(&workspace_id).await.unwrap(); + assert_eq!(databases.len(), 1); + let todo_db = &databases[0]; + + let row_doc = "This is a document of a database row"; + let new_row_id = c + .add_database_item( + &workspace_id, + &todo_db.id, + HashMap::from([]), + row_doc.to_string().into(), + ) + .await + .unwrap(); + + let row_details = c + .list_database_row_details(&workspace_id, &todo_db.id, &[&new_row_id], true) + .await + .unwrap(); + let row_detail = &row_details[0]; + assert!(row_detail.has_doc); + assert_eq!( + row_detail.doc, + Some("\nThis is a document of a database row".to_string()) + ); +} From 5f388c7432c4bb8da10e42d848d35eab8193f1f3 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 18 Dec 2024 17:03:38 +0800 Subject: [PATCH 2/7] chore: add metrics for write embeeding to pg (#1084) --- .../src/indexer/indexer_scheduler.rs | 36 +++++++++++++++---- .../src/indexer/metrics.rs | 18 ++++++++-- services/appflowy-worker/src/config.rs | 2 +- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs index adb7791a2..8e02e32b2 100644 --- a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs +++ b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs @@ -26,8 +26,9 @@ use rayon::prelude::*; use sqlx::PgPool; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; +use 
std::time::{Duration, Instant}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::time::timeout; use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; @@ -101,8 +102,13 @@ impl IndexerScheduler { this.index_enabled(), num_thread ); + if this.index_enabled() { - tokio::spawn(spawn_write_indexing(rx, this.pg_pool.clone())); + tokio::spawn(spawn_write_indexing( + rx, + this.pg_pool.clone(), + this.metrics.clone(), + )); tokio::spawn(handle_unindexed_collabs(this.clone())); } @@ -301,7 +307,7 @@ impl IndexerScheduler { indexer.embed(&embedder, chunks) }); let duration = start.elapsed(); - metrics.record_processing_time(duration.as_millis()); + metrics.record_generate_embedding_time(duration.as_millis()); match result { Ok(embed_result) => match embed_result { @@ -494,7 +500,11 @@ async fn index_unindexd_collab( } const EMBEDDING_RECORD_BUFFER_SIZE: usize = 5; -async fn spawn_write_indexing(mut rx: UnboundedReceiver, pg_pool: PgPool) { +async fn spawn_write_indexing( + mut rx: UnboundedReceiver, + pg_pool: PgPool, + metrics: Arc, +) { let mut buf = Vec::with_capacity(EMBEDDING_RECORD_BUFFER_SIZE); loop { let n = rx.recv_many(&mut buf, EMBEDDING_RECORD_BUFFER_SIZE).await; @@ -503,6 +513,7 @@ async fn spawn_write_indexing(mut rx: UnboundedReceiver, pg_poo break; } + let start = Instant::now(); let records = buf.drain(..n).collect::>(); for record in records.iter() { info!( @@ -510,7 +521,20 @@ async fn spawn_write_indexing(mut rx: UnboundedReceiver, pg_poo record.object_id, record.tokens_used ); } - match batch_insert_records(&pg_pool, records).await { + + let result = timeout( + Duration::from_secs(20), + batch_insert_records(&pg_pool, records), + ) + .await + .unwrap_or_else(|_| { + Err(AppError::Internal(anyhow!( + "timeout when writing embeddings" + ))) + }); + + metrics.record_write_embedding_time(start.elapsed().as_millis()); + match result { Ok(_) => trace!("[Embedding] save {} embeddings to disk", n), Err(err) => error!("Failed to write collab embedding to disk:{}", err), } @@ -567,7 +591,7 @@ fn process_collab( let chunks = indexer.create_embedded_chunks(&collab, embdder.model())?; let result = indexer.embed(embdder, chunks); let duration = start_time.elapsed(); - metrics.record_processing_time(duration.as_millis()); + metrics.record_generate_embedding_time(duration.as_millis()); match result { Ok(Some(embeddings)) => Ok(Some((embeddings.tokens_consumed, embeddings.params))), diff --git a/services/appflowy-collaborate/src/indexer/metrics.rs b/services/appflowy-collaborate/src/indexer/metrics.rs index e410b5086..6825bdb8d 100644 --- a/services/appflowy-collaborate/src/indexer/metrics.rs +++ b/services/appflowy-collaborate/src/indexer/metrics.rs @@ -5,6 +5,7 @@ pub struct EmbeddingMetrics { total_embed_count: Counter, failed_embed_count: Counter, processing_time_histogram: Histogram, + write_embedding_time_histogram: Histogram, } impl EmbeddingMetrics { @@ -12,7 +13,8 @@ impl EmbeddingMetrics { Self { total_embed_count: Counter::default(), failed_embed_count: Counter::default(), - processing_time_histogram: Histogram::new([100.0, 300.0, 800.0, 2000.0, 5000.0].into_iter()), + processing_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()), + write_embedding_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()), } } @@ -36,6 +38,11 @@ impl EmbeddingMetrics { "Histogram of embedding processing times", metrics.processing_time_histogram.clone(), ); + realtime_registry.register( + 
"write_embedding_time_seconds", + "Histogram of embedding write times", + metrics.write_embedding_time_histogram.clone(), + ); metrics } @@ -48,8 +55,13 @@ impl EmbeddingMetrics { self.failed_embed_count.inc_by(count); } - pub fn record_processing_time(&self, millis: u128) { - tracing::trace!("[Embedding]: processing time: {}ms", millis); + pub fn record_generate_embedding_time(&self, millis: u128) { + tracing::trace!("[Embedding]: generate embeddings cost: {}ms", millis); self.processing_time_histogram.observe(millis as f64); } + + pub fn record_write_embedding_time(&self, millis: u128) { + tracing::trace!("[Embedding]: write embedding time cost: {}ms", millis); + self.write_embedding_time_histogram.observe(millis as f64); + } } diff --git a/services/appflowy-worker/src/config.rs b/services/appflowy-worker/src/config.rs index f8354c6e1..d5bb99af0 100644 --- a/services/appflowy-worker/src/config.rs +++ b/services/appflowy-worker/src/config.rs @@ -56,7 +56,7 @@ impl Config { // Adapted from: https://github.com/AppFlowy-IO/AppFlowy-Cloud/issues/984 smtp_username: get_env_var("APPFLOWY_MAILER_SMTP_USERNAME", "sender@example.com"), smtp_password: get_env_var("APPFLOWY_MAILER_SMTP_PASSWORD", "password").into(), - smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper").into(), + smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper"), }, }) } From 6fa6fa00cb82513b3d6bb8bffa3e3686c90c0068 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Wed, 18 Dec 2024 22:47:25 +0800 Subject: [PATCH 3/7] chore: refactor page view APIs to write to in-memory collab group (#1083) --- .../appflowy-collaborate/src/group/cmd.rs | 6 +- src/api/workspace.rs | 62 ++- src/biz/workspace/page_view.rs | 463 ++++++++---------- tests/workspace/page_view.rs | 8 +- 4 files changed, 263 insertions(+), 276 deletions(-) diff --git a/services/appflowy-collaborate/src/group/cmd.rs b/services/appflowy-collaborate/src/group/cmd.rs index 70ea14c58..7f2bce2d9 100644 --- a/services/appflowy-collaborate/src/group/cmd.rs +++ b/services/appflowy-collaborate/src/group/cmd.rs @@ -266,8 +266,8 @@ where }); // Create message router for user if it's not exist - let should_sub = self.msg_router_by_user.get(user).is_none(); - if should_sub { + let is_router_exists = self.msg_router_by_user.get(user).is_some(); + if !is_router_exists { trace!("create a new client message router for user:{}", user); let new_client_router = ClientMessageRouter::new(NullSender::<()>::default()); self @@ -285,7 +285,7 @@ where } // Only subscribe when the user is not subscribed to the group - if should_sub { + if !self.group_manager.contains_user(object_id, user) { self.subscribe_group(user, object_id, &origin).await?; } if let Some(client_stream) = self.msg_router_by_user.get(user) { diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 1b7565e8e..aa3f5e00d 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -962,13 +962,18 @@ async fn post_space_handler( path: web::Path, payload: Json, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let workspace_uuid = path.into_inner(); + let user = realtime_user_for_web_request(req.headers(), uid)?; let space = create_space( + &state.metrics.appflowy_web_metrics, + server, + user, &state.pg_pool, &state.collab_access_control_storage, - uid, workspace_uuid, &payload.space_permission, &payload.name, @@ -984,13 +989,17 @@ async fn update_space_handler( path: 
web::Path<(Uuid, String)>, payload: Json, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let (workspace_uuid, view_id) = path.into_inner(); + let user = realtime_user_for_web_request(req.headers(), uid)?; update_space( - &state.pg_pool, + &state.metrics.appflowy_web_metrics, + server, + user, &state.collab_access_control_storage, - uid, workspace_uuid, &view_id, &payload.space_permission, @@ -1007,13 +1016,18 @@ async fn post_page_view_handler( path: web::Path, payload: Json, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let workspace_uuid = path.into_inner(); + let user = realtime_user_for_web_request(req.headers(), uid)?; let page = create_page( + &state.metrics.appflowy_web_metrics, + server, + user, &state.pg_pool, &state.collab_access_control_storage, - uid, workspace_uuid, &payload.parent_view_id, &payload.layout, @@ -1028,13 +1042,17 @@ async fn move_page_handler( path: web::Path<(Uuid, String)>, payload: Json, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let (workspace_uuid, view_id) = path.into_inner(); + let user = realtime_user_for_web_request(req.headers(), uid)?; move_page( - &state.pg_pool, + &state.metrics.appflowy_web_metrics, + server, + user, &state.collab_access_control_storage, - uid, workspace_uuid, &view_id, &payload.new_parent_view_id, @@ -1048,13 +1066,17 @@ async fn move_page_to_trash_handler( user_uuid: UserUuid, path: web::Path<(Uuid, String)>, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let (workspace_uuid, view_id) = path.into_inner(); + let user = realtime_user_for_web_request(req.headers(), uid)?; move_page_to_trash( - &state.pg_pool, + &state.metrics.appflowy_web_metrics, + server, + user, &state.collab_access_control_storage, - uid, workspace_uuid, &view_id, ) @@ -1066,13 +1088,17 @@ async fn restore_page_from_trash_handler( user_uuid: UserUuid, path: web::Path<(Uuid, String)>, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let (workspace_uuid, view_id) = path.into_inner(); + let user = realtime_user_for_web_request(req.headers(), uid)?; restore_page_from_trash( - &state.pg_pool, + &state.metrics.appflowy_web_metrics, + server, + user, &state.collab_access_control_storage, - uid, workspace_uuid, &view_id, ) @@ -1084,13 +1110,17 @@ async fn restore_all_pages_from_trash_handler( user_uuid: UserUuid, path: web::Path, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let workspace_uuid = path.into_inner(); + let user = realtime_user_for_web_request(req.headers(), uid)?; restore_all_pages_from_trash( - &state.pg_pool, + &state.metrics.appflowy_web_metrics, + server, + user, &state.collab_access_control_storage, - uid, workspace_uuid, ) .await?; @@ -1161,6 +1191,8 @@ async fn update_page_view_handler( path: web::Path<(Uuid, String)>, payload: Json, state: Data, + server: Data, + req: HttpRequest, ) -> Result>> { let uid = state.user_cache.get_user_uid(&user_uuid).await?; let (workspace_uuid, view_id) = path.into_inner(); @@ -1169,10 +1201,12 @@ async fn update_page_view_handler( .extra .as_ref() .map(|json_value| json_value.to_string()); + let user = realtime_user_for_web_request(req.headers(), uid)?; 
update_page( - &state.pg_pool, + &state.metrics.appflowy_web_metrics, + server, + user, &state.collab_access_control_storage, - uid, workspace_uuid, &view_id, &payload.name, diff --git a/src/biz/workspace/page_view.rs b/src/biz/workspace/page_view.rs index c74074e02..b88c2f507 100644 --- a/src/biz/workspace/page_view.rs +++ b/src/biz/workspace/page_view.rs @@ -1,4 +1,3 @@ -use super::ops::broadcast_update; use crate::api::metrics::AppFlowyWebMetrics; use crate::api::ws::RealtimeServerAddr; use crate::biz::collab::folder_view::{ @@ -49,7 +48,7 @@ use serde_json::json; use shared_entity::dto::workspace_dto::{ FolderView, Page, PageCollab, PageCollabData, Space, SpacePermission, ViewIcon, ViewLayout, }; -use sqlx::{PgPool, Transaction}; +use sqlx::PgPool; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -57,21 +56,12 @@ use tokio::time::timeout_at; use tracing::instrument; use uuid::Uuid; -struct WorkspaceDatabaseUpdate { - pub updated_encoded_collab: Vec, - pub encoded_updates: Vec, -} - -struct FolderUpdate { - pub updated_encoded_collab: Vec, - pub encoded_update: Vec, -} - #[allow(clippy::too_many_arguments)] pub async fn update_space( - pg_pool: &PgPool, + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, view_id: &str, space_permission: &SpacePermission, @@ -79,7 +69,7 @@ pub async fn update_space( space_icon: &str, space_icon_color: &str, ) -> Result<(), AppError> { - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let folder_update = update_space_properties( @@ -91,24 +81,24 @@ pub async fn update_space( space_icon_color, ) .await?; - let mut transaction = pg_pool.begin().await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; - transaction.commit().await?; Ok(()) } #[allow(clippy::too_many_arguments)] pub async fn create_space( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, pg_pool: &PgPool, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, space_permission: &SpacePermission, name: &str, @@ -117,11 +107,11 @@ pub async fn create_space( ) -> Result { let default_document_collab_params = prepare_default_document_collab_param()?; let view_id = default_document_collab_params.object_id.clone(); - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let folder_update = add_new_space_to_folder( - uid, + user.uid, &workspace_id.to_string(), &view_id, &mut folder, @@ -137,18 +127,18 @@ pub async fn create_space( collab_storage .upsert_new_collab_with_transaction( &workspace_id.to_string(), - &uid, + &user.uid, default_document_collab_params, &mut transaction, &action, ) .await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; transaction.commit().await?; @@ -156,10 +146,13 @@ pub async fn create_space( Ok(Space { view_id }) } 
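Note: update_space and create_space above no longer persist the folder collab and broadcast it inside their own Postgres transaction; the encoded update is handed to update_workspace_folder_data, which routes it through the realtime server's in-memory collab group and waits for an acknowledgement. A minimal sketch of that send-and-await-ack pattern, assuming a hypothetical Update message in place of the crate's ClientHttpUpdateMessage and reusing the same 2000ms wait the patch uses:

use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;

// Hypothetical stand-in for ClientHttpUpdateMessage: an encoded update plus a
// oneshot sender the server uses to report whether the update was applied.
struct Update {
  payload: Vec<u8>,
  return_tx: oneshot::Sender<Result<(), String>>,
}

async fn apply_update_with_ack(
  server_tx: &mpsc::Sender<Update>,
  payload: Vec<u8>,
) -> Result<(), String> {
  let (tx, rx) = oneshot::channel();
  // Hand the update to the in-memory group instead of writing it ourselves.
  server_tx
    .send(Update { payload, return_tx: tx })
    .await
    .map_err(|e| format!("failed to send update: {e}"))?;
  // Bound the wait for the acknowledgement, mirroring the 2000ms timeout_at
  // used by update_workspace_folder_data below.
  timeout(Duration::from_millis(2000), rx)
    .await
    .map_err(|_| "timed out waiting for apply-update ack".to_string())?
    .map_err(|e| format!("ack channel dropped: {e}"))?
}
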
+#[allow(clippy::too_many_arguments)] pub async fn create_page( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, pg_pool: &PgPool, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, parent_view_id: &str, view_layout: &ViewLayout, @@ -168,9 +161,11 @@ pub async fn create_page( match view_layout { ViewLayout::Document => { create_document_page( + appflowy_web_metrics, + server, + user, pg_pool, collab_storage, - uid, workspace_id, parent_view_id, name, @@ -179,9 +174,11 @@ pub async fn create_page( }, ViewLayout::Grid => { create_grid_page( + appflowy_web_metrics, + server, + user, pg_pool, collab_storage, - uid, workspace_id, parent_view_id, name, @@ -190,9 +187,11 @@ pub async fn create_page( }, ViewLayout::Calendar => { create_calendar_page( + appflowy_web_metrics, + server, + user, pg_pool, collab_storage, - uid, workspace_id, parent_view_id, name, @@ -201,9 +200,11 @@ pub async fn create_page( }, ViewLayout::Board => { create_board_page( + appflowy_web_metrics, + server, + user, pg_pool, collab_storage, - uid, workspace_id, parent_view_id, name, @@ -412,7 +413,7 @@ async fn add_new_space_to_folder( name: &str, space_icon: &str, space_icon_color: &str, -) -> Result { +) -> Result, AppError> { let encoded_update = { let view = NestedChildViewBuilder::new(uid, workspace_id.to_string()) .with_view_id(view_id) @@ -437,10 +438,7 @@ async fn add_new_space_to_folder( } txn.encode_update_v1() }; - Ok(FolderUpdate { - updated_encoded_collab: folder_to_encoded_collab(folder)?, - encoded_update, - }) + Ok(encoded_update) } async fn update_space_properties( @@ -450,7 +448,7 @@ async fn update_space_properties( name: &str, space_icon: &str, space_icon_color: &str, -) -> Result { +) -> Result, AppError> { let encoded_update = { let mut txn = folder.collab.transact_mut(); folder.body.views.update_view(&mut txn, view_id, |update| { @@ -471,27 +469,22 @@ async fn update_space_properties( }); txn.encode_update_v1() }; - Ok(FolderUpdate { - updated_encoded_collab: folder_to_encoded_collab(folder)?, - encoded_update, - }) + Ok(encoded_update) } async fn add_new_database_to_workspace( workspace_database: &mut WorkspaceDatabase, database_id: &str, view_id: &str, -) -> Result { - let view_ids_by_database_id = - HashMap::from([(database_id.to_string(), vec![view_id.to_string()])]); - let encoded_updates = workspace_database - .batch_add_database(view_ids_by_database_id) - .encode_update_v1(); - let updated_encoded_collab = workspace_database_to_encoded_collab(workspace_database)?; - Ok(WorkspaceDatabaseUpdate { - updated_encoded_collab, - encoded_updates, - }) +) -> Result, AppError> { + let encoded_updates = { + let mut txn = workspace_database.collab.transact_mut(); + workspace_database + .body + .add_database(&mut txn, database_id, vec![view_id.to_string()]); + txn.encode_update_v1() + }; + Ok(encoded_updates) } async fn add_new_view_to_folder( @@ -501,7 +494,7 @@ async fn add_new_view_to_folder( folder: &mut Folder, name: Option<&str>, layout: collab_folder::ViewLayout, -) -> Result { +) -> Result, AppError> { let encoded_update = { let view = NestedChildViewBuilder::new(uid, parent_view_id.to_string()) .with_view_id(view_id) @@ -514,10 +507,7 @@ async fn add_new_view_to_folder( txn.encode_update_v1() }; - Ok(FolderUpdate { - updated_encoded_collab: folder_to_encoded_collab(folder)?, - encoded_update, - }) + Ok(encoded_update) } async fn update_view_properties( @@ -526,7 +516,7 @@ async fn update_view_properties( name: &str, icon: 
Option<&ViewIcon>, extra: Option>, -) -> Result { +) -> Result, AppError> { let encoded_update = { let mut txn = folder.collab.transact_mut(); let icon = icon.map(|icon| to_folder_view_icon(icon.clone())); @@ -539,10 +529,7 @@ async fn update_view_properties( }); txn.encode_update_v1() }; - Ok(FolderUpdate { - updated_encoded_collab: folder_to_encoded_collab(folder)?, - encoded_update, - }) + Ok(encoded_update) } async fn move_view( @@ -550,7 +537,7 @@ async fn move_view( new_parent_view_id: &str, prev_view_id: Option, folder: &mut Folder, -) -> Result { +) -> Result, AppError> { let encoded_update = { let mut txn = folder.collab.transact_mut(); folder @@ -558,13 +545,10 @@ async fn move_view( .move_nested_view(&mut txn, view_id, new_parent_view_id, prev_view_id); txn.encode_update_v1() }; - Ok(FolderUpdate { - updated_encoded_collab: folder_to_encoded_collab(folder)?, - encoded_update, - }) + Ok(encoded_update) } -async fn move_view_to_trash(view_id: &str, folder: &mut Folder) -> Result { +async fn move_view_to_trash(view_id: &str, folder: &mut Folder) -> Result, AppError> { let mut current_view_and_descendants = folder .get_views_belong_to(view_id) .iter() @@ -585,17 +569,10 @@ async fn move_view_to_trash(view_id: &str, folder: &mut Folder) -> Result Result { +async fn move_view_out_from_trash(view_id: &str, folder: &mut Folder) -> Result, AppError> { let encoded_update = { let mut txn = folder.collab.transact_mut(); folder @@ -604,14 +581,10 @@ async fn move_view_out_from_trash( .update_view(&mut txn, view_id, |update| update.set_trash(false).done()); txn.encode_update_v1() }; - - Ok(FolderUpdate { - updated_encoded_collab: folder_to_encoded_collab(folder)?, - encoded_update, - }) + Ok(encoded_update) } -async fn move_all_views_out_from_trash(folder: &mut Folder) -> Result { +async fn move_all_views_out_from_trash(folder: &mut Folder) -> Result, AppError> { let encoded_update = { let mut txn = folder.collab.transact_mut(); if let Some(op) = folder @@ -624,10 +597,7 @@ async fn move_all_views_out_from_trash(folder: &mut Folder) -> Result Result, AppError> { @@ -667,113 +637,24 @@ async fn delete_all_views_from_trash(folder: &mut Folder) -> Result, App Ok(encoded_update) } -fn folder_to_encoded_collab(folder: &Folder) -> Result, AppError> { - let collab_type = CollabType::Folder; - let encoded_folder_collab = folder - .encode_collab_v1(|collab| collab_type.validate_require_data(collab)) - .map_err(|err| AppError::Internal(anyhow!("Failed to encode workspace folder: {}", err)))?; - encoded_folder_collab.encode_to_bytes().map_err(|err| { - AppError::Internal(anyhow!( - "Failed to encode workspace folder to bytes: {}", - err - )) - }) -} - -fn workspace_database_to_encoded_collab( - workspace_db: &WorkspaceDatabase, -) -> Result, AppError> { - let encoded_workspace_db_collab = workspace_db - .encode_collab_v1() - .map_err(|err| AppError::Internal(anyhow!("Failed to encode workspace folder: {}", err)))?; - encoded_workspace_db_collab - .encode_to_bytes() - .map_err(|err| { - AppError::Internal(anyhow!( - "Failed to encode workspace folder to bytes: {}", - err - )) - }) -} - -async fn insert_and_broadcast_workspace_database_update( - uid: i64, - workspace_id: Uuid, - workspace_database_id: &str, - workspace_database_update: WorkspaceDatabaseUpdate, - collab_storage: &CollabAccessControlStorage, - transaction: &mut Transaction<'_, sqlx::Postgres>, -) -> Result<(), AppError> { - let params = CollabParams { - object_id: workspace_database_id.to_string(), - encoded_collab_v1: 
workspace_database_update.updated_encoded_collab.into(), - collab_type: CollabType::WorkspaceDatabase, - }; - let action_description = format!("Update workspace database: {}", workspace_id); - collab_storage - .upsert_new_collab_with_transaction( - &workspace_id.to_string(), - &uid, - params, - transaction, - &action_description, - ) - .await?; - broadcast_update( - collab_storage, - workspace_database_id, - workspace_database_update.encoded_updates.clone(), - ) - .await?; - Ok(()) -} - -async fn insert_and_broadcast_workspace_folder_update( - uid: i64, - workspace_id: Uuid, - folder_update: FolderUpdate, - collab_storage: &CollabAccessControlStorage, - transaction: &mut Transaction<'_, sqlx::Postgres>, -) -> Result<(), AppError> { - let params = CollabParams { - object_id: workspace_id.to_string(), - encoded_collab_v1: folder_update.updated_encoded_collab.into(), - collab_type: CollabType::Folder, - }; - let action_description = format!("Update workspace folder: {}", workspace_id); - collab_storage - .upsert_new_collab_with_transaction( - &workspace_id.to_string(), - &uid, - params, - transaction, - &action_description, - ) - .await?; - broadcast_update( - collab_storage, - &workspace_id.to_string(), - folder_update.encoded_update.clone(), - ) - .await?; - Ok(()) -} - +#[allow(clippy::too_many_arguments)] async fn create_document_page( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, pg_pool: &PgPool, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, parent_view_id: &str, name: Option<&str>, ) -> Result { let default_document_collab_params = prepare_default_document_collab_param()?; let view_id = default_document_collab_params.object_id.clone(); - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let folder_update = add_new_view_to_folder( - uid, + user.uid, parent_view_id, &view_id, &mut folder, @@ -787,18 +668,18 @@ async fn create_document_page( collab_storage .upsert_new_collab_with_transaction( &workspace_id.to_string(), - &uid, + &user.uid, default_document_collab_params, &mut transaction, &action, ) .await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; transaction.commit().await?; @@ -806,10 +687,13 @@ async fn create_document_page( Ok(Page { view_id }) } +#[allow(clippy::too_many_arguments)] async fn create_grid_page( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, pg_pool: &PgPool, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, parent_view_id: &str, name: Option<&str>, @@ -819,9 +703,11 @@ async fn create_grid_page( let default_grid_encoded_database = prepare_default_grid_encoded_database(&view_id, &database_id, name.unwrap_or_default()).await?; create_database_page( + appflowy_web_metrics, + server, + user, pg_pool, collab_storage, - uid, workspace_id, parent_view_id, &view_id, @@ -832,10 +718,13 @@ async fn create_grid_page( .await } +#[allow(clippy::too_many_arguments)] async fn create_board_page( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, pg_pool: &PgPool, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, parent_view_id: &str, name: Option<&str>, @@ -846,9 
+735,11 @@ async fn create_board_page( prepare_default_board_encoded_database(&view_id, &database_id, name.unwrap_or_default()) .await?; create_database_page( + appflowy_web_metrics, + server, + user, pg_pool, collab_storage, - uid, workspace_id, parent_view_id, &view_id, @@ -859,10 +750,13 @@ async fn create_board_page( .await } +#[allow(clippy::too_many_arguments)] async fn create_calendar_page( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, pg_pool: &PgPool, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, parent_view_id: &str, name: Option<&str>, @@ -873,9 +767,11 @@ async fn create_calendar_page( prepare_default_calendar_encoded_database(&view_id, &database_id, name.unwrap_or_default()) .await?; create_database_page( + appflowy_web_metrics, + server, + user, pg_pool, collab_storage, - uid, workspace_id, parent_view_id, &view_id, @@ -888,9 +784,11 @@ async fn create_calendar_page( #[allow(clippy::too_many_arguments)] async fn create_database_page( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, pg_pool: &PgPool, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, parent_view_id: &str, view_id: &str, @@ -898,15 +796,22 @@ async fn create_database_page( name: Option<&str>, encoded_database: &EncodedDatabase, ) -> Result { - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder( collab_storage, collab_origin.clone(), &workspace_id.to_string(), ) .await?; - let folder_update = - add_new_view_to_folder(uid, parent_view_id, view_id, &mut folder, name, view_layout).await?; + let folder_update = add_new_view_to_folder( + user.uid, + parent_view_id, + view_id, + &mut folder, + name, + view_layout, + ) + .await?; let (workspace_database_id, mut workspace_database) = get_latest_workspace_database(collab_storage, pg_pool, collab_origin, workspace_id).await?; let database_id = encoded_database.encoded_database_collab.object_id.clone(); @@ -937,30 +842,30 @@ async fn create_database_page( collab_storage .upsert_new_collab_with_transaction( &workspace_id.to_string(), - &uid, + &user.uid, database_collab_params, &mut transaction, &action, ) .await?; collab_storage - .batch_insert_new_collab(&workspace_id.to_string(), &uid, row_collab_params_list) + .batch_insert_new_collab(&workspace_id.to_string(), &user.uid, row_collab_params_list) .await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server.clone(), + user.clone(), workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; - insert_and_broadcast_workspace_database_update( - uid, + update_workspace_database_data( + appflowy_web_metrics, + server, + user, workspace_id, &workspace_database_id, workspace_database_update, - collab_storage, - &mut transaction, ) .await?; transaction.commit().await?; @@ -970,40 +875,41 @@ async fn create_database_page( }) } +#[allow(clippy::too_many_arguments)] pub async fn move_page( - pg_pool: &PgPool, + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, view_id: &str, new_parent_view_id: &str, prev_view_id: Option, ) -> Result<(), AppError> { - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = 
get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let folder_update = move_view(view_id, new_parent_view_id, prev_view_id, &mut folder).await?; - let mut transaction = pg_pool.begin().await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; - transaction.commit().await?; Ok(()) } pub async fn move_page_to_trash( - pg_pool: &PgPool, + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, view_id: &str, ) -> Result<(), AppError> { - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let trash_info = folder.get_my_trash_info(); @@ -1011,63 +917,59 @@ pub async fn move_page_to_trash( return Ok(()); } let folder_update = move_view_to_trash(view_id, &mut folder).await?; - let mut transaction = pg_pool.begin().await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; - transaction.commit().await?; Ok(()) } pub async fn restore_page_from_trash( - pg_pool: &PgPool, + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, view_id: &str, ) -> Result<(), AppError> { - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let folder_update = move_view_out_from_trash(view_id, &mut folder).await?; - let mut transaction = pg_pool.begin().await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; - transaction.commit().await?; Ok(()) } pub async fn restore_all_pages_from_trash( - pg_pool: &PgPool, + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, ) -> Result<(), AppError> { - let collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let folder_update = move_all_views_out_from_trash(&mut folder).await?; - let mut transaction = pg_pool.begin().await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; - transaction.commit().await?; Ok(()) } @@ -1106,29 +1008,28 @@ pub async fn delete_all_pages_from_trash( #[allow(clippy::too_many_arguments)] pub async fn update_page( - pg_pool: &PgPool, + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, collab_storage: &CollabAccessControlStorage, - uid: i64, workspace_id: Uuid, view_id: &str, name: &str, icon: Option<&ViewIcon>, extra: Option>, ) -> Result<(), AppError> { - let 
collab_origin = GetCollabOrigin::User { uid }; + let collab_origin = GetCollabOrigin::User { uid: user.uid }; let mut folder = get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; let folder_update = update_view_properties(view_id, &mut folder, name, icon, extra).await?; - let mut transaction = pg_pool.begin().await?; - insert_and_broadcast_workspace_folder_update( - uid, + update_workspace_folder_data( + appflowy_web_metrics, + server, + user, workspace_id, folder_update, - collab_storage, - &mut transaction, ) .await?; - transaction.commit().await?; Ok(()) } @@ -1424,3 +1325,55 @@ pub async fn update_workspace_folder_data( }, } } + +#[instrument(level = "debug", skip_all)] +pub async fn update_workspace_database_data( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, + workspace_id: Uuid, + workspace_database_id: &str, + update: Vec, +) -> Result<(), AppError> { + appflowy_web_metrics.record_update_size_bytes(update.len()); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let message = ClientHttpUpdateMessage { + user, + workspace_id: workspace_id.to_string(), + object_id: workspace_database_id.to_string(), + collab_type: CollabType::WorkspaceDatabase, + update: Bytes::from(update), + state_vector: None, + return_tx: Some(tx), + }; + + server + .try_send(message) + .map_err(|err| AppError::Internal(anyhow!("Failed to send message to server: {}", err)))?; + + let resp = timeout_at( + tokio::time::Instant::now() + Duration::from_millis(2000), + rx, + ) + .await + .map_err(|err| { + appflowy_web_metrics.incr_apply_update_timeout_count(1); + AppError::Internal(anyhow!( + "Failed to receive apply update within timeout: {}", + err + )) + })? + .map_err(|err| AppError::Internal(anyhow!("Unable to receive folder update reply: {}", err)))?; + + match resp { + Ok(_) => Ok(()), + Err(err) => { + appflowy_web_metrics.incr_apply_update_failure_count(1); + Err(AppError::Internal(anyhow!( + "Failed to apply workspace database update: {}", + err + ))) + }, + } +} diff --git a/tests/workspace/page_view.rs b/tests/workspace/page_view.rs index a433dd251..6b4efe43f 100644 --- a/tests/workspace/page_view.rs +++ b/tests/workspace/page_view.rs @@ -287,7 +287,7 @@ async fn move_page_to_trash_then_restore() { .await .unwrap(); for view_id in view_ids_to_be_deleted.iter() { - app_client + web_client .api_client .move_workspace_page_view_to_trash(Uuid::parse_str(&workspace_id).unwrap(), view_id) .await @@ -379,7 +379,7 @@ async fn move_page_with_child_to_trash_then_restore() { .wait_object_sync_complete(&workspace_id) .await .unwrap(); - app_client + web_client .api_client .move_workspace_page_view_to_trash( Uuid::parse_str(&workspace_id).unwrap(), @@ -453,7 +453,7 @@ async fn move_page_with_child_to_trash_then_delete_permanently() { .wait_object_sync_complete(&workspace_id) .await .unwrap(); - app_client + web_client .api_client .move_workspace_page_view_to_trash( Uuid::parse_str(&workspace_id).unwrap(), @@ -528,7 +528,7 @@ async fn move_page_with_child_to_trash_then_delete_all_permanently() { .wait_object_sync_complete(&workspace_id) .await .unwrap(); - app_client + web_client .api_client .move_workspace_page_view_to_trash( Uuid::parse_str(&workspace_id).unwrap(), From 7ff9a923bbb229922aafd45060dd168dc4062aa7 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Wed, 18 Dec 2024 22:48:10 +0800 Subject: [PATCH 4/7] fix: incorrect group subscription criteria for http real time user (#1087) From 
e758f18d7599b32520b1a85e82dd96673cfad12c Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 18 Dec 2024 22:48:25 +0800 Subject: [PATCH 5/7] chore: add metadata column to save embedding info (#1086) --- libs/database-entity/src/dto.rs | 61 +------------------ .../src/index/collab_embeddings_ops.rs | 8 ++- ...18090459_collab_embedding_add_metadata.sql | 42 +++++++++++++ .../src/indexer/document_indexer.rs | 4 ++ src/api/workspace.rs | 2 +- 5 files changed, 53 insertions(+), 64 deletions(-) create mode 100644 migrations/20241218090459_collab_embedding_add_metadata.sql diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index 432f2b381..6f77bf80b 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -746,44 +746,7 @@ pub struct AFCollabEmbeddedChunk { pub content_type: EmbeddingContentType, pub content: String, pub embedding: Option<Vec<f32>>, -} - -impl AFCollabEmbeddedChunk { - pub fn from_proto(proto: &proto::collab::CollabEmbeddingsParams) -> Result { - let collab_type_proto = proto::collab::CollabType::try_from(proto.collab_type).unwrap(); - let collab_type = CollabType::from_proto(&collab_type_proto); - let content_type_proto = - proto::collab::EmbeddingContentType::try_from(proto.content_type).unwrap(); - let content_type = EmbeddingContentType::from_proto(content_type_proto)?; - let embedding = if proto.embedding.is_empty() { - None - } else { - Some(proto.embedding.clone()) - }; - Ok(Self { - fragment_id: proto.fragment_id.clone(), - object_id: proto.object_id.clone(), - collab_type, - content_type, - content: proto.content.clone(), - embedding, - }) - } - - pub fn to_proto(&self) -> proto::collab::CollabEmbeddingsParams { - proto::collab::CollabEmbeddingsParams { - fragment_id: self.fragment_id.clone(), - object_id: self.object_id.clone(), - collab_type: self.collab_type.to_proto() as i32, - content_type: self.content_type.to_proto() as i32, - content: self.content.clone(), - embedding: self.embedding.clone().unwrap_or_default(), - } - } - - pub fn to_protobuf_bytes(&self) -> Vec<u8> { - self.to_proto().encode_to_vec() - } + pub metadata: serde_json::Value, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -792,28 +755,6 @@ pub struct AFCollabEmbeddings { pub params: Vec<AFCollabEmbeddedChunk>, } -impl AFCollabEmbeddings { - pub fn from_proto(proto: proto::collab::CollabEmbeddings) -> Result { - let mut params = vec![]; - for param in proto.embeddings { - params.push(AFCollabEmbeddedChunk::from_proto(&param)?); - } - Ok(Self { - tokens_consumed: proto.tokens_consumed, - params, - }) - } - - pub fn to_proto(&self) -> proto::collab::CollabEmbeddings { - let embeddings: Vec<proto::collab::CollabEmbeddingsParams> = - self.params.iter().map(|param| param.to_proto()).collect(); - proto::collab::CollabEmbeddings { - tokens_consumed: self.tokens_consumed, - embeddings, - } - } -} - /// Type of content stored by the embedding. /// Currently only plain text of the document is supported. /// In the future, we might support other kinds, e.g. PDF, images, or image-extracted text.
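With the hand-written protobuf conversions removed, every embedded chunk now carries a free-form JSON metadata value that flows straight through to the new JSONB column. A sketch of constructing one chunk, with illustrative values only (the metadata shape mirrors the one document_indexer.rs builds later in this patch):

use collab_entity::CollabType;
use database_entity::dto::{AFCollabEmbeddedChunk, EmbeddingContentType};
use serde_json::json;

fn example_chunk(object_id: &str) -> AFCollabEmbeddedChunk {
  AFCollabEmbeddedChunk {
    fragment_id: format!("{object_id}-0"),
    object_id: object_id.to_string(),
    collab_type: CollabType::Document,
    content_type: EmbeddingContentType::PlainText,
    content: "plain text of the first fragment".to_string(),
    // Populated only after the embedding request succeeds.
    embedding: None,
    // Free-form payload persisted into the new JSONB column.
    metadata: json!({ "id": object_id, "source": "appflowy", "name": "document" }),
  }
}
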
diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index 874c271da..4063699ec 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -57,12 +57,13 @@ WHERE w.workspace_id = $1"#, } #[derive(sqlx::Type)] -#[sqlx(type_name = "af_fragment", no_pg_array)] +#[sqlx(type_name = "af_fragment_v2", no_pg_array)] struct Fragment { fragment_id: String, content_type: i32, contents: String, embedding: Option, + metadata: serde_json::Value, } impl From for Fragment { @@ -72,13 +73,14 @@ impl From for Fragment { content_type: value.content_type as i32, contents: value.content, embedding: value.embedding.map(Vector::from), + metadata: value.metadata, } } } impl PgHasArrayType for Fragment { fn array_type_info() -> PgTypeInfo { - PgTypeInfo::with_name("af_fragment[]") + PgTypeInfo::with_name("af_fragment_v2[]") } } @@ -96,7 +98,7 @@ pub async fn upsert_collab_embeddings( let collab_type = records[0].collab_type.clone(); let fragments = records.into_iter().map(Fragment::from).collect::>(); - sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment[])"#) + sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v2[])"#) .bind(*workspace_id) .bind(object_id) .bind(crate::collab::partition_key_from_collab_type(&collab_type)) diff --git a/migrations/20241218090459_collab_embedding_add_metadata.sql b/migrations/20241218090459_collab_embedding_add_metadata.sql new file mode 100644 index 000000000..92ebaabea --- /dev/null +++ b/migrations/20241218090459_collab_embedding_add_metadata.sql @@ -0,0 +1,42 @@ +-- Add migration script here +ALTER TABLE af_collab_embeddings +ADD COLUMN metadata JSONB DEFAULT '{}'::jsonb; + +CREATE TYPE af_fragment_v2 AS ( + fragment_id TEXT, + content_type INT, + contents TEXT, + embedding VECTOR(1536), + metadata JSONB +); + +CREATE OR REPLACE PROCEDURE af_collab_embeddings_upsert( + IN p_workspace_id UUID, + IN p_oid TEXT, + IN p_partition_key INT, + IN p_tokens_used INT, + IN p_fragments af_fragment_v2[] +) +LANGUAGE plpgsql +AS $$ +BEGIN + DELETE FROM af_collab_embeddings WHERE oid = p_oid; + INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at, metadata) + SELECT + f.fragment_id, + p_oid, + p_partition_key, + f.content_type, + f.contents, + f.embedding, + NOW(), + f.metadata + FROM UNNEST(p_fragments) as f; + + -- Update the usage tracking table + INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed) + VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used) + ON CONFLICT (created_at, workspace_id) + DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used; +END +$$; \ No newline at end of file diff --git a/services/appflowy-collaborate/src/indexer/document_indexer.rs b/services/appflowy-collaborate/src/indexer/document_indexer.rs index 1ce9c8b54..fec2d845f 100644 --- a/services/appflowy-collaborate/src/indexer/document_indexer.rs +++ b/services/appflowy-collaborate/src/indexer/document_indexer.rs @@ -12,6 +12,7 @@ use collab_document::document::DocumentBody; use collab_document::error::DocumentError; use collab_entity::CollabType; use database_entity::dto::{AFCollabEmbeddedChunk, AFCollabEmbeddings, EmbeddingContentType}; +use serde_json::json; use tracing::trace; use uuid::Uuid; @@ -106,6 +107,8 @@ fn split_text_into_chunks( // We assume that 
every token is ~4 bytes. We're going to split document content into fragments // of ~2000 tokens each. let split_contents = split_text_by_max_content_len(content, 8000)?; + let metadata = + json!({"id": object_id, "source": "appflowy", "name": "document", "collab_type": collab_type }); Ok( split_contents .into_iter() @@ -116,6 +119,7 @@ fn split_text_into_chunks( content_type: EmbeddingContentType::PlainText, content, embedding: None, + metadata: metadata.clone(), }) .collect(), ) diff --git a/src/api/workspace.rs b/src/api/workspace.rs index aa3f5e00d..fa18bb2d5 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -2397,7 +2397,7 @@ async fn collab_full_sync_handler( uid, device_id, connect_at: timestamp(), - session_id: uuid::Uuid::new_v4().to_string(), + session_id: Uuid::new_v4().to_string(), app_version, }; From ecadf8e287204c9f05d9dfe63b88b47f8d446068 Mon Sep 17 00:00:00 2001 From: Richard Shiue <71320345+richardshiue@users.noreply.github.com> Date: Thu, 19 Dec 2024 00:12:53 +0800 Subject: [PATCH 6/7] chore: find question message from reply message (#1085) * chore: find question message from answer message id * chore: sqlx * test: fix tests * test: fix test * chore: apply code suggestions to 2 files --- ...8c14137dd09b11be73442a7f46b2f938b8445.json | 53 +++++++++++++ libs/client-api/src/http_chat.rs | 22 ++++++ libs/database/src/chat/chat_ops.rs | 37 +++++++++ src/api/chat.rs | 23 +++++- src/biz/chat/ops.rs | 14 +++- tests/ai_test/chat_test.rs | 76 +++++++++++++++++-- 6 files changed, 216 insertions(+), 9 deletions(-) create mode 100644 .sqlx/query-794c4ced16801b3e98a62eb44c18c14137dd09b11be73442a7f46b2f938b8445.json diff --git a/.sqlx/query-794c4ced16801b3e98a62eb44c18c14137dd09b11be73442a7f46b2f938b8445.json b/.sqlx/query-794c4ced16801b3e98a62eb44c18c14137dd09b11be73442a7f46b2f938b8445.json new file mode 100644 index 000000000..1148f8cd7 --- /dev/null +++ b/.sqlx/query-794c4ced16801b3e98a62eb44c18c14137dd09b11be73442a7f46b2f938b8445.json @@ -0,0 +1,53 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT message_id, content, created_at, author, meta_data, reply_message_id\n FROM af_chat_messages\n WHERE chat_id = $1\n AND reply_message_id = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "message_id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "content", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "author", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "meta_data", + "type_info": "Jsonb" + }, + { + "ordinal": 5, + "name": "reply_message_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + true + ] + }, + "hash": "794c4ced16801b3e98a62eb44c18c14137dd09b11be73442a7f46b2f938b8445" +} diff --git a/libs/client-api/src/http_chat.rs b/libs/client-api/src/http_chat.rs index 36f015a8b..dc5401217 100644 --- a/libs/client-api/src/http_chat.rs +++ b/libs/client-api/src/http_chat.rs @@ -262,6 +262,28 @@ impl Client { .into_data() } + pub async fn get_question_message_from_answer_id( + &self, + workspace_id: &str, + chat_id: &str, + answer_message_id: i64, + ) -> Result, AppResponseError> { + let url = format!( + "{}/api/chat/{workspace_id}/{chat_id}/message/find_question", + self.base_url + ); + + let resp = self + .http_client_with_auth(Method::GET, &url) + .await? 
+ .query(&[("answer_message_id", answer_message_id)]) + .send() + .await?; + AppResponse::>::from_response(resp) + .await? + .into_data() + } + pub async fn calculate_similarity( &self, params: CalculateSimilarityParams, diff --git a/libs/database/src/chat/chat_ops.rs b/libs/database/src/chat/chat_ops.rs index 7bb625ced..d415fa67f 100644 --- a/libs/database/src/chat/chat_ops.rs +++ b/libs/database/src/chat/chat_ops.rs @@ -669,3 +669,40 @@ pub async fn select_chat_message_content<'a, E: Executor<'a, Database = Postgres .await?; Ok((row.content, row.meta_data)) } + +pub async fn select_chat_message_matching_reply_message_id( + txn: &mut Transaction<'_, Postgres>, + chat_id: &str, + reply_message_id: i64, +) -> Result, AppError> { + let chat_id = Uuid::from_str(chat_id)?; + let row = sqlx::query!( + r#" + SELECT message_id, content, created_at, author, meta_data, reply_message_id + FROM af_chat_messages + WHERE chat_id = $1 + AND reply_message_id = $2 + "#, + &chat_id, + reply_message_id + ) + .fetch_one(txn.deref_mut()) + .await?; + + let message = match serde_json::from_value::(row.author) { + Ok(author) => Some(ChatMessage { + author, + message_id: row.message_id, + content: row.content, + created_at: row.created_at, + meta_data: row.meta_data, + reply_message_id: row.reply_message_id, + }), + Err(err) => { + warn!("Failed to deserialize author: {}", err); + None + }, + }; + + Ok(message) +} diff --git a/src/api/chat.rs b/src/api/chat.rs index c44ebba41..67dc1fabc 100644 --- a/src/api/chat.rs +++ b/src/api/chat.rs @@ -1,10 +1,11 @@ use crate::biz::chat::ops::{ create_chat, create_chat_message, delete_chat, generate_chat_message_answer, get_chat_messages, - update_chat_message, + get_question_message, update_chat_message, }; use crate::state::AppState; use actix_web::web::{Data, Json}; use actix_web::{web, HttpRequest, HttpResponse, Scope}; +use serde::Deserialize; use crate::api::util::ai_model_from_header; use app_error::AppError; @@ -69,6 +70,10 @@ pub fn chat_scope() -> Scope { web::resource("/{chat_id}/message/answer") .route(web::post().to(save_answer_handler)) ) + .service( + web::resource("/{chat_id}/message/find_question") + .route(web::get().to(get_chat_question_message_handler)) + ) // AI response generation .service( @@ -349,6 +354,17 @@ async fn get_chat_message_handler( Ok(AppResponse::Ok().with_data(messages).into()) } +#[instrument(level = "debug", skip_all, err)] +async fn get_chat_question_message_handler( + path: web::Path<(String, String)>, + query: web::Query, + state: Data, +) -> actix_web::Result>> { + let (_workspace_id, chat_id) = path.into_inner(); + let message = get_question_message(&state.pg_pool, &chat_id, query.0.answer_message_id).await?; + Ok(AppResponse::Ok().with_data(message).into()) +} + #[instrument(level = "debug", skip_all, err)] async fn get_chat_settings_handler( path: web::Path<(String, String)>, @@ -501,3 +517,8 @@ where } } } + +#[derive(Debug, Deserialize)] +struct FindQuestionParams { + answer_message_id: i64, +} diff --git a/src/biz/chat/ops.rs b/src/biz/chat/ops.rs index 65a5c125f..ff6e2541a 100644 --- a/src/biz/chat/ops.rs +++ b/src/biz/chat/ops.rs @@ -8,7 +8,7 @@ use database::chat; use database::chat::chat_ops::{ delete_answer_message_by_question_message_id, insert_answer_message, insert_answer_message_with_transaction, insert_chat, insert_question_message, - select_chat_messages, + select_chat_message_matching_reply_message_id, select_chat_messages, }; use futures::stream::Stream; use serde_json::json; @@ -232,3 +232,15 @@ pub async fn 
get_chat_messages( txn.commit().await?; Ok(messages) } + +pub async fn get_question_message( + pg_pool: &PgPool, + chat_id: &str, + answer_message_id: i64, +) -> Result, AppError> { + let mut txn = pg_pool.begin().await?; + let message = + select_chat_message_matching_reply_message_id(&mut txn, chat_id, answer_message_id).await?; + txn.commit().await?; + Ok(message) +} diff --git a/tests/ai_test/chat_test.rs b/tests/ai_test/chat_test.rs index 46da750d5..df21dc339 100644 --- a/tests/ai_test/chat_test.rs +++ b/tests/ai_test/chat_test.rs @@ -6,8 +6,8 @@ use client_api_test::{ai_test_enabled, TestClient}; use futures_util::StreamExt; use serde_json::json; use shared_entity::dto::chat_dto::{ - ChatMessageMetadata, ChatRAGData, CreateChatMessageParams, CreateChatParams, MessageCursor, - UpdateChatParams, + ChatMessageMetadata, ChatRAGData, CreateAnswerMessageParams, CreateChatMessageParams, + CreateChatParams, MessageCursor, UpdateChatParams, }; #[tokio::test] @@ -344,6 +344,10 @@ async fn create_chat_context_test() { // #[tokio::test] // async fn update_chat_message_test() { +// if !ai_test_enabled() { +// return; +// } + // let test_client = TestClient::new_user_without_ws_conn().await; // let workspace_id = test_client.workspace_id().await; // let chat_id = uuid::Uuid::new_v4().to_string(); @@ -352,13 +356,13 @@ async fn create_chat_context_test() { // name: "my second chat".to_string(), // rag_ids: vec![], // }; -// + // test_client // .api_client // .create_chat(&workspace_id, params) // .await // .unwrap(); -// + // let params = CreateChatMessageParams::new_user("where is singapore?"); // let stream = test_client // .api_client @@ -367,7 +371,7 @@ async fn create_chat_context_test() { // .unwrap(); // let messages: Vec = stream.map(|message| message.unwrap()).collect().await; // assert_eq!(messages.len(), 2); -// + // let params = UpdateChatMessageContentParams { // chat_id: chat_id.clone(), // message_id: messages[0].message_id, @@ -378,7 +382,7 @@ async fn create_chat_context_test() { // .update_chat_message(&workspace_id, &chat_id, params) // .await // .unwrap(); -// + // let remote_messages = test_client // .api_client // .get_chat_messages(&workspace_id, &chat_id, MessageCursor::NextBack, 2) @@ -387,11 +391,69 @@ async fn create_chat_context_test() { // .messages; // assert_eq!(remote_messages[0].content, "where is China?"); // assert_eq!(remote_messages.len(), 2); -// + // // when the question was updated, the answer should be different // assert_ne!(remote_messages[1].content, messages[1].content); // } +#[tokio::test] +async fn get_question_message_test() { + if !ai_test_enabled() { + return; + } + + let test_client = TestClient::new_user_without_ws_conn().await; + let workspace_id = test_client.workspace_id().await; + let chat_id = uuid::Uuid::new_v4().to_string(); + let params = CreateChatParams { + chat_id: chat_id.clone(), + name: "my ai chat".to_string(), + rag_ids: vec![], + }; + + test_client + .api_client + .create_chat(&workspace_id, params) + .await + .unwrap(); + + let params = CreateChatMessageParams::new_user("where is singapore?"); + let question = test_client + .api_client + .create_question(&workspace_id, &chat_id, params) + .await + .unwrap(); + + let answer = test_client + .api_client + .get_answer(&workspace_id, &chat_id, question.message_id) + .await + .unwrap(); + + test_client + .api_client + .save_answer( + &workspace_id, + &chat_id, + CreateAnswerMessageParams { + content: answer.content, + metadata: None, + question_message_id: question.message_id, + }, + ) 
+ .await + .unwrap(); + + let find_question = test_client + .api_client + .get_question_message_from_answer_id(&workspace_id, &chat_id, answer.message_id) + .await + .unwrap() + .unwrap(); + + assert_eq!(find_question.reply_message_id.unwrap(), answer.message_id); +} + async fn collect_answer(mut stream: QuestionStream) -> String { let mut answer = String::new(); while let Some(value) = stream.next().await { From ea131f0baab67defe7591067357eced490072372 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 19 Dec 2024 12:58:39 +0800 Subject: [PATCH 7/7] chore: map ai error code (#1090) --- libs/app-error/src/lib.rs | 33 ++++++++++++++++++----- libs/appflowy-ai-client/src/client.rs | 30 ++++++++++++++++----- libs/appflowy-ai-client/src/error.rs | 3 +++ libs/client-api/src/http_chat.rs | 13 ++++++++- libs/shared-entity/src/response_stream.rs | 7 ++++- src/api/chat.rs | 2 +- src/biz/chat/ops.rs | 3 ++- 7 files changed, 75 insertions(+), 16 deletions(-) diff --git a/libs/app-error/src/lib.rs b/libs/app-error/src/lib.rs index 4df946f1d..0ba39eefc 100644 --- a/libs/app-error/src/lib.rs +++ b/libs/app-error/src/lib.rs @@ -3,6 +3,7 @@ pub mod gotrue; #[cfg(feature = "gotrue_error")] use crate::gotrue::GoTrueError; +use std::error::Error; use std::string::FromUtf8Error; #[cfg(feature = "appflowy_ai_error")] @@ -277,13 +278,32 @@ impl From for AppError { return AppError::RequestTimeout(error.to_string()); } - if error.is_request() { - return if error.status() == Some(StatusCode::PAYLOAD_TOO_LARGE) { - AppError::PayloadTooLarge(error.to_string()) - } else { - AppError::InvalidRequest(error.to_string()) - }; + if let Some(cause) = error.source() { + if cause + .to_string() + .contains("connection closed before message completed") + { + return AppError::ServiceTemporaryUnavailable(error.to_string()); + } } + + // Handle request-related errors + if let Some(status_code) = error.status() { + if error.is_request() { + match status_code { + StatusCode::PAYLOAD_TOO_LARGE => { + return AppError::PayloadTooLarge(error.to_string()); + }, + status_code if status_code.is_server_error() => { + return AppError::ServiceTemporaryUnavailable(error.to_string()); + }, + _ => { + return AppError::InvalidRequest(error.to_string()); + }, + } + } + } + AppError::Unhandled(error.to_string()) } } @@ -447,6 +467,7 @@ impl From for AppError { AIError::PayloadTooLarge(err) => AppError::PayloadTooLarge(err), AIError::InvalidRequest(err) => AppError::InvalidRequest(err), AIError::SerdeError(err) => AppError::SerdeError(err), + AIError::ServiceUnavailable(err) => AppError::AIServiceUnavailable(err), } } } diff --git a/libs/appflowy-ai-client/src/client.rs b/libs/appflowy-ai-client/src/client.rs index 49457173e..8b9af236d 100644 --- a/libs/appflowy-ai-client/src/client.rs +++ b/libs/appflowy-ai-client/src/client.rs @@ -373,6 +373,11 @@ where resp: reqwest::Response, ) -> Result>, AIError> { let status_code = resp.status(); + if status_code.is_server_error() { + let body = resp.text().await?; + return Err(AIError::ServiceUnavailable(body)); + } + if !status_code.is_success() { let body = resp.text().await?; return Err(AIError::InvalidRequest(body)); @@ -385,16 +390,29 @@ where } impl From for AIError { fn from(error: reqwest::Error) -> Self { + if error.is_connect() { + return AIError::ServiceUnavailable(error.to_string()); + } + if error.is_timeout() { return AIError::RequestTimeout(error.to_string()); } - if error.is_request() { - return if error.status() == 
Some(StatusCode::PAYLOAD_TOO_LARGE) { - AIError::PayloadTooLarge(error.to_string()) - } else { - AIError::InvalidRequest(format!("{:?}", error)) - }; + // Handle request-related errors + if let Some(status_code) = error.status() { + if error.is_request() { + match status_code { + StatusCode::PAYLOAD_TOO_LARGE => { + return AIError::PayloadTooLarge(error.to_string()); + }, + status_code if status_code.is_server_error() => { + return AIError::ServiceUnavailable(error.to_string()); + }, + _ => { + return AIError::InvalidRequest(format!("{:?}", error)); + }, + } + } } AIError::Internal(error.into()) } diff --git a/libs/appflowy-ai-client/src/error.rs b/libs/appflowy-ai-client/src/error.rs index d82520c62..c2f80a856 100644 --- a/libs/appflowy-ai-client/src/error.rs +++ b/libs/appflowy-ai-client/src/error.rs @@ -14,4 +14,7 @@ pub enum AIError { #[error(transparent)] SerdeError(#[from] serde_json::Error), + + #[error("Service unavailable:{0}")] + ServiceUnavailable(String), } diff --git a/libs/client-api/src/http_chat.rs b/libs/client-api/src/http_chat.rs index dc5401217..9c09aa2c4 100644 --- a/libs/client-api/src/http_chat.rs +++ b/libs/client-api/src/http_chat.rs @@ -1,6 +1,7 @@ use crate::http::log_request_id; use crate::Client; +use app_error::AppError; use client_api_entity::chat_dto::{ ChatMessage, CreateAnswerMessageParams, CreateChatMessageParams, CreateChatParams, MessageCursor, RepeatedChatMessage, UpdateChatMessageContentParams, @@ -154,7 +155,17 @@ impl Client { .await? .timeout(Duration::from_secs(30)) .send() - .await?; + .await + .map_err(|err| { + let app_err = AppError::from(err); + if matches!(app_err, AppError::ServiceTemporaryUnavailable(_)) { + AppError::AIServiceUnavailable( + "AI service temporarily unavailable, please try again later".to_string(), + ) + } else { + app_err + } + })?; log_request_id(&resp); let stream = AppResponse::::json_response_stream(resp).await?; Ok(QuestionStream::new(stream)) diff --git a/libs/shared-entity/src/response_stream.rs b/libs/shared-entity/src/response_stream.rs index 034c4160b..aea8f6fa4 100644 --- a/libs/shared-entity/src/response_stream.rs +++ b/libs/shared-entity/src/response_stream.rs @@ -1,5 +1,5 @@ use crate::response::{AppResponse, AppResponseError}; -use app_error::ErrorCode; +use app_error::{AppError, ErrorCode}; use bytes::{Buf, Bytes, BytesMut}; use futures::{ready, Stream, TryStreamExt}; @@ -22,6 +22,11 @@ where resp: reqwest::Response, ) -> Result>, AppResponseError> { let status_code = resp.status(); + if status_code.is_server_error() { + let body = resp.text().await?; + return Err(AppError::AIServiceUnavailable(body).into()); + } + if !status_code.is_success() { let body = resp.text().await?; return Err(AppResponseError::new(ErrorCode::Internal, body)); diff --git a/src/api/chat.rs b/src/api/chat.rs index 67dc1fabc..62e465dba 100644 --- a/src/api/chat.rs +++ b/src/api/chat.rs @@ -316,7 +316,7 @@ async fn answer_stream_v2_handler( ) }, Err(err) => Ok( - HttpResponse::Ok() + HttpResponse::ServiceUnavailable() .content_type("text/event-stream") .streaming(stream::once(async move { Err(AppError::AIServiceUnavailable(err.to_string())) diff --git a/src/biz/chat/ops.rs b/src/biz/chat/ops.rs index ff6e2541a..d542e42f2 100644 --- a/src/biz/chat/ops.rs +++ b/src/biz/chat/ops.rs @@ -97,7 +97,8 @@ pub async fn generate_chat_message_answer( &ai_model, Some(metadata), ) - .await?; + .await + .map_err(|err| AppError::AIServiceUnavailable(err.to_string()))?; info!("new_answer: {:?}", new_answer); // Save the answer to the database
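Read in isolation, the reqwest error mapping in this final patch is a small classifier in which branch order matters: connectivity and timeout failures are recognized before any status-code inspection, and server-side (5xx) statuses on request errors now surface as service-unavailable rather than invalid-request. A self-contained sketch of that logic, with a simplified stand-in enum since AIError and AppError live in their own crates:

use reqwest::StatusCode;

// Simplified stand-in for the AIError/AppError variants touched by the patch.
#[derive(Debug)]
enum MappedError {
  ServiceUnavailable(String),
  RequestTimeout(String),
  PayloadTooLarge(String),
  InvalidRequest(String),
  Internal(String),
}

fn classify(error: &reqwest::Error) -> MappedError {
  // Connection failures are checked first so they can never be misreported
  // as malformed requests.
  if error.is_connect() {
    return MappedError::ServiceUnavailable(error.to_string());
  }
  if error.is_timeout() {
    return MappedError::RequestTimeout(error.to_string());
  }
  // Status-based classification only applies when a response status exists.
  if let Some(status_code) = error.status() {
    if error.is_request() {
      return match status_code {
        StatusCode::PAYLOAD_TOO_LARGE => MappedError::PayloadTooLarge(error.to_string()),
        code if code.is_server_error() => MappedError::ServiceUnavailable(error.to_string()),
        _ => MappedError::InvalidRequest(format!("{error:?}")),
      };
    }
  }
  MappedError::Internal(error.to_string())
}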