From 8de1c9180f3d6a6ed34f19029e21e6e1960eb4fc Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 26 Feb 2024 12:10:34 -0500 Subject: [PATCH] VReplication: use proper column collations in vstreamer (#15313) Signed-off-by: Matt Lord --- go/mysql/binlog_event.go | 9 ++ go/mysql/binlog_event_make_test.go | 15 +-- go/mysql/binlog_event_rbr.go | 87 +++++++++++++++-- .../vreplication/vcopier_test.go | 10 +- .../tabletserver/repltracker/reader.go | 3 +- .../vttablet/tabletserver/schema/historian.go | 4 +- go/vt/vttablet/tabletserver/schema/tracker.go | 10 +- .../vstreamer/helper_event_test.go | 38 +++++--- .../tabletserver/vstreamer/rowstreamer.go | 2 +- .../tabletserver/vstreamer/testenv/testenv.go | 42 ++++++++- .../tabletserver/vstreamer/vstreamer.go | 58 +++++++++--- .../tabletserver/vstreamer/vstreamer_test.go | 93 +++++++++++++------ 12 files changed, 287 insertions(+), 84 deletions(-) diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index e58cb9b254c..3acf99c2408 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -19,7 +19,9 @@ package mysql import ( "fmt" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -216,6 +218,13 @@ type TableMap struct { // - If the metadata is one byte, only the lower 8 bits are used. // - If the metadata is two bytes, all 16 bits are used. Metadata []uint16 + + // ColumnCollationIDs contains information about the inherited + // or implied column default collation and any explicit per-column + // override for text based columns ONLY. This means that the + // array position needs to be mapped to the ordered list of + // text based columns in the table. + ColumnCollationIDs []collations.ID } // Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT. diff --git a/go/mysql/binlog_event_make_test.go b/go/mysql/binlog_event_make_test.go index 12d8a54ff97..32401bfa401 100644 --- a/go/mysql/binlog_event_make_test.go +++ b/go/mysql/binlog_event_make_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/binlog" @@ -222,6 +223,7 @@ func TestTableMapEvent(t *testing.T) { 0, 384, // Length of the varchar field. }, + ColumnCollationIDs: []collations.ID{}, } tm.CanBeNull.Set(1, true) tm.CanBeNull.Set(2, true) @@ -258,12 +260,13 @@ func TestLargeTableMapEvent(t *testing.T) { } tm := &TableMap{ - Flags: 0x8090, - Database: "my_database", - Name: "my_table", - Types: types, - CanBeNull: NewServerBitmap(colLen), - Metadata: metadata, + Flags: 0x8090, + Database: "my_database", + Name: "my_table", + Types: types, + CanBeNull: NewServerBitmap(colLen), + Metadata: metadata, + ColumnCollationIDs: []collations.ID{}, } tm.CanBeNull.Set(1, true) tm.CanBeNull.Set(2, true) diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index 58777d4cfba..64d17c2b306 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -20,12 +20,35 @@ import ( "encoding/binary" "vitess.io/vitess/go/mysql/binlog" - "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/vt/vterrors" querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +// These are the TABLE_MAP_EVENT's optional metadata field types from +// MySQL's libbinlogevents/include/rows_event.h. +// See also: https://dev.mysql.com/doc/dev/mysql-server/8.0.34/structbinary__log_1_1Table__map__event_1_1Optional__metadata__fields.html +const ( + tableMapSignedness uint8 = iota + 1 + tableMapDefaultCharset + tableMapColumnCharset + tableMapColumnName + tableMapSetStrValue + tableMapEnumStrValue + tableMapGeometryType + tableMapSimplePrimaryKey + tableMapPrimaryKeyWithPrefix + tableMapEnumAndSetDefaultCharset + tableMapEnumAndSetColumnCharset + tableMapColumnVisibility +) + +// This byte in the optional metadata indicates that we should +// read the next 2 bytes as a collation ID. +const readTwoByteCollationID = 252 + // TableMap implements BinlogEvent.TableMap(). // // Expected format (L = total length of event data): @@ -43,6 +66,7 @@ import ( // cc column-def, one byte per column // column-meta-def (var-len encoded string) // n NULL-bitmask, length: (cc + 7) / 8 +// n Optional Metadata func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) { data := ev.Bytes()[f.HeaderLength:] @@ -64,7 +88,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) { columnCount, read, ok := readLenEncInt(data, pos) if !ok { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) } pos = read @@ -73,7 +97,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) { metaLen, read, ok := readLenEncInt(data, pos) if !ok { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data) } pos = read @@ -88,11 +112,19 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) { } } if pos != expectedEnd { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data) } // A bit array that says if each column can be NULL. - result.CanBeNull, _ = newBitmap(data, pos, int(columnCount)) + result.CanBeNull, read = newBitmap(data, pos, int(columnCount)) + pos = read + + // Read any text based column collation values provided in the optional metadata. + // The binlog_row_metadata only contains this info for text based columns. + var err error + if result.ColumnCollationIDs, err = readColumnCollationIDs(data, pos, int(columnCount)); err != nil { + return nil, err + } return result, nil } @@ -118,7 +150,7 @@ func metadataLength(typ byte) int { default: // Unknown type. This is used in tests only, so panic. - panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ)) + panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ)) } } @@ -154,7 +186,7 @@ func metadataRead(data []byte, pos int, typ byte) (uint16, int, error) { default: // Unknown types, we can't go on. - return 0, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ) + return 0, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ) } } @@ -185,8 +217,45 @@ func metadataWrite(data []byte, pos int, typ byte, value uint16) int { default: // Unknown type. This is used in tests only, so panic. - panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)) + panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)) + } +} + +// readColumnCollationIDs reads from the optional metadata that exists. +// See: https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/rows_event.h +// What's included depends on the server configuration: +// https://dev.mysql.com/doc/refman/en/replication-options-binary-log.html#sysvar_binlog_row_metadata +// and the table definition. +// We only care about any collation IDs in the optional metadata and +// this info is provided in all binlog_row_metadata formats. Note that +// this info is only provided for text based columns. +func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error) { + collationIDs := make([]collations.ID, 0, count) + for pos < len(data) { + fieldType := uint8(data[pos]) + pos++ + + fieldLen, read, ok := readLenEncInt(data, pos) + if !ok { + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "error reading optional metadata field length") + } + pos = read + + fieldVal := data[pos : pos+int(fieldLen)] + pos += int(fieldLen) + + if fieldType == tableMapDefaultCharset || fieldType == tableMapColumnCharset { // It's one or the other + for i := uint64(0); i < fieldLen; i++ { + v := uint16(fieldVal[i]) + if v == readTwoByteCollationID { // The ID is the subsequent 2 bytes + v = binary.LittleEndian.Uint16(fieldVal[i+1 : i+3]) + i += 2 + } + collationIDs = append(collationIDs, collations.ID(v)) + } + } } + return collationIDs, nil } // Rows implements BinlogEvent.TableMap(). @@ -235,7 +304,7 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { columnCount, read, ok := readLenEncInt(data, pos) if !ok { - return result, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) + return result, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) } pos = read diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index c32482641b2..fb5648f49af 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -108,7 +108,7 @@ func testPlayerCopyCharPK(t *testing.T) { defer func() { waitRetryTime = savedWaitRetryTime }() execStatements(t, []string{ - "create table src(idc binary(2) , val int, primary key(idc))", + "create table src(idc binary(2), val int, primary key(idc))", "insert into src values('a', 1), ('c', 2)", fmt.Sprintf("create table %s.dst(idc binary(2), val int, primary key(idc))", vrepldb), }) @@ -215,7 +215,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { defer func() { waitRetryTime = savedWaitRetryTime }() execStatements(t, []string{ - "create table src(idc varchar(20), val int, primary key(idc))", + "create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent default collation across MySQL versions "insert into src values('a', 1), ('c', 2)", fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", vrepldb), }) @@ -285,7 +285,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { "/update _vt.vreplication set state='Copying'", // Copy mode. "insert into dst(idc,val) values ('a',1)", - `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`, + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`, // Copy-catchup mode. `/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`, ).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer { @@ -295,11 +295,11 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { //upd1 := expect. upd1 := expect.Then(qh.Eventually( "insert into dst(idc,val) values ('B',3)", - `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`, + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`, )) upd2 := expect.Then(qh.Eventually( "insert into dst(idc,val) values ('c',2)", - `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`, + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`, )) upd1.Then(upd2.Eventually()) return upd2 diff --git a/go/vt/vttablet/tabletserver/repltracker/reader.go b/go/vt/vttablet/tabletserver/repltracker/reader.go index 694778d1119..b50e5e4b2c7 100644 --- a/go/vt/vttablet/tabletserver/repltracker/reader.go +++ b/go/vt/vttablet/tabletserver/repltracker/reader.go @@ -23,13 +23,12 @@ import ( "time" "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" diff --git a/go/vt/vttablet/tabletserver/schema/historian.go b/go/vt/vttablet/tabletserver/schema/historian.go index b65ab514585..ca57f6d43e0 100644 --- a/go/vt/vttablet/tabletserver/schema/historian.go +++ b/go/vt/vttablet/tabletserver/schema/historian.go @@ -26,11 +26,11 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/sqlparser" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) const getInitialSchemaVersions = "select id, pos, ddl, time_updated, schemax from %s.schema_version where time_updated > %d order by id asc" diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index bce5e4b33d6..8db202efa13 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -25,18 +25,18 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) -// VStreamer defines the functions of VStreamer +// VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error diff --git a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go index 0b479bd588c..6c54a27996e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go @@ -45,6 +45,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/mysql/collations/colldata" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/proto/query" @@ -60,10 +61,6 @@ const ( lengthSet = 56 ) -func getDefaultCollationID() int64 { - return 45 // utf8mb4_general_ci -} - var ( // noEvents is used to indicate that a query is expected to generate no events. noEvents = []TestRowEvent{} @@ -72,10 +69,10 @@ var ( // TestColumn has all the attributes of a column required for the test cases. type TestColumn struct { name, dataType, colType string - len, collationID int64 + collationID collations.ID + len int64 dataTypeLowered string skip bool - collationName string } // TestFieldEvent has all the attributes of a table required for creating a field event. @@ -187,6 +184,15 @@ func (ts *TestSpec) Init() error { if ts.options == nil { ts.options = &TestSpecOptions{} } + // Add the unicode character set to each table definition. + // The collation used will then be the default for that character set + // in the given MySQL version used in the test: + // - 5.7: utf8mb4_general_ci + // - 8.0: utf8mb4_0900_ai_ci + tableOptions := "ENGINE=InnoDB CHARSET=utf8mb4" + for i := range ts.ddls { + ts.ddls[i] = fmt.Sprintf("%s %s", ts.ddls[i], tableOptions) + } ts.schema, err = schemadiff.NewSchemaFromQueries(schemadiff.NewTestEnv(), ts.ddls) if err != nil { return err @@ -372,7 +378,15 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel sqlType := col.Type.SQLType() tc.dataType = sqlType.String() tc.dataTypeLowered = strings.ToLower(tc.dataType) - tc.collationName = col.Type.Options.Collate + collationName := col.Type.Options.Collate + if collationName == "" { + // Use the default, which is derived from the mysqld server default set + // in the testenv. + tc.collationID = testenv.DefaultCollationID + } else { + tc.collationID = testenv.CollationEnv.LookupByName(collationName) + } + collation := colldata.Lookup(tc.collationID) switch tc.dataTypeLowered { case "int32": tc.len = lengthInt @@ -385,29 +399,25 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel tc.len = int64(l) tc.collationID = collations.CollationBinaryID default: - tc.len = 4 * int64(l) - tc.collationID = getDefaultCollationID() - if tc.dataTypeLowered == "char" && strings.Contains(tc.collationName, "bin") { + tc.len = int64(collation.Charset().MaxWidth()) * int64(l) + if tc.dataTypeLowered == "char" && collation.IsBinary() { tc.dataType = "BINARY" } } tc.colType = fmt.Sprintf("%s(%d)", tc.dataTypeLowered, l) case "blob": tc.len = lengthBlob - tc.collationID = collations.CollationBinaryID tc.colType = "blob" + tc.collationID = collations.CollationBinaryID case "text": tc.len = lengthText - tc.collationID = getDefaultCollationID() tc.colType = "text" case "set": tc.len = lengthSet - tc.collationID = getDefaultCollationID() tc.colType = fmt.Sprintf("%s(%s)", tc.dataTypeLowered, strings.Join(col.Type.EnumValues, ",")) ts.metadata[getMetadataKey(table.Name(), tc.name)] = col.Type.EnumValues case "enum": tc.len = int64(len(col.Type.EnumValues) + 1) - tc.collationID = getDefaultCollationID() tc.colType = fmt.Sprintf("%s(%s)", tc.dataTypeLowered, strings.Join(col.Type.EnumValues, ",")) ts.metadata[getMetadataKey(table.Name(), tc.name)] = col.Type.EnumValues default: diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index c1685c61d13..44f25067dfe 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -168,7 +168,7 @@ func (rs *rowStreamer) buildPlan() error { Name: st.Name, } - ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields) + ti.Fields, err = getFields(rs.ctx, rs.cp, rs.vse.se, st.Name, rs.cp.DBName(), st.Fields) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go index c09f73c0354..125fa75416f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go +++ b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go @@ -25,8 +25,10 @@ import ( "strings" "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" @@ -43,6 +45,32 @@ import ( const DBName = "vttest" +var ( + // These are exported to coordinate on version specific + // behavior between the testenv and its users. + CollationEnv *collations.Environment + DefaultCollationID collations.ID + MySQLVersion string +) + +func init() { + vs, err := mysqlctl.GetVersionString() + if err != nil { + panic("could not get MySQL version: " + err.Error()) + } + _, mv, err := mysqlctl.ParseVersionString(vs) + if err != nil { + panic("could not parse MySQL version: " + err.Error()) + } + MySQLVersion = fmt.Sprintf("%d.%d.%d", mv.Major, mv.Minor, mv.Patch) + log.Infof("MySQL version: %s", MySQLVersion) + CollationEnv = collations.NewEnvironment(MySQLVersion) + // utf8mb4_general_ci is the default for MySQL 5.7 and + // utf8mb4_0900_ai_ci is the default for MySQL 8.0. + DefaultCollationID = CollationEnv.DefaultConnectionCharset() + log.Infof("Default collation ID: %d", DefaultCollationID) +} + // Env contains all the env vars for a test against a mysql instance. type Env struct { cluster *vttest.LocalCluster @@ -97,7 +125,7 @@ func Init(ctx context.Context) (*Env, error) { }, }, OnlyMySQL: true, - Charset: "utf8mb4_general_ci", + Charset: CollationEnv.LookupName(DefaultCollationID), ExtraMyCnf: strings.Split(os.Getenv("EXTRA_MY_CNF"), ":"), } te.cluster = &vttest.LocalCluster{ @@ -110,7 +138,14 @@ func Init(ctx context.Context) (*Env, error) { te.Dbcfgs = dbconfigs.NewTestDBConfigs(te.cluster.MySQLConnParams(), te.cluster.MySQLAppDebugConnParams(), te.cluster.DbName()) conf := tabletenv.NewDefaultConfig() conf.DB = te.Dbcfgs - te.TabletEnv = tabletenv.NewEnv(vtenv.NewTestEnv(), conf, "VStreamerTest") + vtenvCfg := vtenv.Options{ + MySQLServerVersion: MySQLVersion, + } + vtenv, err := vtenv.New(vtenvCfg) + if err != nil { + return nil, fmt.Errorf("could not initialize new vtenv: %v", err) + } + te.TabletEnv = tabletenv.NewEnv(vtenv, conf, "VStreamerTest") te.Mysqld = mysqlctl.NewMysqld(te.Dbcfgs) pos, _ := te.Mysqld.PrimaryPosition() if strings.HasPrefix(strings.ToLower(pos.GTIDSet.Flavor()), string(mysqlctl.FlavorMariaDB)) { @@ -123,6 +158,9 @@ func Init(ctx context.Context) (*Env, error) { if err != nil { return nil, fmt.Errorf("could not get server version: %w", err) } + if !strings.Contains(dbVersionStr, MySQLVersion) { + return nil, fmt.Errorf("MySQL version mismatch between mysqlctl %s and mysqld %s", MySQLVersion, dbVersionStr) + } _, version, err := mysqlctl.ParseVersionString(dbVersionStr) if err != nil { return nil, fmt.Errorf("could not parse server version %q: %w", dbVersionStr, err) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 9c63f8a499c..da0d79a1bca 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -733,7 +733,6 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap if err != nil { return nil, err } - table := &Table{ Name: tm.Name, Fields: cols, @@ -763,12 +762,30 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, error) { var fields []*querypb.Field + var txtFieldIdx int for i, typ := range tm.Types { t, err := sqltypes.MySQLToType(typ, 0) if err != nil { return nil, fmt.Errorf("unsupported type: %d, position: %d", typ, i) } - coll := collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset()) + // Use the the collation inherited or the one specified explicitly for the + // column if one was provided in the event's optional metadata (MySQL only + // provides this for text based columns). + var coll collations.ID + switch { + case sqltypes.IsText(t) && len(tm.ColumnCollationIDs) > txtFieldIdx: + coll = tm.ColumnCollationIDs[txtFieldIdx] + txtFieldIdx++ + case t == sqltypes.TypeJSON: + // JSON is a blob at this (storage) layer -- vs the connection/query serving + // layer which CollationForType seems primarily concerned about and JSON at + // the response layer should be using utf-8 as that's the standard -- so we + // should NOT use utf8mb4 as the collation in MySQL for a JSON column is + // NULL, meaning there is not one (same as for int) and we should use binary. + coll = collations.CollationBinaryID + default: // Use the server defined default for the column's type + coll = collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset()) + } fields = append(fields, &querypb.Field{ Name: fmt.Sprintf("@%d", i+1), Type: t, @@ -793,7 +810,7 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er return fields, nil } - // check if the schema returned by schema.Engine matches with row. + // Check if the schema returned by schema.Engine matches with row. for i := range tm.Types { if !sqltypes.AreTypesEquivalent(fields[i].Type, st.Fields[i].Type) { return fields, nil @@ -801,21 +818,31 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er } // Columns should be truncated to match those in tm. - fieldsCopy, err := getFields(vs.ctx, vs.cp, tm.Name, tm.Database, st.Fields[:len(tm.Types)]) + // This uses the historian which queries the columns in the table and uses the + // generated fields metadata. This means that the fields for text types are + // initially using collations for the column types based on the *connection + // collation* and not the actual *column collation*. + // But because we now get the correct collation for the actual column from + // mysqld in getExtColsInfo we know this is the correct one for the vstream + // target and we use that rather than any that were in the binlog events, + // which were for the source and which can be using a different collation + // than the target. + fieldsCopy, err := getFields(vs.ctx, vs.cp, vs.se, tm.Name, tm.Database, st.Fields[:len(tm.Types)]) if err != nil { return nil, err } + return fieldsCopy, nil } -func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, table, database string) (map[string]*extColInfo, error) { +func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, table, database string) (map[string]*extColInfo, error) { extColInfos := make(map[string]*extColInfo) conn, err := cp.Connect(ctx) if err != nil { return nil, err } defer conn.Close() - queryTemplate := "select column_name, column_type from information_schema.columns where table_schema=%s and table_name=%s;" + queryTemplate := "select column_name, column_type, collation_name from information_schema.columns where table_schema=%s and table_name=%s;" query := fmt.Sprintf(queryTemplate, encodeString(database), encodeString(table)) qr, err := conn.ExecuteFetch(query, 10000, false) if err != nil { @@ -825,34 +852,43 @@ func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, table, database extColInfo := &extColInfo{ columnType: row[1].ToString(), } + collationName := row[2].ToString() + var coll collations.ID + if row[2].IsNull() || collationName == "" { + coll = collations.CollationBinaryID + } else { + coll = se.Environment().CollationEnv().LookupByName(collationName) + } + extColInfo.collationID = coll extColInfos[row[0].ToString()] = extColInfo } return extColInfos, nil } -func getFields(ctx context.Context, cp dbconfigs.Connector, table, database string, fields []*querypb.Field) ([]*querypb.Field, error) { +func getFields(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, table, database string, fields []*querypb.Field) ([]*querypb.Field, error) { // Make a deep copy of the schema.Engine fields as they are pointers and // will be modified by adding ColumnType below fieldsCopy := make([]*querypb.Field, len(fields)) for i, field := range fields { fieldsCopy[i] = field.CloneVT() } - extColInfos, err := getExtColInfos(ctx, cp, table, database) + extColInfos, err := getExtColInfos(ctx, cp, se, table, database) if err != nil { return nil, err } for _, field := range fieldsCopy { if colInfo, ok := extColInfos[field.Name]; ok { field.ColumnType = colInfo.columnType + field.Charset = uint32(colInfo.collationID) } } return fieldsCopy, nil } -// additional column attributes from information_schema.columns. Currently only column_type is used, but -// we expect to add more in the future +// Additional column attributes to get from information_schema.columns. type extColInfo struct { - columnType string + columnType string + collationID collations.ID } func encodeString(in string) string { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 4d1983cd4d3..f64c6699217 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -27,20 +27,18 @@ import ( "time" "github.com/prometheus/common/version" - - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" - + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/mysql" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -78,8 +76,10 @@ func (tfe *TestFieldEvent) String() string { return s } -// TestPlayerNoBlob sets up a new environment with mysql running with binlog_row_image as noblob. It confirms that -// the VEvents created are correct: that they don't contain the missing columns and that the DataColumns bitmap is sent +// TestPlayerNoBlob sets up a new environment with mysql running with +// binlog_row_image as noblob. It confirms that the VEvents created are +// correct: that they don't contain the missing columns and that the +// DataColumns bitmap is sent. func TestNoBlob(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -159,7 +159,8 @@ func TestCellValuePadding(t *testing.T) { ddls: []string{ "create table t1(id int, val binary(4), primary key(val))", "create table t2(id int, val char(4), primary key(val))", - "create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))"}, + "create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))", + }, } defer ts.Close() require.NoError(t, ts.Init()) @@ -185,6 +186,34 @@ func TestCellValuePadding(t *testing.T) { ts.Run() } +// TestColumnCollationHandling confirms that we handle column collations +// properly in vstreams now that we parse any optional collation ID values +// in binlog_row_metadata AND we query mysqld for the collation when possible. +func TestColumnCollationHandling(t *testing.T) { + extraCollation := "utf8mb4_ja_0900_as_cs" // Test 2 byte collation ID handling + if strings.HasPrefix(testenv.MySQLVersion, "5.7") { // 5.7 does not support 2 byte collation IDs + extraCollation = "utf8mb4_croatian_ci" + } + ts := &TestSpec{ + t: t, + ddls: []string{ + fmt.Sprintf("create table t1(id int, txt text, val char(4) collate utf8mb4_bin, id2 int, val2 varchar(64) collate utf8mb4_general_ci, valvb varbinary(128), val3 varchar(255) collate %s, primary key(val))", extraCollation), + }, + } + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 'aaa', 'aaa', 1, 'aaa', 'aaa', 'aaa')", nil}, + {"insert into t1 values (2, 'bb', 'bb', 1, 'bb', 'bb', 'bb')", nil}, + {"update t1 set id = 11 where val = 'aaa'", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "aaa", "aaa", "1", "aaa", "aaa", "aaa"}, after: []string{"11", "aaa", "aaa", "1", "aaa", "aaa", "aaa"}}}}}, + }}, + {"commit", nil}, + }} + ts.Run() +} + func TestSetStatement(t *testing.T) { if !checkIfOptionIsSupported(t, "log_builtin_as_identified_by_password") { // the combination of setting this option and support for "set password" only works on a few flavors @@ -510,9 +539,9 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { t.Skip() } execStatements(t, []string{ - "create table t1(id1 int, id2 int, id3 int, primary key(id1))", - "create table t2a(id1 int, id2 int, primary key(id1))", - "create table t2b(id1 varchar(20), id2 int, primary key(id1))", + "create table t1(id1 int, id2 int, id3 int, primary key(id1)) ENGINE=InnoDB CHARSET=utf8mb4", + "create table t2a(id1 int, id2 int, primary key(id1)) ENGINE=InnoDB CHARSET=utf8mb4", + "create table t2b(id1 varchar(20), id2 int, primary key(id1)) ENGINE=InnoDB CHARSET=utf8mb4", }) defer execStatements(t, []string{ "drop table t1", @@ -557,10 +586,10 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\"} completed:true}", "type:COMMIT", "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45 column_type:\"varchar(20)\"} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", + fmt.Sprintf("type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:%d column_type:\"varchar(20)\"} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", testenv.DefaultCollationID), "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"a5\"}}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"b6\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{fields:{name:\"id1\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\"b\"}}}}", + fmt.Sprintf("type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{fields:{name:\"id1\" type:VARCHAR charset:%d flags:20483} rows:{lengths:1 values:\"b\"}}}}", testenv.DefaultCollationID), "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\"} completed:true}", @@ -1290,6 +1319,7 @@ func TestDDLAddColumn(t *testing.T) { "commit", }) engine.se.Reload(context.Background()) + env.SchemaEngine.Reload(context.Background()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1499,10 +1529,20 @@ func TestBuffering(t *testing.T) { runCases(t, nil, testcases, "", nil) } +// TestBestEffortNameInFieldEvent tests that we make a valid best effort +// attempt to deduce the type and collation in the event of table renames. +// In both cases the varbinary becomes a varchar. We get the correct +// collation information, however, in the binlog_row_metadata in 8.0 but +// not in 5.7. So in 5.7 our best effort uses varchar with its default +// collation for text fields. func TestBestEffortNameInFieldEvent(t *testing.T) { if testing.Short() { t.Skip() } + bestEffortCollation := collations.ID(collations.CollationBinaryID) + if strings.HasPrefix(testenv.MySQLVersion, "5.7") { + bestEffortCollation = testenv.DefaultCollationID + } filter := &binlogdatapb.Filter{ FieldEventMode: binlogdatapb.Filter_BEST_EFFORT, Rules: []*binlogdatapb.Rule{{ @@ -1511,7 +1551,7 @@ func TestBestEffortNameInFieldEvent(t *testing.T) { } // Modeled after vttablet endtoend compatibility tests. execStatements(t, []string{ - "create table vitess_test(id int, val varbinary(128), primary key(id))", + "create table vitess_test(id int, val varbinary(128), primary key(id)) ENGINE=InnoDB CHARSET=utf8mb4", }) position := primaryPosition(t) execStatements(t, []string{ @@ -1531,7 +1571,7 @@ func TestBestEffortNameInFieldEvent(t *testing.T) { // information returned by binlog for val column == varchar (rather than varbinary). output: [][]string{{ `begin`, - `type:FIELD field_event:{table_name:"vitess_test" fields:{name:"@1" type:INT32 charset:63} fields:{name:"@2" type:VARCHAR charset:255}}`, + fmt.Sprintf(`type:FIELD field_event:{table_name:"vitess_test" fields:{name:"@1" type:INT32 charset:63} fields:{name:"@2" type:VARCHAR charset:%d}}`, bestEffortCollation), `type:ROW row_event:{table_name:"vitess_test" row_changes:{after:{lengths:1 lengths:3 values:"1abc"}}}`, `gtid`, `commit`, @@ -1615,12 +1655,12 @@ func TestTypes(t *testing.T) { // Modeled after vttablet endtoend compatibility tests. execStatements(t, []string{ - "create table vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny))", - "create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", - "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", - "create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", - "create table vitess_null(id int, val varbinary(128), primary key(id))", - "create table vitess_decimal(id int, dec1 decimal(12,4), dec2 decimal(13,4), primary key(id))", + "create table vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny)) ENGINE=InnoDB CHARSET=utf8mb4", + "create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id)) ENGINE=InnoDB CHARSET=utf8mb4", + "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb)) ENGINE=InnoDB CHARSET=utf8mb4", + "create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id)) ENGINE=InnoDB CHARSET=utf8mb4", + "create table vitess_null(id int, val varbinary(128), primary key(id)) ENGINE=InnoDB CHARSET=utf8mb4", + "create table vitess_decimal(id int, dec1 decimal(12,4), dec2 decimal(13,4), primary key(id)) ENGINE=InnoDB CHARSET=utf8mb4", }) defer execStatements(t, []string{ "drop table vitess_ints", @@ -1673,7 +1713,7 @@ func TestTypes(t *testing.T) { }, output: [][]string{{ `begin`, - `type:FIELD field_event:{table_name:"vitess_strings" fields:{name:"vb" type:VARBINARY table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"vb" column_length:16 charset:63 column_type:"varbinary(16)"} fields:{name:"c" type:CHAR table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"c" column_length:64 charset:45 column_type:"char(16)"} fields:{name:"vc" type:VARCHAR table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"vc" column_length:64 charset:45 column_type:"varchar(16)"} fields:{name:"b" type:BINARY table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"b" column_length:4 charset:63 column_type:"binary(4)"} fields:{name:"tb" type:BLOB table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"tb" column_length:255 charset:63 column_type:"tinyblob"} fields:{name:"bl" type:BLOB table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"bl" column_length:65535 charset:63 column_type:"blob"} fields:{name:"ttx" type:TEXT table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"ttx" column_length:1020 charset:45 column_type:"tinytext"} fields:{name:"tx" type:TEXT table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"tx" column_length:262140 charset:45 column_type:"text"} fields:{name:"en" type:ENUM table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"en" column_length:4 charset:45 column_type:"enum('a','b')"} fields:{name:"s" type:SET table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"s" column_length:12 charset:45 column_type:"set('a','b')"}}`, + fmt.Sprintf(`type:FIELD field_event:{table_name:"vitess_strings" fields:{name:"vb" type:VARBINARY table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"vb" column_length:16 charset:63 column_type:"varbinary(16)"} fields:{name:"c" type:CHAR table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"c" column_length:64 charset:%d column_type:"char(16)"} fields:{name:"vc" type:VARCHAR table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"vc" column_length:64 charset:%d column_type:"varchar(16)"} fields:{name:"b" type:BINARY table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"b" column_length:4 charset:63 column_type:"binary(4)"} fields:{name:"tb" type:BLOB table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"tb" column_length:255 charset:63 column_type:"tinyblob"} fields:{name:"bl" type:BLOB table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"bl" column_length:65535 charset:63 column_type:"blob"} fields:{name:"ttx" type:TEXT table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"ttx" column_length:1020 charset:%d column_type:"tinytext"} fields:{name:"tx" type:TEXT table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"tx" column_length:262140 charset:%d column_type:"text"} fields:{name:"en" type:ENUM table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"en" column_length:4 charset:%d column_type:"enum('a','b')"} fields:{name:"s" type:SET table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"s" column_length:12 charset:%d column_type:"set('a','b')"}}`, testenv.DefaultCollationID, testenv.DefaultCollationID, testenv.DefaultCollationID, testenv.DefaultCollationID, testenv.DefaultCollationID, testenv.DefaultCollationID), `type:ROW row_event:{table_name:"vitess_strings" row_changes:{after:{lengths:1 lengths:1 lengths:1 lengths:4 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 ` + `values:"abcd\x00\x00\x00efgh13"}}}`, `gtid`, @@ -2100,7 +2140,6 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) { } func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) { - ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg, ch := startStream(ctx, t, filter, position, tablePK)