-
Notifications
You must be signed in to change notification settings - Fork 257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: duplicate read proto #1741
Changes from 19 commits
38fabe3
a84c6f3
6f28835
7bdfd60
f0c7193
136c805
a9f24cb
67eabde
8efe9c4
76d61dd
0811348
a1dff86
7a3f902
2134bd6
1db5a38
c011dd1
187ab41
202c544
f465172
852c0e9
da660a7
7ee194b
3a2d9f5
af2d812
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,3 +1,6 @@ | ||||||
use std::collections::HashMap; | ||||||
use std::sync::{Arc, Mutex}; | ||||||
|
||||||
use futures_util::future::join_all; | ||||||
use futures_util::TryFutureExt; | ||||||
use url::Url; | ||||||
|
@@ -11,16 +14,67 @@ | |||||
pub path: String, | ||||||
} | ||||||
|
||||||
pub struct ResourceReader { | ||||||
#[derive(Clone)] | ||||||
pub struct ResourceReader<A>(A); | ||||||
|
||||||
impl<A: ResourceReaderHandler + Send + Sync> ResourceReader<A> { | ||||||
/// Reads all the files in parallel | ||||||
pub async fn read_files<T: ToString + Send + Sync>( | ||||||
&self, | ||||||
files: &[T], | ||||||
) -> anyhow::Result<Vec<FileRead>> { | ||||||
let files = files.iter().map(|x| { | ||||||
self.read_file(x.to_string()) | ||||||
.map_err(|e| e.context(x.to_string())) | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
}); | ||||||
let content = join_all(files) | ||||||
.await | ||||||
.into_iter() | ||||||
.collect::<anyhow::Result<Vec<_>>>()?; | ||||||
Ok(content) | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
} | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
impl ResourceReader<Direct> { | ||||||
pub fn direct(runtime: TargetRuntime) -> Self { | ||||||
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Check Examples
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests (WASM)
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on linux-x64-musl
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on linux-arm64-gnu
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on linux-x64-gnu
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on win32-ia32-gnu
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on linux-arm64-musl
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on linux-ia32-gnu
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Formatter and Lint Check
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Formatter and Lint Check
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on darwin-x64
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Test AWS Lambda Build
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on darwin-arm64
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on win32-x64-msvc
Check failure on line 39 in src/resource_reader.rs GitHub Actions / Run Tests on win32-arm64-msvc
|
||||||
ResourceReader(Direct::init(runtime)) | ||||||
} | ||||||
} | ||||||
|
||||||
impl ResourceReader<Cached> { | ||||||
pub fn cached(runtime: TargetRuntime) -> Self { | ||||||
ResourceReader(Cached::init(runtime)) | ||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
impl<A: ResourceReaderHandler + Send + Sync> ResourceReaderHandler for ResourceReader<A> { | ||||||
async fn read_file<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
self.read_file(file).await | ||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
pub trait ResourceReaderHandler { | ||||||
async fn read_file<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead>; | ||||||
} | ||||||
|
||||||
/// Reads the files directly from the filesystem or from an HTTP URL | ||||||
#[derive(Clone)] | ||||||
pub struct Direct { | ||||||
runtime: TargetRuntime, | ||||||
} | ||||||
|
||||||
impl ResourceReader { | ||||||
impl Direct { | ||||||
pub fn init(runtime: TargetRuntime) -> Self { | ||||||
Self { runtime } | ||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
impl ResourceReaderHandler for Direct { | ||||||
/// Reads a file from the filesystem or from an HTTP URL | ||||||
pub async fn read_file<T: ToString>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
async fn read_file<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
// Is an HTTP URL | ||||||
let content = if let Ok(url) = Url::parse(&file.to_string()) { | ||||||
if url.scheme().starts_with("http") { | ||||||
|
@@ -41,20 +95,49 @@ | |||||
|
||||||
self.runtime.file.read(&file.to_string()).await? | ||||||
}; | ||||||
|
||||||
Ok(FileRead { content, path: file.to_string() }) | ||||||
} | ||||||
} | ||||||
|
||||||
/// Reads all the files in parallel | ||||||
pub async fn read_files<T: ToString>(&self, files: &[T]) -> anyhow::Result<Vec<FileRead>> { | ||||||
let files = files.iter().map(|x| { | ||||||
self.read_file(x.to_string()) | ||||||
.map_err(|e| e.context(x.to_string())) | ||||||
}); | ||||||
let content = join_all(files) | ||||||
.await | ||||||
.into_iter() | ||||||
.collect::<anyhow::Result<Vec<_>>>()?; | ||||||
Ok(content) | ||||||
/// Reads the files from the filesystem or from an HTTP URL with cache | ||||||
#[derive(Clone)] | ||||||
pub struct Cached { | ||||||
direct: Direct, | ||||||
// Cache file content, path -> content | ||||||
cache: Arc<Mutex<HashMap<String, String>>>, | ||||||
} | ||||||
|
||||||
impl Cached { | ||||||
pub fn init(runtime: TargetRuntime) -> Self { | ||||||
Self { direct: Direct::init(runtime), cache: Default::default() } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initialize the cache explicitly to ensure it matches the expected type and is thread-safe. - cache: Default::default(),
+ cache: Arc::new(Mutex::new(HashMap::new())), Explicitly initializing the cache as Committable suggestion
Suggested change
|
||||||
} | ||||||
} | ||||||
|
||||||
#[async_trait::async_trait] | ||||||
impl ResourceReaderHandler for Cached { | ||||||
/// Reads a file from the filesystem or from an HTTP URL with cache | ||||||
async fn read_file<T: ToString + Send>(&self, file: T) -> anyhow::Result<FileRead> { | ||||||
// check cache | ||||||
let file_path = file.to_string(); | ||||||
let content = self | ||||||
.cache | ||||||
.as_ref() | ||||||
.lock() | ||||||
.unwrap() | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
.get(&file_path) | ||||||
.map(|v| v.to_owned()); | ||||||
let content = if let Some(content) = content { | ||||||
content.to_owned() | ||||||
} else { | ||||||
let file_read = self.direct.read_file(file.to_string()).await?; | ||||||
self.cache | ||||||
.as_ref() | ||||||
.lock() | ||||||
.unwrap() | ||||||
.insert(file_path.to_owned(), file_read.content.clone()); | ||||||
tusharmath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
file_read.content | ||||||
}; | ||||||
|
||||||
Ok(FileRead { content, path: file_path }) | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we changing the API here? I don't think it's required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ProtoReader need to share ResourceReader to avoid duplicate reading.