Skip to content

Commit

Permalink
fail fast
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 27, 2024
1 parent 4ac03f0 commit 33e90f3
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,8 @@ func qvalueFromMysql(mytype byte, qkind qvalue.QValueKind, val any) qvalue.QValu
mysql.MYSQL_TYPE_STRING:
return qvalue.QValueString{Val: val}
}
default:
panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype))
}
return nil
panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype))
}

func (c *MySqlConnector) PullRecords(
Expand Down Expand Up @@ -367,7 +365,7 @@ func (c *MySqlConnector) PullRecords(
switch event.Header.EventType {
case replication.WRITE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv0:
return errors.New("mysql v0 replication protocol not supported")
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
for idx, val := range row {
fd := schema.Columns[idx]
items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
Expand All @@ -378,7 +376,7 @@ func (c *MySqlConnector) PullRecords(
SourceTableName: sourceTableName,
DestinationTableName: destinationTableName,
}
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
oldItems := model.NewRecordItems(len(row) / 2)
for idx, val := range row {
fd := schema.Columns[idx>>1]
Expand All @@ -396,7 +394,7 @@ func (c *MySqlConnector) PullRecords(
SourceTableName: sourceTableName,
DestinationTableName: destinationTableName,
}
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
for idx, val := range row {
fd := schema.Columns[idx]
items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
Expand Down

0 comments on commit 33e90f3

Please sign in to comment.