Skip to content

Commit

Permalink
fix TestIntegrationSFTP
Browse files Browse the repository at this point in the history
  • Loading branch information
Slach committed Oct 11, 2024
1 parent 7be46a4 commit 954053b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 20 deletions.
21 changes: 10 additions & 11 deletions pkg/resumable/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewState(defaultDiskPath, backupName, command string, params map[string]int
}
db, err := bolt.Open(s.stateFile, 0600, nil)
if err != nil {
log.Warn().Msgf("can't open %s error: %v", s.stateFile, err)
log.Warn().Msgf("resumable state: can't open %s error: %v", s.stateFile, err)
return nil
}
s.db = db
Expand All @@ -42,7 +42,7 @@ func (s *State) GetParams() map[string]interface{} {
func (s *State) getBucket(tx *bolt.Tx) *bolt.Bucket {
bucket := tx.Bucket(bucketName)
if bucket == nil {
log.Fatal().Msgf("can't open bucket %s in %s", bucketName, s.stateFile)
log.Fatal().Msgf("resumable state: can't open bucket %s in %s", bucketName, s.stateFile)
}
return bucket
}
Expand All @@ -60,7 +60,7 @@ func (s *State) LoadParams() {
}
return nil
}); err != nil {
log.Warn().Msgf("can't load params from %s, error: %v", s.stateFile, err)
log.Warn().Msgf("resumable state: can't load params from %s, error: %v", s.stateFile, err)
}
}

Expand All @@ -74,7 +74,7 @@ func (s *State) LoadState() {
if bucket == nil {
bucket, err = tx.CreateBucket(bucketName)
if err != nil {
return fmt.Errorf("can't create bucket: %s", err)
return fmt.Errorf("resumable state: can't create bucket: %s", err)
}
}
return nil
Expand All @@ -90,7 +90,6 @@ func (s *State) CleanupStateIfParamsChange(params map[string]interface{}) {
needCleanup = true
}
if s.params != nil && params != nil && !common.CompareMaps(s.params, params) {
log.Debug().Msgf("SUKA s.params != nil %v && params != nil %v", s.params != nil, params != nil)
needCleanup = true
}

Expand All @@ -107,7 +106,7 @@ func (s *State) CleanupStateIfParamsChange(params map[string]interface{}) {
return nil
})
if err != nil {
log.Warn().Msgf("can't cleanupBucket %s in %s", bucketName, s.stateFile)
log.Warn().Msgf("resumable state: can't cleanupBucket %s in %s", bucketName, s.stateFile)
}
}
_ = s.db.Batch(func(tx *bolt.Tx) error {
Expand All @@ -127,11 +126,11 @@ func (s *State) saveParams(b *bolt.Bucket, params map[string]interface{}) {
}
paramsBytes, err := json.Marshal(s.params)
if err != nil {
log.Warn().Msgf("can't json.Marshal(s.params=%#v): %v", s.params, err)
log.Warn().Msgf("resumable state: can't json.Marshal(s.params=%#v): %v", s.params, err)
return
}
if err = b.Put([]byte("params"), paramsBytes); err != nil {
log.Warn().Msgf("can't bolt.Put(s.params): %v", err)
log.Warn().Msgf("resumable state: can't bolt.Put(s.params): %v", err)
}
}

Expand All @@ -146,7 +145,7 @@ func (s *State) AppendToState(path string, size int64) {
return b.Put([]byte(path), buf[:n])
})
if err != nil {
log.Fatal().Msgf("can't write key %s to %s error: %v", path, s.stateFile, err)
log.Fatal().Msgf("resumable state: can't write key %s to %s error: %v", path, s.stateFile, err)
}
}

Expand All @@ -172,14 +171,14 @@ func (s *State) IsAlreadyProcessed(path string) (bool, int64) {
return nil
})
if err != nil {
log.Fatal().Msgf("can't read key %s to %s error: %v", path, s.stateFile, err)
log.Fatal().Msgf("resumable state: can't read key %s to %s error: %v", path, s.stateFile, err)
return false, 0
}
return found, size
}

func (s *State) Close() {
if err := s.db.Close(); err != nil {
log.Warn().Err(err).Msgf("can't close %s", s.stateFile)
log.Warn().Err(err).Msgf("resumable state: can't close %s", s.stateFile)
}
}
6 changes: 2 additions & 4 deletions pkg/storage/sftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,11 @@ func (sftp *SFTP) Connect(ctx context.Context) error {
}

func (sftp *SFTP) Close(ctx context.Context) error {
sftp.Debug("[SFTP_DEBUG] sftpClient.Close()")
if err := sftp.sftpClient.Close(); err != nil {
return err
return fmt.Errorf("sftpClient.Close() error: , %v", err)
}
sftp.Debug("[SFTP_DEBUG] sshClient.Close()")
if err := sftp.sshClient.Close(); err != nil {
return err
return fmt.Errorf("sshClient.Close() error: , %v", err)
}
return nil
}
Expand Down
12 changes: 7 additions & 5 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2816,14 +2816,16 @@ func (env *TestEnvironment) checkResumeAlreadyProcessed(backupCmd, testBackupNam
}
out, err := env.DockerExecOut("clickhouse-backup", "bash", "-xce", backupCmd)
r.NoError(err, "%s\nunexpected checkResumeAlreadyProcessed error: %v", out, err)
const alreadyProcesses = "already processed"
const resumableWarning = "resumable state: can't"
const resumableCleanup = "state2 cleanup begin"
if strings.Contains(backupCmd, "--resume") {
if !strings.Contains(out, "already processed") {
log.Debug().Msg("!!!!!SUKA!!!!!")
if !strings.Contains(out, alreadyProcesses) || strings.Contains(out, resumableWarning) || strings.Contains(out, resumableCleanup) {
log.Debug().Msg(out)
}
r.NotContains(out, "can't")
r.NotContains(out, "state2 cleanup begin")
r.Contains(out, "already processed")
r.NotContains(out, resumableWarning)
r.NotContains(out, resumableCleanup)
r.Contains(out, alreadyProcesses)
}
}

Expand Down

0 comments on commit 954053b

Please sign in to comment.