Skip to content

Commit

Permalink
VStreamer: For larger compressed transaction payloads, stream the int…
Browse files Browse the repository at this point in the history
…ernal contents (#17239)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Dec 6, 2024
1 parent f9acb77 commit 747a61c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 4 deletions.
7 changes: 7 additions & 0 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ type TransactionPayload struct {
payload []byte
reader io.Reader
iterator func() (BinlogEvent, error)
// StreamingContents tells the consumer that we are streaming the
// decompressed payload and they should also stream the events.
// This ensures that neither the producer nor the consumer are
// holding the entire payload's contents in memory.
StreamingContents bool
}

// IsTransactionPayload returns true if a compressed transaction
Expand Down Expand Up @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error {
}
compressedTrxPayloadsUsingStream.Add(1)
tp.reader = streamDecoder
// Signal the consumer to also stream the contents.
tp.StreamingContents = true
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
}
}
if tc.inMemory {
require.False(t, tp.StreamingContents)
require.Equal(t, memDecodingCnt+1, compressedTrxPayloadsInMem.Get())
require.Equal(t, tc.want, eventStrs)
} else {
require.True(t, tp.StreamingContents)
require.Equal(t, streamDecodingCnt+1, compressedTrxPayloadsUsingStream.Get())
require.Len(t, eventStrs, len(tc.want))
totalSize := 0
Expand Down
32 changes: 28 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
return fmt.Errorf("unexpected server EOF")
}
vevents, err := vs.parseEvent(ev)
vevents, err := vs.parseEvent(ev, bufferAndTransmit)
if err != nil {
vs.vse.errorCounts.Add("ParseEvent", 1)
return err
Expand Down Expand Up @@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}

// parseEvent parses an event from the binlog and converts it to a list of VEvents.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {
// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent
// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need
// to be streamed directly here in order to avoid holding the entire payload's contents,
// which can be 10s or even 100s of GiBs, all in memory.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) {
if !ev.IsValid() {
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
Expand Down Expand Up @@ -672,11 +676,31 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
return nil, err
}
tpvevents, err := vs.parseEvent(tpevent)
tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event
if err != nil {
return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event")
}
vevents = append(vevents, tpvevents...)
if tp.StreamingContents {
// Transmit each internal event individually to avoid buffering
// the large transaction's entire payload of events in memory, as
// the uncompressed size can be 10s or even 100s of GiBs in size.
if bufferAndTransmit == nil {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided")
}
for _, tpvevent := range tpvevents {
tpvevent.Timestamp = int64(ev.Timestamp())
tpvevent.CurrentTime = time.Now().UnixNano()
if err := bufferAndTransmit(tpvevent); err != nil {
if err == io.EOF {
return nil, nil
}
vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error sending compressed transaction payload's internal event: %v", err)
}
}
} else { // Process the payload's internal events all at once
vevents = append(vevents, tpvevents...)
}
}
vs.vse.vstreamerCompressedTransactionsDecoded.Add(1)
}
Expand Down

0 comments on commit 747a61c

Please sign in to comment.