diff --git a/core/node/base/test/context.go b/core/node/base/test/context.go
index 737fbf7c6..17c05b703 100644
--- a/core/node/base/test/context.go
+++ b/core/node/base/test/context.go
@@ -10,24 +10,31 @@ import (
 
 func NewTestContext() (context.Context, context.CancelFunc) {
 	logLevel := os.Getenv("RIVER_TEST_LOG")
-	var handler slog.Handler
 	if logLevel == "" {
-		handler = &dlog.NullHandler{}
+		handler := &dlog.NullHandler{}
+		//lint:ignore LE0000 context.Background() used correctly
+		ctx := dlog.CtxWithLog(context.Background(), slog.New(handler))
+		return context.WithCancel(ctx)
 	} else {
-		var level slog.Level
-		err := level.UnmarshalText([]byte(logLevel))
-		if err != nil {
-			level = slog.LevelInfo
-		}
-		handler = dlog.NewPrettyTextHandler(
-			os.Stdout,
-			&dlog.PrettyHandlerOptions{
-				Level:         level,
-				PrintLongTime: false,
-				Colors:        dlog.ColorMap_Enabled,
-			},
-		)
+		return NewTestContextWithLogging(logLevel)
 	}
+}
+
+func NewTestContextWithLogging(logLevel string) (context.Context, context.CancelFunc) {
+	var level slog.Level
+	err := level.UnmarshalText([]byte(logLevel))
+	if err != nil {
+		level = slog.LevelInfo
+	}
+	handler := dlog.NewPrettyTextHandler(
+		os.Stdout,
+		&dlog.PrettyHandlerOptions{
+			Level:         level,
+			PrintLongTime: false,
+			Colors:        dlog.ColorMap_Enabled,
+		},
+	)
 
+	//lint:ignore LE0000 context.Background() used correctly
 	ctx := dlog.CtxWithLog(context.Background(), slog.New(handler))
 	return context.WithCancel(ctx)
diff --git a/core/node/events/miniblock_producer.go b/core/node/events/miniblock_producer.go
index ff993e3c3..20bb0af6c 100644
--- a/core/node/events/miniblock_producer.go
+++ b/core/node/events/miniblock_producer.go
@@ -577,7 +577,13 @@ func (p *miniblockProducer) jobStart(ctx context.Context, j *mbJob, forceSnapsho
 	candidate, err := mbProduceCandidate(ctx, p.streamCache.Params(), j.stream, forceSnapshot)
 	if err != nil {
 		dlog.FromCtx(ctx).
-			Error("MiniblockProducer: jobStart: Error creating new miniblock proposal", "streamId", j.stream.streamId, "err", err)
+			Error(
+				"MiniblockProducer: jobStart: Error creating new miniblock proposal",
+				"streamId",
+				j.stream.streamId,
+				"err",
+				err,
+			)
 		p.jobDone(ctx, j)
 		return
 	}
diff --git a/core/node/events/quorum_pool.go b/core/node/events/quorum_pool.go
index dfe3dbb42..2eade1bda 100644
--- a/core/node/events/quorum_pool.go
+++ b/core/node/events/quorum_pool.go
@@ -3,10 +3,12 @@ package events
 import (
 	"context"
 	"errors"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 
 	"github.com/river-build/river/core/node/dlog"
+	"github.com/river-build/river/core/node/utils"
 )
 
 type QuorumPool struct {
@@ -46,7 +48,11 @@ func (q *QuorumPool) GoRemotes(
 	q.remoteErrChannel = make(chan error, len(nodes))
 	q.remotes += len(nodes)
 	for _, node := range nodes {
-		go q.executeRemote(ctx, node, f)
+		ctx, cancel := utils.UncancelContext(ctx, 5*time.Second, 10*time.Second)
+		go func() {
+			defer cancel()
+			q.executeRemote(ctx, node, f)
+		}()
 	}
 }
 
diff --git a/core/node/events/stream.go b/core/node/events/stream.go
index 471ed08c4..c47b3eb78 100644
--- a/core/node/events/stream.go
+++ b/core/node/events/stream.go
@@ -3,7 +3,9 @@ package events
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"slices"
+	"strings"
 	"sync"
 	"time"
 
@@ -734,8 +736,16 @@ func (s *streamImpl) addEventLocked(ctx context.Context, event *ParsedEvent) err
 	// TODO: for some classes of errors, it's not clear if event was added or not
 	// for those, perhaps entire Stream structure should be scrapped and reloaded
 	if err != nil {
+		var sb strings.Builder
+		sb.WriteString("[\n")
+		for hash, event := range s.view().minipool.events.Map {
+			sb.WriteString(fmt.Sprintf(" %s %s,\n", hash, event.ShortDebugStr()))
+		}
+		sb.WriteString("]")
+
 		return AsRiverError(err, Err_DB_OPERATION_FAILURE).
-			Tag("inMemoryBlocks", len(s.view().blocks))
+			Tag("inMemoryBlocks", len(s.view().blocks)).
+			Tag("inMemoryEvents", sb.String())
 	}
 
 	s.setView(newSV)
diff --git a/core/node/rpc/forwarder.go b/core/node/rpc/forwarder.go
index 577a20ecd..eb1a52e5c 100644
--- a/core/node/rpc/forwarder.go
+++ b/core/node/rpc/forwarder.go
@@ -192,6 +192,7 @@ func executeConnectHandler[Req, Res any](
 		Tags(
 			"nodeAddress", service.wallet.Address.Hex(),
 			"nodeUrl", service.config.Address,
+			"handler", methodName,
 			"elapsed", elapsed,
 		).
 		Func(methodName)
diff --git a/core/node/rpc/node2node.go b/core/node/rpc/node2node.go
index de207539f..34c1c20ef 100644
--- a/core/node/rpc/node2node.go
+++ b/core/node/rpc/node2node.go
@@ -119,7 +119,6 @@ func (s *Service) ProposeMiniblock(
 	req *connect.Request[ProposeMiniblockRequest],
 ) (*connect.Response[ProposeMiniblockResponse], error) {
 	ctx, log := utils.CtxAndLogForRequest(ctx, req)
-	log.Debug("ProposeMiniblock ENTER")
 	r, e := s.proposeMiniblock(ctx, req.Msg)
 	if e != nil {
 		return nil, AsRiverError(
@@ -129,7 +128,6 @@ func (s *Service) ProposeMiniblock(
 			LogWarn(log).
 			AsConnectError()
 	}
-	log.Debug("ProposeMiniblock LEAVE", "response", r)
 	return connect.NewResponse(r), nil
 }
 
diff --git a/core/node/rpc/repl_test.go b/core/node/rpc/repl_test.go
index 7aa2864e6..9367f4d46 100644
--- a/core/node/rpc/repl_test.go
+++ b/core/node/rpc/repl_test.go
@@ -125,10 +125,10 @@ func TestStreamReconciliationFromGenesis(t *testing.T) {
 
 	latestMbNum := int64(0)
 	mbRef := MiniblockRefFromCookie(cookie)
-	for range N {
+	for i := range N {
 		require.NoError(addUserBlockedFillerEvent(ctx, wallet, client, streamId, mbRef))
 		mbRef, err = tt.nodes[2].service.mbProducer.TestMakeMiniblock(ctx, streamId, false)
-		require.NoError(err)
+		require.NoError(err, "Failed to make miniblock on round %d", i)
 
 		if mbChain[latestMbNum] != mbRef.Hash {
 			latestMbNum = mbRef.Num
diff --git a/core/node/rpc/tester_test.go b/core/node/rpc/tester_test.go
index 002d89911..4d6ee9b4b 100644
--- a/core/node/rpc/tester_test.go
+++ b/core/node/rpc/tester_test.go
@@ -79,6 +79,7 @@ type serviceTesterOpts struct {
 	replicationFactor int
 	start             bool
 	btcParams         *crypto.TestParams
+	printTestLogs     bool
 }
 
 func makeTestListenerNoCleanup(t *testing.T) (net.Listener, string) {
@@ -105,7 +106,13 @@ func newServiceTester(t *testing.T, opts serviceTesterOpts) *serviceTester {
 		opts.replicationFactor = 1
 	}
 
-	ctx, ctxCancel := test.NewTestContext()
+	var ctx context.Context
+	var ctxCancel func()
+	if opts.printTestLogs {
+		ctx, ctxCancel = test.NewTestContextWithLogging("error")
+	} else {
+		ctx, ctxCancel = test.NewTestContext()
+	}
 	require := require.New(t)
 
 	st := &serviceTester{
diff --git a/core/node/storage/pg_stream_store.go b/core/node/storage/pg_stream_store.go
index ebf1a2787..432f51cbe 100644
--- a/core/node/storage/pg_stream_store.go
+++ b/core/node/storage/pg_stream_store.go
@@ -163,17 +163,10 @@ func (s *PostgresStreamStore) maintainSchemaLock(
 
 	lockId := s.computeLockIdFromSchema()
 
-	count := 0
 	for {
 		// Check for connection health with a ping. Also, maintain the connection in the
 		// case of idle timeouts.
 		err := conn.Ping(ctx)
-		count++
-
-		if count%100 == 0 {
-			log.Debug("DB Ping!", "error", err)
-		}
-
 		if err != nil {
 			// We expect cancellation only on node shutdown. In this case,
 			// do not send an error signal.
@@ -239,10 +232,10 @@ func (s *PostgresStreamStore) maintainSchemaLock(
 					LogError(dlog.FromCtx(ctx))
 				s.exitSignal <- err
 			}
+		}
 
-			if err = SleepWithContext(ctx, 1*time.Second); err != nil {
-				return
-			}
+		if err = SleepWithContext(ctx, 1*time.Second); err != nil {
+			return
 		}
 	}
 }
@@ -897,8 +890,15 @@ func (s *PostgresStreamStore) writeEventTx(
 	// At this moment counter should be equal to minipoolSlot otherwise it is discrepancy of actual and expected records in minipool
 	// Keep in mind that there is service record with seqNum equal to -1
 	if counter != minipoolSlot {
+		var seqNum int
+		mbErr := tx.QueryRow(
+			ctx,
+			s.sqlForStream("select max(seq_num) from {{miniblocks}} where stream_id = $1", streamId),
+			streamId,
+		).Scan(&seqNum)
 		return RiverError(Err_DB_OPERATION_FAILURE, "Wrong number of records in minipool").
-			Tag("ActualRecordsNumber", counter).Tag("ExpectedRecordsNumber", minipoolSlot)
+			Tag("ActualRecordsNumber", counter).Tag("ExpectedRecordsNumber", minipoolSlot).
+			Tag("maxSeqNum", seqNum).Tag("mbErr", mbErr)
 	}
 
 	// All checks passed - we need to insert event into minipool
@@ -1633,7 +1633,9 @@ func (s *PostgresStreamStore) listOtherInstancesTx(ctx context.Context, tx pgx.T
 		}
 		if delay > 0 {
 			log.Info("singlenodekey is not empty; Delaying startup to let other instance exit", "delay", delay)
-			time.Sleep(delay)
+			if err = SleepWithContext(ctx, delay); err != nil {
+				return AsRiverError(err, Err_DB_OPERATION_FAILURE).Message("Could not list other instances")
+			}
 		}
 	}
 
@@ -1685,7 +1687,9 @@ func (s *PostgresStreamStore) acquireListeningConnection(ctx context.Context) *p
 		log.Debug("Failed to acquire listening connection, retrying", "error", err)
 
 		// In the event of networking issues, wait a small period of time for recovery.
-		time.Sleep(100 * time.Millisecond)
+		if err = SleepWithContext(ctx, 100*time.Millisecond); err != nil {
+			return nil
+		}
 	}
 }
 
@@ -1721,7 +1725,9 @@ func (s *PostgresStreamStore) acquireConnection(ctx context.Context) (*pgxpool.C
 		)
 
 		// In the event of networking issues, wait a small period of time for recovery.
-		time.Sleep(500 * time.Millisecond)
+		if err = SleepWithContext(ctx, 500*time.Millisecond); err != nil {
+			break
+		}
 	}
 
 	log.Error("Failed to acquire pgx connection", "error", err)
diff --git a/core/node/testutils/testcert/testcert.go b/core/node/testutils/testcert/testcert.go
index 10c6c21a1..52ad45608 100644
--- a/core/node/testutils/testcert/testcert.go
+++ b/core/node/testutils/testcert/testcert.go
@@ -4,10 +4,10 @@ import (
 	"context"
 	"crypto/tls"
 	"crypto/x509"
+	"net"
 	"net/http"
 	"strings"
-
-	"golang.org/x/net/http2"
+	"time"
 
 	"github.com/river-build/river/core/config"
 )
@@ -98,10 +98,24 @@ func GetHttp2LocalhostTLSConfig() *tls.Config {
 
 func GetHttp2LocalhostTLSClient(ctx context.Context, cfg *config.Config) (*http.Client, error) {
 	return &http.Client{
-		Transport: &http2.Transport{
+		Transport: &http.Transport{
 			TLSClientConfig: &tls.Config{
 				RootCAs: LocalhostCertPool,
 			},
+
+			// Node-2-node connections to local nodes in tests sometimes seem to hang if the
+			// local node is down, although they do terminate when the http service is torn down.
+			// This setting limits the duration of attempting to establish a connection to
+			// another node.
+			DialContext: (&net.Dialer{
+				Timeout:   2 * time.Second,
+				KeepAlive: 30 * time.Second,
+			}).DialContext,
+
+			// ForceAttemptHTTP2 ensures the transport negotiates HTTP/2 if possible.
+			// This allows us to use the http.Transport, whose DialContext timeout is
+			// respected by the service.
+			ForceAttemptHTTP2: true,
 		},
 	}, nil
 }
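
Reviewer note: several hunks in core/node/storage/pg_stream_store.go replace bare time.Sleep calls with SleepWithContext(ctx, d). That helper already exists in the storage package and is not part of this diff; from its usage here it appears to sleep for the given duration unless the context is cancelled first, returning the context error so retry loops and the schema-lock loop exit promptly on node shutdown. A minimal sketch of those assumed semantics (not the actual implementation, package name assumed):

package storage

import (
	"context"
	"time"
)

// SleepWithContext (sketch only, assumed behavior): wait for d, but return
// early with ctx.Err() if the context is cancelled first, e.g. on shutdown.
func SleepWithContext(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		// Cancellation wins: report why so callers can stop retrying.
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}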