diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 5d3f99c2c..51735a548 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -72,6 +72,8 @@ type RelayerSessionsManager interface { EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (SessionTree, error) } +type RelayerSessionsManagerOption func(RelayerSessionsManager) + // SessionTree is an interface that wraps an SMST (Sparse Merkle State Tree) and its corresponding session. type SessionTree interface { // GetSessionHeader returns the header of the session corresponding to the SMST. diff --git a/pkg/relayer/session/errors.go b/pkg/relayer/session/errors.go index adf5a403b..520c85bc1 100644 --- a/pkg/relayer/session/errors.go +++ b/pkg/relayer/session/errors.go @@ -3,9 +3,10 @@ package session import sdkerrors "cosmossdk.io/errors" var ( - codespace = "relayer/session" - ErrSessionTreeClosed = sdkerrors.Register(codespace, 1, "session tree already closed") - ErrSessionTreeNotClosed = sdkerrors.Register(codespace, 2, "session tree not closed") - ErrSessionStorePathExists = sdkerrors.Register(codespace, 3, "session store path already exists") - ErrSessionTreeProofPathMismatch = sdkerrors.Register(codespace, 4, "session tree proof path mismatch") + codespace = "relayer_session" + ErrSessionTreeClosed = sdkerrors.Register(codespace, 1, "session tree already closed") + ErrSessionTreeNotClosed = sdkerrors.Register(codespace, 2, "session tree not closed") + ErrSessionTreeStorePathExists = sdkerrors.Register(codespace, 3, "session tree store path already exists") + ErrSessionTreeProofPathMismatch = sdkerrors.Register(codespace, 4, "session tree proof path mismatch") + ErrSessionTreeUndefinedStoresDirectory = sdkerrors.Register(codespace, 5, "session tree key-value store directory undefined for where they will be saved on disk") ) diff --git a/pkg/relayer/session/options.go b/pkg/relayer/session/options.go new file mode 100644 index 000000000..1edadc132 --- /dev/null +++ b/pkg/relayer/session/options.go @@ -0,0 +1,13 @@ +package session + +import ( + "github.com/pokt-network/poktroll/pkg/relayer" +) + +// WithStoresDirectory sets the path on disk where KVStore data files used to store +// SMST of work sessions are created. +func WithStoresDirectory(storesDirectory string) relayer.RelayerSessionsManagerOption { + return func(relSessionMgr relayer.RelayerSessionsManager) { + relSessionMgr.(*relayerSessionsManager).storesDirectory = storesDirectory + } +} diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 0fc37c0d3..7a45880e1 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -5,6 +5,7 @@ import ( "log" "sync" + "cosmossdk.io/depinject" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" @@ -22,9 +23,6 @@ type relayerSessionsManager struct { // sessionsToClaim notifies about sessions that are ready to be claimed. sessionsToClaim observable.Observable[relayer.SessionTree] - // sessionsToClaimPublisher is the channel used to publish sessions to claim. - sessionsToClaimPublisher chan<- relayer.SessionTree - // sessionTrees is a map of block heights pointing to a map of SessionTrees // indexed by their sessionId. // The block height index is used to know when the sessions contained in the entry should be closed, @@ -42,22 +40,35 @@ type relayerSessionsManager struct { // NewRelayerSessions creates a new relayerSessions. func NewRelayerSessions( ctx context.Context, - storesDirectory string, - blockClient client.BlockClient, -) relayer.RelayerSessionsManager { + deps depinject.Config, + opts ...relayer.RelayerSessionsManagerOption, +) (relayer.RelayerSessionsManager, error) { rs := &relayerSessionsManager{ - sessionsTrees: make(sessionsTreesMap), - storesDirectory: storesDirectory, - blockClient: blockClient, + sessionsTrees: make(sessionsTreesMap), + } + + if err := depinject.Inject( + deps, + &rs.blockClient, + ); err != nil { + return nil, err + } + + for _, opt := range opts { + opt(rs) + } + + if err := rs.validateConfig(); err != nil { + return nil, err } rs.sessionsToClaim = channel.MapExpand[client.Block, relayer.SessionTree]( ctx, - blockClient.CommittedBlocksSequence(ctx), + rs.blockClient.CommittedBlocksSequence(ctx), rs.mapBlockToSessionsToClaim, ) - return rs + return rs, nil } // SessionsToClaim returns an observable that notifies when sessions are ready to be claimed. @@ -136,3 +147,13 @@ func (rs *relayerSessionsManager) removeFromRelayerSessions(sessionHeader *sessi delete(rs.sessionsTrees, sessionHeader.SessionEndBlockHeight) } } + +// validateConfig validates the relayerSessionsManager's configuration. +// TODO_TEST: Add unit tests to validate these configurations. +func (rp *relayerSessionsManager) validateConfig() error { + if rp.storesDirectory == "" { + return ErrSessionTreeUndefinedStoresDirectory + } + + return nil +} diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index 187633924..fb27ff307 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -69,7 +69,7 @@ func NewSessionTree( // Make sure storePath does not exist when creating a new SessionTree if _, err := os.Stat(storePath); !os.IsNotExist(err) { - return nil, ErrSessionStorePathExists + return nil, ErrSessionTreeUndefinedStoresDirectory } treeStore, err := smt.NewKVStore(storePath)