diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go
index 4f23b47c0..42f540ab9 100644
--- a/pkg/service/backup/model.go
+++ b/pkg/service/backup/model.go
@@ -3,17 +3,21 @@
 package backup
 
 import (
+    "context"
     "encoding/json"
     "fmt"
     "reflect"
     "regexp"
+    "slices"
     "strconv"
+    "strings"
     "time"
 
     "github.com/gocql/gocql"
     "github.com/pkg/errors"
     "github.com/scylladb/go-set/strset"
     "github.com/scylladb/gocqlx/v2"
+    "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
     "go.uber.org/multierr"
 
     "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
@@ -251,6 +255,140 @@ type taskProperties struct {
     PurgeOnly bool `json:"purge_only"`
 }
 
+func (p taskProperties) validate(dcs []string, dcMap map[string][]string) error {
+    if p.Location == nil {
+        return errors.Errorf("missing location")
+    }
+    if policy := p.extractRetention(); policy.Retention < 0 || policy.RetentionDays < 0 {
+        return errors.New("negative retention")
+    }
+
+    // Validate location DCs
+    if err := checkDCs(func(i int) (string, string) { return p.Location[i].DC, p.Location[i].String() }, len(p.Location), dcMap); err != nil {
+        return errors.Wrap(err, "invalid location")
+    }
+    // Validate rate limit DCs
+    if err := checkDCs(dcLimitDCAtPos(p.RateLimit), len(p.RateLimit), dcMap); err != nil {
+        return errors.Wrap(err, "invalid rate-limit")
+    }
+    // Validate snapshot parallel DCs
+    if err := checkDCs(dcLimitDCAtPos(p.SnapshotParallel), len(p.SnapshotParallel), dcMap); err != nil {
+        return errors.Wrap(err, "invalid snapshot-parallel")
+    }
+    // Validate upload parallel DCs
+    if err := checkDCs(dcLimitDCAtPos(p.UploadParallel), len(p.UploadParallel), dcMap); err != nil {
+        return errors.Wrap(err, "invalid upload-parallel")
+    }
+    // Validate that all DCs have a backup location
+    if err := checkAllDCsCovered(filterDCLocations(p.Location, dcs), dcs); err != nil {
+        return errors.Wrap(err, "invalid location")
+    }
+    return nil
+}
+
+func (p taskProperties) toTarget(ctx context.Context, client *scyllaclient.Client, dcs []string,
+    liveNodes scyllaclient.NodeStatusInfoSlice, filters []tabFilter, validators []tabValidator,
+) (Target, error) {
+    policy := p.extractRetention()
+    rateLimit := filterDCLimits(p.RateLimit, dcs)
+    if len(rateLimit) == 0 {
+        rateLimit = []DCLimit{{Limit: defaultRateLimit}}
+    }
+
+    units, err := p.createUnits(ctx, client, filters, validators)
+    if err != nil {
+        return Target{}, errors.Wrap(err, "create units")
+    }
+
+    return Target{
+        Units:            units,
+        DC:               dcs,
+        Location:         filterDCLocations(p.Location, dcs),
+        Retention:        policy.Retention,
+        RetentionDays:    policy.RetentionDays,
+        RetentionMap:     p.RetentionMap,
+        RateLimit:        rateLimit,
+        SnapshotParallel: filterDCLimits(p.SnapshotParallel, dcs),
+        UploadParallel:   filterDCLimits(p.UploadParallel, dcs),
+        Continue:         p.Continue,
+        PurgeOnly:        p.PurgeOnly,
+        liveNodes:        liveNodes,
+    }, nil
+}
+
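+// createUnits groups tables that pass all filters into per-keyspace backup Units
+// and runs the provided validators on them. Tables of system_schema are always included.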
+func (p taskProperties) createUnits(ctx context.Context, client *scyllaclient.Client,
+    filters []tabFilter, validators []tabValidator,
+) ([]Unit, error) {
+    keyspaces, err := client.Keyspaces(ctx)
+    if err != nil {
+        return nil, errors.Wrapf(err, "get keyspaces")
+    }
+    rd := scyllaclient.NewRingDescriber(ctx, client)
+
+    var units []Unit
+    for _, ks := range keyspaces {
+        tables, err := client.Tables(ctx, ks)
+        if err != nil {
+            return nil, errors.Wrapf(err, "keyspace %s: get tables", ks)
+        }
+
+        var filteredTables []string
+        for _, tab := range tables {
+            // Always back up system_schema.
+            // Before Scylla 6.0, schema had to be restored from sstables
+            // (output of DESC SCHEMA was missing important information like dropped columns).
+            // Starting from Scylla 6.0, schema has to be restored from the output of DESC SCHEMA WITH INTERNALS
+            // (restoring sstables doesn't play well with raft).
+            // system_schema sstables are still always backed up - just in case.
+            if ks == systemSchema {
+                filteredTables = append(filteredTables, tab)
+                continue
+            }
+
+            ring, err := rd.DescribeRing(ctx, ks, tab)
+            if err != nil {
+                return nil, errors.Wrap(err, "describe ring")
+            }
+
+            // Apply filters
+            skip := false
+            for _, f := range filters {
+                if !f.filter(ks, tab, ring) {
+                    skip = true
+                }
+            }
+            if skip {
+                continue
+            }
+
+            // Apply validators
+            for _, v := range validators {
+                if err := v.validate(ks, tab, ring); err != nil {
+                    return nil, err
+                }
+            }
+
+            filteredTables = append(filteredTables, tab)
+        }
+
+        if len(filteredTables) > 0 {
+            units = append(units, Unit{
+                Keyspace:  ks,
+                Tables:    filteredTables,
+                AllTables: len(filteredTables) == len(tables),
+            })
+        }
+    }
+
+    // Ensure that at least one keyspace other than system_schema is going to be backed up
+    if len(units) < 2 {
+        return nil, errors.New("no keyspace matched criteria")
+    }
+
+    sortUnits(units)
+    return units, nil
+}
+
 func (p taskProperties) extractRetention() RetentionPolicy {
     if p.Retention == nil && p.RetentionDays == nil {
         return defaultRetention()
@@ -328,3 +466,88 @@ func ExtractRetention(properties json.RawMessage) (RetentionPolicy, error) {
     }
     return p.extractRetention(), nil
 }
+
+// tabFilter checks if a table should be backed up.
+type tabFilter interface {
+    filter(ks, tab string, ring scyllaclient.Ring) bool
+}
+
+// Filters tables according to the '--keyspace' flag.
+type patternFilter struct {
+    pattern *ksfilter.Filter
+}
+
+func (f patternFilter) filter(ks, tab string, _ scyllaclient.Ring) bool {
+    return f.pattern.Check(ks, tab)
+}
+
+// Filters out tables not replicated in backed up dcs.
+type dcFilter struct {
+    dcs *strset.Set
+}
+
+func (f dcFilter) filter(_, _ string, ring scyllaclient.Ring) bool {
+    return f.dcs.HasAny(ring.Datacenters()...)
+}
+
+// Filters out tables containing local data, as they shouldn't be backed up or restored.
+type localDataFilter struct{}
+
+func (f localDataFilter) filter(_, _ string, ring scyllaclient.Ring) bool {
+    return ring.Replication != scyllaclient.LocalStrategy
+}
+
+// tabValidator checks if it's safe to back up a table.
+type tabValidator interface {
+    validate(ks, tab string, ring scyllaclient.Ring) error
+}
+
+// Validates that each token range is owned by at least one live backed up node.
+// Otherwise, the corresponding data wouldn't be included in the backup.
+type tokenRangesValidator struct {
+    liveNodes *strset.Set
+    dcs       *strset.Set
+}
+
+func (v tokenRangesValidator) validate(ks, tab string, ring scyllaclient.Ring) error {
+    // Skip validation for SimpleStrategy when all hosts from backed up dcs are live
+    // in order to preserve backward compatibility (#3922).
+    if ring.Replication == scyllaclient.SimpleStrategy {
+        missingHost := false
+        for h, dc := range ring.HostDC {
+            if v.dcs.Has(dc) && !v.liveNodes.Has(h) {
+                missingHost = true
+                break
+            }
+        }
+        if !missingHost {
+            return nil
+        }
+    }
+
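+    // Require at least one live backed up replica for every token range.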
+    for _, rt := range ring.ReplicaTokens {
+        if !v.liveNodes.HasAny(rt.ReplicaSet...) {
+            return errors.Errorf("%s.%s: the whole replica set %v is filtered out, so the data owned by it can't be backed up", ks, tab, rt.ReplicaSet)
+        }
+    }
+    return nil
+}
+
+func sortUnits(units []Unit) {
+    slices.SortFunc(units, func(a, b Unit) int {
+        l := strings.HasPrefix(a.Keyspace, "system")
+        r := strings.HasPrefix(b.Keyspace, "system")
+        // Order: system keyspaces first, then user keyspaces, with system_schema always
+        // at the end as schema mustn't be older than the snapshotted data.
+        switch {
+        case a.Keyspace == systemSchema:
+            return 1
+        case b.Keyspace == systemSchema:
+            return -1
+        case l && !r:
+            return -1
+        case !l && r:
+            return 1
+        default:
+            if a.Keyspace < b.Keyspace {
+                return -1
+            }
+            return 1
+        }
+    })
+}
diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go
index 0705d2a99..1583f35f3 100644
--- a/pkg/service/backup/service.go
+++ b/pkg/service/backup/service.go
@@ -117,187 +117,58 @@ func (s *Service) GetTarget(ctx context.Context, clusterID uuid.UUID, properties
     s.logger.Info(ctx, "Generating backup target", "cluster_id", clusterID)
 
     p := defaultTaskProperties()
-    t := Target{}
-
     if err := json.Unmarshal(properties, &p); err != nil {
-        return t, service.ErrValidate(err)
-    }
-
-    if p.Location == nil {
-        return t, errors.Errorf("missing location")
+        return Target{}, service.ErrValidate(err)
     }
 
     client, err := s.scyllaClient(ctx, clusterID)
     if err != nil {
-        return t, errors.Wrapf(err, "get client")
+        return Target{}, errors.Wrapf(err, "get client")
     }
 
-    // Get hosts in DCs
     dcMap, err := client.Datacenters(ctx)
     if err != nil {
-        return t, errors.Wrap(err, "read datacenters")
-    }
-
-    // Validate location DCs
-    if err := checkDCs(func(i int) (string, string) { return p.Location[i].DC, p.Location[i].String() }, len(p.Location), dcMap); err != nil {
-        return t, errors.Wrap(err, "invalid location")
-    }
-
-    // Validate rate limit DCs
-    if err := checkDCs(dcLimitDCAtPos(p.RateLimit), len(p.RateLimit), dcMap); err != nil {
-        return t, errors.Wrap(err, "invalid rate-limit")
-    }
-
-    // Validate upload parallel DCs
-    if err := checkDCs(dcLimitDCAtPos(p.SnapshotParallel), len(p.SnapshotParallel), dcMap); err != nil {
-        return t, errors.Wrap(err, "invalid snapshot-parallel")
-    }
-
-    // Validate snapshot parallel DCs
-    if err := checkDCs(dcLimitDCAtPos(p.UploadParallel), len(p.UploadParallel), dcMap); err != nil {
-        return t, errors.Wrap(err, "invalid upload-parallel")
-    }
-
-    // Copy retention policy
-    policy := p.extractRetention()
-    t.Retention = policy.Retention
-    t.RetentionDays = policy.RetentionDays
-    if policy.Retention < 0 {
-        return t, errors.New("negative retention")
+        return Target{}, errors.Wrap(err, "read datacenters")
     }
-    if policy.RetentionDays < 0 {
-        return t, errors.New("negative retention days")
-    }
-
-    // Copy simple properties
-    t.RetentionMap = p.RetentionMap
-    t.Continue = p.Continue
-    t.PurgeOnly = p.PurgeOnly
-
-    // Filter DCs
-    if t.DC, err = dcfilter.Apply(dcMap, p.DC); err != nil {
-        return t, err
-    }
-
-    // Filter out properties of not used DCs
-    t.Location = filterDCLocations(p.Location, t.DC)
-    t.RateLimit = filterDCLimits(p.RateLimit, t.DC)
-    if len(t.RateLimit) == 0 {
-        t.RateLimit = []DCLimit{{Limit: defaultRateLimit}}
-    }
-    t.SnapshotParallel = filterDCLimits(p.SnapshotParallel, t.DC)
-    t.UploadParallel = filterDCLimits(p.UploadParallel, t.DC)
-
-    if err := checkAllDCsCovered(t.Location, t.DC); err != nil {
-        return t, errors.Wrap(err, "invalid location")
-    }
-
-    // Get live nodes
-    t.liveNodes, err = s.getLiveNodes(ctx, client, t.DC)
+    dcs, err := dcfilter.Apply(dcMap, p.DC)
     if err != nil {
-        return t, err
+        return Target{}, err
     }
 
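+    // Validate locations, rate limits, parallelism and retention against the cluster's DCs.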
-    targetDCs := strset.New(t.DC...)
-
-    // Filter keyspaces
-    f, err := ksfilter.NewFilter(p.Keyspace)
-    if err != nil {
-        return t, err
+    if err := p.validate(dcs, dcMap); err != nil {
+        return Target{}, err
     }
 
-    keyspaces, err := client.Keyspaces(ctx)
+    liveNodes, err := s.getLiveNodes(ctx, client, dcs)
     if err != nil {
-        return t, errors.Wrapf(err, "read keyspaces")
-    }
-
-    // Always backup system_schema.
-    //
-    // Some schema changes, like dropping columns, are applied lazily to
-    // sstables during compaction. Information about those schema changes is
-    // recorded in the system schema tables, but not in the output of "desc schema".
-    // Using output of "desc schema" is not enough to restore all schema changes.
-    // As a result, writes in sstables may be incorrectly interpreted.
-    // For example, writes of deleted columns which were later recreated may be
-    // resurrected.
-    systemSchemaUnit := Unit{
-        Keyspace: systemSchema,
-        // Tables are added later
-        AllTables: true,
-    }
-
-    ringDescriber := scyllaclient.NewRingDescriber(ctx, client)
-    liveNodes := strset.New(t.liveNodes.Hosts()...)
-    for _, keyspace := range keyspaces {
-        tables, err := client.Tables(ctx, keyspace)
-        if err != nil {
-            return t, errors.Wrapf(err, "keyspace %s: get tables", keyspace)
-        }
-
-        var filteredTables []string
-        for _, tab := range tables {
-            if !f.Check(keyspace, tab) {
-                continue
-            }
-            // Get the ring description and skip local data
-            ring, err := ringDescriber.DescribeRing(ctx, keyspace, tab)
-            if err != nil {
-                return t, errors.Wrapf(err, "%s.%s: get ring description", keyspace, tab)
-            }
-            if ring.Replication == scyllaclient.LocalStrategy {
-                if strings.HasPrefix(keyspace, "system") && keyspace != "system_schema" {
-                    continue
-                }
-            } else {
-                // Check if keyspace has replica in any DC
-                if !targetDCs.HasAny(ring.Datacenters()...) {
-                    continue
-                }
-                for _, rt := range ring.ReplicaTokens {
-                    if !liveNodes.HasAny(rt.ReplicaSet...) {
-                        return t, errors.Errorf("the whole replica set %v of %s.%s is down", rt.ReplicaSet, keyspace, tab)
-                    }
-                }
-            }
-
-            // Do not filter system_schema
-            if keyspace == systemSchema {
-                systemSchemaUnit.Tables = append(systemSchemaUnit.Tables, tab)
-            } else {
-                filteredTables = append(filteredTables, tab)
-            }
-        }
-
-        if len(filteredTables) > 0 {
-            f.Add(keyspace, filteredTables)
+        return Target{}, err
+    }
+    if err := s.checkLocationsAvailableFromNodes(ctx, client, liveNodes, p.Location); err != nil {
+        if strings.Contains(err.Error(), "NoSuchBucket") {
+            return Target{}, errors.New("specified bucket does not exist")
         }
+        return Target{}, errors.Wrap(err, "location is not accessible")
     }
 
-    // Get the filtered units
-    v, err := f.Apply(false)
+    f, err := ksfilter.NewFilter(p.Keyspace)
     if err != nil {
-        return t, err
+        return Target{}, err
     }
 
-    // Copy units and add system_schema by the end.
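+    // Filters select the tables to back up; validators check that it's safe to back them up.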
-    for _, u := range v {
-        t.Units = append(t.Units, Unit{
-            Keyspace:  u.Keyspace,
-            Tables:    u.Tables,
-            AllTables: u.AllTables,
-        })
+    filters := []tabFilter{
+        patternFilter{pattern: f},
+        dcFilter{dcs: strset.New(dcs...)},
+        localDataFilter{},
     }
-    t.Units = append(t.Units, systemSchemaUnit)
 
-    // Validate locations access
-    if err := s.checkLocationsAvailableFromNodes(ctx, client, t.liveNodes, t.Location); err != nil {
-        if strings.Contains(err.Error(), "NoSuchBucket") {
-            return t, errors.New("specified bucket does not exist")
-        }
-        return t, errors.Wrap(err, "location is not accessible")
+    validators := []tabValidator{
+        tokenRangesValidator{
+            liveNodes: strset.New(liveNodes.Hosts()...),
+            dcs:       strset.New(dcs...),
+        },
     }
 
-    return t, nil
+    return p.toTarget(ctx, client, dcs, liveNodes, filters, validators)
 }
 
 // getLiveNodes returns live nodes of specified datacenters.