This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

forky: Fixed Chunk Data Size Store #2017

Open: wants to merge 25 commits into base: master

Changes from 1 commit
Commits (25)
a2d5edc  storage/localstore: integrate fcds (janos, Dec 5, 2019)
2506b88  storage/{fcds,localstore}: add comments and minor adjustments (janos, Dec 6, 2019)
c8f3622  storage/fcds: add doc.go (janos, Dec 6, 2019)
ec14da3  cmd/swarm, storage/localstore: support breaking migrations (janos, Dec 9, 2019)
e352b88  cmd/swarm, storage/localstore: improve migrations, export and import (janos, Dec 9, 2019)
c8c16e1  storage/localstore: export pins (janos, Dec 9, 2019)
66aa7fd  Merge branch 'master' into fcds (janos, Dec 9, 2019)
7aed3b1  storage/{fcds,localstore}: address Viktor's comments (janos, Dec 12, 2019)
26bcb48  storage/fcds: correctly return explicit nil in getOffset (janos, Dec 13, 2019)
1e94680  storage/fcds: add WithCache optional argument to New constructor (janos, Dec 13, 2019)
db658c7  storage/fcds: address most of Petar's comments (janos, Dec 18, 2019)
26f6626  storage/fcds: add offsetCache ttl (janos, Dec 18, 2019)
5be3c25  Revert "storage/fcds: add offsetCache ttl" (janos, Dec 18, 2019)
c222011  Merge branch 'master' into fcds (janos, Dec 19, 2019)
661a7f5  storage/fcds: rename fcds.Interface to fcds.Storer (janos, Jan 14, 2020)
f51c6d8  storage/fcds: improve some commenting (janos, Jan 14, 2020)
91fc21f  storage/localstore: improve comment in the Import method (janos, Jan 14, 2020)
60e3938  storage/localstore: improve migrateDiwali migration message (janos, Jan 14, 2020)
c542f32  storage/fcds: ensure that chunk data is no longer the the max value (janos, Jan 14, 2020)
0fc5e3a  storage/localstore: terminate import goroutine in case of errors (janos, Jan 14, 2020)
842f7d8  storage/localstore: do not put existing chunks (janos, Mar 5, 2020)
d9341b8  storage/fcds/test: correctly handle storage path (janos, Mar 5, 2020)
0f13d3b  strage/fcds: check if chunk exists before it is put (janos, Mar 5, 2020)
4b6f726  storage/fcds: add and use MetaStore.Has (janos, Mar 5, 2020)
39d328a  storage/fcds: optimize locking (janos, Mar 6, 2020)
185 changes: 100 additions & 85 deletions storage/fcds/fcds.go
@@ -52,43 +52,40 @@ var ErrDBClosed = errors.New("closed database")
// Store is the main FCDS implementation. It stores chunk data into
// a number of files partitioned by the last byte of the chunk address.
type Store struct {
shards map[uint8]*os.File // relations with shard id and a shard file
shardsMu map[uint8]*sync.Mutex // mutex for every shard file
meta MetaStore // stores chunk offsets
free map[uint8]struct{} // which shards have free offsets
freeMu sync.RWMutex // protects free field
freeCache *offsetCache // optional cache of free offset values
wg sync.WaitGroup // blocks Close until all other method calls are done
maxChunkSize int // maximal chunk data size
quit chan struct{} // quit disables all operations after Close is called
quitOnce sync.Once // protects close channel from multiple Close calls
shards []shard // relations with shard id and a shard file and their mutexes
meta MetaStore // stores chunk offsets
free []bool // which shards have free offsets
freeMu sync.RWMutex // protects free field
freeCache *offsetCache // optional cache of free offset values
wg sync.WaitGroup // blocks Close until all other method calls are done
maxChunkSize int // maximal chunk data size
quit chan struct{} // quit disables all operations after Close is called
quitOnce sync.Once // protects quit channel from multiple Close calls
}

// NewStore constructs a new Store with files at path, with specified max chunk size.
// New constructs a new Store with files at path, with specified max chunk size.
// Argument withCache enables in memory cache of free chunk data positions in files.
func NewStore(path string, maxChunkSize int, metaStore MetaStore, withCache bool) (s *Store, err error) {
func New(path string, maxChunkSize int, metaStore MetaStore, withCache bool) (s *Store, err error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
shards := make(map[byte]*os.File, shardCount)
shardsMu := make(map[uint8]*sync.Mutex)
shards := make([]shard, shardCount)
for i := byte(0); i < shardCount; i++ {
shards[i], err = os.OpenFile(filepath.Join(path, fmt.Sprintf("chunks-%v.db", i)), os.O_CREATE|os.O_RDWR, 0666)
shards[i].f, err = os.OpenFile(filepath.Join(path, fmt.Sprintf("chunks-%v.db", i)), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
}
shardsMu[i] = new(sync.Mutex)
shards[i].mu = new(sync.Mutex)
}
var freeCache *offsetCache
if withCache {
freeCache = newOffsetCache(shardCount)
}
return &Store{
shards: shards,
shardsMu: shardsMu,
meta: metaStore,
freeCache: freeCache,
free: make(map[uint8]struct{}),
free: make([]bool, shardCount),
maxChunkSize: maxChunkSize,
quit: make(chan struct{}),
}, nil
@@ -102,16 +99,16 @@ func (s *Store) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
}
defer done()

mu := s.shardsMu[getShard(addr)]
mu.Lock()
defer mu.Unlock()
sh := s.shards[getShard(addr)]
sh.mu.Lock()
defer sh.mu.Unlock()

m, err := s.getMeta(addr)
if err != nil {
return nil, err
}
data := make([]byte, m.Size)
n, err := s.shards[getShard(addr)].ReadAt(data, m.Offset)
n, err := sh.f.ReadAt(data, m.Offset)
if err != nil && err != io.EOF {
return nil, err
}
@@ -129,7 +126,7 @@ func (s *Store) Has(addr chunk.Address) (yes bool, err error) {
}
defer done()

mu := s.shardsMu[getShard(addr)]
mu := s.shards[getShard(addr)].mu
mu.Lock()
defer mu.Unlock()

@@ -152,74 +149,76 @@ func (s *Store) Put(ch chunk.Chunk) (err error) {
defer done()

addr := ch.Address()
shard := getShard(addr)
f := s.shards[shard]
data := ch.Data()

section := make([]byte, s.maxChunkSize)
copy(section, data)

s.freeMu.RLock()
_, hasFree := s.free[shard]
s.freeMu.RUnlock()
shard := getShard(addr)
sh := s.shards[shard]

var offset int64
var reclaimed bool
mu := s.shardsMu[shard]
mu.Lock()
if hasFree {
var freeOffset int64 = -1
if s.freeCache != nil {
freeOffset = s.freeCache.get(shard)
}
if freeOffset < 0 {
freeOffset, err = s.meta.FreeOffset(shard)
if err != nil {
return err
}
}
if freeOffset < 0 {
offset, err = f.Seek(0, io.SeekEnd)
if err != nil {
mu.Unlock()
return err
}
s.freeMu.Lock()
delete(s.free, shard)
s.freeMu.Unlock()
} else {
offset, err = f.Seek(freeOffset, io.SeekStart)
if err != nil {
mu.Unlock()
return err
}
reclaimed = true
}
sh.mu.Lock()
defer sh.mu.Unlock()

offset, reclaimed, err := s.getOffset(shard)
if err != nil {
return err
}

if offset < 0 {
offset, err = sh.f.Seek(0, io.SeekEnd)
} else {
offset, err = f.Seek(0, io.SeekEnd)
if err != nil {
mu.Unlock()
return err
}
_, err = sh.f.Seek(offset, io.SeekStart)
}
_, err = f.Write(section)
if err != nil {
mu.Unlock()
return err
}
if reclaimed {
if s.freeCache != nil {
s.freeCache.remove(shard, offset)
}
defer mu.Unlock()
} else {
mu.Unlock()

if _, err = sh.f.Write(section); err != nil {
return err
}
if reclaimed && s.freeCache != nil {
s.freeCache.remove(shard, offset)
}
return s.meta.Set(addr, shard, reclaimed, &Meta{
Size: uint16(len(data)),
Offset: offset,
})
}

// getOffset returns an offset where chunk data can be written to
// and a flag if the offset is reclaimed from a previously removed chunk.
// If offset is less than 0, no free offsets are available.
func (s *Store) getOffset(shard uint8) (offset int64, reclaimed bool, err error) {
if !s.shardHasFreeOffsets(shard) {
// shard does not have free offset
Contributor:
I would maybe reduce the comments in this function, as the function comment itself explains the behavior and the code, with its private function names, is fully self-explanatory. So I think having only the code makes it even more readable, imo.

return -1, false, err
Member:
err -> nil preferred

Member Author:
Thanks, yes of course, my mistake.

}

offset = -1 // negative offset denotes no available free offset
if s.freeCache != nil {
// check if local cache has an offset
offset = s.freeCache.get(shard)
}

if offset < 0 {
// free cache did not return a free offset,
// check the meta store for one
offset, err = s.meta.FreeOffset(shard)
if err != nil {
return 0, false, err
}
}
if offset < 0 {
// meta store did not return a free offset,
// mark this shard that has no free offsets
s.markShardWithFreeOffsets(shard, false)
return -1, false, nil
}

return offset, true, nil
}

// Delete removes chunk data.
func (s *Store) Delete(addr chunk.Address) (err error) {
done, err := s.protect()
@@ -229,11 +228,9 @@ func (s *Store) Delete(addr chunk.Address) (err error) {
defer done()

shard := getShard(addr)
s.freeMu.Lock()
s.free[shard] = struct{}{}
s.freeMu.Unlock()
s.markShardWithFreeOffsets(shard, true)

mu := s.shardsMu[shard]
mu := s.shards[shard].mu
mu.Lock()
defer mu.Unlock()

@@ -260,18 +257,18 @@ func (s *Store) Iterate(fn func(chunk.Chunk) (stop bool, err error)) (err error)
}
defer done()

for _, mu := range s.shardsMu {
mu.Lock()
for _, sh := range s.shards {
sh.mu.Lock()
}
defer func() {
for _, mu := range s.shardsMu {
mu.Unlock()
for _, sh := range s.shards {
sh.mu.Unlock()
}
}()

return s.meta.Iterate(func(addr chunk.Address, m *Meta) (stop bool, err error) {
data := make([]byte, m.Size)
_, err = s.shards[getShard(addr)].ReadAt(data, m.Offset)
_, err = s.shards[getShard(addr)].f.ReadAt(data, m.Offset)
if err != nil {
return true, err
}
@@ -298,8 +295,8 @@ func (s *Store) Close() (err error) {
case <-time.After(15 * time.Second):
Contributor:
Maybe a debug log here?

}

for _, f := range s.shards {
if err := f.Close(); err != nil {
for _, sh := range s.shards {
if err := sh.f.Close(); err != nil {
Collaborator @jmozah (Mar 8, 2020):
Keeping the fd open for a long duration without mmap is a disaster in the making. I am not sure if Go's file flush() does an OS-level fsync(). If it does, then we should flush() at regular intervals to protect the contents against a crash.

Member Author:
Nice observation. Go's os.File.Sync() flushes the content to the disk; I am not sure which flush() you are referring to.

Flushing at (regular) intervals is what the operating system is already doing. I am not sure how this would help against a crash unless we fsync on every write, which is quite costly. I have already tested fsync on every write and it makes fcds much slower, even compared to go-leveldb, as go-leveldb does not fsync at all.

Mmap brings its own complexity, especially on different operating systems.

Collaborator:

Thanks for the reply.

> Go's os.File.Sync() flushes the content to the disk; I am not sure which flush() you are referring to.

Apologies if I confused you. There are two things:

  1. flush(), which flushes the application write buffers to the OS.
  2. fsync(), the OS-level sync which absolutely makes sure that the buffers have gone to disk.

The first one is done by Go itself, as you pointed out. The second one is the one I am concerned about.

> Flushing at (regular) intervals is what the operating system is already doing.

It is usually done by the OS disk drivers whenever they decide it is okay. fsync() is expensive, as you pointed out. To avoid fsyncing on every commit, DBs usually implement WALs, which are fsynced at very small regular intervals in the background, so that even if some data is lost it is only for a very short duration. Fsyncing in the foreground (query path) every time would be very expensive.

> even compared to go-leveldb, as go-leveldb does not fsync at all.

leveldb takes another strategy: it mmaps a larger file than required and writes into it directly with memcopy; the mmap driver then takes care of writing the dirty pages to disk.

All I am saying is, one way or another, if we do not want a crash to leave us with corrupted files on bootup, we have to implement this; otherwise I am sure we can expect some gibberish files when the power is switched off abruptly.

> Mmap brings its own complexity, especially on different operating systems.

Yes. We should not reinvent the wheel.
As a remedy we have two options:

  1. Use an already existing DB which takes care of this (we can talk about badger if you like).
  2. In Forky, do a File.Sync() every X seconds (where we can tolerate X seconds of data loss) in a background goroutine for all files (see the sketch after this comment).

I found this interesting read on the topic:
https://www.joeshaw.org/dont-defer-close-on-writable-files/
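A minimal sketch of option 2: a background goroutine that calls File.Sync() on every shard file at a fixed interval, bounding data loss to roughly one interval after a crash. The shard and store shapes mirror the fcds types in this diff, but the names, the interval, and the error logging are assumptions for illustration, not part of this PR.

package sketch

import (
	"log"
	"os"
	"sync"
	"time"
)

// shard and store mirror the shapes used by fcds in this PR (assumed names).
type shard struct {
	f  *os.File
	mu *sync.Mutex
}

type store struct {
	shards []shard
	quit   chan struct{}
}

// syncLoop flushes every shard file to stable storage at a fixed interval.
// Run it as a goroutine; it stops when the quit channel is closed.
func (s *store) syncLoop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			for _, sh := range s.shards {
				sh.mu.Lock()
				// os.File.Sync issues an fsync for the shard file.
				if err := sh.f.Sync(); err != nil {
					log.Printf("fcds: shard sync: %v", err)
				}
				sh.mu.Unlock()
			}
		case <-s.quit:
			return
		}
	}
}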

Member Author:

Thanks for the explanations.

> Use an already existing DB which takes care of this (we can talk about badger if you like).

I have already tried badger and it is slower than go-leveldb. Maybe you can find a way to use it more efficiently. And still, it does not scale with more CPU cores.

> In Forky, do a File.Sync() every X seconds (where we can tolerate X seconds of data loss) in a background goroutine for all files.

I am not sure this would actually help us, as the OS is already doing that at some frequency. Also, as you pointed out, the solution would be to implement a WAL, which I already did in some basic form, with, of course, some performance penalties.

As far as I can see go-leveldb is not using mmap.

I see your point as very valid, but I think your suggestions should be tested, even the ones that I already tried, to revalidate. We can talk about possibilities, but it would be good to actually test resilience against the weak points that you described.

As this is a very important part of Swarm, I am more in favour of not merging this PR until we are confident that it is reliable. Currently, I see that it creates more problems than it brings benefits.

Collaborator @jmozah (Mar 9, 2020):
I made a few changes to the badger configuration and ran the TestLevelDBForky and TestBadgerSuite test cases. These are the results.
[Screenshot of benchmark results, 2020-03-10]

Iteration in Badger is somewhat slow, so I commented out the iteration and ran these test cases, with the assumption that write/read/delete is more important than iteration.

There are more write improvements I can make in badger, like batching the writes and so on, but for now I will leave it like this.

Member Author:

Thanks, but I cannot tell much from the screenshots except to compare timings and conclude that TestBadgerSuite writes are slower. It would be very helpful to actually see the changes and what TestBadgerSuite is doing.

Collaborator @jmozah (Mar 10, 2020):
Look at the last 2 commits in my fork https://github.com/jmozah/forky/commits/master
The only one that matters is SyncWrites = false, which, in my opinion, makes Forky and badger an apples-to-apples comparison. The other configs are more experimental.
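For context, a minimal sketch of the configuration change referred to above: opening badger with SyncWrites disabled so writes are not fsynced on every commit. This assumes badger's v2 options API; the helper name is made up for the example.

package sketch

import (
	badger "github.com/dgraph-io/badger/v2"
)

// openNoSync opens a badger database that does not fsync on every write,
// trading crash durability for write throughput, similarly to how forky
// and go-leveldb behave in the benchmarks discussed above.
func openNoSync(dir string) (*badger.DB, error) {
	opts := badger.DefaultOptions(dir).WithSyncWrites(false)
	return badger.Open(opts)
}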

return err
}
}
@@ -327,7 +324,25 @@ func (s *Store) getMeta(addr chunk.Address) (m *Meta, err error) {
return s.meta.Get(addr)
}

func (s *Store) markShardWithFreeOffsets(shard uint8, has bool) {
s.freeMu.Lock()
s.free[shard] = has
s.freeMu.Unlock()
}

func (s *Store) shardHasFreeOffsets(shard uint8) (has bool) {
s.freeMu.RLock()
has = s.free[shard]
s.freeMu.RUnlock()
return has
}

// getShard returns a shard number for the chunk address.
func getShard(addr chunk.Address) (shard uint8) {
return addr[len(addr)-1] % shardCount
}

type shard struct {
f *os.File
mu *sync.Mutex
}
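The fixed chunk data size layout that gives FCDS its name can be summarised in a small standalone sketch: the last byte of the chunk address selects a shard file (as getShard above does), and because every record occupies exactly maxChunkSize bytes, a freed offset can be reused for any later chunk. The constant values and helper names below are assumptions for illustration, not the package's actual definitions.

package sketch

const (
	shardCount   = 32       // assumed number of shard files; the real constant lives in the fcds package
	maxChunkSize = 4096 + 8 // assumed chunk.DefaultSize plus the 8 prepended bytes localstore configures
)

// shardOf mirrors getShard: the last byte of the address, modulo the
// number of shards, selects the file a chunk is stored in.
func shardOf(addr []byte) uint8 {
	return addr[len(addr)-1] % shardCount
}

// slotOffset shows why reuse is simple: record n always starts at
// n*maxChunkSize, so any freed offset fits any future chunk.
func slotOffset(n int64) int64 {
	return n * maxChunkSize
}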
4 changes: 2 additions & 2 deletions storage/fcds/mock/mock.go
@@ -30,9 +30,9 @@ type Store struct {
m *mock.NodeStore
}

// NewStore returns a new store with mock NodeStore
// New returns a new store with mock NodeStore
// for storing Chunk data.
func NewStore(m *mock.NodeStore) (s *Store) {
func New(m *mock.NodeStore) (s *Store) {
return &Store{
m: m,
}
2 changes: 1 addition & 1 deletion storage/fcds/mock/mock_test.go
@@ -29,7 +29,7 @@ import (
// TestFCDS runs a standard series of tests on mock Store implementation.
func TestFCDS(t *testing.T) {
test.RunAll(t, func(t *testing.T) (fcds.Interface, func()) {
return mock.NewStore(
return mock.New(
mem.NewGlobalStore().NewNodeStore(
common.BytesToAddress(make([]byte, 20)),
),
2 changes: 1 addition & 1 deletion storage/fcds/test/store.go
@@ -297,7 +297,7 @@ func NewFCDSStore(t *testing.T, path string, metaStore fcds.MetaStore) (s *fcds.
t.Fatal(err)
}

s, err = fcds.NewStore(path, chunk.DefaultSize, metaStore, !*noCacheFlag)
s, err = fcds.New(path, chunk.DefaultSize, metaStore, !*noCacheFlag)
if err != nil {
os.RemoveAll(path)
t.Fatal(err)
1 change: 0 additions & 1 deletion storage/localstore/gc.go
@@ -120,7 +120,6 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
metrics.GetOrRegisterGauge(metricName+".accessts", nil).Update(item.AccessTimestamp)

// delete from retrieve, pull, gc
//db.retrievalDataIndex.DeleteInBatch(batch, item)
addrs = append(addrs, item.Address)
db.metaIndex.DeleteInBatch(batch, item)
db.pullIndex.DeleteInBatch(batch, item)
4 changes: 2 additions & 2 deletions storage/localstore/localstore.go
@@ -226,7 +226,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if err != nil {
return nil, err
}
db.data, err = fcds.NewStore(
db.data, err = fcds.New(
filepath.Join(path, "data"),
chunk.DefaultSize+8, // chunk data has additional 8 bytes prepended
metaStore,
@@ -237,7 +237,7 @@ }
}
} else {
// Mock store is provided, use mock FCDS.
db.data = fcdsmock.NewStore(o.MockStore)
db.data = fcdsmock.New(o.MockStore)
}
// Index storing bin id, store and access timestamp for a particular address.
// It is needed in order to update gc index keys for iteration order.