Skip to content

Commit

Permalink
Vttablet schema tracking: Fix _vt.schema_version corruption (#13045)
Browse files Browse the repository at this point in the history
* Copy fields from schema.Engine before modifying

This fixes a race condition which caused protobuf marshaled schema
data in _vt.schema_version rows to become corrupted when ColumnType was
modified between the time when Field message sizes were calculated
and when Field message data was written to the buffer.

Signed-off-by: Brendan Dougherty <[email protected]>

* Acquire schema engine mutex while marshaling schema

This change is purely defensive, but it may help avoid future
race conditions caused by modifying shared schema structs while
they are being marshaled to protobuf.

Signed-off-by: Brendan Dougherty <[email protected]>

* reorganize imports and allocate slices with make

Signed-off-by: Austen Lacy <[email protected]>

* gofmt

Signed-off-by: Austen Lacy <[email protected]>

* flakey unit test

Signed-off-by: Austen Lacy <[email protected]>

* Use common code to update column_type for both vstreamer and rowstream events in a thread-safe fashion. Fix related tests

Signed-off-by: Rohit Nayak <[email protected]>

* Use DBName and not keyspace name while getting extended field info!

Signed-off-by: Rohit Nayak <[email protected]>

* Fix TestVStreamSharded

Signed-off-by: Rohit Nayak <[email protected]>

---------

Signed-off-by: Brendan Dougherty <[email protected]>
Signed-off-by: Austen Lacy <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Austen Lacy <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
3 people authored and frouioui committed Sep 18, 2023
1 parent 69f88f4 commit df54722
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 77 deletions.
18 changes: 15 additions & 3 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"regexp"
"sync"
"testing"

Expand Down Expand Up @@ -330,9 +331,9 @@ func TestVStreamSharded(t *testing.T) {
received bool
}
expectedEvents := []*expectedEvent{
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false},
}
for {
Expand All @@ -357,7 +358,7 @@ func TestVStreamSharded(t *testing.T) {
for _, ev := range evs {
s := fmt.Sprintf("%v", ev)
for _, expectedEv := range expectedEvents {
if expectedEv.ev == s {
if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) {
expectedEv.received = true
break
}
Expand All @@ -381,6 +382,17 @@ func TestVStreamSharded(t *testing.T) {

}

func removeAnyDeprecatedDisplayWidths(orig string) string {
var adjusted string
baseIntType := "int"
intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`)
adjusted = intRE.ReplaceAllString(orig, baseIntType)
baseYearType := "year"
yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`)
adjusted = yearRE.ReplaceAllString(adjusted, baseYearType)
return adjusted
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
26 changes: 26 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,32 @@ func (se *Engine) GetSchema() map[string]*Table {
return tables
}

// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema
func (se *Engine) MarshalMinimalSchema() ([]byte, error) {
se.mu.Lock()
defer se.mu.Unlock()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)),
}
for _, table := range se.tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
}
return dbSchema.MarshalVT()
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
pkc := make([]int64, len(st.PKColumns))
for i, pk := range st.PKColumns {
pkc[i] = int64(pk)
}
table.PKColumns = pkc
return table
}

// GetConnection returns a connection from the pool
func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
return se.conns.Get(ctx, nil)
Expand Down
25 changes: 3 additions & 22 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -240,14 +238,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error
}

func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error {
tables := tr.engine.GetSchema()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: []*binlogdatapb.MinimalTable{},
}
for _, table := range tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
blob, err := tr.engine.MarshalMinimalSchema()
if err != nil {
return err
}
blob, _ := proto.Marshal(dbSchema)

conn, err := tr.engine.GetConnection(ctx)
if err != nil {
Expand All @@ -265,19 +259,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
return nil
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
var pkc []int64
for _, pk := range st.PKColumns {
pkc = append(pkc, int64(pk))
}
table.PKColumns = pkc
return table
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,14 @@ func (rs *rowStreamer) buildPlan() error {
return err
}
ti := &Table{
Name: st.Name,
Fields: st.Fields,
Name: st.Name,
}

ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields)
if err != nil {
return err
}

// The plan we build is identical to the one for vstreamer.
// This is because the row format of a read is identical
// to the row format of a binlog event. So, the same
Expand Down
Loading

0 comments on commit df54722

Please sign in to comment.