Skip to content

Commit

Permalink
node: Make stream sync more resilient
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Jul 16, 2024
1 parent 3ffa5d8 commit 671a903
Show file tree
Hide file tree
Showing 20 changed files with 1,834 additions and 1,569 deletions.
4 changes: 4 additions & 0 deletions core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ type Stream interface {
}

type SyncResultReceiver interface {
// OnUpdate is called each time a new cookie is available for a stream
OnUpdate(r *StreamAndCookie)
// OnSyncError is called when a sync subscription failed unrecoverable
OnSyncError(err error)
// OnStreamSyncDown is called when updates for a stream could not be given.
OnStreamSyncDown(StreamId)
}

// TODO: refactor interfaces.
Expand Down
5 changes: 5 additions & 0 deletions core/node/events/stream_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {
type testStreamCacheViewEvictionSub struct {
receivedStreamAndCookies []*protocol.StreamAndCookie
receivedErrors []error
streamErrors []shared.StreamId
}

func (sub *testStreamCacheViewEvictionSub) OnUpdate(sac *protocol.StreamAndCookie) {
Expand All @@ -209,6 +210,10 @@ func (sub *testStreamCacheViewEvictionSub) OnSyncError(err error) {
sub.receivedErrors = append(sub.receivedErrors, err)
}

func (sub *testStreamCacheViewEvictionSub) OnStreamSyncDown(streamID shared.StreamId) {
sub.streamErrors = append(sub.streamErrors, streamID)
}

func (sub *testStreamCacheViewEvictionSub) eventsReceived() int {
count := 0
for _, sac := range sub.receivedStreamAndCookies {
Expand Down
404 changes: 209 additions & 195 deletions core/node/protocol/protocol.pb.go

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions core/node/rpc/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/river-build/river/core/node/rpc/sync"
"log/slog"
"strconv"

Expand Down Expand Up @@ -45,6 +46,7 @@ func (s *Service) info(
) (*connect.Response[InfoResponse], error) {
if len(request.Msg.Debug) > 0 {
debug := request.Msg.Debug[0]

if debug == "error" {
return nil, RiverError(Err_DEBUG_ERROR, "Error requested through Info request")
} else if debug == "network_error" {
Expand All @@ -54,6 +56,8 @@ func (s *Service) info(
return nil, errors.New("error requested through Info request")
} else if debug == "make_miniblock" {
return s.debugInfoMakeMiniblock(ctx, request)
} else if debug == "drop_stream" {
return s.debugDropStream(ctx, request)
}

if s.config.EnableTestAPIs {
Expand Down Expand Up @@ -83,6 +87,32 @@ func (s *Service) info(
}), nil
}

func (s *Service) debugDropStream(
ctx context.Context,
request *connect.Request[InfoRequest],
) (*connect.Response[InfoResponse], error) {
if len(request.Msg.GetDebug()) < 3 {
return nil, RiverError(Err_DEBUG_ERROR, "drop_stream requires a sync id and stream id")
}

syncID := request.Msg.Debug[1]
streamID, err := shared.StreamIdFromString(request.Msg.Debug[2])
if err != nil {
return nil, err
}

dbgHandler, ok := s.syncHandler.(sync.DebugHandler)
if !ok {
return nil, RiverError(Err_UNAVAILABLE, "Drop stream not supported")
}

if err = dbgHandler.DebugDropStream(ctx, syncID, streamID); err != nil {
return nil, err
}

return connect.NewResponse(&InfoResponse{}), nil
}

func (s *Service) debugInfoMakeMiniblock(
ctx context.Context,
request *connect.Request[InfoRequest],
Expand Down
15 changes: 7 additions & 8 deletions core/node/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ import (

"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/common"
"github.com/rs/cors"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/auth"
. "github.com/river-build/river/core/node/base"
Expand All @@ -32,8 +27,13 @@ import (
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/protocol/protocolconnect"
"github.com/river-build/river/core/node/registries"
"github.com/river-build/river/core/node/rpc/sync"
"github.com/river-build/river/core/node/storage"
"github.com/river-build/river/core/xchain/entitlement"
"github.com/rs/cors"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
)

const (
Expand Down Expand Up @@ -504,11 +504,10 @@ func (s *Service) initCacheAndSync() error {

s.mbProducer = events.NewMiniblockProducer(s.serverCtx, s.cache, nil)

s.syncHandler = NewSyncHandler(
s.wallet,
s.syncHandler = sync.NewHandler(
s.wallet.Address,
s.cache,
s.nodeRegistry,
s.streamRegistry,
)

return nil
Expand Down
3 changes: 2 additions & 1 deletion core/node/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
river_sync "github.com/river-build/river/core/node/rpc/sync"
"log/slog"
"net"
"net/http"
Expand Down Expand Up @@ -44,7 +45,7 @@ type Service struct {
// Streams
cache events.StreamCache
mbProducer events.MiniblockProducer
syncHandler SyncHandler
syncHandler river_sync.Handler

// River chain
riverChain *crypto.Blockchain
Expand Down
55 changes: 55 additions & 0 deletions core/node/rpc/service_sync_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package rpc

import (
"connectrpc.com/connect"
"context"
. "github.com/river-build/river/core/node/protocol"
)

// TODO: wire metrics.
// var (
// syncStreamsRequests = infra.NewSuccessMetrics("sync_streams_requests", serviceRequests)
// syncStreamsResultSize = infra.NewCounter("sync_streams_result_size", "The total number of events returned by sync streams")
// )

// func addUpdatesToCounter(updates []*StreamAndCookie) {
// for _, stream := range updates {
// syncStreamsResultSize.Add(float64(len(stream.Events)))
// }
// }

func (s *Service) SyncStreams(
ctx context.Context,
req *connect.Request[SyncStreamsRequest],
res *connect.ServerStream[SyncStreamsResponse],
) error {
return s.syncHandler.SyncStreams(ctx, req, res)
}

func (s *Service) AddStreamToSync(
ctx context.Context,
req *connect.Request[AddStreamToSyncRequest],
) (*connect.Response[AddStreamToSyncResponse], error) {
return s.syncHandler.AddStreamToSync(ctx, req)
}

func (s *Service) RemoveStreamFromSync(
ctx context.Context,
req *connect.Request[RemoveStreamFromSyncRequest],
) (*connect.Response[RemoveStreamFromSyncResponse], error) {
return s.syncHandler.RemoveStreamFromSync(ctx, req)
}

func (s *Service) CancelSync(
ctx context.Context,
req *connect.Request[CancelSyncRequest],
) (*connect.Response[CancelSyncResponse], error) {
return s.syncHandler.CancelSync(ctx, req)
}

func (s *Service) PingSync(
ctx context.Context,
req *connect.Request[PingSyncRequest],
) (*connect.Response[PingSyncResponse], error) {
return s.syncHandler.PingSync(ctx, req)
}
Loading

0 comments on commit 671a903

Please sign in to comment.