Skip to content

Commit

Permalink
feat: implement relayerSessions and sessionTree (#105)
Browse files Browse the repository at this point in the history
* Implement MsgStakeSupplier

* Unit tests pass

* Rename some test vars

* Fix typo

* Unstake - WIP

* Update makefile

* Finished implementing unstake

* Self review

* [WIP] Session Hydrator

* snapshot commit

* Working on the tests

* One HydrateSessionTest ready

* Self review - before finishing tests

* Prepared templates for all of the unit tests

* feat: add the map channel observable operator

(cherry picked from commit 22371aa550eb0060b528f4573ba6908bbdfa0c1c)

* Implemented TestSession_HydrateSession_Metadata

* feat: add replay observable

(cherry picked from commit ab21790164ab544ae5f1508d3237a3faab33e71e)

* Implemented TestSession_HydrateSession_SessionId

* chore: add query client interface

* chore: add query client errors

* Implemented TestSession_HydrateSession_Success_BaseCase

* Implemented TestSession_HydrateSession_Application

* Option attempt for tests

* Implemented TestSession_HydrateSession_Suppliers

* test: fix false positive, prevent regression, & add comments

* chore: add godoc comment

* feat: add query client implementation

* chore: add connection & dialer wrapper implementations

* test: query client & add testquery helper pkg

* chore: add go_test_integration make target

* chore: add internal mocks pkg

* test: query client integration test

* docs: add event query client docs

* chore: update go.mod

* chore: re-order `eventsQueryClient` methods to improve readability

* chore: add godoc comments to testclient helpers

* fix: comment formatting

* chore: improve comment & naming in evt query client test

* test: tune events query client parameters

* chore: improve godoc comments

* chore: review improvements

* refactor: `replayObservable` as its own interface type

* refactor: `replayObservable#Next() V`  to `ReplayObservable#Last(ctx, n) []V`

* chore: add constructor func for `ReplayObservable`

* test: reorder to improve readibility

* refactor: rename and add godoc comments

* chore: improve naming & comments

* chore: add warning log and improve comments

* test: improve and add tests

* fix: interface assertion

* fix: comment typo

* chore: review improvements

* fix: race

* chore: add block client interface

* chore: add `MapReplay` operator

* feat: add block client

* test: block client integration

* test: block client

* docs: fix install instructions

* fix: race on eventsBytesAndConns map

* fix: interface assertions

Co-authored-by: Redouane Lakrache <[email protected]>

* fix: race

* Apply suggestions from code review

Co-authored-by: Bryan White <[email protected]>
Co-authored-by: Redouane Lakrache <[email protected]>

* [RelayerProxy] feat: implement relayerProxy struct (#82)

* feat: add notifiable observable

Co-authored-by: red-0ne <[email protected]>

* fixup: observable

(cherry picked from commit bcf700405b5e4bd71bf9bb650c988526fa16c728)

* refactor/fix: notifiable observable improvements

* chore: more review improvements

* refactor: renaming

- `notifiable` pkg to `channel`
- `notifiableObservable` struct to `channelObservable`
- `observer` struct to `channelObserver`
- `notifier` vars to `producer`
- `notifee` vars to `observable` (or similar)

* chore: update comments

* refactor: simplify drainCh test helper

* test: fix timeout

* test: rename observable test functions

* test: add test TODOs

* chore: update comments

* refactor: simplify observable & observer

* test: fix & add observable tests

* test: cleanup & comment observable tests

* fixup: observable

(cherry picked from commit 33f3196535b7dae154e01f93aab36f70cda8fc4f)

* fixup: observable test

(cherry picked from commit 9c206da115dc35843d588313c2215a0e649c6df6)

* refactor: simplify & cleanup

* chore: cleanup logs & comments

* chore: improve comments

* refactor: DrainChannel test helper

* shore: cleanup & simplify

* test: comment out flaky test cases

* fixup: drain channel helper

* chore: improve var name

* fixup: drain channel helper

* test: shorten timeout

* chore: cleanup

* chore: cleanup, simplification, review improvements

(cherry picked from commit 92a547da29ec526d415f6967ccfa5988c3f5ca1d)

* chore: improve comments

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: improve comments

Co-authored-by: Daniel Olshansky <[email protected]>

* refactor: rename `Observable#Close()` to `#UnsubscribeAll()`

* chore: improve comments

* chore: misc. review feedback improvements

* chore: improve comment

* chore: review improvements

* chore: last minute improvements

* feat: add RelayerProxy interface

* Fix grammar in comments

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: rename package to relayerproxy

* feat: implement relayerProxy struct and its constructor args

* fix: change directory structure

* fix: change directory structure

* chore: address change requests

* chore: comment unavailable interface and its usage

---------

Co-authored-by: Bryan White <[email protected]>
Co-authored-by: Daniel Olshansky <[email protected]>

* fix: RelayerProxy interface mismatch (#91)

* [Observable] chore: observable touchup (#83)

* chore: add `Observer#IsClosed()` to prevent  redundant unsubscription

(cherry picked from commit 78a9946b3f14353e79b123919416903d4622da4d)

* chore: simplify channel observable

(cherry picked from commit a2629c8bc3decfb5a787e453af67aa78fc8ca1ea)

* test: add case for publisher w/ large buffer size, comment, & cleanup

(cherry picked from commit e97b691e39af8fa1654b8d697a3b34095b32ed82)

* docs: update observable pkg README.md

(cherry picked from commit d5442c7062630d847e048850fa71806086f84172)

* doc: fix pkg README template

* chore: add `Observable#Next()`

(cherry picked from commit cb4142f673fee37ead8520394e314f1fcb9d0dc9)

* chore: update godoc comments

* feat: seperate tests from go_develop (#89)

* [E2E] Add Regression Testing for Send E2E Feature Test (#84)

* chore: enforce go standard interface implementation registration (#87)

* [Miner] feat: add the map channel observable operator (#92)

* feat: add the map channel observable operator

(cherry picked from commit 22371aa550eb0060b528f4573ba6908bbdfa0c1c)

* test: fix false positive, prevent regression, & add comments

* chore: add godoc comment

* chore: review improvements

* Reply to Red0ne's comments

* Update small comment

* Small self review

* Updated TestSession_GetSession

* Fixed last failing test

* feat: add the interfaces for the RelayerSessions and SessionTree

* chore: address change requests

* chore: move-up comment

* feat: add ExpiringSessions to RelaySessions interface

* fix: use appropriate function name in comment

Co-authored-by: Daniel Olshansky <[email protected]>

* feat: implement relayerSessions and sessionTree

* chore: improve comments

* chore: address change requests

* chore: remove alias types for sessionId and block height

* chore: remove previous merge changes

* chore: remove added gitkeep

* chore: go mod tidy

* fix: use conventional module paths

* chore: address change request from PRs 104 & 105

* chore: wrap long comments

* chore: address change requests

---------

Co-authored-by: Daniel Olshansky <[email protected]>
Co-authored-by: Bryan White <[email protected]>
Co-authored-by: harry <[email protected]>
  • Loading branch information
4 people authored Nov 8, 2023
1 parent 937b6f9 commit a1c15cf
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 2 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
cosmossdk.io/math v1.0.1
github.com/cometbft/cometbft v0.37.2
github.com/cometbft/cometbft-db v0.8.0
github.com/cosmos/cosmos-proto v1.0.0-beta.2
github.com/cosmos/cosmos-sdk v0.47.3
github.com/cosmos/gogoproto v1.4.10
github.com/cosmos/ibc-go/v7 v7.1.0
Expand All @@ -27,6 +28,7 @@ require (
go.uber.org/multierr v1.11.0
golang.org/x/crypto v0.12.0
golang.org/x/sync v0.3.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
google.golang.org/grpc v1.56.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -70,7 +72,6 @@ require (
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect
github.com/cosmos/go-bip39 v1.0.0 // indirect
github.com/cosmos/gogogateway v1.2.0 // indirect
github.com/cosmos/iavl v0.20.0 // indirect
Expand Down Expand Up @@ -266,7 +267,6 @@ require (
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/api v0.122.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
53 changes: 53 additions & 0 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package relayer

import (
"github.com/pokt-network/smt"

"github.com/pokt-network/poktroll/pkg/observable"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
)

// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse
// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are
// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session.
// It also handles the creation and retrieval of SMSTs for a given session.
type RelayerSessionsManager interface {
// SessionsToClaim returns an observable that notifies of sessions ready to be claimed.
SessionsToClaim() observable.Observable[SessionTree]

// EnsureSessionTree returns the SMST (Sparse Merkle State Tree) for a given session.
// It is used to retrieve the SMST and update it when a Relay has been successfully served.
// If the session is seen for the first time, it creates a new SMST for it before returning it.
// An error is returned if the corresponding KVStore for SMST fails to be created.
EnsureSessionTree(session *sessiontypes.Session) (SessionTree, error)
}

// SessionTree is an interface that wraps an SMST (Sparse Merkle State Tree) and its corresponding session.
type SessionTree interface {
// GetSession returns the session corresponding to the SMST.
GetSession() *sessiontypes.Session

// Update is a wrapper for the SMST's Update function. It updates the SMST with
// the given key, value, and weight.
// This function should be called when a Relay has been successfully served.
Update(key, value []byte, weight uint64) error

// ProveClosest is a wrapper for the SMST's ProveClosest function. It returns the
// proof for the given path.
// This function should be called several blocks after a session has been claimed and needs to be proven.
ProveClosest(path []byte) (proof *smt.SparseMerkleClosestProof, err error)

// Flush gets the root hash of the SMST needed for submitting the claim;
// then commits the entire tree to disk and stops the KVStore.
// It should be called before submitting the claim on-chain. This function frees up
// the in-memory resources used by the SMST that are no longer needed while waiting
// for the proof submission window to open.
Flush() (SMSTRoot []byte, err error)

// TODO_DISCUSS: This function should not be part of the interface as it is an optimization
// aiming to free up KVStore resources after the proof is no longer needed.
// Delete deletes the SMST from the KVStore.
// WARNING: This function should be called only after the proof has been successfully
// submitted on-chain and the servicer has confirmed that it has been rewarded.
Delete() error
}
11 changes: 11 additions & 0 deletions pkg/relayer/session/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package session

import sdkerrors "cosmossdk.io/errors"

var (
codespace = "relayer/session"
ErrSessionTreeClosed = sdkerrors.Register(codespace, 1, "session tree already closed")
ErrSessionTreeNotClosed = sdkerrors.Register(codespace, 2, "session tree not closed")
ErrSessionStorePathExists = sdkerrors.Register(codespace, 3, "session store path already exists")
ErrSessionTreeProofPathMismatch = sdkerrors.Register(codespace, 4, "session tree proof path mismatch")
)
128 changes: 128 additions & 0 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package session

import (
"context"
"log"
"sync"

blockclient "github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/relayer"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
)

var _ relayer.RelayerSessionsManager = (*relayerSessionsManager)(nil)

type sessionsTreesMap = map[int64]map[string]relayer.SessionTree

// relayerSessionsManager is an implementation of the RelayerSessions interface.
// TODO_TEST: Add tests to the relayerSessionsManager.
type relayerSessionsManager struct {
// sessionsToClaim notifies about sessions that are ready to be claimed.
sessionsToClaim observable.Observable[relayer.SessionTree]

// sessionsToClaimPublisher is the channel used to publish sessions to claim.
sessionsToClaimPublisher chan<- relayer.SessionTree

// sessionTrees is a map of block heights pointing to a map of SessionTrees
// indexed by their sessionId.
// The block height index is used to know when the sessions contained in the entry should be closed,
// this helps to avoid iterating over all sessionsTrees to check if they are ready to be closed.
sessionsTrees sessionsTreesMap
sessionsTreesMu *sync.Mutex

// blockClient is used to get the notifications of committed blocks.
blockClient blockclient.BlockClient

// storesDirectory points to a path on disk where KVStore data files are created.
storesDirectory string
}

// NewRelayerSessions creates a new relayerSessions.
func NewRelayerSessions(
ctx context.Context,
storesDirectory string,
blockClient blockclient.BlockClient,
) relayer.RelayerSessionsManager {
rs := &relayerSessionsManager{
sessionsTrees: make(sessionsTreesMap),
storesDirectory: storesDirectory,
blockClient: blockClient,
}
rs.sessionsToClaim, rs.sessionsToClaimPublisher = channel.NewObservable[relayer.SessionTree]()

go rs.goListenToCommittedBlocks(ctx)

return rs
}

// SessionsToClaim returns an observable that notifies when sessions are ready to be claimed.
func (rs *relayerSessionsManager) SessionsToClaim() observable.Observable[relayer.SessionTree] {
return rs.sessionsToClaim
}

// EnsureSessionTree returns the SessionTree for a given session.
// If no tree for the session exists, a new SessionTree is created before returning.
func (rs *relayerSessionsManager) EnsureSessionTree(session *sessiontypes.Session) (relayer.SessionTree, error) {
rs.sessionsTreesMu.Lock()
defer rs.sessionsTreesMu.Unlock()

// Calculate the session end height based on the session start block height
// and the number of blocks per session.
sessionEndHeight := session.Header.SessionStartBlockHeight + session.NumBlocksPerSession
sessionsTrees, ok := rs.sessionsTrees[sessionEndHeight]

// If there is no map for sessions at the sessionEndHeight, create one.
if !ok {
sessionsTrees = make(map[string]relayer.SessionTree)
rs.sessionsTrees[sessionEndHeight] = sessionsTrees
}

// Get the sessionTree for the given session.
sessionTree, ok := sessionsTrees[session.SessionId]

// If the sessionTree does not exist, create it.
if !ok {
sessionTree, err := NewSessionTree(session, rs.storesDirectory, rs.removeFromRelayerSessions)
if err != nil {
return nil, err
}

sessionsTrees[session.SessionId] = sessionTree
}

return sessionTree, nil
}

// goListenToCommittedBlocks listens to committed blocks so that rs.sessionsToClaimPublisher
// can notify when sessions are ready to be claimed.
// It is intended to be called as a background goroutine.
func (rs *relayerSessionsManager) goListenToCommittedBlocks(ctx context.Context) {
committedBlocks := rs.blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx).Ch()

for block := range committedBlocks {
// Check if there are sessions to be closed at this block height.
if sessionsTrees, ok := rs.sessionsTrees[block.Height()]; ok {
// Iterate over the sessionsTrees that end at this block height and publish them.
for _, sessionTree := range sessionsTrees {
rs.sessionsToClaimPublisher <- sessionTree
}
}
}
}

// removeFromRelayerSessions removes the session from the relayerSessions.
func (rs *relayerSessionsManager) removeFromRelayerSessions(session *sessiontypes.Session) {
rs.sessionsTreesMu.Lock()
defer rs.sessionsTreesMu.Unlock()

sessionEndHeight := session.Header.SessionStartBlockHeight + session.NumBlocksPerSession
sessionsTrees, ok := rs.sessionsTrees[sessionEndHeight]
if !ok {
log.Print("session not found in relayerSessionsManager")
return
}

delete(sessionsTrees, session.SessionId)
}
3 changes: 3 additions & 0 deletions pkg/relayer/session/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package session_test

// TODO: Add tests to the relayerSessionsManager logic
Loading

0 comments on commit a1c15cf

Please sign in to comment.