diff --git a/go.mod b/go.mod index 70dafa1e4..b4658dfb1 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( cosmossdk.io/math v1.0.1 github.com/cometbft/cometbft v0.37.2 github.com/cometbft/cometbft-db v0.8.0 + github.com/cosmos/cosmos-proto v1.0.0-beta.2 github.com/cosmos/cosmos-sdk v0.47.3 github.com/cosmos/gogoproto v1.4.10 github.com/cosmos/ibc-go/v7 v7.1.0 @@ -27,6 +28,7 @@ require ( go.uber.org/multierr v1.11.0 golang.org/x/crypto v0.12.0 golang.org/x/sync v0.3.0 + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 google.golang.org/grpc v1.56.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -70,7 +72,6 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cosmos/btcutil v1.0.5 // indirect - github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/cosmos/gogogateway v1.2.0 // indirect github.com/cosmos/iavl v0.20.0 // indirect @@ -266,7 +267,6 @@ require ( gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/api v0.122.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go new file mode 100644 index 000000000..68714efd7 --- /dev/null +++ b/pkg/relayer/interface.go @@ -0,0 +1,53 @@ +package relayer + +import ( + "github.com/pokt-network/smt" + + "github.com/pokt-network/poktroll/pkg/observable" + sessiontypes "github.com/pokt-network/poktroll/x/session/types" +) + +// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse +// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are +// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session. +// It also handles the creation and retrieval of SMSTs for a given session. +type RelayerSessionsManager interface { + // SessionsToClaim returns an observable that notifies of sessions ready to be claimed. + SessionsToClaim() observable.Observable[SessionTree] + + // EnsureSessionTree returns the SMST (Sparse Merkle State Tree) for a given session. + // It is used to retrieve the SMST and update it when a Relay has been successfully served. + // If the session is seen for the first time, it creates a new SMST for it before returning it. + // An error is returned if the corresponding KVStore for SMST fails to be created. + EnsureSessionTree(session *sessiontypes.Session) (SessionTree, error) +} + +// SessionTree is an interface that wraps an SMST (Sparse Merkle State Tree) and its corresponding session. +type SessionTree interface { + // GetSession returns the session corresponding to the SMST. + GetSession() *sessiontypes.Session + + // Update is a wrapper for the SMST's Update function. It updates the SMST with + // the given key, value, and weight. + // This function should be called when a Relay has been successfully served. + Update(key, value []byte, weight uint64) error + + // ProveClosest is a wrapper for the SMST's ProveClosest function. It returns the + // proof for the given path. + // This function should be called several blocks after a session has been claimed and needs to be proven. + ProveClosest(path []byte) (proof *smt.SparseMerkleClosestProof, err error) + + // Flush gets the root hash of the SMST needed for submitting the claim; + // then commits the entire tree to disk and stops the KVStore. + // It should be called before submitting the claim on-chain. This function frees up + // the in-memory resources used by the SMST that are no longer needed while waiting + // for the proof submission window to open. + Flush() (SMSTRoot []byte, err error) + + // TODO_DISCUSS: This function should not be part of the interface as it is an optimization + // aiming to free up KVStore resources after the proof is no longer needed. + // Delete deletes the SMST from the KVStore. + // WARNING: This function should be called only after the proof has been successfully + // submitted on-chain and the servicer has confirmed that it has been rewarded. + Delete() error +} diff --git a/pkg/relayer/session/errors.go b/pkg/relayer/session/errors.go new file mode 100644 index 000000000..adf5a403b --- /dev/null +++ b/pkg/relayer/session/errors.go @@ -0,0 +1,11 @@ +package session + +import sdkerrors "cosmossdk.io/errors" + +var ( + codespace = "relayer/session" + ErrSessionTreeClosed = sdkerrors.Register(codespace, 1, "session tree already closed") + ErrSessionTreeNotClosed = sdkerrors.Register(codespace, 2, "session tree not closed") + ErrSessionStorePathExists = sdkerrors.Register(codespace, 3, "session store path already exists") + ErrSessionTreeProofPathMismatch = sdkerrors.Register(codespace, 4, "session tree proof path mismatch") +) diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go new file mode 100644 index 000000000..17b1282c0 --- /dev/null +++ b/pkg/relayer/session/session.go @@ -0,0 +1,128 @@ +package session + +import ( + "context" + "log" + "sync" + + blockclient "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/relayer" + sessiontypes "github.com/pokt-network/poktroll/x/session/types" +) + +var _ relayer.RelayerSessionsManager = (*relayerSessionsManager)(nil) + +type sessionsTreesMap = map[int64]map[string]relayer.SessionTree + +// relayerSessionsManager is an implementation of the RelayerSessions interface. +// TODO_TEST: Add tests to the relayerSessionsManager. +type relayerSessionsManager struct { + // sessionsToClaim notifies about sessions that are ready to be claimed. + sessionsToClaim observable.Observable[relayer.SessionTree] + + // sessionsToClaimPublisher is the channel used to publish sessions to claim. + sessionsToClaimPublisher chan<- relayer.SessionTree + + // sessionTrees is a map of block heights pointing to a map of SessionTrees + // indexed by their sessionId. + // The block height index is used to know when the sessions contained in the entry should be closed, + // this helps to avoid iterating over all sessionsTrees to check if they are ready to be closed. + sessionsTrees sessionsTreesMap + sessionsTreesMu *sync.Mutex + + // blockClient is used to get the notifications of committed blocks. + blockClient blockclient.BlockClient + + // storesDirectory points to a path on disk where KVStore data files are created. + storesDirectory string +} + +// NewRelayerSessions creates a new relayerSessions. +func NewRelayerSessions( + ctx context.Context, + storesDirectory string, + blockClient blockclient.BlockClient, +) relayer.RelayerSessionsManager { + rs := &relayerSessionsManager{ + sessionsTrees: make(sessionsTreesMap), + storesDirectory: storesDirectory, + blockClient: blockClient, + } + rs.sessionsToClaim, rs.sessionsToClaimPublisher = channel.NewObservable[relayer.SessionTree]() + + go rs.goListenToCommittedBlocks(ctx) + + return rs +} + +// SessionsToClaim returns an observable that notifies when sessions are ready to be claimed. +func (rs *relayerSessionsManager) SessionsToClaim() observable.Observable[relayer.SessionTree] { + return rs.sessionsToClaim +} + +// EnsureSessionTree returns the SessionTree for a given session. +// If no tree for the session exists, a new SessionTree is created before returning. +func (rs *relayerSessionsManager) EnsureSessionTree(session *sessiontypes.Session) (relayer.SessionTree, error) { + rs.sessionsTreesMu.Lock() + defer rs.sessionsTreesMu.Unlock() + + // Calculate the session end height based on the session start block height + // and the number of blocks per session. + sessionEndHeight := session.Header.SessionStartBlockHeight + session.NumBlocksPerSession + sessionsTrees, ok := rs.sessionsTrees[sessionEndHeight] + + // If there is no map for sessions at the sessionEndHeight, create one. + if !ok { + sessionsTrees = make(map[string]relayer.SessionTree) + rs.sessionsTrees[sessionEndHeight] = sessionsTrees + } + + // Get the sessionTree for the given session. + sessionTree, ok := sessionsTrees[session.SessionId] + + // If the sessionTree does not exist, create it. + if !ok { + sessionTree, err := NewSessionTree(session, rs.storesDirectory, rs.removeFromRelayerSessions) + if err != nil { + return nil, err + } + + sessionsTrees[session.SessionId] = sessionTree + } + + return sessionTree, nil +} + +// goListenToCommittedBlocks listens to committed blocks so that rs.sessionsToClaimPublisher +// can notify when sessions are ready to be claimed. +// It is intended to be called as a background goroutine. +func (rs *relayerSessionsManager) goListenToCommittedBlocks(ctx context.Context) { + committedBlocks := rs.blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx).Ch() + + for block := range committedBlocks { + // Check if there are sessions to be closed at this block height. + if sessionsTrees, ok := rs.sessionsTrees[block.Height()]; ok { + // Iterate over the sessionsTrees that end at this block height and publish them. + for _, sessionTree := range sessionsTrees { + rs.sessionsToClaimPublisher <- sessionTree + } + } + } +} + +// removeFromRelayerSessions removes the session from the relayerSessions. +func (rs *relayerSessionsManager) removeFromRelayerSessions(session *sessiontypes.Session) { + rs.sessionsTreesMu.Lock() + defer rs.sessionsTreesMu.Unlock() + + sessionEndHeight := session.Header.SessionStartBlockHeight + session.NumBlocksPerSession + sessionsTrees, ok := rs.sessionsTrees[sessionEndHeight] + if !ok { + log.Print("session not found in relayerSessionsManager") + return + } + + delete(sessionsTrees, session.SessionId) +} diff --git a/pkg/relayer/session/session_test.go b/pkg/relayer/session/session_test.go new file mode 100644 index 000000000..9efb49710 --- /dev/null +++ b/pkg/relayer/session/session_test.go @@ -0,0 +1,3 @@ +package session_test + +// TODO: Add tests to the relayerSessionsManager logic diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go new file mode 100644 index 000000000..b97711451 --- /dev/null +++ b/pkg/relayer/session/sessiontree.go @@ -0,0 +1,210 @@ +package session + +import ( + "bytes" + "crypto/sha256" + "os" + "path/filepath" + "sync" + + "github.com/pokt-network/smt" + + "github.com/pokt-network/poktroll/pkg/relayer" + sessiontypes "github.com/pokt-network/poktroll/x/session/types" +) + +var _ relayer.SessionTree = (*sessionTree)(nil) + +// sessionTree is an implementation of the SessionTree interface. +// TODO_TEST: Add tests to the sessionTree. +type sessionTree struct { + // sessionMu is a mutex used to protect sessionTree operations from concurrent access. + sessionMu *sync.Mutex + + // session is the Session corresponding to the SMST (Sparse Merkle State Tree). + session *sessiontypes.Session + + // tree is the SMST (Sparse Merkle State Tree) corresponding the session. + tree *smt.SMST + + // claimedRoot is the root hash of the SMST needed for submitting the claim. + // If it holds a non-nil value, it means that the SMST has been flushed, + // committed to disk and no more updates can be made to it. A non-nil value also + // indicates that a proof could be generated using ProveClosest function. + claimedRoot []byte + + // proofPath is the path for which the proof was generated. + proofPath []byte + + // proof is the generated proof for the session given a proofPath. + proof *smt.SparseMerkleClosestProof + + // treeStore is the KVStore used to store the SMST. + treeStore smt.KVStore + + // storePath is the path to the KVStore used to store the SMST. + // It is created from the storePrefix and the session.sessionId. + // We keep track of it so we can use it at the end of the claim/proof lifecycle + // to delete the KVStore when it is no longer needed. + storePath string + + // removeFromRelayerSessions is a function that removes the sessionTree from + // the RelayerSessionsManager. + // Since the sessionTree has no knowledge of the RelayerSessionsManager, + // we pass this callback from the session manager to the sessionTree so + // it can remove itself from the RelayerSessionsManager when it is no longer needed. + removeFromRelayerSessions func(session *sessiontypes.Session) +} + +// NewSessionTree creates a new sessionTree from a Session and a storePrefix. It also takes a function +// removeFromRelayerSessions that removes the sessionTree from the RelayerSessionsManager. +// It returns an error if the KVStore fails to be created. +func NewSessionTree( + session *sessiontypes.Session, + storesDirectory string, + removeFromRelayerSessions func(session *sessiontypes.Session), +) (relayer.SessionTree, error) { + // Join the storePrefix and the session.sessionId to create a unique storePath + storePath := filepath.Join(storesDirectory, session.SessionId) + + // Make sure storePath does not exist when creating a new SessionTree + if _, err := os.Stat(storePath); !os.IsNotExist(err) { + return nil, ErrSessionStorePathExists + } + + treeStore, err := smt.NewKVStore(storePath) + if err != nil { + return nil, err + } + + // Create the SMST from the KVStore and a nil value hasher so the proof would + // contain a non-hashed Relay that could be used to validate the proof on-chain. + tree := smt.NewSparseMerkleSumTree(treeStore, sha256.New(), smt.WithValueHasher(nil)) + + sessionTree := &sessionTree{ + session: session, + storePath: storePath, + treeStore: treeStore, + tree: tree, + + removeFromRelayerSessions: removeFromRelayerSessions, + } + + return sessionTree, nil +} + +// GetSession returns the session corresponding to the SMST. +func (st *sessionTree) GetSession() *sessiontypes.Session { + return st.session +} + +// Update is a wrapper for the SMST's Update function. It updates the SMST with +// the given key, value, and weight. +// This function should be called by the Miner when a Relay has been successfully served. +// It returns an error if the SMST has been flushed to disk which indicates +// that updates are no longer allowed. +func (st *sessionTree) Update(key, value []byte, weight uint64) error { + st.sessionMu.Lock() + defer st.sessionMu.Unlock() + + if st.claimedRoot != nil { + return ErrSessionTreeClosed + } + + return st.tree.Update(key, value, weight) +} + +// ProveClosest is a wrapper for the SMST's ProveClosest function. It returns a proof for the given path. +// This function is intended to be called after a session has been claimed and needs to be proven. +// If the proof has already been generated, it returns the cached proof. +// It returns an error if the SMST has not been flushed yet (the claim has not been generated) +func (st *sessionTree) ProveClosest(path []byte) (proof *smt.SparseMerkleClosestProof, err error) { + st.sessionMu.Lock() + defer st.sessionMu.Unlock() + + // A claim need to be generated before a proof can be generated. + if st.claimedRoot == nil { + return nil, ErrSessionTreeNotClosed + } + + // If the proof has already been generated, return the cached proof. + if st.proof != nil { + // Make sure the path is the same as the one for which the proof was generated. + if !bytes.Equal(path, st.proofPath) { + return nil, ErrSessionTreeProofPathMismatch + } + + return st.proof, nil + } + + // Restore the KVStore from disk since it has been closed after the claim has been generated. + st.treeStore, err = smt.NewKVStore(st.storePath) + if err != nil { + return nil, err + } + + st.tree = smt.ImportSparseMerkleSumTree(st.treeStore, sha256.New(), st.claimedRoot, smt.WithValueHasher(nil)) + + // Generate the proof and cache it along with the path for which it was generated. + st.proof, err = st.tree.ProveClosest(path) + st.proofPath = path + + return st.proof, err +} + +// Flush gets the root hash of the SMST needed for submitting the claim; +// then commits the entire tree to disk and stops the KVStore. +// It should be called before submitting the claim on-chain. This function frees up the KVStore resources. +// If the SMST has already been flushed to disk, it returns the cached root hash. +func (st *sessionTree) Flush() (SMSTRoot []byte, err error) { + st.sessionMu.Lock() + defer st.sessionMu.Unlock() + + // We already have the root hash, return it. + if st.claimedRoot != nil { + return st.claimedRoot, nil + } + + st.claimedRoot = st.tree.Root() + + // Commit the tree to disk + if err := st.tree.Commit(); err != nil { + return nil, err + } + + // Stop the KVStore + if err := st.treeStore.Stop(); err != nil { + return nil, err + } + + st.treeStore = nil + st.tree = nil + + return st.claimedRoot, nil +} + +// Delete deletes the SMST from the KVStore and removes the sessionTree from the RelayerSessionsManager. +// WARNING: This function deletes the KVStore associated to the session and should be +// called only after the proof has been successfully submitted on-chain and the servicer +// has confirmed that it has been rewarded. +func (st *sessionTree) Delete() error { + st.sessionMu.Lock() + defer st.sessionMu.Unlock() + + st.removeFromRelayerSessions(st.session) + + if err := st.treeStore.ClearAll(); err != nil { + return err + } + + if err := st.treeStore.Stop(); err != nil { + return err + } + + // Delete the KVStore from disk + if err := os.RemoveAll(st.storePath); err != nil { + return err + } + + return nil +} diff --git a/pkg/relayer/session/sessiontree_test.go b/pkg/relayer/session/sessiontree_test.go new file mode 100644 index 000000000..4e199dcfe --- /dev/null +++ b/pkg/relayer/session/sessiontree_test.go @@ -0,0 +1,3 @@ +package session_test + +// TODO: Add tests to the sessionTree logic