diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index b81be0c0efad..246f2779dd07 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -855,7 +855,7 @@ impl GetClient for S3Client { } #[async_trait] -impl ListClient for S3Client { +impl ListClient for Arc { /// Make an S3 List request async fn list_request( &self, diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 7f449c49963c..82ef909de984 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -273,7 +273,7 @@ impl ObjectStore for AmazonS3 { .boxed() } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.client.list(prefix) } @@ -281,7 +281,7 @@ impl ObjectStore for AmazonS3 { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { if self.client.config.is_s3_express() { let offset = offset.clone(); // S3 Express does not support start-after diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index bd72d0c6aee1..fa5412c455fc 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -925,7 +925,7 @@ impl GetClient for AzureClient { } #[async_trait] -impl ListClient for AzureClient { +impl ListClient for Arc { /// Make an Azure List request async fn list_request( &self, diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 81b6667bc058..ea4dd8f567a9 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -119,6 +119,9 @@ impl ObjectStore for MicrosoftAzure { self.client.delete_request(location, &()).await } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + self.client.list(prefix) + } fn delete_stream<'a>( &'a self, locations: BoxStream<'a, Result>, @@ -139,10 +142,6 @@ impl ObjectStore for MicrosoftAzure { .boxed() } - fn 
list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { - self.client.list(prefix) - } - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { self.client.list_with_delimiter(prefix).await } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 3f83c1336dc4..4998e9f2a04d 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -150,7 +150,7 @@ impl ObjectStore for ChunkedStore { self.inner.delete(location).await } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.inner.list(prefix) } @@ -158,7 +158,7 @@ impl ObjectStore for ChunkedStore { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { self.inner.list_with_offset(prefix, offset) } diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs index 4445d0d17533..fe9bfebf768d 100644 --- a/object_store/src/client/list.rs +++ b/object_store/src/client/list.rs @@ -44,37 +44,38 @@ pub(crate) trait ListClientExt { prefix: Option<&Path>, delimiter: bool, offset: Option<&Path>, - ) -> BoxStream<'_, Result>; + ) -> BoxStream<'static, Result>; - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result>; + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result>; #[allow(unused)] fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result>; + ) -> BoxStream<'static, Result>; async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result; } #[async_trait] -impl ListClientExt for T { +impl ListClientExt for T { fn list_paginated( &self, prefix: Option<&Path>, delimiter: bool, offset: Option<&Path>, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { let offset = offset.map(|x| x.to_string()); let prefix = prefix .filter(|x| !x.as_ref().is_empty()) .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER)); 
stream_paginated( + self.clone(), (prefix, offset), - move |(prefix, offset), token| async move { - let (r, next_token) = self + move |client, (prefix, offset), token| async move { + let (r, next_token) = client .list_request( prefix.as_deref(), delimiter, @@ -88,7 +89,7 @@ impl ListClientExt for T { .boxed() } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.list_paginated(prefix, false, None) .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) .try_flatten() @@ -99,7 +100,7 @@ impl ListClientExt for T { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { self.list_paginated(prefix, false, Some(offset)) .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) .try_flatten() diff --git a/object_store/src/client/pagination.rs b/object_store/src/client/pagination.rs index 77b2a3d8e2f2..d789c7431d8c 100644 --- a/object_store/src/client/pagination.rs +++ b/object_store/src/client/pagination.rs @@ -35,9 +35,14 @@ use std::future::Future; /// finish, otherwise it will continue to call `op(state, token)` with the values returned by the /// previous call to `op`, until a continuation token of `None` is returned /// -pub(crate) fn stream_paginated(state: S, op: F) -> impl Stream> +pub(crate) fn stream_paginated( + client: C, + state: S, + op: F, +) -> impl Stream> where - F: Fn(S, Option) -> Fut + Copy, + C: Clone, + F: Fn(C, S, Option) -> Fut + Copy, Fut: Future)>>, { enum PaginationState { @@ -46,27 +51,30 @@ where Done, } - futures::stream::unfold(PaginationState::Start(state), move |state| async move { - let (s, page_token) = match state { - PaginationState::Start(s) => (s, None), - PaginationState::HasMore(s, page_token) if !page_token.is_empty() => { - (s, Some(page_token)) - } - _ => { - return None; - } - }; + futures::stream::unfold(PaginationState::Start(state), move |state| { + let client 
= client.clone(); + async move { + let (s, page_token) = match state { + PaginationState::Start(s) => (s, None), + PaginationState::HasMore(s, page_token) if !page_token.is_empty() => { + (s, Some(page_token)) + } + _ => { + return None; + } + }; - let (resp, s, continuation) = match op(s, page_token).await { - Ok(resp) => resp, - Err(e) => return Some((Err(e), PaginationState::Done)), - }; + let (resp, s, continuation) = match op(client, s, page_token).await { + Ok(resp) => resp, + Err(e) => return Some((Err(e), PaginationState::Done)), + }; - let next_state = match continuation { - Some(token) => PaginationState::HasMore(s, token), - None => PaginationState::Done, - }; + let next_state = match continuation { + Some(token) => PaginationState::HasMore(s, token), + None => PaginationState::Done, + }; - Some((Ok(resp), next_state)) + Some((Ok(resp), next_state)) + } }) } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index d6f89ca71740..8dd1c69802a8 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -633,7 +633,7 @@ impl GetClient for GoogleCloudStorageClient { } #[async_trait] -impl ListClient for GoogleCloudStorageClient { +impl ListClient for Arc { /// Perform a list request async fn list_request( &self, diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 5199135ba6b0..a2f512415a8d 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -183,7 +183,7 @@ impl ObjectStore for GoogleCloudStorage { self.client.delete_request(location).await } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.client.list(prefix) } @@ -191,7 +191,7 @@ impl ObjectStore for GoogleCloudStorage { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { self.client.list_with_offset(prefix, offset) } diff --git 
a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 417f72856722..899740d36db9 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -31,6 +31,8 @@ //! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518 //! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV +use std::sync::Arc; + use async_trait::async_trait; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; @@ -79,7 +81,7 @@ impl From for crate::Error { /// See [`crate::http`] for more information #[derive(Debug)] pub struct HttpStore { - client: Client, + client: Arc, } impl std::fmt::Display for HttpStore { @@ -130,19 +132,20 @@ impl ObjectStore for HttpStore { self.client.delete(location).await } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default(); let prefix = prefix.cloned(); + let client = Arc::clone(&self.client); futures::stream::once(async move { - let status = self.client.list(prefix.as_ref(), "infinity").await?; + let status = client.list(prefix.as_ref(), "infinity").await?; let iter = status .response .into_iter() .filter(|r| !r.is_dir()) - .map(|response| { + .map(move |response| { response.check_ok()?; - response.object_meta(self.client.base_url()) + response.object_meta(client.base_url()) }) // Filter out exact prefix matches .filter_ok(move |r| r.location.as_ref().len() > prefix_len); @@ -238,7 +241,7 @@ impl HttpBuilder { let parsed = Url::parse(&url).map_err(|source| Error::UnableToParseUrl { url, source })?; Ok(HttpStore { - client: Client::new(parsed, self.client_options, self.retry_config)?, + client: Arc::new(Client::new(parsed, self.client_options, self.retry_config)?), }) } } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 987ffacc6e49..53eda5a82fd5 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -722,7 +722,7 @@ pub trait 
ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included. /// /// Note: the order of returned [`ObjectMeta`] is not guaranteed - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result>; + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result>; /// List all the objects with the given prefix and a location greater than `offset` /// @@ -734,7 +734,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { let offset = offset.clone(); self.list(prefix) .try_filter(move |f| futures::future::ready(f.location > offset)) @@ -847,7 +847,7 @@ macro_rules! as_ref_impl { self.as_ref().delete_stream(locations) } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.as_ref().list(prefix) } @@ -855,7 +855,7 @@ macro_rules! 
as_ref_impl { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { self.as_ref().list_with_offset(prefix, offset) } diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index 6a3c3b574e62..77f72a0e11a1 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -45,7 +45,7 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// #[derive(Debug)] pub struct LimitStore { - inner: T, + inner: Arc, max_requests: usize, semaphore: Arc, } @@ -56,7 +56,7 @@ impl LimitStore { /// `max_requests` pub fn new(inner: T, max_requests: usize) -> Self { Self { - inner, + inner: Arc::new(inner), max_requests, semaphore: Arc::new(Semaphore::new(max_requests)), } @@ -144,12 +144,13 @@ impl ObjectStore for LimitStore { self.inner.delete_stream(locations) } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let prefix = prefix.cloned(); + let inner = Arc::clone(&self.inner); let fut = Arc::clone(&self.semaphore) .acquire_owned() .map(move |permit| { - let s = self.inner.list(prefix.as_ref()); + let s = inner.list(prefix.as_ref()); PermitWrapper::new(s, permit.unwrap()) }); fut.into_stream().flatten().boxed() @@ -159,13 +160,14 @@ impl ObjectStore for LimitStore { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { let prefix = prefix.cloned(); let offset = offset.clone(); + let inner = Arc::clone(&self.inner); let fut = Arc::clone(&self.semaphore) .acquire_owned() .map(move |permit| { - let s = self.inner.list_with_offset(prefix.as_ref(), &offset); + let s = inner.list_with_offset(prefix.as_ref(), &offset); PermitWrapper::new(s, permit.unwrap()) }); fut.into_stream().flatten().boxed() diff --git a/object_store/src/local.rs b/object_store/src/local.rs index b193481ae7b8..364026459a03 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs 
@@ -488,7 +488,7 @@ impl ObjectStore for LocalFileSystem { .await } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let config = Arc::clone(&self.config); let root_path = match prefix { diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 3f3cff3390db..6402f924346f 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -297,7 +297,7 @@ impl ObjectStore for InMemory { Ok(()) } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let root = Path::default(); let prefix = prefix.unwrap_or(&root); diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 227887d78fd7..a0b67ca4b58e 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -74,6 +74,28 @@ impl PrefixStore { } } +// Note: This is something of a hack: these two helpers are free functions rather than methods so that they don't rely +// on the `self` lifetime. Expected to be cleaned up before merge. 
+// +/// Strip the constant prefix from a given path +fn strip_prefix(prefix: &Path, path: Path) -> Path { + // Note cannot use match because of borrow checker + if let Some(suffix) = path.prefix_match(prefix) { + return suffix.collect(); + } + path +} + +/// Strip the constant prefix from a given ObjectMeta +fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta { + ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: strip_prefix(prefix, meta.location), + e_tag: meta.e_tag, + version: None, + } +} #[async_trait::async_trait] impl ObjectStore for PrefixStore { async fn put(&self, location: &Path, payload: PutPayload) -> Result { @@ -136,21 +158,23 @@ impl ObjectStore for PrefixStore { self.inner.delete(&full_path).await } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let prefix = self.full_path(prefix.unwrap_or(&Path::default())); let s = self.inner.list(Some(&prefix)); - s.map_ok(|meta| self.strip_meta(meta)).boxed() + let slf_prefix = self.prefix.clone(); + s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed() } fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { let offset = self.full_path(offset); let prefix = self.full_path(prefix.unwrap_or(&Path::default())); let s = self.inner.list_with_offset(Some(&prefix), &offset); - s.map_ok(|meta| self.strip_meta(meta)).boxed() + let slf_prefix = self.prefix.clone(); + s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index b9dff5c6d1d2..29cd32705ccc 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -237,11 +237,13 @@ impl ObjectStore for ThrottledStore { self.inner.delete(location).await } - fn list(&self, prefix: 
Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let stream = self.inner.list(prefix); + let config = Arc::clone(&self.config); futures::stream::once(async move { - let wait_list_per_entry = self.config().wait_list_per_entry; - sleep(self.config().wait_list_per_call).await; + let config = *config.lock(); + let wait_list_per_entry = config.wait_list_per_entry; + sleep(config.wait_list_per_call).await; throttle_stream(stream, move |_| wait_list_per_entry) }) .flatten() @@ -252,11 +254,13 @@ impl ObjectStore for ThrottledStore { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'static, Result> { let stream = self.inner.list_with_offset(prefix, offset); + let config = Arc::clone(&self.config); futures::stream::once(async move { - let wait_list_per_entry = self.config().wait_list_per_entry; - sleep(self.config().wait_list_per_call).await; + let config = *config.lock(); + let wait_list_per_entry = config.wait_list_per_entry; + sleep(config.wait_list_per_call).await; throttle_stream(stream, move |_| wait_list_per_entry) }) .flatten() diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index c5550ac21728..e500fc8ac87d 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -62,7 +62,7 @@ impl ObjectStore for MyStore { todo!() } - fn list(&self, _: Option<&Path>) -> BoxStream<'_, Result> { + fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result> { todo!() }