Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Avoid creation of workflows with non-empty tables in target keyspace #16874

Merged
merged 11 commits into from
Oct 18, 2024
75 changes: 75 additions & 0 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/concurrency"
Expand All @@ -45,6 +49,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand All @@ -54,6 +59,7 @@ const (
// For automatically created sequence tables, use a standard format
// of tableName_seq.
autoSequenceTableFormat = "%s_seq"
getNonEmptyTableQuery = "select 1 from %s limit 1"
)

type materializer struct {
Expand Down Expand Up @@ -290,6 +296,15 @@ func (mz *materializer) deploySchema() error {
}
}

// Check if any table being moved is already non-empty in the target keyspace.
// Skip this check for multi-tenant migrations.
if !mz.IsMultiTenantMigration() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nice future addition would be to check for any existing tenant data in each table on the target too.

err := mz.validateEmptyTables()
if err != nil {
return vterrors.Wrap(err, "failed to validate that all target tables are empty")
}
}

err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

Expand Down Expand Up @@ -544,6 +559,66 @@ func (mz *materializer) buildMaterializer() error {
return nil
}

// validateEmptyTables verifies that none of the target tables named in the
// materialize settings already contain rows on any target shard.
//
// For every target shard it resolves the shard's primary tablet and, for each
// table setting, runs getNonEmptyTableQuery against that tablet (at most 20
// queries in flight per shard). Errors recognized by IsTableDidNotExistError
// are ignored, so a table that is missing on the target counts as empty.
//
// It returns a FAILED_PRECONDITION error naming every non-empty table (in no
// particular order, since the names come from map keys), an INTERNAL error if
// a shard has no primary alias, or the first lookup/query error encountered.
func (mz *materializer) validateEmptyTables() error {
	var (
		nonEmptyMu sync.Mutex
		nonEmpty   = make(map[string]bool)
	)

	// checkShard probes every target table on a single shard's primary.
	checkShard := func(shard *topo.ShardInfo) error {
		alias := shard.PrimaryAlias
		if alias == nil {
			return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for shard %s/%s", shard.Keyspace(), shard.ShardName())
		}

		primaryTablet, err := mz.ts.GetTablet(mz.ctx, alias)
		if err != nil {
			return err
		}

		group, groupCtx := errgroup.WithContext(mz.ctx)
		group.SetLimit(20)

		for _, ts := range mz.ms.TableSettings {
			group.Go(func() error {
				escaped, err := sqlescape.EnsureEscaped(ts.TargetTable)
				if err != nil {
					return err
				}

				req := &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{
					Query:   []byte(fmt.Sprintf(getNonEmptyTableQuery, escaped)),
					MaxRows: 1,
				}
				res, err := mz.tmc.ExecuteFetchAsAllPrivs(groupCtx, primaryTablet.Tablet, req)
				if err != nil {
					// A missing table is fine; anything else aborts the check.
					if !IsTableDidNotExistError(err) {
						return err
					}
				}
				if res == nil || len(res.Rows) == 0 {
					return nil
				}

				// At least one row came back: remember the table as non-empty.
				nonEmptyMu.Lock()
				defer nonEmptyMu.Unlock()
				nonEmpty[ts.TargetTable] = true
				return nil
			})
		}
		return group.Wait()
	}

	if err := forAllShards(mz.targetShards, checkShard); err != nil {
		return err
	}

	if len(nonEmpty) == 0 {
		return nil
	}
	return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "non-empty tables found in target keyspace(%s): %s", mz.ms.TargetKeyspace, strings.Join(maps.Keys(nonEmpty), ", "))
}

func (mz *materializer) startStreams(ctx context.Context) error {
return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
targetPrimary, err := mz.ts.GetTablet(ctx, target.PrimaryAlias)
Expand Down
28 changes: 27 additions & 1 deletion go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -224,6 +225,7 @@ type testMaterializerTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
fetchAsAllPrivsQueries map[int]map[string]*queryResult
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse

// Used to confirm the number of times WorkflowDelete was called.
Expand All @@ -243,6 +245,7 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
sourceShards: sourceShards,
tableSettings: tableSettings,
vrQueries: make(map[int][]*queryResult),
fetchAsAllPrivsQueries: make(map[int]map[string]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
getSchemaResponses: make(map[uint32]*tabletmanagerdatapb.SchemaDefinition),
}
Expand Down Expand Up @@ -370,6 +373,20 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r
})
}

// expectFetchAsAllPrivsQuery registers the result that the test client's
// ExecuteFetchAsAllPrivs should hand back when the given query is issued
// against the tablet identified by tabletID. Registering the same query
// again for the same tablet replaces the earlier entry.
func (tmc *testMaterializerTMClient) expectFetchAsAllPrivsQuery(tabletID int, query string, result *sqltypes.Result) {
	tmc.mu.Lock()
	defer tmc.mu.Unlock()

	// Lazily create the per-tablet query map on first use.
	perTablet := tmc.fetchAsAllPrivsQueries[tabletID]
	if perTablet == nil {
		perTablet = make(map[string]*queryResult)
		tmc.fetchAsAllPrivsQueries[tabletID] = perTablet
	}

	expected := &queryResult{
		query:  query,
		result: sqltypes.ResultToProto3(result),
	}
	perTablet[query] = expected
}

func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down Expand Up @@ -420,7 +437,16 @@ func (tmc *testMaterializerTMClient) ExecuteFetchAsDba(ctx context.Context, tabl
}

// ExecuteFetchAsAllPrivs is the test double for the tablet manager client
// call of the same name. It returns the result (and error) previously
// registered via expectFetchAsAllPrivsQuery for the requested tablet UID and
// exact query text.
//
// If no result was registered for that tablet/query pair, it returns a nil
// result and an error wrapping assert.AnError so that unexpected queries
// surface as test failures instead of silently succeeding.
func (tmc *testMaterializerTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest) (*querypb.QueryResult, error) {
	// NOTE: the stale unconditional `return nil, nil` that used to sit here
	// made the lookup below unreachable; it has been removed.
	tmc.mu.Lock()
	defer tmc.mu.Unlock()

	if resultsForTablet, ok := tmc.fetchAsAllPrivsQueries[int(tablet.Alias.Uid)]; ok {
		// Queries are matched by their exact text.
		if result, ok := resultsForTablet[string(req.Query)]; ok {
			return result.result, result.err
		}
	}

	return nil, fmt.Errorf("%w: no ExecuteFetchAsAllPrivs result set for tablet %d", assert.AnError, int(tablet.Alias.Uid))
}

// Note: ONLY breaks up change.SQL into individual statements and executes it. Does NOT fully implement ApplySchema.
Expand Down
Loading
Loading