Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/put on busy shards #2965

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- `meta.DB.Open(readOnly)` moves metabase in RO mode (#3000)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)
- If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
Expand Down Expand Up @@ -58,6 +59,11 @@ Metabase version has been increased, auto migrating will be performed once
a v0.44.0 Storage Node is started with a v0.43.0 metabase. This action can
not be undone. No additional work should be done.

A new `storage.put_retry_timeout` config value has been added. If an object cannot
be PUT to storage, the node retries the PUT on the best shard for it (according to
placement sorting), and only on that shard, for this long before an operation error is
returned.

Binary keys are no longer supported by storage node, NEP-6 wallet support was
introduced in version 0.22.3 and support for binary keys was removed from
other components in 0.33.0 and 0.37.0. Please migrate to wallets (see 0.37.0
Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
shardPoolSize uint32
shards []storage.ShardCfg
isIgnoreUninitedShards bool
objectPutRetryDeadline time.Duration
}

policer struct {
Expand Down Expand Up @@ -159,6 +160,7 @@
a.engine.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.engine.shardPoolSize = engineconfig.ShardPoolSize(c)
a.engine.isIgnoreUninitedShards = engineconfig.IgnoreUninitedShards(c)
a.engine.objectPutRetryDeadline = engineconfig.ObjectPutRetryDeadline(c)

Check warning on line 163 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L163

Added line #L163 was not covered by tests

// Morph

Expand Down
8 changes: 8 additions & 0 deletions cmd/neofs-node/config/engine/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package engineconfig
import (
"errors"
"strconv"
"time"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
Expand Down Expand Up @@ -90,3 +91,10 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func IgnoreUninitedShards(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "ignore_uninited_shards")
}

// ObjectPutRetryDeadline returns the value of "put_retry_timeout" config parameter from "storage" section.
//
// Returns zero if the value is missing.
func ObjectPutRetryDeadline(c *config.Config) time.Duration {
return config.DurationSafe(c.Sub(subsection), "put_retry_timeout")
}
2 changes: 2 additions & 0 deletions cmd/neofs-node/config/engine/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
require.Zero(t, engineconfig.ObjectPutRetryDeadline(empty))
})

const path = "../../../../config/example/node"
Expand All @@ -46,6 +47,7 @@ func TestEngineSection(t *testing.T) {

require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
require.EqualValues(t, 5*time.Second, engineconfig.ObjectPutRetryDeadline(c))
require.EqualValues(t, true, engineconfig.IgnoreUninitedShards(c))

err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
Expand Down
7 changes: 4 additions & 3 deletions cmd/neofs-node/config/internal/validate/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ type valideConfig struct {
} `mapstructure:"object"`

Storage struct {
ShardPoolSize int `mapstructure:"shard_pool_size"`
ShardROErrorThreshold int `mapstructure:"shard_ro_error_threshold"`
IgnoreUninitedShards bool `mapstructure:"ignore_uninited_shards"`
ShardPoolSize int `mapstructure:"shard_pool_size"`
ShardROErrorThreshold int `mapstructure:"shard_ro_error_threshold"`
PutRetryTimeout time.Duration `mapstructure:"put_retry_timeout"`
IgnoreUninitedShards bool `mapstructure:"ignore_uninited_shards"`
Shard struct {
Default shardDetails `mapstructure:"default"`
ShardList map[string]shardDetails `mapstructure:",remain" prefix:""`
Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-node/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@

engine.WithLogger(c.log),
engine.WithIgnoreUninitedShards(c.engine.isIgnoreUninitedShards),
engine.WithObjectPutRetryTimeout(c.engine.objectPutRetryDeadline),

Check warning on line 87 in cmd/neofs-node/storage.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/storage.go#L87

Added line #L87 was not covered by tests
)

if c.shared.basics.ttl > 0 {
Expand Down
1 change: 1 addition & 0 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100

# Storage engine section
NEOFS_STORAGE_SHARD_POOL_SIZE=15
NEOFS_STORAGE_PUT_RETRY_TIMEOUT=5s
NEOFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
NEOFS_STORAGE_IGNORE_UNINITED_SHARDS=true
## 0 shard
Expand Down
1 change: 1 addition & 0 deletions config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
"storage": {
"shard_pool_size": 15,
"shard_ro_error_threshold": 100,
"put_retry_timeout": "5s",
"ignore_uninited_shards": true,
"shard": {
"0": {
Expand Down
1 change: 1 addition & 0 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ object:
storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
put_retry_timeout: 5s # object PUT retry timeout
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
ignore_uninited_shards: true # do we need to ignore uninited shards (default: false, fail on any shard failure)

Expand Down
13 changes: 7 additions & 6 deletions docs/storage-node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ morph:

Local storage engine configuration.

| Parameter | Type | Default value | Description |
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
| Parameter | Type | Default value | Description |
|----------------------------|------------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. |
| `put_retry_timeout`        | `duration`                         | `0`           | If an object cannot be PUT to storage, node tries to PUT it to the best shard for it (according to placement sorting) and only to it for this long before operation error is returned. Default value does not apply any retry policy at all. |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |

## `shard` subsection

Expand Down
13 changes: 12 additions & 1 deletion pkg/local_object_storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"errors"
"sync"
"sync/atomic"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
Expand Down Expand Up @@ -197,7 +198,8 @@

metrics MetricRegister

shardPoolSize uint32
objectPutTimeout time.Duration
shardPoolSize uint32

containerSource container.Source

Expand Down Expand Up @@ -271,3 +273,12 @@
c.isIgnoreUninitedShards = flag
}
}

// WithObjectPutRetryTimeout returns an option to specify the retry time limit for object PUT operation.
// It does not stop any disk operation, it only affects the retry policy. Zero value
// is acceptable and means no retry on any shard.
func WithObjectPutRetryTimeout(t time.Duration) Option {
return func(c *cfg) {
c.objectPutTimeout = t
}

Check warning on line 283 in pkg/local_object_storage/engine/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/engine.go#L280-L283

Added lines #L280 - L283 were not covered by tests
}
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
Expand Down
62 changes: 53 additions & 9 deletions pkg/local_object_storage/engine/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"errors"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
Expand All @@ -10,6 +11,7 @@
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -46,29 +48,45 @@
return err
}

var bestShard shardWrapper
var bestPool util.WorkerPool

for i, sh := range e.sortedShards(addr) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
if ok && bestPool == nil {
bestShard = sh
bestPool = pool
}
e.mtx.RUnlock()
if !ok {
// Shard was concurrently removed, skip.
continue
}

putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
putDone, exists, _ := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
if putDone || exists {
return nil
}
}

e.log.Debug("failed to put object to shards, trying the best one more",
zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID()))

if e.objectPutTimeout > 0 && e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) {
return nil
}

Check warning on line 78 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L73-L78

Added lines #L73 - L78 were not covered by tests

return errPutShard
}

// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) {
var putSuccess, alreadyExists bool
// Third return value is true iff object cannot be put because of max concurrent load.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool, bool) {
var putSuccess, alreadyExists, overloaded bool
id := sh.ID()

exitCh := make(chan struct{})

Expand All @@ -82,7 +100,7 @@
if err != nil {
e.log.Warn("object put: check object existence",
zap.Stringer("addr", addr),
zap.Stringer("shard", sh.ID()),
zap.Stringer("shard", id),

Check warning on line 103 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L103

Added line #L103 was not covered by tests
zap.Error(err))

if shard.IsErrObjectExpired(err) {
Expand All @@ -103,14 +121,14 @@
_, err = sh.ToMoveIt(toMoveItPrm)
if err != nil {
e.log.Warn("could not mark object for shard relocation",
zap.Stringer("shard", sh.ID()),
zap.Stringer("shard", id),

Check warning on line 124 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L124

Added line #L124 was not covered by tests
zap.String("error", err.Error()),
)
}
}

e.log.Debug("object put: object already exists",
zap.Stringer("shard", sh.ID()),
zap.Stringer("shard", id),
zap.Stringer("addr", addr))

return
Expand All @@ -127,7 +145,7 @@
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
e.log.Warn("could not put object to shard",
zap.Stringer("shard_id", sh.ID()),
zap.Stringer("shard_id", id),
zap.String("error", err.Error()))
return
}
Expand All @@ -138,11 +156,37 @@

putSuccess = true
}); err != nil {
e.log.Warn("object put: pool task submitting", zap.Error(err))
e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err))
overloaded = errors.Is(err, ants.ErrPoolOverload)

Check warning on line 160 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L159-L160

Added lines #L159 - L160 were not covered by tests
close(exitCh)
}

<-exitCh

return putSuccess, alreadyExists
return putSuccess, alreadyExists, overloaded
}

// putToShardWithDeadLine retries putting the object to sh via putToShard for
// as long as putToShard reports the shard as overloaded (its third return
// value), but no longer than e.cfg.objectPutTimeout.
//
// Attempts are driven by a ticker with a putCooldown (100ms) period, so the
// first attempt is made one cooldown after the call, not immediately.
//
// Returns true if the object has been put or already exists. Returns false if
// an attempt ended with any other non-overload outcome, or if the timeout
// fired first (the latter is logged at error level).
func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) bool {
// Overall time budget for all retries.
timer := time.NewTimer(e.cfg.objectPutTimeout)
defer timer.Stop()

// Pause between consecutive attempts.
const putCooldown = 100 * time.Millisecond
ticker := time.NewTicker(putCooldown)
defer ticker.Stop()

for {
select {
case <-timer.C:
e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.cfg.objectPutTimeout))
return false
case <-ticker.C:
putDone, exists, overloaded := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen)
if overloaded {
// Restart the period so the next attempt comes a full cooldown after this one finished.
ticker.Reset(putCooldown)
continue

Check warning on line 186 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L169-L186

Added lines #L169 - L186 were not covered by tests
}

// Any non-overload outcome is final: no further retries are made.
return putDone || exists

Check warning on line 189 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L189

Added line #L189 was not covered by tests
}
}
}
Loading