Skip to content

Commit

Permalink
fix TestIntegrationEmbedded
Browse files Browse the repository at this point in the history
  • Loading branch information
Slach committed Oct 12, 2024
1 parent 954053b commit c19fb8d
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 10 deletions.
7 changes: 7 additions & 0 deletions pkg/backup/backuper.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,3 +430,10 @@ func (b *Backuper) GetLocalDataSize(ctx context.Context) (float64, error) {
err := b.ch.SelectSingleRow(ctx, &localDataSize, "SELECT value FROM system.asynchronous_metrics WHERE metric='TotalBytesOfMergeTreeTables'")
return localDataSize, err
}

func (b *Backuper) GetStateBackupDir() string {
if b.isEmbedded {
return b.EmbeddedBackupDataPath
}
return b.DefaultDataPath
}
2 changes: 1 addition & 1 deletion pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
return err
}
if b.resume {
b.resumableState = resumable.NewState(b.DefaultDataPath, backupName, "download", map[string]interface{}{
b.resumableState = resumable.NewState(b.GetStateBackupDir(), backupName, "download", map[string]interface{}{
"tablePattern": tablePattern,
"partitions": partitions,
"schemaOnly": schemaOnly,
Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr
backupMetadata.RequiredBackup = diffFromRemote
}
if b.resume {
b.resumableState = resumable.NewState(b.DefaultDataPath, backupName, "upload", map[string]interface{}{
b.resumableState = resumable.NewState(b.GetStateBackupDir(), backupName, "upload", map[string]interface{}{
"diffFrom": diffFrom,
"diffFromRemote": diffFromRemote,
"tablePattern": tablePattern,
Expand Down
20 changes: 14 additions & 6 deletions pkg/resumable/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ type State struct {
params map[string]interface{}
}

func NewState(defaultDiskPath, backupName, command string, params map[string]interface{}) *State {
func NewState(stateBackupDir, backupName, command string, params map[string]interface{}) *State {
s := State{
stateFile: path.Join(defaultDiskPath, "backup", backupName, fmt.Sprintf("%s.state2", command)),
stateFile: path.Join(stateBackupDir, "backup", backupName, fmt.Sprintf("%s.state2", command)),
db: nil,
}
db, err := bolt.Open(s.stateFile, 0600, nil)
if err != nil {
if db, err := bolt.Open(s.stateFile, 0600, nil); err == nil {
s.db = db
} else {
log.Warn().Msgf("resumable state: can't open %s error: %v", s.stateFile, err)
return nil
return &s
}
s.db = db
s.LoadState()
s.LoadParams()
s.CleanupStateIfParamsChange(params)
Expand Down Expand Up @@ -85,6 +86,9 @@ func (s *State) LoadState() {
}

func (s *State) CleanupStateIfParamsChange(params map[string]interface{}) {
if s.db == nil {
return
}
needCleanup := false
if s.params != nil && params == nil {
needCleanup = true
Expand Down Expand Up @@ -155,6 +159,7 @@ func (s *State) IsAlreadyProcessedBool(path string) bool {
}

func (s *State) IsAlreadyProcessed(path string) (bool, int64) {
log.Info().Msgf("SUKA2 s.db=%v path=%s", s.db, path)
if s.db == nil {
return false, 0
}
Expand All @@ -178,6 +183,9 @@ func (s *State) IsAlreadyProcessed(path string) (bool, int64) {
}

func (s *State) Close() {
if s.db == nil {
return
}
if err := s.db.Close(); err != nil {
log.Warn().Err(err).Msgf("resumable state: can't close %s", s.stateFile)
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,10 @@ func (api *APIServer) ResumeOperationsAfterRestart() error {
if err != nil {
return err
}
embeddedBackupDiskPath, err := ch.GetEmbeddedBackupPath(disks)
if err != nil {
return err
}
backupList, err := os.ReadDir(path.Join(defaultDiskPath, "backup"))
if err != nil {
return err
Expand All @@ -1722,9 +1726,14 @@ func (api *APIServer) ResumeOperationsAfterRestart() error {
if err != nil {
return err
}
embeddedStateFiles, err := filepath.Glob(path.Join(embeddedBackupDiskPath, "backup", backupName, "*.state2"))
if err != nil {
return err
}
stateFiles = append(stateFiles, embeddedStateFiles...)
for _, stateFile := range stateFiles {
command := strings.TrimSuffix(strings.TrimPrefix(stateFile, path.Join(defaultDiskPath, "backup", backupName)+"/"), ".state2")
state := resumable.NewState(defaultDiskPath, backupName, command, nil)
command := strings.TrimSuffix(filepath.Base(stateFile), ".state2")
state := resumable.NewState(filepath.Dir(stateFile), backupName, command, nil)
params := state.GetParams()
state.Close()
if !api.config.API.AllowParallel && status.Current.InProgress() {
Expand Down

0 comments on commit c19fb8d

Please sign in to comment.