From 78ec72480412a341297c6edd06983c31470e8136 Mon Sep 17 00:00:00 2001 From: samoii Date: Thu, 5 Sep 2024 17:45:18 +0700 Subject: [PATCH 01/12] add aws s3 sse --- .../quickwit-config/src/storage_config.rs | 3 +++ .../object_storage/s3_compatible_storage.rs | 23 ++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index a2485a9cf79..2586759d2c5 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -333,6 +333,8 @@ pub struct S3StorageConfig { pub disable_multi_object_delete: bool, #[serde(default)] pub disable_multipart_upload: bool, + #[serde(default)] + pub server_side_encryption: Option, } impl S3StorageConfig { @@ -393,6 +395,7 @@ impl fmt::Debug for S3StorageConfig { "disable_multi_object_delete", &self.disable_multi_object_delete, ) + .field("server_side_encryption", &self.server_side_encryption) .finish() } } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 86ef692c671..68423b5d059 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -33,7 +33,7 @@ use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput; use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier}; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, ServerSideEncryption}; use aws_sdk_s3::Client as S3Client; use base64::prelude::{Engine, BASE64_STANDARD}; use futures::{stream, StreamExt}; @@ -91,6 +91,7 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + server_side_encryption: Option, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -177,6 +178,7 @@ impl S3CompatibleObjectStorage { let retry_params = RetryParams::aggressive(); let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; let disable_multipart_upload = s3_storage_config.disable_multipart_upload; + let server_side_encryption = s3_storage_config.server_side_encryption.clone(); Ok(Self { s3_client, uri: uri.clone(), @@ -186,6 +188,7 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, + server_side_encryption, }) } @@ -203,6 +206,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + server_side_encryption: self.server_side_encryption, } } @@ -289,12 +293,20 @@ impl S3CompatibleObjectStorage { .byte_stream() .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - self.s3_client + let mut put_object_request = self.s3_client .put_object() .bucket(bucket) .key(key) .body(body) - .content_length(len as i64) + .content_length(len as i64); + if let Some(_encryption) = &self.server_side_encryption { + put_object_request = match _encryption.as_str() { + "Aes256" => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), + "AwsKms" => put_object_request.server_side_encryption(ServerSideEncryption::AwsKms), + _ => put_object_request, + }; + } + put_object_request .send() .await .map_err(|sdk_error| { @@ -956,6 +968,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1011,6 +1024,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, + server_side_encryption: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1052,6 +1066,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1134,6 +1149,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1227,6 +1243,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3])) From 42045beaf5b18fed435c055eaf04c7d2bf75111d Mon Sep 17 00:00:00 2001 From: samoii Date: Thu, 12 Sep 2024 15:15:26 +0700 Subject: [PATCH 02/12] server_side_encryption use enum varialble --- quickwit/quickwit-config/src/lib.rs | 2 +- quickwit/quickwit-config/src/storage_config.rs | 10 ++++++++-- .../src/object_storage/s3_compatible_storage.rs | 14 ++++++++------ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 5e256793fcd..4dc1dd36532 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,7 +80,7 @@ pub use crate::node_config::{ use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, S3ServerSideEncryption, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 75fbff411de..347e50f9d38 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -312,7 +312,13 @@ impl fmt::Debug for AzureStorageConfig { .finish() } } - +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum S3ServerSideEncryption { + Aes256, + AwsKms, + AwsKmsDsse, +} #[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct S3StorageConfig { @@ -335,7 +341,7 @@ pub struct S3StorageConfig { #[serde(default)] pub disable_multipart_upload: bool, #[serde(default)] - pub server_side_encryption: Option, + pub server_side_encryption: Option, } impl S3StorageConfig { diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 68423b5d059..e8ff2110e19 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -43,7 +43,7 @@ use quickwit_aws::retry::{aws_retry, AwsRetryable}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::S3StorageConfig; +use quickwit_config::{S3StorageConfig, S3ServerSideEncryption}; use regex::Regex; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; @@ -91,7 +91,8 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, - server_side_encryption: Option, + server_side_encryption: Option, + // server_side_encryption: Option, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -299,10 +300,11 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64); - if let Some(_encryption) = &self.server_side_encryption { - put_object_request = match _encryption.as_str() { - "Aes256" => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), - "AwsKms" => put_object_request.server_side_encryption(ServerSideEncryption::AwsKms), + if let Some(encryption) = &self.server_side_encryption { + put_object_request = match encryption { + S3ServerSideEncryption::Aes256 => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), + S3ServerSideEncryption::AwsKms => put_object_request.server_side_encryption(ServerSideEncryption::AwsKms), + S3ServerSideEncryption::AwsKmsDsse => put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse), _ => put_object_request, }; } From 38af88b39f72a6ee8a3b94077a4a13f66d2a7c86 Mon Sep 17 00:00:00 2001 From: samoii Date: Thu, 12 Sep 2024 17:14:03 +0700 Subject: [PATCH 03/12] remove unused --- .../quickwit-storage/src/object_storage/s3_compatible_storage.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index e8ff2110e19..166658091a9 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -92,7 +92,6 @@ pub struct S3CompatibleObjectStorage { disable_multi_object_delete: bool, disable_multipart_upload: bool, server_side_encryption: Option, - // server_side_encryption: Option, } impl fmt::Debug for S3CompatibleObjectStorage { From b8e5b787dbec038e3456800f2a8e8ffdc97e2b11 Mon Sep 17 00:00:00 2001 From: samoii Date: Fri, 13 Sep 2024 18:48:56 +0700 Subject: [PATCH 04/12] add kms key id variable --- .../quickwit-config/src/storage_config.rs | 3 ++ .../object_storage/s3_compatible_storage.rs | 29 +++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 347e50f9d38..c2e5389a8a1 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -342,6 +342,8 @@ pub struct S3StorageConfig { pub disable_multipart_upload: bool, #[serde(default)] pub server_side_encryption: Option, + #[serde(default)] + pub sse_kms_key_id: Option, } impl S3StorageConfig { @@ -406,6 +408,7 @@ impl fmt::Debug for S3StorageConfig { &self.disable_multi_object_delete, ) .field("server_side_encryption", &self.server_side_encryption) + .field("sse_kms_key_id", &self.sse_kms_key_id) .finish() } } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 166658091a9..799ccb719f4 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -92,6 +92,7 @@ pub struct S3CompatibleObjectStorage { disable_multi_object_delete: bool, disable_multipart_upload: bool, server_side_encryption: Option, + sse_kms_key_id: Option, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -179,6 +180,7 @@ impl S3CompatibleObjectStorage { let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; let disable_multipart_upload = s3_storage_config.disable_multipart_upload; let server_side_encryption = s3_storage_config.server_side_encryption.clone(); + let sse_kms_key_id = s3_storage_config.sse_kms_key_id.clone(); Ok(Self { s3_client, uri: uri.clone(), @@ -189,6 +191,7 @@ impl S3CompatibleObjectStorage { disable_multi_object_delete, disable_multipart_upload, server_side_encryption, + sse_kms_key_id, }) } @@ -207,6 +210,7 @@ impl S3CompatibleObjectStorage { disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, server_side_encryption: self.server_side_encryption, + sse_kms_key_id: self.sse_kms_key_id, } } @@ -302,8 +306,24 @@ impl S3CompatibleObjectStorage { if let Some(encryption) = &self.server_side_encryption { put_object_request = match encryption { S3ServerSideEncryption::Aes256 => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), - S3ServerSideEncryption::AwsKms => put_object_request.server_side_encryption(ServerSideEncryption::AwsKms), - S3ServerSideEncryption::AwsKmsDsse => put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse), + S3ServerSideEncryption::AwsKms => { //put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) + if let Some(kms_key_id) = &self.sse_kms_key_id { + put_object_request + .server_side_encryption(ServerSideEncryption::AwsKms) + .ssekms_key_id(kms_key_id) + } else { + put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) // ไม่มี key ID + } + }, + S3ServerSideEncryption::AwsKmsDsse => { //put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) + if let Some(kms_key_id) = &self.sse_kms_key_id { + put_object_request + .server_side_encryption(ServerSideEncryption::AwsKmsDsse) + .ssekms_key_id(kms_key_id) + } else { + put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) // ไม่มี key ID + } + } _ => put_object_request, }; } @@ -970,6 +990,7 @@ mod tests { disable_multi_object_delete: false, disable_multipart_upload: false, server_side_encryption: None, + sse_kms_key_id: None, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1026,6 +1047,7 @@ mod tests { disable_multi_object_delete: true, disable_multipart_upload: false, server_side_encryption: None, + sse_kms_key_id: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1068,6 +1090,7 @@ mod tests { disable_multi_object_delete: false, disable_multipart_upload: false, server_side_encryption: None, + sse_kms_key_id: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1151,6 +1174,7 @@ mod tests { disable_multi_object_delete: false, disable_multipart_upload: false, server_side_encryption: None, + sse_kms_key_id: None, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1245,6 +1269,7 @@ mod tests { disable_multi_object_delete: false, disable_multipart_upload: false, server_side_encryption: None, + sse_kms_key_id: None, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3])) From 63191473b771b825f138f3fedeff78da26170001 Mon Sep 17 00:00:00 2001 From: samoii Date: Sat, 14 Sep 2024 12:10:36 +0700 Subject: [PATCH 05/12] add kms key id --- .../src/object_storage/s3_compatible_storage.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 799ccb719f4..d2573caa88b 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -306,26 +306,25 @@ impl S3CompatibleObjectStorage { if let Some(encryption) = &self.server_side_encryption { put_object_request = match encryption { S3ServerSideEncryption::Aes256 => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), - S3ServerSideEncryption::AwsKms => { //put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) + S3ServerSideEncryption::AwsKms => { if let Some(kms_key_id) = &self.sse_kms_key_id { put_object_request .server_side_encryption(ServerSideEncryption::AwsKms) - .ssekms_key_id(kms_key_id) + .ssekms_key_id(kms_key_id) } else { - put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) // ไม่มี key ID + put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) } }, - S3ServerSideEncryption::AwsKmsDsse => { //put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) + S3ServerSideEncryption::AwsKmsDsse => { if let Some(kms_key_id) = &self.sse_kms_key_id { put_object_request .server_side_encryption(ServerSideEncryption::AwsKmsDsse) - .ssekms_key_id(kms_key_id) + .ssekms_key_id(kms_key_id) } else { - put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) // ไม่มี key ID + put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) } } - _ => put_object_request, - }; + } } put_object_request .send() From e6f0c1685516f172c125ac23dabce0c4eb302cf8 Mon Sep 17 00:00:00 2001 From: samoii Date: Sat, 14 Sep 2024 15:25:58 +0700 Subject: [PATCH 06/12] function apply sse --- .../object_storage/s3_compatible_storage.rs | 70 +++++++++++++------ 1 file changed, 49 insertions(+), 21 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index d2573caa88b..e69c8535fa3 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -286,6 +286,25 @@ impl S3CompatibleObjectStorage { .to_path_buf() } + fn apply_server_side_encryption<'a>( + &'a self, + encryption: Option, + kms_key_id: Option, + ) -> (Option, Option) { + let server_side_encryption = match encryption { + Some(S3ServerSideEncryption::Aes256) => Some(ServerSideEncryption::Aes256), + Some(S3ServerSideEncryption::AwsKms) => Some(ServerSideEncryption::AwsKms), + Some(S3ServerSideEncryption::AwsKmsDsse) => Some(ServerSideEncryption::AwsKmsDsse), + None => None, + }; + let kms_key_id = match server_side_encryption { + Some(ServerSideEncryption::AwsKms) | Some(ServerSideEncryption::AwsKmsDsse) => kms_key_id, + _ => None, + }; + (server_side_encryption, kms_key_id) + } + + async fn put_single_part_single_try<'a>( &'a self, bucket: &'a str, @@ -303,28 +322,37 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64); - if let Some(encryption) = &self.server_side_encryption { - put_object_request = match encryption { - S3ServerSideEncryption::Aes256 => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), - S3ServerSideEncryption::AwsKms => { - if let Some(kms_key_id) = &self.sse_kms_key_id { - put_object_request - .server_side_encryption(ServerSideEncryption::AwsKms) - .ssekms_key_id(kms_key_id) - } else { - put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) - } - }, - S3ServerSideEncryption::AwsKmsDsse => { - if let Some(kms_key_id) = &self.sse_kms_key_id { - put_object_request - .server_side_encryption(ServerSideEncryption::AwsKmsDsse) - .ssekms_key_id(kms_key_id) - } else { - put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) - } - } + let (server_side_encryption, kms_key_id) = self.apply_server_side_encryption( + self.server_side_encryption.clone(), + self.sse_kms_key_id.clone() + ); + if let Some(encryption) = server_side_encryption { + put_object_request = put_object_request.server_side_encryption(encryption); + + if let Some(kms_key_id) = kms_key_id { + put_object_request = put_object_request.ssekms_key_id(kms_key_id); } + // put_object_request = match encryption { + // S3ServerSideEncryption::Aes256 => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), + // S3ServerSideEncryption::AwsKms => { + // if let Some(kms_key_id) = &self.sse_kms_key_id { + // put_object_request + // .server_side_encryption(ServerSideEncryption::AwsKms) + // .ssekms_key_id(kms_key_id) + // } else { + // put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) + // } + // }, + // S3ServerSideEncryption::AwsKmsDsse => { + // if let Some(kms_key_id) = &self.sse_kms_key_id { + // put_object_request + // .server_side_encryption(ServerSideEncryption::AwsKmsDsse) + // .ssekms_key_id(kms_key_id) + // } else { + // put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) + // } + // } + // } } put_object_request .send() From 514750588ca9219786bc0740218905796e9ef415 Mon Sep 17 00:00:00 2001 From: samoii Date: Sat, 14 Sep 2024 16:43:29 +0700 Subject: [PATCH 07/12] add sse to multipart upload and edit shown log --- .../quickwit-config/src/storage_config.rs | 6 ++- .../object_storage/s3_compatible_storage.rs | 41 +++++++------------ 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index c2e5389a8a1..2ee46eadd6b 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -394,6 +394,10 @@ impl S3StorageConfig { impl fmt::Debug for S3StorageConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let display_sse_kms_key_id = match &self.server_side_encryption { + Some(S3ServerSideEncryption::AwsKms) | Some(S3ServerSideEncryption::AwsKmsDsse) => &self.sse_kms_key_id, + _ => &None, + }; f.debug_struct("S3StorageConfig") .field("access_key_id", &self.access_key_id) .field( @@ -408,7 +412,7 @@ impl fmt::Debug for S3StorageConfig { &self.disable_multi_object_delete, ) .field("server_side_encryption", &self.server_side_encryption) - .field("sse_kms_key_id", &self.sse_kms_key_id) + .field("sse_kms_key_id", &display_sse_kms_key_id) .finish() } } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index e69c8535fa3..ee263cec069 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -322,37 +322,15 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64); - let (server_side_encryption, kms_key_id) = self.apply_server_side_encryption( + let (s3_sse, kms_key_id) = self.apply_server_side_encryption( self.server_side_encryption.clone(), self.sse_kms_key_id.clone() ); - if let Some(encryption) = server_side_encryption { + if let Some(encryption) = s3_sse { put_object_request = put_object_request.server_side_encryption(encryption); - if let Some(kms_key_id) = kms_key_id { put_object_request = put_object_request.ssekms_key_id(kms_key_id); } - // put_object_request = match encryption { - // S3ServerSideEncryption::Aes256 => put_object_request.server_side_encryption(ServerSideEncryption::Aes256), - // S3ServerSideEncryption::AwsKms => { - // if let Some(kms_key_id) = &self.sse_kms_key_id { - // put_object_request - // .server_side_encryption(ServerSideEncryption::AwsKms) - // .ssekms_key_id(kms_key_id) - // } else { - // put_object_request.server_side_encryption(ServerSideEncryption::AwsKms) - // } - // }, - // S3ServerSideEncryption::AwsKmsDsse => { - // if let Some(kms_key_id) = &self.sse_kms_key_id { - // put_object_request - // .server_side_encryption(ServerSideEncryption::AwsKmsDsse) - // .ssekms_key_id(kms_key_id) - // } else { - // put_object_request.server_side_encryption(ServerSideEncryption::AwsKmsDsse) - // } - // } - // } } put_object_request .send() @@ -390,10 +368,21 @@ impl S3CompatibleObjectStorage { async fn create_multipart_upload(&self, key: &str) -> StorageResult { let upload_id = aws_retry(&self.retry_params, || async { - self.s3_client + let mut create_multipart_req = self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .key(key) + .key(key); + let (s3_sse, kms_key_id) = self.apply_server_side_encryption( + self.server_side_encryption.clone(), + self.sse_kms_key_id.clone() + ); + if let Some(encryption) = s3_sse { + create_multipart_req = create_multipart_req.server_side_encryption(encryption); + if let Some(kms_key_id) = kms_key_id { + create_multipart_req = create_multipart_req.ssekms_key_id(kms_key_id); + } + } + create_multipart_req .send() .await }) From 2996aee7f5adfd8cbd772b8ee4e988c57cb7603d Mon Sep 17 00:00:00 2001 From: samoii Date: Fri, 20 Sep 2024 19:53:04 +0700 Subject: [PATCH 08/12] fix lint --- quickwit/quickwit-config/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 4dc1dd36532..22a3ab8212d 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,7 +80,8 @@ pub use crate::node_config::{ use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, S3ServerSideEncryption, + S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + S3ServerSideEncryption, }; /// Returns true if the ingest API v2 is enabled. From c5d50d2804bd877399b591f9af2d5a3aef1c3545 Mon Sep 17 00:00:00 2001 From: samoii Date: Fri, 20 Sep 2024 20:16:42 +0700 Subject: [PATCH 09/12] fix lint --- quickwit/quickwit-config/src/lib.rs | 4 +- .../quickwit-config/src/storage_config.rs | 4 +- .../object_storage/s3_compatible_storage.rs | 42 +++++++++---------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 22a3ab8212d..ba085c95ce3 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,8 +80,8 @@ pub use crate::node_config::{ use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, - S3ServerSideEncryption, + S3ServerSideEncryption, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 2ee46eadd6b..81776e5aff5 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -395,7 +395,9 @@ impl S3StorageConfig { impl fmt::Debug for S3StorageConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let display_sse_kms_key_id = match &self.server_side_encryption { - Some(S3ServerSideEncryption::AwsKms) | Some(S3ServerSideEncryption::AwsKmsDsse) => &self.sse_kms_key_id, + Some(S3ServerSideEncryption::AwsKms) | Some(S3ServerSideEncryption::AwsKmsDsse) => { + &self.sse_kms_key_id + } _ => &None, }; f.debug_struct("S3StorageConfig") diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index ee263cec069..1179bce42b7 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -33,7 +33,9 @@ use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput; use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, ServerSideEncryption}; +use aws_sdk_s3::types::{ + CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, ServerSideEncryption +}; use aws_sdk_s3::Client as S3Client; use base64::prelude::{Engine, BASE64_STANDARD}; use futures::{stream, StreamExt}; @@ -43,7 +45,7 @@ use quickwit_aws::retry::{aws_retry, AwsRetryable}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::{S3StorageConfig, S3ServerSideEncryption}; +use quickwit_config::{S3ServerSideEncryption, S3StorageConfig}; use regex::Regex; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; @@ -298,13 +300,14 @@ impl S3CompatibleObjectStorage { None => None, }; let kms_key_id = match server_side_encryption { - Some(ServerSideEncryption::AwsKms) | Some(ServerSideEncryption::AwsKmsDsse) => kms_key_id, + Some(ServerSideEncryption::AwsKms) | Some(ServerSideEncryption::AwsKmsDsse) => { + kms_key_id + } _ => None, }; (server_side_encryption, kms_key_id) } - async fn put_single_part_single_try<'a>( &'a self, bucket: &'a str, @@ -316,7 +319,8 @@ impl S3CompatibleObjectStorage { .byte_stream() .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - let mut put_object_request = self.s3_client + let mut put_object_request = self + .s3_client .put_object() .bucket(bucket) .key(key) @@ -324,7 +328,7 @@ impl S3CompatibleObjectStorage { .content_length(len as i64); let (s3_sse, kms_key_id) = self.apply_server_side_encryption( self.server_side_encryption.clone(), - self.sse_kms_key_id.clone() + self.sse_kms_key_id.clone(), ); if let Some(encryption) = s3_sse { put_object_request = put_object_request.server_side_encryption(encryption); @@ -332,16 +336,13 @@ impl S3CompatibleObjectStorage { put_object_request = put_object_request.ssekms_key_id(kms_key_id); } } - put_object_request - .send() - .await - .map_err(|sdk_error| { - if sdk_error.is_retryable() { - Retry::Transient(StorageError::from(sdk_error)) - } else { - Retry::Permanent(StorageError::from(sdk_error)) - } - })?; + put_object_request.send().await.map_err(|sdk_error| { + if sdk_error.is_retryable() { + Retry::Transient(StorageError::from(sdk_error)) + } else { + Retry::Permanent(StorageError::from(sdk_error)) + } + })?; crate::STORAGE_METRICS.object_storage_put_parts.inc(); crate::STORAGE_METRICS @@ -368,13 +369,14 @@ impl S3CompatibleObjectStorage { async fn create_multipart_upload(&self, key: &str) -> StorageResult { let upload_id = aws_retry(&self.retry_params, || async { - let mut create_multipart_req = self.s3_client + let mut create_multipart_req = self + .s3_client .create_multipart_upload() .bucket(self.bucket.clone()) .key(key); let (s3_sse, kms_key_id) = self.apply_server_side_encryption( self.server_side_encryption.clone(), - self.sse_kms_key_id.clone() + self.sse_kms_key_id.clone(), ); if let Some(encryption) = s3_sse { create_multipart_req = create_multipart_req.server_side_encryption(encryption); @@ -382,9 +384,7 @@ impl S3CompatibleObjectStorage { create_multipart_req = create_multipart_req.ssekms_key_id(kms_key_id); } } - create_multipart_req - .send() - .await + create_multipart_req.send().await }) .await? .upload_id From 202c6e9cbaf05c598270d181ba8c85b648d0a5b5 Mon Sep 17 00:00:00 2001 From: samoii Date: Sat, 21 Sep 2024 05:33:20 +0700 Subject: [PATCH 10/12] fix lint --- quickwit/quickwit-config/src/lib.rs | 4 ++-- .../src/object_storage/s3_compatible_storage.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index ba085c95ce3..3a168e622a2 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,8 +80,8 @@ pub use crate::node_config::{ use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3ServerSideEncryption, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, - StorageConfigs, + S3ServerSideEncryption, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 1179bce42b7..310e6c7cca6 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -34,7 +34,7 @@ use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; use aws_sdk_s3::types::{ - CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, ServerSideEncryption + CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, ServerSideEncryption, }; use aws_sdk_s3::Client as S3Client; use base64::prelude::{Engine, BASE64_STANDARD}; @@ -307,7 +307,7 @@ impl S3CompatibleObjectStorage { }; (server_side_encryption, kms_key_id) } - + async fn put_single_part_single_try<'a>( &'a self, bucket: &'a str, From 5730b1eb4ecbf8cce32ad208c1032dbf72faadf2 Mon Sep 17 00:00:00 2001 From: samoii Date: Sat, 21 Sep 2024 11:06:04 +0700 Subject: [PATCH 11/12] fix clippy --- .../src/object_storage/s3_compatible_storage.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 310e6c7cca6..f57a0c48cce 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -181,7 +181,7 @@ impl S3CompatibleObjectStorage { let retry_params = RetryParams::aggressive(); let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; let disable_multipart_upload = s3_storage_config.disable_multipart_upload; - let server_side_encryption = s3_storage_config.server_side_encryption.clone(); + let server_side_encryption = s3_storage_config.server_side_encryption; let sse_kms_key_id = s3_storage_config.sse_kms_key_id.clone(); Ok(Self { s3_client, @@ -288,8 +288,8 @@ impl S3CompatibleObjectStorage { .to_path_buf() } - fn apply_server_side_encryption<'a>( - &'a self, + fn apply_server_side_encryption( + &self, encryption: Option, kms_key_id: Option, ) -> (Option, Option) { @@ -327,7 +327,7 @@ impl S3CompatibleObjectStorage { .body(body) .content_length(len as i64); let (s3_sse, kms_key_id) = self.apply_server_side_encryption( - self.server_side_encryption.clone(), + self.server_side_encryption, self.sse_kms_key_id.clone(), ); if let Some(encryption) = s3_sse { @@ -375,7 +375,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .key(key); let (s3_sse, kms_key_id) = self.apply_server_side_encryption( - self.server_side_encryption.clone(), + self.server_side_encryption, self.sse_kms_key_id.clone(), ); if let Some(encryption) = s3_sse { From 4243a5f411a5029e6a09aab8f1358e7e516eb404 Mon Sep 17 00:00:00 2001 From: samoii Date: Sat, 21 Sep 2024 11:13:07 +0700 Subject: [PATCH 12/12] fix rustfmt --- .../src/object_storage/s3_compatible_storage.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index f57a0c48cce..61d748037a4 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -326,10 +326,8 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64); - let (s3_sse, kms_key_id) = self.apply_server_side_encryption( - self.server_side_encryption, - self.sse_kms_key_id.clone(), - ); + let (s3_sse, kms_key_id) = self + .apply_server_side_encryption(self.server_side_encryption, self.sse_kms_key_id.clone()); if let Some(encryption) = s3_sse { put_object_request = put_object_request.server_side_encryption(encryption); if let Some(kms_key_id) = kms_key_id {