Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Make stream sync more resilient #408

Merged
merged 16 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ type Stream interface {
}

type SyncResultReceiver interface {
// OnUpdate is called each time a new cookie is available for a stream
OnUpdate(r *StreamAndCookie)
// OnSyncError is called when a sync subscription failed unrecoverable
OnSyncError(err error)
// OnStreamSyncDown is called when updates for a stream could not be given.
OnStreamSyncDown(StreamId)
}

// TODO: refactor interfaces.
Expand Down
5 changes: 5 additions & 0 deletions core/node/events/stream_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {
type testStreamCacheViewEvictionSub struct {
receivedStreamAndCookies []*protocol.StreamAndCookie
receivedErrors []error
streamErrors []shared.StreamId
}

func (sub *testStreamCacheViewEvictionSub) OnUpdate(sac *protocol.StreamAndCookie) {
Expand All @@ -209,6 +210,10 @@ func (sub *testStreamCacheViewEvictionSub) OnSyncError(err error) {
sub.receivedErrors = append(sub.receivedErrors, err)
}

func (sub *testStreamCacheViewEvictionSub) OnStreamSyncDown(streamID shared.StreamId) {
sub.streamErrors = append(sub.streamErrors, streamID)
}

func (sub *testStreamCacheViewEvictionSub) eventsReceived() int {
count := 0
for _, sac := range sub.receivedStreamAndCookies {
Expand Down
439 changes: 238 additions & 201 deletions core/node/protocol/protocol.pb.go

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions core/node/rpc/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
"github.com/river-build/river/core/node/utils"

"connectrpc.com/connect"

Expand Down Expand Up @@ -141,7 +142,7 @@ func (s *Service) CreateStream(
ctx context.Context,
req *connect.Request[CreateStreamRequest],
) (*connect.Response[CreateStreamResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("CreateStream REQUEST", "streamId", req.Msg.StreamId)
r, e := s.createStreamImpl(ctx, req)
if e != nil {
Expand Down Expand Up @@ -174,7 +175,7 @@ func (s *Service) GetStream(
ctx context.Context,
req *connect.Request[GetStreamRequest],
) (*connect.Response[GetStreamResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("GetStream ENTER")
r, e := s.getStreamImpl(ctx, req)
if e != nil {
Expand All @@ -194,7 +195,7 @@ func (s *Service) GetStreamEx(
req *connect.Request[GetStreamExRequest],
resp *connect.ServerStream[GetStreamExResponse],
) error {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("GetStreamEx ENTER")
e := s.getStreamExImpl(ctx, req, resp)
if e != nil {
Expand Down Expand Up @@ -316,7 +317,7 @@ func (s *Service) GetMiniblocks(
ctx context.Context,
req *connect.Request[GetMiniblocksRequest],
) (*connect.Response[GetMiniblocksResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("GetMiniblocks ENTER", "req", req.Msg)
r, e := s.getMiniblocksImpl(ctx, req)
if e != nil {
Expand Down Expand Up @@ -368,7 +369,7 @@ func (s *Service) GetLastMiniblockHash(
ctx context.Context,
req *connect.Request[GetLastMiniblockHashRequest],
) (*connect.Response[GetLastMiniblockHashResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("GetLastMiniblockHash ENTER", "req", req.Msg)
r, e := s.getLastMiniblockHashImpl(ctx, req)
if e != nil {
Expand Down Expand Up @@ -420,7 +421,7 @@ func (s *Service) AddEvent(
ctx context.Context,
req *connect.Request[AddEventRequest],
) (*connect.Response[AddEventResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("AddEvent ENTER", "req", req.Msg)
r, e := s.addEventImpl(ctx, req)
if e != nil {
Expand Down
33 changes: 32 additions & 1 deletion core/node/rpc/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"github.com/river-build/river/core/node/rpc/sync"
"github.com/river-build/river/core/node/utils"
"log/slog"
"strconv"

Expand All @@ -24,7 +26,7 @@ func (s *Service) Info(
ctx context.Context,
req *connect.Request[InfoRequest],
) (*connect.Response[InfoResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)

log.Debug("Info ENTER", "request", req.Msg)

Expand All @@ -45,6 +47,7 @@ func (s *Service) info(
) (*connect.Response[InfoResponse], error) {
if len(request.Msg.Debug) > 0 {
debug := request.Msg.Debug[0]

if debug == "error" {
return nil, RiverError(Err_DEBUG_ERROR, "Error requested through Info request")
} else if debug == "network_error" {
Expand All @@ -54,6 +57,8 @@ func (s *Service) info(
return nil, errors.New("error requested through Info request")
} else if debug == "make_miniblock" {
return s.debugInfoMakeMiniblock(ctx, request)
} else if debug == "drop_stream" {
return s.debugDropStream(ctx, request)
}

if s.config.EnableTestAPIs {
Expand Down Expand Up @@ -83,6 +88,32 @@ func (s *Service) info(
}), nil
}

func (s *Service) debugDropStream(
ctx context.Context,
request *connect.Request[InfoRequest],
) (*connect.Response[InfoResponse], error) {
if len(request.Msg.GetDebug()) < 3 {
return nil, RiverError(Err_DEBUG_ERROR, "drop_stream requires a sync id and stream id")
}

syncID := request.Msg.Debug[1]
streamID, err := shared.StreamIdFromString(request.Msg.Debug[2])
if err != nil {
return nil, err
}

dbgHandler, ok := s.syncHandler.(sync.DebugHandler)
if !ok {
return nil, RiverError(Err_UNAVAILABLE, "Drop stream not supported")
}

if err = dbgHandler.DebugDropStream(ctx, syncID, streamID); err != nil {
return nil, err
}

return connect.NewResponse(&InfoResponse{}), nil
}

func (s *Service) debugInfoMakeMiniblock(
ctx context.Context,
request *connect.Request[InfoRequest],
Expand Down
5 changes: 2 additions & 3 deletions core/node/rpc/metrics_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (

"connectrpc.com/connect"
"github.com/prometheus/client_golang/prometheus"
"github.com/river-build/river/core/node/shared"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/river-build/river/core/node/shared"
)

type streamIdProvider interface {
Expand Down Expand Up @@ -46,7 +45,7 @@ func (i *metricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc
m.Dec()
prometheus.NewTimer(i.rpcDuration.WithLabelValues(proc)).ObserveDuration()
}()

// add streamId to tracing span
r, ok := req.Any().(streamIdProvider)
if ok {
Expand Down
9 changes: 5 additions & 4 deletions core/node/rpc/node2node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
"github.com/river-build/river/core/node/utils"

"connectrpc.com/connect"

Expand All @@ -15,7 +16,7 @@ func (s *Service) AllocateStream(
ctx context.Context,
req *connect.Request[AllocateStreamRequest],
) (*connect.Response[AllocateStreamResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("AllocateStream ENTER")
r, e := s.allocateStream(ctx, req.Msg)
if e != nil {
Expand Down Expand Up @@ -51,7 +52,7 @@ func (s *Service) NewEventReceived(
ctx context.Context,
req *connect.Request[NewEventReceivedRequest],
) (*connect.Response[NewEventReceivedResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("NewEventReceived ENTER")
r, e := s.newEventReceived(ctx, req.Msg)
if e != nil {
Expand Down Expand Up @@ -105,7 +106,7 @@ func (s *Service) ProposeMiniblock(
ctx context.Context,
req *connect.Request[ProposeMiniblockRequest],
) (*connect.Response[ProposeMiniblockResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("ProposeMiniblock ENTER")
r, e := s.proposeMiniblock(ctx, req.Msg)
if e != nil {
Expand Down Expand Up @@ -148,7 +149,7 @@ func (s *Service) SaveMiniblockCandidate(
ctx context.Context,
req *connect.Request[SaveMiniblockCandidateRequest],
) (*connect.Response[SaveMiniblockCandidateResponse], error) {
ctx, log := ctxAndLogForRequest(ctx, req)
ctx, log := utils.CtxAndLogForRequest(ctx, req)
log.Debug("SaveMiniblockCandidate ENTER")
r, e := s.saveMiniblockCandidate(ctx, req.Msg)
if e != nil {
Expand Down
13 changes: 6 additions & 7 deletions core/node/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (
"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/cors"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/auth"
. "github.com/river-build/river/core/node/base"
Expand All @@ -31,8 +27,12 @@ import (
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/protocol/protocolconnect"
"github.com/river-build/river/core/node/registries"
"github.com/river-build/river/core/node/rpc/sync"
"github.com/river-build/river/core/node/storage"
"github.com/river-build/river/core/xchain/entitlement"
"github.com/rs/cors"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)

const (
Expand Down Expand Up @@ -563,11 +563,10 @@ func (s *Service) initCacheAndSync() error {

s.mbProducer = events.NewMiniblockProducer(s.serverCtx, s.cache, nil)

s.syncHandler = NewSyncHandler(
s.wallet,
s.syncHandler = sync.NewHandler(
s.wallet.Address,
s.cache,
s.nodeRegistry,
s.streamRegistry,
)

return nil
Expand Down
3 changes: 2 additions & 1 deletion core/node/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
river_sync "github.com/river-build/river/core/node/rpc/sync"
"log/slog"
"net"
"net/http"
Expand Down Expand Up @@ -46,7 +47,7 @@ type Service struct {
// Streams
cache events.StreamCache
mbProducer events.MiniblockProducer
syncHandler SyncHandler
syncHandler river_sync.Handler

// River chain
riverChain *crypto.Blockchain
Expand Down
55 changes: 55 additions & 0 deletions core/node/rpc/service_sync_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package rpc

import (
"connectrpc.com/connect"
"context"
. "github.com/river-build/river/core/node/protocol"
)

// TODO: wire metrics.
// var (
// syncStreamsRequests = infra.NewSuccessMetrics("sync_streams_requests", serviceRequests)
// syncStreamsResultSize = infra.NewCounter("sync_streams_result_size", "The total number of events returned by sync streams")
// )

// func addUpdatesToCounter(updates []*StreamAndCookie) {
// for _, stream := range updates {
// syncStreamsResultSize.Add(float64(len(stream.Events)))
// }
// }

func (s *Service) SyncStreams(
ctx context.Context,
req *connect.Request[SyncStreamsRequest],
res *connect.ServerStream[SyncStreamsResponse],
) error {
return s.syncHandler.SyncStreams(ctx, req, res)
}

func (s *Service) AddStreamToSync(
ctx context.Context,
req *connect.Request[AddStreamToSyncRequest],
) (*connect.Response[AddStreamToSyncResponse], error) {
return s.syncHandler.AddStreamToSync(ctx, req)
}

func (s *Service) RemoveStreamFromSync(
ctx context.Context,
req *connect.Request[RemoveStreamFromSyncRequest],
) (*connect.Response[RemoveStreamFromSyncResponse], error) {
return s.syncHandler.RemoveStreamFromSync(ctx, req)
}

func (s *Service) CancelSync(
ctx context.Context,
req *connect.Request[CancelSyncRequest],
) (*connect.Response[CancelSyncResponse], error) {
return s.syncHandler.CancelSync(ctx, req)
}

func (s *Service) PingSync(
ctx context.Context,
req *connect.Request[PingSyncRequest],
) (*connect.Response[PingSyncResponse], error) {
return s.syncHandler.PingSync(ctx, req)
}
Loading
Loading