Skip to content

Commit

Permalink
Ml feed cache kv v2 (#562)
Browse files Browse the repository at this point in the history
* cfkv

* clippy fix

* cfkv

* cfkv

* cfkv coldstart

* cfkv fis

* cfkv
  • Loading branch information
komal-sai-yral authored Dec 11, 2024
1 parent d9ec5de commit b8a1871
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 85 deletions.
19 changes: 13 additions & 6 deletions ssr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ speedate = { version = "0.14.4", optional = true }
urlencoding = "2.1.3"
yral-types = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
yral-qstash-types = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
yral-canisters-client = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04", features = ["full"] }
yral-canisters-common = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
yral-canisters-client = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04", features = [
"full",
] }
yral-canisters-common = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
pulldown-cmark = "0.12.1"
ic-certification = "2.6.0"
ciborium = "0.2.2"
yral-metadata-client = { git = "https://github.com/yral-dapp/yral-metadata", rev = "56e3f1f1f5f452673bee17739520c800c1264295", optional = true }
yral-metadata-types = { git = "https://github.com/yral-dapp/yral-metadata", rev = "56e3f1f1f5f452673bee17739520c800c1264295", optional = true }


[build-dependencies]
tonic-build = { version = "0.12.0", default-features = false, features = [
"prost",
Expand All @@ -133,7 +134,7 @@ hydrate = [
"reqwest/native-tls",
"dep:rand_chacha",
"tonic/codegen",
"speedate"
"speedate",
]
ssr = [
"dep:axum",
Expand Down Expand Up @@ -162,7 +163,7 @@ ssr = [
"tonic/tls-webpki-roots",
"tonic/transport",
"tonic-build/transport",
"speedate"
"speedate",
]
# Fetch mock referral history instead of history via canister
mock-referral-history = ["dep:rand_chacha", "k256/arithmetic"]
Expand Down Expand Up @@ -209,7 +210,13 @@ local-bin = [
"dep:yral-metadata-client",
"dep:yral-metadata-types",
]
local-lib = ["hydrate", "redis-kv", "local-auth", "backend-admin", "yral-canisters-common/local"]
local-lib = [
"hydrate",
"redis-kv",
"local-auth",
"backend-admin",
"yral-canisters-common/local",
]

[package.metadata.leptos]
# The name used by wasm-bindgen/cargo-leptos for the JS/WASM bundle. Defaults to the crate name
Expand Down
3 changes: 3 additions & 0 deletions ssr/src/consts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub const CDAO_SWAP_TIME_SECS: u64 = CDAO_SWAP_PRE_READY_TIME_SECS + 150;
pub const ICPUMP_SEARCH_GRPC_URL: &str = "https://prod-yral-icpumpsearch.fly.dev:443";
pub const NSFW_SERVER_URL: &str = "https://prod-yral-nsfw-classification.fly.dev:443";

/// Cloudflare KV namespace ID of the ML feed cache; interpolated into the
/// `/storage/kv/namespaces/{id}/values/{key}` segment of the Cloudflare API URL.
pub const CF_KV_ML_CACHE_NAMESPACE_ID: &str = "ea145fc839bd42f9bf2d34b950ddbda5";
/// Cloudflare account ID; interpolated into the `/accounts/{id}` segment of the
/// Cloudflare API URL used to read the ML feed cache.
pub const CLOUDFLARE_ACCOUNT_ID: &str = "a209c523d2d9646cc56227dbe6ce3ede";

pub mod social {
pub const TELEGRAM: &str = "https://t.me/+c-LTX0Cp-ENmMzI1";
pub const DISCORD: &str = "https://discord.gg/GZ9QemnZuj";
Expand Down
106 changes: 50 additions & 56 deletions ssr/src/page/post_view/video_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use yral_canisters_client::post_cache::{self, NsfwFilter};
use crate::{
consts::USER_CANISTER_ID_STORE,
state::canisters::auth_canisters_store,
utils::{host::show_nsfw_content, posts::FetchCursor},
utils::{
host::show_nsfw_content,
ml_feed::{get_coldstart_feed_paginated, get_posts_ml_feed_cache_paginated},
posts::FetchCursor,
},
};
use yral_canisters_common::{utils::posts::PostDetails, Canisters, Error as CanistersError};

Expand All @@ -22,6 +26,7 @@ pub enum FeedResultType {
PostCache,
MLFeedCache,
MLFeed,
MLFeedColdstart,
}

pub struct FetchVideosRes<'a> {
Expand Down Expand Up @@ -171,50 +176,32 @@ impl<'a, const AUTH: bool> VideoFetchStream<'a, AUTH> {
&self,
chunks: usize,
_allow_nsfw: bool,
video_queue: Vec<PostDetails>,
_video_queue: Vec<PostDetails>,
) -> Result<FetchVideosRes<'a>, ServerFnError> {
#[cfg(feature = "hydrate")]
{
use crate::utils::ml_feed::ml_feed_grpcweb::MLFeed;

let ml_feed: MLFeed = expect_context();

let top_posts = ml_feed
.get_next_feed_coldstart(self.cursor.limit as u32, video_queue)
.await;

let top_posts = match top_posts {
Ok(top_posts) => top_posts,
Err(e) => {
return Err(ServerFnError::new(
format!("Error fetching ml feed: {e:?}",),
));
}
};

let end = false;
let chunk_stream = top_posts
.into_iter()
.map(move |item| self.canisters.get_post_details(item.0, item.1))
.collect::<FuturesOrdered<_>>()
.filter_map(|res| async { res.transpose() })
.chunks(chunks);
let top_posts = get_coldstart_feed_paginated(self.cursor.start, self.cursor.limit).await;

let top_posts = match top_posts {
Ok(top_posts) => top_posts,
Err(e) => {
return Err(ServerFnError::new(
format!("Error fetching ml feed: {e:?}",),
));
}
};

Ok(FetchVideosRes {
posts_stream: Box::pin(chunk_stream),
end,
res_type: FeedResultType::MLFeed,
})
}
let end = false;
let chunk_stream = top_posts
.into_iter()
.map(move |item| self.canisters.get_post_details(item.0, item.1))
.collect::<FuturesOrdered<_>>()
.filter_map(|res| async { res.transpose() })
.chunks(chunks);

#[cfg(not(feature = "hydrate"))]
{
return Ok(FetchVideosRes {
posts_stream: Box::pin(futures::stream::empty()),
end: true,
res_type: FeedResultType::MLFeed,
});
}
Ok(FetchVideosRes {
posts_stream: Box::pin(chunk_stream),
end,
res_type: FeedResultType::MLFeedColdstart,
})
}
}

Expand All @@ -227,25 +214,32 @@ impl<'a> VideoFetchStream<'a, true> {
) -> Result<FetchVideosRes<'a>, ServerFnError> {
let cans_true = self.canisters;

let user_canister = cans_true.authenticated_user().await;
let top_posts_fut =
user_canister.get_ml_feed_cache_paginated(self.cursor.start, self.cursor.limit);
let user_canister_id = cans_true.user_canister();

let top_posts = top_posts_fut.await?;
if top_posts.is_empty() {
// try ml feed once again - with coldstart feed
return self
.fetch_post_uids_ml_feed_coldstart_chunked(chunks, allow_nsfw, video_queue)
.await;
}
let top_posts = match get_posts_ml_feed_cache_paginated(
user_canister_id,
self.cursor.start,
self.cursor.limit,
)
.await
{
Ok(posts) if posts.is_empty() => {
return self
.fetch_post_uids_ml_feed_coldstart_chunked(chunks, allow_nsfw, video_queue)
.await;
}
Ok(posts) => posts,
Err(_) => {
return self
.fetch_post_uids_ml_feed_coldstart_chunked(chunks, allow_nsfw, video_queue)
.await;
}
};

let end = false;
let chunk_stream = top_posts
.into_iter()
.map(move |item| {
self.canisters
.get_post_details(item.canister_id, item.post_id)
})
.map(move |item| self.canisters.get_post_details(item.0, item.1))
.collect::<FuturesOrdered<_>>()
.filter_map(|res| async { res.transpose() })
.chunks(chunks);
Expand Down
45 changes: 23 additions & 22 deletions ssr/src/page/root.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use candid::Principal;
use leptos::*;
use leptos_router::*;

use crate::{component::spinner::FullScreenSpinner, utils::host::show_cdao_page};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaCha8Rng,
};
use yral_canisters_common::utils::time::current_epoch;

use crate::{
component::spinner::FullScreenSpinner,
utils::{
host::show_cdao_page,
ml_feed::{get_coldstart_feed_paginated, get_posts_ml_feed_cache_paginated},
},
};

#[server]
async fn get_top_post_id() -> Result<Option<(Principal, u64)>, ServerFnError> {
Expand Down Expand Up @@ -58,28 +69,18 @@ async fn get_top_post_id_mlcache() -> Result<Option<(Principal, u64)>, ServerFnE
return get_top_post_id_mlfeed().await;
}

let user_canister = canisters.individual_user(user_canister_id.unwrap()).await;

let top_items = user_canister
.get_ml_feed_cache_paginated(0, 1)
.await
.unwrap();
if top_items.is_empty() {
return get_top_post_id_mlfeed().await;
let posts = get_posts_ml_feed_cache_paginated(user_canister_id.unwrap(), 0, 1).await;
if let Ok(posts) = posts {
if !posts.is_empty() {
return Ok(Some((posts[0].0, posts[0].1)));
}
}

let Some(top_item) = top_items.first() else {
return Ok(None);
};

Ok(Some((top_item.canister_id, top_item.post_id)))
get_top_post_id_mlfeed().await
}

#[server]
async fn get_top_post_id_mlfeed() -> Result<Option<(Principal, u64)>, ServerFnError> {
use crate::utils::ml_feed::ml_feed_grpc::get_coldstart_feed;

let top_posts_fut = get_coldstart_feed();
let top_posts_fut = get_coldstart_feed_paginated(0, 50);

let top_items = match top_posts_fut.await {
Ok(top_posts) => top_posts,
Expand All @@ -90,9 +91,9 @@ async fn get_top_post_id_mlfeed() -> Result<Option<(Principal, u64)>, ServerFnEr
));
}
};
let Some(top_item) = top_items.first() else {
return Ok(None);
};
let mut rand_gen = ChaCha8Rng::seed_from_u64(current_epoch().as_nanos() as u64);
let rand_num = rand_gen.next_u32() as usize % top_items.len();
let top_item = top_items[rand_num];

Ok(Some((top_item.0, top_item.1)))
}
Expand Down
72 changes: 71 additions & 1 deletion ssr/src/utils/ml_feed/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::consts::ML_FEED_GRPC_URL;
use std::env;

use crate::consts::{self, ML_FEED_GRPC_URL};
use candid::Principal;
use leptos::{server, ServerFnError};
use serde::{Deserialize, Serialize};

use super::types::PostId;

Expand Down Expand Up @@ -240,3 +244,69 @@ pub mod ml_feed_grpc {
.collect())
}
}

/// One element of the JSON array stored as a feed-cache value in Cloudflare KV.
///
/// Field names double as the JSON keys through the serde derives, so they
/// must match the stored payload exactly.
#[derive(Serialize, Deserialize, Debug)]
pub struct CustomMlFeedCacheItem {
// Numeric post identifier; becomes the second element of the returned `PostId` pair.
post_id: u64,
// Textual principal of the canister; parsed with `Principal::from_text` by the fetch helper.
canister_id: String,
// Deserialized from the payload; not read by the fetch helper in this module.
video_id: String,
// Deserialized from the payload; not read by the fetch helper in this module.
creator_principal_id: String,
}

/// Server function returning one page of the cached ML feed for a user canister.
///
/// The textual form of `canister_id` is used as the cache key; `start` and
/// `limit` are forwarded unchanged to the shared fetch helper.
#[server]
pub async fn get_posts_ml_feed_cache_paginated(
    canister_id: Principal,
    start: u64,
    limit: u64,
) -> Result<Vec<PostId>, ServerFnError> {
    // The cache is keyed by the principal's text representation.
    let cache_key = canister_id.to_text();
    get_posts_ml_feed_cache_paginated_impl(cache_key, start, limit).await
}

/// Server function returning one page of the coldstart feed.
///
/// Reads the same cache as the per-user feed, but under the fixed
/// `"global-feed"` key instead of a canister principal.
#[server]
pub async fn get_coldstart_feed_paginated(
    start: u64,
    limit: u64,
) -> Result<Vec<PostId>, ServerFnError> {
    let cache_key = String::from("global-feed");
    get_posts_ml_feed_cache_paginated_impl(cache_key, start, limit).await
}

pub async fn get_posts_ml_feed_cache_paginated_impl(
canister_id_str: String,
start: u64,
limit: u64,
) -> Result<Vec<PostId>, ServerFnError> {
let client = reqwest::Client::new();
let account_id = consts::CLOUDFLARE_ACCOUNT_ID;
let namespace_id = consts::CF_KV_ML_CACHE_NAMESPACE_ID;
let api_token = env::var("CF_TOKEN").unwrap();

let url = format!(
"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}/values/{}",
account_id, namespace_id, canister_id_str
);

let response = client
.get(&url)
.header("Authorization", format!("Bearer {}", api_token))
.header("Content-Type", "application/json")
.send()
.await?;

if !response.status().is_success() {
return Ok(vec![]);
}

let response = response.json::<Vec<CustomMlFeedCacheItem>>().await.unwrap();

Ok(response
.into_iter()
.skip(start as usize)
.take(limit as usize)
.map(|item| {
(
Principal::from_text(&item.canister_id).unwrap(),
item.post_id,
)
})
.collect::<Vec<PostId>>())
}

0 comments on commit b8a1871

Please sign in to comment.