diff --git a/internal/sqlite/binlog_engine.go b/internal/sqlite/binlog_engine.go index b0e0c2b3e..81c8f49d5 100644 --- a/internal/sqlite/binlog_engine.go +++ b/internal/sqlite/binlog_engine.go @@ -2,6 +2,7 @@ package sqlite import ( "errors" + "io" "time" binlog2 "github.com/vkcom/statshouse/internal/vkgo/binlog" @@ -24,6 +25,19 @@ const ( maxReplicaQueueBytes = 1 << 30 // 1GB ) +func isEOFErr(err error) bool { + return err == binlog2.ErrorNotEnoughData || + err == io.EOF || + err == io.ErrUnexpectedEOF || + errors.Is(err, binlog2.ErrorNotEnoughData) || + errors.Is(err, io.EOF) || + errors.Is(err, io.ErrUnexpectedEOF) +} + +func isExpectedError(err error) bool { + return errors.Is(err, binlog2.ErrorUnknownMagic) || isEOFErr(err) +} + // Apply is used when re-reading or when working as a replica func (impl *binlogEngineReplicaImpl) Apply(payload []byte) (newOffset int64, err error) { defer impl.e.opt.StatsOptions.measureActionDurationSince("engine_apply", time.Now()) @@ -83,7 +97,7 @@ func (impl *binlogEngineReplicaImpl) apply(payload []byte) (newOffset int64, err err1 := binlogUpdateOffset(conn, newOffset) if err != nil { errToReturn = err - if !errors.Is(err, binlog2.ErrorUnknownMagic) && !errors.Is(err, binlog2.ErrorNotEnoughData) { + if !isExpectedError(err) { return err } }