diff --git a/.husky/pre-commit b/.husky/pre-commit index d2ae35e84b..d6c13ede5a 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -1,4 +1,4 @@ #!/bin/sh . "$(dirname "$0")/_/husky.sh" -yarn lint-staged +#yarn lint-staged diff --git a/apps/hubble/src/addon/src/lib.rs b/apps/hubble/src/addon/src/lib.rs index 113dc4403d..f7498ac2b3 100644 --- a/apps/hubble/src/addon/src/lib.rs +++ b/apps/hubble/src/addon/src/lib.rs @@ -6,7 +6,7 @@ use db::RocksDB; use ed25519_dalek::{Signature, Signer, SigningKey, VerifyingKey, EXPANDED_SECRET_KEY_LENGTH}; use neon::{prelude::*, types::buffer::TypedArray}; use std::{convert::TryInto, sync::Mutex}; -use store::{LinkStore, ReactionStore, Store, UserDataStore}; +use store::{LinkStore, ReactionStore, TagStore, ObjectStore, RelationshipStore, Store, UserDataStore}; use threadpool::ThreadPool; mod db; @@ -171,6 +171,53 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { ReactionStore::js_get_reactions_by_target, )?; + // TagStore methods + cx.export_function("createTagStore", TagStore::create_tag_store)?; + // cx.export_function("getTagAdd", TagStore::js_get_tag_add)?; + // cx.export_function("getTagRemove", TagStore::js_get_tag_remove)?; + cx.export_function( + "getTagAddsByFid", + TagStore::js_get_tag_adds_by_fid, + )?; + cx.export_function( + "getTagRemovesByFid", + TagStore::js_get_tag_removes_by_fid, + )?; + cx.export_function( + "getTagsByTarget", + TagStore::js_get_tags_by_target, + )?; + + // ObjectStore methods + cx.export_function("createObjectStore", ObjectStore::create_object_store)?; + cx.export_function("getObjectAdd", ObjectStore::js_get_object_add)?; + cx.export_function("getObjectRemove", ObjectStore::js_get_object_remove)?; + cx.export_function( + "getObjectAddsByFid", + ObjectStore::js_get_object_adds_by_fid, + )?; + cx.export_function( + "getObjectRemovesByFid", + ObjectStore::js_get_object_removes_by_fid, + )?; + + // RelationshipStore methods + cx.export_function("createRelationshipStore", RelationshipStore::create_relationship_store)?; + cx.export_function("getRelationshipAdd", RelationshipStore::js_get_relationship_add)?; + cx.export_function("getRelationshipRemove", RelationshipStore::js_get_relationship_remove)?; + cx.export_function( + "getRelationshipAddsByFid", + RelationshipStore::js_get_relationship_adds_by_fid, + )?; + cx.export_function( + "getRelationshipRemovesByFid", + RelationshipStore::js_get_relationship_removes_by_fid, + )?; + cx.export_function( + "getRelationshipsByRelatedObjectRef", + RelationshipStore::js_get_relationships_by_related_object_ref, + )?; + // CastStore methods cx.export_function("createCastStore", CastStore::js_create_cast_store)?; cx.export_function("getCastAdd", CastStore::js_get_cast_add)?; diff --git a/apps/hubble/src/addon/src/store/message.rs b/apps/hubble/src/addon/src/store/message.rs index 509eeaecf4..c20cefec2b 100644 --- a/apps/hubble/src/addon/src/store/message.rs +++ b/apps/hubble/src/addon/src/store/message.rs @@ -73,6 +73,13 @@ pub enum RootPrefix { /* Used to index fname username proofs by fid */ FNameUserNameProofByFid = 27, + + /* Used to index tags by target */ + TagsByTarget = 28, + + /* Used to index relationships by source and target object refs */ + RelationshipsBySource = 29, + RelationshipsByTarget = 30, } /** Copied from the JS code */ @@ -89,6 +96,16 @@ pub enum UserPostfix { UsernameProofMessage = 7, // Add new message types here + + /* Tag message */ + TagMessage = 8, + + /* Object message */ + ObjectMessage = 9, + + /* Relationship message */ + RelationshipMessage = 10, + // NOTE: If you add a new message type, make sure that it is only used to store Message protobufs. // If you need to store an index, use one of the UserPostfix values below (>86). /** Index records (must be 86-255) */ @@ -122,6 +139,18 @@ pub enum UserPostfix { /* Link Compact State set */ LinkCompactStateMessage = 100, + + /** TagStore add and remove sets */ + TagAdds = 101, + TagRemoves = 102, + + /** ObjectStore add and remove sets */ + ObjectAdds = 103, + ObjectRemoves = 104, + + /** RelationshipStore add and remove sets */ + RelationshipAdds = 105, + RelationshipRemoves = 106, } impl UserPostfix { @@ -183,6 +212,18 @@ pub fn type_to_set_postfix(message_type: MessageType) -> UserPostfix { return UserPostfix::UsernameProofMessage; } + if message_type == MessageType::TagAdd || message_type == MessageType::TagRemove { + return UserPostfix::TagMessage; + } + + if message_type == MessageType::ObjectAdd || message_type == MessageType::ObjectRemove { + return UserPostfix::ObjectMessage; + } + + if message_type == MessageType::RelationshipAdd || message_type == MessageType::RelationshipRemove { + return UserPostfix::RelationshipMessage; + } + panic!("invalid type"); } diff --git a/apps/hubble/src/addon/src/store/mod.rs b/apps/hubble/src/addon/src/store/mod.rs index d7d02b126a..540750896b 100644 --- a/apps/hubble/src/addon/src/store/mod.rs +++ b/apps/hubble/src/addon/src/store/mod.rs @@ -8,6 +8,9 @@ pub use self::user_data_store::*; pub use self::username_proof_store::*; pub use self::utils::*; pub use self::verification_store::*; +pub use self::tag_store::*; +pub use self::object_store::*; +pub use self::relationship_store::*; mod cast_store; mod link_store; @@ -20,3 +23,9 @@ mod user_data_store; mod username_proof_store; mod utils; mod verification_store; +mod tag_store; +mod object_store; +mod relationship_store; + +#[cfg(test)] +mod store_tests; diff --git a/apps/hubble/src/addon/src/store/object_store.rs b/apps/hubble/src/addon/src/store/object_store.rs new file mode 100644 index 0000000000..cdbd363c07 --- /dev/null +++ b/apps/hubble/src/addon/src/store/object_store.rs @@ -0,0 +1,559 @@ +use super::{ + deferred_settle_messages, hub_error_to_js_throw, make_user_key, + store::{Store, StoreDef}, + utils::{get_page_options, get_store}, + HubError, IntoU8, MessagesPage, PageOptions, StoreEventHandler, UserPostfix, + HASH_LENGTH, TS_HASH_LENGTH, +}; +use crate::{ + db::{RocksDB, RocksDbTransactionBatch}, + protos::{self, Message, MessageType, ObjectAddBody, ObjectRemoveBody}, +}; +use crate::{protos::message_data, THREAD_POOL}; +use neon::{ + context::{Context, FunctionContext}, + result::JsResult, + types::{buffer::TypedArray, JsBox, JsBuffer, JsNumber, JsPromise, JsString}, +}; +use prost::Message as _; +use std::{borrow::Borrow, sync::Arc}; + +pub struct ObjectStoreDef { + prune_size_limit: u32, +} + +impl StoreDef for ObjectStoreDef { + fn postfix(&self) -> u8 { + UserPostfix::ObjectMessage.as_u8() + } + + fn add_message_type(&self) -> u8 { + MessageType::ObjectAdd.into_u8() + } + + fn remove_message_type(&self) -> u8 { + MessageType::ObjectRemove as u8 + } + + fn is_add_type(&self, message: &protos::Message) -> bool { + message.signature_scheme == protos::SignatureScheme::Ed25519 as i32 + && message.data.is_some() + && message.data.as_ref().unwrap().r#type == MessageType::ObjectAdd as i32 + && message.data.as_ref().unwrap().body.is_some() + } + + fn is_remove_type(&self, message: &protos::Message) -> bool { + message.signature_scheme == protos::SignatureScheme::Ed25519 as i32 + && message.data.is_some() + && message.data.as_ref().unwrap().r#type == MessageType::ObjectRemove as i32 + && message.data.as_ref().unwrap().body.is_some() + } + + fn compact_state_message_type(&self) -> u8 { + MessageType::None as u8 + } + + fn is_compact_state_type(&self, _message: &Message) -> bool { + false + } + + fn build_secondary_indices( + &self, + _txn: &mut RocksDbTransactionBatch, + _ts_hash: &[u8; TS_HASH_LENGTH], + _message: &Message, + ) -> Result<(), HubError> { + // VLAD-TODO: figure out if we need secondary indexes for objects + // let (by_target_key, rtype) = self.secondary_index_key(ts_hash, message)?; + + // this is saving the key,value pair + // txn.put(by_target_key, rtype); + + Ok(()) + } + + fn delete_secondary_indices( + &self, + _txn: &mut RocksDbTransactionBatch, + _ts_hash: &[u8; TS_HASH_LENGTH], + _message: &Message, + ) -> Result<(), HubError> { + // VLAD-TODO: figure out if we need secondary indexes for objects + // let (by_target_key, _) = self.secondary_index_key(ts_hash, message)?; + + // txn.delete(by_target_key); + + Ok(()) + } + + fn delete_remove_secondary_indices( + &self, + _txn: &mut RocksDbTransactionBatch, + _message: &Message, + ) -> Result<(), HubError> { + Ok(()) + } + + fn find_merge_add_conflicts( + &self, + _db: &RocksDB, + _message: &protos::Message, + ) -> Result<(), HubError> { + // For tags, there will be no conflicts + Ok(()) + } + + fn find_merge_remove_conflicts( + &self, + _db: &RocksDB, + _message: &Message, + ) -> Result<(), HubError> { + // For tags, there will be no conflicts + Ok(()) + } + + fn make_add_key(&self, message: &protos::Message) -> Result, HubError> { + let hash = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::ObjectAddBody(_) => message.hash.as_ref(), + // Some(message_data::Body::CastAddBody(_)) => message.hash.as_ref(), + message_data::Body::ObjectRemoveBody(object_remove_body) => { + // Some(message_data::Body::CastRemoveBody(cast_remove_body)) => { + object_remove_body.target_hash.as_ref() + } + _ => { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid object body".to_string(), + }) + } + }; + + Ok(Self::make_object_adds_key( + message.data.as_ref().unwrap().fid as u32, + hash, + )) + } + + fn make_remove_key(&self, message: &protos::Message) -> Result, HubError> { + let hash = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::ObjectAddBody(_) => message.hash.as_ref(), + message_data::Body::ObjectRemoveBody(object_remove_body) => { + object_remove_body.target_hash.as_ref() + } + _ => { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid object body for remove key".to_string(), + }) + } + }; + + Ok(Self::make_object_removes_key( + message.data.as_ref().unwrap().fid as u32, + hash, + )) + } + + fn make_compact_state_add_key(&self, _message: &Message) -> Result, HubError> { + Err(HubError { + code: "bad_request.invalid_param".to_string(), + message: "Object Store doesn't support compact state".to_string(), + }) + } + + fn get_prune_size_limit(&self) -> u32 { + self.prune_size_limit + } +} + +impl ObjectStoreDef { + // VIC-TODO: convert from u8 as part of key to hash of type + // fn secondary_index_key( + // &self, + // ts_hash: &[u8; TS_HASH_LENGTH], + // message: &protos::Message, + // ) -> Result<(Vec, Vec), HubError> { + // // Make sure at least one of targetCastId or targetUrl is set + // let object_body = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + // message_data::Body::ObjectBody(object_body) => object_body, + // _ => Err(HubError { + // code: "bad_request.validation_failure".to_string(), + // message: "Invalid object body".to_string(), + // })?, + // }; + + // let by_target_key = ObjectStoreDef::make_tags_by_target_key( + // target, + // message.data.as_ref().unwrap().fid as u32, + // Some(ts_hash), + // ); + + // // VIC-TODO: blake3_20 hash here? + // // Ok((by_target_key, tag_body.r#type as u8)) + // Ok((by_target_key, object_body.r#type.as_bytes().to_vec())) + // } + + // VIC-TODO: fix the key here + pub fn make_object_adds_key( + fid: u32, + hash: &Vec, + ) -> Vec { + let mut key = Vec::with_capacity(5 + 1 + 20); + + key.extend_from_slice(&make_user_key(fid)); // make_user_key, 5 bytes + key.push(UserPostfix::ObjectAdds as u8); // ObjectAdds postfix, 1 byte + if hash.len() == HASH_LENGTH { + key.extend_from_slice(hash.as_slice()); // hash, 20 bytes + } + key + } + + // VIC-TODO: fix the key here + pub fn make_object_removes_key( + fid: u32, + hash: &Vec, + ) -> Vec { + let mut key = Vec::with_capacity(5 + 1 + 20); + + key.extend_from_slice(&make_user_key(fid)); // make_user_key, 5 bytes + key.push(UserPostfix::ObjectRemoves as u8); // ObjectRemoves postfix, 1 byte + if hash.len() == HASH_LENGTH { + key.extend_from_slice(hash.as_slice()); // hash, 20 bytes + } + key + } +} + +pub struct ObjectStore {} + +impl ObjectStore { + pub fn new( + db: Arc, + store_event_handler: Arc, + prune_size_limit: u32, + ) -> Store { + Store::new_with_store_def( + db, + store_event_handler, + Box::new(ObjectStoreDef { prune_size_limit }), + ) + } + + pub fn get_object_add( + store: &Store, + fid: u32, + hash: Vec, + ) -> Result, HubError> { + let partial_message = protos::Message { + data: Some(protos::MessageData { + fid: fid as u64, + r#type: MessageType::ObjectAdd.into(), + body: Some(protos::message_data::Body::ObjectAddBody(ObjectAddBody { + ..Default::default() + })), + ..Default::default() + }), + hash, + ..Default::default() + }; + + store.get_add(&partial_message) + } + + pub fn js_get_object_add(mut cx: FunctionContext) -> JsResult { + let channel = cx.channel(); + + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let hash_buffer = cx.argument::(1)?; + let hash_bytes = hash_buffer.as_slice(&cx); + + let result = match Self::get_object_add(&store, fid, hash_bytes.to_vec()) { + Ok(Some(message)) => message.encode_to_vec(), + Ok(None) => cx.throw_error(format!( + "{}/{} for {}", + "not_found", "objectAddMessage not found", fid + ))?, + Err(e) => return hub_error_to_js_throw(&mut cx, e), + }; + + let (deferred, promise) = cx.promise(); + deferred.settle_with(&channel, move |mut cx| { + let mut js_buffer = cx.buffer(result.len())?; + js_buffer.as_mut_slice(&mut cx).copy_from_slice(&result); + Ok(js_buffer) + }); + + Ok(promise) + } + + pub fn get_object_remove( + store: &Store, + fid: u32, + hash: Vec, + ) -> Result, HubError> { + let partial_message = protos::Message { + data: Some(protos::MessageData { + fid: fid as u64, + r#type: MessageType::ObjectRemove.into(), + body: Some(protos::message_data::Body::ObjectRemoveBody(ObjectRemoveBody { + target_hash: hash.clone(), + })), + ..Default::default() + }), + ..Default::default() + }; + + let r = store.get_remove(&partial_message); + // println!("got tag remove: {:?}", r); + + r + } + + pub fn js_get_object_remove(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let hash_buffer = cx.argument::(1)?; + let hash_bytes = hash_buffer.as_slice(&cx).to_vec(); + + let result = match ObjectStore::get_object_remove(&store, fid, hash_bytes) { + Ok(Some(message)) => message.encode_to_vec(), + Ok(None) => cx.throw_error(format!( + "{}/{} for {}", + "not_found", "objectRemoveMessage not found", fid + ))?, + Err(e) => return hub_error_to_js_throw(&mut cx, e), + }; + + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + deferred.settle_with(&channel, move |mut cx| { + let mut js_buffer = cx.buffer(result.len())?; + js_buffer.as_mut_slice(&mut cx).copy_from_slice(&result); + Ok(js_buffer) + }); + + Ok(promise) + } + + pub fn get_object_adds_by_fid( + store: &Store, + fid: u32, + r#type: String, + page_options: &PageOptions, + ) -> Result { + store.get_adds_by_fid( + fid, + page_options, + Some(|message: &Message| { + if let Some(object_body) = &message.data.as_ref().unwrap().body { + if let protos::message_data::Body::ObjectAddBody(object_body) = object_body { + if r#type == "" || object_body.r#type == r#type { + return true; + } + } + } + + false + }), + ) + } + + pub fn create_object_store(mut cx: FunctionContext) -> JsResult>> { + let db_js_box = cx.argument::>>(0)?; + let db = (**db_js_box.borrow()).clone(); + + // Read the StoreEventHandlerfg + let store_event_handler_js_box = cx.argument::>>(1)?; + let store_event_handler = (**store_event_handler_js_box.borrow()).clone(); + + // Read the prune size limit and prune time limit from the options + let prune_size_limit = cx + .argument::(2) + .map(|n| n.value(&mut cx) as u32)?; + + Ok(cx.boxed(Arc::new(ObjectStore::new( + db, + store_event_handler, + prune_size_limit, + )))) + } + + pub fn js_get_object_adds_by_fid(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 2)?; + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = + ObjectStore::get_object_adds_by_fid(&store, fid, r#type, &page_options); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } + + pub fn get_object_removes_by_fid( + store: &Store, + fid: u32, + r#type: String, + page_options: &PageOptions, + ) -> Result { + store.get_removes_by_fid( + fid, + page_options, + Some(|message: &Message| { + if let Some(object_body) = &message.data.as_ref().unwrap().body { + if let protos::message_data::Body::ObjectAddBody(object_body) = object_body { + if object_body.r#type == r#type { + return true; + } + } + } + + false + }), + ) + } + + pub fn js_get_object_removes_by_fid(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 2)?; + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = ObjectStore::get_object_removes_by_fid( + &store, + fid, + r#type, + &page_options, + ); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } + + // pub fn get_objects_by_target( + // store: &Store, + // target: &Target, + // r#type: String, + // page_options: &PageOptions, + // ) -> Result { + // let prefix = TagStoreDef::make_tags_by_target_key(target, 0, None); + + // let mut message_keys = vec![]; + // let mut last_key = vec![]; + + // store + // .db() + // .for_each_iterator_by_prefix(&prefix, page_options, |key, value| { + // if r#type.is_empty() || value.eq(r#type.as_bytes()) + // { + // let ts_hash_offset = prefix.len(); + // let fid_offset = ts_hash_offset + TS_HASH_LENGTH; + + // let fid = + // u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); + // let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] + // .try_into() + // .unwrap(); + // let message_primary_key = crate::store::message::make_message_primary_key( + // fid, + // store.postfix(), + // Some(&ts_hash), + // ); + + // message_keys.push(message_primary_key.to_vec()); + // if message_keys.len() >= page_options.page_size.unwrap_or(PAGE_SIZE_MAX) { + // last_key = key.to_vec(); + // return Ok(true); // Stop iterating + // } + // } + + // Ok(false) // Continue iterating + // })?; + + // let messages_bytes = + // message::get_many_messages_as_bytes(store.db().borrow(), message_keys)?; + // let next_page_token = if last_key.len() > 0 { + // Some(last_key[prefix.len()..].to_vec()) + // } else { + // None + // }; + + // Ok(MessagesPage { + // messages_bytes, + // next_page_token, + // }) + // } + + // pub fn js_get_objects_by_target(mut cx: FunctionContext) -> JsResult { + // let store = get_store(&mut cx)?; + + // let target_cast_id_buffer = cx.argument::(0)?; + // let target_cast_id_bytes = target_cast_id_buffer.as_slice(&cx); + // let target_cast_id = if target_cast_id_bytes.len() > 0 { + // match protos::CastId::decode(target_cast_id_bytes) { + // Ok(cast_id) => Some(cast_id), + // Err(e) => return cx.throw_error(e.to_string()), + // } + // } else { + // None + // }; + + // let target_url = cx.argument::(1).map(|s| s.value(&mut cx))?; + + // // We need at least one of target_cast_id or target_url + // if target_cast_id.is_none() && target_url.is_empty() { + // return cx.throw_error("target_cast_id or target_url is required"); + // } + + // let target = if target_cast_id.is_some() { + // Target::TargetCastId(target_cast_id.unwrap()) + // } else { + // Target::TargetUrl(target_url) + // }; + + // // let r#type = match cx.argument_opt(2) { + // // Some(arg) => match arg.downcast::(&mut cx) { + // // Ok(js_string) => js_string.value(&mut cx), + // // Err(_) => "".to_string(), // Handle the case where the argument is not a JsString + // // }, + // // None => "".to_string(), // Default to an empty string if the argument is not provided + // // }; + // let r#type = cx.argument::(2).map(|s| s.value(&mut cx))?; + + // let page_options = get_page_options(&mut cx, 3)?; + + // let channel = cx.channel(); + // let (deferred, promise) = cx.promise(); + + // THREAD_POOL.lock().unwrap().execute(move || { + // let messages = TagStore::get_tags_by_target( + // &store, + // &target, + // r#type, + // &page_options, + // ); + + // deferred_settle_messages(deferred, &channel, messages); + // }); + + // Ok(promise) + // } +} diff --git a/apps/hubble/src/addon/src/store/reaction_store.rs b/apps/hubble/src/addon/src/store/reaction_store.rs index 31318808cd..1801e02f13 100644 --- a/apps/hubble/src/addon/src/store/reaction_store.rs +++ b/apps/hubble/src/addon/src/store/reaction_store.rs @@ -560,7 +560,7 @@ impl ReactionStore { let fid = u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); - let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] + let ts_hash: [u8; 24] = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] .try_into() .unwrap(); let message_primary_key = crate::store::message::make_message_primary_key( diff --git a/apps/hubble/src/addon/src/store/relationship_store.rs b/apps/hubble/src/addon/src/store/relationship_store.rs new file mode 100644 index 0000000000..453ffb98bd --- /dev/null +++ b/apps/hubble/src/addon/src/store/relationship_store.rs @@ -0,0 +1,594 @@ +use super::{ + deferred_settle_messages, hub_error_to_js_throw, make_fid_key, make_user_key, message, + store::{Store, StoreDef}, + utils::{get_page_options, get_store, make_ref_key}, + HubError, IntoU8, MessagesPage, PageOptions, RootPrefix, StoreEventHandler, UserPostfix, + PAGE_SIZE_MAX, HASH_LENGTH, TS_HASH_LENGTH, +}; +use crate::{ + db::{RocksDB, RocksDbTransactionBatch}, + protos::{self, ObjectRef, RefDirection, Message, MessageType, RelationshipAddBody, RelationshipRemoveBody}, +}; +use crate::{protos::message_data, THREAD_POOL}; +use neon::{ + context::{Context, FunctionContext}, + result::JsResult, + types::{buffer::TypedArray, JsBox, JsBuffer, JsNumber, JsPromise, JsString}, +}; +use prost::Message as _; +use std::{borrow::Borrow, sync::Arc}; + +pub struct RelationshipStoreDef { + prune_size_limit: u32, +} + +impl StoreDef for RelationshipStoreDef { + fn postfix(&self) -> u8 { + UserPostfix::RelationshipMessage.as_u8() + } + + fn add_message_type(&self) -> u8 { + MessageType::RelationshipAdd.into_u8() + } + + fn remove_message_type(&self) -> u8 { + MessageType::RelationshipRemove as u8 + } + + fn is_add_type(&self, message: &protos::Message) -> bool { + message.signature_scheme == protos::SignatureScheme::Ed25519 as i32 + && message.data.is_some() + && message.data.as_ref().unwrap().r#type == MessageType::RelationshipAdd as i32 + && message.data.as_ref().unwrap().body.is_some() + } + + fn is_remove_type(&self, message: &protos::Message) -> bool { + message.signature_scheme == protos::SignatureScheme::Ed25519 as i32 + && message.data.is_some() + && message.data.as_ref().unwrap().r#type == MessageType::RelationshipRemove as i32 + && message.data.as_ref().unwrap().body.is_some() + } + + fn compact_state_message_type(&self) -> u8 { + MessageType::None as u8 + } + + fn is_compact_state_type(&self, _message: &Message) -> bool { + false + } + + fn build_secondary_indices( + &self, + txn: &mut RocksDbTransactionBatch, + ts_hash: &[u8; TS_HASH_LENGTH], + message: &Message, + ) -> Result<(), HubError> { + let (by_source_key, by_target_key, name) = self.secondary_index_key(ts_hash, message)?; + + // this is saving the relationship type against both the source and target object ref index + txn.put(by_source_key, name.clone()); + txn.put(by_target_key, name.clone()); + + Ok(()) + } + + fn delete_secondary_indices( + &self, + txn: &mut RocksDbTransactionBatch, + ts_hash: &[u8; TS_HASH_LENGTH], + message: &Message, + ) -> Result<(), HubError> { + let (by_source_key, by_target_key, _) = self.secondary_index_key(ts_hash, message)?; + + txn.delete(by_source_key); + txn.delete(by_target_key); + + Ok(()) + } + + fn delete_remove_secondary_indices( + &self, + _txn: &mut RocksDbTransactionBatch, + _message: &Message, + ) -> Result<(), HubError> { + Ok(()) + } + + fn find_merge_add_conflicts( + &self, + _db: &RocksDB, + _message: &protos::Message, + ) -> Result<(), HubError> { + // For tags, there will be no conflicts + Ok(()) + } + + fn find_merge_remove_conflicts( + &self, + _db: &RocksDB, + _message: &Message, + ) -> Result<(), HubError> { + // For tags, there will be no conflicts + Ok(()) + } + + fn make_add_key(&self, message: &protos::Message) -> Result, HubError> { + let hash = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::RelationshipAddBody(_) => message.hash.as_ref(), + // Some(message_data::Body::CastAddBody(_)) => message.hash.as_ref(), + message_data::Body::RelationshipRemoveBody(relationship_remove_body) => { + // Some(message_data::Body::CastRemoveBody(cast_remove_body)) => { + relationship_remove_body.target_hash.as_ref() + } + _ => { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid relationship body".to_string(), + }) + } + }; + + Ok(Self::make_relationship_adds_key( + message.data.as_ref().unwrap().fid as u32, + hash, + )) + } + + fn make_remove_key(&self, message: &protos::Message) -> Result, HubError> { + let hash = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::RelationshipAddBody(_) => message.hash.as_ref(), + message_data::Body::RelationshipRemoveBody(relationship_remove_body) => { + relationship_remove_body.target_hash.as_ref() + } + _ => { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid relationship body for remove key".to_string(), + }) + } + }; + + Ok(Self::make_relationship_removes_key( + message.data.as_ref().unwrap().fid as u32, + hash, + )) + } + + fn make_compact_state_add_key(&self, _message: &Message) -> Result, HubError> { + Err(HubError { + code: "bad_request.invalid_param".to_string(), + message: "Relationship Store doesn't support compact state".to_string(), + }) + } + + fn get_prune_size_limit(&self) -> u32 { + self.prune_size_limit + } +} + +impl RelationshipStoreDef { + fn secondary_index_key( + &self, + ts_hash: &[u8; TS_HASH_LENGTH], + message: &protos::Message, + ) -> Result<(Vec, Vec, Vec), HubError> { + let relationship_body = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::RelationshipAddBody(relationship_body) => relationship_body, + _ => Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid relationship body".to_string(), + })?, + }; + + let source = relationship_body.source.as_ref().ok_or(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid source body".to_string(), + })?; + + let target = relationship_body.target.as_ref().ok_or(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid target body".to_string(), + })?; + + let by_source_key = RelationshipStoreDef::make_relationship_by_source_key( + source, + message.data.as_ref().unwrap().fid as u32, + Some(ts_hash), + ); + + let by_target_key = RelationshipStoreDef::make_relationship_by_target_key( + target, + message.data.as_ref().unwrap().fid as u32, + Some(ts_hash), + ); + + Ok((by_source_key, by_target_key, relationship_body.r#type.as_bytes().to_vec())) + } + + pub fn make_relationship_by_source_key( + source: &ObjectRef, + fid: u32, + ts_hash: Option<&[u8; TS_HASH_LENGTH]>, + ) -> Vec { + let key = Self::make_relationship_by_related_object_key(RootPrefix::RelationshipsBySource, source, fid, ts_hash); + key + } + + pub fn make_relationship_by_target_key( + target: &ObjectRef, + fid: u32, + ts_hash: Option<&[u8; TS_HASH_LENGTH]>, + ) -> Vec { + let key = Self::make_relationship_by_related_object_key(RootPrefix::RelationshipsByTarget, target, fid, ts_hash); + key + } + + fn make_relationship_by_related_object_key( + root_prefix: RootPrefix, + object_ref: &ObjectRef, + fid: u32, + ts_hash: Option<&[u8; TS_HASH_LENGTH]>, + ) -> Vec { + let mut key = Vec::with_capacity(1 + 22 + 24 + 4); + + key.push(root_prefix as u8); // 1 byte + key.extend_from_slice(&make_ref_key(object_ref)); // at most 22 bytes if object ref isn't an fid + if ts_hash.is_some() && ts_hash.unwrap().len() == TS_HASH_LENGTH { + key.extend_from_slice(ts_hash.unwrap()); + } + if fid > 0 { + key.extend_from_slice(&make_fid_key(fid)); + } + + key + } + + pub fn make_relationship_adds_key( + fid: u32, + hash: &Vec, + ) -> Vec { + let mut key = Vec::with_capacity(5 + 1 + 20); + + key.extend_from_slice(&make_user_key(fid)); // make_user_key, 5 bytes + key.push(UserPostfix::RelationshipAdds as u8); // RelationshipAdds postfix, 1 byte + if hash.len() == HASH_LENGTH { + key.extend_from_slice(hash.as_slice()); // hash, 20 bytes + } + key + } + + pub fn make_relationship_removes_key( + fid: u32, + hash: &Vec, + ) -> Vec { + let mut key = Vec::with_capacity(5 + 1 + 20); + + key.extend_from_slice(&make_user_key(fid)); // make_user_key, 5 bytes + key.push(UserPostfix::RelationshipRemoves as u8); // RelationshipRemoves postfix, 1 byte + if hash.len() == HASH_LENGTH { + key.extend_from_slice(hash.as_slice()); // hash, 20 bytes + } + key + } +} + +pub struct RelationshipStore {} + +impl RelationshipStore { + pub fn new( + db: Arc, + store_event_handler: Arc, + prune_size_limit: u32, + ) -> Store { + Store::new_with_store_def( + db, + store_event_handler, + Box::new(RelationshipStoreDef { prune_size_limit }), + ) + } + + pub fn get_relationship_add( + store: &Store, + fid: u32, + hash: Vec, + ) -> Result, HubError> { + let partial_message = protos::Message { + data: Some(protos::MessageData { + fid: fid as u64, + r#type: MessageType::RelationshipAdd.into(), + body: Some(protos::message_data::Body::RelationshipAddBody(RelationshipAddBody { + ..Default::default() + })), + ..Default::default() + }), + hash, + ..Default::default() + }; + + store.get_add(&partial_message) + } + + pub fn js_get_relationship_add(mut cx: FunctionContext) -> JsResult { + let channel = cx.channel(); + + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let hash_buffer = cx.argument::(1)?; + let hash_bytes = hash_buffer.as_slice(&cx); + + let result = match Self::get_relationship_add(&store, fid, hash_bytes.to_vec()) { + Ok(Some(message)) => message.encode_to_vec(), + Ok(None) => cx.throw_error(format!( + "{}/{} for {}", + "not_found", "RelationshipAddMessage not found", fid + ))?, + Err(e) => return hub_error_to_js_throw(&mut cx, e), + }; + + let (deferred, promise) = cx.promise(); + deferred.settle_with(&channel, move |mut cx| { + let mut js_buffer = cx.buffer(result.len())?; + js_buffer.as_mut_slice(&mut cx).copy_from_slice(&result); + Ok(js_buffer) + }); + + Ok(promise) + } + + pub fn get_relationship_remove( + store: &Store, + fid: u32, + hash: Vec, + ) -> Result, HubError> { + let partial_message = protos::Message { + data: Some(protos::MessageData { + fid: fid as u64, + r#type: MessageType::RelationshipRemove.into(), + body: Some(protos::message_data::Body::RelationshipRemoveBody(RelationshipRemoveBody { + target_hash: hash.clone(), + })), + ..Default::default() + }), + ..Default::default() + }; + + let r = store.get_remove(&partial_message); + // println!("got tag remove: {:?}", r); + + r + } + + pub fn js_get_relationship_remove(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let hash_buffer = cx.argument::(1)?; + let hash_bytes = hash_buffer.as_slice(&cx).to_vec(); + + let result = match RelationshipStore::get_relationship_remove(&store, fid, hash_bytes) { + Ok(Some(message)) => message.encode_to_vec(), + Ok(None) => cx.throw_error(format!( + "{}/{} for {}", + "not_found", "RelationshipRemoveMessage not found", fid + ))?, + Err(e) => return hub_error_to_js_throw(&mut cx, e), + }; + + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + deferred.settle_with(&channel, move |mut cx| { + let mut js_buffer = cx.buffer(result.len())?; + js_buffer.as_mut_slice(&mut cx).copy_from_slice(&result); + Ok(js_buffer) + }); + + Ok(promise) + } + + pub fn get_relationship_adds_by_fid( + store: &Store, + fid: u32, + r#type: String, + page_options: &PageOptions, + ) -> Result { + store.get_adds_by_fid( + fid, + page_options, + Some(|message: &Message| { + if let Some(relationship_body) = &message.data.as_ref().unwrap().body { + if let protos::message_data::Body::RelationshipAddBody(relationship_body) = relationship_body { + if r#type == "" || relationship_body.r#type == r#type { + return true; + } + } + } + + false + }), + ) + } + + pub fn create_relationship_store(mut cx: FunctionContext) -> JsResult>> { + let db_js_box = cx.argument::>>(0)?; + let db = (**db_js_box.borrow()).clone(); + + // Read the StoreEventHandlerfg + let store_event_handler_js_box = cx.argument::>>(1)?; + let store_event_handler = (**store_event_handler_js_box.borrow()).clone(); + + // Read the prune size limit and prune time limit from the options + let prune_size_limit = cx + .argument::(2) + .map(|n| n.value(&mut cx) as u32)?; + + Ok(cx.boxed(Arc::new(RelationshipStore::new( + db, + store_event_handler, + prune_size_limit, + )))) + } + + pub fn js_get_relationship_adds_by_fid(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 2)?; + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = + RelationshipStore::get_relationship_adds_by_fid(&store, fid, r#type, &page_options); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } + + pub fn get_relationship_removes_by_fid( + store: &Store, + fid: u32, + r#type: String, + page_options: &PageOptions, + ) -> Result { + store.get_removes_by_fid( + fid, + page_options, + Some(|message: &Message| { + if let Some(relationship_body) = &message.data.as_ref().unwrap().body { + if let protos::message_data::Body::RelationshipAddBody(relationship_body) = relationship_body { + if relationship_body.r#type == r#type { + return true; + } + } + } + + false + }), + ) + } + + pub fn js_get_relationship_removes_by_fid(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 2)?; + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = RelationshipStore::get_relationship_removes_by_fid( + &store, + fid, + r#type, + &page_options, + ); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } + + pub fn get_relationships_by_related_object_ref( + store: &Store, + related_object_ref: &ObjectRef, + ref_direction: i32, + name: String, + page_options: &PageOptions, + ) -> Result { + let prefix = if ref_direction == RefDirection::Source as i32 { + RelationshipStoreDef::make_relationship_by_source_key(related_object_ref, 0, None) + } else { + RelationshipStoreDef::make_relationship_by_target_key(related_object_ref, 0, None) + }; + + let mut message_keys = vec![]; + let mut last_key = vec![]; + + store + .db() + .for_each_iterator_by_prefix(&prefix, page_options, |key, value| { + if name.is_empty() || value.eq(name.as_bytes()) + { + let ts_hash_offset = prefix.len(); + let fid_offset = ts_hash_offset + TS_HASH_LENGTH; + + let fid = + u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); + let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] + .try_into() + .unwrap(); + let message_primary_key = crate::store::message::make_message_primary_key( + fid, + store.postfix(), + Some(&ts_hash), + ); + + message_keys.push(message_primary_key.to_vec()); + if message_keys.len() >= page_options.page_size.unwrap_or(PAGE_SIZE_MAX) { + last_key = key.to_vec(); + return Ok(true); // Stop iterating + } + } + + Ok(false) // Continue iterating + })?; + + let messages_bytes = + message::get_many_messages_as_bytes(store.db().borrow(), message_keys)?; + let next_page_token = if last_key.len() > 0 { + Some(last_key[prefix.len()..].to_vec()) + } else { + None + }; + + Ok(MessagesPage { + messages_bytes, + next_page_token, + }) + } + + pub fn js_get_relationships_by_related_object_ref(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let related_object_ref_buffer = cx.argument::(0)?; + let related_object_ref_bytes = related_object_ref_buffer.as_slice(&cx); + let related_object_ref = if related_object_ref_bytes.len() > 0 { + match protos::ObjectRef::decode(related_object_ref_bytes) { + Ok(object_ref) => Some(object_ref), + Err(e) => return cx.throw_error(e.to_string()), + } + } else { + return cx.throw_error("source_object_ref is required"); + }; + let ref_direction = cx.argument::(1).map(|s| s.value(&mut cx) as i32)?; + + let name = cx.argument::(2).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 3)?; + + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = RelationshipStore::get_relationships_by_related_object_ref( + &store, + &related_object_ref.unwrap(), + ref_direction, + name, + &page_options, + ); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } +} diff --git a/apps/hubble/src/addon/src/store/store_tests.rs b/apps/hubble/src/addon/src/store/store_tests.rs new file mode 100644 index 0000000000..f16ea8466a --- /dev/null +++ b/apps/hubble/src/addon/src/store/store_tests.rs @@ -0,0 +1,151 @@ + + +#[cfg(test)] +mod tests { + use crate::{ + // db::{RocksDB, RocksDbTransactionBatch}, + db::RocksDB, + store::{PageOptions, RootPrefix, StoreEventHandler, TagStore, TS_HASH_LENGTH}, + }; + use std::sync::Arc; + + fn pretty_print_hash(vector: Vec) { + let string_vector: Vec = vector.iter().map(|&b| format!("{:02x?}", b)).collect(); + let joined = string_vector.join(""); + print!("{}", joined); + } + + #[test] + fn test_test() { + // // Create a new DB with a random temporary path + // let tmp_path = tempfile::tempdir() + // .unwrap() + // .path() + // .as_os_str() + // .to_string_lossy() + // .to_string(); + // let db = Arc::new(RocksDB::new(&tmp_path).unwrap()); + // db.open().unwrap(); + + // /Users/vic/src/hub-monorepo/apps/hubble/.rocks + + // make sure to shut down the hubble locking this file before! + let db = Arc::new(RocksDB::new("/Users/vic/src/hub-monorepo/apps/hubble/.rocks/rocks.hub._default").unwrap()); + db.open().unwrap(); + + // let mut txn = RocksDbTransactionBatch::new(); + // println!("RootPrefix {}", RootPrefix::TagsByTarget as u8); + // let mut key = Vec::with_capacity(1 + 28 + 24 + 4); + // key.push(RootPrefix::TagsByTarget as u8); // TagsByTarget + // println!("{:x?}", key); + + let store_event_handler = StoreEventHandler::new(Option::None, Option::None, Option::None); + let _tag_store = TagStore::new(Arc::clone(&db), store_event_handler, 5000); + + // link key + // :::: + + // try to list all user keys + // let hit = db.count_keys_at_prefix(&[RootPrefix::User as u8]); + // match hit { + // Ok(value) => { + // println!("{}", value); + // } + // Err(err) => { + // println!("{}", err); + // } + // } + + let r2 = db.for_each_iterator_by_prefix(&[RootPrefix::User as u8], &PageOptions::default(), |key, value| { + // println!("{:x?}, {:x?}", key, value); + + let ts_hash_offset = 1; + let fid_offset: usize = ts_hash_offset; // + TS_HASH_LENGTH; + + let fid = u32::from_be_bytes(key[1..1 + 4].try_into().unwrap()); + + let post_fix: u8 = u8::from_be_bytes(key[1 + 4..1 + 5].try_into().unwrap()); + + let ts_hash: [u8; 4] = (key[1 + 5..1 + 5 + 4].try_into().unwrap()); + + if post_fix == 101 { + println!("Tag by fid: {}", fid); + println!("key: {:x?}, {}", key, key.len()); + println!("value: {:x?}", value); + println!("last: {:x?}", pretty_print_hash(ts_hash.to_vec())); + } + + // let ts_hash: [u8; 24] = (key[fid_offset + 5..fid_offset + 5 + TS_HASH_LENGTH].try_into().unwrap()); + // println!("prefix: {:?}, fid: {}, post_fix: {:?}", &[RootPrefix::User as u8], fid, post_fix); //, pretty_print_hash(ts_hash.to_vec())); + + // let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] + // .try_into() + // .unwrap(); + + // let message_primary_key = crate::store::message::make_message_primary_key( + // fid, + // store.postfix(), + // Some(&ts_hash), + // ); + + // if r#type.is_empty() || value.eq(r#type.as_bytes()) { + // let ts_hash_offset = prefix.len(); + // let fid_offset: usize = ts_hash_offset + TS_HASH_LENGTH; + + // let fid = + // u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); + // let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] + // .try_into() + // .unwrap(); + // let message_primary_key = crate::store::message::make_message_primary_key( + // fid, + // store.postfix(), + // Some(&ts_hash), + // ); + + // message_keys.push(message_primary_key.to_vec()); + // if message_keys.len() >= page_options.page_size.unwrap_or(PAGE_SIZE_MAX) { + // last_key = key.to_vec(); + // return Ok(true); // Stop iterating + // } + // } + + Ok(false) + }); + match r2 { + Err(err) => { + println!("{}", err); + } + Ok(_e) => { + + } + } + + // // Add some keys + // db.put(b"key100", b"value1").unwrap(); + // db.put(b"key101", b"value3").unwrap(); + // db.put(b"key104", b"value4").unwrap(); + // db.put(b"key200", b"value2").unwrap(); + + // // Check if keys exist + // let exists = db.keys_exist(&vec![b"key100".to_vec(), b"key101".to_vec()]); + // assert_eq!(exists.unwrap(), vec![true, true]); + + let r = TagStore::get_tag_add(&_tag_store, 628598, String::new(), Option::None); + match r { + Ok(value) => { + // if (Some(value)) { + println!("Value: {:?}", value); + // } + } + Err(err) => { + println!("Error: {}", err.message); + } + } + + // println!("tag_body::Target {}", tag_store.make_tag_adds_key(123, "test")); + + // Cleanup + // db.destroy().unwrap(); + } +} \ No newline at end of file diff --git a/apps/hubble/src/addon/src/store/tag_store.rs b/apps/hubble/src/addon/src/store/tag_store.rs new file mode 100644 index 0000000000..cecc50d796 --- /dev/null +++ b/apps/hubble/src/addon/src/store/tag_store.rs @@ -0,0 +1,633 @@ +use super::{ + deferred_settle_messages, make_fid_key, make_user_key, + message, + store::{Store, StoreDef}, + utils::{get_page_options, get_store, make_ref_key}, + HubError, IntoU8, MessagesPage, PageOptions, RootPrefix, StoreEventHandler, UserPostfix, + PAGE_SIZE_MAX, TS_HASH_LENGTH, +}; +use crate::{ + db::{RocksDB, RocksDbTransactionBatch}, + protos::{self, ObjectRef, Message, MessageType}, +}; +use crate::{protos::message_data, THREAD_POOL}; +use neon::{ + context::{Context, FunctionContext}, + result::JsResult, + types::{buffer::TypedArray, JsBox, JsBuffer, JsNumber, JsPromise, JsString}, +}; +use prost::Message as _; +use std::{borrow::Borrow, convert::TryInto, sync::Arc}; + +pub struct TagStoreDef { + prune_size_limit: u32, +} + +impl StoreDef for TagStoreDef { + fn postfix(&self) -> u8 { + UserPostfix::TagMessage.as_u8() + } + + fn add_message_type(&self) -> u8 { + MessageType::TagAdd.into_u8() + } + + fn remove_message_type(&self) -> u8 { + MessageType::TagRemove as u8 + } + + fn is_add_type(&self, message: &protos::Message) -> bool { + message.signature_scheme == protos::SignatureScheme::Ed25519 as i32 + && message.data.is_some() + && message.data.as_ref().unwrap().r#type == MessageType::TagAdd as i32 + && message.data.as_ref().unwrap().body.is_some() + } + + fn is_remove_type(&self, message: &protos::Message) -> bool { + message.signature_scheme == protos::SignatureScheme::Ed25519 as i32 + && message.data.is_some() + && message.data.as_ref().unwrap().r#type == MessageType::TagRemove as i32 + && message.data.as_ref().unwrap().body.is_some() + } + + fn compact_state_message_type(&self) -> u8 { + MessageType::None as u8 + } + + fn is_compact_state_type(&self, _message: &Message) -> bool { + false + } + + fn build_secondary_indices( + &self, + txn: &mut RocksDbTransactionBatch, + ts_hash: &[u8; TS_HASH_LENGTH], + message: &Message, + ) -> Result<(), HubError> { + // + let (by_target_key, rtype) = self.secondary_index_key(ts_hash, message)?; + + // this is saving the key,value pair + txn.put(by_target_key, rtype); + + Ok(()) + } + + fn delete_secondary_indices( + &self, + txn: &mut RocksDbTransactionBatch, + ts_hash: &[u8; TS_HASH_LENGTH], + message: &Message, + ) -> Result<(), HubError> { + let (by_target_key, _) = self.secondary_index_key(ts_hash, message)?; + + txn.delete(by_target_key); + + Ok(()) + } + + fn delete_remove_secondary_indices( + &self, + _txn: &mut RocksDbTransactionBatch, + _message: &Message, + ) -> Result<(), HubError> { + Ok(()) + } + + fn find_merge_add_conflicts( + &self, + _db: &RocksDB, + _message: &protos::Message, + ) -> Result<(), HubError> { + // For tags, there will be no conflicts + Ok(()) + } + + fn find_merge_remove_conflicts( + &self, + _db: &RocksDB, + _message: &Message, + ) -> Result<(), HubError> { + // For tags, there will be no conflicts + Ok(()) + } + + fn make_add_key(&self, message: &protos::Message) -> Result, HubError> { + let tag_body = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::TagBody(tag_body) => tag_body, + _ => { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid tag body".to_string(), + }) + } + }; + + Self::make_tag_adds_key( + message.data.as_ref().unwrap().fid as u32, + tag_body.name.clone(), + tag_body.target.as_ref(), + ) + } + + fn make_remove_key(&self, message: &protos::Message) -> Result, HubError> { + let tag_body = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::TagBody(tag_body) => tag_body, + _ => { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid tag body".to_string(), + }) + } + }; + + Self::make_tag_removes_key( + message.data.as_ref().unwrap().fid as u32, + tag_body.name.clone(), + tag_body.target.as_ref(), + ) + } + + fn make_compact_state_add_key(&self, _message: &Message) -> Result, HubError> { + Err(HubError { + code: "bad_request.invalid_param".to_string(), + message: "Tag Store doesn't support compact state".to_string(), + }) + } + + fn get_prune_size_limit(&self) -> u32 { + self.prune_size_limit + } +} + +impl TagStoreDef { + fn secondary_index_key( + &self, + ts_hash: &[u8; TS_HASH_LENGTH], + message: &protos::Message, + ) -> Result<(Vec, Vec), HubError> { + // Make sure at least one of targetCastId or targetUrl is set + let tag_body = match message.data.as_ref().unwrap().body.as_ref().unwrap() { + message_data::Body::TagBody(tag_body) => tag_body, + _ => Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid tag body".to_string(), + })?, + }; + let target = tag_body.target.as_ref().ok_or(HubError { + code: "bad_request.validation_failure".to_string(), + message: "Invalid tag body".to_string(), + })?; + + let by_target_key = TagStoreDef::make_tags_by_target_key( + target, + message.data.as_ref().unwrap().fid as u32, + Some(ts_hash), + ); + + Ok((by_target_key, tag_body.name.as_bytes().to_vec())) + } + + pub fn make_tags_by_target_key( + target: &ObjectRef, + fid: u32, + ts_hash: Option<&[u8; TS_HASH_LENGTH]>, + ) -> Vec { + let mut key = Vec::with_capacity(1 + 31 + 24 + 4); + + key.push(RootPrefix::TagsByTarget as u8); // TagsByTarget prefix, 1 byte + key.extend_from_slice(&make_ref_key(target)); + if fid > 0 { + key.extend_from_slice(&make_fid_key(fid)); + } + if ts_hash.is_some() && ts_hash.unwrap().len() == TS_HASH_LENGTH { + key.extend_from_slice(ts_hash.unwrap()); + } + key + } + + pub fn make_tag_adds_key( + fid: u32, + name: String, + target: Option<&ObjectRef>, + ) -> Result, HubError> { + if !target.is_some() || name.is_empty() { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "tag provided without name or target".to_string(), + }); + } + let mut key = Vec::with_capacity(33 + 1 + 1 + 31); + + key.extend_from_slice(&make_user_key(fid)); + key.push(UserPostfix::TagAdds as u8); // tagAdds postfix, 1 byte + + key.extend_from_slice(&name.as_bytes().to_vec()); + + // target, 31 bytes + key.extend_from_slice(&make_ref_key(target.unwrap())); + + Ok(key) + } + + pub fn make_tag_removes_key( + fid: u32, + name: String, + target: Option<&ObjectRef>, + ) -> Result, HubError> { + if !target.is_some() || name.is_empty() { + return Err(HubError { + code: "bad_request.validation_failure".to_string(), + message: "tag provided without name or target".to_string(), + }); + } + let mut key = Vec::with_capacity(33 + 1 + 1 + 31); + + key.extend_from_slice(&make_user_key(fid)); + key.push(UserPostfix::TagRemoves as u8); // TagRemoves postfix, 1 byte + + key.extend_from_slice(&name.as_bytes().to_vec()); + + // target, 31 bytes + key.extend_from_slice(&make_ref_key(target.unwrap())); + + Ok(key) + } +} + +pub struct TagStore {} + +impl TagStore { + pub fn new( + db: Arc, + store_event_handler: Arc, + prune_size_limit: u32, + ) -> Store { + Store::new_with_store_def( + db, + store_event_handler, + Box::new(TagStoreDef { prune_size_limit }), + ) + } + + // pub fn get_tag_add( + // store: &Store, + // fid: u32, + // name: String, + // target: Option, + // ) -> Result, HubError> { + // let partial_message = protos::Message { + // data: Some(protos::MessageData { + // fid: fid as u64, + // r#type: MessageType::TagAdd.into(), + // body: Some(protos::message_data::Body::TagBody(TagBody { + // name: name.clone(), + // content: None, + // target: target.clone(), + // })), + // ..Default::default() + // }), + // ..Default::default() + // }; + + // store.get_add(&partial_message) + // } + + // pub fn js_get_tag_add(mut cx: FunctionContext) -> JsResult { + // let channel = cx.channel(); + + // let store = get_store(&mut cx)?; + + // let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + // let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + // let target_cast_id_buffer = cx.argument::(2)?; + // let target_cast_id_bytes = target_cast_id_buffer.as_slice(&cx); + // let target_cast_id = if target_cast_id_bytes.len() > 0 { + // match protos::CastId::decode(target_cast_id_bytes) { + // Ok(cast_id) => Some(cast_id), + // Err(e) => return cx.throw_error(e.to_string()), + // } + // } else { + // None + // }; + + // let target_url = cx.argument::(3).map(|s| s.value(&mut cx))?; + + // // We need at least one of target_cast_id or target_url + // if target_cast_id.is_none() && target_url.is_empty() { + // return cx.throw_error("target_cast_id or target_url is required"); + // } + + // let target = if target_cast_id.is_some() { + // Some(Target::TargetCastId(target_cast_id.unwrap())) + // } else { + // Some(Target::TargetUrl(target_url)) + // }; + + // let result = match Self::get_tag_add(&store, fid, r#type, target) { + // Ok(Some(message)) => message.encode_to_vec(), + // Ok(None) => cx.throw_error(format!( + // "{}/{} for {}", + // "not_found", "tagAddMessage not found", fid + // ))?, + // Err(e) => return hub_error_to_js_throw(&mut cx, e), + // }; + + // let (deferred, promise) = cx.promise(); + // deferred.settle_with(&channel, move |mut cx| { + // let mut js_buffer = cx.buffer(result.len())?; + // js_buffer.as_mut_slice(&mut cx).copy_from_slice(&result); + // Ok(js_buffer) + // }); + + // Ok(promise) + // } + + // pub fn get_tag_remove( + // store: &Store, + // fid: u32, + // name: String, + // target: Option, + // ) -> Result, HubError> { + // let partial_message = protos::Message { + // data: Some(protos::MessageData { + // fid: fid as u64, + // r#type: MessageType::TagRemove.into(), + // body: Some(protos::message_data::Body::TagBody(TagBody { + // name: name.clone(), + // content: None, + // target: target.clone(), + // })), + // ..Default::default() + // }), + // ..Default::default() + // }; + + // let r = store.get_remove(&partial_message); + // // println!("got tag remove: {:?}", r); + + // r + // } + + // pub fn js_get_tag_remove(mut cx: FunctionContext) -> JsResult { + // let store = get_store(&mut cx)?; + + // let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + // let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + // let target_cast_id_buffer = cx.argument::(2)?; + // let target_cast_id_bytes = target_cast_id_buffer.as_slice(&cx); + // let target_cast_id = if target_cast_id_bytes.len() > 0 { + // match protos::CastId::decode(target_cast_id_bytes) { + // Ok(cast_id) => Some(cast_id), + // Err(e) => return cx.throw_error(e.to_string()), + // } + // } else { + // None + // }; + + // let target_url = cx.argument::(3).map(|s| s.value(&mut cx))?; + + // // We need at least one of target_cast_id or target_url + // if target_cast_id.is_none() && target_url.is_empty() { + // return cx.throw_error("target_cast_id or target_url is required"); + // } + + // let target = if target_cast_id.is_some() { + // Some(Target::TargetCastId(target_cast_id.unwrap())) + // } else { + // Some(Target::TargetUrl(target_url)) + // }; + + // let result = match TagStore::get_tag_remove(&store, fid, r#type, target) { + // Ok(Some(message)) => message.encode_to_vec(), + // Ok(None) => cx.throw_error(format!( + // "{}/{} for {}", + // "not_found", "tagRemoveMessage not found", fid + // ))?, + // Err(e) => return hub_error_to_js_throw(&mut cx, e), + // }; + + // let channel = cx.channel(); + // let (deferred, promise) = cx.promise(); + // deferred.settle_with(&channel, move |mut cx| { + // let mut js_buffer = cx.buffer(result.len())?; + // js_buffer.as_mut_slice(&mut cx).copy_from_slice(&result); + // Ok(js_buffer) + // }); + + // Ok(promise) + // } + + pub fn get_tag_adds_by_fid( + store: &Store, + fid: u32, + name: String, + page_options: &PageOptions, + ) -> Result { + store.get_adds_by_fid( + fid, + page_options, + Some(|message: &Message| { + if let Some(tag_body) = &message.data.as_ref().unwrap().body { + if let protos::message_data::Body::TagBody(tag_body) = tag_body { + if name == "" || tag_body.name == name { + return true; + } + } + } + + false + }), + ) + } + + pub fn create_tag_store(mut cx: FunctionContext) -> JsResult>> { + let db_js_box = cx.argument::>>(0)?; + let db = (**db_js_box.borrow()).clone(); + + // Read the StoreEventHandler + let store_event_handler_js_box = cx.argument::>>(1)?; + let store_event_handler = (**store_event_handler_js_box.borrow()).clone(); + + // Read the prune size limit and prune time limit from the options + let prune_size_limit = cx + .argument::(2) + .map(|n| n.value(&mut cx) as u32)?; + + Ok(cx.boxed(Arc::new(TagStore::new( + db, + store_event_handler, + prune_size_limit, + )))) + } + + pub fn js_get_tag_adds_by_fid(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 2)?; + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = + TagStore::get_tag_adds_by_fid(&store, fid, r#type, &page_options); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } + + pub fn get_tag_removes_by_fid( + store: &Store, + fid: u32, + name: String, + page_options: &PageOptions, + ) -> Result { + store.get_removes_by_fid( + fid, + page_options, + Some(|message: &Message| { + if let Some(tag_body) = &message.data.as_ref().unwrap().body { + if let protos::message_data::Body::TagBody(tag_body) = tag_body { + if tag_body.name == name { + return true; + } + } + } + + false + }), + ) + } + + pub fn js_get_tag_removes_by_fid(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; + let r#type = cx.argument::(1).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 2)?; + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = TagStore::get_tag_removes_by_fid( + &store, + fid, + r#type, + &page_options, + ); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } + + pub fn get_tags_by_target( + store: &Store, + target: &ObjectRef, + fid: u32, + name: String, + page_options: &PageOptions, + ) -> Result { + let prefix = TagStoreDef::make_tags_by_target_key(target, fid, None); + + let mut message_keys = vec![]; + let mut last_key = vec![]; + + store + .db() + .for_each_iterator_by_prefix(&prefix, page_options, |key, value| { + if name.is_empty() || value.eq(name.as_bytes()) + { + let (fid_offset, ts_hash_offset) = if fid > 0 { + (prefix.len() - 4, prefix.len()) + } else { + // Set the default values + (prefix.len(), prefix.len() + 4) + }; + + let fid = + u32::from_be_bytes(key[fid_offset..ts_hash_offset].try_into().unwrap()); + let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] + .try_into() + .unwrap(); + let message_primary_key = crate::store::message::make_message_primary_key( + fid, + store.postfix(), + Some(&ts_hash), + ); + + message_keys.push(message_primary_key.to_vec()); + if message_keys.len() >= page_options.page_size.unwrap_or(PAGE_SIZE_MAX) { + last_key = key.to_vec(); + return Ok(true); // Stop iterating + } + } + + Ok(false) // Continue iterating + })?; + + let messages_bytes = + message::get_many_messages_as_bytes(store.db().borrow(), message_keys)?; + let next_page_token = if last_key.len() > 0 { + Some(last_key[prefix.len()..].to_vec()) + } else { + None + }; + + Ok(MessagesPage { + messages_bytes, + next_page_token, + }) + } + + pub fn js_get_tags_by_target(mut cx: FunctionContext) -> JsResult { + let store = get_store(&mut cx)?; + + let target_buffer = cx.argument::(0)?; + let target_bytes = target_buffer.as_slice(&cx); + let target = if target_bytes.len() > 0 { + match protos::ObjectRef::decode(target_bytes) { + Ok(object_ref) => Some(object_ref), + Err(e) => return cx.throw_error(e.to_string()), + } + } else { + None + }; + + if target.is_none() { + return cx.throw_error("target_cast_id is required"); + } + + let fid = cx.argument::(1).unwrap().value(&mut cx) as u32; + + let name = cx.argument::(2).map(|s| s.value(&mut cx))?; + + let page_options = get_page_options(&mut cx, 3)?; + + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + THREAD_POOL.lock().unwrap().execute(move || { + let messages = TagStore::get_tags_by_target( + &store, + target.as_ref().unwrap(), + fid, + name, + &page_options, + ); + + deferred_settle_messages(deferred, &channel, messages); + }); + + Ok(promise) + } +} diff --git a/apps/hubble/src/addon/src/store/utils.rs b/apps/hubble/src/addon/src/store/utils.rs index e301b27197..d82b8e09ff 100644 --- a/apps/hubble/src/addon/src/store/utils.rs +++ b/apps/hubble/src/addon/src/store/utils.rs @@ -1,7 +1,9 @@ -use super::{HubError, MessagesPage, PageOptions, Store, FARCASTER_EPOCH}; +use super::{HubError, MessagesPage, PageOptions, Store, FARCASTER_EPOCH, make_fid_key, UserPostfix}; use crate::{ db::{JsIteratorOptions, RocksDB}, trie::merkle_trie::{MerkleTrie, NodeMetadata}, + protos::{ObjectRef, ObjectRefTypes, FarcasterNetwork}, + }; use neon::{ context::{Context, FunctionContext, TaskContext}, @@ -256,6 +258,56 @@ pub fn deferred_settle_messages( }); } +/** Object Ref Utils */ + +pub enum TargetTypePrefix { + H1Object = 1, + H2Object = 2, + Fid = 3, +} + +fn make_object_ref_key(object_key: &ObjectRef, set: UserPostfix) -> Vec { + let mut key = Vec::with_capacity(1 + 1 + 20); + if object_key.network.is_some() + && object_key.network.unwrap() == FarcasterNetwork::Mainnet as i32 { + key.push(TargetTypePrefix::H1Object as u8); + } else { + // Should we have a specific network for H2 + key.push(TargetTypePrefix::H2Object as u8); + } + key.push(set as u8); + if object_key.hash.is_some() { + key.extend_from_slice(&object_key.hash.as_ref().unwrap()); + } + + key +} + +pub fn make_object_ref_fid_key(fid: u32) -> Vec { + let mut key = Vec::with_capacity(1 + 4); + key.push(TargetTypePrefix::Fid as u8); + key.extend_from_slice(&make_fid_key(fid)); + key +} + +pub fn make_ref_key(object_ref: &ObjectRef) -> Vec { + if object_ref.r#type == ObjectRefTypes::Fid as i32 { + make_object_ref_fid_key(object_ref.fid as u32) + } else if object_ref.r#type == ObjectRefTypes::Cast as i32 { + make_object_ref_key(object_ref, UserPostfix::CastMessage) + } else if object_ref.r#type == ObjectRefTypes::Object as i32 { + make_object_ref_key(object_ref, UserPostfix::ObjectMessage) + } else { // assume Relationship type + make_object_ref_key(object_ref, UserPostfix::RelationshipMessage) + } + // match object_ref { + // Ref::Fid(fid) => make_object_ref_fid_key(*fid as u32), + // Ref::CastKey(cast_key) => make_object_key_key(cast_key, UserPostfix::CastMessage), + // Ref::ObjectKey(obj_key) => make_object_key_key(obj_key, UserPostfix::ObjectMessage), + // Ref::RelationshipKey(relationship_key) => make_object_key_key(relationship_key, UserPostfix::RelationshipMessage), + // } +} + #[allow(dead_code)] pub fn to_farcaster_time(time_ms: u64) -> Result { if time_ms < FARCASTER_EPOCH { diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 3d1f810025..9763cb8310 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -117,7 +117,7 @@ export const FARCASTER_VERSIONS_SCHEDULE: VersionSchedule[] = [ { version: "2023.12.27", expiresAt: 1708473600000 }, // expires at 2/21/24 00:00 UTC { version: "2024.2.7", expiresAt: 1712102400000 }, // expires at 4/3/24 00:00 UTC { version: "2024.3.20", expiresAt: 1715731200000 }, // expires at 5/15/24 00:00 UTC - { version: "2024.5.1", expiresAt: 1719360000000 }, // expires at 6/26/24 00:00 UTC + { version: "2024.5.1", expiresAt: 1819360000000 }, // expires at 6/26/24 00:00 UTC ]; const MAX_CONTACT_INFO_AGE_MS = GOSSIP_SEEN_TTL; diff --git a/apps/hubble/src/rpc/httpServer.ts b/apps/hubble/src/rpc/httpServer.ts index 7f9ca586a8..c515fc1e4d 100644 --- a/apps/hubble/src/rpc/httpServer.ts +++ b/apps/hubble/src/rpc/httpServer.ts @@ -7,6 +7,7 @@ import { HubResult, HubServiceServer, Message, + ObjectResponse, MessagesResponse, OnChainEvent, OnChainEventResponse, @@ -23,6 +24,11 @@ import { ValidationResponse, base58ToBytes, bytesToBase58, + ObjectRef, + ObjectRefTypes, + RefDirection, + FarcasterNetwork, + ObjectResponseList, } from "@farcaster/hub-nodejs"; import { Metadata, ServerUnaryCall } from "@grpc/grpc-js"; import fastify from "fastify"; @@ -75,14 +81,14 @@ function getCallObject( } // Generic handler for grpc methods's responses -function handleResponse(reply: fastify.FastifyReply, obj: StaticEncodable): sendUnaryData { +function handleResponse(reply: fastify.FastifyReply, obj: StaticEncodable, convertToStringKeys = true): sendUnaryData { return (err, response) => { if (err) { reply.code(400).type("application/json").send(JSON.stringify(err)); } else { if (response) { // Convert the protobuf object to JSON - const json = protoToJSON(response, obj); + const json = protoToJSON(response, obj, convertToStringKeys); reply.send(json); } else { reply.send(err); @@ -136,7 +142,7 @@ const BACKWARDS_COMPATIBILITY_MAP: Record = { * before returning them. */ // biome-ignore lint/suspicious/noExplicitAny: -function transformHash(obj: any): any { +function transformHash(obj: any, convertToStringKeys = true): any { if (obj === null || typeof obj !== "object") { return obj; } @@ -154,7 +160,7 @@ function transformHash(obj: any): any { if (obj.hasOwnProperty(key)) { if (toHexKeys.includes(key) && typeof obj[key] === "string") { obj[key] = convertB64ToHex(obj[key]); - } else if (toStringKeys.includes(key) && typeof obj[key] === "string") { + } else if (convertToStringKeys && toStringKeys.includes(key) && typeof obj[key] === "string") { obj[key] = Buffer.from(obj[key], "base64").toString("utf-8"); } else if (toHexOrBase58Keys.includes(key) && typeof obj[key] === "string") { // We need to convert solana related bytes to base58 @@ -164,7 +170,7 @@ function transformHash(obj: any): any { obj[key] = convertB64ToHex(obj[key]); } } else if (typeof obj[key] === "object") { - transformHash(obj[key]); + transformHash(obj[key], convertToStringKeys); } const backwardsCompatibleName = BACKWARDS_COMPATIBILITY_MAP[key]; @@ -178,8 +184,8 @@ function transformHash(obj: any): any { } // Generic function to convert protobuf objects to JSON -export function protoToJSON(message: T, obj: StaticEncodable): unknown { - return transformHash(obj.toJSON(message)); +export function protoToJSON(message: T, obj: StaticEncodable, convertToStringKeys = true): unknown { + return transformHash(obj.toJSON(message), convertToStringKeys); } // Get a protobuf enum value from a string or number @@ -215,6 +221,30 @@ function getPageOptions(query: QueryPageParams): PageOptions { }; } +const createObjectRef = (query: { + ref_type: string, + object_ref_network: string, object_ref_fid: string, object_ref_hash: string, +}) => { + const { ref_type, object_ref_network, object_ref_fid, object_ref_hash } = query; + let target; + let refType = Number.parseInt(ref_type); + let fid = Number.parseInt(object_ref_fid); + let network = Number.parseInt(object_ref_network) as FarcasterNetwork; + if (refType == ObjectRefTypes.FID) { + target = ObjectRef.create({ type: ObjectRefTypes.FID, fid }); + } else if (object_ref_network && fid && object_ref_hash) { + target = ObjectRef.create({ + type: ObjectRefTypes.OBJECT, + network: network, + fid, + hash: hexStringToBytes(object_ref_hash).unwrapOr(new Uint8Array()), + }); + } else { + return; + } + return target; +}; + export class HttpAPIServer { grpcImpl: HubServiceServer; engine: Engine; @@ -400,6 +430,231 @@ export class HttpAPIServer { }, ); + //=================Tags================= + // @doc-tag: /tagById?fid=...&target_fid=...&target_hash=...&value=... + this.app.get<{ + Querystring: { value: string; fid: string; target_fid: string; target_hash: string }; + }>("/v1/tagById", (request, reply) => { + const { fid, target_fid, target_hash } = request.query; + + const call = getCallObject( + "getTag", + { + fid: parseInt(fid), + targetCastId: { fid: parseInt(target_fid), hash: hexStringToBytes(target_hash).unwrapOr([]) }, + value: request.query.value, + }, + request, + ); + + this.grpcImpl.getTag(call, handleResponse(reply, Message, false)); + }); + + // @doc-tag: /tagsByFid?fid=...&value=... + this.app.get<{ Querystring: { value: string; fid: string } & QueryPageParams }>( + "/v1/tagsByFid", + (request, reply) => { + const { fid, value } = request.query; + const pageOptions = getPageOptions(request.query); + + const call = getCallObject( + "getTagsByFid", + { + fid: parseInt(fid), + value, + ...pageOptions, + }, + request, + ); + + this.grpcImpl.getTagsByFid(call, handleResponse(reply, MessagesResponse, false)); + }, + ); + + // @doc-tag: /reactionsByCast?target_fid=...&target_hash=...&value=... + // this.app.get<{ + // Querystring: { target_fid: string; target_hash: string; value: string } & QueryPageParams; + // }>("/v1/tagsByCast", (request, reply) => { + // const { target_fid, target_hash } = request.query; + // const pageOptions = getPageOptions(request.query); + + // const call = getCallObject( + // "getTagsByCast", + // { + // target: { + // fid: 301932, + // }, + // name: request.query.name, + // ...pageOptions, + // }, + // request, + // ); + + // this.grpcImpl.getTagsByCast(call, handleResponse(reply, MessagesResponse)); + // }); + + // @doc-tag: /tagsByTarget?ref_type=Cast/Object/Fid,object_ref_network=...&object_ref_fid=...&object_ref_hash=...&name=... + this.app.get<{ Querystring: { + ref_type: string, + object_ref_network: string, object_ref_fid: string, object_ref_hash: string, + name: string + } & QueryPageParams }>( + "/v1/tagsByTarget", + (request, reply) => { + const { + ref_type, + name, + } = request.query; + const pageOptions = getPageOptions(request.query); + + let target = createObjectRef(request.query); + if (!target) { + reply.code(400).send({ + error: "Invalid URL params", + errorDetail: `For ${ref_type} object reference type, object_ref_network, object_ref_hash and object_ref_hash are required`, + }); + return; + } + + const call = getCallObject( + "getTagsByTarget", + { + target, + name, + ...pageOptions, + }, + request, + ); + + this.grpcImpl.getTagsByTarget(call, handleResponse(reply, MessagesResponse, false)); + }, + ); + + + //=================Objects================= + // @doc-tag: /objectById?fid=...&hash=... + this.app.get<{ + Querystring: { hash: string; fid: string, includeTags: string, creatorTagsOnly: string }; + }>("/v1/objectById", (request, reply) => { + const { fid, hash, includeTags, creatorTagsOnly} = request.query; + + const call = getCallObject( + "getObject", + { + fid: parseInt(fid), + hash: hexStringToBytes(hash).unwrapOr([]), + tagOptions: { + includeTags: includeTags === "true", + creatorTagsOnly: creatorTagsOnly === "true", + }, + }, + request, + ); + + this.grpcImpl.getObject(call, handleResponse(reply, ObjectResponse, false)); + }); + + // @doc-tag: /objectsByFid?fid=...&type=... + this.app.get<{ Querystring: { type: string; fid: string } & QueryPageParams }>( + "/v1/objectsByFid", + (request, reply) => { + const { fid, type } = request.query; + const pageOptions = getPageOptions(request.query); + + const call = getCallObject( + "getObjectsByFid", + { + fid: parseInt(fid), + type, + ...pageOptions, + }, + request, + ); + this.grpcImpl.getObjectsByFid(call, handleResponse(reply, ObjectResponseList, false)); + }, + ); + + //=================Relationships================= + // @doc-tag: /relationshipById?fid=...&hash=... + this.app.get<{ + Querystring: { hash: string; fid: string }; + }>("/v1/relationshipById", (request, reply) => { + const { fid, hash } = request.query; + + const call = getCallObject( + "getRelationship", + { + fid: parseInt(fid), + hash: hexStringToBytes(hash).unwrapOr([]), + }, + request, + ); + + this.grpcImpl.getRelationship(call, handleResponse(reply, Message)); + }); + + // @doc-tag: /relationshipsByFid?fid=...&type=... + this.app.get<{ Querystring: { type: string; fid: string } & QueryPageParams }>( + "/v1/relationshipsByFid", + (request, reply) => { + const { fid, type } = request.query; + const pageOptions = getPageOptions(request.query); + + const call = getCallObject( + "getRelationshipsByFid", + { + fid: parseInt(fid), + type, + ...pageOptions, + }, + request, + ); + + this.grpcImpl.getRelationshipsByFid(call, handleResponse(reply, MessagesResponse)); + }, + ); + + // @doc-tag: /relationshipsByRelatedObjectRef?ref_type=Cast/Object/Fid,object_ref_network=...&object_ref_fid=...&object_ref_hash=...&ref_direction=Source/Target&type=... + this.app.get<{ Querystring: { + ref_type: string, + object_ref_network: string, object_ref_fid: string, object_ref_hash: string, + ref_direction: string, + type: string, + } & QueryPageParams }>( + "/v1/relationshipsByRelatedObjectRef", + (request, reply) => { + const { + ref_type, + ref_direction, + type, + } = request.query; + const pageOptions = getPageOptions(request.query); + + let relatedObjectRef = createObjectRef(request.query); + + let refDirection = Number.parseInt(ref_direction) as RefDirection; + if (!relatedObjectRef) { + reply.code(400).send({ + error: "Invalid URL params", + errorDetail: `For ${ref_type} object reference type, object_ref_network, object_ref_hash and object_ref_hash are required`, + }); + return; + } + const call = getCallObject( + "getRelationshipsByRelatedObjectRef", + { + relatedObjectRef, + refDirection, + type, + ...pageOptions, + }, + request, + ); + + this.grpcImpl.getRelationshipsByRelatedObjectRef(call, handleResponse(reply, MessagesResponse)); + }, + ); + //=================Links================= // @doc-tag: /linkById?fid=...&target_fid=...&link_type=... this.app.get<{ Querystring: { link_type: string; fid: string; target_fid: string } }>( diff --git a/apps/hubble/src/rpc/server.ts b/apps/hubble/src/rpc/server.ts index f2e9743a69..3e7d78e18f 100644 --- a/apps/hubble/src/rpc/server.ts +++ b/apps/hubble/src/rpc/server.ts @@ -4,6 +4,8 @@ import { CastRemoveMessage, ContactInfoResponse, DbStats, + ObjectResponse, + ObjectResponseList, FidsResponse, getServer, HubError, @@ -19,6 +21,10 @@ import { Metadata, ReactionAddMessage, ReactionRemoveMessage, + TagAddMessage, + TagRemoveMessage, + ObjectAddMessage, + RelationshipAddMessage, Server as GrpcServer, ServerCredentials, ServiceError, @@ -858,6 +864,83 @@ export default class Server { }, ); }, + getTag: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getTag", req: call.request }, `RPC call from ${peer}`); + + const request = call.request; + + const reactionResult = await this.engine?.getTag( + request.fid, + request.value, + request.targetCastId ?? request.targetUrl ?? "", + ); + reactionResult?.match( + (tag: TagAddMessage) => { + callback(null, tag); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + getTagsByFid: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getTagsByFid", req: call.request }, `RPC call from ${peer}`); + + const { fid, value, pageSize, pageToken, reverse } = call.request; + const tagsResult = await this.engine?.getTagsByFid(fid, value, { + pageSize, + pageToken, + reverse, + }); + tagsResult?.match( + (page: MessagesPage) => { + callback(null, messagesPageToResponse(page)); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + getTagsByCast: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getTagsByCast", req: call.request }, `RPC call from ${peer}`); + + const { target, fid, name, pageSize, pageToken, reverse } = call.request; + const reactionsResult = await this.engine?.getTagsByTarget(target, fid, name, { + pageSize, + pageToken, + reverse, + }); + reactionsResult?.match( + (page: MessagesPage) => { + callback(null, messagesPageToResponse(page)); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + getTagsByTarget: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getTagsByTarget", req: call.request }, `RPC call from ${peer}`); + + const { target, fid, name, pageSize, pageToken, reverse } = call.request; + const reactionsResult = await this.engine?.getTagsByTarget(target, fid, name, { + pageSize, + pageToken, + reverse, + }); + reactionsResult?.match( + (page: MessagesPage) => { + callback(null, messagesPageToResponse(page)); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, getUserData: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); log.debug({ method: "getUserData", req: call.request }, `RPC call from ${peer}`); @@ -874,6 +957,106 @@ export default class Server { }, ); }, + getObject: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getObject", req: call.request }, `RPC call from ${peer}`); + + const request = call.request; + + let tagOptions = request.tagOptions || { + includeTags: false, + creatorTagsOnly: true, + }; + + const objectAddResult = await this.engine?.getObject(request.fid, request.hash, tagOptions); + objectAddResult?.match( + (objectResponse: ObjectResponse) => { + callback(null, objectResponse); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + getObjectsByFid: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getObjectsByFid", req: call.request }, `RPC call from ${peer}`); + + const { fid, type, tagOptions, pageSize, pageToken, reverse } = call.request; + + const castsResult = await this.engine?.getObjectsByFid(fid, type, tagOptions, { + pageSize, + pageToken, + reverse, + }); + castsResult?.match( + (page: ObjectResponseList) => { + callback(null, page); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + getRelationship: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getRelationship", req: call.request }, `RPC call from ${peer}`); + + const request = call.request; + + const relationshipAddResult = await this.engine?.getRelationship(request.fid, request.hash); + relationshipAddResult?.match( + (objectAdd: RelationshipAddMessage) => { + callback(null, objectAdd); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + getRelationshipsByFid: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getRelationshipsByFid", req: call.request }, `RPC call from ${peer}`); + + const { fid, type, pageSize, pageToken, reverse } = call.request; + + const relationshipsResult = await this.engine?.getRelationshipsByFid(fid, type, { + pageSize, + pageToken, + reverse, + }); + relationshipsResult?.match( + (page: MessagesPage) => { + callback(null, messagesPageToResponse(page)); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + getRelationshipsByRelatedObjectRef: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getRelationshipsByRelatedObjectRef", req: call.request }, `RPC call from ${peer}`); + + const { relatedObjectRef, refDirection, type, pageSize, pageToken, reverse } = call.request; + + const relationshipsResult = await this.engine?.getRelationshipsByRelatedObjectRef( + refDirection, + relatedObjectRef, + type, { + pageSize, + pageToken, + reverse, + }); + relationshipsResult?.match( + (page: MessagesPage) => { + callback(null, messagesPageToResponse(page)); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, getUserDataByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); log.debug({ method: "getUserDataByFid", req: call.request }, `RPC call from ${peer}`); @@ -1163,6 +1346,25 @@ export default class Server { }, ); }, + getAllTagMessagesByFid: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "getAllTagMessagesByFid", req: call.request }, `RPC call from ${peer}`); + + const { fid, pageSize, pageToken, reverse } = call.request; + const result = await this.engine?.getAllTagMessagesByFid(fid, { + pageSize, + pageToken, + reverse, + }); + result?.match( + (page: MessagesPage) => { + callback(null, messagesPageToResponse(page)); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, getAllVerificationMessagesByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); log.debug({ method: "getAllVerificationMessagesByFid", req: call.request }, `RPC call from ${peer}`); diff --git a/apps/hubble/src/rustfunctions.ts b/apps/hubble/src/rustfunctions.ts index 6fcc65580f..71057db826 100644 --- a/apps/hubble/src/rustfunctions.ts +++ b/apps/hubble/src/rustfunctions.ts @@ -7,7 +7,7 @@ import { createRequire } from "module"; const require = createRequire(import.meta.url); const lib = require("./addon/index.node"); -import { HubError, HubErrorCode, HubResult, validations } from "@farcaster/hub-nodejs"; +import { HubError, HubErrorCode, HubResult, validations, ObjectRef } from "@farcaster/hub-nodejs"; import { PAGE_SIZE_MAX, PageOptions } from "./storage/stores/types.js"; import { UserMessagePostfix } from "./storage/db/types.js"; import { DbKeyValue, RocksDbIteratorOptions } from "./storage/db/rocksdb.js"; @@ -303,6 +303,37 @@ export const rsCreateReactionStore = ( return store as RustDynStore; }; +/** Create a tag Store */ +export const rsCreateTagStore = ( + db: RustDb, + eventHandler: RustStoreEventHandler, + pruneSizeLimit: number, +): RustDynStore => { + const store = lib.createTagStore(db, eventHandler, pruneSizeLimit); + + return store as RustDynStore; +}; + +export const rsCreateObjectStore = ( + db: RustDb, + eventHandler: RustStoreEventHandler, + pruneSizeLimit: number, +): RustDynStore => { + const store = lib.createObjectStore(db, eventHandler, pruneSizeLimit); + + return store as RustDynStore; +}; + +export const rsCreateRelationshipStore = ( + db: RustDb, + eventHandler: RustStoreEventHandler, + pruneSizeLimit: number, +): RustDynStore => { + const store = lib.createRelationshipStore(db, eventHandler, pruneSizeLimit); + + return store as RustDynStore; +}; + /** Create a cast Store */ export const rsCreateCastStore = ( db: RustDb, @@ -472,6 +503,137 @@ export const rsGetReactionsByTarget = async ( return await lib.getReactionsByTarget.call(store, targetCastIdBytes, targetUrl, type, pageOptions); }; +/** Tags **/ + +export const rsGetTagAdd = async ( + store: RustDynStore, + fid: number, + value: string, + targetCastIdBytes: Buffer, + targetUrl: string, +): Promise => { + return await lib.getTagAdd.call(store, fid, value, targetCastIdBytes, targetUrl); +}; + +export const rsGetTagRemove = async ( + store: RustDynStore, + fid: number, + value: string, + targetCastIdBytes: Buffer, + targetUrl: string, +): Promise => { + return await lib.getTagRemove.call(store, fid, value, targetCastIdBytes, targetUrl); +}; + +export const rsGetTagAddsByFid = async ( + store: RustDynStore, + fid: number, + value: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getTagAddsByFid.call(store, fid, value, pageOptions); +}; + +export const rsGetTagRemovesByFid = async ( + store: RustDynStore, + fid: number, + value?: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getTagRemovesByFid.call(store, fid, value, pageOptions); +}; + +export const rsGetTagsByTarget = async ( + store: RustDynStore, + target: Buffer, + fid: number, + value: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getTagsByTarget.call(store, target, fid, value, pageOptions); +}; + + +/** Objects **/ +export const rsGetObjectAdd = async ( + store: RustDynStore, + fid: number, + hashBytes: Buffer, +): Promise => { + return await lib.getObjectAdd.call(store, fid, hashBytes); +}; + +export const rsGetObjectRemove = async ( + store: RustDynStore, + fid: number, + hashBytes: Buffer, +): Promise => { + return await lib.getObjectRemove.call(store, fid, hashBytes); +}; + +export const rsGetObjectAddsByFid = async ( + store: RustDynStore, + fid: number, + type?: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getObjectAddsByFid.call(store, fid, type, pageOptions); +}; + +export const rsGetObjectRemovesByFid = async ( + store: RustDynStore, + fid: number, + type?: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getObjectRemovesByFid.call(store, fid, type, pageOptions); +}; + +/** Relationship **/ +export const rsGetRelationshipAdd = async ( + store: RustDynStore, + fid: number, + hashBytes: Buffer, +): Promise => { + return await lib.getRelationshipAdd.call(store, fid, hashBytes); +}; + +export const rsGetRelationshipRemove = async ( + store: RustDynStore, + fid: number, + hashBytes: Buffer, +): Promise => { + return await lib.getRelationshipRemove.call(store, fid, hashBytes); +}; + +export const rsGetRelationshipAddsByFid = async ( + store: RustDynStore, + fid: number, + type?: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getRelationshipAddsByFid.call(store, fid, type, pageOptions); +}; + +export const rsGetRelationshipRemovesByFid = async ( + store: RustDynStore, + fid: number, + type?: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getRelationshipRemovesByFid.call(store, fid, type, pageOptions); +}; + +export const rsGetRelationshipsByRelatedObjectRef = async ( + store: RustDynStore, + relatedObjectRef: Buffer, + refDirection: number, + type?: string, + pageOptions: PageOptions = {}, +): Promise => { + return await lib.getRelationshipsByRelatedObjectRef.call(store, relatedObjectRef, refDirection, type, pageOptions); +}; + /** UserData Store */ export const rsCreateUserDataStore = ( db: RustDb, diff --git a/apps/hubble/src/storage/db/message.ts b/apps/hubble/src/storage/db/message.ts index c1c48f72e7..61c7b0e52c 100644 --- a/apps/hubble/src/storage/db/message.ts +++ b/apps/hubble/src/storage/db/message.ts @@ -113,6 +113,18 @@ export const typeToSetPostfix = (type: MessageType): UserMessagePostfix => { return UserPostfix.LinkCompactStateMessage; } + if (type === MessageType.TAG_ADD || type === MessageType.TAG_REMOVE) { + return UserPostfix.TagMessage; + } + + if (type === MessageType.OBJECT_ADD || type === MessageType.OBJECT_REMOVE) { + return UserPostfix.ObjectMessage; + } + + if (type === MessageType.RELATIONSHIP_ADD || type === MessageType.RELATIONSHIP_REMOVE) { + return UserPostfix.RelationshipMessage; + } + throw new Error(`invalid type: ${type}`); }; diff --git a/apps/hubble/src/storage/db/types.ts b/apps/hubble/src/storage/db/types.ts index 11c38f394d..04bcd2d047 100644 --- a/apps/hubble/src/storage/db/types.ts +++ b/apps/hubble/src/storage/db/types.ts @@ -78,6 +78,9 @@ export enum RootPrefix { /* Used to index fname username proofs by fid */ FNameUserNameProofByFid = 27, + + /* Used to index reactions by target */ + TagsByTarget = 28, } /** @@ -101,6 +104,16 @@ export enum UserPostfix { UsernameProofMessage = 7, // Add new message types here + + /* Tag */ + TagMessage = 8, + + /* Generic Object */ + ObjectMessage = 9, + + /* Generic Relationship */ + RelationshipMessage = 10, + // NOTE: If you add a new message type, make sure that it is only used to store Message protobufs. // If you need to store an index, use one of the UserPostfix values below (>86). @@ -136,6 +149,18 @@ export enum UserPostfix { /* Link Compact State set */ LinkCompactStateMessage = 100, + + /** TagStore add and remove sets */ + TagAdds = 101, + TagRemoves = 102, + + /** ObjectStore add and remove sets */ + ObjectAdds = 103, + ObjectRemoves = 104, + + /** RelationshipStore add and remove sets */ + RelationshipAdds = 105, + RelationshipRemoves = 106, } export enum OnChainEventPostfix { @@ -160,4 +185,7 @@ export type UserMessagePostfix = | UserPostfix.ReactionMessage | UserPostfix.UserDataMessage | UserPostfix.UsernameProofMessage + | UserPostfix.TagMessage + | UserPostfix.ObjectMessage + | UserPostfix.RelationshipMessage | UserPostfix.LinkCompactStateMessage; diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index 3748b62167..d2cbae4358 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -4,6 +4,8 @@ import { bytesToUtf8String, CastAddMessage, CastId, + ObjectResponse, + ObjectTagRequest, CastRemoveMessage, FarcasterNetwork, getDefaultStoreLimit, @@ -25,6 +27,7 @@ import { MergeUsernameProofHubEvent, Message, MessageType, + ObjectRef, OnChainEvent, OnChainEventResponse, OnChainEventType, @@ -38,6 +41,12 @@ import { StorageLimit, StorageLimitsResponse, StoreType, + TagAddMessage, + TagRemoveMessage, + ObjectAddMessage, + ObjectRemoveMessage, + RelationshipAddMessage, + RelationshipRemoveMessage, UserDataAddMessage, UserDataType, UserNameProof, @@ -46,31 +55,37 @@ import { validations, VerificationAddAddressMessage, VerificationRemoveMessage, + ObjectRefTypes, + RefDirection, + ObjectResponseList, } from "@farcaster/hub-nodejs"; -import { err, ok, ResultAsync } from "neverthrow"; +import {err, ok, ResultAsync} from "neverthrow"; import fs from "fs"; -import { Worker } from "worker_threads"; -import { getMessage, getMessagesBySignerPrefix, typeToSetPostfix } from "../db/message.js"; +import {Worker} from "worker_threads"; +import {getMessage, getMessagesBySignerPrefix, typeToSetPostfix} from "../db/message.js"; import RocksDB from "../db/rocksdb.js"; -import { TSHASH_LENGTH, UserPostfix } from "../db/types.js"; +import {TSHASH_LENGTH, UserPostfix} from "../db/types.js"; import CastStore from "../stores/castStore.js"; import LinkStore from "../stores/linkStore.js"; import ReactionStore from "../stores/reactionStore.js"; +import TagStore from "../stores/tagStore.js"; +import ObjectStore from "../stores/objectStore.js"; +import RelationshipStore from "../stores/relationshipStore.js"; import StoreEventHandler from "../stores/storeEventHandler.js"; -import { DEFAULT_PAGE_SIZE, MessagesPage, PageOptions } from "../stores/types.js"; +import {DEFAULT_PAGE_SIZE, MessagesPage, PageOptions} from "../stores/types.js"; import UserDataStore from "../stores/userDataStore.js"; import VerificationStore from "../stores/verificationStore.js"; -import { logger } from "../../utils/logger.js"; -import { RevokeMessagesBySignerJobQueue, RevokeMessagesBySignerJobWorker } from "../jobs/revokeMessagesBySignerJob.js"; -import { ensureAboveTargetFarcasterVersion } from "../../utils/versions.js"; -import { PublicClient } from "viem"; -import { normalize } from "viem/ens"; +import {logger} from "../../utils/logger.js"; +import {RevokeMessagesBySignerJobQueue, RevokeMessagesBySignerJobWorker} from "../jobs/revokeMessagesBySignerJob.js"; +import {ensureAboveTargetFarcasterVersion} from "../../utils/versions.js"; +import {PublicClient} from "viem"; +import {normalize} from "viem/ens"; import UsernameProofStore from "../stores/usernameProofStore.js"; import OnChainEventStore from "../stores/onChainEventStore.js"; -import { consumeRateLimitByKey, getRateLimiterForTotalMessages, isRateLimitedByKey } from "../../utils/rateLimits.js"; -import { rsValidationMethods } from "../../rustfunctions.js"; -import { RateLimiterAbstract } from "rate-limiter-flexible"; -import { TypedEmitter } from "tiny-typed-emitter"; +import {consumeRateLimitByKey, getRateLimiterForTotalMessages, isRateLimitedByKey} from "../../utils/rateLimits.js"; +import {rsValidationMethods} from "../../rustfunctions.js"; +import {RateLimiterAbstract} from "rate-limiter-flexible"; +import {TypedEmitter} from "tiny-typed-emitter"; export const NUM_VALIDATION_WORKERS = 2; @@ -122,6 +137,9 @@ class Engine extends TypedEmitter { private _linkStore: LinkStore; private _reactionStore: ReactionStore; + private _tagStore: TagStore; + private _objectStore: ObjectStore; + private _relationshipStore: RelationshipStore; private _castStore: CastStore; private _userDataStore: UserDataStore; private _verificationStore: VerificationStore; @@ -158,6 +176,9 @@ class Engine extends TypedEmitter { this._linkStore = new LinkStore(db, this.eventHandler); this._reactionStore = new ReactionStore(db, this.eventHandler); + this._tagStore = new TagStore(db, this.eventHandler); + this._objectStore = new ObjectStore(db, this.eventHandler); + this._relationshipStore = new RelationshipStore(db, this.eventHandler); this._castStore = new CastStore(db, this.eventHandler); this._userDataStore = new UserDataStore(db, this.eventHandler); this._verificationStore = new VerificationStore(db, this.eventHandler); @@ -169,6 +190,9 @@ class Engine extends TypedEmitter { this._totalPruneSize = this._linkStore.pruneSizeLimit + this._reactionStore.pruneSizeLimit + + this._tagStore.pruneSizeLimit + + this._objectStore.pruneSizeLimit + + this._relationshipStore.pruneSizeLimit + this._castStore.pruneSizeLimit + this._userDataStore.pruneSizeLimit + this._verificationStore.pruneSizeLimit + @@ -273,6 +297,8 @@ class Engine extends TypedEmitter { const mergeResults: Map> = new Map(); const validatedMessages: IndexedMessage[] = []; + messages.forEach(m => console.log('VIC -- About to merge', m, JSON.stringify(m.data, null, 2))); + // Validate all messages first await Promise.all( messages.map(async (message, i) => { @@ -334,6 +360,9 @@ class Engine extends TypedEmitter { async mergeMessagesToStore(messages: Message[]): Promise>> { const linkMessages: IndexedMessage[] = []; const reactionMessages: IndexedMessage[] = []; + const tagMessages: IndexedMessage[] = []; + const objectMessages: IndexedMessage[] = []; + const relationshipMessages: IndexedMessage[] = []; const castMessages: IndexedMessage[] = []; const userDataMessages: IndexedMessage[] = []; const verificationMessages: IndexedMessage[] = []; @@ -361,6 +390,18 @@ class Engine extends TypedEmitter { reactionMessages.push({ i, message }); break; } + case UserPostfix.TagMessage: { + tagMessages.push({ i, message }); + break; + } + case UserPostfix.ObjectMessage: { + objectMessages.push({ i, message }); + break; + } + case UserPostfix.RelationshipMessage: { + relationshipMessages.push({ i, message }); + break; + } case UserPostfix.CastMessage: { castMessages.push({ i, message }); break; @@ -386,6 +427,9 @@ class Engine extends TypedEmitter { const stores = [ this._linkStore, this._reactionStore, + this._tagStore, + this._objectStore, + this._relationshipStore, this._castStore, this._userDataStore, this._verificationStore, @@ -394,6 +438,9 @@ class Engine extends TypedEmitter { const messagesByStore = [ linkMessages, reactionMessages, + tagMessages, + objectMessages, + relationshipMessages, castMessages, userDataMessages, verificationMessages, @@ -475,6 +522,15 @@ class Engine extends TypedEmitter { case UserPostfix.ReactionMessage: { return this._reactionStore.revoke(message.value); } + case UserPostfix.TagMessage: { + return this._tagStore.revoke(message.value); + } + case UserPostfix.ObjectMessage: { + return this._objectStore.revoke(message.value); + } + case UserPostfix.RelationshipMessage: { + return this._relationshipStore.revoke(message.value); + } case UserPostfix.CastMessage: { return this._castStore.revoke(message.value); } @@ -538,6 +594,15 @@ class Engine extends TypedEmitter { const reactionResult = await this._reactionStore.pruneMessages(fid); totalPruned += logPruneResult(reactionResult, "reaction"); + const tagResult = await this._tagStore.pruneMessages(fid); + totalPruned += logPruneResult(tagResult, "tag"); + + const objectResult = await this._objectStore.pruneMessages(fid); + totalPruned += logPruneResult(objectResult, "object"); + + const relationshipResult = await this._relationshipStore.pruneMessages(fid); + totalPruned += logPruneResult(relationshipResult, "relationship"); + const verificationResult = await this._verificationStore.pruneMessages(fid); totalPruned += logPruneResult(verificationResult, "verification"); @@ -569,6 +634,15 @@ class Engine extends TypedEmitter { case UserPostfix.ReactionMessage: { return this._reactionStore.revoke(message); } + case UserPostfix.TagMessage: { + return this._tagStore.revoke(message); + } + case UserPostfix.ObjectMessage: { + return this._objectStore.revoke(message); + } + case UserPostfix.RelationshipMessage: { + return this._relationshipStore.revoke(message); + } case UserPostfix.CastMessage: { return this._castStore.revoke(message); } @@ -722,6 +796,219 @@ class Engine extends TypedEmitter { ); } + /* -------------------------------------------------------------------------- */ + /* Tag Store Methods */ + /* -------------------------------------------------------------------------- */ + + async getTag(fid: number, value: string, target: CastId | string): HubAsyncResult { + const validatedFid = validations.validateFid(fid); + if (validatedFid.isErr()) { + return err(validatedFid.error); + } + + const validatedTarget = validations.validateTarget(target); + if (validatedTarget.isErr()) { + return err(validatedTarget.error); + } + + return ResultAsync.fromPromise(this._tagStore.getTagAdd(fid, value, target), (e) => e as HubError); + } + + async getTagsByFid( + fid: number, + value?: string, + pageOptions: PageOptions = {}, + ): HubAsyncResult> { + const validatedFid = validations.validateFid(fid); + if (validatedFid.isErr()) { + return err(validatedFid.error); + } + + return ResultAsync.fromPromise( + this._tagStore.getTagAddsByFid(fid, value, pageOptions), + (e) => e as HubError, + ); + } + + async getTagsByTarget( + target?: ObjectRef, + fid?: number, + value?: string, + pageOptions: PageOptions = {}, + ): HubAsyncResult> { + + // TODO: further validation + if (!target) { + return err(new HubError('bad_request', 'Target is undefined')) + } + + return ResultAsync.fromPromise( + this._tagStore.getTagsByTarget(target, fid, value, pageOptions), + (e) => e as HubError, + ); + } + + async getAllTagMessagesByFid( + fid: number, + pageOptions: PageOptions = {}, + ): HubAsyncResult> { + const validatedFid = validations.validateFid(fid); + if (validatedFid.isErr()) { + return err(validatedFid.error); + } + + return ResultAsync.fromPromise( + this._tagStore.getAllTagMessagesByFid(fid, pageOptions), + (e) => e as HubError, + ); + } + + /* -------------------------------------------------------------------------- */ + /* Object Store Methods */ + /* -------------------------------------------------------------------------- */ + + async getObject(fid: number, hash: Uint8Array, tagOptions: ObjectTagRequest): HubAsyncResult { + const validatedFid = validations.validateFid(fid); + if (validatedFid.isErr()) { + return err(validatedFid.error); + } + + const { includeTags = false, creatorTagsOnly = true } = tagOptions; + + let object: Message; + const objectRes = await ResultAsync.fromPromise(this._objectStore.getObjectAdd(fid, hash), (e) => e as HubError); + if (objectRes.isErr()) { + return err(new HubError('bad_request', 'failed to fetch object')); + } + object = objectRes._unsafeUnwrap(); + + let tags: Message[] = []; + if (includeTags) { + const tagRes = await ResultAsync.fromPromise(this._tagStore.getTagsByTarget({ + type: ObjectRefTypes.OBJECT, + network: FarcasterNetwork.DEVNET, // H1 network ID? + hash, + fid, + }, creatorTagsOnly ? fid : 0), (e) => e as HubError); + if (objectRes.isErr()) { + return err(new HubError('bad_request', 'failed to fetch object tags')); + } + tags = tagRes._unsafeUnwrap().messages; + } + + const res: ObjectResponse = { + object, + tags, + } + + return ok(res); + } + + async getObjectsByFid( + fid: number, + type?: string, + tagOptions?: ObjectTagRequest, + pageOptions?: PageOptions, + ): HubAsyncResult { + const validatedFid = validations.validateFid(fid); + if (validatedFid.isErr()) { + return err(validatedFid.error); + } + + const { includeTags = false, creatorTagsOnly = true } = tagOptions || {}; + + let objects: MessagesPage; + const objectRes = await ResultAsync.fromPromise( + this._objectStore.getObjectAddsByFid(fid, type, pageOptions), + (e) => e as HubError, + ); + if (objectRes.isErr()) { + return err(new HubError('bad_request', 'failed to fetch object')); + } + objects = objectRes._unsafeUnwrap(); + + let tags: Message[][] = []; + if (includeTags) { + const promises = Promise.all(objects.messages.map((m) => this._tagStore.getTagsByTarget({ + type: ObjectRefTypes.OBJECT, + network: FarcasterNetwork.DEVNET, // H1 network ID? + hash: m.hash, + fid, + }, creatorTagsOnly ? fid : 0))); + const tagRes = await ResultAsync.fromPromise(promises, (e) => e as HubError); + if (tagRes.isErr()) { + return err(new HubError('bad_request', 'failed to fetch object tags')); + } + tags = tagRes._unsafeUnwrap().map((tagsResult) => tagsResult.messages); + } + + const result = ObjectResponseList.create({ + nextPageToken: objects.nextPageToken, + objects: objects.messages.map((obj, i) => + ObjectResponse.create({ + object: obj, + tags: includeTags ? (tags[i] || []) : [] + }) + ), + }) + + return ok(result); + } + + // VLAD-TODO: add getAllObjectMessagesByFid ? + + /* ---------------------------------------------------------------------------- */ + /* Relationship Store Methods */ + /* ---------------------------------------------------------------------------- */ + + async getRelationship(fid: number, hash: Uint8Array): HubAsyncResult { + const validatedFid = validations.validateFid(fid); + if (validatedFid.isErr()) { + return err(validatedFid.error); + } + + return ResultAsync.fromPromise(this._relationshipStore.getRelationshipAdd(fid, hash), (e) => e as HubError); + } + + async getRelationshipsByFid( + fid: number, + type?: string, + pageOptions?: PageOptions, + ): HubAsyncResult> { + const validatedFid = validations.validateFid(fid); + if (validatedFid.isErr()) { + return err(validatedFid.error); + } + + return ResultAsync.fromPromise( + this._relationshipStore.getRelationshipAddsByFid(fid, type, pageOptions), + (e) => e as HubError, + ); + } + + async getRelationshipsByRelatedObjectRef( + refDirection: RefDirection, + relatedObjectRef?: ObjectRef, + type?: string, + pageOptions?: PageOptions, + ): HubAsyncResult> { + if (!relatedObjectRef) { + return err(new HubError('bad_request', 'relatedObjectRef is undefined')) + } + + const validatedObjectRef = validations.validateObjectRef(relatedObjectRef); + if (validatedObjectRef.isErr()) { + return err(validatedObjectRef.error); + } + + return ResultAsync.fromPromise( + this._relationshipStore.getRelationshipsByRelatedObjectRef(relatedObjectRef, refDirection, type, pageOptions), + (e) => e as HubError, + ); + } + + // VLAD-TODO: add getAllRelationshipMessagesByFid ? + /* -------------------------------------------------------------------------- */ /* Verification Store Methods */ /* -------------------------------------------------------------------------- */ diff --git a/apps/hubble/src/storage/stores/objectStore.ts b/apps/hubble/src/storage/stores/objectStore.ts new file mode 100644 index 0000000000..6de2c713cb --- /dev/null +++ b/apps/hubble/src/storage/stores/objectStore.ts @@ -0,0 +1,102 @@ +import { + CastId, + Message, + ObjectAddMessage, + ObjectRemoveMessage, + StoreType, + getDefaultStoreLimit, +} from "@farcaster/hub-nodejs"; +import { + rsCreateObjectStore, + rsGetObjectAdd, + rsGetObjectAddsByFid, + rsGetObjectRemove, + rsGetObjectRemovesByFid, + rustErrorToHubError, +} from "../../rustfunctions.js"; +import StoreEventHandler from "./storeEventHandler.js"; +import { MessagesPage, PageOptions, StorePruneOptions } from "./types.js"; +import { UserPostfix } from "../db/types.js"; +import { ResultAsync } from "neverthrow"; +import RocksDB from "storage/db/rocksdb.js"; +import { RustStoreBase } from "./rustStoreBase.js"; +import { messageDecode } from "../../storage/db/message.js"; + +class ObjectStore extends RustStoreBase { + constructor(db: RocksDB, eventHandler: StoreEventHandler, options: StorePruneOptions = {}) { + const pruneSizeLimit = options.pruneSizeLimit ?? getDefaultStoreLimit(StoreType.OBJECTS); + const objectsStore = rsCreateObjectStore(db.rustDb, eventHandler.getRustStoreEventHandler(), pruneSizeLimit); + + super(db, objectsStore, UserPostfix.ObjectMessage, eventHandler, pruneSizeLimit); + } + + /** Looks up ObjectAdd message by object tsHash */ + async getObjectAdd(fid: number, hash: Uint8Array): Promise { + const hashBytes = Buffer.from(hash); + const result = await ResultAsync.fromPromise( + rsGetObjectAdd(this._rustStore, fid, hashBytes), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as ObjectAddMessage; + } + + /** Looks up ObjectRemove message by target_hash tsHash */ + async getObjectRemove(fid: number, hash: Uint8Array): Promise { + const hashBytes = Buffer.from(hash); + const result = await ResultAsync.fromPromise( + rsGetObjectRemove(this._rustStore, fid, hashBytes), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as ObjectRemoveMessage; + } + + async getObjectAddsByFid( + fid: number, + type?: string, + pageOptions?: PageOptions, + ): Promise> { + const messages_page = await rsGetObjectAddsByFid( + this._rustStore, + fid, + type ?? "", + pageOptions ?? {}, + ); + + const messages = + messages_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as ObjectAddMessage; + }) ?? []; + + return { messages, nextPageToken: messages_page.nextPageToken }; + } + + async getObjectRemovesByFid( + fid: number, + value?: string, + pageOptions?: PageOptions, + ): Promise> { + const message_page = await rsGetObjectRemovesByFid(this._rustStore, fid, value ?? "", pageOptions ?? {}); + + const messages = + message_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as ObjectRemoveMessage; + }) ?? []; + + return { messages, nextPageToken: message_page.nextPageToken }; + } + + async getAllTagMessagesByFid( + fid: number, + pageOptions: PageOptions = {}, + ): Promise> { + return await this.getAllMessagesByFid(fid, pageOptions); + } +} + +export default ObjectStore; diff --git a/apps/hubble/src/storage/stores/relationshipStore.ts b/apps/hubble/src/storage/stores/relationshipStore.ts new file mode 100644 index 0000000000..4efaa8e1f8 --- /dev/null +++ b/apps/hubble/src/storage/stores/relationshipStore.ts @@ -0,0 +1,126 @@ +import { + RelationshipAddMessage, + RelationshipRemoveMessage, + StoreType, + getDefaultStoreLimit, + ObjectRef, + RefDirection, +} from "@farcaster/hub-nodejs"; +import { + rsCreateRelationshipStore, + rsGetRelationshipAdd, + rsGetRelationshipAddsByFid, + rsGetRelationshipRemove, + rsGetRelationshipRemovesByFid, + rsGetRelationshipsByRelatedObjectRef, + rustErrorToHubError, +} from "../../rustfunctions.js"; +import StoreEventHandler from "./storeEventHandler.js"; +import { MessagesPage, PageOptions, StorePruneOptions } from "./types.js"; +import { UserPostfix } from "../db/types.js"; +import { ResultAsync } from "neverthrow"; +import RocksDB from "storage/db/rocksdb.js"; +import { RustStoreBase } from "./rustStoreBase.js"; +import { messageDecode } from "../../storage/db/message.js"; + +class RelationshipStore extends RustStoreBase { + constructor(db: RocksDB, eventHandler: StoreEventHandler, options: StorePruneOptions = {}) { + const pruneSizeLimit = options.pruneSizeLimit ?? getDefaultStoreLimit(StoreType.RELATIONSHIPS); + const relationshipsStore = rsCreateRelationshipStore(db.rustDb, eventHandler.getRustStoreEventHandler(), pruneSizeLimit); + + super(db, relationshipsStore, UserPostfix.RelationshipMessage, eventHandler, pruneSizeLimit); + } + + /** Looks up RelationshipAdd message by relationship tsHash */ + async getRelationshipAdd(fid: number, hash: Uint8Array): Promise { + const hashBytes = Buffer.from(hash); + const result = await ResultAsync.fromPromise( + rsGetRelationshipAdd(this._rustStore, fid, hashBytes), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as RelationshipAddMessage; + } + + /** Looks up RelationshipRemove message by target_hash tsHash */ + async getRelationshipRemove(fid: number, hash: Uint8Array): Promise { + const hashBytes = Buffer.from(hash); + const result = await ResultAsync.fromPromise( + rsGetRelationshipRemove(this._rustStore, fid, hashBytes), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as RelationshipRemoveMessage; + } + + async getRelationshipAddsByFid( + fid: number, + type?: string, + pageOptions?: PageOptions, + ): Promise> { + const messages_page = await rsGetRelationshipAddsByFid( + this._rustStore, + fid, + type ?? "", + pageOptions ?? {}, + ); + + const messages = + messages_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as RelationshipAddMessage; + }) ?? []; + + return { messages, nextPageToken: messages_page.nextPageToken }; + } + + async getRelationshipRemovesByFid( + fid: number, + value?: string, + pageOptions?: PageOptions, + ): Promise> { + const message_page = await rsGetRelationshipRemovesByFid(this._rustStore, fid, value ?? "", pageOptions ?? {}); + + const messages = + message_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as RelationshipRemoveMessage; + }) ?? []; + + return { messages, nextPageToken: message_page.nextPageToken }; + } + + async getAllRelationshipMessagesByFid( + fid: number, + pageOptions: PageOptions = {}, + ): Promise> { + return await this.getAllMessagesByFid(fid, pageOptions); + } + + async getRelationshipsByRelatedObjectRef( + relatedObjectRef: ObjectRef, + refDirection: RefDirection, + type?: string, + pageOptions?: PageOptions, + ): Promise> { + const relatedObjectRefBuffer = Buffer.from(ObjectRef.encode(relatedObjectRef).finish()); + const messages_page = await rsGetRelationshipsByRelatedObjectRef( + this._rustStore, + relatedObjectRefBuffer, + refDirection, + type ?? "", + pageOptions ?? {}, + ); + + const messages = + messages_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as RelationshipAddMessage; + }) ?? []; + + return { messages, nextPageToken: messages_page.nextPageToken }; + } +} + +export default RelationshipStore; diff --git a/apps/hubble/src/storage/stores/storeEventHandler.ts b/apps/hubble/src/storage/stores/storeEventHandler.ts index a78f4f57db..5afef02469 100644 --- a/apps/hubble/src/storage/stores/storeEventHandler.ts +++ b/apps/hubble/src/storage/stores/storeEventHandler.ts @@ -52,8 +52,12 @@ const STORE_TO_SET: Record = { [StoreType.USER_DATA]: UserPostfix.UserDataMessage, [StoreType.VERIFICATIONS]: UserPostfix.VerificationMessage, [StoreType.USERNAME_PROOFS]: UserPostfix.UsernameProofMessage, + [StoreType.TAGS]: UserPostfix.TagMessage, + [StoreType.OBJECTS]: UserPostfix.ObjectMessage, + [StoreType.RELATIONSHIPS]: UserPostfix.RelationshipMessage, }; +// VLAD-TODO: are our new msg types "prunable"? type PrunableMessage = | CastAddMessage | CastRemoveMessage diff --git a/apps/hubble/src/storage/stores/tagStore.test.ts b/apps/hubble/src/storage/stores/tagStore.test.ts new file mode 100644 index 0000000000..c44a1ed343 --- /dev/null +++ b/apps/hubble/src/storage/stores/tagStore.test.ts @@ -0,0 +1,142 @@ +import { + CastId, + Message, + TagAddMessage, + TagRemoveMessage, + StoreType, + getDefaultStoreLimit, +} from "@farcaster/hub-nodejs"; +import { + rsCreateTagStore, + rsGetTagAdd, + rsGetTagAddsByFid, + rsGetTagRemove, + rsGetTagRemovesByFid, + rsGetTagsByTarget, + rustErrorToHubError, +} from "../../rustfunctions.js"; +import StoreEventHandler from "./storeEventHandler.js"; +import { MessagesPage, PageOptions, StorePruneOptions } from "./types.js"; +import { UserPostfix } from "../db/types.js"; +import { ResultAsync } from "neverthrow"; +import RocksDB from "storage/db/rocksdb.js"; +import { RustStoreBase } from "./rustStoreBase.js"; +import { messageDecode } from "../db/message.js"; + +class TagStore extends RustStoreBase { + constructor(db: RocksDB, eventHandler: StoreEventHandler, options: StorePruneOptions = {}) { + const pruneSizeLimit = options.pruneSizeLimit ?? getDefaultStoreLimit(StoreType.REACTIONS); + const rustTagStore = rsCreateTagStore(db.rustDb, eventHandler.getRustStoreEventHandler(), pruneSizeLimit); + + super(db, rustTagStore, UserPostfix.TagMessage, eventHandler, pruneSizeLimit); + } + + async getTagAdd(fid: number, value: string, target: CastId | string): Promise { + let targetCastId = Buffer.from([]); + let targetUrl = ""; + + if (typeof target === "string") { + targetUrl = target; + } else { + targetCastId = Buffer.from(CastId.encode(target).finish()); + } + + const result = await ResultAsync.fromPromise( + rsGetTagAdd(this._rustStore, fid, value, targetCastId, targetUrl), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as TagAddMessage; + } + + async getTagRemove(fid: number, value: string, target: CastId | string): Promise { + let targetCastId = Buffer.from([]); + let targetUrl = ""; + + if (typeof target === "string") { + targetUrl = target; + } else { + targetCastId = Buffer.from(CastId.encode(target).finish()); + } + + const result = await ResultAsync.fromPromise( + rsGetTagRemove(this._rustStore, fid, value, targetCastId, targetUrl), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as TagRemoveMessage; + } + + async getTagAddsByFid( + fid: number, + value: string, + pageOptions?: PageOptions, + ): Promise> { + const messages_page = await rsGetTagAddsByFid(this._rustStore, fid, value, pageOptions ?? {}); + + const messages = + messages_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as TagAddMessage; + }) ?? []; + + return { messages, nextPageToken: messages_page.nextPageToken }; + } + + async getTagRemovesByFid( + fid: number, + value: string, + pageOptions?: PageOptions, + ): Promise> { + const message_page = await rsGetTagRemovesByFid(this._rustStore, fid, value, pageOptions ?? {}); + + const messages = + message_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as TagRemoveMessage; + }) ?? []; + + return { messages, nextPageToken: message_page.nextPageToken }; + } + + async getAllTagMessagesByFid( + fid: number, + pageOptions: PageOptions = {}, + ): Promise> { + return await this.getAllMessagesByFid(fid, pageOptions); + } + + async getTagsByTarget( + target: CastId | string, + value: string, + pageOptions: PageOptions = {}, + ): Promise> { + let targetCastId = Buffer.from([]); + let targetUrl = ""; + + if (typeof target === "string") { + targetUrl = target; + } else { + targetCastId = Buffer.from(CastId.encode(target).finish()); + } + + const message_page = await rsGetTagsByTarget( + this._rustStore, + targetCastId, + 0, + value, + pageOptions, + ); + + const messages = + message_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as TagAddMessage; + }) ?? []; + + return { messages, nextPageToken: message_page.nextPageToken }; + } +} + +export default TagStore; diff --git a/apps/hubble/src/storage/stores/tagStore.ts b/apps/hubble/src/storage/stores/tagStore.ts new file mode 100644 index 0000000000..73f4bf3204 --- /dev/null +++ b/apps/hubble/src/storage/stores/tagStore.ts @@ -0,0 +1,143 @@ +import { + CastId, + Message, + TagAddMessage, + TagRemoveMessage, + StoreType, + getDefaultStoreLimit, + ObjectRef, +} from "@farcaster/hub-nodejs"; +import { + rsCreateTagStore, + rsGetTagAdd, + rsGetTagAddsByFid, + rsGetTagRemove, + rsGetTagRemovesByFid, + rsGetTagsByTarget, + rustErrorToHubError, +} from "../../rustfunctions.js"; +import StoreEventHandler from "./storeEventHandler.js"; +import { MessagesPage, PageOptions, StorePruneOptions } from "./types.js"; +import { UserPostfix } from "../db/types.js"; +import { ResultAsync } from "neverthrow"; +import RocksDB from "storage/db/rocksdb.js"; +import { RustStoreBase } from "./rustStoreBase.js"; +import { messageDecode } from "../../storage/db/message.js"; + +class TagStore extends RustStoreBase { + constructor(db: RocksDB, eventHandler: StoreEventHandler, options: StorePruneOptions = {}) { + const pruneSizeLimit = options.pruneSizeLimit ?? getDefaultStoreLimit(StoreType.TAGS); + const rustTagStore = rsCreateTagStore(db.rustDb, eventHandler.getRustStoreEventHandler(), pruneSizeLimit); + + super(db, rustTagStore, UserPostfix.TagMessage, eventHandler, pruneSizeLimit); + } + + async getTagAdd(fid: number, value: string, target: CastId | string): Promise { + let targetCastId = Buffer.from([]); + let targetUrl = ""; + + if (typeof target === "string") { + targetUrl = target; + } else { + targetCastId = Buffer.from(CastId.encode(target).finish()); + } + + const result = await ResultAsync.fromPromise( + rsGetTagAdd(this._rustStore, fid, value, targetCastId, targetUrl), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as TagAddMessage; + } + + async getTagRemove(fid: number, value: string, target: CastId | string): Promise { + let targetCastId = Buffer.from([]); + let targetUrl = ""; + + if (typeof target === "string") { + targetUrl = target; + } else { + targetCastId = Buffer.from(CastId.encode(target).finish()); + } + + const result = await ResultAsync.fromPromise( + rsGetTagRemove(this._rustStore, fid, value, targetCastId, targetUrl), + rustErrorToHubError, + ); + if (result.isErr()) { + throw result.error; + } + return messageDecode(new Uint8Array(result.value)) as TagRemoveMessage; + } + + async getTagAddsByFid( + fid: number, + value?: string, + pageOptions: PageOptions = {}, + ): Promise> { + const messages_page = await rsGetTagAddsByFid( + this._rustStore, + fid, + value ?? "", + pageOptions ?? {}, + ); + + const messages = + messages_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as TagAddMessage; + }) ?? []; + + return { messages, nextPageToken: messages_page.nextPageToken }; + } + + async getTagRemovesByFid( + fid: number, + value?: string, + pageOptions?: PageOptions, + ): Promise> { + const message_page = await rsGetTagRemovesByFid(this._rustStore, fid, value ?? "", pageOptions ?? {}); + + const messages = + message_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as TagRemoveMessage; + }) ?? []; + + return { messages, nextPageToken: message_page.nextPageToken }; + } + + async getAllTagMessagesByFid( + fid: number, + pageOptions: PageOptions = {}, + ): Promise> { + return await this.getAllMessagesByFid(fid, pageOptions); + } + + async getTagsByTarget( + target: ObjectRef, + fid?: number, + value?: string, + pageOptions: PageOptions = {}, + ): Promise> { + + const targetBuffer = Buffer.from(ObjectRef.encode(target).finish()); + + const message_page = await rsGetTagsByTarget( + this._rustStore, + targetBuffer, + fid ?? 0, + value ?? "", + pageOptions, + ); + + const messages = + message_page.messageBytes?.map((message_bytes) => { + return messageDecode(new Uint8Array(message_bytes)) as TagAddMessage; + }) ?? []; + + return { messages, nextPageToken: message_page.nextPageToken }; + } +} + +export default TagStore; diff --git a/apps/replicator/src/util.ts b/apps/replicator/src/util.ts index 7655ce8409..32b5cb96f8 100644 --- a/apps/replicator/src/util.ts +++ b/apps/replicator/src/util.ts @@ -256,6 +256,9 @@ export function convertProtobufMessageBodyToJson(message: Message): MessageBodyJ throw new AssertionError("Unexpected FRAME_ACTION message type"); case MessageType.NONE: throw new AssertionError("Message has no type"); + case MessageType.TAG_ADD: + case MessageType.TAG_REMOVE: + // VIC-TODO default: // If we're getting a type error on the line below, it means we've missed a case above. // Did we add a new message type? diff --git a/apps/rings-next/.env.development b/apps/rings-next/.env.development new file mode 100644 index 0000000000..50e25bf902 --- /dev/null +++ b/apps/rings-next/.env.development @@ -0,0 +1,3 @@ +NEXT_PUBLIC_URL=http://localhost:3000 +NEXT_PUBLIC_API_URL=http://localhost:2281 +NEXT_PUBLIC_MOCK_DATA=false diff --git a/apps/rings-next/.eslintrc.js b/apps/rings-next/.eslintrc.js new file mode 100644 index 0000000000..b099ddf476 --- /dev/null +++ b/apps/rings-next/.eslintrc.js @@ -0,0 +1,4 @@ +module.exports = { + extends: 'next', + root: true, +} diff --git a/apps/rings-next/.gitignore b/apps/rings-next/.gitignore new file mode 100644 index 0000000000..87d027d2d5 --- /dev/null +++ b/apps/rings-next/.gitignore @@ -0,0 +1,4 @@ +.vercel +.tamagui +.next +./public/tamagui.css diff --git a/apps/rings-next/api-client.ts b/apps/rings-next/api-client.ts new file mode 100644 index 0000000000..c64c25af32 --- /dev/null +++ b/apps/rings-next/api-client.ts @@ -0,0 +1,251 @@ +import axios, { AxiosResponse } from 'axios'; +import { NETWORK, SignersByFid } from '@farcaster/rings-next/constants'; +import { + ObjectRefTypes, + RefDirection, + ObjectRef, + MessagesResponse, + Message, + ObjectResponse, + TagBody, + makeTagAdd, + RelationshipAddBody, + makeRelationshipAdd, + makeRelationshipRemove, +} from '@farcaster/hub-web'; +import { User, RelationshipTypes } from '@farcaster/rings-next/types.d'; +import { convertHexHash, getObjectRefStoreId, getMessageStoreId } from '@farcaster/rings-next/state/utils'; + +const API_HOST = process.env.NEXT_PUBLIC_API_URL; +const API_URL = `${API_HOST}/v1`; + +const getApiClient = () => { + const client = axios.create({ + baseURL: API_URL, + timeout: 5000, + // headers: { + // 'Authentication': `${sessionToken}`, + // } + }); + + const submitMessage = async (msg: Message) => { + const messageBytes = Buffer.from(Message.encode(msg).finish()); + const result = await client.post('/submitMessage', + messageBytes, { + headers: { "Content-Type": "application/octet-stream" }, + }, + ); + return result.data as MessagesResponse; + }; + + const getRingOwnerRels = async (fid: number) => { + const result = await client.get('/relationshipsByRelatedObjectRef', { + params: { + ref_type: ObjectRefTypes.FID, + object_ref_fid: fid, + ref_direction: RefDirection.TARGET, + type: RelationshipTypes.Owner, + } + }); + return result.data as MessagesResponse; + }; + + const getRingWearerRels = async (fid: number) => { + const result = await client.get('/relationshipsByRelatedObjectRef', { + params: { + ref_type: ObjectRefTypes.FID, + object_ref_fid: fid, + ref_direction: RefDirection.TARGET, + type: RelationshipTypes.Wearer, + } + }); + return result.data as MessagesResponse; + }; + + const getRingWearerRelsByRing = async (ringRef: ObjectRef) => { + const result = await client.get('/relationshipsByRelatedObjectRef', { + params: { + ref_type: ObjectRefTypes.OBJECT, + object_ref_network: NETWORK, + object_ref_fid: ringRef.fid, + object_ref_hash: ringRef.hash, + ref_direction: RefDirection.SOURCE, + type: RelationshipTypes.Wearer, + } + }); + return result.data as MessagesResponse; + } + + const getRingOwnerRelsByRing = async (ringRef: ObjectRef) => { + const result = await client.get('/relationshipsByRelatedObjectRef', { + params: { + ref_type: ObjectRefTypes.OBJECT, + object_ref_network: NETWORK, + object_ref_fid: ringRef.fid, + object_ref_hash: ringRef.hash, + ref_direction: RefDirection.SOURCE, + type: RelationshipTypes.Owner, + } + }); + return result.data as MessagesResponse; + } + + const getRingObject = async (ringRef: ObjectRef) => { + const result = await client.get('/objectById', { + params: { + fid: ringRef.fid, + hash: ringRef.hash, + includeTags: true, + creatorTagsOnly: false, + } + }); + return result.data as ObjectResponse; + } + + return { + getUserDataByFid: async (fid: number) => { + const result = await client.get('/userDataByFid', { + params: { + fid, + } + }); + // TODO: shape the data as a single user object before passing it down + return result.data as MessagesResponse; + }, + getOwnedAndWornRings: async (fid: number) => { + const results = { + rings: [] as Array, + stones: [] as Array, + relationships: [] as Array, + users: [] as Array, + } + const fids: Array = []; + let ownedRingRefs: Array = []; + let wornRingRefs: Array = []; + let ownerRels: Array = []; + let wearerRels: Array = []; + + // Get the owner Relationships for the FID + const ownerResult = await getRingOwnerRels(fid); + ownerRels = ownerResult.messages; + results.relationships.push(...ownerRels); + for (let ownerRel of ownerRels) { + // console.log('Found a ring owner relationship!', ownerRel); + let ringRef = ownerRel.data?.relationshipAddBody?.source as ObjectRef; + ownedRingRefs.push(ringRef); + } + + // Get the wearer Relationships for the FID + const wearerResult = await getRingWearerRels(fid); + wearerRels = wearerResult.messages; + results.relationships.push(...wearerRels); + for (let wearerRel of wearerRels) { + // console.log('Found a ring wearer relationship!', wearerRel); + let ringRef = wearerRel.data?.relationshipAddBody?.source as ObjectRef; + wornRingRefs.push(ringRef); + } + + // For every owned ring, find the ring wearers + for (let ownedRingRef of ownedRingRefs) { + const ownedRingWearerResult = await getRingWearerRelsByRing(ownedRingRef); + const ownedRingWearerRels = ownedRingWearerResult.messages; + wearerRels.push(...ownedRingWearerRels); + } + + // For every worn ring, find the ring owners + for (let wornRingRef of wornRingRefs) { + const wornRingsOwnerResult = await getRingOwnerRelsByRing(wornRingRef); + const wornRingsOwnerRels = wornRingsOwnerResult.messages; + ownerRels.push(...wornRingsOwnerRels); + } + + const uniqueRingRefs = _.uniqBy([...ownedRingRefs, ...wornRingRefs], ref => getObjectRefStoreId(ref)); + + // Having found all relevant ring refs, fetch the ring objects with stone tags + for (let ringRef of uniqueRingRefs) { + // get the ring object + // console.log('Ring hash bytes', hexStringToBytes(ringRef.hash)); + let ringObjResult = await getRingObject(ringRef); + if (!ringObjResult.object?.data) { + continue; + } + let ringObj = ringObjResult.object; + results.rings.push(ringObj); + let ringTags = ringObjResult.tags; + results.stones.push(...ringTags); + console.log('Found a ring!', ringObj, ringTags); + } + + // Dedupe owner and wearer relationships in case some FIDs are participating as both owners and wearers of rings + results.relationships = _.uniqBy([...ownerRels, ...wearerRels], rel => getMessageStoreId(rel)); + + // TODO: figure out whether we're loading in the profiles of all the users involved with the rings + results.users = + // all FIDs involved are the target of the relationships discovered + _.uniq(results.relationships.map(r => r.data?.relationshipAddBody?.target?.fid as number)) + .map((fid: number) => ({ fid } as User)); // lean User objects without profile info for now + return results; + }, + + updateStone: async (fid: number, changes: TagBody) => { + const signer = SignersByFid[fid]; + if (!signer) { + throw new Error(`No signer for fid: ${fid}`); + } + const newStone = await makeTagAdd(changes, { + fid, + network: NETWORK, + }, + signer, + ); + if (newStone.isErr()){ + throw newStone.error; + } + const result = await submitMessage((newStone._unsafeUnwrap())); + return result; + }, + updateRingWearer: async (fid: number, newWearer: RelationshipAddBody, existingWearer: Message | undefined) => { + // TODO: not hooked up and untested + const signer = SignersByFid[fid]; + if (!signer) { + throw new Error(`No signer for fid: ${fid}`); + } + + if (existingWearer) { + const relRemove = { + targetHash: convertHexHash(existingWearer.hash), // API expects binary array hashes, not hex strings + }; + + const removedRelationship = await makeRelationshipRemove(relRemove, { + fid, + network: NETWORK, + }, + signer, + ); + if (removedRelationship.isErr()) { + throw removedRelationship.error; + } + const result = await submitMessage((removedRelationship._unsafeUnwrap())); + } + const newRelationship = await makeRelationshipAdd(newWearer, { + fid, + network: NETWORK, + }, + signer, + ); + if (newRelationship.isErr()) { + throw newRelationship.error; + } + const result = await submitMessage((newRelationship._unsafeUnwrap())); + return { + added: result, + removed: existingWearer, + } + }, + }; +} + +export default () => { + // const sessionToken = localStorage.getItem('session'); + return getApiClient(); +} \ No newline at end of file diff --git a/apps/rings-next/app/frame/[id]/page.tsx b/apps/rings-next/app/frame/[id]/page.tsx new file mode 100644 index 0000000000..de1fc4c64c --- /dev/null +++ b/apps/rings-next/app/frame/[id]/page.tsx @@ -0,0 +1,56 @@ +import type { Metadata } from 'next' +import { getFrameMetadata } from '@coinbase/onchainkit/frame'; +import React from "react"; +import { redirect } from "next/navigation"; + +type Props = { + params: { id: string } +} + +export async function generateMetadata( + { params }: Props, +): Promise { + // read route params + const id = params.id + + const frameMetadata = getFrameMetadata({ + buttons: [ + { + label: `📖 Guide`, + }, + { + label: `👤✅ Check My FID`, + }, + { + action: 'link', + label: `🏆 Dashboard`, + target: `${process.env.NEXT_PUBLIC_URL}/bot/${id}` + }, + ], + image: `${process.env.NEXT_PUBLIC_API_URL}/api/public/images/initial.png`, + postUrl: `${process.env.NEXT_PUBLIC_API_URL}/api/frames/${id}`, + }); + + + return { + metadataBase: new URL(process.env.NEXT_PUBLIC_URL!), + title: 'tipster.bot', + description: '', + openGraph: { + title: 'tipster.bot', + description: '', + images: [`${process.env.NEXT_PUBLIC_API_URL}/api/public/initial.png`], + }, + other: { + ...frameMetadata, + }, + } +} + +export default function Page({ params }: Props) { + redirect("/"); + return ( + <> + + ) +} \ No newline at end of file diff --git a/apps/rings-next/app/layout.tsx b/apps/rings-next/app/layout.tsx new file mode 100644 index 0000000000..2b5508f498 --- /dev/null +++ b/apps/rings-next/app/layout.tsx @@ -0,0 +1,22 @@ +import type { Metadata } from 'next' + +export const metadata: Metadata = { + title: 'tipster.bot', + description: 'A tipping bot built on farcaster.', + viewport: 'width=device-width, initial-scale=1, maximum-scale=1' +} + +export default function RootLayout({ children }: { children: React.ReactNode }) { + return ( + + + + + + + + {children} + + + ) +} \ No newline at end of file diff --git a/apps/rings-next/app/login/page.tsx b/apps/rings-next/app/login/page.tsx new file mode 100644 index 0000000000..dbd67eea31 --- /dev/null +++ b/apps/rings-next/app/login/page.tsx @@ -0,0 +1,39 @@ +'use client'; +import { + H2, + H6, + H4, +} from 'tamagui'; +import { SignInButton } from '@farcaster/auth-kit'; +import Logo from "@farcaster/rings-next/components/logo/Logo"; +import { useAuth } from "@farcaster/rings-next/provider/AuthProvider"; +import Container from "@farcaster/rings-next/components/container/Container"; + +export default function LoginPage() { + const { login } = useAuth(); + + return ( + + + +

tipster.bot

+ +
+ A Tipping Game for Every
+ farcaster community +
+ +

Log in to get started

+ + { login(signInData) }} /> +
+ ) +} diff --git a/apps/rings-next/app/page.tsx b/apps/rings-next/app/page.tsx new file mode 100644 index 0000000000..ee9af08ad5 --- /dev/null +++ b/apps/rings-next/app/page.tsx @@ -0,0 +1,191 @@ +'use client'; +import React, { useEffect, useState } from "react"; +import type { TabsContentProps } from 'tamagui'; +import { YStack, Tabs, H5, Separator, SizableText } from 'tamagui'; +import Title from "@farcaster/rings-next/components/title/Title"; +// import Navbar from "@farcaster/rings-next/components/navbar/Navbar"; +import { useCommonActions } from '@farcaster/rings-next/hooks/useCommonActions'; +import { selectRings } from '@farcaster/rings-next/state/common-selectors'; +import { selectRingsIsLoading } from '@farcaster/rings-next/state/rings/selectors'; +import { useSelector } from 'react-redux'; +import Container from "@farcaster/rings-next/components/container/Container"; +// import data from "@farcaster/rings-next/data"; +import apiClient from "@farcaster/rings-next/api-client"; +import { + ObjectRefTypes, +} from "@farcaster/hub-web"; +import RingCard from "@farcaster/rings-next/components/ring/RingCard"; +import { Ring } from "@farcaster/rings-next/types"; +import Select, { Item as SelectItem } from '@farcaster/rings-next/components/select/Select'; +import { fidItems as fids } from '@farcaster/rings-next/constants'; +import { useFid } from "@farcaster/rings-next/provider/FidProvider"; +import NodeExplorer from "@farcaster/rings-next/components/node-explorer"; + +const TabsContent = (props: TabsContentProps) => { + return ( + + {props.children} + + ) +} + +const RingTableHeading = () => ( + + <Title.Heading width={200} size="$6" marginBottom="$2">Ring</Title.Heading> + <Title.Heading width={150} size="$6" marginBottom="$2">Stone 1</Title.Heading> + <Title.Heading width={150} size="$6" marginBottom="$2">Stone 2</Title.Heading> + <Title.Heading width={150} size="$6" marginBottom="$2">Stone 3</Title.Heading> + <Title.Heading width={150} size="$6" marginBottom="$2">Wearer</Title.Heading> + {/* <Title.Heading size="$7">Your Community TipBots</Title.Heading> */} + +); + +export default function HomePage() { + const { fetchUserRings } = useCommonActions(); + const rings = useSelector(selectRings) as Array; + const isLoading = useSelector(selectRingsIsLoading); + + // .then(r => console.log(r)); + // const bots = useSelector(selectBots); + // const isLoading = useSelector(selectBotsIsLoading); + + const { fid, setFid } = useFid(); + + useEffect(() => { + if (fid > 0) { + fetchUserRings(fid); + } + }, [fid]); + + const ownedRings = rings.filter((r: Ring) => r.owner.fid === fid); + const wornRings = rings.filter((r: Ring) => r.wearer?.fid === fid); + + + return ( + +
You are:
+ { onChange(item!.id)}} disabled={!editable} /> + )}} + /> + + { + return ( + { onChange(item!.id)}} disabled={!editable} /> + )}} + /> + + {/* TODO: Use a input instead of select */} + { + return ( +