Skip to content

Commit

Permalink
Add new logic for adding unique tables from unsharded keyspaces witho…
Browse files Browse the repository at this point in the history
…ut vschemas to the global routing. Create more comprehensive unit test.

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Dec 19, 2024
1 parent 271e95f commit 53fa30e
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 197 deletions.
37 changes: 0 additions & 37 deletions go/test/endtoend/vtgate/gen4/gen4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,40 +520,3 @@ func TestDualJoinQueries(t *testing.T) {
mcmp.Exec("select t.title, t2.id from t2 left join (select 'ABC' as title) as t on t.title = t2.tcol1")

}

// TestSchemaTrackingGlobalTables tests that schema tracking works as intended with global table routing.
// This test creates a new table in a schema and verifies we can query it without needing to add it to the vschema as long
// as the name of the table is unique. It also creates a table which is already in the vschema to ensure that we
// don't mark it as ambiguous when schema tracking also finds it.
func TestSchemaTrackingGlobalTables(t *testing.T) {
// Create a new vtgate connection.
tables := []string{"uniqueTableName", "t1"}
for _, table := range tables {
t.Run(table, func(t *testing.T) {
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer vtConn.Close()

// Create a new table in the unsharded keyspace such that it has a unique name that allows for global routing.
utils.Exec(t, vtConn, `use `+unshardedKs)
// Use the same schema as t1 from sharded_schema.sql
utils.Exec(t, vtConn, fmt.Sprintf(`create table if not exists %s(id bigint, col bigint, primary key(id))`, table))
defer utils.Exec(t, vtConn, fmt.Sprintf(`drop table %s`, table))

// Wait for schema tracking to see this column.
err = utils.WaitForAuthoritative(t, unshardedKs, table, clusterInstance.VtgateProcess.ReadVSchema)
require.NoError(t, err)

// Create a new vtgate connection.
vtConn2, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer vtConn2.Close()

// Insert rows into the table and select them to verify we can use them.
utils.Exec(t, vtConn2, fmt.Sprintf(`insert into %s(id, col) values (10, 100),(20, 200)`, table))
require.NoError(t, err)
utils.AssertMatches(t, vtConn2, fmt.Sprintf(`select * from %s order by id`, table),
`[[INT64(10) INT64(100)] [INT64(20) INT64(200)]]`)
})
}
}
17 changes: 17 additions & 0 deletions go/test/endtoend/vtgate/gen4/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ var (
}
]}
`
unsharded2Ks = "uks2"

//go:embed unsharded2_schema.sql
unsharded2SchemaSQL string

//go:embed unsharded2_vschema.json
unsharded2VSchema string
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -101,6 +108,16 @@ func TestMain(m *testing.M) {
return 1
}

uKs2 := &cluster.Keyspace{
Name: unsharded2Ks,
SchemaSQL: unsharded2SchemaSQL,
VSchema: unsharded2VSchema,
}
err = clusterInstance.StartUnshardedKeyspace(*uKs2, 0, false)
if err != nil {
return 1
}

// apply routing rules
err = clusterInstance.VtctldClientProcess.ApplyRoutingRules(routingRules)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions go/test/endtoend/vtgate/gen4/unsharded2_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
create table u2_a
(
id bigint,
a bigint,
primary key (id)
) Engine = InnoDB;

create table u2_b
(
id bigint,
b varchar(50),
primary key (id)
) Engine = InnoDB;
Empty file.
52 changes: 44 additions & 8 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func BuildVSchema(source *vschemapb.SrvVSchema, parser *sqlparser.Parser) (vsche
buildKeyspaces(source, vschema, parser)
// buildGlobalTables before buildReferences so that buildReferences can
// resolve sources which reference global tables.
BuildGlobalTables(source, vschema, false)
BuildGlobalTables(source, vschema)
buildReferences(source, vschema)
buildRoutingRule(source, vschema, parser)
buildShardRoutingRule(source, vschema)
Expand Down Expand Up @@ -461,27 +461,63 @@ func (vschema *VSchema) AddUDF(ksname, udfName string) error {
return nil
}

func BuildGlobalTables(source *vschemapb.SrvVSchema, vschema *VSchema, skipIfAlreadyGlobal bool) {
func BuildGlobalTables(source *vschemapb.SrvVSchema, vschema *VSchema) {
for ksname, ks := range source.Keyspaces {
ksvschema := vschema.Keyspaces[ksname]
// If the keyspace requires explicit routing, don't include any of
// its tables in global tables.
if ks.RequireExplicitRouting {
continue
}
buildKeyspaceGlobalTables(vschema, ksvschema, skipIfAlreadyGlobal)
buildKeyspaceGlobalTables(vschema, ksvschema)
}
}

func buildKeyspaceGlobalTables(vschema *VSchema, ksvschema *KeyspaceSchema, skipIfAlreadyGlobal bool) {
// AddAdditionalGlobalTables adds unique tables from unsharded keyspaces to the global tables.
// It is expected to be called from the schema tracking code.
func AddAdditionalGlobalTables(source *vschemapb.SrvVSchema, vschema *VSchema) {
type tableInfo struct {
table *Table
cnt int
}
newTables := make(map[string]*tableInfo)

// Collect valid uniquely named tables from unsharded keyspaces.
for ksname, ks := range source.Keyspaces {
ksvschema := vschema.Keyspaces[ksname]
// Ignore sharded keyspaces and those flagged for explicit routing.
if ks.RequireExplicitRouting || ks.Sharded {
continue
}
for tname, table := range ksvschema.Tables {
// Ignore tables already global or ambiguous.
if _, found := vschema.globalTables[tname]; !found {
_, ok := newTables[tname]
if !ok {
table.Keyspace = &Keyspace{Name: ksname}
newTables[tname] = &tableInfo{table: table, cnt: 0}
}
newTables[tname].cnt++
}
}
}

// Mark new tables found just once as globally routable, rest as ambiguous.
for tname, ti := range newTables {
switch ti.cnt {
case 1:
vschema.globalTables[tname] = ti.table
default:
vschema.globalTables[tname] = nil
}
}
}

func buildKeyspaceGlobalTables(vschema *VSchema, ksvschema *KeyspaceSchema) {
for tname, t := range ksvschema.Tables {
if gt, ok := vschema.globalTables[tname]; ok {
// There is already an entry table stored in global tables
// with this name.
if skipIfAlreadyGlobal {
// Called when updating from schema tracking
continue
}
if gt == nil {
// Table name is already marked ambiguous, nothing to do.
continue
Expand Down
Loading

0 comments on commit 53fa30e

Please sign in to comment.