diff --git a/go.mod b/go.mod index c9f452f32e..4e1d352046 100644 --- a/go.mod +++ b/go.mod @@ -127,6 +127,6 @@ require ( replace ( github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0 - github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e + github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20241202112750-92c79d296b68 google.golang.org/api v0.114.0 => github.com/scylladb/google-api-go-client v0.34.1-patched ) diff --git a/go.sum b/go.sum index 4eec7b07b4..c5e812a006 100644 --- a/go.sum +++ b/go.sum @@ -1051,8 +1051,8 @@ github.com/scylladb/gocqlx/v2 v2.8.0 h1:f/oIgoEPjKDKd+RIoeHqexsIQVIbalVmT+axwvUq github.com/scylladb/gocqlx/v2 v2.8.0/go.mod h1:4/+cga34PVqjhgSoo5Nr2fX1MQIqZB5eCE5DK4xeDig= github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzVwgESabOB1eTwb4woE6oUziY= github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc= -github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8= -github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM= +github.com/scylladb/rclone v1.54.1-0.20241202112750-92c79d296b68 h1:+qClmAsTLhSo3Vm40prizacg5IaUjtNQgoGgA8FyjKA= +github.com/scylladb/rclone v1.54.1-0.20241202112750-92c79d296b68/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM= github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b h1:JRDV1d1FIiH0TIyHVmTAILAjQ2f8O4t7ZtZ/S+fT2sY= github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b/go.mod h1:Tss7a99vrgds+B70w8ZFG3Skxfr9Br3kAzrKP2b3CmQ= github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241104134613-aba35605c28b h1:7CHNmPrQqSdApaEh5nkRL+D52KFHaOHVBBVDvytHEOY= diff --git a/vendor/github.com/rclone/rclone/backend/s3/s3.go b/vendor/github.com/rclone/rclone/backend/s3/s3.go index d51c6ee7e6..a77a5b9831 100644 --- a/vendor/github.com/rclone/rclone/backend/s3/s3.go +++ b/vendor/github.com/rclone/rclone/backend/s3/s3.go @@ -1396,7 +1396,7 @@ var retryErrorCodes = []int{ 503, // Service Unavailable/Slow Down - "Reduce your request rate" } -//S3 is pretty resilient, and the built in retry handling is probably sufficient +// S3 is pretty resilient, and the built in retry handling is probably sufficient // as it should notice closed connections and timeouts which are the most likely // sort of failure modes func (f *Fs) shouldRetry(err error) (bool, error) { @@ -1713,7 +1713,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // Return an Object from a path // -//If it can't be found it returns the error ErrorObjectNotFound. +// If it can't be found it returns the error ErrorObjectNotFound. func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Object) (fs.Object, error) { o := &Object{ fs: f, @@ -1979,25 +1979,28 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec return o, nil } -// listDir lists files and directories to out -func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { - // List the objects and directories - err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error { +// listDirCB calls callback on files as they are being listed. +func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool, cb fs.ListCBCallback) error { + // List the objects and directories. + // Batch callbacks by 100 with walk.NewListRHelper (compatible callback type) + // in order to reduce the amount of allocations and function calls. + list := walk.NewListRHelper(fs.ListRCallback(cb)) + err := f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error { entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) if err != nil { return err } if entry != nil { - entries = append(entries, entry) + return list.Add(entry) } return nil }) if err != nil { - return nil, err + return err } // bucket must be present if listing succeeded f.cache.MarkOK(bucket) - return entries, nil + return list.Flush() } // listBuckets lists the buckets to out @@ -2029,7 +2032,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) // // This should return ErrDirNotFound if the directory isn't // found. -func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { bucket, directory := f.split(dir) if bucket == "" { if directory != "" { @@ -2037,7 +2040,40 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listBuckets(ctx) } - return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListCBCallback) error { + bucket, directory := f.split(dir) + if bucket == "" { + if directory != "" { + return fs.ErrorListBucketRequired + } + entries, err := f.listBuckets(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", cb) } // ListR lists the objects and directories of the Fs starting @@ -2358,9 +2394,9 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst // Copy src to this remote using server-side copy operations. // -// This is stored with the remote path given +// # This is stored with the remote path given // -// It returns the destination Object and a possible error +// # It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // diff --git a/vendor/github.com/rclone/rclone/fs/config.go b/vendor/github.com/rclone/rclone/fs/config.go index 22e3682f99..9d8793d05b 100644 --- a/vendor/github.com/rclone/rclone/fs/config.go +++ b/vendor/github.com/rclone/rclone/fs/config.go @@ -82,6 +82,7 @@ type ConfigInfo struct { Suffix string `yaml:"suffix"` SuffixKeepExtension bool `yaml:"suffix_keep_extension"` UseListR bool `yaml:"use_list_r"` + UseListCB bool `yaml:"use_list_cb"` BufferSize SizeSuffix `yaml:"buffer_size"` BwLimit BwTimetable `yaml:"bw_limit"` BwLimitFile BwTimetable `yaml:"bw_limit_file"` diff --git a/vendor/github.com/rclone/rclone/fs/fs.go b/vendor/github.com/rclone/rclone/fs/fs.go index f91192261c..2d795c28a8 100644 --- a/vendor/github.com/rclone/rclone/fs/fs.go +++ b/vendor/github.com/rclone/rclone/fs/fs.go @@ -1045,6 +1045,29 @@ type ListRer interface { ListR(ctx context.Context, dir string, callback ListRCallback) error } +// ListCBCallback defines a callback function for ListCB to use. +// +// It is called for each tranche of entries read from the listing and +// if it returns an error, the listing stops. +type ListCBCallback func(entry DirEntries) error + +// ListCBer extends Fs with ListCB. +type ListCBer interface { + Fs + // ListCB calls callback on directory entries as they are being listed. + // + // dir should be "" to start from the root, and should not + // have trailing slashes. + // + // This should return ErrDirNotFound if the directory isn't found. + // + // It should call callback for each tranche of entries read. + // These need not be returned in any particular order. If + // callback returns an error then the listing will stop + // immediately. + ListCB(ctx context.Context, dir string, callback ListCBCallback) error +} + // RangeSeeker is the interface that wraps the RangeSeek method. // // Some of the returns from Object.Open() may optionally implement @@ -1187,7 +1210,7 @@ func Find(name string) (*RegInfo, error) { // MustFind looks for an Info object for the type name passed in // -// Services are looked up in the config file +// # Services are looked up in the config file // // Exits with a fatal error if not found func MustFind(name string) *RegInfo { diff --git a/vendor/github.com/rclone/rclone/fs/list/list.go b/vendor/github.com/rclone/rclone/fs/list/list.go index dfa8b688d4..38aca0cd92 100644 --- a/vendor/github.com/rclone/rclone/fs/list/list.go +++ b/vendor/github.com/rclone/rclone/fs/list/list.go @@ -36,6 +36,37 @@ func DirSorted(ctx context.Context, f fs.Fs, includeAll bool, dir string) (entri return filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f)) } +// Func is copied from imports github.com/rclone/rclone/fs/walk +// in order to avoid import cycle. +type Func func(path string, entries fs.DirEntries, err error) error + +// DirCBFunc is the type of DirCB function. +type DirCBFunc func(ctx context.Context, fs fs.ListCBer, includeAll bool, dir string, cb Func) error + +// DirCB works like DirSorted but uses ListCB instead of List for file listing. +func DirCB(ctx context.Context, f fs.ListCBer, includeAll bool, dir string, cb Func) error { + fi := filter.GetConfig(ctx) + // Get unfiltered entries from the fs + return f.ListCB(ctx, dir, func(entries fs.DirEntries) error { + // This should happen only if exclude files lives in the + // starting directory, otherwise ListDirSorted should not be + // called. + if !includeAll && fi.ListContainsExcludeFile(entries) { + fs.Debugf(dir, "Excluded") + return nil + } + var err error + entries, err = filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f)) + if err != nil { + return err + } + if len(entries) > 0 { + return cb(dir, entries, nil) + } + return nil + }) +} + // filter (if required) and check the entries, then sort them func filterAndSortDir(ctx context.Context, entries fs.DirEntries, includeAll bool, dir string, IncludeObject func(ctx context.Context, o fs.Object) bool, diff --git a/vendor/github.com/rclone/rclone/fs/walk/walk.go b/vendor/github.com/rclone/rclone/fs/walk/walk.go index b731fcf5d7..f9a23eef7a 100644 --- a/vendor/github.com/rclone/rclone/fs/walk/walk.go +++ b/vendor/github.com/rclone/rclone/fs/walk/walk.go @@ -49,7 +49,7 @@ type Func func(path string, entries fs.DirEntries, err error) error // Note that fn will not be called concurrently whereas the directory // listing will proceed concurrently. // -// Parent directories are always listed before their children +// # Parent directories are always listed before their children // // This is implemented by WalkR if Config.UseListR is true // and f supports it and level > 1, or WalkN otherwise. @@ -68,6 +68,9 @@ func Walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i if (maxLevel < 0 || maxLevel > 1) && ci.UseListR && f.Features().ListR != nil { return walkListR(ctx, f, path, includeAll, maxLevel, fn) } + if fcb, ok := f.(fs.ListCBer); ci.UseListCB && ok { + return walkCB(ctx, fcb, path, includeAll, maxLevel, fn, list.DirCB) + } return walkListDirSorted(ctx, f, path, includeAll, maxLevel, fn) } @@ -446,6 +449,106 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i close(in) wg.Wait() close(errs) + close(errs) + // return the first error returned or nil + return <-errs +} + +func walkCB(ctx context.Context, f fs.ListCBer, path string, includeAll bool, maxLevel int, fn Func, listDir list.DirCBFunc) error { + var ( + wg sync.WaitGroup // sync closing of go routines + traversing sync.WaitGroup // running directory traversals + doClose sync.Once // close the channel once + mu sync.Mutex // stop fn being called concurrently + ci = fs.GetConfig(ctx) // current config + ) + // listJob describe a directory listing that needs to be done + type listJob struct { + remote string + depth int + } + + in := make(chan listJob, ci.Checkers) + errs := make(chan error, 1) + quit := make(chan struct{}) + closeQuit := func() { + doClose.Do(func() { + close(quit) + go func() { + for range in { + traversing.Done() + } + }() + }) + } + for i := 0; i < ci.Checkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case job, ok := <-in: + if !ok { + return + } + // Diff from walk + // Start + var jobs []listJob + err := listDir(ctx, f, includeAll, job.remote, func(path string, entries fs.DirEntries, err error) error { + if err == nil && job.depth != 0 { + entries.ForDir(func(dir fs.Directory) { + // Recurse for the directory + jobs = append(jobs, listJob{ + remote: dir.Remote(), + depth: job.depth - 1, + }) + }) + } + mu.Lock() + defer mu.Unlock() + return fn(path, entries, err) + }) + // End + // NB once we have passed entries to fn we mustn't touch it again + if err != nil && err != ErrorSkipDir { + traversing.Done() + err = fs.CountError(err) + fs.Errorf(job.remote, "error listing: %v", err) + closeQuit() + // Send error to error channel if space + select { + case errs <- err: + default: + } + continue + } + if err == nil && len(jobs) > 0 { + traversing.Add(len(jobs)) + go func() { + // Now we have traversed this directory, send these + // jobs off for traversal in the background + for _, newJob := range jobs { + in <- newJob + } + }() + } + traversing.Done() + case <-quit: + return + } + } + }() + } + // Start the process + traversing.Add(1) + in <- listJob{ + remote: path, + depth: maxLevel - 1, + } + traversing.Wait() + close(in) + wg.Wait() + close(errs) // return the first error returned or nil return <-errs } diff --git a/vendor/modules.txt b/vendor/modules.txt index 992bd5d1df..d36402eb15 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -313,7 +313,7 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/rclone/rclone v1.51.0 => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e +# github.com/rclone/rclone v1.51.0 => github.com/scylladb/rclone v1.54.1-0.20241202112750-92c79d296b68 ## explicit; go 1.21 github.com/rclone/rclone/backend/azureblob github.com/rclone/rclone/backend/crypt @@ -758,4 +758,4 @@ gopkg.in/yaml.v2 ## explicit gopkg.in/yaml.v3 # github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0 -# github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e +# github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20241202112750-92c79d296b68