Skip to content

Commit

Permalink
Simplify engine interface (#3008)
Browse files Browse the repository at this point in the history
Mostly similar to #3001, we have a number of Prm/Res structures that
make it harder to used engine and trace things going on. But there are
some functional changes here as well and also things that go beyond
Prm/Res. Overall we get rid of many pointless conversions.
  • Loading branch information
roman-khimov authored Nov 12, 2024
2 parents 007e992 + e482dbf commit 6ea74d2
Show file tree
Hide file tree
Showing 38 changed files with 351 additions and 956 deletions.
3 changes: 1 addition & 2 deletions cmd/neofs-lens/internal/storage/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -38,7 +37,7 @@ func getFunc(cmd *cobra.Command, _ []string) error {
}
defer storage.Close()

obj, err := engine.Get(storage, addr)
obj, err := storage.Get(addr)
if err != nil {
return fmt.Errorf("could not fetch object: %w", err)
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/neofs-lens/internal/storage/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/spf13/cobra"
)
Expand All @@ -32,10 +33,12 @@ func listFunc(cmd *cobra.Command, _ []string) error {
}
defer storage.Close()

var p engine.ListWithCursorPrm
p.WithCount(1024)
var (
addrs []objectcore.AddressWithType
cursor *engine.Cursor
)
for {
r, err := storage.ListWithCursor(p)
addrs, cursor, err = storage.ListWithCursor(1024, cursor)
if err != nil {
if errors.Is(err, engine.ErrEndOfListing) {
return nil
Expand All @@ -44,13 +47,11 @@ func listFunc(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("Storage iterator failure: %w", err)
}
}
var addrs = r.AddressList()
for _, at := range addrs {
_, err = io.WriteString(w, at.Address.String()+"\n")
if err != nil {
return fmt.Errorf("print failure: %w", err)
}
}
p.WithCursor(r.Cursor())
}
}
4 changes: 2 additions & 2 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,13 @@ type localStorageLoad struct {
}

func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
idList, err := engine.ListContainers(d.engine)
idList, err := d.engine.ListContainers()
if err != nil {
return fmt.Errorf("list containers on engine failure: %w", err)
}

for i := range idList {
sz, err := engine.ContainerSize(d.engine, idList[i])
sz, err := d.engine.ContainerSize(idList[i])
if err != nil {
d.log.Debug("failed to calculate container size in storage engine",
zap.Stringer("cid", idList[i]),
Expand Down
20 changes: 3 additions & 17 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,7 @@ func initObjectService(c *cfg) {
policer.WithHeadTimeout(c.applicationConfiguration.policer.headTimeout),
policer.WithReplicator(c.replicator),
policer.WithRedundantCopyCallback(func(addr oid.Address) {
var inhumePrm engine.InhumePrm
inhumePrm.MarkAsGarbage(addr)

_, err := ls.Inhume(inhumePrm)
err := ls.Delete(addr)
if err != nil {
c.log.Warn("could not inhume mark redundant copy as garbage",
zap.String("error", err.Error()),
Expand Down Expand Up @@ -516,32 +513,21 @@ func (e storageEngine) IsLocked(address oid.Address) (bool, error) {
}

func (e storageEngine) Delete(tombstone oid.Address, tombExpiration uint64, toDelete []oid.ID) error {
var prm engine.InhumePrm

addrs := make([]oid.Address, len(toDelete))
for i := range addrs {
addrs[i].SetContainer(tombstone.Container())
addrs[i].SetObject(toDelete[i])
}

prm.WithTombstone(tombstone, tombExpiration, addrs...)

_, err := e.engine.Inhume(prm)
return err
return e.engine.Inhume(tombstone, tombExpiration, addrs...)
}

func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error {
return e.engine.Lock(locker.Container(), locker.Object(), toLock)
}

func (e storageEngine) Put(o *objectSDK.Object, objBin []byte, hdrLen int) error {
var putPrm engine.PutPrm
putPrm.WithObject(o)
if objBin != nil {
putPrm.SetObjectBinary(objBin, hdrLen)
}
_, err := e.engine.Put(putPrm)
return err
return e.engine.Put(o, objBin, hdrLen)
}

func cachedHeaderSource(getSvc *getsvc.Service, cacheSize int, l *zap.Logger) headerSource {
Expand Down
109 changes: 29 additions & 80 deletions pkg/local_object_storage/engine/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,118 +12,69 @@ import (
"golang.org/x/sync/errgroup"
)

// ContainerSizePrm groups parameters of ContainerSize operation.
type ContainerSizePrm struct {
cnr cid.ID
}

// ContainerSizeRes resulting values of ContainerSize operation.
type ContainerSizeRes struct {
size uint64
}

// ListContainersPrm groups parameters of ListContainers operation.
type ListContainersPrm struct{}

// ListContainersRes groups the resulting values of ListContainers operation.
type ListContainersRes struct {
containers []cid.ID
}

// SetContainerID sets the identifier of the container to estimate the size.
func (p *ContainerSizePrm) SetContainerID(cnr cid.ID) {
p.cnr = cnr
}

// Size returns calculated estimation of the container size.
func (r ContainerSizeRes) Size() uint64 {
return r.size
}

// Containers returns a list of identifiers of the containers in which local objects are stored.
func (r ListContainersRes) Containers() []cid.ID {
return r.containers
}

// ContainerSize returns the sum of estimation container sizes among all shards.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) ContainerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) {
func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) {
var (
err error
size uint64
)
if e.metrics != nil {
defer elapsed(e.metrics.AddEstimateContainerSizeDuration)()
}

err = e.execIfNotBlocked(func() error {
res, err = e.containerSize(prm)
size, err = e.containerSize(cnr)
return err
})

return
return size, err
}

// ContainerSize calls ContainerSize method on engine to calculate sum of estimation container sizes among all shards.
func ContainerSize(e *StorageEngine, id cid.ID) (uint64, error) {
var prm ContainerSizePrm

prm.SetContainerID(id)

res, err := e.ContainerSize(prm)
if err != nil {
return 0, err
}

return res.Size(), nil
}

func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) {
if e.metrics != nil {
defer elapsed(e.metrics.AddEstimateContainerSizeDuration)()
}
func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) {
var size uint64

e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(prm.cnr)
csPrm.SetContainerID(cnr)

csRes, err := sh.Shard.ContainerSize(csPrm)
if err != nil {
e.reportShardError(sh, "can't get container size", err,
zap.Stringer("container_id", prm.cnr))
zap.Stringer("container_id", cnr))
return false
}

res.size += csRes.Size()
size += csRes.Size()

return false
})

return
return size, nil
}

// ListContainers returns a unique container IDs presented in the engine objects.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) ListContainers(_ ListContainersPrm) (res ListContainersRes, err error) {
func (e *StorageEngine) ListContainers() ([]cid.ID, error) {
var (
res []cid.ID
err error
)
if e.metrics != nil {
defer elapsed(e.metrics.AddListContainersDuration)()
}

err = e.execIfNotBlocked(func() error {
res, err = e.listContainers()
return err
})

return
return res, err
}

// ListContainers calls ListContainers method on engine to get a unique container IDs presented in the engine objects.
func ListContainers(e *StorageEngine) ([]cid.ID, error) {
var prm ListContainersPrm

res, err := e.ListContainers(prm)
if err != nil {
return nil, err
}

return res.Containers(), nil
}

func (e *StorageEngine) listContainers() (ListContainersRes, error) {
if e.metrics != nil {
defer elapsed(e.metrics.AddListContainersDuration)()
}

func (e *StorageEngine) listContainers() ([]cid.ID, error) {
uniqueIDs := make(map[cid.ID]struct{})

e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
Expand All @@ -147,9 +98,7 @@ func (e *StorageEngine) listContainers() (ListContainersRes, error) {
result = append(result, cnr)
}

return ListContainersRes{
containers: result,
}, nil
return result, nil
}

// DeleteContainer deletes container's objects that engine stores.
Expand Down
16 changes: 4 additions & 12 deletions pkg/local_object_storage/engine/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,17 @@ func TestStorageEngine_ContainerCleanUp(t *testing.T) {
o2 := objecttest.Object()
o2.SetPayload(make([]byte, errSmallSize+1))

var prmPut PutPrm
prmPut.WithObject(&o1)

_, err := e.Put(prmPut)
err := e.Put(&o1, nil, 0)
require.NoError(t, err)

prmPut.WithObject(&o2)
_, err = e.Put(prmPut)
err = e.Put(&o2, nil, 0)
require.NoError(t, err)

require.NoError(t, e.Init())

require.Eventually(t, func() bool {
var prmGet GetPrm
prmGet.WithAddress(object.AddressOf(&o1))
_, err1 := e.Get(prmGet)

prmGet.WithAddress(object.AddressOf(&o2))
_, err2 := e.Get(prmGet)
_, err1 := e.Get(object.AddressOf(&o1))
_, err2 := e.Get(object.AddressOf(&o2))

return errors.Is(err1, new(apistatus.ObjectNotFound)) && errors.Is(err2, new(apistatus.ObjectNotFound))
}, time.Second, 100*time.Millisecond)
Expand Down
8 changes: 4 additions & 4 deletions pkg/local_object_storage/engine/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,28 +154,28 @@ func TestExecBlocks(t *testing.T) {

addr := object.AddressOf(obj)

require.NoError(t, Put(e, obj))
require.NoError(t, e.Put(obj, nil, 0))

// block executions
errBlock := errors.New("block exec err")

require.NoError(t, e.BlockExecution(errBlock))

// try to exec some op
_, err := Head(e, addr)
_, err := e.Head(addr, false)
require.ErrorIs(t, err, errBlock)

// resume executions
require.NoError(t, e.ResumeExecution())

_, err = Head(e, addr) // can be any data-related op
_, err = e.Head(addr, false) // can be any data-related op
require.NoError(t, err)

// close
require.NoError(t, e.Close())

// try exec after close
_, err = Head(e, addr)
_, err = e.Head(addr, false)
require.Error(t, err)

// try to resume
Expand Down
Loading

0 comments on commit 6ea74d2

Please sign in to comment.