diff --git a/pkg/either/types.go b/pkg/either/types.go index ae7092479..4f5f53f00 100644 --- a/pkg/either/types.go +++ b/pkg/either/types.go @@ -1,9 +1,12 @@ package either +import "github.com/pokt-network/poktroll/pkg/relayer" + type ( // AsyncError represents a value which could either be a synchronous error or // an asynchronous error (sent through a channel). It wraps the more generic // `Either` type specific for error channels. - AsyncError Either[chan error] - Bytes = Either[[]byte] + AsyncError Either[chan error] + Bytes = Either[[]byte] + SessionTree = Either[relayer.SessionTree] ) diff --git a/pkg/observable/types.go b/pkg/observable/types.go new file mode 100644 index 000000000..04df98201 --- /dev/null +++ b/pkg/observable/types.go @@ -0,0 +1,5 @@ +package observable + +type ( + Error = Observable[error] +) diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 24ef17dfd..134ac8f53 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -7,10 +7,23 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/x/service/types" + servicetypes "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) +// Miner is responsible for observing servedRelayObs, hashing and checking the +// difficulty of each, finally publishing those with sufficient difficulty to +// minedRelayObs as they are applicable for relay volume. +type Miner interface { + MinedRelays( + ctx context.Context, + servedRelayObs observable.Observable[*servicetypes.Relay], + ) (minedRelaysObs observable.Observable[*MinedRelay]) +} + +type MinerOption func(Miner) + // RelayerProxy is the interface for the proxy that serves relays to the application. // It is responsible for starting and stopping all supported RelayServers. 
// While handling requests and responding in a closed loop, it also notifies @@ -59,19 +72,32 @@ type RelayServer interface { Service() *sharedtypes.Service } -// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse -// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are -// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session. -// It also handles the creation and retrieval of SMSTs for a given session. +// RelayerSessionsManager is responsible for managing the relayer's session lifecycles. +// It handles the creation and retrieval of SMSTs (trees) for a given session, as +// well as the respective and subsequent claim creation and proof submission. +// This is largely accomplished by pipelining observables of relays and sessions +// through a series of map operations. +// +// TODO_TECHDEBT: add architecture diagrams covering observable flows throughout +// the relayer package. type RelayerSessionsManager interface { - // SessionsToClaim returns an observable that notifies of sessions ready to be claimed. - SessionsToClaim() observable.Observable[SessionTree] - - // EnsureSessionTree returns the SMST (Sparse Merkle State Tree) for a given session header. - // It is used to retrieve the SMST and update it when a Relay has been successfully served. - // If the session is seen for the first time, it creates a new SMST for it before returning it. - // An error is returned if the corresponding KVStore for SMST fails to be created. - EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (SessionTree, error) + // InsertRelays receives an observable of relays that should be included + // in their respective session's SMST (tree). + InsertRelays(minedRelaysObs observable.Observable[*MinedRelay]) + + // Start iterates over the session trees at the end of each, respective, session. 
+ // The session trees are piped through a series of map operations which progress + // them through the claim/proof lifecycle, broadcasting transactions to the + // network as necessary. + Start(ctx context.Context) + + // Stop unsubscribes all observables from the InsertRelays observable which + // will close downstream observables as they drain. + // + // TODO_TECHDEBT: Either add a mechanism to wait for draining to complete + // and/or ensure that the state at each pipeline stage is persisted to disk + // and exit as early as possible. + Stop() } type RelayerSessionsManagerOption func(RelayerSessionsManager) diff --git a/pkg/relayer/miner/miner.go b/pkg/relayer/miner/miner.go new file mode 100644 index 000000000..79d905ac7 --- /dev/null +++ b/pkg/relayer/miner/miner.go @@ -0,0 +1,122 @@ +package miner + +import ( + "context" + "crypto/sha256" + "hash" + + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/filter" + "github.com/pokt-network/poktroll/pkg/observable/logging" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/protocol" + servicetypes "github.com/pokt-network/poktroll/x/service/types" +) + +var ( + _ relayer.Miner = (*miner)(nil) + defaultRelayHasher = sha256.New + // TODO_BLOCKER: query on-chain governance params once available. + // Setting this to 0 to effectively disables mining for now. + // I.e., all relays are added to the tree. + defaultRelayDifficulty = 0 +) + +// Miner is responsible for observing servedRelayObs, hashing and checking the +// difficulty of each, finally publishing those with sufficient difficulty to +// minedRelayObs as they are applicable for relay volume. 
+// +// TODO_BLOCKER: The relay hashing and relay difficulty mechanisms & values must come +type miner struct { + // relayHasher is a function which returns a hash.Hash interface type. It is + // used to hash serialized relays to measure their mining difficulty. + relayHasher func() hash.Hash + // relayDifficulty is the minimum difficulty that a relay must have to be + // volume / reward applicable. + relayDifficulty int +} + +// NewMiner creates a new miner from the given dependencies and options. It +// returns an error if it has not been sufficiently configured or supplied. +func NewMiner( + opts ...relayer.MinerOption, +) (*miner, error) { + mnr := &miner{} + + for _, opt := range opts { + opt(mnr) + } + + mnr.setDefaults() + + return mnr, nil +} + +// MinedRelays maps servedRelaysObs through a pipeline which: +// 1. Hashes the relay +// 2. Checks if it's above the mining difficulty +// 3. Adds it to the session tree if so +// It DOES NOT BLOCK as map operations run in their own goroutines. +func (mnr *miner) MinedRelays( + ctx context.Context, + servedRelaysObs observable.Observable[*servicetypes.Relay], +) observable.Observable[*relayer.MinedRelay] { + // Map servedRelaysObs to a new observable of an either type, populated with + // the minedRelay or an error. It is notified after the relay has been mined + // or an error has been encountered, respectively. + eitherMinedRelaysObs := channel.Map(ctx, servedRelaysObs, mnr.mapMineRelay) + logging.LogErrors(ctx, filter.EitherError(ctx, eitherMinedRelaysObs)) + + return filter.EitherSuccess(ctx, eitherMinedRelaysObs) +} + +// setDefaults ensures that the miner has been configured with a hasherConstructor and uses +// the default hasherConstructor if not. +func (mnr *miner) setDefaults() { + if mnr.relayHasher == nil { + mnr.relayHasher = defaultRelayHasher + } +} + +// mapMineRelay is intended to be used as a MapFn. +// 1. It hashes the relay and compares its difficulty to the minimum threshold. +// 2. 
If the relay difficulty is sufficient -> return an Either[MineRelay Value] +// 3. If an error is encountered -> return an Either[error] +// 4. Otherwise, skip the relay. +func (mnr *miner) mapMineRelay( + _ context.Context, + relay *servicetypes.Relay, +) (_ either.Either[*relayer.MinedRelay], skip bool) { + relayBz, err := relay.Marshal() + if err != nil { + return either.Error[*relayer.MinedRelay](err), false + } + + // TODO_BLOCKER: Centralize the logic of hashing a relay. It should live + // alongside signing & verification. + // + // TODO_IMPROVE: We need to hash the key; it would be nice if smst.Update() could do it + // since smst has a reference to the hasherConstructor + relayHash := mnr.hash(relayBz) + + // The relay IS NOT volume / reward applicable + if !protocol.BytesDifficultyGreaterThan(relayHash, defaultRelayDifficulty) { + return either.Success[*relayer.MinedRelay](nil), true + } + + // The relay IS volume / reward applicable + return either.Success(&relayer.MinedRelay{ + Relay: *relay, + Bytes: relayBz, + Hash: relayHash, + }), false +} + +// hash constructs a new hasher and hashes the given input bytes. +func (mnr *miner) hash(inputBz []byte) []byte { + hasher := mnr.relayHasher() + hasher.Write(inputBz) + return hasher.Sum(nil) +} diff --git a/pkg/relayer/miner/miner_test.go b/pkg/relayer/miner/miner_test.go new file mode 100644 index 000000000..c362005bd --- /dev/null +++ b/pkg/relayer/miner/miner_test.go @@ -0,0 +1,10 @@ +package miner_test + +import ( + "testing" +) + +// TODO_TECHDEBT(@bryanchriswhite): add all the test coverage... 
+func TestNewMiner(t *testing.T) { + t.Skip("TODO_TECHDEBT(@bryanchriswhite): add all the test coverage...") +} diff --git a/pkg/relayer/protocol/block_heights.go b/pkg/relayer/protocol/block_heights.go new file mode 100644 index 000000000..b372376f7 --- /dev/null +++ b/pkg/relayer/protocol/block_heights.go @@ -0,0 +1,45 @@ +package protocol + +import ( + "encoding/binary" + "log" + "math/rand" + + "github.com/pokt-network/poktroll/pkg/client" +) + +// GetEarliestCreateClaimHeight returns the earliest block height at which a claim +// for a session with the given createClaimWindowStartHeight can be created. +// +// TODO_TEST(@bryanchriswhite): Add test coverage and more logs +func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int64 { + createClaimWindowStartBlockHash := createClaimWindowStartBlock.Hash() + log.Printf("using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash) + rngSeed, _ := binary.Varint(createClaimWindowStartBlockHash) + randomNumber := rand.NewSource(rngSeed).Int63() + + // TODO_TECHDEBT: query the on-chain governance parameter once available. + // randCreateClaimHeightOffset := randomNumber % (claimproofparams.GovCreateClaimIntervalBlocks - claimproofparams.GovCreateClaimWindowBlocks - 1) + _ = randomNumber + randCreateClaimHeightOffset := int64(0) + + return createClaimWindowStartBlock.Height() + randCreateClaimHeightOffset +} + +// GetEarliestSubmitProofHeight returns the earliest block height at which a proof +// for a session with the given submitProofWindowStartHeight can be submitted. 
+// +// TODO_TEST(@bryanchriswhite): Add test coverage and more logs +func GetEarliestSubmitProofHeight(submitProofWindowStartBlock client.Block) int64 { + earliestSubmitProofBlockHash := submitProofWindowStartBlock.Hash() + log.Printf("using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash) + rngSeed, _ := binary.Varint(earliestSubmitProofBlockHash) + randomNumber := rand.NewSource(rngSeed).Int63() + + // TODO_TECHDEBT: query the on-chain governance parameter once available. + // randSubmitProofHeightOffset := randomNumber % (claimproofparams.GovSubmitProofIntervalBlocks - claimproofparams.GovSubmitProofWindowBlocks - 1) + _ = randomNumber + randSubmitProofHeightOffset := int64(0) + + return submitProofWindowStartBlock.Height() + randSubmitProofHeightOffset +} diff --git a/pkg/relayer/protocol/difficulty.go b/pkg/relayer/protocol/difficulty.go new file mode 100644 index 000000000..4743d546d --- /dev/null +++ b/pkg/relayer/protocol/difficulty.go @@ -0,0 +1,17 @@ +package protocol + +import ( + "encoding/hex" + "strings" +) + +// TODO_BLOCKER: Revisit this part of the algorithm after initial TestNet Launch. +// TODO_TEST: Add extensive tests for the core relay mining business logic. +// BytesDifficultyGreaterThan determines if the bytes exceed a certain difficulty, and it +// is used to determine if a relay is volume applicable. See the spec for more details: https://github.com/pokt-network/pocket-network-protocol +func BytesDifficultyGreaterThan(bz []byte, compDifficultyBytes int) bool { + hexZerosPrefix := strings.Repeat("0", compDifficultyBytes*2) // 2 hex chars per byte. 
+ hexBz := hex.EncodeToString(bz) + + return strings.HasPrefix(hexBz, hexZerosPrefix) +} diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go new file mode 100644 index 000000000..712e7a9e5 --- /dev/null +++ b/pkg/relayer/session/claim.go @@ -0,0 +1,117 @@ +package session + +import ( + "context" + "log" + + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/filter" + "github.com/pokt-network/poktroll/pkg/observable/logging" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/protocol" +) + +// createClaims maps over the sessionsToClaimObs observable. For each claim, it: +// 1. Calculates the earliest block height at which it is safe to CreateClaim +// 2. Waits for said block and creates the claim on-chain +// 3. Maps errors to a new observable and logs them +// 4. Returns an observable of the successfully claimed sessions +// It DOES NOT BLOCK as map operations run in their own goroutines. +func (rs *relayerSessionsManager) createClaims(ctx context.Context) observable.Observable[relayer.SessionTree] { + // Map sessionsToClaimObs to a new observable of the same type which is notified + // when the session is eligible to be claimed. + sessionsWithOpenClaimWindowObs := channel.Map( + ctx, rs.sessionsToClaimObs, + rs.mapWaitForEarliestCreateClaimHeight, + ) + + failedCreateClaimSessionsObs, failedCreateClaimSessionsPublishCh := + channel.NewObservable[relayer.SessionTree]() + + // Map sessionsWithOpenClaimWindowObs to a new observable of an either type, + // populated with the session or an error, which is notified after the session + // claim has been created or an error has been encountered, respectively. 
+ eitherClaimedSessionsObs := channel.Map( + ctx, sessionsWithOpenClaimWindowObs, + rs.newMapClaimSessionFn(failedCreateClaimSessionsPublishCh), + ) + + // TODO_TECHDEBT: pass failed create claim sessions to some retry mechanism. + _ = failedCreateClaimSessionsObs + logging.LogErrors(ctx, filter.EitherError(ctx, eitherClaimedSessionsObs)) + + // Map eitherClaimedSessions to a new observable of relayer.SessionTree which + // is notified when the corresponding claim creation succeeded. + return filter.EitherSuccess(ctx, eitherClaimedSessionsObs) +} + +// mapWaitForEarliestCreateClaimHeight is intended to be used as a MapFn. It +// calculates and waits for the earliest block height, allowed by the protocol, +// at which a claim can be created for the given session, then emits the session +// **at that moment**. +func (rs *relayerSessionsManager) mapWaitForEarliestCreateClaimHeight( + ctx context.Context, + session relayer.SessionTree, +) (_ relayer.SessionTree, skip bool) { + rs.waitForEarliestCreateClaimHeight( + ctx, session.GetSessionHeader().GetSessionEndBlockHeight(), + ) + return session, false +} + +// waitForEarliestCreateClaimHeight calculates and waits for (blocking until) the +// earliest block height, allowed by the protocol, at which a claim can be created +// for a session with the given sessionEndHeight. It is calculated relative to +// sessionEndHeight using on-chain governance parameters and randomized input. +// It IS A BLOCKING function. +func (rs *relayerSessionsManager) waitForEarliestCreateClaimHeight( + ctx context.Context, + sessionEndHeight int64, +) { + // TODO_TECHDEBT: refactor this logic to a shared package. + + createClaimWindowStartHeight := sessionEndHeight + // TODO_TECHDEBT: query the on-chain governance parameter once available. 
+ // + claimproofparams.GovCreateClaimWindowStartHeightOffset + + // we wait for createClaimWindowStartHeight to be received before proceeding since we need its hash + // to know where this servicer's claim submission window starts. + log.Printf("waiting & blocking for global earliest claim submission createClaimWindowStartBlock height: %d", createClaimWindowStartHeight) + createClaimWindowStartBlock := rs.waitForBlock(ctx, createClaimWindowStartHeight) + + log.Printf("received earliest claim submission createClaimWindowStartBlock height: %d, use its hash to have a random submission for the servicer", createClaimWindowStartBlock.Height()) + + earliestCreateClaimHeight := + protocol.GetEarliestCreateClaimHeight(createClaimWindowStartBlock) + + log.Printf("earliest claim submission createClaimWindowStartBlock height for this supplier: %d", earliestCreateClaimHeight) + _ = rs.waitForBlock(ctx, earliestCreateClaimHeight) +} + +// newMapClaimSessionFn returns a new MapFn that creates a claim for the given +// session. Any session which encounters an error while creating a claim is sent +// on the failedCreateClaimSessions channel. 
+func (rs *relayerSessionsManager) newMapClaimSessionFn( + failedCreateClaimSessionsPublishCh chan<- relayer.SessionTree, +) channel.MapFn[relayer.SessionTree, either.SessionTree] { + return func( + ctx context.Context, + session relayer.SessionTree, + ) (_ either.SessionTree, skip bool) { + // this session should no longer be updated + claimRoot, err := session.Flush() + if err != nil { + return either.Error[relayer.SessionTree](err), false + } + + sessionHeader := session.GetSessionHeader() + if err := rs.supplierClient.CreateClaim(ctx, *sessionHeader, claimRoot); err != nil { + failedCreateClaimSessionsPublishCh <- session + return either.Error[relayer.SessionTree](err), false + } + + return either.Success(session), false + } +} diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go new file mode 100644 index 000000000..4a7c415aa --- /dev/null +++ b/pkg/relayer/session/proof.go @@ -0,0 +1,116 @@ +package session + +import ( + "context" + "log" + + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/filter" + "github.com/pokt-network/poktroll/pkg/observable/logging" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/protocol" +) + +// submitProofs maps over the given claimedSessions observable. +// For each session, it: +// 1. Calculates the earliest block height at which to submit a proof +// 2. Waits for said height and submits the proof on-chain +// 3. Maps errors to a new observable and logs them +// It DOES NOT BLOCK as map operations run in their own goroutines. +func (rs *relayerSessionsManager) submitProofs( + ctx context.Context, + claimedSessionsObs observable.Observable[relayer.SessionTree], +) { + // Map claimedSessionsObs to a new observable of the same type which is notified + // when the session is eligible to be proven. 
+ sessionsWithOpenProofWindowObs := channel.Map( + ctx, claimedSessionsObs, + rs.mapWaitForEarliestSubmitProofHeight, + ) + + failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh := + channel.NewObservable[relayer.SessionTree]() + + // Map sessionsWithOpenProofWindow to a new observable of an either type, + // populated with the session or an error, which is notified after the session + // proof has been submitted or an error has been encountered, respectively. + eitherProvenSessionsObs := channel.Map( + ctx, sessionsWithOpenProofWindowObs, + rs.newMapProveSessionFn(failedSubmitProofSessionsPublishCh), + ) + + // TODO_TECHDEBT: pass failed submit proof sessions to some retry mechanism. + _ = failedSubmitProofSessionsObs + logging.LogErrors(ctx, filter.EitherError(ctx, eitherProvenSessionsObs)) +} + +// mapWaitForEarliestSubmitProofHeight is intended to be used as a MapFn. It +// calculates and waits for the earliest block height, allowed by the protocol, +// at which a proof can be submitted for the given session, then emits the session +// **at that moment**. +func (rs *relayerSessionsManager) mapWaitForEarliestSubmitProofHeight( + ctx context.Context, + session relayer.SessionTree, +) (_ relayer.SessionTree, skip bool) { + rs.waitForEarliestSubmitProofHeight( + ctx, session.GetSessionHeader().GetSessionEndBlockHeight(), + ) + return session, false +} + +// waitForEarliestSubmitProofHeight calculates and waits for (blocking until) the +// earliest block height, allowed by the protocol, at which a proof can be submitted +// for a session which was claimed at createClaimHeight. It is calculated relative +// to createClaimHeight using on-chain governance parameters and randomized input. +// It IS A BLOCKING function. 
+func (rs *relayerSessionsManager) waitForEarliestSubmitProofHeight( + ctx context.Context, + createClaimHeight int64, +) { + submitProofWindowStartHeight := createClaimHeight + // TODO_TECHDEBT: query the on-chain governance parameter once available. + // + claimproofparams.GovSubmitProofWindowStartHeightOffset + + // we wait for submitProofWindowStartHeight to be received before proceeding since we need its hash + log.Printf("waiting and blocking for global earliest proof submission submitProofWindowStartBlock height: %d", submitProofWindowStartHeight) + submitProofWindowStartBlock := rs.waitForBlock(ctx, submitProofWindowStartHeight) + + earliestSubmitProofHeight := protocol.GetEarliestSubmitProofHeight(submitProofWindowStartBlock) + _ = rs.waitForBlock(ctx, earliestSubmitProofHeight) +} + +// newMapProveSessionFn returns a new MapFn that submits a proof for the given +// session. Any session which encounters errors while submitting a proof is sent +// on the failedSubmitProofSessions channel. +func (rs *relayerSessionsManager) newMapProveSessionFn( + failedSubmitProofSessionsCh chan<- relayer.SessionTree, +) channel.MapFn[relayer.SessionTree, either.SessionTree] { + return func( + ctx context.Context, + session relayer.SessionTree, + ) (_ either.SessionTree, skip bool) { + // TODO_BLOCKER: The block that'll be used as a source of entropy for which + // branch(es) to prove should be deterministic and use on-chain governance params + // rather than latest. + latestBlock := rs.blockClient.LatestBlock(ctx) + proof, err := session.ProveClosest(latestBlock.Hash()) + if err != nil { + return either.Error[relayer.SessionTree](err), false + } + + log.Printf("currentBlock: %d, submitting proof", latestBlock.Height()+1) + // SubmitProof ensures on-chain proof inclusion so we can safely prune the tree. 
+ if err := rs.supplierClient.SubmitProof( + ctx, + *session.GetSessionHeader(), + proof, + ); err != nil { + failedSubmitProofSessionsCh <- session + return either.Error[relayer.SessionTree](err), false + } + + return either.Success(session), false + } +} diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 7a45880e1..4cf8b0d2c 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -6,9 +6,11 @@ import ( "sync" "cosmossdk.io/depinject" + "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/logging" "github.com/pokt-network/poktroll/pkg/relayer" sessiontypes "github.com/pokt-network/poktroll/x/session/types" ) @@ -20,8 +22,10 @@ type sessionsTreesMap = map[int64]map[string]relayer.SessionTree // relayerSessionsManager is an implementation of the RelayerSessions interface. // TODO_TEST: Add tests to the relayerSessionsManager. type relayerSessionsManager struct { - // sessionsToClaim notifies about sessions that are ready to be claimed. - sessionsToClaim observable.Observable[relayer.SessionTree] + relayObs observable.Observable[*relayer.MinedRelay] + + // sessionsToClaimObs notifies about sessions that are ready to be claimed. + sessionsToClaimObs observable.Observable[relayer.SessionTree] // sessionTrees is a map of block heights pointing to a map of SessionTrees // indexed by their sessionId. @@ -33,6 +37,9 @@ type relayerSessionsManager struct { // blockClient is used to get the notifications of committed blocks. blockClient client.BlockClient + // supplierClient is used to create claims and submit proofs for sessions. + supplierClient client.SupplierClient + // storesDirectory points to a path on disk where KVStore data files are created. 
storesDirectory string } @@ -50,6 +57,7 @@ func NewRelayerSessions( if err := depinject.Inject( deps, &rs.blockClient, + &rs.supplierClient, ); err != nil { return nil, err } @@ -62,7 +70,7 @@ func NewRelayerSessions( return nil, err } - rs.sessionsToClaim = channel.MapExpand[client.Block, relayer.SessionTree]( + rs.sessionsToClaimObs = channel.MapExpand[client.Block, relayer.SessionTree]( ctx, rs.blockClient.CommittedBlocksSequence(ctx), rs.mapBlockToSessionsToClaim, @@ -71,14 +79,40 @@ func NewRelayerSessions( return rs, nil } +// Start iterates over the session trees at the end of each, respective, session. +// The session trees are piped through a series of map operations which progress +// them through the claim/proof lifecycle, broadcasting transactions to the +// network as necessary. +func (rs *relayerSessionsManager) Start(ctx context.Context) { + // Map eitherMinedRelays to a new observable of an error type which is + // notified if an error was encountered while attempting to add the relay to + // the session tree. + miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree) + logging.LogErrors(ctx, miningErrorsObs) + + // Start claim/proof pipeline. + claimedSessionsObs := rs.createClaims(ctx) + rs.submitProofs(ctx, claimedSessionsObs) +} + +// Stop unsubscribes all observables from the InsertRelays observable which +// will close downstream observables as they drain. +// +// TODO_TECHDEBT: Either add a mechanism to wait for draining to complete +// and/or ensure that the state at each pipeline stage is persisted to disk +// and exit as early as possible. +func (rs *relayerSessionsManager) Stop() { + rs.relayObs.UnsubscribeAll() +} + // SessionsToClaim returns an observable that notifies when sessions are ready to be claimed. 
-func (rs *relayerSessionsManager) SessionsToClaim() observable.Observable[relayer.SessionTree] { - return rs.sessionsToClaim +func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*relayer.MinedRelay]) { + rs.relayObs = relays } -// EnsureSessionTree returns the SessionTree for a given session. +// ensureSessionTree returns the SessionTree for a given session. // If no tree for the session exists, a new SessionTree is created before returning. -func (rs *relayerSessionsManager) EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (relayer.SessionTree, error) { +func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes.SessionHeader) (relayer.SessionTree, error) { rs.sessionsTreesMu.Lock() defer rs.sessionsTreesMu.Unlock() @@ -157,3 +191,42 @@ func (rp *relayerSessionsManager) validateConfig() error { return nil } + +// waitForBlock blocks until the block at the given height (or greater) is +// observed as having been committed. +func (rs *relayerSessionsManager) waitForBlock(ctx context.Context, height int64) client.Block { + subscription := rs.blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx) + defer subscription.Unsubscribe() + + for block := range subscription.Ch() { + if block.Height() >= height { + return block + } + } + + return nil +} + +// mapAddRelayToSessionTree is intended to be used as a MapFn. It adds the relay +// to the session tree. If it encounters an error, it returns the error. Otherwise, +// it skips output (only outputs errors). 
+func (rs *relayerSessionsManager) mapAddRelayToSessionTree( + _ context.Context, + relay *relayer.MinedRelay, +) (_ error, skip bool) { + // ensure the session tree exists for this relay + sessionHeader := relay.GetReq().GetMeta().GetSessionHeader() + smst, err := rs.ensureSessionTree(sessionHeader) + if err != nil { + log.Printf("failed to ensure session tree: %s\n", err) + return err, false + } + + if err := smst.Update(relay.Hash, relay.Bytes, 1); err != nil { + log.Printf("failed to update smt: %s\n", err) + return err, false + } + + // Skip because this map function only outputs errors. + return nil, true +} diff --git a/pkg/relayer/types.go b/pkg/relayer/types.go new file mode 100644 index 000000000..1216dd25e --- /dev/null +++ b/pkg/relayer/types.go @@ -0,0 +1,10 @@ +package relayer + +import "github.com/pokt-network/poktroll/x/service/types" + +// MinedRelay is a wrapper around a relay that has been serialized and hashed. +type MinedRelay struct { + types.Relay + Bytes []byte + Hash []byte +}