[DEPRECATED][BUGFIX] Move sleep to implement a 1s delay between pings for schema lock connection #1853

Open · wants to merge 23 commits into base: main
37 changes: 22 additions & 15 deletions core/node/base/test/context.go
@@ -10,24 +10,31 @@ import (

 func NewTestContext() (context.Context, context.CancelFunc) {
 	logLevel := os.Getenv("RIVER_TEST_LOG")
-	var handler slog.Handler
 	if logLevel == "" {
-		handler = &dlog.NullHandler{}
+		handler := &dlog.NullHandler{}
+		//lint:ignore LE0000 context.Background() used correctly
+		ctx := dlog.CtxWithLog(context.Background(), slog.New(handler))
+		return context.WithCancel(ctx)
 	} else {
-		var level slog.Level
-		err := level.UnmarshalText([]byte(logLevel))
-		if err != nil {
-			level = slog.LevelInfo
-		}
-		handler = dlog.NewPrettyTextHandler(
-			os.Stdout,
-			&dlog.PrettyHandlerOptions{
-				Level:         level,
-				PrintLongTime: false,
-				Colors:        dlog.ColorMap_Enabled,
-			},
-		)
+		return NewTestContextWithLogging(logLevel)
 	}
 }
+
+func NewTestContextWithLogging(logLevel string) (context.Context, context.CancelFunc) {
+	var level slog.Level
+	err := level.UnmarshalText([]byte(logLevel))
+	if err != nil {
+		level = slog.LevelInfo
+	}
+	handler := dlog.NewPrettyTextHandler(
+		os.Stdout,
+		&dlog.PrettyHandlerOptions{
+			Level:         level,
+			PrintLongTime: false,
+			Colors:        dlog.ColorMap_Enabled,
+		},
+	)
+
 	//lint:ignore LE0000 context.Background() used correctly
 	ctx := dlog.CtxWithLog(context.Background(), slog.New(handler))
 	return context.WithCancel(ctx)
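Note: a quick illustration of how a test might opt into verbose logging with the new helper. Only NewTestContextWithLogging and its log-level argument come from the diff above; the test body, package name, and import path are assumptions for the sketch.

package rpc_test // hypothetical placement

import (
	"testing"

	"github.com/river-build/river/core/node/base/test"
)

func TestSomethingVerbose(t *testing.T) {
	// "debug" is parsed via slog.Level.UnmarshalText; unknown values fall back to Info.
	ctx, cancel := test.NewTestContextWithLogging("debug")
	defer cancel()

	_ = ctx // run the code under test with ctx; pretty log output goes to stdout
}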
8 changes: 7 additions & 1 deletion core/node/events/miniblock_producer.go
@@ -577,7 +577,13 @@ func (p *miniblockProducer) jobStart(ctx context.Context, j *mbJob, forceSnapsho
 	candidate, err := mbProduceCandidate(ctx, p.streamCache.Params(), j.stream, forceSnapshot)
 	if err != nil {
 		dlog.FromCtx(ctx).
-			Error("MiniblockProducer: jobStart: Error creating new miniblock proposal", "streamId", j.stream.streamId, "err", err)
+			Error(
+				"MiniblockProducer: jobStart: Error creating new miniblock proposal",
+				"streamId",
+				j.stream.streamId,
+				"err",
+				err,
+			)
 		p.jobDone(ctx, j)
 		return
 	}
8 changes: 7 additions & 1 deletion core/node/events/quorum_pool.go
@@ -3,10 +3,12 @@ package events
 import (
 	"context"
 	"errors"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 
 	"github.com/river-build/river/core/node/dlog"
+	"github.com/river-build/river/core/node/utils"
 )
 
 type QuorumPool struct {
@@ -46,7 +48,11 @@ func (q *QuorumPool) GoRemotes(
 	q.remoteErrChannel = make(chan error, len(nodes))
 	q.remotes += len(nodes)
 	for _, node := range nodes {
-		go q.executeRemote(ctx, node, f)
+		ctx, cancel := utils.UncancelContext(ctx, 5*time.Second, 10*time.Second)
+		go func() {
+			defer cancel()
+			q.executeRemote(ctx, node, f)
+		}()
 	}
 }
 
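Note: detaching the per-node context here means a caller that gives up no longer cancels replication RPCs already in flight; each goroutine gets its own bounded lifetime and releases it via the deferred cancel. A minimal sketch of what a helper like utils.UncancelContext plausibly does follows — the lowercase name and the min/max-timeout semantics are assumptions based on the call site above, not the repo's actual implementation.

package utils // sketch only

import (
	"context"
	"time"
)

func uncancelContext(
	parent context.Context, minTimeout, maxTimeout time.Duration,
) (context.Context, context.CancelFunc) {
	// Keep the parent's values but drop its cancellation (Go 1.21+).
	detached := context.WithoutCancel(parent)

	// Give the detached context its own deadline: at most maxTimeout,
	// honoring a nearer parent deadline but never less than minTimeout.
	timeout := maxTimeout
	if deadline, ok := parent.Deadline(); ok {
		if remaining := time.Until(deadline); remaining < timeout {
			timeout = max(remaining, minTimeout)
		}
	}
	return context.WithTimeout(detached, timeout)
}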
12 changes: 11 additions & 1 deletion core/node/events/stream.go
@@ -3,7 +3,9 @@ package events
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"slices"
+	"strings"
 	"sync"
 	"time"
 
@@ -734,8 +736,16 @@ func (s *streamImpl) addEventLocked(ctx context.Context, event *ParsedEvent) err
 	// TODO: for some classes of errors, it's not clear if event was added or not
 	// for those, perhaps entire Stream structure should be scrapped and reloaded
 	if err != nil {
+		var sb strings.Builder
+		sb.WriteString("[\n")
+		for hash, event := range s.view().minipool.events.Map {
+			sb.WriteString(fmt.Sprintf(" %s %s,\n", hash, event.ShortDebugStr()))
+		}
+		sb.WriteString("]")
+
 		return AsRiverError(err, Err_DB_OPERATION_FAILURE).
-			Tag("inMemoryBlocks", len(s.view().blocks))
+			Tag("inMemoryBlocks", len(s.view().blocks)).
+			Tag("inMemoryEvents", sb.String())
 	}
 
 	s.setView(newSV)
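Note: with this change a minipool write failure carries a dump of the in-memory events. Per the Sprintf above, the inMemoryEvents tag is a bracketed list with one "<hash> <short debug string>" entry per minipool event; the values below are purely illustrative:

[
 0x4be3… inception,
 0x9a17… message,
]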
1 change: 1 addition & 0 deletions core/node/rpc/forwarder.go
@@ -192,6 +192,7 @@ func executeConnectHandler[Req, Res any](
 		Tags(
 			"nodeAddress", service.wallet.Address.Hex(),
 			"nodeUrl", service.config.Address,
+			"handler", methodName,
 			"elapsed", elapsed,
 		).
 		Func(methodName)
2 changes: 0 additions & 2 deletions core/node/rpc/node2node.go
@@ -119,7 +119,6 @@ func (s *Service) ProposeMiniblock(
 	req *connect.Request[ProposeMiniblockRequest],
 ) (*connect.Response[ProposeMiniblockResponse], error) {
 	ctx, log := utils.CtxAndLogForRequest(ctx, req)
-	log.Debug("ProposeMiniblock ENTER")
 	r, e := s.proposeMiniblock(ctx, req.Msg)
 	if e != nil {
 		return nil, AsRiverError(
@@ -129,7 +128,6 @@
 			LogWarn(log).
 			AsConnectError()
 	}
-	log.Debug("ProposeMiniblock LEAVE", "response", r)
 	return connect.NewResponse(r), nil
 }

4 changes: 2 additions & 2 deletions core/node/rpc/repl_test.go
@@ -125,10 +125,10 @@ func TestStreamReconciliationFromGenesis(t *testing.T) {
 	latestMbNum := int64(0)
 
 	mbRef := MiniblockRefFromCookie(cookie)
-	for range N {
+	for i := range N {
 		require.NoError(addUserBlockedFillerEvent(ctx, wallet, client, streamId, mbRef))
 		mbRef, err = tt.nodes[2].service.mbProducer.TestMakeMiniblock(ctx, streamId, false)
-		require.NoError(err)
+		require.NoError(err, "Failed to make miniblock on round %d", i)
 
 		if mbChain[latestMbNum] != mbRef.Hash {
 			latestMbNum = mbRef.Num
9 changes: 8 additions & 1 deletion core/node/rpc/tester_test.go
@@ -79,6 +79,7 @@ type serviceTesterOpts struct {
 	replicationFactor int
 	start             bool
 	btcParams         *crypto.TestParams
+	printTestLogs     bool
 }
 
 func makeTestListenerNoCleanup(t *testing.T) (net.Listener, string) {
@@ -105,7 +106,13 @@ func newServiceTester(t *testing.T, opts serviceTesterOpts) *serviceTester {
 		opts.replicationFactor = 1
 	}
 
-	ctx, ctxCancel := test.NewTestContext()
+	var ctx context.Context
+	var ctxCancel func()
+	if opts.printTestLogs {
+		ctx, ctxCancel = test.NewTestContextWithLogging("error")
+	} else {
+		ctx, ctxCancel = test.NewTestContext()
+	}
 	require := require.New(t)
 
 	st := &serviceTester{
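Note: an illustrative call site for the new option. The start and printTestLogs fields appear in this diff; numNodes is assumed to be among the collapsed serviceTesterOpts fields above.

tester := newServiceTester(t, serviceTesterOpts{
	numNodes:      1,    // assumed field
	start:         true,
	printTestLogs: true, // route node logs to stdout at "error" level
})
_ = tester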
34 changes: 20 additions & 14 deletions core/node/storage/pg_stream_store.go
@@ -163,17 +163,10 @@ func (s *PostgresStreamStore) maintainSchemaLock(

 	lockId := s.computeLockIdFromSchema()
 
-	count := 0
 	for {
 		// Check for connection health with a ping. Also, maintain the connection in the
 		// case of idle timeouts.
 		err := conn.Ping(ctx)
-		count++
-
-		if count%100 == 0 {
-			log.Debug("DB Ping!", "error", err)
-		}
-
 		if err != nil {
 			// We expect cancellation only on node shutdown. In this case,
 			// do not send an error signal.
@@ -239,10 +232,10 @@
 				LogError(dlog.FromCtx(ctx))
 				s.exitSignal <- err
 			}
 		}
 
-			if err = SleepWithContext(ctx, 1*time.Second); err != nil {
-				return
-			}
+		if err = SleepWithContext(ctx, 1*time.Second); err != nil {
+			return
+		}
 	}
 }
@@ -897,8 +890,15 @@ func (s *PostgresStreamStore) writeEventTx(
 	// At this moment counter should be equal to minipoolSlot otherwise it is discrepancy of actual and expected records in minipool
 	// Keep in mind that there is service record with seqNum equal to -1
 	if counter != minipoolSlot {
+		var seqNum int
+		mbErr := tx.QueryRow(
+			ctx,
+			s.sqlForStream("select max(seq_num) from {{miniblocks}} where stream_id = $1", streamId),
+			streamId,
+		).Scan(&seqNum)
 		return RiverError(Err_DB_OPERATION_FAILURE, "Wrong number of records in minipool").
-			Tag("ActualRecordsNumber", counter).Tag("ExpectedRecordsNumber", minipoolSlot)
+			Tag("ActualRecordsNumber", counter).Tag("ExpectedRecordsNumber", minipoolSlot).
+			Tag("maxSeqNum", seqNum).Tag("mbErr", mbErr)
 	}
 
 	// All checks passed - we need to insert event into minipool
@@ -1633,7 +1633,9 @@ func (s *PostgresStreamStore) listOtherInstancesTx(ctx context.Context, tx pgx.T
 	}
 	if delay > 0 {
 		log.Info("singlenodekey is not empty; Delaying startup to let other instance exit", "delay", delay)
-		time.Sleep(delay)
+		if err = SleepWithContext(ctx, delay); err != nil {
+			return AsRiverError(err, Err_DB_OPERATION_FAILURE).Message("Could not list other instances")
+		}
 	}
 }

@@ -1685,7 +1687,9 @@ func (s *PostgresStreamStore) acquireListeningConnection(ctx context.Context) *p
 		log.Debug("Failed to acquire listening connection, retrying", "error", err)
 
 		// In the event of networking issues, wait a small period of time for recovery.
-		time.Sleep(100 * time.Millisecond)
+		if err = SleepWithContext(ctx, 100*time.Millisecond); err != nil {
+			return nil
+		}
 	}
 }

@@ -1721,7 +1725,9 @@ func (s *PostgresStreamStore) acquireConnection(ctx context.Context) (*pgxpool.C
 		)
 
 		// In the event of networking issues, wait a small period of time for recovery.
-		time.Sleep(500 * time.Millisecond)
+		if err = SleepWithContext(ctx, 500*time.Millisecond); err != nil {
+			break
+		}
 	}
 
 	log.Error("Failed to acquire pgx connection", "error", err)
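Note: the PR repeatedly swaps bare time.Sleep for SleepWithContext so every retry loop wakes up promptly on shutdown instead of blocking out the full delay. The helper is defined elsewhere in the storage package; this sketch shows the behavior the call sites above rely on — the signature comes from those call sites, the body is an assumption.

package storage // sketch only

import (
	"context"
	"time"
)

func SleepWithContext(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err() // woken early, e.g. on node shutdown
	case <-timer.C:
		return nil // slept the full duration
	}
}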
20 changes: 17 additions & 3 deletions core/node/testutils/testcert/testcert.go
@@ -4,10 +4,10 @@ import (
 	"context"
 	"crypto/tls"
 	"crypto/x509"
+	"net"
 	"net/http"
-	"strings"
 
-	"golang.org/x/net/http2"
+	"time"
 
 	"github.com/river-build/river/core/config"
 )
@@ -98,10 +98,24 @@

 func GetHttp2LocalhostTLSClient(ctx context.Context, cfg *config.Config) (*http.Client, error) {
 	return &http.Client{
-		Transport: &http2.Transport{
+		Transport: &http.Transport{
 			TLSClientConfig: &tls.Config{
 				RootCAs: LocalhostCertPool,
 			},
+
+			// Node-2-node connections to local nodes in tests sometimes seem to hang if the
+			// local node is down, although they do terminate when the http service is torn down.
+			// This setting limits the duration of attempting to establish a connection to
+			// another node.
+			DialContext: (&net.Dialer{
+				Timeout:   2 * time.Second,
+				KeepAlive: 30 * time.Second,
+			}).DialContext,
+
+			// ForceAttemptHTTP2 ensures the transport negotiates HTTP/2 if possible.
+			// This allows us to use the http.Transport, whose DialContext timeout is
+			// respected by the service.
+			ForceAttemptHTTP2: true,
 		},
 	}, nil
 }
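Note: a self-contained sketch of the behavior the comments above describe — with a plain http.Transport plus ForceAttemptHTTP2, the Dialer's 2s timeout bounds connection attempts to a dead peer instead of hanging. The target address is a TEST-NET placeholder and InsecureSkipVerify stands in for the test cert pool; both are assumptions for the sketch.

package main

import (
	"crypto/tls"
	"fmt"
	"net"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{
		Transport: &http.Transport{
			// Sketch only: the tests use LocalhostCertPool instead of skipping verification.
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			DialContext: (&net.Dialer{
				Timeout: 2 * time.Second,
			}).DialContext,
			// Negotiate HTTP/2 over TLS when the server supports it.
			ForceAttemptHTTP2: true,
		},
	}

	start := time.Now()
	// 203.0.113.1 (TEST-NET-3) never answers; the dial should fail after ~2s
	// rather than hanging indefinitely.
	_, err := client.Get("https://203.0.113.1:9999/")
	fmt.Printf("request failed after %v: %v\n", time.Since(start).Round(time.Second), err)
}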