From 39993ca5e6eafc85b0f3e6bfdf16b879349c46db Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Mon, 11 Nov 2024 17:31:59 -0600 Subject: [PATCH] feat: recover database on commit timestamp conflict --- database/commit_timestamp.go | 22 +++++++--- database/database.go | 14 +++--- state/state.go | 84 ++++++++++++++++++++++++++++++------ 3 files changed, 96 insertions(+), 24 deletions(-) diff --git a/database/commit_timestamp.go b/database/commit_timestamp.go index b10706f..d597a31 100644 --- a/database/commit_timestamp.go +++ b/database/commit_timestamp.go @@ -38,6 +38,19 @@ func (CommitTimestamp) TableName() string { return "commit_timestamp" } +type CommitTimestampError struct { + MetadataTimestamp int64 + BlobTimestamp int64 +} + +func (e CommitTimestampError) Error() string { + return fmt.Sprintf( + "commit timestamp mismatch: %d (metadata) != %d (blob)", + e.MetadataTimestamp, + e.BlobTimestamp, + ) +} + func (b *BaseDatabase) checkCommitTimestamp() error { // Create table if it doesn't exist if err := b.Metadata().AutoMigrate(&CommitTimestamp{}); err != nil { @@ -66,11 +79,10 @@ func (b *BaseDatabase) checkCommitTimestamp() error { tmpTimestamp := new(big.Int).SetBytes(val).Int64() // Compare values if tmpTimestamp != tmpCommitTimestamp.Timestamp { - return fmt.Errorf( - "commit timestamp mismatch: %d (metadata) != %d (blob)", - tmpCommitTimestamp.Timestamp, - tmpTimestamp, - ) + return CommitTimestampError{ + MetadataTimestamp: tmpCommitTimestamp.Timestamp, + BlobTimestamp: tmpTimestamp, + } } return nil }) diff --git a/database/database.go b/database/database.go index 41d6b40..e7189e2 100644 --- a/database/database.go +++ b/database/database.go @@ -73,15 +73,15 @@ func (b *BaseDatabase) init() error { } // Configure metrics for Badger DB b.registerBadgerMetrics() - // Check commit timestamp - if err := b.checkCommitTimestamp(); err != nil { - return err - } // Run GC periodically for Badger DB if b.blobGcEnabled { b.blobGcTimer = time.NewTicker(5 * time.Minute) go b.blobGc() } + // Check commit timestamp + if err := b.checkCommitTimestamp(); err != nil { + return err + } return nil } @@ -141,7 +141,8 @@ func NewInMemory(logger *slog.Logger) (*InMemoryDatabase, error) { }, } if err := db.init(); err != nil { - return nil, err + // Database is available for recovery, so return it with error + return db, err } return db, nil } @@ -204,7 +205,8 @@ func NewPersistent( dataDir: dataDir, } if err := db.init(); err != nil { - return nil, err + // Database is available for recovery, so return it with error + return db, err } return db, nil } diff --git a/state/state.go b/state/state.go index 4a095c2..dca53ac 100644 --- a/state/state.go +++ b/state/state.go @@ -69,16 +69,42 @@ func NewLedgerState(cfg LedgerStateConfig) (*LedgerState, error) { } if cfg.DataDir == "" { db, err := database.NewInMemory(ls.config.Logger) + ls.db = db if err != nil { - return nil, err + if _, ok := err.(database.CommitTimestampError); !ok { + return nil, err + } + ls.config.Logger.Warn( + "database initialization error", + "error", + err, + "component", + "ledger", + ) + // Run recovery + if err := ls.recoverCommitTimestampConflict(); err != nil { + return nil, err + } } - ls.db = db } else { db, err := database.NewPersistent(cfg.DataDir, cfg.Logger) + ls.db = db if err != nil { - return nil, err + if _, ok := err.(database.CommitTimestampError); !ok { + return nil, err + } + ls.config.Logger.Warn( + "database initialization error", + "error", + err, + "component", + "ledger", + ) + // Run recovery + if err := ls.recoverCommitTimestampConflict(); err != nil { + return nil, err + } } - ls.db = db } // Create the table schemas for _, model := range models.MigrateModels { @@ -109,6 +135,32 @@ func NewLedgerState(cfg LedgerStateConfig) (*LedgerState, error) { return ls, nil } +func (ls *LedgerState) recoverCommitTimestampConflict() error { + // Try to load last n blocks and rollback to the last one we can load + var tmpBlocks []models.Block + result := ls.db.Metadata().Order("id DESC").Limit(100).Find(&tmpBlocks) + if result.Error != nil { + return result.Error + } + for _, tmpBlock := range tmpBlocks { + blockPoint := ocommon.NewPoint( + tmpBlock.Slot, + tmpBlock.Hash, + ) + // Load individual block to also (attempt to) load CBOR + if _, err := models.BlockByPoint(ls.db, blockPoint); err == nil { + if err2 := ls.rollback(blockPoint); err2 != nil { + return fmt.Errorf( + "failed to rollback: %s", + err2, + ) + } + return nil + } + } + return fmt.Errorf("failed to recover database") +} + func (ls *LedgerState) scheduleCleanupConsumedUtxos() { ls.Lock() defer ls.Unlock() @@ -209,13 +261,17 @@ func (ls *LedgerState) handleEventChainSync(evt event.Event) { } func (ls *LedgerState) handleEventChainSyncRollback(e ChainsyncEvent) error { + return ls.rollback(e.Point) +} + +func (ls *LedgerState) rollback(point ocommon.Point) error { // Start a transaction txn := ls.db.Transaction(true) err := txn.Do(func(txn *database.Txn) error { // Remove rolled-back blocks in reverse order var tmpBlocks []models.Block result := txn.Metadata(). - Where("slot > ?", e.Point.Slot). + Where("slot > ?", point.Slot). Order("slot DESC"). Find(&tmpBlocks) if result.Error != nil { @@ -229,19 +285,21 @@ func (ls *LedgerState) handleEventChainSyncRollback(e ChainsyncEvent) error { // Delete rolled-back UTxOs var tmpUtxos []models.Utxo result = txn.Metadata(). - Where("added_slot > ?", e.Point.Slot). + Where("added_slot > ?", point.Slot). Order("id DESC"). Find(&tmpUtxos) if result.Error != nil { - return fmt.Errorf("remove rolled-backup UTxOs: %w", result.Error) + return fmt.Errorf("remove rolled-back UTxOs: %w", result.Error) } - if err := models.UtxosDeleteTxn(txn, tmpUtxos); err != nil { - return fmt.Errorf("remove rolled-back UTxOs: %w", err) + if len(tmpUtxos) > 0 { + if err := models.UtxosDeleteTxn(txn, tmpUtxos); err != nil { + return fmt.Errorf("remove rolled-back UTxOs: %w", err) + } } // Restore spent UTxOs result = txn.Metadata(). Model(models.Utxo{}). - Where("deleted_slot > ?", e.Point.Slot). + Where("deleted_slot > ?", point.Slot). Update("deleted_slot", 0) if result.Error != nil { return fmt.Errorf( @@ -260,15 +318,15 @@ func (ls *LedgerState) handleEventChainSyncRollback(e ChainsyncEvent) error { event.NewEvent( ChainRollbackEventType, ChainRollbackEvent{ - Point: e.Point, + Point: point, }, ), ) ls.config.Logger.Info( fmt.Sprintf( "chain rolled back, new tip: %x at slot %d", - e.Point.Hash, - e.Point.Slot, + point.Hash, + point.Slot, ), "component", "ledger",