-
Notifications
You must be signed in to change notification settings - Fork 257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: duplicate read proto #1741
Changes from 22 commits
38fabe3
a84c6f3
6f28835
7bdfd60
f0c7193
136c805
a9f24cb
67eabde
8efe9c4
76d61dd
0811348
a1dff86
7a3f902
2134bd6
1db5a38
c011dd1
187ab41
202c544
f465172
852c0e9
da660a7
7ee194b
3a2d9f5
af2d812
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,3 +1,6 @@ | ||||||
use std::collections::HashMap; | ||||||
use std::sync::{Arc, Mutex}; | ||||||
|
||||||
use futures_util::future::join_all; | ||||||
use futures_util::TryFutureExt; | ||||||
use url::Url; | ||||||
|
@@ -11,16 +14,65 @@ | |||||
pub path: String, | ||||||
} | ||||||
|
||||||
pub struct ResourceReader { | ||||||
#[derive(Clone)] | ||||||
pub struct ResourceReader<A>(A); | ||||||
|
||||||
impl<A: Reader + Send + Sync> ResourceReader<A> { | ||||||
/// Reads all the files in parallel | ||||||
pub async fn read_files<T: ToString + Send + Sync>( | ||||||
&self, | ||||||
files: &[T], | ||||||
) -> anyhow::Result<Vec<FileRead>> { | ||||||
let files = files.iter().map(|x| { | ||||||
self.read_file(x.to_string()) | ||||||
.map_err(|e| e.context(x.to_string())) | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
}); | ||||||
let content = join_all(files) | ||||||
.await | ||||||
.into_iter() | ||||||
.collect::<anyhow::Result<Vec<_>>>()?; | ||||||
Ok(content) | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
|
||||||
pub async fn read_file<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
self.0.read(file).await | ||||||
} | ||||||
} | ||||||
|
||||||
impl ResourceReader<Cached> { | ||||||
pub fn cached(runtime: TargetRuntime) -> Self { | ||||||
ResourceReader(Cached::init(runtime)) | ||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
impl<A: Reader + Send + Sync> Reader for ResourceReader<A> { | ||||||
async fn read<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
self.read(file).await | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
pub trait Reader { | ||||||
async fn read<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead>; | ||||||
} | ||||||
|
||||||
/// Reads the files directly from the filesystem or from an HTTP URL | ||||||
#[derive(Clone)] | ||||||
pub struct Direct { | ||||||
runtime: TargetRuntime, | ||||||
} | ||||||
|
||||||
impl ResourceReader { | ||||||
impl Direct { | ||||||
pub fn init(runtime: TargetRuntime) -> Self { | ||||||
Self { runtime } | ||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
impl Reader for Direct { | ||||||
/// Reads a file from the filesystem or from an HTTP URL | ||||||
pub async fn read_file<T: ToString>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
async fn read<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
// Is an HTTP URL | ||||||
let content = if let Ok(url) = Url::parse(&file.to_string()) { | ||||||
if url.scheme().starts_with("http") { | ||||||
|
@@ -41,20 +93,49 @@ | |||||
|
||||||
self.runtime.file.read(&file.to_string()).await? | ||||||
}; | ||||||
|
||||||
Ok(FileRead { content, path: file.to_string() }) | ||||||
} | ||||||
} | ||||||
|
||||||
/// Reads all the files in parallel | ||||||
pub async fn read_files<T: ToString>(&self, files: &[T]) -> anyhow::Result<Vec<FileRead>> { | ||||||
let files = files.iter().map(|x| { | ||||||
self.read_file(x.to_string()) | ||||||
.map_err(|e| e.context(x.to_string())) | ||||||
}); | ||||||
let content = join_all(files) | ||||||
.await | ||||||
.into_iter() | ||||||
.collect::<anyhow::Result<Vec<_>>>()?; | ||||||
Ok(content) | ||||||
/// Reads the files from the filesystem or from an HTTP URL with cache | ||||||
#[derive(Clone)] | ||||||
pub struct Cached { | ||||||
direct: Direct, | ||||||
// Cache file content, path -> content | ||||||
cache: Arc<Mutex<HashMap<String, String>>>, | ||||||
} | ||||||
|
||||||
impl Cached { | ||||||
pub fn init(runtime: TargetRuntime) -> Self { | ||||||
Self { direct: Direct::init(runtime), cache: Default::default() } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initialize the cache explicitly to ensure it matches the expected type and is thread-safe. - cache: Default::default(),
+ cache: Arc::new(Mutex::new(HashMap::new())), Explicitly initializing the cache as Committable suggestion
Suggested change
|
||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
impl Reader for Cached { | ||||||
/// Reads a file from the filesystem or from an HTTP URL with cache | ||||||
async fn read<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
// check cache | ||||||
let file_path = file.to_string(); | ||||||
let content = self | ||||||
.cache | ||||||
.as_ref() | ||||||
.lock() | ||||||
.unwrap() | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
.get(&file_path) | ||||||
.map(|v| v.to_owned()); | ||||||
let content = if let Some(content) = content { | ||||||
content.to_owned() | ||||||
} else { | ||||||
let file_read = self.direct.read(file.to_string()).await?; | ||||||
self.cache | ||||||
.as_ref() | ||||||
.lock() | ||||||
.unwrap() | ||||||
.insert(file_path.to_owned(), file_read.content.clone()); | ||||||
file_read.content | ||||||
}; | ||||||
|
||||||
Ok(FileRead { content, path: file_path }) | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we changing the API here? I don't think it's required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ProtoReader need to share ResourceReader to avoid duplicate reading.