diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index faea9a8b2..894d944a6 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -289,10 +289,8 @@ func qvalueFromMysql(mytype byte, qkind qvalue.QValueKind, val any) qvalue.QValu mysql.MYSQL_TYPE_STRING: return qvalue.QValueString{Val: val} } - default: - panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype)) } - return nil + panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype)) } func (c *MySqlConnector) PullRecords( @@ -367,7 +365,7 @@ func (c *MySqlConnector) PullRecords( switch event.Header.EventType { case replication.WRITE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv0: return errors.New("mysql v0 replication protocol not supported") - case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: for idx, val := range row { fd := schema.Columns[idx] items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val)) @@ -378,7 +376,7 @@ func (c *MySqlConnector) PullRecords( SourceTableName: sourceTableName, DestinationTableName: destinationTableName, } - case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: oldItems := model.NewRecordItems(len(row) / 2) for idx, val := range row { fd := schema.Columns[idx>>1] @@ -396,7 +394,7 @@ func (c *MySqlConnector) PullRecords( SourceTableName: sourceTableName, DestinationTableName: destinationTableName, } - case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: for idx, val := range row { fd := schema.Columns[idx] items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))