From c54d35304a3d42749abe11be1dc74ddc719b0674 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 7 Oct 2024 20:57:15 +0300 Subject: [PATCH] node/engine: do not skip objects if shards are busy If every shard's pool is overloaded with routines, choose the best one and try to PUT an object to it 30 seconds. Closes #2871. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/local_object_storage/engine/evacuate.go | 2 +- pkg/local_object_storage/engine/put.go | 52 +++++++++++++++++++-- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8e199d4a5..92cd1a352c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - `meta.DB.Open(readOnly)` moves metabase in RO mode (#3000) - Panic in event listener related to inability to switch RPC node (#2970) - Non-container nodes never check placement policy on PUT, SEARCH requests (#3014) +- If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871) ### Changed - `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index b42edad30f..afaac70b90 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -125,7 +125,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0) + putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 483e928a17..130508d6e5 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -2,6 +2,7 @@ package engine import ( "errors" + "time" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" @@ -10,6 +11,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -46,29 +48,44 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er return err } + var bestShard shardWrapper + var bestPool util.WorkerPool + for i, sh := range e.sortedShards(addr) { e.mtx.RLock() pool, ok := e.shardPools[sh.ID().String()] + if ok && bestPool == nil { + bestShard = sh + bestPool = pool + } e.mtx.RUnlock() if !ok { // Shard was concurrently removed, skip. continue } - putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen) + putDone, exists, _ := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen) if putDone || exists { return nil } } + e.log.Debug("failed to put object to shards, trying the best one more", + zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID())) + + if e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) { + return nil + } + return errPutShard } // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { - var putSuccess, alreadyExists bool +// Third return value is true iff object cannot be put because of max concurrent load. +func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool, bool) { + var putSuccess, alreadyExists, overloaded bool id := sh.ID() exitCh := make(chan struct{}) @@ -140,10 +157,37 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo putSuccess = true }); err != nil { e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err)) + overloaded = errors.Is(err, ants.ErrPoolOverload) close(exitCh) } <-exitCh - return putSuccess, alreadyExists + return putSuccess, alreadyExists, overloaded +} + +func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) bool { + var deadline = 30 * time.Second + timer := time.NewTimer(deadline) + defer timer.Stop() + + const putCooldown = 100 * time.Millisecond + ticker := time.NewTicker(putCooldown) + defer ticker.Stop() + + for { + select { + case <-timer.C: + e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", deadline)) + return false + case <-ticker.C: + putDone, exists, overloaded := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen) + if overloaded { + ticker.Reset(putCooldown) + continue + } + + return putDone || exists + } + } }