From 33e90f3c0e5f5acc5e07396a4fa80a67c7c5e4fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 27 Dec 2024 21:20:20 +0000 Subject: [PATCH] fail fast --- flow/connectors/mysql/cdc.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index faea9a8b29..894d944a67 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -289,10 +289,8 @@ func qvalueFromMysql(mytype byte, qkind qvalue.QValueKind, val any) qvalue.QValu mysql.MYSQL_TYPE_STRING: return qvalue.QValueString{Val: val} } - default: - panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype)) } - return nil + panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype)) } func (c *MySqlConnector) PullRecords( @@ -367,7 +365,7 @@ func (c *MySqlConnector) PullRecords( switch event.Header.EventType { case replication.WRITE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv0: return errors.New("mysql v0 replication protocol not supported") - case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: for idx, val := range row { fd := schema.Columns[idx] items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val)) @@ -378,7 +376,7 @@ func (c *MySqlConnector) PullRecords( SourceTableName: sourceTableName, DestinationTableName: destinationTableName, } - case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: oldItems := model.NewRecordItems(len(row) / 2) for idx, val := range row { fd := schema.Columns[idx>>1] @@ -396,7 +394,7 @@ func (c *MySqlConnector) PullRecords( SourceTableName: sourceTableName, DestinationTableName: destinationTableName, } - case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: for idx, val := range row { fd := schema.Columns[idx] items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))