From 30e1e407337e1fb512a5441aebbbfaca5ed85ddd Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Thu, 26 Dec 2024 16:06:57 +0000 Subject: [PATCH] add S3 minimum part size defined by the user (#17171) Signed-off-by: Renan Rangel --- examples/operator/operator.yaml | 20 +++++++ go.mod | 2 +- go/flags/endtoend/vtbackup.txt | 1 + go/flags/endtoend/vtctld.txt | 1 + go/flags/endtoend/vttablet.txt | 1 + go/vt/mysqlctl/s3backupstorage/s3.go | 39 +++++++++++--- go/vt/mysqlctl/s3backupstorage/s3_mock.go | 12 ++++- go/vt/mysqlctl/s3backupstorage/s3_test.go | 65 +++++++++++++++++++++++ 8 files changed, 132 insertions(+), 9 deletions(-) diff --git a/examples/operator/operator.yaml b/examples/operator/operator.yaml index 4b1b64df1ac..ded85de5285 100644 --- a/examples/operator/operator.yaml +++ b/examples/operator/operator.yaml @@ -679,6 +679,9 @@ spec: maxLength: 256 pattern: ^[^\r\n]*$ type: string + minPartSize: + format: int64 + type: integer region: minLength: 1 type: string @@ -1995,6 +1998,9 @@ spec: maxLength: 256 pattern: ^[^\r\n]*$ type: string + minPartSize: + format: int64 + type: integer region: minLength: 1 type: string @@ -3510,6 +3516,14 @@ spec: mysql80Compatible: type: string type: object + mysqldExporter: + type: string + vtbackup: + type: string + vtorc: + type: string + vttablet: + type: string type: object name: maxLength: 63 @@ -5241,6 +5255,9 @@ spec: maxLength: 256 pattern: ^[^\r\n]*$ type: string + minPartSize: + format: int64 + type: integer region: minLength: 1 type: string @@ -6688,6 +6705,9 @@ spec: maxLength: 256 pattern: ^[^\r\n]*$ type: string + minPartSize: + format: int64 + type: integer region: minLength: 1 type: string diff --git a/go.mod b/go.mod index 7c2cff0d433..603bc37c62e 100644 --- a/go.mod +++ b/go.mod @@ -96,6 +96,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3 github.com/aws/smithy-go v1.22.0 github.com/bndr/gotabulate v1.1.2 + github.com/dustin/go-humanize v1.0.1 github.com/gammazero/deque v0.2.1 github.com/google/safehtml v0.1.0 github.com/hashicorp/go-version v1.7.0 @@ -153,7 +154,6 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect github.com/ebitengine/purego v0.8.1 // indirect github.com/envoyproxy/go-control-plane v0.13.1 // indirect diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index b4405960711..7bda9048211 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -195,6 +195,7 @@ Flags: --remote_operation_timeout duration time to wait for a remote operation (default 15s) --restart_before_backup Perform a mysqld clean/full restart after applying binlogs, but before taking the backup. Only makes sense to work around xtrabackup bugs. --s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided). + --s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880) --s3_backup_aws_region string AWS region to use. (default "us-east-1") --s3_backup_aws_retries int AWS request retries. (default -1) --s3_backup_force_path_style force the s3 path style. diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index be0f5114e79..c84c5fadf5f 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -110,6 +110,7 @@ Flags: --purge_logs_interval duration how often try to remove old logs (default 1h0m0s) --remote_operation_timeout duration time to wait for a remote operation (default 15s) --s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided). + --s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880) --s3_backup_aws_region string AWS region to use. (default "us-east-1") --s3_backup_aws_retries int AWS request retries. (default -1) --s3_backup_force_path_style force the s3 path style. diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 48776d0e8e0..bc647fb5347 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -303,6 +303,7 @@ Flags: --restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050' --retain_online_ddl_tables duration How long should vttablet keep an old migrated table before purging it (default 24h0m0s) --s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided). + --s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880) --s3_backup_aws_region string AWS region to use. (default "us-east-1") --s3_backup_aws_retries int AWS request retries. (default -1) --s3_backup_force_path_style force the s3 path style. diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 97861e83729..4dd583009aa 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -47,6 +47,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3/types" transport "github.com/aws/smithy-go/endpoints" "github.com/aws/smithy-go/middleware" + "github.com/dustin/go-humanize" "github.com/spf13/pflag" errorsbackup "vitess.io/vitess/go/vt/mysqlctl/errors" @@ -57,6 +58,11 @@ import ( "vitess.io/vitess/go/vt/servenv" ) +const ( + sseCustomerPrefix = "sse_c:" + MaxPartSize = 1024 * 1024 * 1024 * 5 // 5GiB - limited by AWS https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html +) + var ( // AWS API region region string @@ -86,6 +92,11 @@ var ( // path component delimiter delimiter = "/" + + // minimum part size + minPartSize int64 + + ErrPartSize = errors.New("minimum S3 part size must be between 5MiB and 5GiB") ) func registerFlags(fs *pflag.FlagSet) { @@ -98,6 +109,7 @@ func registerFlags(fs *pflag.FlagSet) { fs.BoolVar(&tlsSkipVerifyCert, "s3_backup_tls_skip_verify_cert", false, "skip the 'certificate is valid' check for SSL connections.") fs.StringVar(&requiredLogLevel, "s3_backup_log_level", "LogOff", "determine the S3 loglevel to use from LogOff, LogDebug, LogDebugWithSigning, LogDebugWithHTTPBody, LogDebugWithRequestRetries, LogDebugWithRequestErrors.") fs.StringVar(&sse, "s3_backup_server_side_encryption", "", "server-side encryption algorithm (e.g., AES256, aws:kms, sse_c:/path/to/key/file).") + fs.Int64Var(&minPartSize, "s3_backup_aws_min_partsize", manager.MinUploadPartSize, "Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size.") } func init() { @@ -111,8 +123,6 @@ type logNameToLogLevel map[string]aws.ClientLogMode var logNameMap logNameToLogLevel -const sseCustomerPrefix = "sse_c:" - type endpointResolver struct { r s3.EndpointResolverV2 endpoint *string @@ -166,7 +176,12 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } - partSizeBytes := calculateUploadPartSize(filesize) + partSizeBytes, err := calculateUploadPartSize(filesize) + if err != nil { + return nil, err + } + + bh.bs.params.Logger.Infof("Using S3 upload part size: %s", humanize.IBytes(uint64(partSizeBytes))) reader, writer := io.Pipe() bh.handleAddFile(ctx, filename, partSizeBytes, reader, func(err error) { @@ -213,9 +228,11 @@ func (bh *S3BackupHandle) handleAddFile(ctx context.Context, filename string, pa }() } -func calculateUploadPartSize(filesize int64) int64 { +// calculateUploadPartSize is a helper to calculate the part size, taking into consideration the minimum part size +// passed in by an operator. +func calculateUploadPartSize(filesize int64) (partSizeBytes int64, err error) { // Calculate s3 upload part size using the source filesize - partSizeBytes := manager.DefaultUploadPartSize + partSizeBytes = manager.DefaultUploadPartSize if filesize > 0 { minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts) // Round up to ensure large enough partsize @@ -224,7 +241,17 @@ func calculateUploadPartSize(filesize int64) int64 { partSizeBytes = calculatedPartSizeBytes } } - return partSizeBytes + + if minPartSize != 0 && partSizeBytes < minPartSize { + if minPartSize > MaxPartSize || minPartSize < manager.MinUploadPartSize { // 5GiB and 5MiB respectively + return 0, fmt.Errorf("%w, currently set to %s", + ErrPartSize, humanize.IBytes(uint64(minPartSize)), + ) + } + partSizeBytes = int64(minPartSize) + } + + return } // EndBackup is part of the backupstorage.BackupHandle interface. diff --git a/go/vt/mysqlctl/s3backupstorage/s3_mock.go b/go/vt/mysqlctl/s3backupstorage/s3_mock.go index f244c4d63b1..910a22bd9d5 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3_mock.go +++ b/go/vt/mysqlctl/s3backupstorage/s3_mock.go @@ -162,7 +162,11 @@ func FailFirstWrite(s3bh *S3BackupHandle, ctx context.Context, filename string, return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } - partSizeBytes := calculateUploadPartSize(filesize) + partSizeBytes, err := calculateUploadPartSize(filesize) + if err != nil { + return nil, err + } + reader, writer := io.Pipe() r := io.Reader(reader) @@ -181,7 +185,11 @@ func FailAllWrites(s3bh *S3BackupHandle, ctx context.Context, filename string, f return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } - partSizeBytes := calculateUploadPartSize(filesize) + partSizeBytes, err := calculateUploadPartSize(filesize) + if err != nil { + return nil, err + } + reader, writer := io.Pipe() r := &failReadPipeReader{PipeReader: reader} diff --git a/go/vt/mysqlctl/s3backupstorage/s3_test.go b/go/vt/mysqlctl/s3backupstorage/s3_test.go index 84ef8de6e48..5e9364219af 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3_test.go +++ b/go/vt/mysqlctl/s3backupstorage/s3_test.go @@ -328,3 +328,68 @@ func TestWithParams(t *testing.T) { assert.NotNil(t, s3.transport.DialContext) assert.NotNil(t, s3.transport.Proxy) } + +func TestCalculateUploadPartSize(t *testing.T) { + originalMinimum := minPartSize + defer func() { minPartSize = originalMinimum }() + + tests := []struct { + name string + filesize int64 + minimumPartSize int64 + want int64 + err error + }{ + { + name: "minimum - 10 MiB", + filesize: 1024 * 1024 * 10, // 10 MiB + minimumPartSize: 1024 * 1024 * 5, // 5 MiB + want: 1024 * 1024 * 5, // 5 MiB, + err: nil, + }, + { + name: "below minimum - 10 MiB", + filesize: 1024 * 1024 * 10, // 10 MiB + minimumPartSize: 1024 * 1024 * 8, // 8 MiB + want: 1024 * 1024 * 8, // 8 MiB, + err: nil, + }, + { + name: "above minimum - 1 TiB", + filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB + minimumPartSize: 1024 * 1024 * 5, // 5 MiB + want: 109951163, // ~104 MiB + err: nil, + }, + { + name: "below minimum - 1 TiB", + filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB + minimumPartSize: 1024 * 1024 * 200, // 200 MiB + want: 1024 * 1024 * 200, // 200 MiB + err: nil, + }, + { + name: "below S3 limits - 5 MiB", + filesize: 1024 * 1024 * 3, // 3 MiB + minimumPartSize: 1024 * 1024 * 4, // 4 MiB + want: 1024 * 1024 * 5, // 5 MiB - should always return the minimum + err: nil, + }, + { + name: "above S3 limits - 5 GiB", + filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB + minimumPartSize: 1024 * 1024 * 1024 * 6, // 6 GiB + want: 0, + err: ErrPartSize, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + minPartSize = tt.minimumPartSize + partSize, err := calculateUploadPartSize(tt.filesize) + require.ErrorIs(t, err, tt.err) + require.Equal(t, tt.want, partSize) + }) + } +}