Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: use proper column collations in vstreamer #15313

Merged
merged 12 commits into from
Feb 26, 2024
9 changes: 9 additions & 0 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package mysql
import (
"fmt"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

Expand Down Expand Up @@ -216,6 +218,13 @@ type TableMap struct {
// - If the metadata is one byte, only the lower 8 bits are used.
// - If the metadata is two bytes, all 16 bits are used.
Metadata []uint16

// ColumnCollationIDs contains information about the inherited
// or implied column default collation and any explicit per-column
// override for text based columns ONLY. This means that the
// array position needs to be mapped to the ordered list of
// text based columns in the table.
ColumnCollationIDs []collations.ID
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of the code is going to be easier to write and easier to follow if this slice has one entry for each column and you simply fill the non-text columns with a Binary collation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soooo... I agree! But unfortunately I'd have to perform the same logic to fill in those gaps as the binlog row metadata does not include the column's index or name. I'll think about it some more though.

Copy link
Contributor Author

@mattlord mattlord Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I explored this path, but we don't have access to the collation ENV in order to properly fill in the gaps here. So leaving it as-is for now, where users of this (vstreamer) have access to both the collation ENV and mysqld.

}

// Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT.
Expand Down
15 changes: 9 additions & 6 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/mysql/binlog"
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestTableMapEvent(t *testing.T) {
0,
384, // Length of the varchar field.
},
ColumnCollationIDs: []collations.ID{},
}
tm.CanBeNull.Set(1, true)
tm.CanBeNull.Set(2, true)
Expand Down Expand Up @@ -258,12 +260,13 @@ func TestLargeTableMapEvent(t *testing.T) {
}

tm := &TableMap{
Flags: 0x8090,
Database: "my_database",
Name: "my_table",
Types: types,
CanBeNull: NewServerBitmap(colLen),
Metadata: metadata,
Flags: 0x8090,
Database: "my_database",
Name: "my_table",
Types: types,
CanBeNull: NewServerBitmap(colLen),
Metadata: metadata,
ColumnCollationIDs: []collations.ID{},
}
tm.CanBeNull.Set(1, true)
tm.CanBeNull.Set(2, true)
Expand Down
87 changes: 78 additions & 9 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,35 @@
"encoding/binary"

"vitess.io/vitess/go/mysql/binlog"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These are the TABLE_MAP_EVENT's optional metadata field types from
// MySQL's libbinlogevents/include/rows_event.h.
// See also: https://dev.mysql.com/doc/dev/mysql-server/8.0.34/structbinary__log_1_1Table__map__event_1_1Optional__metadata__fields.html
const (
tableMapSignedness uint8 = iota + 1
tableMapDefaultCharset
tableMapColumnCharset
tableMapColumnName
tableMapSetStrValue
tableMapEnumStrValue
tableMapGeometryType
tableMapSimplePrimaryKey
tableMapPrimaryKeyWithPrefix
tableMapEnumAndSetDefaultCharset
tableMapEnumAndSetColumnCharset
tableMapColumnVisibility
)

// This byte in the optional metadata indicates that we should
// read the next 2 bytes as a collation ID.
const readTwoByteCollationID = 252

// TableMap implements BinlogEvent.TableMap().
//
// Expected format (L = total length of event data):
Expand All @@ -43,6 +66,7 @@
// cc column-def, one byte per column
// <var> column-meta-def (var-len encoded string)
// n NULL-bitmask, length: (cc + 7) / 8
// n Optional Metadata
func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {
data := ev.Bytes()[f.HeaderLength:]

Expand All @@ -64,7 +88,7 @@

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)

Check warning on line 91 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L91

Added line #L91 was not covered by tests
}
pos = read

Expand All @@ -73,7 +97,7 @@

metaLen, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)

Check warning on line 100 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L100

Added line #L100 was not covered by tests
}
pos = read

Expand All @@ -88,11 +112,19 @@
}
}
if pos != expectedEnd {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)

Check warning on line 115 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L115

Added line #L115 was not covered by tests
}

// A bit array that says if each column can be NULL.
result.CanBeNull, _ = newBitmap(data, pos, int(columnCount))
result.CanBeNull, read = newBitmap(data, pos, int(columnCount))
pos = read

// Read any text based column collation values provided in the optional metadata.
// The binlog_row_metadata only contains this info for text based columns.
var err error
if result.ColumnCollationIDs, err = readColumnCollationIDs(data, pos, int(columnCount)); err != nil {
return nil, err

Check warning on line 126 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L126

Added line #L126 was not covered by tests
}

return result, nil
}
Expand All @@ -118,7 +150,7 @@

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))

Check warning on line 153 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L153

Added line #L153 was not covered by tests
}
}

Expand Down Expand Up @@ -154,7 +186,7 @@

default:
// Unknown types, we can't go on.
return 0, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)
return 0, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)

Check warning on line 189 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L189

Added line #L189 was not covered by tests
}
}

Expand Down Expand Up @@ -185,8 +217,45 @@

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))

Check warning on line 220 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L220

Added line #L220 was not covered by tests
}
}

// readColumnCollationIDs reads from the optional metadata that exists.
// See: https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/rows_event.h
// What's included depends on the server configuration:
// https://dev.mysql.com/doc/refman/en/replication-options-binary-log.html#sysvar_binlog_row_metadata
// and the table definition.
// We only care about any collation IDs in the optional metadata and
// this info is provided in all binlog_row_metadata formats. Note that
// this info is only provided for text based columns.
func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error) {
collationIDs := make([]collations.ID, 0, count)
for pos < len(data) {
fieldType := uint8(data[pos])
pos++

fieldLen, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "error reading optional metadata field length")

Check warning on line 240 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L240

Added line #L240 was not covered by tests
}
pos = read

fieldVal := data[pos : pos+int(fieldLen)]
pos += int(fieldLen)

if fieldType == tableMapDefaultCharset || fieldType == tableMapColumnCharset { // It's one or the other
for i := uint64(0); i < fieldLen; i++ {
v := uint16(fieldVal[i])
if v == readTwoByteCollationID { // The ID is the subsequent 2 bytes
v = binary.LittleEndian.Uint16(fieldVal[i+1 : i+3])
i += 2
}
collationIDs = append(collationIDs, collations.ID(v))
}
}
}
return collationIDs, nil
}

// Rows implements BinlogEvent.TableMap().
Expand Down Expand Up @@ -235,7 +304,7 @@

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return result, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return result, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)

Check warning on line 307 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L307

Added line #L307 was not covered by tests
}
pos = read

Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func testPlayerCopyCharPK(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc binary(2) , val int, primary key(idc))",
"create table src(idc binary(2), val int, primary key(idc))",
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc binary(2), val int, primary key(idc))", vrepldb),
})
Expand Down Expand Up @@ -215,7 +215,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc varchar(20), val int, primary key(idc))",
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent default collation across MySQL versions
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", vrepldb),
})
Expand Down Expand Up @@ -285,7 +285,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
"/update _vt.vreplication set state='Copying'",
// Copy mode.
"insert into dst(idc,val) values ('a',1)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`,
// Copy-catchup mode.
`/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`,
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
Expand All @@ -295,11 +295,11 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
//upd1 := expect.
upd1 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('B',3)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`,
))
upd2 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('c',2)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`,
))
upd1.Then(upd2.Eventually())
return upd2
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletserver/repltracker/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/schema/historian.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"vitess.io/vitess/go/vt/sqlparser"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const getInitialSchemaVersions = "select id, pos, ddl, time_updated, schemax from %s.schema_version where time_updated > %d order by id asc"
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ import (

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

// VStreamer defines the functions of VStreamer
// VStreamer defines the functions of VStreamer
// that the replicationWatcher needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error
Expand Down
Loading
Loading