diff --git a/Cargo.lock b/Cargo.lock index d9ac167042ad..476c910908b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.8.11" @@ -875,6 +886,17 @@ dependencies = [ "time", ] +[[package]] +name = "backon" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" +dependencies = [ + "fastrand 2.2.0", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -1008,6 +1030,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "bstr" version = "1.5.0" @@ -1082,6 +1113,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.1.30" @@ -1163,6 +1203,16 @@ dependencies = [ "half", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -2106,6 +2156,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flagset" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" + [[package]] name = "flate2" version = "1.0.26" @@ -2344,6 +2400,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.12.1" @@ -2499,6 +2567,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -2964,6 +3041,16 @@ dependencies = [ "libc", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -3681,6 +3768,35 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "opendal" +version = "0.50.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44" +dependencies = [ + "anyhow", + "async-trait", + "backon", + "base64 0.22.1", + "bytes", + "chrono", + "flagset", + "futures", + "getrandom 0.2.11", + "http 1.1.0", + "log", + "md-5", + "once_cell", + "percent-encoding", + "quick-xml 0.36.2", + "reqsign", + "reqwest", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -4256,6 +4372,21 @@ dependencies = [ "spki 0.7.3", ] +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der 0.7.8", + "pbkdf2", + "scrypt", + "sha2", + "spki 0.7.3", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -4273,6 +4404,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ "der 0.7.8", + "pkcs5", + "rand_core 0.6.4", "spki 0.7.3", ] @@ -4793,6 +4926,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.37" @@ -5077,6 +5220,7 @@ dependencies = [ "itertools 0.10.5", "metrics", "once_cell", + "opendal", "pin-project-lite", "rand 0.8.5", "reqwest", @@ -5093,6 +5237,34 @@ dependencies = [ "utils", ] +[[package]] +name = "reqsign" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "chrono", + "form_urlencoded", + "getrandom 0.2.11", + "hex", + "hmac", + "home", + "http 1.1.0", + "jsonwebtoken", + "log", + "percent-encoding", + "rand 0.8.5", + "reqwest", + "rsa", + "serde", + "serde_json", + "sha1", + "sha2", +] + [[package]] name = "reqwest" version = "0.12.4" @@ -5291,6 +5463,7 @@ dependencies = [ "pkcs1", "pkcs8 0.10.2", "rand_core 0.6.4", + "sha2", "signature 2.2.0", "spki 0.7.3", "subtle", @@ -5603,6 +5776,15 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.6" @@ -5636,6 +5818,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.1" @@ -7639,6 +7832,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 885f02ba8190..b612ebc8c5de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -203,6 +203,7 @@ rustls-native-certs = "0.8" x509-parser = "0.16" whoami = "1.5.1" zerocopy = { version = "0.7", features = ["derive"] } +opendal = {version = "0.50.2" } ## TODO replace this with tracing env_logger = "0.10" diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 33fa6e89f501..04caa7f78277 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -31,6 +31,7 @@ metrics.workspace = true utils.workspace = true pin-project-lite.workspace = true +opendal = { workspace = true, features = ["services-gcs"] } azure_core.workspace = true azure_identity.workspace = true azure_storage.workspace = true diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs index dd49d4d5e710..baf84f916567 100644 --- a/libs/remote_storage/src/config.rs +++ b/libs/remote_storage/src/config.rs @@ -180,6 +180,14 @@ impl Debug for AzureConfig { } } +/// Gcs bucket coordinates and access credentials to manage the bucket contents (read and write). +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct GcsConfig { + pub bucket_name: String, + /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once. + pub prefix_in_bucket: Option, +} + fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>( deserializer: D, ) -> Result, D::Error> { diff --git a/libs/remote_storage/src/gcs.rs b/libs/remote_storage/src/gcs.rs new file mode 100644 index 000000000000..8aa3aecf28bb --- /dev/null +++ b/libs/remote_storage/src/gcs.rs @@ -0,0 +1,239 @@ +//! Google Cloud Storage wrapper + +use crate::config::GcsConfig; +use crate::{ + Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, + RemoteStorage, StorageMetadata, TimeTravelError, +}; +use anyhow::Result; +use bytes::Bytes; +use futures::Stream; +use futures_util::{SinkExt, StreamExt}; +use opendal::layers::{LoggingLayer, RetryLayer, TimeoutLayer}; +use opendal::Operator; +use std::env; +use std::num::NonZeroU32; +use std::pin::pin; +use std::time::{Duration, SystemTime}; +use tokio_util::sync::CancellationToken; +use tracing::debug; + +/// TODO: features we can implement in the future +/// +/// They are all available at opendal side but need to be integrated into this wrapper. +/// +/// - connection pool size +/// - concurrent limit +/// - max keys per list response +pub struct GoogleCloudStorage { + client: Operator, +} + +impl GoogleCloudStorage { + pub fn new(gcs_config: &GcsConfig, timeout: Duration) -> Result { + debug!( + "Creating gcs remote storage for gcs bucket {}", + gcs_config.bucket_name + ); + + let mut cfg = opendal::services::GcsConfig::default(); + cfg.bucket = gcs_config.bucket_name.clone(); + cfg.root = gcs_config.prefix_in_bucket.clone(); + + // If the `GOOGLE_CLOUD_STORAGE_CREDENTIAL_PATH` env var has an path, use that, + // otherwise let opendal to try gcs well-known path. + if let Ok(credential_path) = env::var("GOOGLE_CLOUD_STORAGE_CREDENTIAL_PATH") { + cfg.credential_path = Some(credential_path); + } + + let client = Operator::from_config(cfg)? + // Setup logging for operator + .layer(LoggingLayer::default()) + // Setup timeout for operator + .layer(TimeoutLayer::new().with_timeout(timeout)) + // Setup retry for operator + .layer(RetryLayer::default().with_jitter()) + .finish(); + + Ok(GoogleCloudStorage { client }) + } +} + +fn to_download_error(error: opendal::Error) -> DownloadError { + use opendal::ErrorKind; + match error.kind() { + ErrorKind::NotFound => DownloadError::NotFound, + ErrorKind::ConditionNotMatch => DownloadError::Unmodified, + _ => DownloadError::Other(error.into()), + } +} + +impl RemoteStorage for GoogleCloudStorage { + fn list_streaming( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + _cancel: &CancellationToken, + ) -> impl Stream> + Send { + async_stream::stream! { + let mut lister_builder = self.client.lister_with(prefix.map(|p| p.get_path().as_str()).unwrap_or("/")); + + if matches!(mode, ListingMode::NoDelimiter) { + lister_builder = lister_builder.recursive(true); + } + + if let Some(max_keys) = max_keys { + lister_builder = lister_builder.limit(max_keys.get() as usize); + } + + let lister = lister_builder.await.map_err(to_download_error)?; + // FIXME: list_streaming requires to return a vector instead. + // + // Maybe we can refactor here to return a stream of Object instead of Listing. + let mut lister = lister.chunks(1000); + + while let Some(entries) = lister.next().await { + let mut result = Listing::default(); + for entry in entries { + let entry = entry.map_err(to_download_error)?; + let key = RemotePath::from_string(entry.path()).map_err(|err| DownloadError::Other(err))?; + if entry.metadata().mode().is_dir() { + result.prefixes.push(key); + } else { + result.keys.push(ListingObject { + key, + last_modified: entry.metadata().last_modified().unwrap_or_default().into(), + size: entry.metadata().content_length(), + }); + } + } + + yield Ok(result); + } + } + } + + async fn head_object( + &self, + key: &RemotePath, + _cancel: &CancellationToken, + ) -> Result { + let meta = self + .client + .stat(key.get_path().as_str()) + .await + .map_err(to_download_error)?; + + Ok(ListingObject { + key: key.to_owned(), + last_modified: meta.last_modified().unwrap_or_default().into(), + size: meta.content_length(), + }) + } + + async fn upload( + &self, + mut from: impl Stream> + Send + Sync + 'static, + _data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + _cancel: &CancellationToken, + ) -> Result<()> { + let mut fut = self.client.writer_with(to.get_path().as_str()); + + if let Some(metadata) = metadata { + fut = fut.user_metadata(metadata.0); + } + + let writer = fut.await?; + let mut sink = writer.into_bytes_sink(); + let mut from = pin!(from); + sink.send_all(&mut from).await?; + sink.close().await?; + + Ok(()) + } + + /// ## TODO + /// + /// - OpenDAL doesn't support returning metadata for read so far. Calling stat to get it, tracked at https://github.com/apache/opendal/issues/5425 + /// - OpenDAL doesn't support `if-none-match` on reader yet, tracked at https://github.com/apache/opendal/issues/5426 + async fn download( + &self, + from: &RemotePath, + opts: &DownloadOpts, + _cancel: &CancellationToken, + ) -> Result { + let meta = self + .client + .stat(from.get_path().as_str()) + .await + .map_err(to_download_error)?; + + let fut = self + .client + .reader(from.get_path().as_str()) + .await + .map_err(to_download_error)?; + let stream = fut + .into_bytes_stream((opts.byte_end, opts.byte_end)) + .await + .map_err(to_download_error)?; + + Ok(Download { + download_stream: Box::pin(stream), + last_modified: meta.last_modified().unwrap_or_default().into(), + etag: meta.etag().unwrap_or_default().into(), + metadata: meta + .user_metadata() + .map(|v| StorageMetadata::new(v.clone())), + }) + } + + async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> Result<()> { + Ok(self.client.delete(path.get_path().as_ref()).await?) + } + + async fn delete_objects( + &self, + paths: &[RemotePath], + _cancel: &CancellationToken, + ) -> Result<()> { + Ok(self + .client + .remove(paths.iter().map(|p| p.get_path().to_string()).collect()) + .await?) + } + + fn max_keys_per_delete(&self) -> usize { + self.client + .info() + .full_capability() + .batch_max_operations + .unwrap_or(1) + } + + async fn copy( + &self, + from: &RemotePath, + to: &RemotePath, + _cancel: &CancellationToken, + ) -> Result<()> { + Ok(self + .client + .copy(from.get_path().as_str(), to.get_path().as_str()) + .await?) + } + + /// TODO: We can implement via opendal's list_with(path).version(true).await; + async fn time_travel_recover( + &self, + _prefix: Option<&RemotePath>, + _timestamp: SystemTime, + _done_if_after: SystemTime, + _cancel: &CancellationToken, + ) -> Result<(), TimeTravelError> { + Err(TimeTravelError::Unimplemented) + } +} diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 7a864151ecef..b1d611d91473 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -12,6 +12,7 @@ mod azure_blob; mod config; mod error; +mod gcs; mod local_fs; mod metrics; mod s3_bucket; @@ -689,6 +690,12 @@ impl GenericRemoteStorage { #[derive(Debug, Clone, PartialEq, Eq)] pub struct StorageMetadata(HashMap); +impl StorageMetadata { + pub fn new(map: HashMap) -> Self { + Self(map) + } +} + impl From<[(&str, &str); N]> for StorageMetadata { fn from(arr: [(&str, &str); N]) -> Self { let map: HashMap = arr