Skip to content

Commit

Permalink
Updated to only filter event 164 (START_ENCRYPTION_EVENT) and ignorab…
Browse files Browse the repository at this point in the history
…le flag

Shopify#296
  • Loading branch information
HemeraOne committed Jul 12, 2021
1 parent 6ce1aa3 commit 5b127c0
Showing 1 changed file with 6 additions and 24 deletions.
30 changes: 6 additions & 24 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ func (s *BinlogStreamer) Run() {

isEventPositionResumable := false
isEventPositionValid := true
skipEvent := false

switch e := ev.Event.(type) {
case *replication.RotateEvent:
Expand Down Expand Up @@ -268,31 +267,14 @@ func (s *BinlogStreamer) Run() {
// or the end of the current/next transaction. As such, the query will be
// reset following the next RowsQueryEvent before the corresponding RowsEvent(s)
query = nil
case *replication.TableMapEvent:
skipEvent = true//we can skip this event
case *replication.GenericEvent:
if ev.Header.Flags == replication.LOG_EVENT_IGNORABLE_F {
skipEvent = true//we can skip this event
if ev.RawData[4] == 164 && ev.Header.Flags == replication.LOG_EVENT_IGNORABLE_F {
// This event is published in binlog when encrypt_binlog is enabled for MariaDB
// event 164 shouldn't be published to replica but actually is, as it is not relevant
// since we receive binlogs unencrypted we can skip this event
// https://mariadb.com/kb/en/start_encryption_event/
isEventPositionValid = false
}
case *replication.QueryEvent:
// This event is published in binlog when not using ROW binlog_format
// or when MariaDB sees that replica can't handle annotations
// these always contain "BEGIN" or "# Dummy event replacing event type %u that slave cannot handle."
// this means that we can check for # as this is a comment in a query
queryEventQuery := ev.Event.(*replication.QueryEvent).Query
queryString := string(queryEventQuery)
if queryString == "BEGIN" {
skipEvent = true//we can skip this event
} else if queryEventQuery[0] == 35 {//35 is #
skipEvent = true//we can skip this event
} else {
err := fmt.Errorf("failed to handle query event query: %s", queryString)
s.logger.WithError(err).Error("failed to handle query event")
s.ErrorHandler.Fatal("binlog_streamer", err)
}
}
if skipEvent {
continue
}

if isEventPositionValid {
Expand Down

0 comments on commit 5b127c0

Please sign in to comment.