diff --git a/ChangeLog.md b/ChangeLog.md index 84cbe0f3..e89888a2 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,4 +1,6 @@ # v2.5.12 +IMPROVEMENTS +- added "object_disk_size" to upload and download command logs BUG FIXES - fixed corner case in `API server` hang when `watch` background command failures, fix [929](https://github.com/Altinity/clickhouse-backup/pull/929) thanks @tadus21 diff --git a/pkg/backup/download.go b/pkg/backup/download.go index 89341126..fe2a7ad8 100644 --- a/pkg/backup/download.go +++ b/pkg/backup/download.go @@ -231,19 +231,21 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [ backupMetadata := remoteBackup.BackupMetadata backupMetadata.Tables = tablesForDownload - backupMetadata.DataSize = dataSize - backupMetadata.MetadataSize = metadataSize if b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "" && backupMetadata.Tables != nil && len(backupMetadata.Tables) > 0 { localClickHouseBackupFile := path.Join(b.EmbeddedBackupDataPath, backupName, ".backup") remoteClickHouseBackupFile := path.Join(backupName, ".backup") - if err = b.downloadSingleBackupFile(ctx, remoteClickHouseBackupFile, localClickHouseBackupFile, disks); err != nil { + localEmbeddedMetadataSize := int64(0) + if localEmbeddedMetadataSize, err = b.downloadSingleBackupFile(ctx, remoteClickHouseBackupFile, localClickHouseBackupFile, disks); err != nil { return err } + metadataSize += uint64(localEmbeddedMetadataSize) } backupMetadata.CompressedSize = 0 backupMetadata.DataFormat = "" + backupMetadata.DataSize = dataSize + backupMetadata.MetadataSize = metadataSize backupMetadata.ConfigSize = configSize backupMetadata.RBACSize = rbacSize backupMetadata.ClickhouseBackupVersion = backupVersion @@ -285,7 +287,8 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [ log.WithFields(apexLog.Fields{ "duration": utils.HumanizeDuration(time.Since(startDownload)), - "size": utils.FormatBytes(dataSize + metadataSize + rbacSize 
+ configSize), + "download_size": utils.FormatBytes(dataSize + metadataSize + rbacSize + configSize), + "object_disk_size": utils.FormatBytes(backupMetadata.ObjectDiskSize), "version": backupVersion, }).Info("done") return nil @@ -1076,12 +1079,17 @@ func (b *Backuper) makePartHardlinks(exists, new string) error { return nil } -func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile string, localFile string, disks []clickhouse.Disk) error { - if b.resume && b.resumableState.IsAlreadyProcessedBool(remoteFile) { - return nil +func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile string, localFile string, disks []clickhouse.Disk) (int64, error) { + var size int64 + var isProcessed bool + if b.resume { + if isProcessed, size = b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed { + return size, nil + } } log := b.log.WithField("logger", "downloadSingleBackupFile") retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil) + err := retry.RunCtx(ctx, func(ctx context.Context) error { remoteReader, err := b.dst.GetFileReader(ctx, remoteFile) if err != nil { @@ -1105,7 +1113,7 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri } }() - _, err = io.CopyBuffer(localWriter, remoteReader, nil) + size, err = io.CopyBuffer(localWriter, remoteReader, nil) if err != nil { return err } @@ -1116,12 +1124,12 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri return nil }) if err != nil { - return err + return 0, err } if b.resume { - b.resumableState.AppendToState(remoteFile, 0) + b.resumableState.AppendToState(remoteFile, size) } - return nil + return size, nil } // filterDisksByStoragePolicyAndType - https://github.com/Altinity/clickhouse-backup/issues/561 diff --git a/pkg/backup/upload.go b/pkg/backup/upload.go index e3674da6..a7195a6d 100644 --- a/pkg/backup/upload.go +++ b/pkg/backup/upload.go @@ -188,6 +188,16 
@@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr if backupMetadata.ConfigSize, err = b.uploadConfigData(ctx, backupName); err != nil { return fmt.Errorf("b.uploadConfigData return error: %v", err) } + //upload embedded .backup file + if b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "" && backupMetadata.Tables != nil && len(backupMetadata.Tables) > 0 { + localClickHouseBackupFile := path.Join(b.EmbeddedBackupDataPath, backupName, ".backup") + remoteClickHouseBackupFile := path.Join(backupName, ".backup") + localEmbeddedMetadataSize := int64(0) + if localEmbeddedMetadataSize, err = b.uploadSingleBackupFile(ctx, localClickHouseBackupFile, remoteClickHouseBackupFile); err != nil { + return fmt.Errorf("b.uploadSingleBackupFile return error: %v", err) + } + metadataSize += localEmbeddedMetadataSize + } // upload metadata for backup backupMetadata.CompressedSize = uint64(compressedDataSize) @@ -220,20 +230,14 @@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr return fmt.Errorf("can't upload %s: %v", remoteBackupMetaFile, err) } } - if b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "" && backupMetadata.Tables != nil && len(backupMetadata.Tables) > 0 { - localClickHouseBackupFile := path.Join(b.EmbeddedBackupDataPath, backupName, ".backup") - remoteClickHouseBackupFile := path.Join(backupName, ".backup") - if err = b.uploadSingleBackupFile(ctx, localClickHouseBackupFile, remoteClickHouseBackupFile); err != nil { - return fmt.Errorf("b.uploadSingleBackupFile return error: %v", err) - } - } if b.resume { b.resumableState.Close() } log.WithFields(apexLog.Fields{ - "duration": utils.HumanizeDuration(time.Since(startUpload)), - "size": utils.FormatBytes(uint64(compressedDataSize) + uint64(metadataSize) + uint64(len(newBackupMetadataBody)) + backupMetadata.RBACSize + backupMetadata.ConfigSize), - "version": backupVersion, + "duration": utils.HumanizeDuration(time.Since(startUpload)), + 
"upload_size": utils.FormatBytes(uint64(compressedDataSize) + uint64(metadataSize) + uint64(len(newBackupMetadataBody)) + backupMetadata.RBACSize + backupMetadata.ConfigSize), + "object_disk_size": utils.FormatBytes(backupMetadata.ObjectDiskSize), + "version": backupVersion, }).Info("done") // Remote old backup retention @@ -290,14 +294,16 @@ func (b *Backuper) RemoveOldBackupsRemote(ctx context.Context) error { return nil } -func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remoteFile string) error { - if b.resume && b.resumableState.IsAlreadyProcessedBool(remoteFile) { - return nil +func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remoteFile string) (int64, error) { + if b.resume { + if isProcessed, size := b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed { + return size, nil + } } log := b.log.WithField("logger", "uploadSingleBackupFile") f, err := os.Open(localFile) if err != nil { - return fmt.Errorf("can't open %s: %v", localFile, err) + return 0, fmt.Errorf("can't open %s: %v", localFile, err) } defer func() { if err := f.Close(); err != nil { @@ -309,16 +315,16 @@ func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remote return b.dst.PutFile(ctx, remoteFile, f) }) if err != nil { - return fmt.Errorf("can't upload %s: %v", remoteFile, err) + return 0, fmt.Errorf("can't upload %s: %v", remoteFile, err) + } + info, err := os.Stat(localFile) + if err != nil { + return 0, fmt.Errorf("can't stat %s", localFile) } if b.resume { - info, err := os.Stat(localFile) - if err != nil { - return fmt.Errorf("can't stat %s", localFile) - } b.resumableState.AppendToState(remoteFile, info.Size()) } - return nil + return info.Size(), nil } func (b *Backuper) prepareTableListToUpload(ctx context.Context, backupName string, tablePattern string, partitions []string) (tablesForUpload ListOfTables, err error) { diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go 
index 5aaa84d4..bbc976b9 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -635,7 +635,7 @@ func TestConfigs(t *testing.T) { } } -// TestLongListRemote - no parallel, cause need to restart minito +// TestLongListRemote - no parallel, because we need to restart minio func TestLongListRemote(t *testing.T) { ch := &TestClickHouse{} r := require.New(t)