Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Apr 25, 2024
1 parent d668f21 commit cbb76ec
Showing 1 changed file with 62 additions and 18 deletions.
80 changes: 62 additions & 18 deletions pkg/database/keyvalue/block/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type blockLocation struct {
type recordLocation struct {
file uint
block uint64
header int64
offset int64
length int64
}
Expand Down Expand Up @@ -154,7 +155,13 @@ func Open(path string, options ...Option) (_ *Database, err error) {
return nil, fmt.Errorf("%v is corrupted", f.file.Name())
}

records.Put(e.Key.Hash(), recordLocation{file: uint(fileNo), block: *block, offset: offset, length: e.Length})
records.Put(e.Key.Hash(), recordLocation{
file: uint(fileNo),
block: *block,
header: start + 2, // The header has a 2 byte length prefix
offset: offset,
length: e.Length,
})

if e.Length <= 0 {
continue
Expand Down Expand Up @@ -182,9 +189,12 @@ func (db *Database) newFile() (*blockFile, error) {

// Create a new file
name := db.nameFn(len(db.files))
if !filepath.IsAbs(name) {
name = filepath.Join(db.path, name)
if name == "" {
return nil, fmt.Errorf("invalid block file name: empty")
} else if filepath.Base(name) != name {
return nil, fmt.Errorf("invalid block file name: %q contains a slash or is empty", name)
}
name = filepath.Join(db.path, name)

f, err := os.OpenFile(name, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0600)
if err != nil {
Expand Down Expand Up @@ -248,7 +258,10 @@ func (d *Database) get(view *recordsView, key *record.Key) ([]byte, error) {
if !ok || loc.length < 0 {
return nil, (*database.NotFoundError)(key)
}
return d.read(loc)
if !d.validLoc(loc) {
return nil, errors.InternalError.WithFormat("record is corrupted")
}
return d.read(loc), nil
}

func (d *Database) forEach(view *recordsView, fn func(*record.Key, []byte) error) error {
Expand All @@ -257,28 +270,57 @@ func (d *Database) forEach(view *recordsView, fn func(*record.Key, []byte) error
if loc.length < 0 {
return nil
}
if !d.validLoc(loc) {
return errors.InternalError.WithFormat("record is corrupted")
}

value, err := d.read(loc)
header, err := d.readHeader(loc)
if err != nil {
return err
}

return fn(record.KeyFromHash(key), value)
return fn(header.Key, d.read(loc))
})
}

func (d *Database) read(loc recordLocation) ([]byte, error) {
if loc.file >= uint(len(d.files)) {
return nil, errors.InternalError.WithFormat("record is corrupted")
func (d *Database) validLoc(loc recordLocation) bool {
// If any of these conditions fail, there's a bug. Record locations are
// initialized from disk, so any issue there indicates corruption or an
// initialization bug. Record locations are only ever added by Commit, which
// writes the records to disk and remaps them before committing the record
// locations, so any issue there indicates a bug.
switch {
case loc.header < 0 || loc.offset < 0 || loc.header > loc.offset:
// Corrupted offsets
return false

case loc.file >= uint(len(d.files)):
// loc.file is invalid
return false

case d.files[loc.file].mmap == nil:
// File is not memory mapped
return false

case loc.offset+loc.length > int64(len(d.files[loc.file].mmap)):
// Requested range is outside the memory mapped region
return false
}
return true
}

func (d *Database) read(loc recordLocation) []byte {
b := make([]byte, loc.length)
n := copy(b, d.files[loc.file].mmap[loc.offset:])
if n < len(b) {
return nil, io.ErrUnexpectedEOF
}
return b, nil
copy(b, d.files[loc.file].mmap[loc.offset:])
return b
}

func (d *Database) readHeader(loc recordLocation) (*recordEntry, error) {
b := make([]byte, loc.length)
copy(b, d.files[loc.file].mmap[loc.header:loc.offset])
e := new(recordEntry)
err := e.UnmarshalBinary(b)
return e, err
}

func (d *Database) commit(view *recordsView, entries map[[32]byte]memory.Entry) error {
Expand Down Expand Up @@ -310,10 +352,12 @@ func (d *Database) commit(view *recordsView, entries map[[32]byte]memory.Entry)
// Time for a new file?
if offset >= int64(d.fileLimit) {
// Close the block
haveBlock = false
_, err = writeEntry(f.file, &endBlockEntry{})
if err != nil {
return err
if haveBlock {
haveBlock = false
_, err = writeEntry(f.file, &endBlockEntry{})
if err != nil {
return err
}
}

// Remap the file
Expand Down

0 comments on commit cbb76ec

Please sign in to comment.