Skip to content

Commit

Permalink
object_store: Migrate from snafu to thiserror (#6266)
Browse files Browse the repository at this point in the history
* object_store: Add `thiserror` dependency

* object_store/memory: Migrate from `snafu` to `thiserror`

* object_store/parse: Migrate from `snafu` to `thiserror`

* object_store/util: Migrate from `snafu` to `thiserror`

* object_store/local: Migrate from `snafu` to `thiserror`

* object_store/delimited: Migrate from `snafu` to `thiserror`

* object_store/path/parts: Migrate from `snafu` to `thiserror`

* object_store/path: Migrate from `snafu` to `thiserror`

* object_store/http: Migrate from `snafu` to `thiserror`

* object_store/client: Migrate from `snafu` to `thiserror`

* object_store/aws: Migrate from `snafu` to `thiserror`

* object_store/azure: Migrate from `snafu` to `thiserror`

* object_store/gcp: Migrate from `snafu` to `thiserror`

* object_store/lib: Migrate from `snafu` to `thiserror`

* Remove `snafu` dependency
  • Loading branch information
Turbo87 authored Jan 2, 2025
1 parent 4a0bdde commit f7263e2
Show file tree
Hide file tree
Showing 24 changed files with 620 additions and 528 deletions.
2 changes: 1 addition & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ humantime = "2.1"
itertools = "0.13.0"
parking_lot = { version = "0.12" }
percent-encoding = "2.1"
snafu = { version = "0.8", default-features = false, features = ["std", "rust_1_61"] }
thiserror = "2.0.2"
tracing = { version = "0.1" }
url = "2.2"
walkdir = { version = "2", optional = true }
Expand Down
52 changes: 32 additions & 20 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use itertools::Itertools;
use md5::{Digest, Md5};
use reqwest::header::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -43,46 +42,46 @@ use url::Url;
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
enum Error {
#[snafu(display("Missing bucket name"))]
#[error("Missing bucket name")]
MissingBucketName,

#[snafu(display("Missing AccessKeyId"))]
#[error("Missing AccessKeyId")]
MissingAccessKeyId,

#[snafu(display("Missing SecretAccessKey"))]
#[error("Missing SecretAccessKey")]
MissingSecretAccessKey,

#[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
#[error("Unable parse source url. Url: {}, Error: {}", url, source)]
UnableToParseUrl {
source: url::ParseError,
url: String,
},

#[snafu(display(
#[error(
"Unknown url scheme cannot be parsed into storage location: {}",
scheme
))]
)]
UnknownUrlScheme { scheme: String },

#[snafu(display("URL did not match any known pattern for scheme: {}", url))]
#[error("URL did not match any known pattern for scheme: {}", url)]
UrlNotRecognised { url: String },

#[snafu(display("Configuration key: '{}' is not known.", key))]
#[error("Configuration key: '{}' is not known.", key)]
UnknownConfigurationKey { key: String },

#[snafu(display("Invalid Zone suffix for bucket '{bucket}'"))]
#[error("Invalid Zone suffix for bucket '{bucket}'")]
ZoneSuffix { bucket: String },

#[snafu(display("Invalid encryption type: {}. Valid values are \"AES256\", \"sse:kms\", \"sse:kms:dsse\" and \"sse-c\".", passed))]
#[error("Invalid encryption type: {}. Valid values are \"AES256\", \"sse:kms\", \"sse:kms:dsse\" and \"sse-c\".", passed)]
InvalidEncryptionType { passed: String },

#[snafu(display(
#[error(
"Invalid encryption header values. Header: {}, source: {}",
header,
source
))]
)]
InvalidEncryptionHeader {
header: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
Expand Down Expand Up @@ -603,8 +602,15 @@ impl AmazonS3Builder {
/// This is a separate member function to allow fallible computation to
/// be deferred until [`Self::build`] which in turn allows deriving [`Clone`]
fn parse_url(&mut self, url: &str) -> Result<()> {
let parsed = Url::parse(url).context(UnableToParseUrlSnafu { url })?;
let host = parsed.host_str().context(UrlNotRecognisedSnafu { url })?;
let parsed = Url::parse(url).map_err(|source| {
let url = url.into();
Error::UnableToParseUrl { url, source }
})?;

let host = parsed
.host_str()
.ok_or_else(|| Error::UrlNotRecognised { url: url.into() })?;

match parsed.scheme() {
"s3" | "s3a" => self.bucket_name = Some(host.to_string()),
"https" => match host.splitn(4, '.').collect_tuple() {
Expand All @@ -630,9 +636,12 @@ impl AmazonS3Builder {
self.bucket_name = Some(bucket.into());
}
}
_ => return Err(UrlNotRecognisedSnafu { url }.build().into()),
_ => return Err(Error::UrlNotRecognised { url: url.into() }.into()),
},
scheme => return Err(UnknownUrlSchemeSnafu { scheme }.build().into()),
scheme => {
let scheme = scheme.into();
return Err(Error::UnknownUrlScheme { scheme }.into());
}
};
Ok(())
}
Expand Down Expand Up @@ -875,7 +884,7 @@ impl AmazonS3Builder {
self.parse_url(&url)?;
}

let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
let bucket = self.bucket_name.ok_or(Error::MissingBucketName)?;
let region = self.region.unwrap_or_else(|| "us-east-1".to_string());
let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
let copy_if_not_exists = self.copy_if_not_exists.map(|x| x.get()).transpose()?;
Expand Down Expand Up @@ -957,7 +966,10 @@ impl AmazonS3Builder {

let (session_provider, zonal_endpoint) = match self.s3_express.get()? {
true => {
let zone = parse_bucket_az(&bucket).context(ZoneSuffixSnafu { bucket: &bucket })?;
let zone = parse_bucket_az(&bucket).ok_or_else(|| {
let bucket = bucket.clone();
Error::ZoneSuffix { bucket }
})?;

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-Regions-and-Zones.html
let endpoint = format!("https://{bucket}.s3express-{zone}.{region}.amazonaws.com");
Expand Down
87 changes: 49 additions & 38 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use reqwest::{Client as ReqwestClient, Method, RequestBuilder, Response};
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;

const VERSION_HEADER: &str = "x-amz-version-id";
Expand All @@ -65,56 +64,56 @@ const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
const ALGORITHM: &str = "x-amz-checksum-algorithm";

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[snafu(display("Error performing DeleteObjects request: {}", source))]
#[error("Error performing DeleteObjects request: {}", source)]
DeleteObjectsRequest { source: crate::client::retry::Error },

#[snafu(display(
#[error(
"DeleteObjects request failed for key {}: {} (code: {})",
path,
message,
code
))]
)]
DeleteFailed {
path: String,
code: String,
message: String,
},

#[snafu(display("Error getting DeleteObjects response body: {}", source))]
#[error("Error getting DeleteObjects response body: {}", source)]
DeleteObjectsResponse { source: reqwest::Error },

#[snafu(display("Got invalid DeleteObjects response: {}", source))]
#[error("Got invalid DeleteObjects response: {}", source)]
InvalidDeleteObjectsResponse {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

#[snafu(display("Error performing list request: {}", source))]
#[error("Error performing list request: {}", source)]
ListRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting list response body: {}", source))]
#[error("Error getting list response body: {}", source)]
ListResponseBody { source: reqwest::Error },

#[snafu(display("Error getting create multipart response body: {}", source))]
#[error("Error getting create multipart response body: {}", source)]
CreateMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Error performing complete multipart request: {}: {}", path, source))]
#[error("Error performing complete multipart request: {}: {}", path, source)]
CompleteMultipartRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error getting complete multipart response body: {}", source))]
#[error("Error getting complete multipart response body: {}", source)]
CompleteMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid list response: {}", source))]
#[error("Got invalid list response: {}", source)]
InvalidListResponse { source: quick_xml::de::DeError },

#[snafu(display("Got invalid multipart response: {}", source))]
#[error("Got invalid multipart response: {}", source)]
InvalidMultipartResponse { source: quick_xml::de::DeError },

#[snafu(display("Unable to extract metadata from headers: {}", source))]
#[error("Unable to extract metadata from headers: {}", source)]
Metadata {
source: crate::client::header::Error,
},
Expand Down Expand Up @@ -263,10 +262,15 @@ impl SessionCredential<'_> {
}
}

#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
#[snafu(context(false))]
Generic { source: crate::Error },
#[error(transparent)]
Generic {
#[from]
source: crate::Error,
},

#[error("Retry")]
Retry {
source: crate::client::retry::Error,
path: String,
Expand Down Expand Up @@ -426,12 +430,16 @@ impl<'a> Request<'a> {
.payload(self.payload)
.send()
.await
.context(RetrySnafu { path })
.map_err(|source| {
let path = path.into();
RequestError::Retry { source, path }
})
}

pub(crate) async fn do_put(self) -> Result<PutResult> {
let response = self.send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
Ok(get_put_result(response.headers(), VERSION_HEADER)
.map_err(|source| Error::Metadata { source })?)
}
}

Expand Down Expand Up @@ -535,10 +543,10 @@ impl S3Client {
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
.send_retry(&self.config.retry_config)
.await
.context(DeleteObjectsRequestSnafu {})?
.map_err(|source| Error::DeleteObjectsRequest { source })?
.bytes()
.await
.context(DeleteObjectsResponseSnafu {})?;
.map_err(|source| Error::DeleteObjectsResponse { source })?;

let response: BatchDeleteResponse =
quick_xml::de::from_reader(response.reader()).map_err(|err| {
Expand Down Expand Up @@ -635,10 +643,10 @@ impl S3Client {
.await?
.bytes()
.await
.context(CreateMultipartResponseBodySnafu)?;
.map_err(|source| Error::CreateMultipartResponseBody { source })?;

let response: InitiateMultipartUploadResult =
quick_xml::de::from_reader(response.reader()).context(InvalidMultipartResponseSnafu)?;
let response: InitiateMultipartUploadResult = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;

Ok(response.upload_id)
}
Expand Down Expand Up @@ -683,14 +691,14 @@ impl S3Client {
.map(|v| v.to_string());

let e_tag = match is_copy {
false => get_etag(response.headers()).context(MetadataSnafu)?,
false => get_etag(response.headers()).map_err(|source| Error::Metadata { source })?,
true => {
let response = response
.bytes()
.await
.context(CreateMultipartResponseBodySnafu)?;
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
.context(InvalidMultipartResponseSnafu)?;
.map_err(|source| Error::InvalidMultipartResponse { source })?;
response.e_tag
}
};
Expand Down Expand Up @@ -764,19 +772,21 @@ impl S3Client {
.retry_error_body(true)
.send()
.await
.context(CompleteMultipartRequestSnafu {
path: location.as_ref(),
.map_err(|source| Error::CompleteMultipartRequest {
source,
path: location.as_ref().to_string(),
})?;

let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
let version = get_version(response.headers(), VERSION_HEADER)
.map_err(|source| Error::Metadata { source })?;

let data = response
.bytes()
.await
.context(CompleteMultipartResponseBodySnafu)?;
.map_err(|source| Error::CompleteMultipartResponseBody { source })?;

let response: CompleteMultipartUploadResult =
quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
let response: CompleteMultipartUploadResult = quick_xml::de::from_reader(data.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;

Ok(PutResult {
e_tag: Some(response.e_tag),
Expand Down Expand Up @@ -884,13 +894,14 @@ impl ListClient for S3Client {
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
.map_err(|source| Error::ListRequest { source })?
.bytes()
.await
.context(ListResponseBodySnafu)?;
.map_err(|source| Error::ListResponseBody { source })?;

let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidListResponse { source })?;

let mut response: ListResponse =
quick_xml::de::from_reader(response.reader()).context(InvalidListResponseSnafu)?;
let token = response.next_continuation_token.take();

Ok((response.try_into()?, token))
Expand Down
17 changes: 8 additions & 9 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,22 @@ use percent_encoding::utf8_percent_encode;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use reqwest::{Client, Method, Request, RequestBuilder, StatusCode};
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::warn;
use url::Url;

#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
#[allow(clippy::enum_variant_names)]
enum Error {
#[snafu(display("Error performing CreateSession request: {source}"))]
#[error("Error performing CreateSession request: {source}")]
CreateSessionRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting CreateSession response: {source}"))]
#[error("Error getting CreateSession response: {source}")]
CreateSessionResponse { source: reqwest::Error },

#[snafu(display("Invalid CreateSessionOutput response: {source}"))]
#[error("Invalid CreateSessionOutput response: {source}")]
CreateSessionOutput { source: quick_xml::DeError },
}

Expand Down Expand Up @@ -726,13 +725,13 @@ impl TokenProvider for SessionProvider {
.with_aws_sigv4(Some(authorizer), None)
.send_retry(retry)
.await
.context(CreateSessionRequestSnafu)?
.map_err(|source| Error::CreateSessionRequest { source })?
.bytes()
.await
.context(CreateSessionResponseSnafu)?;
.map_err(|source| Error::CreateSessionResponse { source })?;

let resp: CreateSessionOutput =
quick_xml::de::from_reader(bytes.reader()).context(CreateSessionOutputSnafu)?;
let resp: CreateSessionOutput = quick_xml::de::from_reader(bytes.reader())
.map_err(|source| Error::CreateSessionOutput { source })?;

let creds = resp.credentials;
Ok(TemporaryToken {
Expand Down
Loading

0 comments on commit f7263e2

Please sign in to comment.