Skip to content

Commit

Permalink
Add a pause to fix flaky getstreamex forwarding test. (#262)
Browse files Browse the repository at this point in the history
I think inserting a short sleep will fix the flaky getstreamex
forwarding test, but if it happens again I will disable it.
  • Loading branch information
clemire authored Jun 25, 2024
1 parent 9ad9097 commit 78260ae
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions core/node/rpc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strconv"
"testing"
"time"

"github.com/river-build/river/core/node/crypto"
"github.com/river-build/river/core/node/dlog"
Expand Down Expand Up @@ -969,19 +970,32 @@ func TestForwardingWithRetries(t *testing.T) {
require.Equal(t, streamId[:], resp.Msg.Stream.NextSyncCookie.StreamId)
},
"GetStreamEx": func(t *testing.T, ctx context.Context, client protocolconnect.StreamServiceClient, streamId StreamId) {
resp, err := client.GetStreamEx(ctx, connect.NewRequest(&protocol.GetStreamExRequest{
StreamId: streamId[:],
}))
require.NoError(t, err)

// Read messages
msgs := make([]*protocol.GetStreamExResponse, 0)
for resp.Receive() {
msgs = append(msgs, resp.Msg())
}
require.NoError(t, resp.Err())
// Expect 1 miniblock, 1 empty minipool message.
require.Len(t, msgs, 2)
// Note: the GetStreamEx implementation bypasses the stream cache, which fetches miniblocks from the
// registry if none are yet present in the local cache. The stream creation flow returns when a quorum of
// nodes terminates the stream creation call successfully, meaning that some nodes may not have finished
// committing the stream's genesis miniblock to storage yet. We use the info request to force the making of
// a miniblock for this stream, but these streams are replicated and the debug make miniblock call only
// operates on a local node. This means that the GetStreamEx request may occasionally return an empty
// stream on a node that hasn't caught up to the latest state, so we retry until we get the expected result.
require.Eventually(
t,
func() bool {
resp, err := client.GetStreamEx(ctx, connect.NewRequest(&protocol.GetStreamExRequest{
StreamId: streamId[:],
}))
require.NoError(t, err)

// Read messages
msgs := make([]*protocol.GetStreamExResponse, 0)
for resp.Receive() {
msgs = append(msgs, resp.Msg())
}
require.NoError(t, resp.Err())
return len(msgs) == 2
},
10*time.Second,
100*time.Millisecond,
)
},
}

Expand Down

0 comments on commit 78260ae

Please sign in to comment.