Skip to content

Commit

Permalink
feat(rcserver): don't rely on cached bandwidth limit in configguard (#…
Browse files Browse the repository at this point in the history
…4120)

* fix(rcserver): don't rely on cached bandwidth limit in configguard

Fixes #4119

* refactor(backup): remove unnecessary calls to RcloneSetBandwidthLimit

Setting bandwidth limit should be handled by specific Rclone calls.

(cherry picked from commit d9dde35)
  • Loading branch information
Michal-Leszczynski committed Dec 11, 2024
1 parent b8aa485 commit b359f52
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 26 deletions.
79 changes: 66 additions & 13 deletions pkg/rclone/rcserver/rcconfigguard.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package rcserver

import (
"context"
"slices"
"sync"

"github.com/pkg/errors"
Expand All @@ -26,16 +27,12 @@ type configGuard struct {
initialized atomic.Bool

defaultTransfers int
transfers int
bandwidthLimit string
}

func (cg *configGuard) init() {
if cg.initialized.CompareAndSwap(false, true) {
defaultTransfers := fs.GetConfig(context.Background()).Transfers
cg.defaultTransfers = defaultTransfers
cg.transfers = defaultTransfers
cg.bandwidthLimit = ""
}
}

Expand All @@ -55,15 +52,11 @@ func SetTransfers(transfers int) error {
}
// Returns global config
ci := fs.GetConfig(context.Background())
if transfers == globalConfigGuard.transfers {
// Safety check in case configguard is not in sync with global config
if transfers == ci.Transfers {
// Transfers are already set to specified value
return nil
}
if transfers == ci.Transfers {
// Transfers are already set to specified value
return nil
}

globalConfigGuard.transfers = transfers
ci.Transfers = transfers
// The amount of transfers impacts fs.Fs initialization (e.g. pool.Pool and fs.Pacer),
// so fs.Fs cache should be cleared on transfers count change.
Expand All @@ -77,16 +70,76 @@ func SetBandwidthLimit(limit string) error {
defer globalConfigGuard.mu.Unlock()
globalConfigGuard.init()

if limit == globalConfigGuard.bandwidthLimit {
currLimit, err := getBandwidthLimit()
if err != nil {
return err
}
eq, err := equalBandwidths(limit, currLimit)
if err != nil {
return err
}
if eq {
// Bandwidth limit is already set to specified value
return nil
}

in := rc.Params{
"rate": limit,
}
_, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), in)
// Uses *tokenBucket.rcBwlimit method
_, err = rcCalls.Get("core/bwlimit").Fn(context.Background(), in)
if err != nil {
return errors.Wrapf(err, "set bandwidth to %s", limit)
}
return nil
}

func getBandwidthLimit() (string, error) {
// Uses *tokenBucket.rcBwlimit method
out, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), make(rc.Params))
if err != nil {
return "", errors.Wrap(err, "get bandwidth")
}
limit, err := out.GetString("rate")
if err != nil {
return "", errors.Wrap(err, "parse current bandwidth")
}
return limit, err
}

func equalBandwidths(limit1, limit2 string) (bool, error) {
bws1, err := parseBandwidth(limit1)
if err != nil {
return false, err
}
bws2, err := parseBandwidth(limit2)
if err != nil {
return false, err
}
return slices.EqualFunc(bws1, bws2, func(s1 fs.BwTimeSlot, s2 fs.BwTimeSlot) bool {
if s1.HHMM != s2.HHMM || s1.DayOfTheWeek != s2.DayOfTheWeek {
return false
}
// Unlimited bandwidth can be described by any number <= 0
if s1.Bandwidth.Tx > 0 || s2.Bandwidth.Tx > 0 {
if s1.Bandwidth.Tx != s2.Bandwidth.Tx {
return false
}
}
if s1.Bandwidth.Rx > 0 || s2.Bandwidth.Rx > 0 {
if s1.Bandwidth.Rx != s2.Bandwidth.Rx {
return false
}
}
return true
}), nil
}

func parseBandwidth(limit string) (fs.BwTimetable, error) {
var bws fs.BwTimetable
err := bws.Set(limit)
if err != nil {
return nil, errors.Wrapf(err, "parse limit: %s", limit)
}
return bws, nil
}
5 changes: 5 additions & 0 deletions pkg/scyllaclient/client_rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
// transfers performed under current client session.
// Limit is expressed in MiB per second.
// To turn off limitation set it to 0.
//
// Note that it's safer to set bandwidth limit as a parameter of given Rclone call
// (e.g. RcloneMoveDir or RcloneCopyPaths) as it's resistant to agent restarts
// or some other process modifying Rclone bandwidth limit in the meantime.
// Because of that, this method should be used in tests only.
func (c *Client) RcloneSetBandwidthLimit(ctx context.Context, host string, limit int) error {
p := operations.CoreBwlimitParams{
Context: forceHost(ctx, host),
Expand Down
4 changes: 0 additions & 4 deletions pkg/service/backup/worker_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
}(w.hostSnapshotDirs(h))
}

if err := w.setRateLimit(ctx, h); err != nil {
return errors.Wrap(err, "set rate limit")
}

dirs := w.hostSnapshotDirs(h)
f := func(i int) (err error) {
d := &dirs[i]
Expand Down
9 changes: 0 additions & 9 deletions pkg/service/backup/worker_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ func (w *worker) Upload(ctx context.Context, hosts []hostInfo, limits []DCLimit)
}

func (w *worker) uploadHost(ctx context.Context, h hostInfo) error {
if err := w.setRateLimit(ctx, h); err != nil {
return errors.Wrap(err, "set rate limit")
}

dirs := w.hostSnapshotDirs(h)

f := func(i int) (err error) {
Expand Down Expand Up @@ -151,11 +147,6 @@ func (w *worker) snapshotJobID(ctx context.Context, d snapshotDir) int64 {
return 0
}

func (w *worker) setRateLimit(ctx context.Context, h hostInfo) error {
w.Logger.Info(ctx, "Setting rate limit", "host", h.IP, "limit", h.RateLimit.Limit)
return w.Client.RcloneSetBandwidthLimit(ctx, h.IP, h.RateLimit.Limit)
}

func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDir) error {
w.Logger.Info(ctx, "Uploading table snapshot",
"host", h.IP,
Expand Down

0 comments on commit b359f52

Please sign in to comment.