Skip to content

Commit

Permalink
[Relayer] feat: Add Relayer struct (#172)
Browse files Browse the repository at this point in the history
* refactor: `MapFn`s receive context arg

* chore: add `ForEach` map shorthand operator

* chore: add `/pkg/observable/filter`

* chore: add `/pkg/observable/logging`

* chore: add `/pkg/relayer/protocol`

* chore: add `Miner` interface

* feat: add `Miner` implementation

* test: `Miner` implementation

* chore: fix comment

* chore: add godoc comments

* feat: Add Relayer struct

* chore: Rename to RelayMiner

* chore: Rename relay miner file

* chore: Remove unused
RelayerOption parameter

* [Test] First step for automated E2E Relay test (#167)

- Fixed helpers for localnet regenesis
- Added an application & supplier to the genesis file
- Initializing appMap & supplierMap in E2E tests
- Add support for the app's codec (for unmarshaling responses) in E2E tests
- Adding a placeholder for `e2e/tests/relay.feature`

---

Co-authored-by: harry <[email protected]>

* [Relayer] refactor: simplify `RelayerSessionsManager`  (#169)

* refactor: `MapFn`s receive context arg

* feat: add `MapExpand` observable operator

* refactor: `RelayerSessionsManager` to be more reactive

* chore: add godoc comment

* chore: review feedback improvements

* trigger CI

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

* chore: update start mining comment

* fix: Update Miner interface

* fix: import cycle & goimports

* chore: review feedback improvements

* chore: cleanup TODO_THIS_COMMIT comments

* chore: improve var & func names for clarity and consistency

* refactor: move claim/proof lifecycle concerns to `relayerSessionsManager`.

* chore: review feedback improvements

* chore: review feedback improvements

* refactor: `miner#hash()` method

* chore: tidy up

* chore: simplify

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

* chore: review feedback improvements

* fix: incomplete refactor

* chore: simplify

* chore: Reflect responsibility changes of session manager

* chore: Improve comments about waitgroup

---------

Co-authored-by: Bryan White <[email protected]>
Co-authored-by: Daniel Olshansky <[email protected]>
Co-authored-by: harry <[email protected]>
  • Loading branch information
4 people authored Nov 10, 2023
1 parent 5af2ae9 commit 2673bb2
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
3 changes: 1 addition & 2 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/x/service/types"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)
Expand All @@ -18,7 +17,7 @@ import (
type Miner interface {
MinedRelays(
ctx context.Context,
servedRelayObs observable.Observable[*servicetypes.Relay],
servedRelayObs observable.Observable[*types.Relay],
) (minedRelaysObs observable.Observable[*MinedRelay])
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewRelayerProxy(
}

// Start concurrently starts all advertised relay servers and returns an error if any of them fails to start.
// This method is blocking until all RelayServers are started.
// This method is blocking as long as all RelayServers are running.
func (rp *relayerProxy) Start(ctx context.Context) error {
// The provided services map is built from the supplier's on-chain advertised information,
// which is a runtime parameter that can be changed by the supplier.
Expand Down
65 changes: 65 additions & 0 deletions pkg/relayer/relayminer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package relayer

import (
"context"
"log"

"cosmossdk.io/depinject"
)

// relayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining).
// It starts and stops the RelayerProxy and provide the served relays observable to the miner.
type relayMiner struct {
relayerProxy RelayerProxy
miner Miner
relayerSessionsManager RelayerSessionsManager
}

// NewRelayMiner creates a new Relayer instance with the given dependencies.
// It injects the dependencies into the Relayer instance and returns it.
func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, error) {
rel := &relayMiner{}

if err := depinject.Inject(
deps,
&rel.relayerProxy,
&rel.miner,
&rel.relayerSessionsManager,
); err != nil {
return nil, err
}

// Set up relay pipeline
servedRelaysObs := rel.relayerProxy.ServedRelays()
minedRelaysObs := rel.miner.MinedRelays(ctx, servedRelaysObs)
rel.relayerSessionsManager.InsertRelays(minedRelaysObs)

return rel, nil
}

// Start provides the miner with the served relays observable and starts the relayer proxy.
// This method is blocking while the relayer proxy is running and returns when Stop is called
// or when the relayer proxy fails to start.
func (rel *relayMiner) Start(ctx context.Context) error {
// relayerSessionsManager.Start does not block.
// Set up the session (proof/claim) lifecycle pipeline.
log.Println("INFO: Starting relayer sessions manager...")
rel.relayerSessionsManager.Start(ctx)

// Start the flow of relays by starting relayer proxy.
// This is a blocking call as it waits for the waitgroup in relayerProxy.Start()
// that starts all the relay servers to be done.
log.Println("INFO: Starting relayer proxy...")
if err := rel.relayerProxy.Start(ctx); err != nil {
return err
}

log.Println("INFO: Relayer proxy stopped; exiting")
return nil
}

// Stop stops the relayer proxy which in turn stops all advertised relay servers
// and unsubscribes the miner from the served relays observable.
func (rel *relayMiner) Stop(ctx context.Context) error {
return rel.relayerProxy.Stop(ctx)
}
2 changes: 1 addition & 1 deletion pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewRelayerSessions(
// network as necessary.
func (rs *relayerSessionsManager) Start(ctx context.Context) {
// Map eitherMinedRelays to a new observable of an error type which is
// notified if an error was encountered while attampting to add the relay to
// notified if an error was encountered while attempting to add the relay to
// the session tree.
miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree)
logging.LogErrors(ctx, miningErrorsObs)
Expand Down

0 comments on commit 2673bb2

Please sign in to comment.