Skip to content

Commit

Permalink
[Miner] feat: add Miner component (#168)
Browse files Browse the repository at this point in the history
* refactor: `MapFn`s receive context arg

* chore: add `ForEach` map shorthand operator

* chore: add `/pkg/observable/filter`

* chore: add `/pkg/observable/logging`

* chore: add `/pkg/relayer/protocol`

* chore: add `Miner` interface

* feat: add `Miner` implementation

* test: `Miner` implementation

* chore: fix comment

* chore: add godoc comments

* [Test] First step for automated E2E Relay test (#167)

- Fixed helpers for localnet regenesis
- Added an application & supplier to the genesis file
- Initializing appMap & supplierMap in E2E tests
- Add support for the app's codec (for unmarshaling responses) in E2E tests
- Adding a placeholder for `e2e/tests/relay.feature`

---

Co-authored-by: harry <[email protected]>

* [Relayer] refactor: simplify `RelayerSessionsManager`  (#169)

* refactor: `MapFn`s receive context arg

* feat: add `MapExpand` observable operator

* refactor: `RelayerSessionsManager` to be more reactive

* chore: add godoc comment

* chore: review feedback improvements

* trigger CI

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

* fix: import cycle & goimports

* chore: review feedback improvements

* chore: cleanup TODO_THIS_COMMIT comments

* chore: improve var & func names for clarity and consistency

* refactor: move claim/proof lifecycle concerns to `relayerSessionsManager`.

* chore: review feedback improvements

* chore: review feedback improvements

* refactor: `miner#hash()` method

* chore: tidy up

* chore: simplify

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

* chore: review feedback improvements

* fix: incomplete refactor

* chore: simplify

---------

Co-authored-by: Daniel Olshansky <[email protected]>
Co-authored-by: harry <[email protected]>
  • Loading branch information
3 people authored Nov 10, 2023
1 parent e49434e commit 5af2ae9
Show file tree
Hide file tree
Showing 11 changed files with 565 additions and 21 deletions.
7 changes: 5 additions & 2 deletions pkg/either/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package either

import "github.com/pokt-network/poktroll/pkg/relayer"

type (
// AsyncError represents a value which could either be a synchronous error or
// an asynchronous error (sent through a channel). It wraps the more generic
// `Either` type specific for error channels.
AsyncError Either[chan error]
Bytes = Either[[]byte]
AsyncError Either[chan error]
Bytes = Either[[]byte]
SessionTree = Either[relayer.SessionTree]
)
5 changes: 5 additions & 0 deletions pkg/observable/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package observable

type (
Error = Observable[error]
)
50 changes: 38 additions & 12 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,23 @@ import (

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/x/service/types"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

// Miner is responsible for observing servedRelayObs, hashing and checking the
// difficulty of each, finally publishing those with sufficient difficulty to
// minedRelayObs as they are applicable for relay volume.
type Miner interface {
MinedRelays(
ctx context.Context,
servedRelayObs observable.Observable[*servicetypes.Relay],
) (minedRelaysObs observable.Observable[*MinedRelay])
}

type MinerOption func(Miner)

// RelayerProxy is the interface for the proxy that serves relays to the application.
// It is responsible for starting and stopping all supported RelayServers.
// While handling requests and responding in a closed loop, it also notifies
Expand Down Expand Up @@ -59,19 +72,32 @@ type RelayServer interface {
Service() *sharedtypes.Service
}

// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse
// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are
// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session.
// It also handles the creation and retrieval of SMSTs for a given session.
// RelayerSessionsManager is responsible for managing the relayer's session lifecycles.
// It handles the creation and retrieval of SMSTs (trees) for a given session, as
// well as the respective and subsequent claim creation and proof submission.
// This is largely accomplished by pipelining observables of relays and sessions
// through a series of map operations.
//
// TODO_TECHDEBT: add architecture diagrams covering observable flows throughout
// the relayer package.
type RelayerSessionsManager interface {
// SessionsToClaim returns an observable that notifies of sessions ready to be claimed.
SessionsToClaim() observable.Observable[SessionTree]

// EnsureSessionTree returns the SMST (Sparse Merkle State Tree) for a given session header.
// It is used to retrieve the SMST and update it when a Relay has been successfully served.
// If the session is seen for the first time, it creates a new SMST for it before returning it.
// An error is returned if the corresponding KVStore for SMST fails to be created.
EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (SessionTree, error)
// InsertRelays receives an observable of relays that should be included
// in their respective session's SMST (tree).
InsertRelays(minedRelaysObs observable.Observable[*MinedRelay])

// Start iterates over the session trees at the end of each, respective, session.
// The session trees are piped through a series of map operations which progress
// them through the claim/proof lifecycle, broadcasting transactions to the
// network as necessary.
Start(ctx context.Context)

// Stop unsubscribes all observables from the InsertRelays observable which
// will close downstream observables as they drain.
//
// TODO_TECHDEBT: Either add a mechanism to wait for draining to complete
// and/or ensure that the state at each pipeline stage is persisted to disk
// and exit as early as possible.
Stop()
}

type RelayerSessionsManagerOption func(RelayerSessionsManager)
Expand Down
122 changes: 122 additions & 0 deletions pkg/relayer/miner/miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package miner

import (
"context"
"crypto/sha256"
"hash"

"github.com/pokt-network/poktroll/pkg/either"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/observable/filter"
"github.com/pokt-network/poktroll/pkg/observable/logging"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/pkg/relayer/protocol"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
)

var (
_ relayer.Miner = (*miner)(nil)
defaultRelayHasher = sha256.New
// TODO_BLOCKER: query on-chain governance params once available.
// Setting this to 0 to effectively disables mining for now.
// I.e., all relays are added to the tree.
defaultRelayDifficulty = 0
)

// Miner is responsible for observing servedRelayObs, hashing and checking the
// difficulty of each, finally publishing those with sufficient difficulty to
// minedRelayObs as they are applicable for relay volume.
//
// TODO_BLOCKER: The relay hashing and relay difficulty mechanisms & values must come
type miner struct {
// relayHasher is a function which returns a hash.Hash interfact type. It is
// used to hash serialized relays to measure their mining difficulty.
relayHasher func() hash.Hash
// relayDifficulty is the minimum difficulty that a relay must have to be
// volume / reward applicable.
relayDifficulty int
}

// NewMiner creates a new miner from the given dependencies and options. It
// returns an error if it has not been sufficiently configured or supplied.
func NewMiner(
opts ...relayer.MinerOption,
) (*miner, error) {
mnr := &miner{}

for _, opt := range opts {
opt(mnr)
}

mnr.setDefaults()

return mnr, nil
}

// MinedRelays maps servedRelaysObs through a pipeline which:
// 1. Hashes the relay
// 2. Checks if it's above the mining difficulty
// 3. Adds it to the session tree if so
// It DOES NOT BLOCK as map operations run in their own goroutines.
func (mnr *miner) MinedRelays(
ctx context.Context,
servedRelaysObs observable.Observable[*servicetypes.Relay],
) observable.Observable[*relayer.MinedRelay] {
// Map servedRelaysObs to a new observable of an either type, populated with
// the minedRelay or an error. It is notified after the relay has been mined
// or an error has been encountered, respectively.
eitherMinedRelaysObs := channel.Map(ctx, servedRelaysObs, mnr.mapMineRelay)
logging.LogErrors(ctx, filter.EitherError(ctx, eitherMinedRelaysObs))

return filter.EitherSuccess(ctx, eitherMinedRelaysObs)
}

// setDefaults ensures that the miner has been configured with a hasherConstructor and uses
// the default hasherConstructor if not.
func (mnr *miner) setDefaults() {
if mnr.relayHasher == nil {
mnr.relayHasher = defaultRelayHasher
}
}

// mapMineRelay is intended to be used as a MapFn.
// 1. It hashes the relay and compares its difficult to the minimum threshold.
// 2. If the relay difficulty is sufficient -> return an Either[MineRelay Value]
// 3. If an error is encountered -> return an Either[error]
// 4. Otherwise, skip the relay.
func (mnr *miner) mapMineRelay(
_ context.Context,
relay *servicetypes.Relay,
) (_ either.Either[*relayer.MinedRelay], skip bool) {
relayBz, err := relay.Marshal()
if err != nil {
return either.Error[*relayer.MinedRelay](err), false
}

// TODO_BLOCKER: Centralize the logic of hashing a relay. It should live
// alongside signing & verification.
//
// TODO_IMPROVE: We need to hash the key; it would be nice if smst.Update() could do it
// since smst has a reference to the hasherConstructor
relayHash := mnr.hash(relayBz)

// The relay IS NOT volume / reward applicable
if !protocol.BytesDifficultyGreaterThan(relayHash, defaultRelayDifficulty) {
return either.Success[*relayer.MinedRelay](nil), true
}

// The relay IS volume / reward applicable
return either.Success(&relayer.MinedRelay{
Relay: *relay,
Bytes: relayBz,
Hash: relayHash,
}), false
}

// hash constructs a new hasher and hashes the given input bytes.
func (mnr *miner) hash(inputBz []byte) []byte {
hasher := mnr.relayHasher()
hasher.Write(inputBz)
return hasher.Sum(nil)
}
10 changes: 10 additions & 0 deletions pkg/relayer/miner/miner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package miner_test

import (
"testing"
)

// TODO_TECHDEBT(@bryanchriswhite): add all the test coverage...
func TestNewMiner(t *testing.T) {
t.Skip("TODO_TECHDEBT(@bryanchriswhite): add all the test coverage...")
}
45 changes: 45 additions & 0 deletions pkg/relayer/protocol/block_heights.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package protocol

import (
"encoding/binary"
"log"
"math/rand"

"github.com/pokt-network/poktroll/pkg/client"
)

// GetEarliestCreateClaimHeight returns the earliest block height at which a claim
// for a session with the given createClaimWindowStartHeight can be created.
//
// TODO_TEST(@bryanchriswhite): Add test coverage and more logs
func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int64 {
createClaimWindowStartBlockHash := createClaimWindowStartBlock.Hash()
log.Printf("using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash)
rngSeed, _ := binary.Varint(createClaimWindowStartBlockHash)
randomNumber := rand.NewSource(rngSeed).Int63()

// TODO_TECHDEBT: query the on-chain governance parameter once available.
// randCreateClaimHeightOffset := randomNumber % (claimproofparams.GovCreateClaimIntervalBlocks - claimproofparams.GovCreateClaimWindowBlocks - 1)
_ = randomNumber
randCreateClaimHeightOffset := int64(0)

return createClaimWindowStartBlock.Height() + randCreateClaimHeightOffset
}

// GetEarliestSubmitProofHeight returns the earliest block height at which a proof
// for a session with the given submitProofWindowStartHeight can be submitted.
//
// TODO_TEST(@bryanchriswhite): Add test coverage and more logs
func GetEarliestSubmitProofHeight(submitProofWindowStartBlock client.Block) int64 {
earliestSubmitProofBlockHash := submitProofWindowStartBlock.Hash()
log.Printf("using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash)
rngSeed, _ := binary.Varint(earliestSubmitProofBlockHash)
randomNumber := rand.NewSource(rngSeed).Int63()

// TODO_TECHDEBT: query the on-chain governance parameter once available.
// randSubmitProofHeightOffset := randomNumber % (claimproofparams.GovSubmitProofIntervalBlocks - claimproofparams.GovSubmitProofWindowBlocks - 1)
_ = randomNumber
randSubmitProofHeightOffset := int64(0)

return submitProofWindowStartBlock.Height() + randSubmitProofHeightOffset
}
17 changes: 17 additions & 0 deletions pkg/relayer/protocol/difficulty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package protocol

import (
"encoding/hex"
"strings"
)

// TODO_BLOCKER: Revisit this part of the algorithm after initial TestNet Launch.
// TODO_TEST: Add extensive tests for the core relay mining business logic.
// BytesDifficultyGreaterThan determines if the bytes exceed a certain difficulty, and it
// is used to determine if a relay is volume applicable. See the spec for more details: https://github.com/pokt-network/pocket-network-protocol
func BytesDifficultyGreaterThan(bz []byte, compDifficultyBytes int) bool {
hexZerosPrefix := strings.Repeat("0", compDifficultyBytes*2) // 2 hex chars per byte.
hexBz := hex.EncodeToString(bz)

return strings.HasPrefix(hexBz, hexZerosPrefix)
}
Loading

0 comments on commit 5af2ae9

Please sign in to comment.