diff --git a/pkg/rclone/rcserver/rcconfigguard.go b/pkg/rclone/rcserver/rcconfigguard.go index 7cb7c7b12..e9eb4a843 100644 --- a/pkg/rclone/rcserver/rcconfigguard.go +++ b/pkg/rclone/rcserver/rcconfigguard.go @@ -4,6 +4,7 @@ package rcserver import ( "context" + "slices" "sync" "github.com/pkg/errors" @@ -26,16 +27,12 @@ type configGuard struct { initialized atomic.Bool defaultTransfers int - transfers int - bandwidthLimit string } func (cg *configGuard) init() { if cg.initialized.CompareAndSwap(false, true) { defaultTransfers := fs.GetConfig(context.Background()).Transfers cg.defaultTransfers = defaultTransfers - cg.transfers = defaultTransfers - cg.bandwidthLimit = "" } } @@ -55,15 +52,11 @@ func SetTransfers(transfers int) error { } // Returns global config ci := fs.GetConfig(context.Background()) - if transfers == globalConfigGuard.transfers { - // Safety check in case configguard is not in sync with global config - if transfers == ci.Transfers { - // Transfers are already set to specified value - return nil - } + if transfers == ci.Transfers { + // Transfers are already set to specified value + return nil } - globalConfigGuard.transfers = transfers ci.Transfers = transfers // The amount of transfers impacts fs.Fs initialization (e.g. pool.Pool and fs.Pacer), // so fs.Fs cache should be cleared on transfers count change. @@ -77,16 +70,76 @@ func SetBandwidthLimit(limit string) error { defer globalConfigGuard.mu.Unlock() globalConfigGuard.init() - if limit == globalConfigGuard.bandwidthLimit { + currLimit, err := getBandwidthLimit() + if err != nil { + return err + } + eq, err := equalBandwidths(limit, currLimit) + if err != nil { + return err + } + if eq { + // Bandwidth limit is already set to specified value return nil } in := rc.Params{ "rate": limit, } - _, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), in) + // Uses *tokenBucket.rcBwlimit method + _, err = rcCalls.Get("core/bwlimit").Fn(context.Background(), in) if err != nil { return errors.Wrapf(err, "set bandwidth to %s", limit) } return nil } + +func getBandwidthLimit() (string, error) { + // Uses *tokenBucket.rcBwlimit method + out, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), make(rc.Params)) + if err != nil { + return "", errors.Wrap(err, "get bandwidth") + } + limit, err := out.GetString("rate") + if err != nil { + return "", errors.Wrap(err, "parse current bandwidth") + } + return limit, err +} + +func equalBandwidths(limit1, limit2 string) (bool, error) { + bws1, err := parseBandwidth(limit1) + if err != nil { + return false, err + } + bws2, err := parseBandwidth(limit2) + if err != nil { + return false, err + } + return slices.EqualFunc(bws1, bws2, func(s1 fs.BwTimeSlot, s2 fs.BwTimeSlot) bool { + if s1.HHMM != s2.HHMM || s1.DayOfTheWeek != s2.DayOfTheWeek { + return false + } + // Unlimited bandwidth can be described by any number <= 0 + if s1.Bandwidth.Tx > 0 || s2.Bandwidth.Tx > 0 { + if s1.Bandwidth.Tx != s2.Bandwidth.Tx { + return false + } + } + if s1.Bandwidth.Rx > 0 || s2.Bandwidth.Rx > 0 { + if s1.Bandwidth.Rx != s2.Bandwidth.Rx { + return false + } + } + return true + }), nil +} + +func parseBandwidth(limit string) (fs.BwTimetable, error) { + var bws fs.BwTimetable + err := bws.Set(limit) + if err != nil { + return nil, errors.Wrapf(err, "parse limit: %s", limit) + } + return bws, nil +} diff --git a/pkg/scyllaclient/client_rclone.go b/pkg/scyllaclient/client_rclone.go index 8b8ae6854..c8750ff84 100644 --- a/pkg/scyllaclient/client_rclone.go +++ b/pkg/scyllaclient/client_rclone.go @@ -24,6 +24,11 @@ import ( // transfers performed under current client session. // Limit is expressed in MiB per second. // To turn off limitation set it to 0. +// +// Note that it's safer to set bandwidth limit as a parameter of given Rclone call +// (e.g. RcloneMoveDir or RcloneCopyPaths) as it's resistant to agent restarts +// or some other process modifying Rclone bandwidth limit in the meantime. +// Because of that, this method should be used in tests only. func (c *Client) RcloneSetBandwidthLimit(ctx context.Context, host string, limit int) error { p := operations.CoreBwlimitParams{ Context: forceHost(ctx, host), diff --git a/pkg/service/backup/worker_deduplicate.go b/pkg/service/backup/worker_deduplicate.go index 82cdf59ec..e3b5fdc2a 100644 --- a/pkg/service/backup/worker_deduplicate.go +++ b/pkg/service/backup/worker_deduplicate.go @@ -56,10 +56,6 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error { }(w.hostSnapshotDirs(h)) } - if err := w.setRateLimit(ctx, h); err != nil { - return errors.Wrap(err, "set rate limit") - } - dirs := w.hostSnapshotDirs(h) f := func(i int) (err error) { d := &dirs[i] diff --git a/pkg/service/backup/worker_upload.go b/pkg/service/backup/worker_upload.go index d57a57c07..23a76955f 100644 --- a/pkg/service/backup/worker_upload.go +++ b/pkg/service/backup/worker_upload.go @@ -32,10 +32,6 @@ func (w *worker) Upload(ctx context.Context, hosts []hostInfo, limits []DCLimit) } func (w *worker) uploadHost(ctx context.Context, h hostInfo) error { - if err := w.setRateLimit(ctx, h); err != nil { - return errors.Wrap(err, "set rate limit") - } - dirs := w.hostSnapshotDirs(h) f := func(i int) (err error) { @@ -151,11 +147,6 @@ func (w *worker) snapshotJobID(ctx context.Context, d snapshotDir) int64 { return 0 } -func (w *worker) setRateLimit(ctx context.Context, h hostInfo) error { - w.Logger.Info(ctx, "Setting rate limit", "host", h.IP, "limit", h.RateLimit.Limit) - return w.Client.RcloneSetBandwidthLimit(ctx, h.IP, h.RateLimit.Limit) -} - func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDir) error { w.Logger.Info(ctx, "Uploading table snapshot", "host", h.IP,