diff --git a/core/node/crypto/testutil.go b/core/node/crypto/testutil.go index 075876057..94312a2e7 100644 --- a/core/node/crypto/testutil.go +++ b/core/node/crypto/testutil.go @@ -679,17 +679,10 @@ func (NoopChainMonitor) OnStopped(OnChainMonitorStoppedCallback) {} // Run individual tests with -run to find specific leaking tests. func TestMainForLeaksIgnoreGeth() { // Geth's simulated backend leaks a lot of goroutines. - // Unfortunately goleak doesn't have options to ignore by module or package, + // Unfortunately goleak doesn't have optiosn to ignore by module or package, // so some custom error string parsing is required to filter them out. - - // pgx also sometimes has a "leaked" goroutine that takes about 500ms to terminate after - // a pool is closed. Here we can configure ignoring this as it is a specific function. - ignorePgxPoolHealthCheck := goleak.IgnoreAnyFunction( - "github.com/jackc/pgx/v5/pgxpool.(*Pool).triggerHealthCheck.func1", - ) - now := time.Now() - err := goleak.Find(ignorePgxPoolHealthCheck) + err := goleak.Find() elapsed := time.Since(now) if err != nil { msg := err.Error() diff --git a/core/node/events/miniblock_producer.go b/core/node/events/miniblock_producer.go index ff993e3c3..32da2f2d5 100644 --- a/core/node/events/miniblock_producer.go +++ b/core/node/events/miniblock_producer.go @@ -276,10 +276,7 @@ func (p *miniblockProducer) TestMakeMiniblock( err = SleepWithContext(ctx, 10*time.Millisecond) if err != nil { - return nil, AsRiverError(err, Err_INTERNAL). - Func("TestMakeMiniblock"). - Message("Timed out while waiting for make_miniblock job to be scheduled"). - Tag("streamId", streamId) + return nil, err } } @@ -291,10 +288,7 @@ func (p *miniblockProducer) TestMakeMiniblock( err = SleepWithContext(ctx, 10*time.Millisecond) if err != nil { - return nil, AsRiverError(err, Err_INTERNAL). - Func("TestMakeMiniblock"). - Message("Timed out while waiting for make_miniblock job to terminate"). - Tag("streamId", streamId) + return nil, err } } diff --git a/core/node/events/stream.go b/core/node/events/stream.go index 471ed08c4..677f9fbc2 100644 --- a/core/node/events/stream.go +++ b/core/node/events/stream.go @@ -734,8 +734,7 @@ func (s *streamImpl) addEventLocked(ctx context.Context, event *ParsedEvent) err // TODO: for some classes of errors, it's not clear if event was added or not // for those, perhaps entire Stream structure should be scrapped and reloaded if err != nil { - return AsRiverError(err, Err_DB_OPERATION_FAILURE). 
- Tag("inMemoryBlocks", len(s.view().blocks)) + return err } s.setView(newSV) diff --git a/core/node/events/stream_cache.go b/core/node/events/stream_cache.go index 9d7be479c..1f2f08a1e 100644 --- a/core/node/events/stream_cache.go +++ b/core/node/events/stream_cache.go @@ -315,7 +315,7 @@ func (s *streamCacheImpl) tryLoadStreamRecord( for { select { case <-ctx.Done(): - return nil, AsRiverError(ctx.Err(), Err_INTERNAL).Message("Timeout waiting for cache record to be created") + return nil, ctx.Err() case <-time.After(delay): stream, _ := s.cache.Load(streamId) if stream != nil { diff --git a/core/node/rpc/archiver_test.go b/core/node/rpc/archiver_test.go index 36d77aa34..7f2cfe9c0 100644 --- a/core/node/rpc/archiver_test.go +++ b/core/node/rpc/archiver_test.go @@ -9,7 +9,6 @@ import ( "connectrpc.com/connect" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - "github.com/gammazero/workerpool" "github.com/stretchr/testify/assert" "github.com/river-build/river/core/contracts/river" @@ -45,11 +44,7 @@ func fillUserSettingsStreamWithData( err, Err_INTERNAL, ).Message("Failed to add event to stream"). - Func("fillUserSettingsStreamWithData"). - Tag("streamId", streamId). - Tag("miniblockNum", i). - Tag("mbEventNum", j). - Tag("numMbs", numMBs) + Func("fillUserSettingsStreamWithData") } } prevMB, err = makeMiniblock(ctx, client, streamId, false, prevMB.Num) @@ -58,10 +53,7 @@ func fillUserSettingsStreamWithData( err, Err_INTERNAL, ).Message("Failed to create miniblock"). - Func("fillUserSettingsStreamWithData"). - Tag("streamId", streamId). - Tag("miniblockNum", i). - Tag("numMbs", numMBs) + Func("fillUserSettingsStreamWithData") } } return prevMB, nil @@ -78,10 +70,12 @@ func createUserSettingsStreamsWithData( streamIds := make([]StreamId, numStreams) errChan := make(chan error, numStreams) - wp := workerpool.New(10) + var wg sync.WaitGroup + wg.Add(numStreams) for i := 0; i < numStreams; i++ { - wp.Submit(func() { + go func(i int) { + defer wg.Done() wallet, err := crypto.NewWallet(ctx) if err != nil { errChan <- err @@ -96,29 +90,20 @@ func createUserSettingsStreamsWithData( &StreamSettings{DisableMiniblockCreation: true}, ) if err != nil { - errChan <- AsRiverError(err, Err_INTERNAL). - Message("Failed to create stream"). - Func("createUserSettingsStreamsWithData"). - Tag("streamNum", i). - Tag("streamId", streamId) + errChan <- AsRiverError(err, Err_INTERNAL).Message("Failed to create stream").Func("createUserSettingsStreamsWithData") return } streamIds[i] = streamId _, err = fillUserSettingsStreamWithData(ctx, streamId, wallet, client, numMBs, numEventsPerMB, mbRef) if err != nil { - errChan <- AsRiverError(err, Err_INTERNAL). - Message("Failed to fill stream with data"). - Func("createUserSettingsStreamsWithData"). - Tag("streamNum", i). 
- Tag("streamId", streamId) + errChan <- AsRiverError(err, Err_INTERNAL).Message("Failed to fill stream with data").Func("createUserSettingsStreamsWithData") return } - }) + }(i) } - wp.StopWait() - + wg.Wait() if len(errChan) > 0 { return nil, nil, <-errChan } diff --git a/core/node/rpc/notification_test.go b/core/node/rpc/notification_test.go index 2f99d00f3..f2bde17d9 100644 --- a/core/node/rpc/notification_test.go +++ b/core/node/rpc/notification_test.go @@ -31,8 +31,6 @@ import ( "github.com/river-build/river/core/node/testutils/testcert" ) -var notificationDeliveryDelay = 30 * time.Second - // TestNotifications is designed in such a way that all tests are run in parallel // and share the same set of nodes, notification service and client. func TestNotifications(t *testing.T) { @@ -122,7 +120,7 @@ func testGDMAPNNotificationAfterUnsubscribe( notificationsForEvent := nc.ApnPushNotifications[eventHash] return cmp.Equal(notificationsForEvent, expectedUsersToReceiveNotification) - }, notificationDeliveryDelay, 2500*time.Millisecond, "Didn't receive expected notifications for stream %s", test.gdmStreamID) + }, 10*time.Second, 2500*time.Millisecond, "Didn't receive expected notifications") // userA unsubscribes and userB subscribes using the same device. // for tests the deviceToken is the users wallet address, in this case @@ -165,7 +163,7 @@ func testGDMAPNNotificationAfterUnsubscribe( notificationsForEvent := nc.ApnPushNotifications[eventHash] return len(notificationsForEvent) != 0 - }, notificationDeliveryDelay, 2500*time.Millisecond, "Receive unexpected notification") + }, 10*time.Second, 2500*time.Millisecond, "Receive unexpected notification") } func testGDMMessageWithNoMentionsRepliesAndReaction( @@ -234,7 +232,7 @@ func testGDMMessageWithNoMentionsRepliesAndReaction( return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) && cmp.Equal(apnNotifications, expectedUsersToReceiveNotification) - }, notificationDeliveryDelay, 2500*time.Millisecond, "Didn't receive expected notifications for stream %s", test.gdmStreamID) + }, 10*time.Second, 2500*time.Millisecond, "Didn't receive expected notifications") // Wait a bit to ensure that no more notifications come in test.req.Never(func() bool { @@ -280,7 +278,7 @@ func testGDMReactionMessage( defer nc.WebPushNotificationsMu.Unlock() return cmp.Equal(nc.WebPushNotifications[eventHash], expectedUsersToReceiveNotification) - }, notificationDeliveryDelay, 100*time.Millisecond, "user A Didn't receive expected notification for stream %s", test.gdmStreamID) + }, 15*time.Second, 100*time.Millisecond, "user A Didn't receive expected notification") // ensure that user B and C never get a notification test.req.Never(func() bool { @@ -428,7 +426,7 @@ func testDMMessageWithDefaultUserNotificationsPreferences( return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) && cmp.Equal(apnNotifications, expectedUsersToReceiveNotification) - }, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.dmStreamID[:]) + }, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications") // Wait a bit to ensure that no more notifications come in test.req.Never(func() bool { @@ -562,7 +560,7 @@ func testSpaceChannelPlainMessage( return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) && cmp.Equal(apnNotifications, expectedUsersToReceiveNotification) - }, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", 
test.channelID[:]) + }, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications") // Wait a bit to ensure that no more notifications come in test.req.Never(func() bool { @@ -641,7 +639,7 @@ func testSpaceChannelAtChannelTag( return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) && cmp.Equal(apnNotifications, expectedUsersToReceiveNotification) - }, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID[:]) + }, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications") // Wait a bit to ensure that no more notifications come in test.req.Never(func() bool { @@ -722,7 +720,7 @@ func testSpaceChannelMentionTag( return webCount == len(expectedUsersToReceiveNotification) || apnCount == len(expectedUsersToReceiveNotification) - }, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID) + }, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications") // Wait a bit to ensure that no more notifications come in test.req.Never(func() bool { diff --git a/core/node/storage/migrations_test.go b/core/node/storage/migrations_test.go index 5ac2c4c4f..b607adcb3 100644 --- a/core/node/storage/migrations_test.go +++ b/core/node/storage/migrations_test.go @@ -14,11 +14,7 @@ func TestMigrateExistingDb(t *testing.T) { testParams := setupStreamStorageTest(t) ctx := testParams.ctx - - // Tear down the store and defer remaining cleanup - testParams.pgStreamStore.Close(ctx) - defer testParams.ctxCloser() - defer testParams.schemaDeleter() + defer testParams.closer() pool, err := CreateAndValidatePgxPool( ctx, diff --git a/core/node/storage/pg_stream_store.go b/core/node/storage/pg_stream_store.go index ebf1a2787..0ab75fac1 100644 --- a/core/node/storage/pg_stream_store.go +++ b/core/node/storage/pg_stream_store.go @@ -31,7 +31,6 @@ type PostgresStreamStore struct { exitSignal chan error nodeUUID string cleanupListenFunc func() - cleanupLockFunc func() numPartitions int } @@ -142,194 +141,7 @@ func NewPostgresStreamStore( return store, nil } -// computeLockIdFromSchema computes an int64 which is a hash of the schema name. -// We will use this int64 as the key of a pg advisory lock to ensure only one -// node has R/W access to the schema at a time. -func (s *PostgresStreamStore) computeLockIdFromSchema() int64 { - return (int64)(xxhash.Sum64String(s.schemaName)) -} - -// maintainSchemaLock periodically checks the connection that established the -// lock on the schema and will attempt to establish a new connection and take -// the lock again if the connection is lost. If the lock is lost and cannot be -// re-established, the store will send an exit signal to shut down the node. -// This is blocking and is intended to be launched as a go routine. -func (s *PostgresStreamStore) maintainSchemaLock( - ctx context.Context, - conn *pgxpool.Conn, -) { - log := dlog.FromCtx(ctx) - defer conn.Release() - - lockId := s.computeLockIdFromSchema() - - count := 0 - for { - // Check for connection health with a ping. Also, maintain the connection in the - // case of idle timeouts. - err := conn.Ping(ctx) - count++ - - if count%100 == 0 { - log.Debug("DB Ping!", "error", err) - } - - if err != nil { - // We expect cancellation only on node shutdown. In this case, - // do not send an error signal. 
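(Reviewer note: for anyone skimming the lock machinery being removed above — the core of the scheme is a session-level Postgres advisory lock keyed by a hash of the schema name. A minimal sketch of that pattern, assuming pgx v5 and xxhash, with hypothetical helper names rather than the store's actual fields:)

```go
package pglock

import (
	"context"
	"fmt"

	"github.com/cespare/xxhash/v2"
	"github.com/jackc/pgx/v5/pgxpool"
)

// lockIDForSchema maps the schema name into the int64 key space that Postgres
// advisory locks use.
func lockIDForSchema(schema string) int64 {
	return int64(xxhash.Sum64String(schema))
}

// TryAcquireSchemaLock takes pg_try_advisory_lock on the schema key. The lock
// is tied to the session, so the returned connection must stay checked out
// (and healthy) for as long as exclusive access is needed.
func TryAcquireSchemaLock(ctx context.Context, pool *pgxpool.Pool, schema string) (*pgxpool.Conn, error) {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	var acquired bool
	err = conn.QueryRow(ctx, "select pg_try_advisory_lock($1)", lockIDForSchema(schema)).Scan(&acquired)
	if err != nil || !acquired {
		conn.Release()
		if err == nil {
			err = fmt.Errorf("schema %q is locked by another session", schema)
		}
		return nil, err
	}
	return conn, nil
}
```

This is also why the removed loop keeps pinging its connection: the advisory lock lives only as long as the session, so a dropped connection silently releases it and another instance could take it.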
- if errors.Is(err, context.Canceled) { - return - } - - log.Warn("Error pinging pgx connection maintaining the session lock, closing connection", "error", err) - - // Close the connection to encourage the db server to immediately clean up the - // session so we can go ahead and re-take the lock from a new session. - conn.Conn().Close(ctx) - // Fine to call multiple times. - conn.Release() - - // Attempt to re-acquire a connection - conn, err = s.acquireConnection(ctx) - - // Shutdown the node for non-cancellation errors - if errors.Is(err, context.Canceled) { - return - } else if err != nil { - err = AsRiverError(err, Err_RESOURCE_EXHAUSTED). - Message("Lost connection and unable to re-acquire a connection"). - Func("maintainSchemaLock"). - Tag("schema", s.schemaName). - Tag("lockId", lockId). - LogError(dlog.FromCtx(ctx)) - s.exitSignal <- err - return - } - - log.Info("maintainSchemaLock: reacquired connection, re-establishing session lock") - defer conn.Release() - - // Attempt to re-establish the lock - var acquired bool - err := conn.QueryRow( - ctx, - "select pg_try_advisory_lock($1)", - lockId, - ).Scan(&acquired) - - // Shutdown the node for non-cancellation errors. - if errors.Is(err, context.Canceled) { - return - } else if err != nil { - err = AsRiverError(err, Err_RESOURCE_EXHAUSTED). - Message("Lost connection and unable to re-acquire schema lock"). - Func("maintainSchemaLock"). - Tag("schema", s.schemaName). - Tag("lockId", lockId). - LogError(dlog.FromCtx(ctx)) - s.exitSignal <- err - } - - if !acquired { - err = AsRiverError(fmt.Errorf("schema lock was not available"), Err_RESOURCE_EXHAUSTED). - Message("Lost connection and unable to re-acquire schema lock"). - Func("maintainSchemaLock"). - Tag("schema", s.schemaName). - Tag("lockId", lockId). - LogError(dlog.FromCtx(ctx)) - s.exitSignal <- err - } - - if err = SleepWithContext(ctx, 1*time.Second); err != nil { - return - } - } - } -} - -// acquireSchemaLock waits until it is able to acquire a session-wide pg advisory lock -// on the integer id derived from the hash of this node's schema name, and launches a -// go routine to periodically check the connection maintaining the lock. -func (s *PostgresStreamStore) acquireSchemaLock(ctx context.Context) error { - log := dlog.FromCtx(ctx) - lockId := s.computeLockIdFromSchema() - - // Acquire connection - conn, err := s.acquireConnection(ctx) - if err != nil { - return err - } - - log.Info("Acquiring lock on database schema", "lockId", lockId, "nodeUUID", s.nodeUUID) - - var lockWasUnavailable bool - for { - var acquired bool - err := conn.QueryRow( - ctx, - "select pg_try_advisory_lock($1)", - lockId, - ).Scan(&acquired) - if err != nil { - return AsRiverError( - err, - Err_DB_OPERATION_FAILURE, - ).Message("Could not acquire lock on schema"). - Func("acquireSchemaLock"). - Tag("lockId", lockId). - Tag("nodeUUID", s.nodeUUID). - LogError(log) - } - - if acquired { - log.Info("Schema lock acquired", "lockId", lockId, "nodeUUID", s.nodeUUID) - break - } - - lockWasUnavailable = true - if err = SleepWithContext(ctx, 1*time.Second); err != nil { - return err - } - - log.Info( - "Unable to acquire lock on schema, retrying...", - "lockId", - lockId, - "nodeUUID", - s.nodeUUID, - ) - } - - // If we were not initially able to acquire the lock, delay startup after lock - // acquisition to give the other node any needed time to fully release all resources. 
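(Reviewer note: the retry and delay paths in the removed code all go through SleepWithContext so that node shutdown cancels the wait promptly. Assuming the conventional semantics for such a helper — not verified against the actual implementation — it amounts to:)

```go
package rutil

import (
	"context"
	"time"
)

// SleepWithContext (assumed semantics): returns nil once d has elapsed, or
// ctx.Err() if the context is cancelled first, so shutdown interrupts retry
// loops without waiting out the full delay.
func SleepWithContext(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}
```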
- if lockWasUnavailable { - delay := s.config.StartupDelay - if delay == 0 { - delay = 2 * time.Second - } else if delay <= time.Millisecond { - delay = 0 - } - if delay > 0 { - log.Info( - "schema lock could not be immediately acquired; Delaying startup to let other instance exit", - "delay", - delay, - ) - - // Be responsive to context cancellations - if err = SleepWithContext(ctx, delay); err != nil { - return err - } - } - } - - // maintainSchemaLock is responsible for connection cleanup. - go s.maintainSchemaLock(ctx, conn) - return nil -} - func (s *PostgresStreamStore) initStreamStorage(ctx context.Context) error { - dlog.FromCtx(ctx).Info("Detecting other instances") err := s.txRunner( ctx, "listOtherInstances", @@ -341,32 +153,13 @@ func (s *PostgresStreamStore) initStreamStorage(ctx context.Context) error { return err } - dlog.FromCtx(ctx).Info("Establishing database usage") - err = s.txRunner( + return s.txRunner( ctx, "initializeSingleNodeKey", pgx.ReadWrite, s.initializeSingleNodeKeyTx, nil, ) - if err != nil { - return err - } - - // After writing to the singlenodekey table, wait until we acquire the schema lock. - // In the meantime, any other nodes should detect the new entry in the table and - // shut themselves down. - ctx, cancel := context.WithCancel(ctx) - err = s.acquireSchemaLock(ctx) - s.cleanupLockFunc = cancel - - if err != nil { - return AsRiverError(err, Err_DB_OPERATION_FAILURE). - Message("Unable to acquire lock on database schema"). - Func("initStreamStorage") - } - - return nil } // txRunner runs transactions against the underlying postgres store. This override @@ -390,6 +183,31 @@ func (s *PostgresStreamStore) txRunner( ) } +// txRunnerWithUUIDCheck conditionally run the transaction only if a check against the +// singlenodekey table shows that this is still the only node writing to the database. +func (s *PostgresStreamStore) txRunnerWithUUIDCheck( + ctx context.Context, + name string, + accessMode pgx.TxAccessMode, + txFn func(context.Context, pgx.Tx) error, + opts *txRunnerOpts, + tags ...any, +) error { + return s.txRunner( + ctx, + name, + accessMode, + func(ctx context.Context, txn pgx.Tx) error { + if err := s.compareUUID(ctx, txn); err != nil { + return err + } + return txFn(ctx, txn) + }, + opts, + tags..., + ) +} + // CreatePartitionSuffix determines the partition mapping for a particular stream id the // hex encoding of the first byte of the xxHash of the stream ID. 
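(Reviewer note: the body of CreatePartitionSuffix is not shown in this hunk. For orientation only, a hash-and-bucket mapping of this kind typically looks like the sketch below; the real function's exact byte selection and formatting may differ:)

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// partitionSuffix is an illustrative hash-and-bucket mapping of a stream id
// onto one of numPartitions partitions; the real CreatePartitionSuffix's byte
// selection and formatting are not shown in this diff.
func partitionSuffix(streamID string, numPartitions int) string {
	bucket := xxhash.Sum64String(streamID) % uint64(numPartitions)
	return fmt.Sprintf("%02x", bucket)
}

func main() {
	fmt.Println(partitionSuffix("a1b2c3d4", 256)) // deterministic two-hex-digit bucket
}
```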
func CreatePartitionSuffix(streamId StreamId, numPartitions int) string { @@ -438,7 +256,7 @@ func (s *PostgresStreamStore) CreateStreamStorage( streamId StreamId, genesisMiniblock []byte, ) error { - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "CreateStreamStorage", pgx.ReadWrite, @@ -510,7 +328,7 @@ func (s *PostgresStreamStore) CreateStreamArchiveStorage( ctx context.Context, streamId StreamId, ) error { - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "CreateStreamArchiveStorage", pgx.ReadWrite, @@ -543,7 +361,7 @@ func (s *PostgresStreamStore) GetMaxArchivedMiniblockNumber( streamId StreamId, ) (int64, error) { var maxArchivedMiniblockNumber int64 - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "GetMaxArchivedMiniblockNumber", pgx.ReadWrite, @@ -605,7 +423,7 @@ func (s *PostgresStreamStore) WriteArchiveMiniblocks( startMiniblockNum int64, miniblocks [][]byte, ) error { - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "WriteArchiveMiniblocks", pgx.ReadWrite, @@ -669,7 +487,7 @@ func (s *PostgresStreamStore) ReadStreamFromLastSnapshot( numToRead int, ) (*ReadStreamFromLastSnapshotResult, error) { var ret *ReadStreamFromLastSnapshotResult - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "ReadStreamFromLastSnapshot", pgx.ReadWrite, @@ -757,7 +575,7 @@ func (s *PostgresStreamStore) readStreamFromLastSnapshotTx( if !(readFirstSeqNum <= snapshotMiniblockIndex && snapshotMiniblockIndex <= readLastSeqNum) { return nil, RiverError( Err_INTERNAL, - "Miniblocks consistency violation - snapshotMiniblockIndex is out of range", + "Miniblocks consistency violation - snapshotMiniblocIndex is out of range", "snapshotMiniblockIndex", snapshotMiniblockIndex, "readFirstSeqNum", readFirstSeqNum, "readLastSeqNum", readLastSeqNum) @@ -828,7 +646,7 @@ func (s *PostgresStreamStore) WriteEvent( minipoolSlot int, envelope []byte, ) error { - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "WriteEvent", pgx.ReadWrite, @@ -884,7 +702,8 @@ func (s *PostgresStreamStore) writeEventTx( } if generation != minipoolGeneration { return RiverError(Err_DB_OPERATION_FAILURE, "Wrong event generation in minipool"). - Tag("ExpectedGeneration", minipoolGeneration).Tag("ActualGeneration", generation) + Tag("ExpectedGeneration", minipoolGeneration).Tag("ActualGeneration", generation). + Tag("SlotNumber", slotNum) } if slotNum != counter { return RiverError(Err_DB_OPERATION_FAILURE, "Wrong slot number in minipool"). 
@@ -932,7 +751,7 @@ func (s *PostgresStreamStore) ReadMiniblocks( toExclusive int64, ) ([][]byte, error) { var miniblocks [][]byte - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "ReadMiniblocks", pgx.ReadWrite, @@ -1013,7 +832,7 @@ func (s *PostgresStreamStore) ReadMiniblocksByStream( streamId StreamId, onEachMb func(blockdata []byte, seqNum int) error, ) error { - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "ReadMiniblocksByStream", pgx.ReadWrite, @@ -1075,7 +894,7 @@ func (s *PostgresStreamStore) WriteMiniblockCandidate( blockNumber int64, miniblock []byte, ) error { - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "WriteMiniblockCandidate", pgx.ReadWrite, @@ -1153,7 +972,7 @@ func (s *PostgresStreamStore) ReadMiniblockCandidate( blockNumber int64, ) ([]byte, error) { var miniblock []byte - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "ReadMiniblockCandidate", pgx.ReadWrite, @@ -1236,7 +1055,7 @@ func (s *PostgresStreamStore) WriteMiniblocks( ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "WriteMiniblocks", pgx.ReadWrite, @@ -1446,7 +1265,7 @@ func (s *PostgresStreamStore) writeMiniblocksTx( func (s *PostgresStreamStore) GetStreamsNumber(ctx context.Context) (int, error) { var count int - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "GetStreamsNumber", pgx.ReadOnly, @@ -1474,6 +1293,39 @@ func (s *PostgresStreamStore) getStreamsNumberTx(ctx context.Context, tx pgx.Tx) return count, nil } +func (s *PostgresStreamStore) compareUUID(ctx context.Context, tx pgx.Tx) error { + log := dlog.FromCtx(ctx) + + rows, err := tx.Query(ctx, "SELECT uuid FROM singlenodekey") + if err != nil { + return err + } + defer rows.Close() + + allIds := []string{} + for rows.Next() { + var id string + err = rows.Scan(&id) + if err != nil { + return err + } + allIds = append(allIds, id) + } + + if len(allIds) == 1 && allIds[0] == s.nodeUUID { + return nil + } + + err = RiverError(Err_RESOURCE_EXHAUSTED, "No longer a current node, shutting down"). + Func("pg.compareUUID"). + Tag("currentUUID", s.nodeUUID). + Tag("schema", s.schemaName). + Tag("newUUIDs", allIds). + LogError(log) + s.exitSignal <- err + return err +} + // Close removes instance record from singlenodekey table, releases the listener connection, and // closes the postgres connection pool func (s *PostgresStreamStore) Close(ctx context.Context) { @@ -1483,9 +1335,6 @@ func (s *PostgresStreamStore) Close(ctx context.Context) { log.Error("Error when deleting singlenodekey entry", "error", err) } - // Cancel the go process that maintains the connection holding the session-wide schema lock - // and release it back to the pool. - s.cleanupLockFunc() // Cancel the notify listening func to release the listener connection before closing the pool. 
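(Reviewer note: compareUUID above reports the conflict by pushing onto s.exitSignal. The surrounding pattern — not the node's actual run loop, which is outside this diff — is roughly:)

```go
package main

import (
	"context"
	"errors"
	"log"
)

// fakeStore illustrates the exit-signal pattern: the store owns a buffered
// error channel and the node's run loop treats anything received on it as
// fatal. Names and buffer size here are assumptions, not the actual wiring.
type fakeStore struct {
	exitSignal chan error
}

func (s *fakeStore) reportFatal(err error) {
	select {
	case s.exitSignal <- err: // buffered send; never blocks the reporter
	default:
	}
}

func run(ctx context.Context, s *fakeStore) error {
	select {
	case err := <-s.exitSignal:
		log.Printf("storage reported fatal error, shutting down: %v", err)
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	s := &fakeStore{exitSignal: make(chan error, 1)}
	s.reportFatal(errors.New("no longer a current node"))
	_ = run(context.Background(), s)
}
```

The non-blocking send is an assumption of this sketch; the production code may rely solely on the channel's buffering.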
s.cleanupListenFunc() @@ -1510,7 +1359,7 @@ func (s *PostgresStreamStore) cleanupStreamStorageTx(ctx context.Context, tx pgx // GetStreams returns a list of all event streams func (s *PostgresStreamStore) GetStreams(ctx context.Context) ([]StreamId, error) { var streams []StreamId - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "GetStreams", pgx.ReadOnly, @@ -1554,7 +1403,7 @@ func (s *PostgresStreamStore) getStreamsTx(ctx context.Context, tx pgx.Tx) ([]St } func (s *PostgresStreamStore) DeleteStream(ctx context.Context, streamId StreamId) error { - return s.txRunner( + return s.txRunnerWithUUIDCheck( ctx, "DeleteStream", pgx.ReadWrite, @@ -1678,7 +1527,6 @@ func (s *PostgresStreamStore) acquireListeningConnection(ctx context.Context) *p conn.Release() } } - // Expect cancellations if node is shut down if err == context.Canceled { return nil } @@ -1689,56 +1537,9 @@ func (s *PostgresStreamStore) acquireListeningConnection(ctx context.Context) *p } } -// acquireConnection acquires a connection from the pgx pool. In the event of a failure to obtain -// a connection, the method retries multiple times to compensate for intermittent networking errors. -// If a connection cannot be obtained after multiple retries, it returns the error. Callers should -// make sure to release the connection when it is no longer being used. -func (s *PostgresStreamStore) acquireConnection(ctx context.Context) (*pgxpool.Conn, error) { - var err error - var conn *pgxpool.Conn - - log := dlog.FromCtx(ctx) - - // 20 retries * 1s delay = 20s of connection attempts - retries := 20 - for i := 0; i < retries; i++ { - conn, err = s.pool.Acquire(ctx) - if err == nil { - return conn, nil - } - - // Expect cancellations if node is shut down, abort retries and return wrapped error - if errors.Is(err, context.Canceled) { - break - } - - log.Info( - "Failed to acquire pgx connection, retrying", - "error", - err, - "nthRetry", - i+1, - ) - - // In the event of networking issues, wait a small period of time for recovery. - time.Sleep(500 * time.Millisecond) - } - - log.Error("Failed to acquire pgx connection", "error", err) - - // Assume final error is representative and return it. - return nil, AsRiverError( - err, - Err_DB_OPERATION_FAILURE, - ).Message("Could not acquire postgres connection"). - Func("acquireConnection") -} - -// listenForNewNodes maintains an open connection with postgres that listens for -// changes to the singlenodekey table in order to detect startup of competing nodes. -// Call it with a cancellable context and the method will return when the context is -// cancelled. Call it after storage has been initialized in order to not receive a -// notification when this node updates the table with it's own entry. +// Call with a cancellable context and pgx should terminate when the context is +// cancelled. Call after storage has been initialized in order to not receive a +// notification when this node updates the table. func (s *PostgresStreamStore) listenForNewNodes(ctx context.Context) { conn := s.acquireListeningConnection(ctx) if conn == nil { @@ -1771,7 +1572,6 @@ func (s *PostgresStreamStore) listenForNewNodes(ctx context.Context) { err = RiverError(Err_RESOURCE_EXHAUSTED, "No longer a current node, shutting down"). Func("listenForNewNodes"). Tag("schema", s.schemaName). - Tag("nodeUUID", s.nodeUUID). LogWarn(dlog.FromCtx(ctx)) // In the event of detecting node conflict, send the error to the main thread to shut down. 
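(Reviewer note: listenForNewNodes depends on Postgres LISTEN/NOTIFY to spot a competing instance writing to singlenodekey. The basic pgx v5 mechanics, with a placeholder channel name and none of the production reconnection handling:)

```go
package pglisten

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

// WaitForCompetitor blocks until a notification arrives on the given channel
// or ctx is cancelled. Sketch only: the channel name is a placeholder and the
// production listener's reconnection/error handling is omitted.
func WaitForCompetitor(ctx context.Context, pool *pgxpool.Pool, channel string) error {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	// channel must be a trusted identifier; LISTEN cannot take bind parameters.
	if _, err := conn.Exec(ctx, "listen "+channel); err != nil {
		return err
	}
	n, err := conn.Conn().WaitForNotification(ctx) // blocks until NOTIFY or cancellation
	if err != nil {
		return err
	}
	log.Printf("notification on %q: payload=%q", n.Channel, n.Payload)
	return nil
}
```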
@@ -1786,7 +1586,7 @@ func (s *PostgresStreamStore) DebugReadStreamData( streamId StreamId, ) (*DebugReadStreamDataResult, error) { var ret *DebugReadStreamDataResult - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "DebugReadStreamData", pgx.ReadWrite, @@ -1900,7 +1700,7 @@ func (s *PostgresStreamStore) GetLastMiniblockNumber( streamID StreamId, ) (int64, error) { var ret int64 - err := s.txRunner( + err := s.txRunnerWithUUIDCheck( ctx, "GetLastMiniblockNumber", pgx.ReadWrite, diff --git a/core/node/storage/pg_stream_store_test.go b/core/node/storage/pg_stream_store_test.go index 78d01cbd5..189eef811 100644 --- a/core/node/storage/pg_stream_store_test.go +++ b/core/node/storage/pg_stream_store_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "strings" - "sync" "testing" "time" @@ -28,11 +27,6 @@ type testStreamStoreParams struct { schema string config *config.DatabaseConfig closer func() - // For retaining schema and manually closing the store, use - // the following two cleanup functions to manually delete the - // schema and cancel the test context. - schemaDeleter func() - ctxCloser func() exitSignal chan error } @@ -76,13 +70,11 @@ func setupStreamStorageTest(t *testing.T) *testStreamStoreParams { schema: dbSchemaName, config: dbCfg, exitSignal: exitSignal, - closer: sync.OnceFunc(func() { + closer: func() { store.Close(ctx) dbCloser() ctxCloser() - }), - schemaDeleter: dbCloser, - ctxCloser: ctxCloser, + }, } return params @@ -572,8 +564,7 @@ func TestExitIfSecondStorageCreated(t *testing.T) { require := require.New(t) ctx := params.ctx pgStreamStore := params.pgStreamStore - defer params.schemaDeleter() - defer params.ctxCloser() + defer params.closer() // Give listener thread some time to start time.Sleep(500 * time.Millisecond) @@ -593,21 +584,15 @@ func TestExitIfSecondStorageCreated(t *testing.T) { instanceId2 := GenShortNanoid() exitSignal2 := make(chan error, 1) - - var secondStoreInitialized sync.WaitGroup - secondStoreInitialized.Add(1) - var pgStreamStore2 *PostgresStreamStore - go func() { - pgStreamStore2, err = NewPostgresStreamStore( - ctx, - pool, - instanceId2, - exitSignal2, - infra.NewMetricsFactory(nil, "", ""), - ) - require.NoError(err) - secondStoreInitialized.Done() - }() + pgStreamStore2, err := NewPostgresStreamStore( + ctx, + pool, + instanceId2, + exitSignal2, + infra.NewMetricsFactory(nil, "", ""), + ) + require.NoError(err) + defer pgStreamStore2.Close(ctx) // Give listener thread for the first store some time to detect the notification and emit an error time.Sleep(500 * time.Millisecond) @@ -615,10 +600,6 @@ func TestExitIfSecondStorageCreated(t *testing.T) { exitErr := <-params.exitSignal require.Error(exitErr) require.Equal(Err_RESOURCE_EXHAUSTED, AsRiverError(exitErr).Code) - pgStreamStore.Close(ctx) - - secondStoreInitialized.Wait() - defer pgStreamStore2.Close(ctx) result, err := pgStreamStore2.ReadStreamFromLastSnapshot(ctx, streamId, 0) require.NoError(err) diff --git a/core/scripts/db_blip.sh b/core/scripts/db_blip.sh deleted file mode 100755 index fb3cd5f44..000000000 --- a/core/scripts/db_blip.sh +++ /dev/null @@ -1,40 +0,0 @@ -# To test node resiliency in the event of intermittent network outages between -# the node and the database, do the following: -# -# 1. Update the docker-compose.yaml exposed port for postgres from 5433 to -# another port, such as 6433 -# -# 2. 
Run a toxiproxy docker container on the same network as the postgres -# instance, like so: -# -# docker run -d --name toxiproxy \ -# --network river_default \ -# -p 8474:8474 \ -# -p 5433:5433 \ -# ghcr.io/shopify/toxiproxy:2.5.0 -# -# 3. Create the toxiproxy proxy for postgres -# -# brew install shopify/shopify/toxiproxy -# -# toxiproxy-cli -h localhost:8474 create --listen 0.0.0.0:5433 \ -# --upstream river-postgres-1:5432 postgres -# -# 4. Restart any local dev and confirm proxy is functioning with unit tests. -# -# Note: to interact with the db while traffic is interrupted through another -# tool such as pgadmin, be sure to reconfigure your pgadmin connection -# to use port 6433, or whatever port you used in step 1. -# -# 5. Add and remove toxiproxy rules, which are called toxics, or run the commands -# below to simulate a temporary network outage of 10s. - - -# Break all connections. New connections will timeout after 100ms. -toxiproxy-cli -h localhost:8474 toxic add -t timeout -a timeout=100 postgres - -# Wait for your desired duration -sleep 10 - -# Remove the toxic to restore connections -toxiproxy-cli -h localhost:8474 toxic remove -toxicName timeout_downstream postgres
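(Reviewer note: the deleted script above exercised the connection-retry path that this diff also removes (acquireConnection). For reference, the bounded-retry shape of that path in pgx looks roughly like the following; the retry count and delay approximate the removed helper and should not be treated as authoritative:)

```go
package pgretry

import (
	"context"
	"errors"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// AcquireWithRetry keeps trying to check out a connection so that brief
// network blips (like those the deleted script simulated) surface as retries
// rather than hard failures.
func AcquireWithRetry(ctx context.Context, pool *pgxpool.Pool) (*pgxpool.Conn, error) {
	var lastErr error
	for i := 0; i < 20; i++ {
		conn, err := pool.Acquire(ctx)
		if err == nil {
			return conn, nil
		}
		if errors.Is(err, context.Canceled) {
			return nil, err // shutting down: stop retrying immediately
		}
		lastErr = err
		time.Sleep(500 * time.Millisecond)
	}
	return nil, lastErr
}
```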