refactor(backup): move validation and unit creation from GetTarget to dedicated methods
Michal-Leszczynski committed Jul 11, 2024
1 parent 611675e commit 71356e8
Showing 2 changed files with 249 additions and 155 deletions.
223 changes: 223 additions & 0 deletions pkg/service/backup/model.go
@@ -3,17 +3,21 @@
package backup

import (
    "context"
    "encoding/json"
    "fmt"
    "reflect"
    "regexp"
    "slices"
    "strconv"
    "strings"
    "time"

    "github.com/gocql/gocql"
    "github.com/pkg/errors"
    "github.com/scylladb/go-set/strset"
    "github.com/scylladb/gocqlx/v2"
    "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
    "go.uber.org/multierr"

    "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
@@ -251,6 +255,140 @@ type taskProperties struct {
    PurgeOnly bool `json:"purge_only"`
}

func (p taskProperties) validate(dcs []string, dcMap map[string][]string) error {
    if p.Location == nil {
        return errors.Errorf("missing location")
    }
    if policy := p.extractRetention(); policy.Retention < 0 || policy.RetentionDays < 0 {
        return errors.New("negative retention")
    }

    // Validate location DCs
    if err := checkDCs(func(i int) (string, string) { return p.Location[i].DC, p.Location[i].String() }, len(p.Location), dcMap); err != nil {
        return errors.Wrap(err, "invalid location")
    }
    // Validate rate limit DCs
    if err := checkDCs(dcLimitDCAtPos(p.RateLimit), len(p.RateLimit), dcMap); err != nil {
        return errors.Wrap(err, "invalid rate-limit")
    }
    // Validate snapshot parallel DCs
    if err := checkDCs(dcLimitDCAtPos(p.SnapshotParallel), len(p.SnapshotParallel), dcMap); err != nil {
        return errors.Wrap(err, "invalid snapshot-parallel")
    }
    // Validate upload parallel DCs
    if err := checkDCs(dcLimitDCAtPos(p.UploadParallel), len(p.UploadParallel), dcMap); err != nil {
        return errors.Wrap(err, "invalid upload-parallel")
    }
    // Validate all DCs have backup location
    if err := checkAllDCsCovered(filterDCLocations(p.Location, dcs), dcs); err != nil {
        return errors.Wrap(err, "invalid location")
    }
    return nil
}
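
validate relies on two helpers that are not part of this hunk, checkDCs and dcLimitDCAtPos. Judging only from the call sites above, a minimal sketch of their assumed shape (signatures and behaviour are inferred, not taken from the repository) could look like this:

// Assumed helpers (not part of this commit): checkDCs verifies that every DC
// referenced at positions 0..n-1 exists in dcMap, the cluster's DC -> hosts mapping.
// dcAt returns the DC name and a printable description of the i-th element.
func checkDCs(dcAt func(int) (string, string), n int, dcMap map[string][]string) error {
    for i := 0; i < n; i++ {
        dc, str := dcAt(i)
        if dc == "" {
            continue // in this sketch, an empty DC means "applies to all DCs"
        }
        if _, ok := dcMap[dc]; !ok {
            return errors.Errorf("no such datacenter %s in %s", dc, str)
        }
    }
    return nil
}

// dcLimitDCAtPos adapts a []DCLimit to the accessor expected by checkDCs.
func dcLimitDCAtPos(limits []DCLimit) func(int) (string, string) {
    return func(i int) (string, string) {
        return limits[i].DC, limits[i].String()
    }
}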

func (p taskProperties) toTarget(ctx context.Context, client *scyllaclient.Client, dcs []string,
    liveNodes scyllaclient.NodeStatusInfoSlice, filters []tabFilter, validators []tabValidator,
) (Target, error) {
    policy := p.extractRetention()
    rateLimit := filterDCLimits(p.RateLimit, dcs)
    if len(rateLimit) == 0 {
        rateLimit = []DCLimit{{Limit: defaultRateLimit}}
    }

    units, err := p.createUnits(ctx, client, filters, validators)
    if err != nil {
        return Target{}, errors.Wrap(err, "create units")
    }

    return Target{
        Units: units,
        DC: dcs,
        Location: filterDCLocations(p.Location, dcs),
        Retention: policy.Retention,
        RetentionDays: policy.RetentionDays,
        RetentionMap: p.RetentionMap,
        RateLimit: rateLimit,
        SnapshotParallel: filterDCLimits(p.SnapshotParallel, dcs),
        UploadParallel: filterDCLimits(p.UploadParallel, dcs),
        Continue: p.Continue,
        PurgeOnly: p.PurgeOnly,
        liveNodes: liveNodes,
    }, nil
}
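
Per the commit title, GetTarget now delegates to these dedicated methods. The actual wiring lives in the service code, which is not part of this file; a rough sketch of how the pieces are assumed to compose (the wrapper name buildTarget and the way its inputs are resolved are illustrative only):

// Illustrative composition of the new methods; not part of this commit.
// dcs, dcMap, liveNodes, filters and validators are assumed to be resolved
// by GetTarget (e.g. from cluster topology and node status) before this point.
func buildTarget(ctx context.Context, client *scyllaclient.Client, p taskProperties,
    dcs []string, dcMap map[string][]string, liveNodes scyllaclient.NodeStatusInfoSlice,
    filters []tabFilter, validators []tabValidator,
) (Target, error) {
    // Static validation of the task properties against the cluster layout.
    if err := p.validate(dcs, dcMap); err != nil {
        return Target{}, err
    }
    // Conversion into a Target; this internally calls createUnits.
    return p.toTarget(ctx, client, dcs, liveNodes, filters, validators)
}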

func (p taskProperties) createUnits(ctx context.Context, client *scyllaclient.Client,
    filters []tabFilter, validators []tabValidator,
) ([]Unit, error) {
    keyspaces, err := client.Keyspaces(ctx)
    if err != nil {
        return nil, errors.Wrapf(err, "get keyspaces")
    }
    rd := scyllaclient.NewRingDescriber(ctx, client)

    var units []Unit
    for _, ks := range keyspaces {
        tables, err := client.Tables(ctx, ks)
        if err != nil {
            return nil, errors.Wrapf(err, "keyspace %s: get tables", ks)
        }

        var filteredTables []string
        for _, tab := range tables {
            // Always backup system_schema.
            // Before Scylla 6.0, schema had to be restored from sstables
            // (output of DESC SCHEMA was missing important information like dropped columns).
            // Starting from Scylla 6.0, schema has to be restored from output of DESC SCHEMA WITH INTERNALS
            // (restoring sstables doesn't play well with raft).
            // system_schema sstables are still always backed up - just in case.
            if ks == systemSchema {
                filteredTables = append(filteredTables, tab)
                continue
            }

            ring, err := rd.DescribeRing(ctx, ks, tab)
            if err != nil {
                return nil, errors.Wrap(err, "describe ring")
            }

            // Apply filters
            skip := false
            for _, f := range filters {
                if !f.filter(ks, tab, ring) {
                    skip = true
                }
            }
            if skip {
                continue
            }

            // Apply validators
            for _, v := range validators {
                if err := v.validate(ks, tab, ring); err != nil {
                    return nil, err
                }
            }

            filteredTables = append(filteredTables, tab)
        }

        if len(filteredTables) > 0 {
            units = append(units, Unit{
                Keyspace: ks,
                Tables: filteredTables,
                AllTables: len(filteredTables) == len(tables),
            })
        }
    }

    // Validate that any keyspace except for system_schema is going to be backed up
    if len(units) < 2 {
        return nil, errors.New("no keyspace matched criteria")
    }

    sortUnits(units)
    return units, nil
}

func (p taskProperties) extractRetention() RetentionPolicy {
    if p.Retention == nil && p.RetentionDays == nil {
        return defaultRetention()
@@ -328,3 +466,88 @@ func ExtractRetention(properties json.RawMessage) (RetentionPolicy, error) {
    }
    return p.extractRetention(), nil
}
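
ExtractRetention is the exported way to read the retention policy out of raw task properties. A small usage sketch follows; the helper name exampleRetention is illustrative, and the JSON field names "retention" and "retention_days" are assumed from struct tags that are not shown in this hunk:

// Usage sketch (assumed JSON field names).
func exampleRetention() {
    raw := json.RawMessage(`{"retention": 3, "retention_days": 30}`)
    policy, err := ExtractRetention(raw)
    if err != nil {
        panic(err) // handle the error appropriately in real code
    }
    fmt.Println(policy.Retention, policy.RetentionDays) // 3 30
}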

// tabFilter checks if a table should be backed up.
type tabFilter interface {
    filter(ks, tab string, ring scyllaclient.Ring) bool
}

// Filters tables according to '--keyspace' flag.
type patternFilter struct {
    pattern *ksfilter.Filter
}

func (f patternFilter) filter(ks, tab string, _ scyllaclient.Ring) bool {
    return f.pattern.Check(ks, tab)
}

// Filters out tables not replicated in backed up dcs.
type dcFilter struct {
    dcs *strset.Set
}

func (f dcFilter) filter(_, _ string, ring scyllaclient.Ring) bool {
    return f.dcs.HasAny(ring.Datacenters()...)
}

// Filters out tables containing local data, as they shouldn't be restored.
type localDataFilter struct{}

func (f localDataFilter) filter(_, _ string, ring scyllaclient.Ring) bool {
    return ring.Replication != scyllaclient.LocalStrategy
}

// tabValidator checks if it's safe to back up a table.
type tabValidator interface {
    validate(ks, tab string, ring scyllaclient.Ring) error
}

// Validates that each token range is owned by at least one live backed up node.
// Otherwise, corresponding data wouldn't be included in the backup.
type tokenRangesValidator struct {
    liveNodes *strset.Set
    dcs       *strset.Set
}

func (v tokenRangesValidator) validate(ks, tab string, ring scyllaclient.Ring) error {
    // Skip validation for SimpleStrategy when all hosts from backed up dcs are live
    // in order to preserve backward compatibility (#3922).
    if ring.Replication == scyllaclient.SimpleStrategy {
        missingHost := false
        for h, dc := range ring.HostDC {
            if v.dcs.Has(dc) && !v.liveNodes.Has(h) {
                missingHost = true
                break
            }
        }
        if !missingHost {
            return nil
        }
    }

    for _, rt := range ring.ReplicaTokens {
        if !v.liveNodes.HasAny(rt.ReplicaSet...) {
            return errors.Errorf("%s.%s: the whole replica set %v is filtered out, so the data owned by it can't be backed up", ks, tab, rt.ReplicaSet)
        }
    }
    return nil
}
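
The filter and validator slices consumed by createUnits are built by the caller. A hedged sketch of how they might be assembled; the helper name buildFiltersAndValidators, the p.Keyspace field, and the liveNodes.Hosts() accessor are assumptions based on the surrounding code and are not part of this diff:

// Hypothetical assembly of the filters/validators passed to createUnits.
func buildFiltersAndValidators(p taskProperties, dcs []string,
    liveNodes scyllaclient.NodeStatusInfoSlice,
) ([]tabFilter, []tabValidator, error) {
    // '--keyspace' glob patterns; assumed to live in p.Keyspace.
    ksf, err := ksfilter.NewFilter(p.Keyspace)
    if err != nil {
        return nil, nil, err
    }

    dcSet := strset.New(dcs...)
    live := strset.New(liveNodes.Hosts()...) // assumed accessor for live host addresses

    filters := []tabFilter{
        patternFilter{pattern: ksf},
        dcFilter{dcs: dcSet},
        localDataFilter{},
    }
    validators := []tabValidator{
        tokenRangesValidator{liveNodes: live, dcs: dcSet},
    }
    return filters, validators, nil
}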

func sortUnits(units []Unit) {
    slices.SortFunc(units, func(a, b Unit) int {
        l := strings.HasPrefix(a.Keyspace, "system")
        r := strings.HasPrefix(b.Keyspace, "system")
        // Put system_schema at the end as schema mustn't be older than snapshot-ed data.
        // system_schema is checked before the generic "system" prefix rules so that
        // those rules can't contradict it and make the comparator inconsistent.
        switch {
        case a.Keyspace == systemSchema:
            return 1
        case b.Keyspace == systemSchema:
            return -1
        case l && !r:
            return -1
        case !l && r:
            return 1
        default:
            if a.Keyspace < b.Keyspace {
                return -1
            }
            return 1
        }
    })
}
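
For illustration, the comparator above groups system-prefixed keyspaces first, orders the remaining keyspaces alphabetically, and always pushes system_schema to the very end. A hypothetical input (the keyspace names are made up):

// Hypothetical illustration of the resulting order.
func exampleSortUnits() {
    units := []Unit{
        {Keyspace: "users"}, {Keyspace: "system_schema"},
        {Keyspace: "system_auth"}, {Keyspace: "accounts"},
    }
    sortUnits(units)
    // units are now ordered: system_auth, accounts, users, system_schema
}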
