-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
replace engine.Unit with ComponentManager in Pusher Engine #6747
base: feature/pusher-engine-refactor
Are you sure you want to change the base?
Changes from all commits
3552f06
6d3f462
0aadc13
dec0d58
b5418dd
b7166f3
e25cba7
f2f53a8
f66ba69
1c80949
fbed8e7
051e6d4
ddb0cb7
17a9a2b
aad332c
ad04f27
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -4,15 +4,19 @@ | |||
package pusher | ||||
|
||||
import ( | ||||
"context" | ||||
"fmt" | ||||
|
||||
"github.com/rs/zerolog" | ||||
|
||||
"github.com/onflow/flow-go/engine" | ||||
"github.com/onflow/flow-go/engine/common/fifoqueue" | ||||
"github.com/onflow/flow-go/model/flow" | ||||
"github.com/onflow/flow-go/model/flow/filter" | ||||
"github.com/onflow/flow-go/model/messages" | ||||
"github.com/onflow/flow-go/module" | ||||
"github.com/onflow/flow-go/module/component" | ||||
"github.com/onflow/flow-go/module/irrecoverable" | ||||
"github.com/onflow/flow-go/module/metrics" | ||||
"github.com/onflow/flow-go/network" | ||||
"github.com/onflow/flow-go/network/channels" | ||||
|
@@ -21,30 +25,62 @@ import ( | |||
"github.com/onflow/flow-go/utils/logging" | ||||
) | ||||
|
||||
// Engine is the collection pusher engine, which provides access to resources | ||||
// held by the collection node. | ||||
// Engine is part of the Collection Node. It broadcasts finalized collections | ||||
// ("collection guarantees") that the cluster generates to Consensus Nodes | ||||
// for inclusion in blocks. | ||||
type Engine struct { | ||||
unit *engine.Unit | ||||
log zerolog.Logger | ||||
engMetrics module.EngineMetrics | ||||
colMetrics module.CollectionMetrics | ||||
conduit network.Conduit | ||||
me module.Local | ||||
state protocol.State | ||||
collections storage.Collections | ||||
transactions storage.Transactions | ||||
|
||||
notifier engine.Notifier | ||||
queue *fifoqueue.FifoQueue | ||||
|
||||
component.Component | ||||
cm *component.ComponentManager | ||||
} | ||||
|
||||
func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) { | ||||
// TODO convert to network.MessageProcessor | ||||
var _ network.Engine = (*Engine)(nil) | ||||
var _ component.Component = (*Engine)(nil) | ||||
Comment on lines
+47
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may also be a good exercise to make the change to I'll add some thoughts below; I suggest we spend some time discussing this live as well. There is no need to implement any of the comments below before we have a chance to discuss. The
Replacing
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1. Had a similar thought 👉 my comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change to |
||||
|
||||
// New creates a new pusher engine. | ||||
func New( | ||||
log zerolog.Logger, | ||||
net network.EngineRegistry, | ||||
state protocol.State, | ||||
engMetrics module.EngineMetrics, | ||||
mempoolMetrics module.MempoolMetrics, | ||||
me module.Local, | ||||
collections storage.Collections, | ||||
transactions storage.Transactions, | ||||
) (*Engine, error) { | ||||
queue, err := fifoqueue.NewFifoQueue( | ||||
200, // roughly 1 minute of collections, at 3BPS | ||||
fifoqueue.WithLengthObserver(func(len int) { | ||||
mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len)) | ||||
}), | ||||
) | ||||
if err != nil { | ||||
return nil, fmt.Errorf("could not create fifoqueue: %w", err) | ||||
} | ||||
|
||||
notifier := engine.NewNotifier() | ||||
|
||||
e := &Engine{ | ||||
unit: engine.NewUnit(), | ||||
log: log.With().Str("engine", "pusher").Logger(), | ||||
engMetrics: engMetrics, | ||||
colMetrics: colMetrics, | ||||
me: me, | ||||
state: state, | ||||
collections: collections, | ||||
transactions: transactions, | ||||
|
||||
notifier: notifier, | ||||
queue: queue, | ||||
} | ||||
|
||||
conduit, err := net.Register(channels.PushGuarantees, e) | ||||
|
@@ -53,81 +89,108 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e | |||
} | ||||
e.conduit = conduit | ||||
|
||||
e.cm = component.NewComponentManagerBuilder(). | ||||
AddWorker(e.outboundQueueWorker). | ||||
Build() | ||||
e.Component = e.cm | ||||
|
||||
return e, nil | ||||
} | ||||
|
||||
// Ready returns a ready channel that is closed once the engine has fully | ||||
// started. | ||||
func (e *Engine) Ready() <-chan struct{} { | ||||
return e.unit.Ready() | ||||
// outboundQueueWorker implements a component worker which broadcasts collection guarantees, | ||||
// enqueued by the Finalizer upon finalization, to Consensus Nodes. | ||||
func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { | ||||
ready() | ||||
|
||||
done := ctx.Done() | ||||
wake := e.notifier.Channel() | ||||
for { | ||||
select { | ||||
case <-done: | ||||
return | ||||
case <-wake: | ||||
err := e.processOutboundMessages(ctx) | ||||
if err != nil { | ||||
ctx.Throw(err) | ||||
} | ||||
} | ||||
} | ||||
} | ||||
|
||||
// Done returns a done channel that is closed once the engine has fully stopped. | ||||
func (e *Engine) Done() <-chan struct{} { | ||||
return e.unit.Done() | ||||
// processOutboundMessages processes any available messages from the queue. | ||||
// Only returns when the queue is empty (or the engine is terminated). | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add error documentation here. Either enumerate possible error types, or add |
||||
func (e *Engine) processOutboundMessages(ctx context.Context) error { | ||||
for { | ||||
nextMessage, ok := e.queue.Pop() | ||||
if !ok { | ||||
return nil | ||||
} | ||||
|
||||
asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee) | ||||
if !ok { | ||||
return fmt.Errorf("invalid message type in pusher engine queue") | ||||
} | ||||
|
||||
err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
|
||||
select { | ||||
case <-ctx.Done(): | ||||
return nil | ||||
default: | ||||
} | ||||
} | ||||
} | ||||
|
||||
// SubmitLocal submits an event originating on the local node. | ||||
func (e *Engine) SubmitLocal(event interface{}) { | ||||
Comment on lines
147
to
148
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As an exercise, try to figure out which other components are feeing inputs into the Turns out that the Finalizer is the only component that is feeding the
To summarize: one of the main benefits of the Sorry for the lengthy comment. I hope you see how the very ambiguous structure of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||
e.unit.Launch(func() { | ||||
err := e.process(e.me.NodeID(), event) | ||||
if err != nil { | ||||
engine.LogError(e.log, err) | ||||
} | ||||
}) | ||||
ev, ok := event.(*messages.SubmitCollectionGuarantee) | ||||
if ok { | ||||
e.SubmitCollectionGuarantee(ev) | ||||
} else { | ||||
engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine")) | ||||
} | ||||
} | ||||
|
||||
// Submit submits the given event from the node with the given origin ID | ||||
// for processing in a non-blocking manner. It returns instantly and logs | ||||
// a potential processing error internally when done. | ||||
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { | ||||
e.unit.Launch(func() { | ||||
err := e.process(originID, event) | ||||
if err != nil { | ||||
engine.LogError(e.log, err) | ||||
} | ||||
}) | ||||
engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node")) | ||||
} | ||||
|
||||
// ProcessLocal processes an event originating on the local node. | ||||
jordanschalm marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
func (e *Engine) ProcessLocal(event interface{}) error { | ||||
return e.unit.Do(func() error { | ||||
return e.process(e.me.NodeID(), event) | ||||
}) | ||||
ev, ok := event.(*messages.SubmitCollectionGuarantee) | ||||
if ok { | ||||
e.SubmitCollectionGuarantee(ev) | ||||
return nil | ||||
} else { | ||||
return fmt.Errorf("invalid message argument to pusher engine") | ||||
} | ||||
} | ||||
|
||||
// Process processes the given event from the node with the given origin ID in | ||||
// a blocking manner. It returns the potential processing error when done. | ||||
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { | ||||
return e.unit.Do(func() error { | ||||
return e.process(originID, event) | ||||
}) | ||||
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { | ||||
return fmt.Errorf("pusher engine should only receive local messages on the same node") | ||||
} | ||||
|
||||
// process processes events for the pusher engine on the collection node. | ||||
func (e *Engine) process(originID flow.Identifier, event interface{}) error { | ||||
switch ev := event.(type) { | ||||
case *messages.SubmitCollectionGuarantee: | ||||
e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) | ||||
defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) | ||||
return e.onSubmitCollectionGuarantee(originID, ev) | ||||
default: | ||||
return fmt.Errorf("invalid event type (%T)", event) | ||||
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue | ||||
// to later be published to consensus nodes. | ||||
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) { | ||||
ok := e.queue.Push(msg) | ||||
Comment on lines
+183
to
+184
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we were using a generic function (eg. When we have a context-specific method for enqueueing collection guarantees, we no longer need the type wrapper, and can simply pass (Let's apply this change in the follow-up PR.) |
||||
if !ok { | ||||
engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue")) | ||||
return | ||||
} | ||||
e.notifier.Notify() | ||||
} | ||||
|
||||
// onSubmitCollectionGuarantee handles submitting the given collection guarantee | ||||
// to consensus nodes. | ||||
func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error { | ||||
if originID != e.me.NodeID() { | ||||
return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID) | ||||
} | ||||
|
||||
return e.SubmitCollectionGuarantee(&req.Guarantee) | ||||
} | ||||
|
||||
// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes. | ||||
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { | ||||
// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes. | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please add error return documentation to this function. If it might return an expected sentinel error (including from sub-calls into If you haven't already, take a look at the godoc references for In particular, |
||||
func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { | ||||
consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) | ||||
if err != nil { | ||||
return fmt.Errorf("could not get consensus nodes: %w", err) | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take a look at the core processing logic in the Engine and lets try to remove layers of complexity and indirection where possible:
flow-go/engine/collection/pusher/engine.go
Lines 182 to 191 in b5418dd
Looking at this code raises a few questions:
we are only propagating collections that are originating from this node (👉 code). But the exported method
SubmitCollectionGuarantee
is not checking this? Why not?onSubmitCollectionGuarantee
either..SubmitCollectionGuarantee
is only ever called by thepusher.Engine
itself (not even the tests call it).Lets simplify this: merge
SubmitCollectionGuarantee
andonSubmitCollectionGuarantee
.Further digging into the code, I found that the
originID
is just completely useless in this context:pusher.Engine
is fed only from internal components (sooriginID
is always filled with the node's own identifier)pusher.Engine
drops messages that are not from itself. Just hypothetically, lets assume that there were collections withoriginID
other than the node's own ID. ❗Note that those messages would be queued first, wasting resources and then we have a different thread discard the Collection after dequeueing. Checks like that should always be applied before queueing irrelevant messages.What you see here is a pattern we struggle with a lot: the code is very ambiguous and allows a whole bunch of stuff that is never intended to happen in practise. This is an unsuitable design for engineering complex systems such as Flow, as we then have to manually handle undesired cases.
SubmitCollectionGuarantee
, the compiler would enforce this constraint for us as opposed to us having to write code. Often, people don't bother to manually disallow undesired inputs, resulting in software that is doing something incorrect in case of bugs as opposed to crashing (or the code not compiling in the first place)originID
. We employ a generic patter which obfuscates that we don't need anoriginID
in this particular example, because theEngine
interface does not differentiate between node-external inputs (can be malicious) vs inputs from other components within its own node (always trusted) and mixes modes of processing (synchronous vs asynchronous).For those reasons, we deprecated the
Engine
interface. Lets try to update thepusher
and remove the engine interface (for concrete steps towards this, please see my next comment).For Flow, the APIs should generally be as descriptive and restrictive as possible and make it hard to feed in wrong / mismatching inputs. Writing extra lines of boilerplate code is not a challenge for Flow. In contrast, it is challenging if you have to dig through hundreds or thousands of lines of code because the APIs are too generic and you need to know implementation details to use them correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partially adressed in b7166f3 and e25cba7:
originID
has been pretty much removed from the interface, now thatProcess
andSubmit
(the functions in the Engine interface handling node-external inputs) simply error on any input.