diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs
index bb5e2db1b..c5e979ac2 100644
--- a/libs/database-entity/src/dto.rs
+++ b/libs/database-entity/src/dto.rs
@@ -746,6 +746,8 @@ pub struct AFCollabEmbeddedChunk {
   pub content: String,
   pub embedding: Option<Vec<f32>>,
   pub metadata: serde_json::Value,
+  pub fragment_index: i32,
+  pub embedded_type: i16,
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs
index e97b6d6ba..8db2bdbac 100644
--- a/libs/database/src/index/collab_embeddings_ops.rs
+++ b/libs/database/src/index/collab_embeddings_ops.rs
@@ -62,13 +62,15 @@ WHERE w.workspace_id = $1"#,
 }
 
 #[derive(sqlx::Type)]
-#[sqlx(type_name = "af_fragment_v2", no_pg_array)]
+#[sqlx(type_name = "af_fragment_v3", no_pg_array)]
 struct Fragment {
   fragment_id: String,
   content_type: i32,
   contents: String,
   embedding: Option<Vector>,
   metadata: serde_json::Value,
+  fragment_index: i32,
+  embedded_type: i16,
 }
 
 impl From<AFCollabEmbeddedChunk> for Fragment {
@@ -79,13 +81,15 @@ impl From<AFCollabEmbeddedChunk> for Fragment {
       contents: value.content,
       embedding: value.embedding.map(Vector::from),
       metadata: value.metadata,
+      fragment_index: value.fragment_index,
+      embedded_type: value.embedded_type,
     }
   }
 }
 
 impl PgHasArrayType for Fragment {
   fn array_type_info() -> PgTypeInfo {
-    PgTypeInfo::with_name("af_fragment_v2[]")
+    PgTypeInfo::with_name("af_fragment_v3[]")
   }
 }
 
@@ -103,7 +107,7 @@ pub async fn upsert_collab_embeddings(
     object_id,
     fragments.len()
   );
-  sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v2[])"#)
+  sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v3[])"#)
     .bind(*workspace_id)
     .bind(object_id)
     .bind(crate::collab::partition_key_from_collab_type(&collab_type))
diff --git a/libs/indexer/src/collab_indexer/document_indexer.rs b/libs/indexer/src/collab_indexer/document_indexer.rs
index b0409864b..73ba7c0c7 100644
--- a/libs/indexer/src/collab_indexer/document_indexer.rs
+++ b/libs/indexer/src/collab_indexer/document_indexer.rs
@@ -123,13 +123,16 @@ fn split_text_into_chunks(
   Ok(
     split_contents
       .into_iter()
-      .map(|content| AFCollabEmbeddedChunk {
+      .enumerate()
+      .map(|(index, content)| AFCollabEmbeddedChunk {
         fragment_id: Uuid::new_v4().to_string(),
         object_id: object_id.clone(),
         content_type: EmbeddingContentType::PlainText,
         content,
         embedding: None,
         metadata: metadata.clone(),
+        fragment_index: index as i32,
+        embedded_type: 0,
       })
       .collect(),
   )
diff --git a/migrations/20241230064618_collab_embedding_add_fragment_index.sql b/migrations/20241230064618_collab_embedding_add_fragment_index.sql
new file mode 100644
index 000000000..04ce605a8
--- /dev/null
+++ b/migrations/20241230064618_collab_embedding_add_fragment_index.sql
@@ -0,0 +1,47 @@
+-- Add migration script here
+ALTER TABLE af_collab_embeddings
+    ADD COLUMN fragment_index INTEGER DEFAULT 0,
+    ADD COLUMN embedder_type SMALLINT DEFAULT 0;
+
+CREATE TYPE af_fragment_v3 AS (
+    fragment_id TEXT,
+    content_type INT,
+    contents TEXT,
+    embedding VECTOR(1536),
+    metadata JSONB,
+    fragment_index INTEGER,
+    embedder_type SMALLINT
+);
+
+CREATE OR REPLACE PROCEDURE af_collab_embeddings_upsert(
+    IN p_workspace_id UUID,
+    IN p_oid TEXT,
+    IN p_partition_key INT,
+    IN p_tokens_used INT,
+    IN p_fragments af_fragment_v3[]
+)
+LANGUAGE plpgsql
+AS $$
+BEGIN
+    DELETE FROM af_collab_embeddings WHERE oid = p_oid;
+    INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at, metadata, fragment_index, embedder_type)
+    SELECT
+        f.fragment_id,
+        p_oid,
+        p_partition_key,
+        f.content_type,
+        f.contents,
+        f.embedding,
+        NOW(),
+        f.metadata,
+        f.fragment_index,
+        f.embedder_type
+    FROM UNNEST(p_fragments) as f;
+
+    -- Update the usage tracking table
+    INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed)
+    VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used)
+    ON CONFLICT (created_at, workspace_id)
+    DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used;
+END
+$$;
\ No newline at end of file
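
A minimal usage sketch of what the new column enables, not part of the diff itself: because fragment_index records each chunk's position within the source document, fragments for a collab can be read back in their original order. The object id below is a hypothetical placeholder.

    -- Sketch only, assuming the migration above has been applied.
    -- 'my-object-id' is a placeholder, not a value from this change.
    SELECT fragment_id, fragment_index, content
    FROM af_collab_embeddings
    WHERE oid = 'my-object-id'
    ORDER BY fragment_index;

Note that the Rust struct field is named embedded_type while the SQL column and af_fragment_v3 attribute are embedder_type; Postgres matches composite-type fields by position rather than by name, so the binding still lines up as long as field order and types agree.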