Skip to content

Commit

Permalink
Add a ReadAt method to the snapshot record reader [#3434]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 3, 2023
1 parent ac651d6 commit b5714f1
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 13 deletions.
54 changes: 54 additions & 0 deletions pkg/database/snapshot/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,43 @@ func readRecord(rd io.Reader) ([]byte, int64, error) {
return b, int64(n), nil
}

// readRecordAt reads a length-prefixed record
func readRecordAt(rd io.ReaderAt, off int64) ([]byte, int64, error) {
// Read the length
var v [8]byte
n, err := readFullAt(rd, off, v[:])
if err != nil {
return nil, int64(n), err
}
l := binary.BigEndian.Uint64(v[:])

// Read the data
b := make([]byte, l)
m, err := readFullAt(rd, off+int64(len(v)), b)
n += m
if err != nil {
return nil, int64(n), err
}

return b, int64(n), nil
}

func readFullAt(rd io.ReaderAt, off int64, b []byte) (int, error) {
var m int
for len(b) > 0 {
n, err := rd.ReadAt(b, off)
m += n
if err != nil {
return m, err
}
if n == 0 {
return m, errors.InternalError.With("read nothing")
}
b = b[n:]
}
return m, nil
}

// readValue unmarshals a length-prefixed record into the value.
func readValue(rd io.Reader, v encoding.BinaryValue) (int64, error) {
// Read the record
Expand All @@ -144,3 +181,20 @@ func readValue(rd io.Reader, v encoding.BinaryValue) (int64, error) {

return n, nil
}

// readValueAt unmarshals a length-prefixed record into the value.
func readValueAt(rd io.ReaderAt, off int64, v encoding.BinaryValue) (int64, error) {
// Read the record
b, n, err := readRecordAt(rd, off)
if err != nil {
return n, err
}

// Unmarshal the header
err = v.UnmarshalBinary(b)
if err != nil {
return n, errors.EncodingError.WithFormat("unmarshal: %w", err)
}

return n, nil
}
33 changes: 27 additions & 6 deletions pkg/database/snapshot/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const Version2 = 2

type rawWriter = ioutil.SegmentedWriter[SectionType, *SectionType]
type sectionReader = ioutil.Segment[SectionType, *SectionType]
type sectionWriter = ioutil.SegmentWriter[SectionType, *SectionType]

func GetVersion(file ioutil.SectionReader) (uint64, error) {
r, err := open(file)
Expand Down Expand Up @@ -147,7 +146,7 @@ func (r *Reader) OpenRecords(i int) (RecordReader, error) {
return nil, errors.UnknownError.Wrap(err)
}
bufio.NewReader(rd)
return &recordReader{rd}, nil
return recordReader{rd}, nil
}

func (r *Reader) OpenBPT(i int) (RecordReader, error) {
Expand All @@ -156,9 +155,9 @@ func (r *Reader) OpenBPT(i int) (RecordReader, error) {
return nil, errors.UnknownError.Wrap(err)
}
if s.Type() == SectionTypeRawBPT {
return &rawBptReader{rd}, nil
return rawBptReader{rd}, nil
}
return &recordReader{rd}, nil
return recordReader{rd}, nil
}

type IndexReader struct {
Expand All @@ -175,23 +174,32 @@ func (i *IndexReader) Read(n int) (*RecordIndexEntry, error) {
type RecordReader interface {
io.Seeker
Read() (*RecordEntry, error)
ReadAt(offset int64) (*RecordEntry, error)
}

// recordReader reads length-prefixed record entries.
type recordReader struct {
ioutil.SectionReader
}

func (r *recordReader) Read() (*RecordEntry, error) {
func (r recordReader) Read() (*RecordEntry, error) {
v := new(RecordEntry)
_, err := readValue(r.SectionReader, v)
return v, err
}

func (r recordReader) ReadAt(offset int64) (*RecordEntry, error) {
v := new(RecordEntry)
_, err := readValueAt(r.SectionReader, offset, v)
return v, err
}

// rawBptReader reads (key hash, value) BPT pairs.
type rawBptReader struct {
ioutil.SectionReader
}

func (r *rawBptReader) Read() (*RecordEntry, error) {
func (r rawBptReader) Read() (*RecordEntry, error) {
var b [64]byte
_, err := io.ReadFull(r.SectionReader, b[:])
if err != nil {
Expand All @@ -204,6 +212,19 @@ func (r *rawBptReader) Read() (*RecordEntry, error) {
}, nil
}

func (r rawBptReader) ReadAt(offset int64) (*RecordEntry, error) {
var b [64]byte
_, err := readFullAt(r.SectionReader, offset, b[:])
if err != nil {
return nil, err
}

return &RecordEntry{
Key: record.KeyFromHash(*(*[32]byte)(b[:32])),
Value: b[32:],
}, nil
}

type Writer struct {
wr *rawWriter
wroteHeader bool
Expand Down
8 changes: 1 addition & 7 deletions pkg/database/snapshot/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package snapshot

import (
"bytes"
"io"
"sort"

"gitlab.com/accumulatenetwork/accumulate/exp/ioutil"
Expand Down Expand Up @@ -91,12 +90,7 @@ func (s *Store) Get(key *record.Key) ([]byte, error) {
return nil, errors.UnknownError.WithFormat("open index: %w", err)
}

_, err = rd.Seek(int64(x.Offset), io.SeekStart)
if err != nil {
return nil, errors.BadRequest.WithFormat("seek to record: %w", err)
}

entry, err := (&recordReader{rd}).Read()
entry, err := recordReader{rd}.ReadAt(int64(x.Offset))
if err != nil {
return nil, errors.UnknownError.Wrap(err)
}
Expand Down

0 comments on commit b5714f1

Please sign in to comment.