Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cfkv #561

Closed
wants to merge 7 commits into from
Closed

cfkv #561

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions ssr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ speedate = { version = "0.14.4", optional = true }
urlencoding = "2.1.3"
yral-types = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
yral-qstash-types = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
yral-canisters-client = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04", features = ["full"] }
yral-canisters-common = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
yral-canisters-client = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04", features = [
"full",
] }
yral-canisters-common = { git = "https://github.com/yral-dapp/yral-common.git", rev = "5e4414a3f1e0644d93f181949d533c6a9991da04" }
pulldown-cmark = "0.12.1"
ic-certification = "2.6.0"
ciborium = "0.2.2"
yral-metadata-client = { git = "https://github.com/yral-dapp/yral-metadata", rev = "56e3f1f1f5f452673bee17739520c800c1264295", optional = true }
yral-metadata-types = { git = "https://github.com/yral-dapp/yral-metadata", rev = "56e3f1f1f5f452673bee17739520c800c1264295", optional = true }


[build-dependencies]
tonic-build = { version = "0.12.0", default-features = false, features = [
"prost",
Expand All @@ -133,7 +134,7 @@ hydrate = [
"reqwest/native-tls",
"dep:rand_chacha",
"tonic/codegen",
"speedate"
"speedate",
]
ssr = [
"dep:axum",
Expand Down Expand Up @@ -162,7 +163,7 @@ ssr = [
"tonic/tls-webpki-roots",
"tonic/transport",
"tonic-build/transport",
"speedate"
"speedate",
]
# Fetch mock referral history instead of history via canister
mock-referral-history = ["dep:rand_chacha", "k256/arithmetic"]
Expand Down Expand Up @@ -209,7 +210,13 @@ local-bin = [
"dep:yral-metadata-client",
"dep:yral-metadata-types",
]
local-lib = ["hydrate", "redis-kv", "local-auth", "backend-admin", "yral-canisters-common/local"]
local-lib = [
"hydrate",
"redis-kv",
"local-auth",
"backend-admin",
"yral-canisters-common/local",
]

[package.metadata.leptos]
# The name used by wasm-bindgen/cargo-leptos for the JS/WASM bundle. Defaults to the crate name
Expand Down
3 changes: 3 additions & 0 deletions ssr/src/consts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub const CDAO_SWAP_TIME_SECS: u64 = CDAO_SWAP_PRE_READY_TIME_SECS + 150;
pub const ICPUMP_SEARCH_GRPC_URL: &str = "https://prod-yral-icpumpsearch.fly.dev:443";
pub const NSFW_SERVER_URL: &str = "https://prod-yral-nsfw-classification.fly.dev:443";

pub const CF_KV_ML_CACHE_NAMESPACE_ID: &str = "ea145fc839bd42f9bf2d34b950ddbda5";
pub const CLOUDFLARE_ACCOUNT_ID: &str = "a209c523d2d9646cc56227dbe6ce3ede";

pub mod social {
pub const TELEGRAM: &str = "https://t.me/+c-LTX0Cp-ENmMzI1";
pub const DISCORD: &str = "https://discord.gg/GZ9QemnZuj";
Expand Down
106 changes: 50 additions & 56 deletions ssr/src/page/post_view/video_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use yral_canisters_client::post_cache::{self, NsfwFilter};
use crate::{
consts::USER_CANISTER_ID_STORE,
state::canisters::auth_canisters_store,
utils::{host::show_nsfw_content, posts::FetchCursor},
utils::{
host::show_nsfw_content,
ml_feed::{get_coldstart_feed_paginated, get_posts_ml_feed_cache_paginated},
posts::FetchCursor,
},
};
use yral_canisters_common::{utils::posts::PostDetails, Canisters, Error as CanistersError};

Expand All @@ -22,6 +26,7 @@ pub enum FeedResultType {
PostCache,
MLFeedCache,
MLFeed,
MLFeedColdstart,
}

pub struct FetchVideosRes<'a> {
Expand Down Expand Up @@ -171,50 +176,32 @@ impl<'a, const AUTH: bool> VideoFetchStream<'a, AUTH> {
&self,
chunks: usize,
_allow_nsfw: bool,
video_queue: Vec<PostDetails>,
_video_queue: Vec<PostDetails>,
) -> Result<FetchVideosRes<'a>, ServerFnError> {
#[cfg(feature = "hydrate")]
{
use crate::utils::ml_feed::ml_feed_grpcweb::MLFeed;

let ml_feed: MLFeed = expect_context();

let top_posts = ml_feed
.get_next_feed_coldstart(self.cursor.limit as u32, video_queue)
.await;

let top_posts = match top_posts {
Ok(top_posts) => top_posts,
Err(e) => {
return Err(ServerFnError::new(
format!("Error fetching ml feed: {e:?}",),
));
}
};

let end = false;
let chunk_stream = top_posts
.into_iter()
.map(move |item| self.canisters.get_post_details(item.0, item.1))
.collect::<FuturesOrdered<_>>()
.filter_map(|res| async { res.transpose() })
.chunks(chunks);
let top_posts = get_coldstart_feed_paginated(self.cursor.start, self.cursor.limit).await;

let top_posts = match top_posts {
Ok(top_posts) => top_posts,
Err(e) => {
return Err(ServerFnError::new(
format!("Error fetching ml feed: {e:?}",),
));
}
};

Ok(FetchVideosRes {
posts_stream: Box::pin(chunk_stream),
end,
res_type: FeedResultType::MLFeed,
})
}
let end = false;
let chunk_stream = top_posts
.into_iter()
.map(move |item| self.canisters.get_post_details(item.0, item.1))
.collect::<FuturesOrdered<_>>()
.filter_map(|res| async { res.transpose() })
.chunks(chunks);

#[cfg(not(feature = "hydrate"))]
{
return Ok(FetchVideosRes {
posts_stream: Box::pin(futures::stream::empty()),
end: true,
res_type: FeedResultType::MLFeed,
});
}
Ok(FetchVideosRes {
posts_stream: Box::pin(chunk_stream),
end,
res_type: FeedResultType::MLFeedColdstart,
})
}
}

Expand All @@ -227,25 +214,32 @@ impl<'a> VideoFetchStream<'a, true> {
) -> Result<FetchVideosRes<'a>, ServerFnError> {
let cans_true = self.canisters;

let user_canister = cans_true.authenticated_user().await;
let top_posts_fut =
user_canister.get_ml_feed_cache_paginated(self.cursor.start, self.cursor.limit);
let user_canister_id = cans_true.user_canister();

let top_posts = top_posts_fut.await?;
if top_posts.is_empty() {
// try ml feed once again - with coldstart feed
return self
.fetch_post_uids_ml_feed_coldstart_chunked(chunks, allow_nsfw, video_queue)
.await;
}
let top_posts = match get_posts_ml_feed_cache_paginated(
user_canister_id,
self.cursor.start,
self.cursor.limit,
)
.await
{
Ok(posts) if posts.is_empty() => {
return self
.fetch_post_uids_ml_feed_coldstart_chunked(chunks, allow_nsfw, video_queue)
.await;
}
Ok(posts) => posts,
Err(_) => {
return self
.fetch_post_uids_ml_feed_coldstart_chunked(chunks, allow_nsfw, video_queue)
.await;
}
};

let end = false;
let chunk_stream = top_posts
.into_iter()
.map(move |item| {
self.canisters
.get_post_details(item.canister_id, item.post_id)
})
.map(move |item| self.canisters.get_post_details(item.0, item.1))
.collect::<FuturesOrdered<_>>()
.filter_map(|res| async { res.transpose() })
.chunks(chunks);
Expand Down
45 changes: 23 additions & 22 deletions ssr/src/page/root.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use candid::Principal;
use leptos::*;
use leptos_router::*;

use crate::{component::spinner::FullScreenSpinner, utils::host::show_cdao_page};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaCha8Rng,
};
use yral_canisters_common::utils::time::current_epoch;

use crate::{
component::spinner::FullScreenSpinner,
utils::{
host::show_cdao_page,
ml_feed::{get_coldstart_feed_paginated, get_posts_ml_feed_cache_paginated},
},
};

#[server]
async fn get_top_post_id() -> Result<Option<(Principal, u64)>, ServerFnError> {
Expand Down Expand Up @@ -58,28 +69,18 @@ async fn get_top_post_id_mlcache() -> Result<Option<(Principal, u64)>, ServerFnE
return get_top_post_id_mlfeed().await;
}

let user_canister = canisters.individual_user(user_canister_id.unwrap()).await;

let top_items = user_canister
.get_ml_feed_cache_paginated(0, 1)
.await
.unwrap();
if top_items.is_empty() {
return get_top_post_id_mlfeed().await;
let posts = get_posts_ml_feed_cache_paginated(user_canister_id.unwrap(), 0, 1).await;
if let Ok(posts) = posts {
if !posts.is_empty() {
return Ok(Some((posts[0].0, posts[0].1)));
}
}

let Some(top_item) = top_items.first() else {
return Ok(None);
};

Ok(Some((top_item.canister_id, top_item.post_id)))
get_top_post_id_mlfeed().await
}

#[server]
async fn get_top_post_id_mlfeed() -> Result<Option<(Principal, u64)>, ServerFnError> {
use crate::utils::ml_feed::ml_feed_grpc::get_coldstart_feed;

let top_posts_fut = get_coldstart_feed();
let top_posts_fut = get_coldstart_feed_paginated(0, 50);

let top_items = match top_posts_fut.await {
Ok(top_posts) => top_posts,
Expand All @@ -90,9 +91,9 @@ async fn get_top_post_id_mlfeed() -> Result<Option<(Principal, u64)>, ServerFnEr
));
}
};
let Some(top_item) = top_items.first() else {
return Ok(None);
};
let mut rand_gen = ChaCha8Rng::seed_from_u64(current_epoch().as_nanos() as u64);
let rand_num = rand_gen.next_u32() as usize % top_items.len();
let top_item = top_items[rand_num];

Ok(Some((top_item.0, top_item.1)))
}
Expand Down
72 changes: 71 additions & 1 deletion ssr/src/utils/ml_feed/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::consts::ML_FEED_GRPC_URL;
use std::env;

use crate::consts::{self, ML_FEED_GRPC_URL};
use candid::Principal;
use leptos::{server, ServerFnError};
use serde::{Deserialize, Serialize};

use super::types::PostId;

Expand Down Expand Up @@ -240,3 +244,69 @@ pub mod ml_feed_grpc {
.collect())
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct CustomMlFeedCacheItem {
post_id: u64,
canister_id: String,
video_id: String,
creator_principal_id: String,
}

#[server]
pub async fn get_posts_ml_feed_cache_paginated(
canister_id: Principal,
start: u64,
limit: u64,
) -> Result<Vec<PostId>, ServerFnError> {
get_posts_ml_feed_cache_paginated_impl(canister_id.to_text(), start, limit).await
}

#[server]
pub async fn get_coldstart_feed_paginated(
start: u64,
limit: u64,
) -> Result<Vec<PostId>, ServerFnError> {
get_posts_ml_feed_cache_paginated_impl("global-feed".to_string(), start, limit).await
}

pub async fn get_posts_ml_feed_cache_paginated_impl(
canister_id_str: String,
start: u64,
limit: u64,
) -> Result<Vec<PostId>, ServerFnError> {
let client = reqwest::Client::new();
let account_id = consts::CLOUDFLARE_ACCOUNT_ID;
let namespace_id = consts::CF_KV_ML_CACHE_NAMESPACE_ID;
let api_token = env::var("CF_TOKEN").unwrap();

let url = format!(
"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}/values/{}",
account_id, namespace_id, canister_id_str
);

let response = client
.get(&url)
.header("Authorization", format!("Bearer {}", api_token))
.header("Content-Type", "application/json")
.send()
.await?;

if !response.status().is_success() {
return Ok(vec![]);
}

let response = response.json::<Vec<CustomMlFeedCacheItem>>().await.unwrap();

Ok(response
.into_iter()
.skip(start as usize)
.take(limit as usize)
.map(|item| {
(
Principal::from_text(&item.canister_id).unwrap(),
item.post_id,
)
})
.collect::<Vec<PostId>>())
}
Loading