Skip to content

Commit

Permalink
Enable controlled logging for tests using testfmt package. (#1754)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergekh2 authored Dec 8, 2024
1 parent bbba9a4 commit e92b4f3
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 30 deletions.
10 changes: 7 additions & 3 deletions core/node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,17 @@ See [conventions](conventions.md)

# Debugging Tests

Logs are turned off by default in tests. To enable set `RIVER_TEST_LOG` variable to the desired logging level:
Logs are turned off by default in tests. To enable set `RIVER_TEST_LOG` variable to the desired logging level.
Printing from logs should be done through `testfmt` package, set `RIVER_TEST_PRINT` to enable:

# Run all test in rpc with info logging level
RIVER_TEST_LOG=info go test ./rpc -v
RIVER_TEST_LOG=info RIVER_TEST_PRINT=1 go test ./rpc -v

# Run single test by name with debug logging on
RIVER_TEST_LOG=debug go test ./rpc -v -run TestSingleAndMulti/multi/testMethods
RIVER_TEST_LOG=debug RIVER_TEST_PRINT=1 go test ./rpc -v -run TestSingleAndMulti/multi/testMethods

Go test's -v flag serves dual purpose: print names of running tests and enable test output.
These env vars give more control over what should be printed.

# Checking on Gamma Status from Local Host

Expand Down
9 changes: 5 additions & 4 deletions core/node/base/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/river-build/river/core/node/base/test"
"github.com/river-build/river/core/node/dlog"
"github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/testutils/testfmt"
)

func TestRiverError(t *testing.T) {
Expand All @@ -33,12 +34,12 @@ func TestRiverError(t *testing.T) {
"bytes", []byte("test 123213 123123 12312312312 123"),
"error", errors.New("test error"),
).Func("TestRiverError").Tag("int", 3)
println(e.Error())
testfmt.Println(t, e.Error())
log.Error("test error", "error", e)
_ = e.Log(log)

e = AsRiverError(errors.New("base error"))
println(e.Error())
testfmt.Println(t, e.Error())
log.Error("test error", "error", e)
_ = e.LogInfo(log)

Expand Down Expand Up @@ -75,10 +76,10 @@ func TestRiverErrorBytes(t *testing.T) {
assert := assert.New(t)
slice := []byte{1, 2, 3, 15}
err := RiverError(protocol.Err_INTERNAL, "bytes", "val", slice)
println(err.Error())
testfmt.Println(t, err.Error())
assert.Contains(err.Error(), "0102030f")
err = RiverError(protocol.Err_INTERNAL, "bytesPtr", "val", &slice)
println(err.Error())
testfmt.Println(t, err.Error())
assert.Contains(err.Error(), "0102030f")
}

Expand Down
4 changes: 4 additions & 0 deletions core/node/dlog/dlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/river-build/river/core/node/dlog"
"github.com/river-build/river/core/node/testutils/testfmt"
)

type Data2 struct {
Expand Down Expand Up @@ -61,6 +62,9 @@ func makeTestData2() *Data2 {
}

func TestDlog(t *testing.T) {
if !testfmt.Enabled() {
t.SkipNow()
}
log := slog.New(dlog.NewPrettyTextHandler(os.Stderr, &dlog.PrettyHandlerOptions{
AddSource: false,
ReplaceAttr: nil,
Expand Down
37 changes: 19 additions & 18 deletions core/node/rpc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
river_sync "github.com/river-build/river/core/node/rpc/sync"
. "github.com/river-build/river/core/node/shared"
"github.com/river-build/river/core/node/testutils"
"github.com/river-build/river/core/node/testutils/testfmt"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -1094,7 +1095,7 @@ func TestUnstableStreams(t *testing.T) {

syncRes.Receive()
syncID := syncRes.Msg().SyncId
t.Logf("subscription %s created on node: %s", syncID, services.nodes[1].address)
testfmt.Logf(t, "subscription %s created on node: %s", syncID, services.nodes[1].address)

// collect sync cookie updates for channels
var (
Expand All @@ -1111,7 +1112,7 @@ func TestUnstableStreams(t *testing.T) {
switch msg.GetSyncOp() {
case protocol.SyncOp_SYNC_NEW:
syncID := msg.GetSyncId()
t.Logf("start stream sync %s ", syncID)
testfmt.Logf(t, "start stream sync %s ", syncID)
case protocol.SyncOp_SYNC_UPDATE:
req.Equal(syncID, msg.GetSyncId(), "sync id")
req.NotNil(msg.GetStream(), "stream")
Expand Down Expand Up @@ -1208,7 +1209,7 @@ func TestUnstableStreams(t *testing.T) {
// send a bunch of messages and ensure that all are received
sendMessagesAndReceive(100, wallets, channels, req, client0, ctx, messages, func(StreamId) bool { return false })

t.Logf("first messages batch received")
testfmt.Logf(t, "first messages batch received")

// bring ~25% of the streams down
streamsDownCounter := 0
Expand All @@ -1226,7 +1227,7 @@ func TestUnstableStreams(t *testing.T) {

streamsDownCounter++

t.Logf("bring stream %s down", streamID)
testfmt.Logf(t, "bring stream %s down", streamID)

if i > TestStreams/4 {
break
Expand All @@ -1242,7 +1243,7 @@ func TestUnstableStreams(t *testing.T) {
return count == streamsDownCounter
}, 20*time.Second, 100*time.Millisecond, "didn't receive for all streams a down message")

t.Logf("received SyncOp_Down message for all expected streams")
testfmt.Logf(t, "received SyncOp_Down message for all expected streams")

// make sure that no more stream down messages are received
req.Never(func() bool {
Expand All @@ -1261,7 +1262,7 @@ func TestUnstableStreams(t *testing.T) {
return found
})

t.Logf("second messages batch received")
testfmt.Logf(t, "second messages batch received")

// resubscribe to the head on down streams and ensure that messages are received for all streams again
mu.Lock()
Expand All @@ -1280,12 +1281,12 @@ func TestUnstableStreams(t *testing.T) {
}
mu.Unlock()

t.Logf("resubscribed to streams that where brought down")
testfmt.Logf(t, "resubscribed to streams that where brought down")

// ensure that messages for all streams are received again
sendMessagesAndReceive(100, wallets, channels, req, client0, ctx, messages, func(StreamId) bool { return false })

t.Logf("third messages batch received")
testfmt.Logf(t, "third messages batch received")

// unsub from ~25% streams and ensure that no updates are received again
unsubbedStreams := make(map[StreamId]struct{})
Expand All @@ -1300,7 +1301,7 @@ func TestUnstableStreams(t *testing.T) {

unsubbedStreams[streamID] = struct{}{}

t.Logf("unsubbed from stream %s", streamID)
testfmt.Logf(t, "unsubbed from stream %s", streamID)

if i > TestStreams/4 {
break
Expand All @@ -1312,7 +1313,7 @@ func TestUnstableStreams(t *testing.T) {
return found
})

t.Logf("fourth messages batch received")
testfmt.Logf(t, "fourth messages batch received")

// resubscribe to the head on down streams and ensure that messages are received for all streams again
mu.Lock()
Expand All @@ -1331,13 +1332,13 @@ func TestUnstableStreams(t *testing.T) {
}
mu.Unlock()

t.Logf("resubscribed to streams that where brought down")
testfmt.Logf(t, "resubscribed to streams that where brought down")

sendMessagesAndReceive(100, wallets, channels, req, client0, ctx, messages, func(streamID StreamId) bool {
return false
})

t.Logf("fifth messages batch received")
testfmt.Logf(t, "fifth messages batch received")

// drop all streams from a node
var (
Expand Down Expand Up @@ -1374,7 +1375,7 @@ func TestUnstableStreams(t *testing.T) {
return count == len(targetStreams)
}, 20*time.Second, 100*time.Millisecond, "didn't receive for all streams a down message")

t.Logf("received SyncOp_Down message for all expected streams")
testfmt.Logf(t, "received SyncOp_Down message for all expected streams")

sendMessagesAndReceive(100, wallets, channels, req, client0, ctx, messages, func(streamID StreamId) bool {
mu.Lock()
Expand All @@ -1383,7 +1384,7 @@ func TestUnstableStreams(t *testing.T) {
return found
})

t.Logf("sixt messages batch received")
testfmt.Logf(t, "sixt messages batch received")

// make sure we can resubscribe to these streams
for _, streamID := range targetStreams {
Expand All @@ -1404,18 +1405,18 @@ func TestUnstableStreams(t *testing.T) {
return false
})

t.Logf("seventh messages batch received")
testfmt.Logf(t, "seventh messages batch received")

_, err = client1.CancelSync(ctx, connect.NewRequest(&protocol.CancelSyncRequest{SyncId: syncID}))
req.NoError(err, "cancel sync")

t.Logf("Streams subscription cancelled")
testfmt.Logf(t, "Streams subscription cancelled")

sendMessagesAndReceive(100, wallets, channels, req, client0, ctx, messages, func(streamID StreamId) bool {
return true
})

t.Logf("eight messages batch received")
testfmt.Logf(t, "eight messages batch received")

// make sure that SyncOp_Close msg is received (messages is closed)
req.Eventuallyf(func() bool {
Expand Down Expand Up @@ -1612,7 +1613,7 @@ func TestSyncSubscriptionWithTooSlowClient(t *testing.T) {
}

// subscribe to channel updates on node 1 direct through a sync op to have better control over it
t.Logf("subscribe on node %s", node1.address)
testfmt.Logf(t, "subscribe on node %s", node1.address)
syncPos := append(users, channels...)
syncOp, err := river_sync.NewStreamsSyncOperation(
ctx, syncID, node1.address, node1.service.cache, node1.service.nodeRegistry)
Expand Down
95 changes: 95 additions & 0 deletions core/node/testutils/testfmt/testfmt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package testfmt

import (
"os"
)

// TestingLogger is a subset of *testing.T that is used for logging.
type TestingLogger interface {
Log(a ...any)
Logf(format string, a ...any)
Helper()
}

// Print logs a message to the testing logger if RIVER_TEST_PRINT is set.
func Print(t TestingLogger, a ...any) {
if enabled {
t.Helper()
t.Log(a...)
}
}

// Printf logs a formatted message to the testing logger if RIVER_TEST_PRINT is set.
func Printf(t TestingLogger, format string, a ...any) {
if enabled {
t.Helper()
t.Logf(format, a...)
}
}

// Println logs a message to the testing logger if RIVER_TEST_PRINT is set.
func Println(t TestingLogger, a ...any) {
if enabled {
t.Helper()
t.Log(a...)
}
}

// Log logs a message to the testing logger if RIVER_TEST_PRINT is set.
func Log(t TestingLogger, a ...any) {
if enabled {
t.Helper()
t.Log(a...)
}
}

// Logf logs a formatted message to the testing logger if RIVER_TEST_PRINT is set.
func Logf(t TestingLogger, format string, a ...any) {
if enabled {
t.Helper()
t.Logf(format, a...)
}
}

type TestFmt struct {
t TestingLogger
}

// New returns a new TestFmt that logs to the given testing logger if RIVER_TEST_PRINT is set.
func New(t TestingLogger) TestFmt {
return TestFmt{t}
}

func (f TestFmt) Print(a ...any) {
Print(f.t, a...)
}

func (f TestFmt) Printf(format string, a ...any) {
Printf(f.t, format, a...)
}

func (f TestFmt) Println(a ...any) {
Println(f.t, a...)
}

func (f TestFmt) Log(a ...any) {
Log(f.t, a...)
}

func (f TestFmt) Logf(format string, a ...any) {
Logf(f.t, format, a...)
}

func Enabled() bool {
return enabled
}

func Enable(v bool) {
enabled = v
}

var enabled bool

func init() {
enabled = os.Getenv("RIVER_TEST_PRINT") != ""
}
16 changes: 13 additions & 3 deletions core/xchain/entitlement/entitlement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/base/test"
"github.com/river-build/river/core/node/crypto"
"github.com/river-build/river/core/node/infra"
"github.com/river-build/river/core/xchain/examples"
Expand Down Expand Up @@ -353,6 +354,9 @@ var (
)

func TestAndOperation(t *testing.T) {
ctx, cancel := test.NewTestContext()
defer cancel()

testCases := []struct {
description string
a Operation
Expand Down Expand Up @@ -415,7 +419,7 @@ func TestAndOperation(t *testing.T) {

callerAddress := common.Address{}

result, actualErr := evaluator.evaluateOp(context.Background(), tree, []common.Address{callerAddress})
result, actualErr := evaluator.evaluateOp(ctx, tree, []common.Address{callerAddress})
elapsedTime := time.Since(startTime)
if tc.expectedErr != nil {
require.EqualError(t, actualErr, tc.expectedErr.Error(), "Expected error was not found")
Expand All @@ -436,6 +440,9 @@ func TestAndOperation(t *testing.T) {
}

func TestOrOperation(t *testing.T) {
ctx, cancel := test.NewTestContext()
defer cancel()

testCases := []struct {
description string
a Operation
Expand Down Expand Up @@ -498,7 +505,7 @@ func TestOrOperation(t *testing.T) {

callerAddress := common.Address{}

result, actualErr := evaluator.evaluateOp(context.Background(), tree, []common.Address{callerAddress})
result, actualErr := evaluator.evaluateOp(ctx, tree, []common.Address{callerAddress})
elapsedTime := time.Since(startTime)
if tc.expectedErr != nil {
require.EqualError(t, actualErr, tc.expectedErr.Error(), "Expected error was not found")
Expand Down Expand Up @@ -527,6 +534,9 @@ func areDurationsClose(d1, d2, threshold time.Duration) bool {
}

func TestCheckOperation(t *testing.T) {
ctx, cancel := test.NewTestContext()
defer cancel()

testCases := []struct {
a Operation
wallets []common.Address
Expand All @@ -542,7 +552,7 @@ func TestCheckOperation(t *testing.T) {
for _, tc := range testCases {
startTime := time.Now() // Get the current time

result, err := evaluator.evaluateOp(context.Background(), tc.a, tc.wallets)
result, err := evaluator.evaluateOp(ctx, tc.a, tc.wallets)
elapsedTime := time.Since(startTime)

if err != nil {
Expand Down
Loading

0 comments on commit e92b4f3

Please sign in to comment.