Skip to content

Commit

Permalink
Fix minipools event comparison in test and re-enable repl test (#404)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergekh2 authored Jul 14, 2024
1 parent f6eefb3 commit ba97c03
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 8 deletions.
5 changes: 0 additions & 5 deletions core/node/rpc/repl_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rpc

import (
"os"
"testing"

"github.com/river-build/river/core/node/crypto"
Expand Down Expand Up @@ -53,10 +52,6 @@ func TestReplAdd(t *testing.T) {
}

func TestReplMiniblock(t *testing.T) {
if os.Getenv("RIVER_TEST_ENABLE_FLAKY") != "true" {
t.Skip("skipping flaky test")
}

tt := newServiceTester(t, serviceTesterOpts{numNodes: 5, replicationFactor: 5, start: true})
ctx := tt.ctx
require := tt.require
Expand Down
67 changes: 64 additions & 3 deletions core/node/rpc/tester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package rpc
import (
"context"
"fmt"
"hash/fnv"
"log"
"math/big"
"net"
"slices"
"testing"
"time"

Expand Down Expand Up @@ -274,6 +276,12 @@ func testClient(url string) protocolconnect.StreamServiceClient {
return protocolconnect.NewStreamServiceClient(nodes.TestHttpClientMaker(), url, connect.WithGRPCWeb())
}

func bytesHash(b []byte) uint64 {
h := fnv.New64a()
_, _ = h.Write(b)
return h.Sum64()
}

func (st *serviceTester) compareStreamDataInStorage(
t assert.TestingT,
streamId StreamId,
Expand All @@ -290,8 +298,12 @@ func (st *serviceTester) compareStreamDataInStorage(
data = append(data, d)
}

var evHashes0 []uint64
for i, d := range data {
failed := false

failed = !assert.Equal(t, streamId, d.StreamId, "StreamId, node %d", i) || failed

failed = !assert.Equal(t, expectedMbs, len(d.Miniblocks), "Miniblocks, node %d", i) || failed

eventsLen := 0
Expand All @@ -303,9 +315,58 @@ func (st *serviceTester) compareStreamDataInStorage(
}
failed = !assert.Equal(t, expectedEvents, eventsLen, "Events, node %d", i) || failed

if !failed && i > 0 {
assert.EqualValues(t, data[0].Miniblocks, d.Miniblocks, "Bad mbs in node %d", i)
assert.EqualValues(t, data[0].Events, d.Events, "Bad events in node %d", i)
if !failed {
// All events should have the same generation and consecutive slots
// starting with -1 (marker slot for in database table)
if len(d.Events) > 1 {
gen := d.Events[0].Generation
for j, e := range d.Events {
if !assert.Equal(t, gen, e.Generation, "Mismatching event generation") ||
!assert.EqualValues(t, j-1, e.Slot, "Mismatching event slot") {
failed = true
break
}
}
}
}

// Events in minipools might be in different order
evHashes := []uint64{}
for _, e := range d.Events {
evHashes = append(evHashes, bytesHash(e.Data))
}
slices.Sort(evHashes)

if i > 0 {
if !failed {
// Compare fields separately to get better error messages
assert.Equal(
t,
data[0].LatestSnapshotMiniblockNum,
d.LatestSnapshotMiniblockNum,
"Bad snapshot num in node %d",
i,
)
for j, mb := range data[i].Miniblocks {
exp := data[0].Miniblocks[j]
_ = assert.EqualValues(t, exp.MiniblockNumber, mb.MiniblockNumber, "Bad mb num in node %d", i) &&
assert.EqualValues(t, exp.Hash, mb.Hash, "Bad mb hash in node %d", i) &&
assert.Equal(
t,
bytesHash(exp.Data),
bytesHash(mb.Data),
"Bad mb data in node %d, mb %d",
i,
j,
)
}

if !slices.Equal(evHashes0, evHashes) {
assert.Fail(t, "Events mismatch", "node %d", i)
}
}
} else {
evHashes0 = evHashes
}

if failed {
Expand Down

0 comments on commit ba97c03

Please sign in to comment.