Skip to content

Commit

Permalink
Revert "[RIVER-1738] Update pg store to use advisory locking instead …
Browse files Browse the repository at this point in the history
…of comparing UUIDs per transaction" (#1885)

Reverts #1739

Local dev is hecka slow for Evan et al, reverting this change until I
can find a fix for the flaky tests that emerge when I do proper polling.
  • Loading branch information
clemire authored Dec 19, 2024
1 parent 3bd95cc commit ef278e8
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 412 deletions.
11 changes: 2 additions & 9 deletions core/node/crypto/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,17 +679,10 @@ func (NoopChainMonitor) OnStopped(OnChainMonitorStoppedCallback) {}
// Run individual tests with -run to find specific leaking tests.
func TestMainForLeaksIgnoreGeth() {
// Geth's simulated backend leaks a lot of goroutines.
// Unfortunately goleak doesn't have options to ignore by module or package,
// Unfortunately goleak doesn't have optiosn to ignore by module or package,
// so some custom error string parsing is required to filter them out.

// pgx also sometimes has a "leaked" goroutine that takes about 500ms to terminate after
// a pool is closed. Here we can configure ignoring this as it is a specific function.
ignorePgxPoolHealthCheck := goleak.IgnoreAnyFunction(
"github.com/jackc/pgx/v5/pgxpool.(*Pool).triggerHealthCheck.func1",
)

now := time.Now()
err := goleak.Find(ignorePgxPoolHealthCheck)
err := goleak.Find()
elapsed := time.Since(now)
if err != nil {
msg := err.Error()
Expand Down
10 changes: 2 additions & 8 deletions core/node/events/miniblock_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,7 @@ func (p *miniblockProducer) TestMakeMiniblock(

err = SleepWithContext(ctx, 10*time.Millisecond)
if err != nil {
return nil, AsRiverError(err, Err_INTERNAL).
Func("TestMakeMiniblock").
Message("Timed out while waiting for make_miniblock job to be scheduled").
Tag("streamId", streamId)
return nil, err
}
}

Expand All @@ -291,10 +288,7 @@ func (p *miniblockProducer) TestMakeMiniblock(

err = SleepWithContext(ctx, 10*time.Millisecond)
if err != nil {
return nil, AsRiverError(err, Err_INTERNAL).
Func("TestMakeMiniblock").
Message("Timed out while waiting for make_miniblock job to terminate").
Tag("streamId", streamId)
return nil, err
}
}

Expand Down
3 changes: 1 addition & 2 deletions core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,7 @@ func (s *streamImpl) addEventLocked(ctx context.Context, event *ParsedEvent) err
// TODO: for some classes of errors, it's not clear if event was added or not
// for those, perhaps entire Stream structure should be scrapped and reloaded
if err != nil {
return AsRiverError(err, Err_DB_OPERATION_FAILURE).
Tag("inMemoryBlocks", len(s.view().blocks))
return err
}

s.setView(newSV)
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (s *streamCacheImpl) tryLoadStreamRecord(
for {
select {
case <-ctx.Done():
return nil, AsRiverError(ctx.Err(), Err_INTERNAL).Message("Timeout waiting for cache record to be created")
return nil, ctx.Err()
case <-time.After(delay):
stream, _ := s.cache.Load(streamId)
if stream != nil {
Expand Down
35 changes: 10 additions & 25 deletions core/node/rpc/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/gammazero/workerpool"
"github.com/stretchr/testify/assert"

"github.com/river-build/river/core/contracts/river"
Expand Down Expand Up @@ -45,11 +44,7 @@ func fillUserSettingsStreamWithData(
err,
Err_INTERNAL,
).Message("Failed to add event to stream").
Func("fillUserSettingsStreamWithData").
Tag("streamId", streamId).
Tag("miniblockNum", i).
Tag("mbEventNum", j).
Tag("numMbs", numMBs)
Func("fillUserSettingsStreamWithData")
}
}
prevMB, err = makeMiniblock(ctx, client, streamId, false, prevMB.Num)
Expand All @@ -58,10 +53,7 @@ func fillUserSettingsStreamWithData(
err,
Err_INTERNAL,
).Message("Failed to create miniblock").
Func("fillUserSettingsStreamWithData").
Tag("streamId", streamId).
Tag("miniblockNum", i).
Tag("numMbs", numMBs)
Func("fillUserSettingsStreamWithData")
}
}
return prevMB, nil
Expand All @@ -78,10 +70,12 @@ func createUserSettingsStreamsWithData(
streamIds := make([]StreamId, numStreams)
errChan := make(chan error, numStreams)

wp := workerpool.New(10)
var wg sync.WaitGroup
wg.Add(numStreams)

for i := 0; i < numStreams; i++ {
wp.Submit(func() {
go func(i int) {
defer wg.Done()
wallet, err := crypto.NewWallet(ctx)
if err != nil {
errChan <- err
Expand All @@ -96,29 +90,20 @@ func createUserSettingsStreamsWithData(
&StreamSettings{DisableMiniblockCreation: true},
)
if err != nil {
errChan <- AsRiverError(err, Err_INTERNAL).
Message("Failed to create stream").
Func("createUserSettingsStreamsWithData").
Tag("streamNum", i).
Tag("streamId", streamId)
errChan <- AsRiverError(err, Err_INTERNAL).Message("Failed to create stream").Func("createUserSettingsStreamsWithData")
return
}
streamIds[i] = streamId

_, err = fillUserSettingsStreamWithData(ctx, streamId, wallet, client, numMBs, numEventsPerMB, mbRef)
if err != nil {
errChan <- AsRiverError(err, Err_INTERNAL).
Message("Failed to fill stream with data").
Func("createUserSettingsStreamsWithData").
Tag("streamNum", i).
Tag("streamId", streamId)
errChan <- AsRiverError(err, Err_INTERNAL).Message("Failed to fill stream with data").Func("createUserSettingsStreamsWithData")
return
}
})
}(i)
}

wp.StopWait()

wg.Wait()
if len(errChan) > 0 {
return nil, nil, <-errChan
}
Expand Down
18 changes: 8 additions & 10 deletions core/node/rpc/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"github.com/river-build/river/core/node/testutils/testcert"
)

var notificationDeliveryDelay = 30 * time.Second

// TestNotifications is designed in such a way that all tests are run in parallel
// and share the same set of nodes, notification service and client.
func TestNotifications(t *testing.T) {
Expand Down Expand Up @@ -122,7 +120,7 @@ func testGDMAPNNotificationAfterUnsubscribe(
notificationsForEvent := nc.ApnPushNotifications[eventHash]

return cmp.Equal(notificationsForEvent, expectedUsersToReceiveNotification)
}, notificationDeliveryDelay, 2500*time.Millisecond, "Didn't receive expected notifications for stream %s", test.gdmStreamID)
}, 10*time.Second, 2500*time.Millisecond, "Didn't receive expected notifications")

// userA unsubscribes and userB subscribes using the same device.
// for tests the deviceToken is the users wallet address, in this case
Expand Down Expand Up @@ -165,7 +163,7 @@ func testGDMAPNNotificationAfterUnsubscribe(
notificationsForEvent := nc.ApnPushNotifications[eventHash]

return len(notificationsForEvent) != 0
}, notificationDeliveryDelay, 2500*time.Millisecond, "Receive unexpected notification")
}, 10*time.Second, 2500*time.Millisecond, "Receive unexpected notification")
}

func testGDMMessageWithNoMentionsRepliesAndReaction(
Expand Down Expand Up @@ -234,7 +232,7 @@ func testGDMMessageWithNoMentionsRepliesAndReaction(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, notificationDeliveryDelay, 2500*time.Millisecond, "Didn't receive expected notifications for stream %s", test.gdmStreamID)
}, 10*time.Second, 2500*time.Millisecond, "Didn't receive expected notifications")

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -280,7 +278,7 @@ func testGDMReactionMessage(
defer nc.WebPushNotificationsMu.Unlock()

return cmp.Equal(nc.WebPushNotifications[eventHash], expectedUsersToReceiveNotification)
}, notificationDeliveryDelay, 100*time.Millisecond, "user A Didn't receive expected notification for stream %s", test.gdmStreamID)
}, 15*time.Second, 100*time.Millisecond, "user A Didn't receive expected notification")

// ensure that user B and C never get a notification
test.req.Never(func() bool {
Expand Down Expand Up @@ -428,7 +426,7 @@ func testDMMessageWithDefaultUserNotificationsPreferences(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.dmStreamID[:])
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -562,7 +560,7 @@ func testSpaceChannelPlainMessage(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID[:])
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -641,7 +639,7 @@ func testSpaceChannelAtChannelTag(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID[:])
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -722,7 +720,7 @@ func testSpaceChannelMentionTag(

return webCount == len(expectedUsersToReceiveNotification) ||
apnCount == len(expectedUsersToReceiveNotification)
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID)
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down
6 changes: 1 addition & 5 deletions core/node/storage/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ func TestMigrateExistingDb(t *testing.T) {

testParams := setupStreamStorageTest(t)
ctx := testParams.ctx

// Tear down the store and defer remaining cleanup
testParams.pgStreamStore.Close(ctx)
defer testParams.ctxCloser()
defer testParams.schemaDeleter()
defer testParams.closer()

pool, err := CreateAndValidatePgxPool(
ctx,
Expand Down
Loading

0 comments on commit ef278e8

Please sign in to comment.