Skip to content

Commit

Permalink
node/engine: do not skip objects if shards are busy
Browse files Browse the repository at this point in the history
If every shard's pool is overloaded with routines, choose the best one and keep
retrying the PUT of the object to it for up to 30 seconds. Closes #2871.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Nov 20, 2024
1 parent 758f86c commit 5110450
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- `meta.DB.Open(readOnly)` moves metabase in RO mode (#3000)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)
- If shards are overloaded with PUT requests, the operation is not skipped but retried on the best shard for up to 30 seconds (#2871)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
Expand Down
51 changes: 47 additions & 4 deletions pkg/local_object_storage/engine/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"errors"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -46,29 +48,43 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er
return err
}

var bestShard shardWrapper
var bestPool util.WorkerPool

for i, sh := range e.sortedShards(addr) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
if ok && bestPool == nil {
bestShard = sh
bestPool = pool
}
e.mtx.RUnlock()
if !ok {
// Shard was concurrently removed, skip.
continue
}

putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
putDone, exists, _ := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
if putDone || exists {
return nil
}
}

e.log.Debug("failed to put object to shards, trying the best one more", zap.Stringer("addr", addr))

if e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) {
return nil
}

return errPutShard
}

// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) {
var putSuccess, alreadyExists bool
// Third return value is true iff object cannot be put because of max concurrent load.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool, bool) {
var putSuccess, alreadyExists, overloaded bool
id := sh.ID()

exitCh := make(chan struct{})
Expand Down Expand Up @@ -140,10 +156,37 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo
putSuccess = true
}); err != nil {
e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err))
overloaded = errors.Is(err, ants.ErrPoolOverload)
close(exitCh)
}

<-exitCh

return putSuccess, alreadyExists
return putSuccess, alreadyExists, overloaded
}

// putToShardWithDeadLine keeps retrying putToShard against a single shard
// while that shard's worker pool reports overload, giving up after 30 seconds.
//
// Attempts are spaced by a 100ms cooldown; the first attempt is also made
// only after one cooldown interval has elapsed. The loop stops at the first
// attempt that is not rejected because of pool overload.
//
// It returns true iff that attempt stored the object or found it already
// existing. It returns false if the deadline expired, if the attempt failed
// for a reason other than overload, or if pool is nil.
func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) bool {
	const (
		deadline    = 30 * time.Second
		putCooldown = 100 * time.Millisecond
	)

	// Put passes a zero-value shard and a nil pool when it found no shard with
	// a registered pool; there is nothing to retry against in that case, and
	// submitting to a nil pool must be avoided.
	if pool == nil {
		return false
	}

	timer := time.NewTimer(deadline)
	defer timer.Stop()

	ticker := time.NewTicker(putCooldown)
	defer ticker.Stop()

	for {
		select {
		case <-timer.C:
			e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", deadline))
			return false
		case <-ticker.C:
			putDone, exists, overloaded := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen)
			if overloaded {
				// Restart the cooldown from now so the next attempt is a full
				// interval after this one finished, not after it started.
				ticker.Reset(putCooldown)
				continue
			}

			return putDone || exists
		}
	}
}

0 comments on commit 5110450

Please sign in to comment.