diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 1a241ba703b..2319c5141b2 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -480,7 +480,7 @@ func main() { node.EngineRegistry, node.State, node.Metrics.Engine, - colMetrics, + node.Metrics.Mempool, node.Me, node.Storage.Collections, node.Storage.Transactions, diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 317729108dd..c2570fb17c8 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -4,15 +4,19 @@ package pusher import ( + "context" "fmt" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" @@ -21,30 +25,62 @@ import ( "github.com/onflow/flow-go/utils/logging" ) -// Engine is the collection pusher engine, which provides access to resources -// held by the collection node. +// Engine is part of the Collection Node. It broadcasts finalized collections +// ("collection guarantees") that the cluster generates to Consensus Nodes +// for inclusion in blocks. type Engine struct { - unit *engine.Unit log zerolog.Logger engMetrics module.EngineMetrics - colMetrics module.CollectionMetrics conduit network.Conduit me module.Local state protocol.State collections storage.Collections transactions storage.Transactions + + notifier engine.Notifier + queue *fifoqueue.FifoQueue + + component.Component + cm *component.ComponentManager } -func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) { +// TODO convert to network.MessageProcessor +var _ network.Engine = (*Engine)(nil) +var _ component.Component = (*Engine)(nil) + +// New creates a new pusher engine. +func New( + log zerolog.Logger, + net network.EngineRegistry, + state protocol.State, + engMetrics module.EngineMetrics, + mempoolMetrics module.MempoolMetrics, + me module.Local, + collections storage.Collections, + transactions storage.Transactions, +) (*Engine, error) { + queue, err := fifoqueue.NewFifoQueue( + 200, // roughly 1 minute of collections, at 3BPS + fifoqueue.WithLengthObserver(func(len int) { + mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len)) + }), + ) + if err != nil { + return nil, fmt.Errorf("could not create fifoqueue: %w", err) + } + + notifier := engine.NewNotifier() + e := &Engine{ - unit: engine.NewUnit(), log: log.With().Str("engine", "pusher").Logger(), engMetrics: engMetrics, - colMetrics: colMetrics, me: me, state: state, collections: collections, transactions: transactions, + + notifier: notifier, + queue: queue, } conduit, err := net.Register(channels.PushGuarantees, e) @@ -53,81 +89,108 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e } e.conduit = conduit + e.cm = component.NewComponentManagerBuilder(). + AddWorker(e.outboundQueueWorker). + Build() + e.Component = e.cm + return e, nil } -// Ready returns a ready channel that is closed once the engine has fully -// started. -func (e *Engine) Ready() <-chan struct{} { - return e.unit.Ready() +// outboundQueueWorker implements a component worker which broadcasts collection guarantees, +// enqueued by the Finalizer upon finalization, to Consensus Nodes. +func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + + done := ctx.Done() + wake := e.notifier.Channel() + for { + select { + case <-done: + return + case <-wake: + err := e.processOutboundMessages(ctx) + if err != nil { + ctx.Throw(err) + } + } + } } -// Done returns a done channel that is closed once the engine has fully stopped. -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() +// processOutboundMessages processes any available messages from the queue. +// Only returns when the queue is empty (or the engine is terminated). +func (e *Engine) processOutboundMessages(ctx context.Context) error { + for { + nextMessage, ok := e.queue.Pop() + if !ok { + return nil + } + + asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee) + if !ok { + return fmt.Errorf("invalid message type in pusher engine queue") + } + + err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return nil + default: + } + } } // SubmitLocal submits an event originating on the local node. func (e *Engine) SubmitLocal(event interface{}) { - e.unit.Launch(func() { - err := e.process(e.me.NodeID(), event) - if err != nil { - engine.LogError(e.log, err) - } - }) + ev, ok := event.(*messages.SubmitCollectionGuarantee) + if ok { + e.SubmitCollectionGuarantee(ev) + } else { + engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine")) + } } // Submit submits the given event from the node with the given origin ID // for processing in a non-blocking manner. It returns instantly and logs // a potential processing error internally when done. func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - e.unit.Launch(func() { - err := e.process(originID, event) - if err != nil { - engine.LogError(e.log, err) - } - }) + engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node")) } // ProcessLocal processes an event originating on the local node. func (e *Engine) ProcessLocal(event interface{}) error { - return e.unit.Do(func() error { - return e.process(e.me.NodeID(), event) - }) + ev, ok := event.(*messages.SubmitCollectionGuarantee) + if ok { + e.SubmitCollectionGuarantee(ev) + return nil + } else { + return fmt.Errorf("invalid message argument to pusher engine") + } } // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. -func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { - return e.unit.Do(func() error { - return e.process(originID, event) - }) +func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { + return fmt.Errorf("pusher engine should only receive local messages on the same node") } -// process processes events for the pusher engine on the collection node. -func (e *Engine) process(originID flow.Identifier, event interface{}) error { - switch ev := event.(type) { - case *messages.SubmitCollectionGuarantee: - e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) - defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) - return e.onSubmitCollectionGuarantee(originID, ev) - default: - return fmt.Errorf("invalid event type (%T)", event) +// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue +// to later be published to consensus nodes. +func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) { + ok := e.queue.Push(msg) + if !ok { + engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue")) + return } + e.notifier.Notify() } -// onSubmitCollectionGuarantee handles submitting the given collection guarantee -// to consensus nodes. -func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error { - if originID != e.me.NodeID() { - return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID) - } - - return e.SubmitCollectionGuarantee(&req.Guarantee) -} - -// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes. -func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { +// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes. +func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) if err != nil { return fmt.Errorf("could not get consensus nodes: %w", err) diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index fde6d9696dc..9f59f79f666 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -1,8 +1,10 @@ package pusher_test import ( + "context" "io" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -12,6 +14,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/messages" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" @@ -82,12 +85,17 @@ func TestPusherEngine(t *testing.T) { // should be able to submit collection guarantees to consensus nodes func (suite *Suite) TestSubmitCollectionGuarantee() { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background()) + suite.engine.Start(ctx) + defer cancel() + done := make(chan struct{}) guarantee := unittest.CollectionGuaranteeFixture() // should submit the collection to consensus nodes consensus := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleConsensus)) - suite.conduit.On("Publish", guarantee, consensus[0].NodeID).Return(nil) + suite.conduit.On("Publish", guarantee, consensus[0].NodeID). + Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once() msg := &messages.SubmitCollectionGuarantee{ Guarantee: *guarantee, @@ -95,6 +103,8 @@ func (suite *Suite) TestSubmitCollectionGuarantee() { err := suite.engine.ProcessLocal(msg) suite.Require().Nil(err) + unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent") + suite.conduit.AssertExpectations(suite.T()) } diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go index 8a2ea4fed1a..aae7383ada0 100644 --- a/engine/testutil/mock/nodes.go +++ b/engine/testutil/mock/nodes.go @@ -140,6 +140,7 @@ func (n CollectionNode) Start(t *testing.T) { n.IngestionEngine.Start(n.Ctx) n.EpochManagerEngine.Start(n.Ctx) n.ProviderEngine.Start(n.Ctx) + n.PusherEngine.Start(n.Ctx) } func (n CollectionNode) Ready() <-chan struct{} { diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 20b66ad7d68..1e5b6601a2e 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -113,6 +113,7 @@ const ( ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine + ResourceSubmitCollectionGuaranteesQueue = "pusher_col_guarantee_queue" // collection node, pusher engine ResourceBeaconKey = "beacon-key" // consensus node, DKG engine ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine