diff --git a/cmd/neofs-lens/internal/storage/get.go b/cmd/neofs-lens/internal/storage/get.go index e8b96f4f61..5848544f35 100644 --- a/cmd/neofs-lens/internal/storage/get.go +++ b/cmd/neofs-lens/internal/storage/get.go @@ -4,7 +4,6 @@ import ( "fmt" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -38,7 +37,7 @@ func getFunc(cmd *cobra.Command, _ []string) error { } defer storage.Close() - obj, err := engine.Get(storage, addr) + obj, err := storage.Get(addr) if err != nil { return fmt.Errorf("could not fetch object: %w", err) } diff --git a/cmd/neofs-lens/internal/storage/list.go b/cmd/neofs-lens/internal/storage/list.go index 86fb48494c..c8d19cf120 100644 --- a/cmd/neofs-lens/internal/storage/list.go +++ b/cmd/neofs-lens/internal/storage/list.go @@ -6,6 +6,7 @@ import ( "io" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/spf13/cobra" ) @@ -32,10 +33,12 @@ func listFunc(cmd *cobra.Command, _ []string) error { } defer storage.Close() - var p engine.ListWithCursorPrm - p.WithCount(1024) + var ( + addrs []objectcore.AddressWithType + cursor *engine.Cursor + ) for { - r, err := storage.ListWithCursor(p) + addrs, cursor, err = storage.ListWithCursor(1024, cursor) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { return nil @@ -44,13 +47,11 @@ func listFunc(cmd *cobra.Command, _ []string) error { return fmt.Errorf("Storage iterator failure: %w", err) } } - var addrs = r.AddressList() for _, at := range addrs { _, err = io.WriteString(w, at.Address.String()+"\n") if err != nil { return fmt.Errorf("print failure: %w", err) } } - p.WithCursor(r.Cursor()) } } diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 8af3b200c1..9a6f0d35b8 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -431,13 +431,13 @@ type localStorageLoad struct { } func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error { - idList, err := engine.ListContainers(d.engine) + idList, err := d.engine.ListContainers() if err != nil { return fmt.Errorf("list containers on engine failure: %w", err) } for i := range idList { - sz, err := engine.ContainerSize(d.engine, idList[i]) + sz, err := d.engine.ContainerSize(idList[i]) if err != nil { d.log.Debug("failed to calculate container size in storage engine", zap.Stringer("cid", idList[i]), diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 43ed23346e..aa4aff5ce7 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -207,10 +207,7 @@ func initObjectService(c *cfg) { policer.WithHeadTimeout(c.applicationConfiguration.policer.headTimeout), policer.WithReplicator(c.replicator), policer.WithRedundantCopyCallback(func(addr oid.Address) { - var inhumePrm engine.InhumePrm - inhumePrm.MarkAsGarbage(addr) - - _, err := ls.Inhume(inhumePrm) + err := ls.Delete(addr) if err != nil { c.log.Warn("could not inhume mark redundant copy as garbage", zap.String("error", err.Error()), @@ -516,18 +513,13 @@ func (e storageEngine) IsLocked(address oid.Address) (bool, error) { } func (e storageEngine) Delete(tombstone oid.Address, tombExpiration uint64, toDelete []oid.ID) error { - var prm engine.InhumePrm - addrs := make([]oid.Address, 
len(toDelete)) for i := range addrs { addrs[i].SetContainer(tombstone.Container()) addrs[i].SetObject(toDelete[i]) } - prm.WithTombstone(tombstone, tombExpiration, addrs...) - - _, err := e.engine.Inhume(prm) - return err + return e.engine.Inhume(tombstone, tombExpiration, addrs...) } func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error { @@ -535,13 +527,7 @@ func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error { } func (e storageEngine) Put(o *objectSDK.Object, objBin []byte, hdrLen int) error { - var putPrm engine.PutPrm - putPrm.WithObject(o) - if objBin != nil { - putPrm.SetObjectBinary(objBin, hdrLen) - } - _, err := e.engine.Put(putPrm) - return err + return e.engine.Put(o, objBin, hdrLen) } func cachedHeaderSource(getSvc *getsvc.Service, cacheSize int, l *zap.Logger) headerSource { diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 472ecfac94..d933d18fcf 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -12,118 +12,69 @@ import ( "golang.org/x/sync/errgroup" ) -// ContainerSizePrm groups parameters of ContainerSize operation. -type ContainerSizePrm struct { - cnr cid.ID -} - -// ContainerSizeRes resulting values of ContainerSize operation. -type ContainerSizeRes struct { - size uint64 -} - -// ListContainersPrm groups parameters of ListContainers operation. -type ListContainersPrm struct{} - -// ListContainersRes groups the resulting values of ListContainers operation. -type ListContainersRes struct { - containers []cid.ID -} - -// SetContainerID sets the identifier of the container to estimate the size. -func (p *ContainerSizePrm) SetContainerID(cnr cid.ID) { - p.cnr = cnr -} - -// Size returns calculated estimation of the container size. -func (r ContainerSizeRes) Size() uint64 { - return r.size -} - -// Containers returns a list of identifiers of the containers in which local objects are stored. -func (r ListContainersRes) Containers() []cid.ID { - return r.containers -} - // ContainerSize returns the sum of estimation container sizes among all shards. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) ContainerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) { +func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { + var ( + err error + size uint64 + ) + if e.metrics != nil { + defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() + } + err = e.execIfNotBlocked(func() error { - res, err = e.containerSize(prm) + size, err = e.containerSize(cnr) return err }) - return + return size, err } -// ContainerSize calls ContainerSize method on engine to calculate sum of estimation container sizes among all shards. 
-func ContainerSize(e *StorageEngine, id cid.ID) (uint64, error) { - var prm ContainerSizePrm - - prm.SetContainerID(id) - - res, err := e.ContainerSize(prm) - if err != nil { - return 0, err - } - - return res.Size(), nil -} - -func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() - } +func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { + var size uint64 e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { var csPrm shard.ContainerSizePrm - csPrm.SetContainerID(prm.cnr) + csPrm.SetContainerID(cnr) csRes, err := sh.Shard.ContainerSize(csPrm) if err != nil { e.reportShardError(sh, "can't get container size", err, - zap.Stringer("container_id", prm.cnr)) + zap.Stringer("container_id", cnr)) return false } - res.size += csRes.Size() + size += csRes.Size() return false }) - return + return size, nil } // ListContainers returns a unique container IDs presented in the engine objects. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) ListContainers(_ ListContainersPrm) (res ListContainersRes, err error) { +func (e *StorageEngine) ListContainers() ([]cid.ID, error) { + var ( + res []cid.ID + err error + ) + if e.metrics != nil { + defer elapsed(e.metrics.AddListContainersDuration)() + } + err = e.execIfNotBlocked(func() error { res, err = e.listContainers() return err }) - return + return res, err } -// ListContainers calls ListContainers method on engine to get a unique container IDs presented in the engine objects. -func ListContainers(e *StorageEngine) ([]cid.ID, error) { - var prm ListContainersPrm - - res, err := e.ListContainers(prm) - if err != nil { - return nil, err - } - - return res.Containers(), nil -} - -func (e *StorageEngine) listContainers() (ListContainersRes, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddListContainersDuration)() - } - +func (e *StorageEngine) listContainers() ([]cid.ID, error) { uniqueIDs := make(map[cid.ID]struct{}) e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { @@ -147,9 +98,7 @@ func (e *StorageEngine) listContainers() (ListContainersRes, error) { result = append(result, cnr) } - return ListContainersRes{ - containers: result, - }, nil + return result, nil } // DeleteContainer deletes container's objects that engine stores. 
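For illustration, a minimal sketch of the new call pattern that replaces the removed package-level ContainerSize/ListContainers helpers. The helper name reportSizes is hypothetical; e is assumed to be an initialized *StorageEngine and an fmt import is assumed:

    // reportSizes prints the estimated size of every container known to the engine.
    func reportSizes(e *StorageEngine) error {
        ids, err := e.ListContainers()
        if err != nil {
            return fmt.Errorf("list containers: %w", err)
        }
        for _, id := range ids {
            // ContainerSize now takes the container ID directly.
            sz, err := e.ContainerSize(id)
            if err != nil {
                return fmt.Errorf("size of container %s: %w", id, err)
            }
            fmt.Printf("%s: %d\n", id, sz)
        }
        return nil
    }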
diff --git a/pkg/local_object_storage/engine/container_test.go b/pkg/local_object_storage/engine/container_test.go index 2c3ae9dded..76c3dffe5e 100644 --- a/pkg/local_object_storage/engine/container_test.go +++ b/pkg/local_object_storage/engine/container_test.go @@ -51,25 +51,17 @@ func TestStorageEngine_ContainerCleanUp(t *testing.T) { o2 := objecttest.Object() o2.SetPayload(make([]byte, errSmallSize+1)) - var prmPut PutPrm - prmPut.WithObject(&o1) - - _, err := e.Put(prmPut) + err := e.Put(&o1, nil, 0) require.NoError(t, err) - prmPut.WithObject(&o2) - _, err = e.Put(prmPut) + err = e.Put(&o2, nil, 0) require.NoError(t, err) require.NoError(t, e.Init()) require.Eventually(t, func() bool { - var prmGet GetPrm - prmGet.WithAddress(object.AddressOf(&o1)) - _, err1 := e.Get(prmGet) - - prmGet.WithAddress(object.AddressOf(&o2)) - _, err2 := e.Get(prmGet) + _, err1 := e.Get(object.AddressOf(&o1)) + _, err2 := e.Get(object.AddressOf(&o2)) return errors.Is(err1, new(apistatus.ObjectNotFound)) && errors.Is(err2, new(apistatus.ObjectNotFound)) }, time.Second, 100*time.Millisecond) diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 7dd8d8647c..3d42a0acc3 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -154,7 +154,7 @@ func TestExecBlocks(t *testing.T) { addr := object.AddressOf(obj) - require.NoError(t, Put(e, obj)) + require.NoError(t, e.Put(obj, nil, 0)) // block executions errBlock := errors.New("block exec err") @@ -162,20 +162,20 @@ func TestExecBlocks(t *testing.T) { require.NoError(t, e.BlockExecution(errBlock)) // try to exec some op - _, err := Head(e, addr) + _, err := e.Head(addr, false) require.ErrorIs(t, err, errBlock) // resume executions require.NoError(t, e.ResumeExecution()) - _, err = Head(e, addr) // can be any data-related op + _, err = e.Head(addr, false) // can be any data-related op require.NoError(t, err) // close require.NoError(t, e.Close()) // try exec after close - _, err = Head(e, addr) + _, err = e.Head(addr, false) require.Error(t, err) // try to resume diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 37c18f89d5..185b9a2dcb 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -1,79 +1,25 @@ package engine import ( - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" ) -// DeletePrm groups the parameters of Delete operation. -type DeletePrm struct { - addr oid.Address - - forceRemoval bool -} - -// DeleteRes groups the resulting values of Delete operation. -type DeleteRes struct{} - -// WithAddress is a Delete option to set the addresses of the objects to delete. -// -// Option is required. -func (p *DeletePrm) WithAddress(addr oid.Address) { - p.addr = addr -} - -// WithForceRemoval is a Delete option to remove an object despite any -// restrictions imposed on deleting that object. Expected to be used -// only in control service. -func (p *DeletePrm) WithForceRemoval() { - p.forceRemoval = true -} - // Delete marks the objects to be removed. // // Returns an error if executions are blocked (see BlockExecution). // -// Returns apistatus.ObjectLocked if at least one object is locked. -// In this case no object from the list is marked to be deleted. 
-// -// NOTE: Marks any object to be deleted (despite any prohibitions -// on operations with that object) if WithForceRemoval option has -// been provided. -func (e *StorageEngine) Delete(prm DeletePrm) (res DeleteRes, err error) { - err = e.execIfNotBlocked(func() error { - res, err = e.delete(prm) - return err - }) - - return -} - -func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { +// NOTE: This is a forced removal: it marks any object to be deleted despite +// any prohibitions on operations with that object. +func (e *StorageEngine) Delete(addr oid.Address) error { if e.metrics != nil { defer elapsed(e.metrics.AddDeleteDuration)() } - if !prm.forceRemoval { - locked, err := e.isLocked(prm.addr) - if err != nil { - e.log.Warn("deleting an object without full locking check", - zap.Error(err), - zap.Stringer("addr", prm.addr)) - } else if locked { - var lockedErr apistatus.ObjectLocked - return DeleteRes{}, lockedErr - } - } - - var inhumePrm shard.InhumePrm - inhumePrm.MarkAsGarbage(prm.addr) - if prm.forceRemoval { - inhumePrm.ForceRemoval() - } - - _, err := e.inhumeAddr(prm.addr, inhumePrm) + return e.execIfNotBlocked(func() error { + return e.deleteObj(addr, true) + }) +} - return DeleteRes{}, err +func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error { + return e.inhume([]oid.Address{addr}, force, nil, 0) } diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 6207d0ac13..5740c9d90b 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -56,9 +56,9 @@ func TestDeleteBigObject(t *testing.T) { defer e.Close() for i := range children { - require.NoError(t, Put(e, children[i])) + require.NoError(t, e.Put(children[i], nil, 0)) } - require.NoError(t, Put(e, link)) + require.NoError(t, e.Put(link, nil, 0)) var splitErr *objectSDK.SplitInfoError @@ -72,11 +72,7 @@ func TestDeleteBigObject(t *testing.T) { checkGetError(t, e, object.AddressOf(children[i]), nil) } - var deletePrm DeletePrm - deletePrm.WithForceRemoval() - deletePrm.WithAddress(addrParent) - - _, err := e.Delete(deletePrm) + err := e.Delete(addrParent) require.NoError(t, err) checkGetError(t, e, addrParent, &apistatus.ObjectNotFound{}) @@ -87,10 +83,7 @@ func TestDeleteBigObject(t *testing.T) { } func checkGetError(t *testing.T, e *StorageEngine, addr oid.Address, expected any) { - var getPrm GetPrm - getPrm.WithAddress(addr) - - _, err := e.Get(getPrm) + _, err := e.Get(addr) if expected != nil { require.ErrorAs(t, err, expected) } else { diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index f25d42ca5d..1c0dfe83bf 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -64,7 +64,7 @@ func benchmarkExists(b *testing.B, shardNum int) { addr := oidtest.Address() for range 100 { obj := generateObjectWithCID(cidtest.ID()) - err := Put(e, obj) + err := e.Put(obj, nil, 0) if err != nil { b.Fatal(err) } diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 7c9b549c4e..371a7e5f56 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -75,7 +75,7 @@ func TestErrorReporting(t *testing.T) { e.mtx.RUnlock() require.NoError(t, err) - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.NoError(t, err)
checkShardState(t, e, id[0], 0, mode.ReadWrite) @@ -84,7 +84,7 @@ func TestErrorReporting(t *testing.T) { corruptSubDir(t, filepath.Join(dir, "0")) for i := uint32(1); i < 3; i++ { - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.Error(t, err) checkShardState(t, e, id[0], i, mode.ReadWrite) checkShardState(t, e, id[1], 0, mode.ReadWrite) @@ -105,7 +105,7 @@ func TestErrorReporting(t *testing.T) { e.mtx.RUnlock() require.NoError(t, err) - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.NoError(t, err) checkShardState(t, e, id[0], 0, mode.ReadWrite) @@ -114,14 +114,14 @@ func TestErrorReporting(t *testing.T) { corruptSubDir(t, filepath.Join(dir, "0")) for i := uint32(1); i < errThreshold; i++ { - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.Error(t, err) checkShardState(t, e, id[0], i, mode.ReadWrite) checkShardState(t, e, id[1], 0, mode.ReadWrite) } for i := range uint32(2) { - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.Error(t, err) checkShardState(t, e, id[0], errThreshold+i, mode.DegradedReadOnly) checkShardState(t, e, id[1], 0, mode.ReadWrite) @@ -159,9 +159,9 @@ func TestBlobstorFailback(t *testing.T) { for i := range objs { addr := object.AddressOf(objs[i]) - _, err = e.Get(GetPrm{addr: addr}) + _, err = e.Get(addr) require.NoError(t, err) - _, err = e.GetRange(RngPrm{addr: addr}) + _, err = e.GetRange(addr, 0, 0) require.NoError(t, err) } @@ -179,15 +179,15 @@ func TestBlobstorFailback(t *testing.T) { for i := range objs { addr := object.AddressOf(objs[i]) - getRes, err := e.Get(GetPrm{addr: addr}) + getObj, err := e.Get(addr) require.NoError(t, err) - require.Equal(t, objs[i], getRes.Object()) + require.Equal(t, objs[i], getObj) - rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10}) + rngRes, err := e.GetRange(addr, 1, 10) require.NoError(t, err) - require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload()) + require.Equal(t, objs[i].Payload()[1:11], rngRes) - _, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1}) + _, err = e.GetRange(addr, errSmallSize+10, 1) require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{}) } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index aaade013fd..a1629316e9 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -13,39 +13,6 @@ import ( "go.uber.org/zap" ) -// EvacuateShardPrm represents parameters for the EvacuateShard operation. -type EvacuateShardPrm struct { - shardID []*shard.ID - handler func(oid.Address, *objectSDK.Object) error - ignoreErrors bool -} - -// EvacuateShardRes represents result of the EvacuateShard operation. -type EvacuateShardRes struct { - count int -} - -// WithShardIDList sets shard ID. -func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) { - p.shardID = id -} - -// WithIgnoreErrors sets flag to ignore errors. -func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) { - p.ignoreErrors = ignore -} - -// WithFaultHandler sets handler to call for objects which cannot be saved on other shards. -func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) { - p.handler = f -} - -// Count returns amount of evacuated objects. -// Objects for which handler returned no error are also assumed evacuated. 
-func (p EvacuateShardRes) Count() int { - return p.count -} - const defaultEvacuateBatchSize = 100 type pooledShard struct { @@ -55,12 +22,18 @@ type pooledShard struct { var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") -// Evacuate moves data from one shard to the others. -// The shard being moved must be in read-only mode. -func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) { - sidList := make([]string, len(prm.shardID)) - for i := range prm.shardID { - sidList[i] = prm.shardID[i].String() +// Evacuate moves data from the given set of shards to the other shards +// available to this engine (so at least one spare shard must be available +// unless faultHandler is given). Shards being evacuated must be in read-only +// mode. Evacuate returns an error if an object cannot be read, unless +// ignoreErrors is set to true. If an object cannot be put into any of the +// other shards, faultHandler is invoked (when provided; otherwise evacuation +// fails); it can return its own error to abort evacuation or nil to continue. +// Returns the number of evacuated objects (which can be non-zero even in +// case of error). +func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultHandler func(oid.Address, *objectSDK.Object) error) (int, error) { + sidList := make([]string, len(shardIDs)) + for i := range shardIDs { + sidList[i] = shardIDs[i].String() } e.mtx.RLock() @@ -68,18 +41,18 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) sh, ok := e.shards[sidList[i]] if !ok { e.mtx.RUnlock() - return EvacuateShardRes{}, errShardNotFound + return 0, errShardNotFound } if !sh.GetMode().ReadOnly() { e.mtx.RUnlock() - return EvacuateShardRes{}, shard.ErrMustBeReadOnly + return 0, shard.ErrMustBeReadOnly } } - if len(e.shards)-len(sidList) < 1 && prm.handler == nil { + if len(e.shards)-len(sidList) < 1 && faultHandler == nil { e.mtx.RUnlock() - return EvacuateShardRes{}, errMustHaveTwoShards + return 0, errMustHaveTwoShards } e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList)) @@ -108,7 +81,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) var listPrm shard.ListWithCursorPrm listPrm.WithCount(defaultEvacuateBatchSize) - var res EvacuateShardRes + var count int mainLoop: for n := range sidList { @@ -125,7 +98,7 @@ mainLoop: if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { continue mainLoop } - return res, err + return count, err } // TODO (@fyrchik): #1731 parallelize the loop @@ -141,10 +114,10 @@ mainLoop: getRes, err := sh.Get(getPrm) if err != nil { - if prm.ignoreErrors { + if ignoreErrors { continue } - return res, err + return count, err } hrw.Sort(shards, addrHash) @@ -152,7 +125,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, PutPrm{obj: getRes.Object()}) + putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object(), nil, 0) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", @@ -160,7 +133,7 @@ mainLoop: zap.Stringer("to", shards[j].ID()), zap.Stringer("addr", addr)) - res.count++ + count++ } continue loop } @@ -168,17 +141,17 @@ mainLoop: e.log.Debug("could not put to shard, trying another", zap.String("shard", shards[j].ID().String())) } - if prm.handler == nil { + if faultHandler == nil { // Do not check ignoreErrors flag here because // ignoring errors on put
make this command kinda useless. - return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) + return count, fmt.Errorf("%w: %s", errPutShard, lst[i]) } - err = prm.handler(addr, getRes.Object()) + err = faultHandler(addr, getRes.Object()) if err != nil { - return res, err + return count, err } - res.count++ + count++ } c = listRes.Cursor() @@ -187,5 +160,5 @@ mainLoop: e.log.Info("finished shards evacuation", zap.Strings("shard_ids", sidList)) - return res, nil + return count, nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index f4c3b283a6..5763f7ee6b 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -55,10 +55,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng for i := 0; ; i++ { objects = append(objects, generateObjectWithCID(cidtest.ID())) - var putPrm PutPrm - putPrm.WithObject(objects[i]) - - _, err := e.Put(putPrm) + err := e.Put(objects[i], nil, 0) require.NoError(t, err) res, err := e.shards[ids[len(ids)-1].String()].List() @@ -79,30 +76,24 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects := func(t *testing.T) { for i := range objects { - var prm GetPrm - prm.WithAddress(objectCore.AddressOf(objects[i])) - - _, err := e.Get(prm) + _, err := e.Get(objectCore.AddressOf(objects[i])) require.NoError(t, err) } } checkHasObjects(t) - var prm EvacuateShardPrm - prm.WithShardIDList(ids[2:3]) - t.Run("must be read-only", func(t *testing.T) { - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[2:3], false, nil) require.ErrorIs(t, err, shard.ErrMustBeReadOnly) - require.Equal(t, 0, res.Count()) + require.Equal(t, 0, count) }) require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[2:3], false, nil) require.NoError(t, err) - require.Equal(t, objPerShard, res.count) + require.Equal(t, objPerShard, count) // We check that all objects are available both before and after shard removal. // First case is a real-world use-case. It ensures that an object can be put in presence @@ -111,9 +102,9 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. 
- res, err = e.Evacuate(prm) + count, err = e.Evacuate(ids[2:3], false, nil) require.NoError(t, err) - require.Equal(t, 0, res.count) + require.Equal(t, 0, count) checkHasObjects(t) @@ -153,18 +144,13 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) - var prm EvacuateShardPrm - prm.shardID = ids[0:1] - - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[0:1], false, nil) require.ErrorIs(t, err, errMustHaveTwoShards) - require.Equal(t, 0, res.Count()) - - prm.handler = acceptOneOf(objects, 2) + require.Equal(t, 0, count) - res, err = e.Evacuate(prm) + count, err = e.Evacuate(ids[0:1], false, acceptOneOf(objects, 2)) require.ErrorIs(t, err, errReplication) - require.Equal(t, 2, res.Count()) + require.Equal(t, 2, count) }) t.Run("multiple shards, evacuate one", func(t *testing.T) { e, ids, objects := newEngineEvacuate(t, 2, 3) @@ -172,20 +158,14 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) - var prm EvacuateShardPrm - prm.shardID = ids[1:2] - prm.handler = acceptOneOf(objects, 2) - - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[1:2], false, acceptOneOf(objects, 2)) require.ErrorIs(t, err, errReplication) - require.Equal(t, 2, res.Count()) + require.Equal(t, 2, count) t.Run("no errors", func(t *testing.T) { - prm.handler = acceptOneOf(objects, 3) - - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[1:2], false, acceptOneOf(objects, 3)) require.NoError(t, err) - require.Equal(t, 3, res.Count()) + require.Equal(t, 3, count) }) }) t.Run("multiple shards, evacuate many", func(t *testing.T) { @@ -204,20 +184,14 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly)) } - var prm EvacuateShardPrm - prm.shardID = evacuateIDs - prm.handler = acceptOneOf(objects, totalCount-1) - - res, err := e.Evacuate(prm) + count, err := e.Evacuate(evacuateIDs, false, acceptOneOf(objects, totalCount-1)) require.ErrorIs(t, err, errReplication) - require.Equal(t, totalCount-1, res.Count()) + require.Equal(t, totalCount-1, count) t.Run("no errors", func(t *testing.T) { - prm.handler = acceptOneOf(objects, totalCount) - - res, err := e.Evacuate(prm) + count, err = e.Evacuate(evacuateIDs, false, acceptOneOf(objects, totalCount)) require.NoError(t, err) - require.Equal(t, totalCount, res.Count()) + require.Equal(t, totalCount, count) }) }) } diff --git a/pkg/local_object_storage/engine/gc_test.go b/pkg/local_object_storage/engine/gc_test.go index 15c5ff283c..e9a97763e4 100644 --- a/pkg/local_object_storage/engine/gc_test.go +++ b/pkg/local_object_storage/engine/gc_test.go @@ -93,10 +93,10 @@ func TestChildrenExpiration(t *testing.T) { link.SetChildren(child1ID, child2ID, child3ID) link.SetSplitID(splitID) - require.NoError(t, Put(e, child1)) - require.NoError(t, Put(e, child2)) - require.NoError(t, Put(e, child3)) - require.NoError(t, Put(e, link)) + require.NoError(t, e.Put(child1, nil, 0)) + require.NoError(t, e.Put(child2, nil, 0)) + require.NoError(t, e.Put(child3, nil, 0)) + require.NoError(t, e.Put(link, nil, 0)) e.HandleNewEpoch(currEpoch + 1) @@ -144,10 +144,10 @@ func TestChildrenExpiration(t *testing.T) { linkObj.CalculateAndSetPayloadChecksum() require.NoError(t, linkObj.CalculateAndSetID()) - require.NoError(t, Put(e, child1)) - require.NoError(t, Put(e, child2)) - require.NoError(t, Put(e, child3)) - require.NoError(t, 
Put(e, &linkObj)) + require.NoError(t, e.Put(child1, nil, 0)) + require.NoError(t, e.Put(child2, nil, 0)) + require.NoError(t, e.Put(child3, nil, 0)) + require.NoError(t, e.Put(&linkObj, nil, 0)) e.HandleNewEpoch(currEpoch + 1) @@ -163,7 +163,7 @@ func checkObjectsAsyncRemoval(t *testing.T, e *StorageEngine, cnr cid.ID, objs . for _, obj := range objs { addr.SetObject(obj) - _, err := Get(e, addr) + _, err := e.Get(addr) if !errors.As(err, new(statusSDK.ObjectNotFound)) { return false } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 43ed90eb10..d7ac1941c9 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -12,28 +12,6 @@ import ( "go.uber.org/zap" ) -// GetPrm groups the parameters of Get operation. -type GetPrm struct { - addr oid.Address -} - -// GetRes groups the resulting values of Get operation. -type GetRes struct { - obj *objectSDK.Object -} - -// WithAddress is a Get option to set the address of the requested object. -// -// Option is required. -func (p *GetPrm) WithAddress(addr oid.Address) { - p.addr = addr -} - -// Object returns the requested object. -func (r GetRes) Object() *objectSDK.Object { - return r.obj -} - // Get reads an object from local storage. // // Returns any error encountered that @@ -43,29 +21,34 @@ func (r GetRes) Object() *objectSDK.Object { // Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Get(prm GetPrm) (res GetRes, err error) { - var sp shard.GetPrm - sp.SetAddress(prm.addr) +func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) { + var ( + err error + obj *objectSDK.Object + sp shard.GetPrm + ) + + if e.metrics != nil { + defer elapsed(e.metrics.AddGetDuration)() + } + + sp.SetAddress(addr) err = e.execIfNotBlocked(func() error { - return e.get(prm.addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { + return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (bool, error) { sp.SetIgnoreMeta(ignoreMetadata) sr, err := s.Get(sp) if err != nil { return sr.HasMeta(), err } - res.obj = sr.Object() + obj = sr.Object() return sr.HasMeta(), nil }) }) - return + return obj, err } func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error)) error { - if e.metrics != nil { - defer elapsed(e.metrics.AddGetDuration)() - } - var ( ok bool siErr *objectSDK.SplitInfoError @@ -162,19 +145,6 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign return nil } -// Get reads object from local storage by provided address. -func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { - var getPrm GetPrm - getPrm.WithAddress(addr) - - res, err := storage.Get(getPrm) - if err != nil { - return nil, err - } - - return res.Object(), nil -} - // GetBytes reads object from the StorageEngine by address into memory buffer in // a canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object // is missing. 
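For illustration, a hedged sketch of migrating a read path from the removed GetPrm/GetRes pair to the new Get signature. The helper fetchPayload is hypothetical; engine-package scope and an fmt import are assumed:

    // fetchPayload returns the payload of the addressed object.
    // Before this change: res, err := e.Get(prm); obj := res.Object().
    func fetchPayload(e *StorageEngine, addr oid.Address) ([]byte, error) {
        obj, err := e.Get(addr)
        if err != nil {
            return nil, fmt.Errorf("could not fetch object: %w", err)
        }
        return obj.Payload(), nil
    }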
diff --git a/pkg/local_object_storage/engine/get_test.go b/pkg/local_object_storage/engine/get_test.go index c5bf6aee2b..21a02da72c 100644 --- a/pkg/local_object_storage/engine/get_test.go +++ b/pkg/local_object_storage/engine/get_test.go @@ -15,7 +15,7 @@ func TestStorageEngine_GetBytes(t *testing.T) { objBin := obj.Marshal() - err := Put(e, obj) + err := e.Put(obj, nil, 0) require.NoError(t, err) b, err := e.GetBytes(addr) diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 0319b5854d..3fbd090900 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -11,39 +11,8 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) -// HeadPrm groups the parameters of Head operation. -type HeadPrm struct { - addr oid.Address - raw bool -} - -// HeadRes groups the resulting values of Head operation. -type HeadRes struct { - head *objectSDK.Object -} - -// WithAddress is a Head option to set the address of the requested object. -// -// Option is required. -func (p *HeadPrm) WithAddress(addr oid.Address) { - p.addr = addr -} - -// WithRaw is a Head option to set raw flag value. If flag is unset, then Head -// returns the header of the virtual object, otherwise it returns SplitInfo of the virtual -// object. -func (p *HeadPrm) WithRaw(raw bool) { - p.raw = raw -} - -// Header returns the requested object header. -// -// Instance has empty payload. -func (r HeadRes) Header() *objectSDK.Object { - return r.head -} - -// Head reads object header from local storage. +// Head reads object header from local storage. If raw is true, it returns +// SplitInfo of the virtual object instead of the virtual object header. // // Returns any error encountered that // did not allow to completely read the object header. @@ -52,20 +21,25 @@ func (r HeadRes) Header() *objectSDK.Object { // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object was inhumed. // // Returns an error if executions are blocked (see BlockExecution).
-func (e *StorageEngine) Head(prm HeadPrm) (res HeadRes, err error) { +func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, error) { + var ( + obj *objectSDK.Object + err error + ) + + if e.metrics != nil { + defer elapsed(e.metrics.AddHeadDuration)() + } + err = e.execIfNotBlocked(func() error { - res, err = e.head(prm) + obj, err = e.head(addr, raw) return err }) - return + return obj, err } -func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddHeadDuration)() - } - +func (e *StorageEngine) head(addr oid.Address, raw bool) (*objectSDK.Object, error) { var ( head *objectSDK.Object siErr *objectSDK.SplitInfoError @@ -77,10 +51,10 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { ) var shPrm shard.HeadPrm - shPrm.SetAddress(prm.addr) - shPrm.SetRaw(prm.raw) + shPrm.SetAddress(addr) + shPrm.SetRaw(raw) - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Head(shPrm) if err != nil { switch { @@ -119,42 +93,12 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { }) if outSI != nil { - return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) } if head == nil { - return HeadRes{}, outError - } - - return HeadRes{ - head: head, - }, nil -} - -// Head reads object header from local storage by provided address. -func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { - var headPrm HeadPrm - headPrm.WithAddress(addr) - - res, err := storage.Head(headPrm) - if err != nil { - return nil, err - } - - return res.Header(), nil -} - -// HeadRaw reads object header from local storage by provided address and raw -// flag. -func HeadRaw(storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) { - var headPrm HeadPrm - headPrm.WithAddress(addr) - headPrm.WithRaw(raw) - - res, err := storage.Head(headPrm) - if err != nil { - return nil, err + return nil, outError } - return res.Header(), nil + return head, nil } diff --git a/pkg/local_object_storage/engine/head_test.go b/pkg/local_object_storage/engine/head_test.go index 50d162c552..6d133d2c34 100644 --- a/pkg/local_object_storage/engine/head_test.go +++ b/pkg/local_object_storage/engine/head_test.go @@ -61,11 +61,7 @@ func TestHeadRaw(t *testing.T) { require.NoError(t, err) // head with raw flag should return SplitInfoError - var headPrm HeadPrm - headPrm.WithAddress(parentAddr) - headPrm.WithRaw(true) - - _, err = e.Head(headPrm) + _, err = e.Head(parentAddr, true) require.Error(t, err) var si *object.SplitInfoError diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index bd21f1def7..45df2e010c 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -13,104 +13,60 @@ import ( "go.uber.org/zap" ) -// InhumePrm encapsulates parameters for inhume operation. -type InhumePrm struct { - tombstone *oid.Address - tombExpiration uint64 - addrs []oid.Address - - forceRemoval bool -} - -// InhumeRes encapsulates results of inhume operation. -type InhumeRes struct{} - -// WithTombstone sets a list of objects that should be inhumed and tombstone address -// as the reason for inhume operation. -// -// addrs should not be empty. -// Should not be called along with MarkAsGarbage. 
-func (p *InhumePrm) WithTombstone(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) { - p.addrs = addrs - p.tombstone = &tombstone - p.tombExpiration = tombExpiration -} - -// MarkAsGarbage marks an object to be physically removed from local storage. -// -// Should not be called along with WithTombstone. -func (p *InhumePrm) MarkAsGarbage(addrs ...oid.Address) { - p.addrs = addrs - p.tombstone = nil -} - -// WithForceRemoval inhumes objects specified via MarkAsGarbage with GC mark -// without any object restrictions checks. -func (p *InhumePrm) WithForceRemoval() { - p.forceRemoval = true - p.tombstone = nil -} - var errInhumeFailure = errors.New("inhume operation failed") -// Inhume calls metabase. Inhume method to mark an object as removed. It won't be -// removed physically from the shard until `Delete` operation. +// Inhume calls the [metabase.Inhume] method to mark an object as removed +// following tombstone data. It won't be removed physically from the shard +// until a GC cycle does it. // // Allows inhuming non-locked objects only. Returns apistatus.ObjectLocked // if at least one object is locked. // -// NOTE: Marks any object as removed (despite any prohibitions on operations -// with that object) if WithForceRemoval option has been provided. -// // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Inhume(prm InhumePrm) (res InhumeRes, err error) { - err = e.execIfNotBlocked(func() error { - res, err = e.inhume(prm) - return err - }) - - return -} - -func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) { +func (e *StorageEngine) Inhume(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) error { if e.metrics != nil { defer elapsed(e.metrics.AddInhumeDuration)() } + return e.execIfNotBlocked(func() error { + return e.inhume(addrs, false, &tombstone, tombExpiration) + }) +} +func (e *StorageEngine) inhume(addrs []oid.Address, force bool, tombstone *oid.Address, tombExpiration uint64) error { var shPrm shard.InhumePrm - if prm.forceRemoval { + if force { shPrm.ForceRemoval() } - for i := range prm.addrs { - if !prm.forceRemoval { - locked, err := e.IsLocked(prm.addrs[i]) + for i := range addrs { + if !force { + locked, err := e.IsLocked(addrs[i]) if err != nil { e.log.Warn("removing an object without full locking check", zap.Error(err), - zap.Stringer("addr", prm.addrs[i])) + zap.Stringer("addr", addrs[i])) } else if locked { var lockedErr apistatus.ObjectLocked - return InhumeRes{}, lockedErr + return lockedErr } } - if prm.tombstone != nil { - shPrm.InhumeByTomb(*prm.tombstone, prm.tombExpiration, prm.addrs[i]) + if tombstone != nil { + shPrm.InhumeByTomb(*tombstone, tombExpiration, addrs[i]) } else { - shPrm.MarkAsGarbage(prm.addrs[i]) + shPrm.MarkAsGarbage(addrs[i]) } - ok, err := e.inhumeAddr(prm.addrs[i], shPrm) + ok, err := e.inhumeAddr(addrs[i], shPrm) if err != nil { - return InhumeRes{}, err + return err } if !ok { - return InhumeRes{}, errInhumeFailure + return errInhumeFailure } } - return InhumeRes{}, nil + return nil } // InhumeContainer marks every object in a container as removed.
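To make the two removal flavors explicit, a short illustrative sketch (the helper removeBoth is hypothetical and not part of the change):

    // removeBoth contrasts the two removal paths after this refactoring.
    func removeBoth(e *StorageEngine, tombAddr oid.Address, tombExp uint64, addr oid.Address) error {
        // Tombstone-driven removal: checks locks and records the tombstone.
        if err := e.Inhume(tombAddr, tombExp, addr); err != nil {
            return err
        }
        // Forced garbage marking without lock checks (the control-service path).
        return e.Delete(addr)
    }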
@@ -185,10 +141,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, linkAddr.SetContainer(addr.Container()) linkAddr.SetObject(linkID) - var getPrm GetPrm - getPrm.WithAddress(linkAddr) - - res, err := e.Get(getPrm) + linkObj, err := e.Get(linkAddr) if err != nil { e.log.Error("inhuming root object but no link object is found", zap.Error(err)) @@ -199,8 +152,6 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, return true } - linkObj := res.Object() - // v2 split if linkObj.Type() == objectSDK.TypeLink { var link objectSDK.Link @@ -328,10 +279,7 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { } func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { - var prm InhumePrm - prm.MarkAsGarbage(addrs...) - - _, err := e.Inhume(prm) + err := e.inhume(addrs, false, nil, 0) if err != nil { e.log.Warn("handling expired objects", zap.Error(err)) } diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index ce3d985243..343ec44ddc 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -43,16 +43,13 @@ func TestStorageEngine_Inhume(t *testing.T) { e := testNewEngineWithShardNum(t, 1) defer e.Close() - err := Put(e, parent) + err := e.Put(parent, nil, 0) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombstoneID, 0, object.AddressOf(parent)) require.NoError(t, err) - addrs, err := Select(e, cnr, fs) + addrs, err := e.Select(cnr, fs) require.NoError(t, err) require.Empty(t, addrs) }) @@ -74,20 +71,17 @@ func TestStorageEngine_Inhume(t *testing.T) { _, err = s2.Put(putLink) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombstoneID, 0, object.AddressOf(parent)) require.NoError(t, err) t.Run("empty search should fail", func(t *testing.T) { - addrs, err := Select(e, cnr, objectSDK.SearchFilters{}) + addrs, err := e.Select(cnr, objectSDK.SearchFilters{}) require.NoError(t, err) require.Empty(t, addrs) }) t.Run("root search should fail", func(t *testing.T) { - addrs, err := Select(e, cnr, fs) + addrs, err := e.Select(cnr, fs) require.NoError(t, err) require.Empty(t, addrs) }) @@ -97,18 +91,18 @@ func TestStorageEngine_Inhume(t *testing.T) { addr.SetContainer(cnr) addr.SetObject(idChild) - _, err = Get(e, addr) + _, err = e.Get(addr) require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) linkID := link.GetID() addr.SetObject(linkID) - _, err = Get(e, addr) + _, err = e.Get(addr) require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) }) t.Run("parent get should claim deletion", func(t *testing.T) { - _, err = Get(e, object.AddressOf(parent)) + _, err = e.Get(object.AddressOf(parent)) require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) }) }) @@ -144,10 +138,7 @@ func TestStorageEngine_Inhume(t *testing.T) { _, err = wrongShard.Get(getPrm) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.MarkAsGarbage(addr) - - _, err = e.Inhume(inhumePrm) + err = e.Delete(addr) require.NoError(t, err) // object was on the wrong (according to hash sorting) shard but is removed anyway @@ -161,14 +152,11 @@ func TestStorageEngine_Inhume(t *testing.T) { e := testNewEngineWithShardNum(t, 3) defer e.Close() - var inhumePrm InhumePrm - 
inhumePrm.MarkAsGarbage(addr) - - _, err := e.Inhume(inhumePrm) + err := e.Delete(addr) require.NoError(t, err) // object is marked as garbage but marking it again should not be a problem - _, err = e.Inhume(inhumePrm) + err = e.Delete(addr) require.NoError(t, err) }) } diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index c97a4d435e..6cec638772 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -12,55 +12,23 @@ import ( // cursor. Use nil cursor object to start listing again. var ErrEndOfListing = shard.ErrEndOfListing -// Cursor is a type for continuous object listing. +// Cursor is a type for continuous object listing. It's returned by +// [StorageEngine.ListWithCursor] and can be passed back to it to continue +// listing in subsequent requests. type Cursor struct { shardID string shardCursor *shard.Cursor } -// ListWithCursorPrm contains parameters for ListWithCursor operation. -type ListWithCursorPrm struct { - count uint32 - cursor *Cursor -} - -// WithCount sets the maximum amount of addresses that ListWithCursor should return. -func (p *ListWithCursorPrm) WithCount(count uint32) { - p.count = count -} - -// WithCursor sets a cursor for ListWithCursor operation. For initial request -// ignore this param or use nil value. For consecutive requests, use value -// from ListWithCursorRes. -func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { - p.cursor = cursor -} - -// ListWithCursorRes contains values returned from ListWithCursor operation. -type ListWithCursorRes struct { - addrList []objectcore.AddressWithType - cursor *Cursor -} - -// AddressList returns addresses selected by ListWithCursor operation. -func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType { - return l.addrList -} - -// Cursor returns cursor for consecutive listing requests. -func (l ListWithCursorRes) Cursor() *Cursor { - return l.cursor -} - // ListWithCursor lists physical objects available in the engine starting // from the cursor. It includes regular, tombstone and storage group objects. // Does not include inhumed objects. Use cursor value from the response -// for consecutive requests. +// for consecutive requests (it's nil when iteration is over). // // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) { - result := make([]objectcore.AddressWithType, 0, prm.count) +func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { + result := make([]objectcore.AddressWithType, 0, count) // 1. Get available shards and sort them. e.mtx.RLock() @@ -71,20 +39,19 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes e.mtx.RUnlock() if len(shardIDs) == 0 { - return ListWithCursorRes{}, ErrEndOfListing + return nil, nil, ErrEndOfListing } slices.Sort(shardIDs) // 2. Prepare cursor object. - cursor := prm.cursor if cursor == nil { cursor = &Cursor{shardID: shardIDs[0]} } // 3. Iterate over available shards. Skip unavailable shards.
for i := range shardIDs { - if len(result) >= int(prm.count) { + if len(result) >= int(count) { break } @@ -99,7 +66,7 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes continue } - count := uint32(int(prm.count) - len(result)) + count := uint32(int(count) - len(result)) var shardPrm shard.ListWithCursorPrm shardPrm.WithCount(count) if shardIDs[i] == cursor.shardID { @@ -117,11 +84,8 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes } if len(result) == 0 { - return ListWithCursorRes{}, ErrEndOfListing + return nil, nil, ErrEndOfListing } - return ListWithCursorRes{ - addrList: result, - cursor: cursor, - }, nil + return result, cursor, nil } diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 9cb27dfdfe..257faa91db 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -31,37 +31,27 @@ func TestListWithCursor(t *testing.T) { containerID := cidtest.ID() obj := generateObjectWithCID(containerID) - var prm PutPrm - prm.WithObject(obj) - - _, err := e.Put(prm) + err := e.Put(obj, nil, 0) require.NoError(t, err) expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) } expected = sortAddresses(expected) - var prm ListWithCursorPrm - prm.WithCount(1) - - res, err := e.ListWithCursor(prm) + addrs, cursor, err := e.ListWithCursor(1, nil) require.NoError(t, err) - require.NotEmpty(t, res.AddressList()) - got = append(got, res.AddressList()...) + require.NotEmpty(t, addrs) + got = append(got, addrs...) for range total - 1 { - prm.WithCursor(res.Cursor()) - - res, err = e.ListWithCursor(prm) + addrs, cursor, err = e.ListWithCursor(1, cursor) if errors.Is(err, ErrEndOfListing) { break } - got = append(got, res.AddressList()...) + got = append(got, addrs...) } - prm.WithCursor(res.Cursor()) - - _, err = e.ListWithCursor(prm) + _, _, err = e.ListWithCursor(1, cursor) require.ErrorIs(t, err, ErrEndOfListing) got = sortAddresses(got) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index baeb1039cb..7967ebc056 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -81,7 +81,7 @@ func TestLockUserScenario(t *testing.T) { id := obj.GetID() objAddr.SetObject(id) - err = Put(e, obj) + err = e.Put(obj, nil, 0) require.NoError(t, err) // 2. @@ -89,17 +89,14 @@ func TestLockUserScenario(t *testing.T) { locker.WriteMembers([]oid.ID{id}) lockerObj.WriteLock(locker) - err = Put(e, lockerObj) + err = e.Put(lockerObj, nil, 0) require.NoError(t, err) err = e.Lock(cnr, lockerID, []oid.ID{id}) require.NoError(t, err) // 3. - var inhumePrm InhumePrm - inhumePrm.WithTombstone(tombAddr, 0, objAddr) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombAddr, 0, objAddr) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) // 4. @@ -107,12 +104,10 @@ func TestLockUserScenario(t *testing.T) { tombObj.SetID(tombForLockID) tombObj.SetAttributes(a) - err = Put(e, tombObj) + err = e.Put(tombObj, nil, 0) require.NoError(t, err) - inhumePrm.WithTombstone(tombForLockAddr, 0, lockerAddr) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombForLockAddr, 0, lockerAddr) require.ErrorIs(t, err, meta.ErrLockObjectRemoval) // 5. 
@@ -121,9 +116,7 @@ func TestLockUserScenario(t *testing.T) { // delay for GC time.Sleep(time.Second) - inhumePrm.WithTombstone(tombAddr, 0, objAddr) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombAddr, 0, objAddr) require.NoError(t, err) } @@ -156,7 +149,7 @@ func TestLockExpiration(t *testing.T) { // 1. obj := generateObjectWithCID(cnr) - err = Put(e, obj) + err = e.Put(obj, nil, 0) require.NoError(t, err) // 2. @@ -168,7 +161,7 @@ func TestLockExpiration(t *testing.T) { lock.SetType(object.TypeLock) lock.SetAttributes(a) - err = Put(e, lock) + err = e.Put(lock, nil, 0) require.NoError(t, err) id := obj.GetID() @@ -177,10 +170,7 @@ func TestLockExpiration(t *testing.T) { err = e.Lock(cnr, idLock, []oid.ID{id}) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.WithTombstone(oidtest.Address(), 0, objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(oidtest.Address(), 0, objectcore.AddressOf(obj)) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) // 3. @@ -191,9 +181,7 @@ func TestLockExpiration(t *testing.T) { time.Sleep(time.Second) // 4. - inhumePrm.WithTombstone(oidtest.Address(), 0, objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(oidtest.Address(), 0, objectcore.AddressOf(obj)) require.NoError(t, err) } @@ -227,14 +215,14 @@ func TestLockForceRemoval(t *testing.T) { // 1. obj := generateObjectWithCID(cnr) - err = Put(e, obj) + err = e.Put(obj, nil, 0) require.NoError(t, err) // 2. lock := generateObjectWithCID(cnr) lock.SetType(object.TypeLock) - err = Put(e, lock) + err = e.Put(lock, nil, 0) require.NoError(t, err) id := obj.GetID() @@ -244,28 +232,17 @@ func TestLockForceRemoval(t *testing.T) { require.NoError(t, err) // 3. - var inhumePrm InhumePrm - inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.deleteObj(objectcore.AddressOf(obj), false) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) - inhumePrm.WithTombstone(oidtest.Address(), 0, objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(oidtest.Address(), 0, objectcore.AddressOf(obj)) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) // 4. - var deletePrm DeletePrm - deletePrm.WithAddress(objectcore.AddressOf(lock)) - deletePrm.WithForceRemoval() - - _, err = e.Delete(deletePrm) + err = e.Delete(objectcore.AddressOf(lock)) require.NoError(t, err) // 5. - inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.deleteObj(objectcore.AddressOf(obj), false) require.NoError(t, err) } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index e85a3c9a7d..da2a3922cd 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -13,38 +13,11 @@ import ( "go.uber.org/zap" ) -// PutPrm groups the parameters of Put operation. -type PutPrm struct { - obj *objectSDK.Object - - binSet bool - objBin []byte - hdrLen int -} - -// PutRes groups the resulting values of Put operation. -type PutRes struct{} - var errPutShard = errors.New("could not put object to any shard") -// WithObject is a Put option to set object to save. -// -// Option is required. -func (p *PutPrm) WithObject(obj *objectSDK.Object) { - p.obj = obj -} - -// SetObjectBinary allows to provide the already encoded object in -// [StorageEngine] format. Object header must be a prefix with specified length. -// If provided, the encoding step is skipped. 
It's the caller's responsibility -// to ensure that the data matches the object structure being processed. -func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) { - p.binSet = true - p.objBin = objBin - p.hdrLen = hdrLen -} - -// Put saves the object to local storage. +// Put saves an object to local storage. The objBin and hdrLen parameters are +// optional and let the caller skip object marshaling; when used, both must +// be valid. // // Returns any error encountered that // did not allow to completely save the object. @@ -52,27 +25,24 @@ func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) { // Returns an error if executions are blocked (see BlockExecution). // // Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed. -func (e *StorageEngine) Put(prm PutPrm) (res PutRes, err error) { - err = e.execIfNotBlocked(func() error { - res, err = e.put(prm) - return err - }) - - return -} - -func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { +func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { if e.metrics != nil { defer elapsed(e.metrics.AddPutDuration)() } - addr := object.AddressOf(prm.obj) + return e.execIfNotBlocked(func() error { + return e.put(obj, objBin, hdrLen) + }) +} + +func (e *StorageEngine) put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { + addr := object.AddressOf(obj) // In #1146 this check was parallelized, however, it became // much slower on fast machines for 4 shards. _, err := e.exists(addr) if err != nil { - return PutRes{}, err + return err } finished := false @@ -86,7 +56,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return false } - putDone, exists := e.putToShard(sh, ind, pool, addr, prm) + putDone, exists := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen) finished = putDone || exists return finished }) @@ -95,13 +65,13 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { err = errPutShard } - return PutRes{}, err + return err } // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) (bool, bool) { +func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { var putSuccess, alreadyExists bool exitCh := make(chan struct{}) @@ -151,9 +121,9 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool } var putPrm shard.PutPrm - putPrm.SetObject(prm.obj) - if prm.binSet { - putPrm.SetObjectBinary(prm.objBin, prm.hdrLen) + putPrm.SetObject(obj) + if objBin != nil { + putPrm.SetObjectBinary(objBin, hdrLen) } _, err = sh.Put(putPrm) @@ -180,13 +150,3 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool return putSuccess, alreadyExists } -
diff --git a/pkg/local_object_storage/engine/put_test.go b/pkg/local_object_storage/engine/put_test.go
index 4fead6c3df..cbab9fbfa2 100644
--- a/pkg/local_object_storage/engine/put_test.go
+++ b/pkg/local_object_storage/engine/put_test.go
@@ -30,17 +30,12 @@ func TestStorageEngine_PutBinary(t *testing.T) {
 	e, _, _ := newEngine(t, t.TempDir())
 
-	var putPrm PutPrm
-	putPrm.WithObject(&obj)
-	putPrm.SetObjectBinary(objBin, hdrLen)
-	_, err := e.Put(putPrm)
+	err := e.Put(&obj, objBin, hdrLen)
 	require.NoError(t, err)
 
-	var getPrm GetPrm
-	getPrm.WithAddress(addr)
-	res, err := e.Get(getPrm)
+	gotObj, err := e.Get(addr)
 	require.NoError(t, err)
-	require.Equal(t, &obj, res.Object())
+	require.Equal(t, &obj, gotObj)
 
 	b, err := e.GetBytes(addr)
 	require.NoError(t, err)
@@ -49,17 +44,14 @@ func TestStorageEngine_PutBinary(t *testing.T) {
 	// now place some garbage
 	addr.SetObject(oidtest.ID())
 	obj.SetID(addr.Object()) // to avoid 'already exists' outcome
-	putPrm.WithObject(&obj)
 	invalidObjBin := []byte("definitely not an object")
-	putPrm.SetObjectBinary(invalidObjBin, 5)
-	_, err = e.Put(putPrm)
+	err = e.Put(&obj, invalidObjBin, 5)
 	require.NoError(t, err)
 
 	b, err = e.GetBytes(addr)
 	require.NoError(t, err)
 	require.Equal(t, invalidObjBin, b)
 
-	getPrm.WithAddress(addr)
-	_, err = e.Get(getPrm)
+	_, err = e.Get(addr)
 	require.Error(t, err)
 }
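
As the test demonstrates, the engine trusts a caller-provided binary without re-validating it against the object. A hedged sketch of a caller that keeps both modes behind one helper (the helper name is illustrative):

package example

import (
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
)

// putMaybeBinary stores obj, passing a pre-encoded form when the caller has
// one; objBin must be the engine-format encoding with a header prefix of
// hdrLen bytes, and it is trusted as-is.
func putMaybeBinary(e *engine.StorageEngine, obj *objectSDK.Object, objBin []byte, hdrLen int) error {
	if objBin == nil {
		return e.Put(obj, nil, 0) // no binary: the engine encodes obj itself
	}
	return e.Put(obj, objBin, hdrLen)
}
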
diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go
index 907f05450c..abd4b479b5 100644
--- a/pkg/local_object_storage/engine/range.go
+++ b/pkg/local_object_storage/engine/range.go
@@ -12,33 +12,11 @@ import (
 	"go.uber.org/zap"
 )
 
-// RngPrm groups the parameters of GetRange operation.
-type RngPrm struct {
-	off, ln uint64
-
-	addr oid.Address
-}
-
 // RngRes groups the resulting values of GetRange operation.
 type RngRes struct {
 	obj *objectSDK.Object
 }
 
-// WithAddress is a GetRng option to set the address of the requested object.
-//
-// Option is required.
-func (p *RngPrm) WithAddress(addr oid.Address) {
-	p.addr = addr
-}
-
-// WithPayloadRange is a GetRange option to set range of requested payload data.
-//
-// Missing an option or calling with zero length is equivalent
-// to getting the full payload range.
-func (p *RngPrm) WithPayloadRange(rng *objectSDK.Range) {
-	p.off, p.ln = rng.GetOffset(), rng.GetLength()
-}
-
 // Object returns the requested object part.
 //
 // Instance payload contains the requested range of the original object.
@@ -46,7 +24,8 @@ func (r RngRes) Object() *objectSDK.Object {
 	return r.obj
 }
 
-// GetRange reads part of an object from local storage.
+// GetRange reads a part of an object from local storage. Zero length is
+// interpreted as a request for the full payload regardless of the offset.
 //
 // Returns any error encountered that
 // did not allow to completely read the object part.
@@ -56,22 +35,25 @@ func (r RngRes) Object() *objectSDK.Object {
 // Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
 //
 // Returns an error if executions are blocked (see BlockExecution).
-func (e *StorageEngine) GetRange(prm RngPrm) (res RngRes, err error) {
+func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) {
+	var (
+		err error
+		res []byte
+	)
+	if e.metrics != nil {
+		defer elapsed(e.metrics.AddRangeDuration)()
+	}
 	err = e.execIfNotBlocked(func() error {
-		res, err = e.getRange(prm)
+		res, err = e.getRange(addr, offset, length)
 		return err
 	})
 
-	return
+	return res, err
 }
 
-func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
-	if e.metrics != nil {
-		defer elapsed(e.metrics.AddRangeDuration)()
-	}
-
+func (e *StorageEngine) getRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) {
 	var (
-		obj   *objectSDK.Object
+		out   []byte
 		siErr *objectSDK.SplitInfoError
 
 		errNotFound apistatus.ObjectNotFound
@@ -86,10 +68,10 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
 	var hasDegraded bool
 
 	var shPrm shard.RngPrm
-	shPrm.SetAddress(prm.addr)
-	shPrm.SetRange(prm.off, prm.ln)
+	shPrm.SetAddress(addr)
+	shPrm.SetRange(offset, length)
 
-	e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
+	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 		noMeta := sh.GetMode().NoMetabase()
 		hasDegraded = hasDegraded || noMeta
 		shPrm.SetIgnoreMeta(noMeta)
@@ -124,20 +106,20 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
 			}
 		}
 
-		obj = res.Object()
+		out = res.Object().Payload()
 
 		return true
 	})
 
 	if outSI != nil {
-		return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
+		return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
 	}
 
-	if obj == nil {
+	if out == nil {
 		// If any shard is in a degraded mode, we should assume that metabase could store
 		// info about some object.
 		if shardWithMeta.Shard == nil && !hasDegraded || !shard.IsErrNotFound(outError) {
-			return RngRes{}, outError
+			return nil, outError
 		}
 
 		// If the object is not found but is present in metabase,
@@ -145,7 +127,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
 		// blobstor, increase the error counter for the shard which contains the meta.
 		shPrm.SetIgnoreMeta(true)
 
-		e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
+		e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 			if sh.GetMode().NoMetabase() {
 				// Already processed it without a metabase.
 				return false
@@ -158,34 +140,18 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
 				outError = errOutOfRange
 				return true
 			}
-			obj = res.Object()
+			out = res.Object().Payload()
 			return err == nil
 		})
-		if obj == nil {
-			return RngRes{}, outError
+		if out == nil {
+			return nil, outError
 		}
 		if shardWithMeta.Shard != nil {
 			e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
 				metaError,
-				zap.Stringer("address", prm.addr))
+				zap.Stringer("address", addr))
 		}
 	}
 
-	return RngRes{
-		obj: obj,
-	}, nil
-}
-
-// GetRange reads object payload range from local storage by provided address.
-func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) {
-	var rangePrm RngPrm
-	rangePrm.WithAddress(addr)
-	rangePrm.WithPayloadRange(rng)
-
-	res, err := storage.GetRange(rangePrm)
-	if err != nil {
-		return nil, err
-	}
-
-	return res.Object().Payload(), nil
+	return out, nil
 }
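
Callers now get the payload slice straight from GetRange. An illustrative sketch that reads one payload as two ranges (the names and the split point are assumptions of this example):

package example

import (
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// readHeadAndTail fetches [0, split) and [split, total) of the payload; per
// the doc comment above, a zero length would instead fetch the full payload.
func readHeadAndTail(e *engine.StorageEngine, addr oid.Address, split, total uint64) ([]byte, []byte, error) {
	head, err := e.GetRange(addr, 0, split)
	if err != nil {
		return nil, nil, err
	}
	tail, err := e.GetRange(addr, split, total-split)
	if err != nil {
		return nil, nil, err
	}
	return head, tail, nil
}
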
-func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) { - var rangePrm RngPrm - rangePrm.WithAddress(addr) - rangePrm.WithPayloadRange(rng) - - res, err := storage.GetRange(rangePrm) - if err != nil { - return nil, err - } - - return res.Object().Payload(), nil + return out, nil } diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index dc8b2b9ef4..ef124459c3 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -10,59 +10,38 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) -// SelectPrm groups the parameters of Select operation. -type SelectPrm struct { - cnr cid.ID - filters object.SearchFilters -} - -// SelectRes groups the resulting values of Select operation. -type SelectRes struct { - addrList []oid.Address -} - -// WithContainerID is a Select option to set the container id to search in. -func (p *SelectPrm) WithContainerID(cnr cid.ID) { - p.cnr = cnr -} - -// WithFilters is a Select option to set the object filters. -func (p *SelectPrm) WithFilters(fs object.SearchFilters) { - p.filters = fs -} - -// AddressList returns list of addresses of the selected objects. -func (r SelectRes) AddressList() []oid.Address { - return r.addrList -} - // Select selects the objects from local storage that match select parameters. // // Returns any error encountered that did not allow to completely select the objects. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Select(prm SelectPrm) (res SelectRes, err error) { +func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { + var ( + err error + res []oid.Address + ) + + if e.metrics != nil { + defer elapsed(e.metrics.AddSearchDuration)() + } + err = e.execIfNotBlocked(func() error { - res, err = e._select(prm) + res, err = e._select(cnr, filters) return err }) - return + return res, err } -func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddSearchDuration)() - } - +func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { addrList := make([]oid.Address, 0) uniqueMap := make(map[string]struct{}) var outError error var shPrm shard.SelectPrm - shPrm.SetContainerID(prm.cnr) - shPrm.SetFilters(prm.filters) + shPrm.SetContainerID(cnr) + shPrm.SetFilters(filters) e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.Select(shPrm) @@ -85,29 +64,32 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) { return false }) - return SelectRes{ - addrList: addrList, - }, outError + return addrList, outError } // List returns `limit` available physically storage object addresses in engine. // If limit is zero, then returns all available object addresses. // // Returns an error if executions are blocked (see BlockExecution). 
diff --git a/pkg/local_object_storage/engine/tree_test.go b/pkg/local_object_storage/engine/tree_test.go
index 2a340d10ac..5ed754e702 100644
--- a/pkg/local_object_storage/engine/tree_test.go
+++ b/pkg/local_object_storage/engine/tree_test.go
@@ -30,7 +30,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
 	for i := range objCount {
 		obj := generateObjectWithCID(cid)
 		addAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
-		err := Put(e, obj)
+		err := e.Put(obj, nil, 0)
 		if err != nil {
 			b.Fatal(err)
 		}
@@ -42,19 +42,15 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
 	}
 
 	b.Run("search", func(b *testing.B) {
-		var prm SelectPrm
-		prm.WithContainerID(cid)
-
 		var fs object.SearchFilters
 		fs.AddFilter(pilorama.AttributeFilename, strconv.Itoa(objCount/2), object.MatchStringEqual)
-		prm.WithFilters(fs)
 
 		for range b.N {
-			res, err := e.Select(prm)
+			res, err := e.Select(cid, fs)
 			if err != nil {
 				b.Fatal(err)
 			}
-			if count := len(res.addrList); count != 1 {
+			if count := len(res); count != 1 {
 				b.Fatalf("expected 1 object, got %d", count)
 			}
 		}
diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go
index 84359cdae2..d7dd469b38 100644
--- a/pkg/local_object_storage/engine/writecache.go
+++ b/pkg/local_object_storage/engine/writecache.go
@@ -4,39 +4,15 @@ import (
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
 )
 
-// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
-type FlushWriteCachePrm struct {
-	shardID      *shard.ID
-	ignoreErrors bool
-}
-
-// SetShardID is an option to set shard ID.
-//
-// Option is required.
-func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
-	p.shardID = id
-}
-
-// SetIgnoreErrors sets errors ignore flag..
-func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
-	p.ignoreErrors = ignore
-}
-
-// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
-type FlushWriteCacheRes struct{}
-
-// FlushWriteCache flushes write-cache on a single shard.
-func (e *StorageEngine) FlushWriteCache(p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
+// FlushWriteCache flushes write-cache on a single shard with the given ID.
+func (e *StorageEngine) FlushWriteCache(id *shard.ID) error {
 	e.mtx.RLock()
-	sh, ok := e.shards[p.shardID.String()]
+	sh, ok := e.shards[id.String()]
 	e.mtx.RUnlock()
 
 	if !ok {
-		return FlushWriteCacheRes{}, errShardNotFound
+		return errShardNotFound
 	}
 
-	var prm shard.FlushWriteCachePrm
-	prm.SetIgnoreErrors(p.ignoreErrors)
-
-	return FlushWriteCacheRes{}, sh.FlushWriteCache(prm)
+	return sh.FlushWriteCache(shard.FlushWriteCachePrm{})
 }
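
FlushWriteCache is down to a shard ID; the ignore-errors knob is gone and a zero shard.FlushWriteCachePrm is always passed. A minimal sketch over several shards (the helper name is illustrative):

package example

import (
	"fmt"

	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
)

// flushAll flushes the write-cache of every listed shard, stopping at the
// first failure; an unknown ID surfaces as a regular error.
func flushAll(e *engine.StorageEngine, ids []*shard.ID) error {
	for _, id := range ids {
		if err := e.FlushWriteCache(id); err != nil {
			return fmt.Errorf("shard %s: %w", id, err)
		}
	}
	return nil
}
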
diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go
index 56794a3d44..2e6d137b63 100644
--- a/pkg/services/control/server/evacuate.go
+++ b/pkg/services/control/server/evacuate.go
@@ -8,7 +8,6 @@ import (
 	"slices"
 
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
 	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
 	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
@@ -31,19 +30,14 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
 		return nil, err
 	}
 
-	var prm engine.EvacuateShardPrm
-	prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
-	prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
-	prm.WithFaultHandler(s.replicate)
-
-	res, err := s.storage.Evacuate(prm)
+	count, err := s.storage.Evacuate(s.getShardIDList(req.GetBody().GetShard_ID()), req.GetBody().GetIgnoreErrors(), s.replicate)
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
 	resp := &control.EvacuateShardResponse{
 		Body: &control.EvacuateShardResponse_Body{
-			Count: uint32(res.Count()),
+			Count: uint32(count),
 		},
 	}
 
diff --git a/pkg/services/control/server/flush_cache.go b/pkg/services/control/server/flush_cache.go
index 0835b4b2d2..a6305deefc 100644
--- a/pkg/services/control/server/flush_cache.go
+++ b/pkg/services/control/server/flush_cache.go
@@ -3,7 +3,6 @@ package control
 import (
 	"context"
 
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -22,10 +21,7 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) (
 	}
 
 	for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
-		var prm engine.FlushWriteCachePrm
-		prm.SetShardID(shardID)
-
-		_, err = s.storage.FlushWriteCache(prm)
+		err = s.storage.FlushWriteCache(shardID)
 		if err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
diff --git a/pkg/services/control/server/gc.go b/pkg/services/control/server/gc.go
index 6dc55ed893..576faa9162 100644
--- a/pkg/services/control/server/gc.go
+++ b/pkg/services/control/server/gc.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"google.golang.org/grpc/codes"
@@ -43,11 +42,7 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest)
 	var firstErr error
 	for i := range addrList {
-		var prm engine.DeletePrm
-		prm.WithForceRemoval()
-		prm.WithAddress(addrList[i])
-
-		_, err := s.storage.Delete(prm)
+		err := s.storage.Delete(addrList[i])
 		if err != nil && firstErr == nil {
 			firstErr = err
 		}
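
DropObjects above now calls the engine's Delete once per address. The same loop, extracted as a hedged sketch:

package example

import (
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// dropAll removes the listed addresses one by one, remembering only the
// first failure, mirroring the DropObjects loop above.
func dropAll(e *engine.StorageEngine, addrs []oid.Address) error {
	var firstErr error
	for _, addr := range addrs {
		if err := e.Delete(addr); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
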
diff --git a/pkg/services/control/server/list_objects.go b/pkg/services/control/server/list_objects.go
index ea70cb5b35..865a04c10c 100644
--- a/pkg/services/control/server/list_objects.go
+++ b/pkg/services/control/server/list_objects.go
@@ -3,6 +3,7 @@ package control
 import (
 	"errors"
 
+	objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
 	"google.golang.org/grpc/codes"
@@ -21,11 +22,14 @@ func (s *Server) ListObjects(req *control.ListObjectsRequest, stream control.Con
 		return status.Error(codes.Internal, err.Error())
 	}
 
-	var prm engine.ListWithCursorPrm
+	var (
+		cursor    *engine.Cursor
+		addresses []objectcore.AddressWithType
+	)
 	// (Limit 4MB - 64KB for service bytes and future fields) / 89B address length = 46390 addresses can be sent
-	prm.WithCount(46390)
+	const count = 46390
 	for {
-		res, err := s.storage.ListWithCursor(prm)
+		addresses, cursor, err = s.storage.ListWithCursor(count, cursor)
 		if err != nil {
 			if errors.Is(err, engine.ErrEndOfListing) {
 				return nil
@@ -34,7 +38,6 @@ func (s *Server) ListObjects(req *control.ListObjectsRequest, stream control.Con
 			return status.Error(codes.Internal, err.Error())
 		}
 
-		addresses := res.AddressList()
 		objectsAddresses := make([][]byte, 0, len(addresses))
 		for _, objectId := range addresses {
 			objectsAddresses = append(objectsAddresses, []byte(objectId.Address.EncodeToString()))
@@ -54,7 +57,5 @@ func (s *Server) ListObjects(req *control.ListObjectsRequest, stream control.Con
 		if err = stream.Send(resp); err != nil {
 			return status.Error(codes.Internal, err.Error())
 		}
-
-		prm.WithCursor(res.Cursor())
 	}
 }
diff --git a/pkg/services/object/acl/eacl/v2/localstore.go b/pkg/services/object/acl/eacl/v2/localstore.go
index 74192c588e..a3652367a5 100644
--- a/pkg/services/object/acl/eacl/v2/localstore.go
+++ b/pkg/services/object/acl/eacl/v2/localstore.go
@@ -17,5 +17,5 @@ func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
 		return nil, io.ErrUnexpectedEOF
 	}
 
-	return engine.Head(s.ls, addr)
+	return s.ls.Head(addr, false)
 }
diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go
index ee7c012e18..34112cb471 100644
--- a/pkg/services/object/get/util.go
+++ b/pkg/services/object/get/util.go
@@ -195,40 +195,27 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
 
 func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
 	if exec.headOnly() {
-		var headPrm engine.HeadPrm
-		headPrm.WithAddress(exec.address())
-		headPrm.WithRaw(exec.isRaw())
-
-		r, err := e.engine.Head(headPrm)
+		r, err := e.engine.Head(exec.address(), exec.isRaw())
 		if err != nil {
 			return nil, err
 		}
 
-		return r.Header(), nil
+		return r, nil
 	}
 
 	if rng := exec.ctxRange(); rng != nil {
-		var getRange engine.RngPrm
-		getRange.WithAddress(exec.address())
-		getRange.WithPayloadRange(rng)
-
-		r, err := e.engine.GetRange(getRange)
+		r, err := e.engine.GetRange(exec.address(), rng.GetOffset(), rng.GetLength())
 		if err != nil {
 			return nil, err
 		}
 
-		return r.Object(), nil
-	}
-
-	var getPrm engine.GetPrm
-	getPrm.WithAddress(exec.address())
+		o := object.New()
+		o.SetPayload(r)
 
-	r, err := e.engine.Get(getPrm)
-	if err != nil {
-		return nil, err
+		return o, nil
 	}
 
-	return r.Object(), nil
+	return e.engine.Get(exec.address())
 }
 
 func (w *partWriter) WriteChunk(p []byte) error {
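
Head now takes the address and the raw flag directly and returns the header object itself. A minimal sketch (the parameter wiring is illustrative):

package example

import (
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// headObject fetches only the object header; raw is the flag the get-service
// wrapper above derives from the request.
func headObject(e *engine.StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) {
	return e.Head(addr, raw)
}
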
diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go
index b83526a846..907be2631c 100644
--- a/pkg/services/object/search/util.go
+++ b/pkg/services/object/search/util.go
@@ -109,16 +109,12 @@ func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info c
 }
 
 func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
-	var selectPrm engine.SelectPrm
-	selectPrm.WithFilters(exec.searchFilters())
-	selectPrm.WithContainerID(exec.containerID())
-
-	r, err := e.storage.Select(selectPrm)
+	r, err := e.storage.Select(exec.containerID(), exec.searchFilters())
 	if err != nil {
 		return nil, err
 	}
 
-	return idsFromAddresses(r.AddressList()), nil
+	return idsFromAddresses(r), nil
 }
 
 func idsFromAddresses(addrs []oid.Address) []oid.ID {
diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go
index e2eddc1845..6f9cde2e0e 100644
--- a/pkg/services/policer/check.go
+++ b/pkg/services/policer/check.go
@@ -6,7 +6,6 @@ import (
 
 	"github.com/nspcc-dev/neofs-node/pkg/core/container"
 	objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
 	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
 	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
@@ -88,11 +87,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
 				zap.String("error", err.Error()),
 			)
 			if container.IsErrNotFound(err) {
-				var prm engine.InhumePrm
-				prm.MarkAsGarbage(addrWithType.Address)
-				prm.WithForceRemoval()
-
-				_, err := p.jobQueue.localStorage.Inhume(prm)
+				err = p.jobQueue.localStorage.Delete(addrWithType.Address)
 				if err != nil {
 					p.log.Error("could not inhume object with missing container",
 						zap.Stringer("cid", idCnr),
diff --git a/pkg/services/policer/queue.go b/pkg/services/policer/queue.go
index 4b2cc41706..2953e8c08f 100644
--- a/pkg/services/policer/queue.go
+++ b/pkg/services/policer/queue.go
@@ -12,14 +12,10 @@ type jobQueue struct {
 }
 
 func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) {
-	var prm engine.ListWithCursorPrm
-	prm.WithCursor(cursor)
-	prm.WithCount(count)
-
-	res, err := q.localStorage.ListWithCursor(prm)
+	res, cursor, err := q.localStorage.ListWithCursor(count, cursor)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err)
 	}
 
-	return res.AddressList(), res.Cursor(), nil
+	return res, cursor, nil
 }
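
ListWithCursor follows the same pattern everywhere now: pass the count and the previous cursor, receive a page and the next cursor. A sketch of a full iteration, assuming the page size of 1024 used by neofs-lens above:

package example

import (
	"errors"

	objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
)

// iterateAll pages through every locally stored object with the cursor
// contract shown above: a nil cursor starts the listing and
// engine.ErrEndOfListing terminates it.
func iterateAll(e *engine.StorageEngine, handle func(objectcore.AddressWithType)) error {
	var cursor *engine.Cursor
	for {
		addrs, next, err := e.ListWithCursor(1024, cursor)
		if err != nil {
			if errors.Is(err, engine.ErrEndOfListing) {
				return nil // all objects have been visited
			}
			return err
		}
		for _, a := range addrs {
			handle(a)
		}
		cursor = next
	}
}
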