Skip to content

Commit

Permalink
feat(sequencer): Origin Selector asynchronously prefetches the next origin from events (ethereum-optimism#12134)
Browse files Browse the repository at this point in the history

* Sequencer: Origin Selector optimistically prefetches the next origin in background

* L1OriginSelector erases cached state on reset

* L1OriginSelector attempts to fetch on ForkchoiceUpdateEvent

* Move to a fully event-driven model, no extra goroutines

* Add missing test comment

* Minor cleanup, more tests

* Tune the context timeouts
  • Loading branch information
BrianBland authored Oct 2, 2024
1 parent 73038c8 commit 445a3d4
Show file tree
Hide file tree
Showing 4 changed files with 551 additions and 65 deletions.
4 changes: 3 additions & 1 deletion op-e2e/actions/helpers/l2_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc deri
ver := NewL2Verifier(t, log, l1, blobSrc, altDASrc, eng, cfg, &sync.Config{}, safedb.Disabled, interopBackend)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := confdepth.NewConfDepth(seqConfDepth, ver.syncStatus.L1Head, l1)
originSelector := sequencing.NewL1OriginSelector(t.Ctx(), log, cfg, seqConfDepthL1)
l1OriginSelector := &MockL1OriginSelector{
actual: sequencing.NewL1OriginSelector(log, cfg, seqConfDepthL1),
actual: originSelector,
}
metr := metrics.NoopMetrics
seqStateListener := node.DisabledConfigPersistence{}
Expand All @@ -78,6 +79,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc deri
},
}
ver.eventSys.Register("sequencer", seq, opts)
ver.eventSys.Register("origin-selector", originSelector, opts)
require.NoError(t, seq.Init(t.Ctx(), true))
return &L2Sequencer{
L2Verifier: ver,
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ func NewDriver(
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
sequencerConfDepth := confdepth.NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1)
findL1Origin := sequencing.NewL1OriginSelector(log, cfg, sequencerConfDepth)
findL1Origin := sequencing.NewL1OriginSelector(driverCtx, log, cfg, sequencerConfDepth)
sys.Register("origin-selector", findL1Origin, opts)
sequencer = sequencing.NewSequencer(driverCtx, log, cfg, attrBuilder, findL1Origin,
sequencerStateListener, sequencerConductor, asyncGossiper, metrics)
sys.Register("sequencer", sequencer, opts)
Expand Down
181 changes: 146 additions & 35 deletions op-node/rollup/sequencing/origin_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

Expand All @@ -20,78 +23,186 @@ type L1Blocks interface {
}

// L1OriginSelector tracks the L1 origin of the current L2 head and, when
// possible, prefetches the following L1 block so that FindL1Origin rarely
// has to block on an L1 lookup. Prefetching is event-driven (see OnEvent).
type L1OriginSelector struct {
	// ctx bounds the lifetime of event-triggered background fetches.
	// NOTE(review): storing a context in a struct is unusual; presumably this
	// is the driver's lifetime context — confirm against the constructor call sites.
	ctx context.Context
	log log.Logger
	cfg *rollup.Config
	// spec provides fork-dependent parameters, e.g. the max sequencer drift.
	spec *rollup.ChainSpec

	// l1 is the (possibly conf-depth-shimmed) L1 block source.
	l1 L1Blocks

	// Internal cache of L1 origins for faster access.
	currentOrigin eth.L1BlockRef
	nextOrigin    eth.L1BlockRef

	// mu guards currentOrigin and nextOrigin, which are read/written both by
	// FindL1Origin callers and by event-handler prefetches.
	mu sync.Mutex
}

func NewL1OriginSelector(log log.Logger, cfg *rollup.Config, l1 L1Blocks) *L1OriginSelector {
func NewL1OriginSelector(ctx context.Context, log log.Logger, cfg *rollup.Config, l1 L1Blocks) *L1OriginSelector {
return &L1OriginSelector{
ctx: ctx,
log: log,
cfg: cfg,
spec: rollup.NewChainSpec(cfg),
l1: l1,
}
}

// OnEvent implements the event deriver interface. Forkchoice updates trigger
// a best-effort prefetch of the next origin; resets clear the cached state.
// It reports whether the event was handled.
func (los *L1OriginSelector) OnEvent(ev event.Event) bool {
	switch x := ev.(type) {
	case engine.ForkchoiceUpdateEvent:
		los.onForkchoiceUpdate(x.UnsafeL2Head)
		return true
	case rollup.ResetEvent:
		los.reset()
		return true
	default:
		return false
	}
}

// FindL1Origin determines what the next L1 Origin should be.
// The L1 Origin is either the L2 Head's Origin, or the following L1 block
// if the next L2 block's time is greater than or equal to the L2 Head's Origin.
func (los *L1OriginSelector) FindL1Origin(ctx context.Context, l2Head eth.L2BlockRef) (eth.L1BlockRef, error) {
// Grab a reference to the current L1 origin block. This call is by hash and thus easily cached.
currentOrigin, err := los.l1.L1BlockRefByHash(ctx, l2Head.L1Origin.Hash)
currentOrigin, nextOrigin, err := los.CurrentAndNextOrigin(ctx, l2Head)
if err != nil {
return eth.L1BlockRef{}, err
}

// If the next L2 block time is greater than the next origin block's time, we can choose to
// start building on top of the next origin. Sequencer implementation has some leeway here and
// could decide to continue to build on top of the previous origin until the Sequencer runs out
// of slack. For simplicity, we implement our Sequencer to always start building on the latest
// L1 block when we can.
if nextOrigin != (eth.L1BlockRef{}) && l2Head.Time+los.cfg.BlockTime >= nextOrigin.Time {
return nextOrigin, nil
}

msd := los.spec.MaxSequencerDrift(currentOrigin.Time)
log := los.log.New("current", currentOrigin, "current_time", currentOrigin.Time,
"l2_head", l2Head, "l2_head_time", l2Head.Time, "max_seq_drift", msd)

seqDrift := l2Head.Time + los.cfg.BlockTime - currentOrigin.Time
pastSeqDrift := l2Head.Time+los.cfg.BlockTime-currentOrigin.Time > msd

// If we are past the sequencer depth, we may want to advance the origin, but need to still
// check the time of the next origin.
pastSeqDrift := seqDrift > msd
if pastSeqDrift {
log.Warn("Next L2 block time is past the sequencer drift + current origin time")
seqDrift = msd
// If we are not past the max sequencer drift, we can just return the current origin.
if !pastSeqDrift {
return currentOrigin, nil
}

// Calculate the maximum time we can spend attempting to fetch the next L1 origin block.
// Time spent fetching this information is time not spent building the next L2 block, so
// we generally prioritize keeping this value small, allowing for a nonzero failure rate.
// As the next L2 block time approaches the max sequencer drift, increase our tolerance for
// slower L1 fetches in order to avoid falling too far behind.
fetchTimeout := time.Second + (9*time.Second*time.Duration(seqDrift))/time.Duration(msd)
fetchCtx, cancel := context.WithTimeout(ctx, fetchTimeout)
defer cancel()
// Otherwise, we need to find the next L1 origin block in order to continue producing blocks.
log.Warn("Next L2 block time is past the sequencer drift + current origin time")

// Attempt to find the next L1 origin block, where the next origin is the immediate child of
// the current origin block.
// The L1 source can be shimmed to hide new L1 blocks and enforce a sequencer confirmation distance.
nextOrigin, err := los.l1.L1BlockRefByNumber(fetchCtx, currentOrigin.Number+1)
if err != nil {
if pastSeqDrift {
if nextOrigin == (eth.L1BlockRef{}) {
fetchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// If the next origin is not set, we need to fetch it now.
nextOrigin, err = los.fetch(fetchCtx, currentOrigin.Number+1)
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("cannot build next L2 block past current L1 origin %s by more than sequencer time drift, and failed to find next L1 origin: %w", currentOrigin, err)
}
}

// If the next origin is ahead of the L2 head, we must return the current origin.
if l2Head.Time+los.cfg.BlockTime < nextOrigin.Time {
return currentOrigin, nil
}

return nextOrigin, nil
}

// CurrentAndNextOrigin returns the L1 origin of the given L2 head and, if
// already cached, its successor. The cache is first re-synced to the head:
// if the head has advanced onto the prefetched next origin the cache is
// rotated; if it matches neither cached origin (most likely the first block
// after a restart) the current origin is looked up by hash and the next
// origin is cleared.
func (los *L1OriginSelector) CurrentAndNextOrigin(ctx context.Context, l2Head eth.L2BlockRef) (eth.L1BlockRef, eth.L1BlockRef, error) {
	los.mu.Lock()
	defer los.mu.Unlock()

	switch l2Head.L1Origin {
	case los.currentOrigin.ID():
		// Most likely outcome: the L2 head is still on the current origin.
	case los.nextOrigin.ID():
		// The L2 head progressed onto the next origin: rotate the cache.
		los.currentOrigin = los.nextOrigin
		los.nextOrigin = eth.L1BlockRef{}
	default:
		// Unknown origin: re-resolve it. This lookup is by hash and thus easily cached.
		ref, err := los.l1.L1BlockRefByHash(ctx, l2Head.L1Origin.Hash)
		if err != nil {
			return eth.L1BlockRef{}, eth.L1BlockRef{}, err
		}
		los.currentOrigin = ref
		los.nextOrigin = eth.L1BlockRef{}
	}

	return los.currentOrigin, los.nextOrigin, nil
}

// maybeSetNextOrigin caches the given block as the next origin, but only if
// it is the immediate child of the currently cached origin; any other block
// is discarded as stale or irrelevant.
func (los *L1OriginSelector) maybeSetNextOrigin(nextOrigin eth.L1BlockRef) {
	los.mu.Lock()
	defer los.mu.Unlock()

	if nextOrigin.ParentHash != los.currentOrigin.Hash {
		// Not a direct child of the cached current origin; ignore it.
		return
	}
	los.nextOrigin = nextOrigin
}

// onForkchoiceUpdate re-syncs the cached origins against the new unsafe L2
// head and, if the next origin is not yet known, attempts to prefetch it on a
// best-effort basis so that FindL1Origin later returns without blocking on L1.
func (los *L1OriginSelector) onForkchoiceUpdate(unsafeL2Head eth.L2BlockRef) {
	// Only allow a relatively small window for fetching the next origin, as this is performed
	// on a best-effort basis.
	ctx, cancel := context.WithTimeout(los.ctx, 500*time.Millisecond)
	defer cancel()

	currentOrigin, nextOrigin, err := los.CurrentAndNextOrigin(ctx, unsafeL2Head)
	if err != nil {
		// Use the selector's configured logger, not the global log package,
		// so the message carries the node's logging context and handler.
		los.log.Error("Failed to get current and next L1 origin on forkchoice update", "err", err)
		return
	}

	los.tryFetchNextOrigin(ctx, currentOrigin, nextOrigin)
}

// tryFetchNextOrigin fetches the next L1 origin block (the immediate child of
// the current origin) if it is not already cached. Failures are logged and
// otherwise ignored: this is a best-effort prefetch, and FindL1Origin will
// fetch synchronously later if the block is actually needed.
func (los *L1OriginSelector) tryFetchNextOrigin(ctx context.Context, currentOrigin, nextOrigin eth.L1BlockRef) {
	// If the next origin is already set, we don't need to do anything.
	if nextOrigin != (eth.L1BlockRef{}) {
		return
	}

	// If the current origin is not set, we can't schedule the next origin check.
	if currentOrigin == (eth.L1BlockRef{}) {
		return
	}

	if _, err := los.fetch(ctx, currentOrigin.Number+1); err != nil {
		if errors.Is(err, ethereum.NotFound) {
			// Expected whenever L1 has not yet produced the child block.
			los.log.Debug("No next potential L1 origin found")
		} else {
			los.log.Error("Failed to get next origin", "err", err)
		}
	}
}

// If the next L2 block time is greater than the next origin block's time, we can choose to
// start building on top of the next origin. Sequencer implementation has some leeway here and
// could decide to continue to build on top of the previous origin until the Sequencer runs out
// of slack. For simplicity, we implement our Sequencer to always start building on the latest
// L1 block when we can.
if l2Head.Time+los.cfg.BlockTime >= nextOrigin.Time {
return nextOrigin, nil
func (los *L1OriginSelector) fetch(ctx context.Context, number uint64) (eth.L1BlockRef, error) {
// Attempt to find the next L1 origin block, where the next origin is the immediate child of
// the current origin block.
// The L1 source can be shimmed to hide new L1 blocks and enforce a sequencer confirmation distance.
nextOrigin, err := los.l1.L1BlockRefByNumber(ctx, number)
if err != nil {
return eth.L1BlockRef{}, err
}

return currentOrigin, nil
los.maybeSetNextOrigin(nextOrigin)

return nextOrigin, nil
}

// reset clears all cached origin state. It is invoked on rollup.ResetEvent so
// that stale origins are never served after a derivation reset.
func (los *L1OriginSelector) reset() {
	los.mu.Lock()
	defer los.mu.Unlock()

	los.currentOrigin, los.nextOrigin = eth.L1BlockRef{}, eth.L1BlockRef{}
}
Loading

0 comments on commit 445a3d4

Please sign in to comment.