Skip to content

Commit

Permalink
node/engine: do not skip objects if shards are busy
Browse files Browse the repository at this point in the history
If every shard's pool is overloaded with routines, choose the best one and keep
trying to PUT the object to it for up to 30 seconds. Closes #2871.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Nov 20, 2024
1 parent 758f86c commit c54d353
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- `meta.DB.Open(readOnly)` moves metabase in RO mode (#3000)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)
- If shards are overloaded with PUT requests, the operation is no longer skipped but retried for up to 30 seconds (#2871)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
Expand Down
52 changes: 48 additions & 4 deletions pkg/local_object_storage/engine/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"errors"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -46,29 +48,44 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er
return err
}

var bestShard shardWrapper
var bestPool util.WorkerPool

for i, sh := range e.sortedShards(addr) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
if ok && bestPool == nil {
bestShard = sh
bestPool = pool
}
e.mtx.RUnlock()
if !ok {
// Shard was concurrently removed, skip.
continue
}

putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
putDone, exists, _ := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
if putDone || exists {
return nil
}
}

e.log.Debug("failed to put object to shards, trying the best one more",
zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID()))

if e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) {
return nil
}

Check warning on line 78 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L73-L78

Added lines #L73 - L78 were not covered by tests

return errPutShard
}

// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) {
var putSuccess, alreadyExists bool
// Third return value is true iff object cannot be put because of max concurrent load.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool, bool) {
var putSuccess, alreadyExists, overloaded bool
id := sh.ID()

exitCh := make(chan struct{})
Expand Down Expand Up @@ -140,10 +157,37 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo
putSuccess = true
}); err != nil {
e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err))
overloaded = errors.Is(err, ants.ErrPoolOverload)

Check warning on line 160 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L159-L160

Added lines #L159 - L160 were not covered by tests
close(exitCh)
}

<-exitCh

return putSuccess, alreadyExists
return putSuccess, alreadyExists, overloaded
}

func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) bool {
var deadline = 30 * time.Second
timer := time.NewTimer(deadline)
defer timer.Stop()

const putCooldown = 100 * time.Millisecond
ticker := time.NewTicker(putCooldown)
defer ticker.Stop()

for {
select {
case <-timer.C:
e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", deadline))
return false
case <-ticker.C:
putDone, exists, overloaded := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen)
if overloaded {
ticker.Reset(putCooldown)
continue

Check warning on line 187 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L169-L187

Added lines #L169 - L187 were not covered by tests
}

return putDone || exists

Check warning on line 190 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L190

Added line #L190 was not covered by tests
}
}
}

0 comments on commit c54d353

Please sign in to comment.