chore: error handling (succinctlabs#280)
* feat: better proposer error handling

* feat: fix proposer witnessgen timeout

* add
ratankaliani authored and adam-xu-mantle committed Dec 16, 2024
1 parent d7c4119 commit 92bc868
Showing 7 changed files with 303 additions and 124 deletions.
6 changes: 6 additions & 0 deletions proposer/op/go.sum
@@ -328,6 +328,10 @@ github.com/slack-go/slack v0.14.0 h1:6c0UTfbRnvRssZUsZ2qe0Iu07VAMPjRqOa6oX8ewF4k
github.com/slack-go/slack v0.14.0/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobtDnDzA=
github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -443,6 +447,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
47 changes: 3 additions & 44 deletions proposer/op/proposer/db/db.go
@@ -37,7 +37,7 @@ func InitDB(dbPath string, useCachedDb bool) (*ProofDB, error) {
}

// Use the TL;DR SQLite settings from https://kerkour.com/sqlite-for-servers.
connectionUrl := fmt.Sprintf("file:%s?_fk=1&journal_mode=WAL&synchronous=normal&cache_size=100000000&busy_timeout=15000&_txlock=immediate", dbPath)
connectionUrl := fmt.Sprintf("file:%s?_fk=1&journal_mode=WAL&synchronous=normal&cache_size=100000000&busy_timeout=30000&_txlock=immediate", dbPath)

writeDrv, err := sql.Open("sqlite3", connectionUrl)
if err != nil {
@@ -47,15 +47,15 @@ func InitDB(dbPath string, useCachedDb bool) (*ProofDB, error) {

// The write lock only allows one connection to the DB at a time.
writeDb.SetMaxOpenConns(1)
writeDb.SetConnMaxLifetime(time.Hour)
writeDb.SetConnMaxLifetime(10 * time.Minute)

readDrv, err := sql.Open("sqlite3", connectionUrl)
if err != nil {
return nil, fmt.Errorf("failed opening connection to sqlite: %v", err)
}
readDb := readDrv.DB()
readDb.SetMaxOpenConns(max(4, runtime.NumCPU()/4))
readDb.SetConnMaxLifetime(time.Hour)
readDb.SetConnMaxLifetime(10 * time.Minute)

readClient := ent.NewClient(ent.Driver(readDrv))
writeClient := ent.NewClient(ent.Driver(writeDrv))
@@ -252,47 +252,6 @@ func (db *ProofDB) GetLatestEndBlock() (uint64, error) {
return uint64(maxEnd.EndBlock), nil
}

// When restarting the L2OutputSubmitter, some proofs may have been left in a "requested" state without a prover request ID on the server. Until we
// implement a mechanism for querying the status of the witness generation, we need to time out these proofs after a period of time so they can be requested.
func (db *ProofDB) GetWitnessGenerationTimeoutProofsOnServer() ([]*ent.ProofRequest, error) {
currentTime := time.Now().Unix()
twentyMinutesAgo := currentTime - 20*60

proofs, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(proofrequest.StatusWITNESSGEN),
proofrequest.ProverRequestIDIsNil(),
proofrequest.LastUpdatedTimeLT(uint64(twentyMinutesAgo)),
).
All(context.Background())

if err != nil {
return nil, fmt.Errorf("failed to query witness generation timeout proofs: %w", err)
}

return proofs, nil
}

// If a proof failed to be sent to the prover network, its status will be set to FAILED, but the prover request ID will be empty.
// This function returns all such proofs.
func (db *ProofDB) GetProofsFailedOnServer() ([]*ent.ProofRequest, error) {
proofs, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(proofrequest.StatusFAILED),
proofrequest.ProverRequestIDEQ(""),
).
All(context.Background())

if err != nil {
if ent.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("failed to query failed proof: %w", err)
}

return proofs, nil
}

// GetAllProofsWithStatus returns all proofs with the given status.
func (db *ProofDB) GetAllProofsWithStatus(status proofrequest.Status) ([]*ent.ProofRequest, error) {
proofs, err := db.readClient.ProofRequest.Query().
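
The remainder of GetAllProofsWithStatus is cut off by the diff viewer. Judging from the ent query pattern of the two helpers removed above, the body presumably continues roughly as follows. This is a sketch of how the function would read in db.go, not the verbatim file contents:

// GetAllProofsWithStatus returns all proofs with the given status.
// Sketch reconstructed from the query pattern of the removed helpers above;
// the repository's exact error message and formatting may differ.
func (db *ProofDB) GetAllProofsWithStatus(status proofrequest.Status) ([]*ent.ProofRequest, error) {
	proofs, err := db.readClient.ProofRequest.Query().
		Where(
			proofrequest.StatusEQ(status),
		).
		All(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to query proofs with status %s: %w", status, err)
	}
	return proofs, nil
}
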
5 changes: 5 additions & 0 deletions proposer/op/proposer/db/ent/migrate/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 13 additions & 4 deletions proposer/op/proposer/driver.go
@@ -622,13 +622,22 @@ func (l *L2OutputSubmitter) loopL2OO(ctx context.Context) {
continue
}

// 2) Check the statuses of all requested proofs.
// 2) Check the statuses of PROVING requests.
// If it's successfully returned, we validate that we have it on disk and set status = "COMPLETE".
// If it fails or times out, we set status = "FAILED" (and, if it's a span proof, split the request in half to try again).
l.Log.Info("Stage 2: Processing Pending Proofs...")
err = l.ProcessPendingProofs()
l.Log.Info("Stage 2: Processing PROVING requests...")
err = l.ProcessProvingRequests()
if err != nil {
l.Log.Error("failed to update requested proofs", "err", err)
l.Log.Error("failed to update PROVING requests", "err", err)
continue
}

// 3) Check the statuses of WITNESSGEN requests.
// If the witness generation request has been in the WITNESSGEN state for longer than the timeout, set status to FAILED and retry.
l.Log.Info("Stage 3: Processing WITNESSGEN requests...")
err = l.ProcessWitnessgenRequests()
if err != nil {
l.Log.Error("failed to update WITNESSGEN requests", "err", err)
continue
}

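Read together with the handlers added in prove.go below, the loop stages imply a simple per-request lifecycle. The standalone sketch below encodes the transitions inferred from the Stage 2/Stage 3 comments and the statuses referenced in this commit (WITNESSGEN, PROVING, FAILED, COMPLETE); it is an illustration only, not code from the repository:

package main

import "fmt"

// Inferred proof-request lifecycle (sketch only, not repository code).
var transitions = map[string][]string{
	"WITNESSGEN": {"PROVING", "FAILED"},  // FAILED if stuck longer than WITNESSGEN_TIMEOUT (Stage 3)
	"PROVING":    {"COMPLETE", "FAILED"}, // FAILED on prover failure or timeout (Stage 2)
	// A FAILED request is retried via RetryRequest; span proofs may be split
	// in half to avoid SP1 out-of-memory execution errors.
}

func main() {
	for from, to := range transitions {
		fmt.Println(from, "->", to)
	}
}
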
29 changes: 24 additions & 5 deletions proposer/op/proposer/prove.go
@@ -17,14 +17,14 @@ import (
)

const PROOF_STATUS_TIMEOUT = 30 * time.Second
const WITNESS_GEN_TIMEOUT = 20 * time.Minute
const WITNESSGEN_TIMEOUT = 20 * time.Minute

// This limit is set to prevent overloading the witness generation server. Until Kona improves their native I/O API (https://github.com/anton-rs/kona/issues/553)
// the maximum number of concurrent witness generation requests is roughly num_cpu / 2. Set it to 5 for now to be safe.
const MAX_CONCURRENT_WITNESS_GEN = 5

// Process all of the pending proofs.
func (l *L2OutputSubmitter) ProcessPendingProofs() error {
// Process all requests in the PROVING state.
func (l *L2OutputSubmitter) ProcessProvingRequests() error {
// Get all proof requests that are currently in the PROVING state.
reqs, err := l.db.GetAllProofsWithStatus(proofrequest.StatusPROVING)
if err != nil {
@@ -72,6 +72,25 @@ func (l *L2OutputSubmitter) ProcessPendingProofs() error {
return nil
}

// Process all requests in the WITNESSGEN state.
func (l *L2OutputSubmitter) ProcessWitnessgenRequests() error {
// Get all proof requests that are currently in the WITNESSGEN state.
reqs, err := l.db.GetAllProofsWithStatus(proofrequest.StatusWITNESSGEN)
if err != nil {
return err
}
for _, req := range reqs {
// If the request has been in the WITNESSGEN state for longer than the timeout, set status to FAILED.
// This is a catch-all in case the witness generation state update failed.
if req.LastUpdatedTime+uint64(WITNESSGEN_TIMEOUT.Seconds()) < uint64(time.Now().Unix()) {
// Retry the request if it timed out.
l.RetryRequest(req, ProofStatusResponse{})
}
}

return nil
}

// Retry a proof request. Sets the status of a proof to FAILED and retries the proof based on the optional proof status response.
// If the response is a program execution error, the proof is split into two, which will avoid SP1 out of memory execution errors.
func (l *L2OutputSubmitter) RetryRequest(req *ent.ProofRequest, status ProofStatusResponse) error {
@@ -314,12 +333,12 @@ func (l *L2OutputSubmitter) makeProofRequest(proofType proofrequest.Type, jsonBo
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{Timeout: WITNESS_GEN_TIMEOUT}
client := &http.Client{Timeout: WITNESSGEN_TIMEOUT}
resp, err := client.Do(req)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
l.Metr.RecordWitnessGenFailure("Timeout")
return nil, fmt.Errorf("request timed out after %s: %w", WITNESS_GEN_TIMEOUT, err)
return nil, fmt.Errorf("request timed out after %s: %w", WITNESSGEN_TIMEOUT, err)
}
return nil, fmt.Errorf("failed to send request: %w", err)
}
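
MAX_CONCURRENT_WITNESS_GEN is declared above, but this diff does not show where the cap is enforced. A common way to enforce such a limit in Go is a buffered-channel semaphore around each witness generation request; the standalone sketch below illustrates that pattern under that assumption and is not the proposer's actual dispatch code:

// Sketch: enforcing MAX_CONCURRENT_WITNESS_GEN with a buffered-channel
// semaphore. Illustrative only; not the repository's dispatch code.
package main

import (
	"fmt"
	"sync"
	"time"
)

const MAX_CONCURRENT_WITNESS_GEN = 5

func main() {
	sem := make(chan struct{}, MAX_CONCURRENT_WITNESS_GEN)
	var wg sync.WaitGroup

	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while MAX_CONCURRENT_WITNESS_GEN requests are in flight
			defer func() { <-sem }() // release the slot when this request finishes
			// Stand-in for the HTTP request to the witness generation server.
			time.Sleep(100 * time.Millisecond)
			fmt.Println("witness generation request", id, "done")
		}(i)
	}
	wg.Wait()
}
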
201 changes: 201 additions & 0 deletions proposer/op/proposer/range.go
@@ -0,0 +1,201 @@
package proposer

import (
"context"
"fmt"
"slices"
"sync"

"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/succinctlabs/op-succinct-go/proposer/db/ent"
"github.com/succinctlabs/op-succinct-go/proposer/db/ent/proofrequest"
"golang.org/x/sync/errgroup"
)

type Span struct {
Start uint64
End uint64
}

// GetL1HeadForL2Block returns the L1 block from which the L2 block can be derived.
func (l *L2OutputSubmitter) GetL1HeadForL2Block(ctx context.Context, rollupClient *sources.RollupClient, l2End uint64) (uint64, error) {
status, err := rollupClient.SyncStatus(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get sync status: %w", err)
}
latestL1Block := status.HeadL1.Number

// Get the L1 origin of the end block.
outputResponse, err := rollupClient.OutputAtBlock(ctx, l2End)
if err != nil {
return 0, fmt.Errorf("failed to get l1 origin: %w", err)
}
L2EndL1Origin := outputResponse.BlockRef.L1Origin.Number

// Search forward from the L1 origin of the L2 end block until we find a safe head greater than the L2 end block.
for currentL1Block := L2EndL1Origin; currentL1Block <= latestL1Block; currentL1Block++ {
safeHead, err := rollupClient.SafeHeadAtL1Block(ctx, currentL1Block)
if err != nil {
return 0, fmt.Errorf("failed to get safe head: %w", err)
}
// If the safe head is greater than or equal to the L2 end block at this L1 block, then we can derive the L2 end block from this L1 block.
if safeHead.SafeHead.Number >= l2End {
return currentL1Block, nil
}
}

return 0, fmt.Errorf("could not find an L1 block with an L2 safe head greater than the L2 end block")
}

func (l *L2OutputSubmitter) isSafeDBActivated(ctx context.Context, rollupClient *sources.RollupClient) (bool, error) {
// Get the sync status of the rollup node.
status, err := rollupClient.SyncStatus(ctx)
if err != nil {
return false, fmt.Errorf("failed to get sync status: %w", err)
}

// Attempt querying the safe head at the latest L1 block.
_, err = rollupClient.SafeHeadAtL1Block(ctx, status.HeadL1.Number)
if err != nil {
return false, fmt.Errorf("failed to get safe head: %w", err)
}

return true, nil
}

// SplitRangeBasedOnSafeHeads splits a range into spans based on safe head boundaries.
// This is useful when we want to ensure that each span aligns with L2 safe head boundaries.
func (l *L2OutputSubmitter) SplitRangeBasedOnSafeHeads(ctx context.Context, l2Start, l2End uint64) ([]Span, error) {
spans := []Span{}
currentStart := l2Start

rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, l.Log, l.Cfg.RollupRpc)
if err != nil {
return nil, err
}

L1Head, err := l.GetL1HeadForL2Block(ctx, rollupClient, l2End)
if err != nil {
return nil, fmt.Errorf("failed to get l1 head for l2 block: %w", err)
}

l2StartOutput, err := rollupClient.OutputAtBlock(ctx, l2Start)
if err != nil {
return nil, fmt.Errorf("failed to get l2 start output: %w", err)
}
L2StartL1Origin := l2StartOutput.BlockRef.L1Origin.Number

safeHeads := make(map[uint64]struct{})
mu := sync.Mutex{}
g := errgroup.Group{}
g.SetLimit(10)

// Get all of the safe heads between the L2 start block and the L1 head. Use parallel requests to speed up the process.
// This is useful for when a chain is behind.
for currentL1Block := L2StartL1Origin; currentL1Block <= L1Head; currentL1Block++ {
l1Block := currentL1Block
g.Go(func() error {
safeHead, err := rollupClient.SafeHeadAtL1Block(ctx, l1Block)
if err != nil {
return fmt.Errorf("failed to get safe head at block %d: %w", l1Block, err)
}

mu.Lock()
safeHeads[safeHead.SafeHead.Number] = struct{}{}
mu.Unlock()
return nil
})
}

if err := g.Wait(); err != nil {
return nil, fmt.Errorf("failed while getting safe heads: %w", err)
}

uniqueSafeHeads := make([]uint64, 0, len(safeHeads))
for safeHead := range safeHeads {
uniqueSafeHeads = append(uniqueSafeHeads, safeHead)
}
slices.Sort(uniqueSafeHeads)

// Loop over all of the safe heads and create spans.
for _, safeHead := range uniqueSafeHeads {
if safeHead > currentStart {
rangeStart := currentStart
for rangeStart+l.Cfg.MaxBlockRangePerSpanProof < min(l2End, safeHead) {
spans = append(spans, Span{
Start: rangeStart,
End: rangeStart + l.Cfg.MaxBlockRangePerSpanProof,
})
rangeStart += l.Cfg.MaxBlockRangePerSpanProof
}
spans = append(spans, Span{
Start: rangeStart,
End: min(l2End, safeHead),
})
currentStart = safeHead
}
}

return spans, nil
}

// SplitRangeBasic creates a list of spans of size MaxBlockRangePerSpanProof from start to end. Note: The end of span i = start of span i+1.
func (l *L2OutputSubmitter) SplitRangeBasic(start, end uint64) []Span {
spans := []Span{}
// Create spans of size MaxBlockRangePerSpanProof from start to end.
// Each span starts where the previous one ended.
// Continue until we can't fit another full span before reaching end.
for i := start; i+l.Cfg.MaxBlockRangePerSpanProof <= end; i += l.Cfg.MaxBlockRangePerSpanProof {
spans = append(spans, Span{Start: i, End: i + l.Cfg.MaxBlockRangePerSpanProof})
}
return spans
}

func (l *L2OutputSubmitter) GetRangeProofBoundaries(ctx context.Context) error {
// nextBlock is equal to the highest value in the `EndBlock` column of the DB, plus 1.
latestL2EndBlock, err := l.db.GetLatestEndBlock()
if err != nil {
if ent.IsNotFound(err) {
latestEndBlockU256, err := l.l2ooContract.LatestBlockNumber(&bind.CallOpts{Context: ctx})
if err != nil {
return fmt.Errorf("failed to get latest output index: %w", err)
} else {
latestL2EndBlock = latestEndBlockU256.Uint64()
}
} else {
l.Log.Error("failed to get latest end requested", "err", err)
return err
}
}
newL2StartBlock := latestL2EndBlock

rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, l.Log, l.Cfg.RollupRpc)
if err != nil {
return err
}

// Get the latest finalized L2 block.
status, err := rollupClient.SyncStatus(ctx)
if err != nil {
l.Log.Error("proposer unable to get sync status", "err", err)
return err
}
// Note: Originally, this used the L1 finalized block. However, to satisfy the new API, we now use the L2 finalized block.
newL2EndBlock := status.FinalizedL2.Number

spans := l.SplitRangeBasic(newL2StartBlock, newL2EndBlock)

// Add each span to the DB. If there are no spans, we will not create any proofs.
for _, span := range spans {
err := l.db.NewEntry(proofrequest.TypeSPAN, span.Start, span.End)
l.Log.Info("New range proof request.", "start", span.Start, "end", span.End)
if err != nil {
l.Log.Error("failed to add span to db", "err", err)
return err
}
}

return nil
}
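
As a quick illustration of SplitRangeBasic's boundary behavior (each span ends where the next one starts, and any tail shorter than MaxBlockRangePerSpanProof is left uncovered until the finalized head advances), the standalone snippet below replays the same loop with made-up numbers (start=1000, end=1350, MaxBlockRangePerSpanProof=100):

// Standalone illustration of the SplitRangeBasic loop with made-up numbers.
package main

import "fmt"

type Span struct{ Start, End uint64 }

func splitRangeBasic(start, end, maxRange uint64) []Span {
	spans := []Span{}
	for i := start; i+maxRange <= end; i += maxRange {
		spans = append(spans, Span{Start: i, End: i + maxRange})
	}
	return spans
}

func main() {
	// Prints: [{1000 1100} {1100 1200} {1200 1300}]
	// The 1300-1350 tail is not covered here; it is picked up on a later pass.
	fmt.Println(splitRangeBasic(1000, 1350, 100))
}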