diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 134ac8f53..539361f2d 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -7,7 +7,6 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/x/service/types" - servicetypes "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) @@ -18,7 +17,7 @@ import ( type Miner interface { MinedRelays( ctx context.Context, - servedRelayObs observable.Observable[*servicetypes.Relay], + servedRelayObs observable.Observable[*types.Relay], ) (minedRelaysObs observable.Observable[*MinedRelay]) } diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index ea73031c8..58b9549fd 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -113,7 +113,7 @@ func NewRelayerProxy( } // Start concurrently starts all advertised relay servers and returns an error if any of them fails to start. -// This method is blocking until all RelayServers are started. +// This method is blocking as long as all RelayServers are running. func (rp *relayerProxy) Start(ctx context.Context) error { // The provided services map is built from the supplier's on-chain advertised information, // which is a runtime parameter that can be changed by the supplier. diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go new file mode 100644 index 000000000..46ac9d5d8 --- /dev/null +++ b/pkg/relayer/relayminer.go @@ -0,0 +1,65 @@ +package relayer + +import ( + "context" + "log" + + "cosmossdk.io/depinject" +) + +// relayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining). +// It starts and stops the RelayerProxy and provide the served relays observable to the miner. +type relayMiner struct { + relayerProxy RelayerProxy + miner Miner + relayerSessionsManager RelayerSessionsManager +} + +// NewRelayMiner creates a new Relayer instance with the given dependencies. +// It injects the dependencies into the Relayer instance and returns it. +func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, error) { + rel := &relayMiner{} + + if err := depinject.Inject( + deps, + &rel.relayerProxy, + &rel.miner, + &rel.relayerSessionsManager, + ); err != nil { + return nil, err + } + + // Set up relay pipeline + servedRelaysObs := rel.relayerProxy.ServedRelays() + minedRelaysObs := rel.miner.MinedRelays(ctx, servedRelaysObs) + rel.relayerSessionsManager.InsertRelays(minedRelaysObs) + + return rel, nil +} + +// Start provides the miner with the served relays observable and starts the relayer proxy. +// This method is blocking while the relayer proxy is running and returns when Stop is called +// or when the relayer proxy fails to start. +func (rel *relayMiner) Start(ctx context.Context) error { + // relayerSessionsManager.Start does not block. + // Set up the session (proof/claim) lifecycle pipeline. + log.Println("INFO: Starting relayer sessions manager...") + rel.relayerSessionsManager.Start(ctx) + + // Start the flow of relays by starting relayer proxy. + // This is a blocking call as it waits for the waitgroup in relayerProxy.Start() + // that starts all the relay servers to be done. + log.Println("INFO: Starting relayer proxy...") + if err := rel.relayerProxy.Start(ctx); err != nil { + return err + } + + log.Println("INFO: Relayer proxy stopped; exiting") + return nil +} + +// Stop stops the relayer proxy which in turn stops all advertised relay servers +// and unsubscribes the miner from the served relays observable. +func (rel *relayMiner) Stop(ctx context.Context) error { + return rel.relayerProxy.Stop(ctx) +} diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 4cf8b0d2c..4a444dfd0 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -85,7 +85,7 @@ func NewRelayerSessions( // network as necessary. func (rs *relayerSessionsManager) Start(ctx context.Context) { // Map eitherMinedRelays to a new observable of an error type which is - // notified if an error was encountered while attampting to add the relay to + // notified if an error was encountered while attempting to add the relay to // the session tree. miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree) logging.LogErrors(ctx, miningErrorsObs)