diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 7455218d4b5..37d28431087 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -98,6 +98,11 @@ type TransactionPayload struct { payload []byte reader io.Reader iterator func() (BinlogEvent, error) + // StreamingContents tells the consumer that we are streaming the + // decompressed payload and they should also stream the events. + // This ensures that neither the producer nor the consumer are + // holding the entire payload's contents in memory. + StreamingContents bool } // IsTransactionPayload returns true if a compressed transaction @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error { } compressedTrxPayloadsUsingStream.Add(1) tp.reader = streamDecoder + // Signal the consumer to also stream the contents. + tp.StreamingContents = true return nil } diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index 861d98c6e4f..5844779de63 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -186,9 +186,11 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) { } } if tc.inMemory { + require.False(t, tp.StreamingContents) require.Equal(t, memDecodingCnt+1, compressedTrxPayloadsInMem.Get()) require.Equal(t, tc.want, eventStrs) } else { + require.True(t, tp.StreamingContents) require.Equal(t, streamDecodingCnt+1, compressedTrxPayloadsUsingStream.Get()) require.Len(t, eventStrs, len(tc.want)) totalSize := 0 diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index ea7f75cdc38..59db723ff2b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } return fmt.Errorf("unexpected server EOF") } - vevents, err := vs.parseEvent(ev) + vevents, err := vs.parseEvent(ev, bufferAndTransmit) if err != nil { vs.vse.errorCounts.Add("ParseEvent", 1) return err @@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } // parseEvent parses an event from the binlog and converts it to a list of VEvents. -func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { +// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent +// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need +// to be streamed directly here in order to avoid holding the entire payload's contents, +// which can be 10s or even 100s of GiBs, all in memory. +func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) { if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } @@ -672,11 +676,31 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } return nil, err } - tpvevents, err := vs.parseEvent(tpevent) + tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event if err != nil { return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event") } - vevents = append(vevents, tpvevents...) + if tp.StreamingContents { + // Transmit each internal event individually to avoid buffering + // the large transaction's entire payload of events in memory, as + // the uncompressed size can be 10s or even 100s of GiBs in size. + if bufferAndTransmit == nil { + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided") + } + for _, tpvevent := range tpvevents { + tpvevent.Timestamp = int64(ev.Timestamp()) + tpvevent.CurrentTime = time.Now().UnixNano() + if err := bufferAndTransmit(tpvevent); err != nil { + if err == io.EOF { + return nil, nil + } + vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error sending compressed transaction payload's internal event: %v", err) + } + } + } else { // Process the payload's internal events all at once + vevents = append(vevents, tpvevents...) + } } vs.vse.vstreamerCompressedTransactionsDecoded.Add(1) }