Skip to content

Commit

Permalink
node: Make stream sync more resilient
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Jul 15, 2024
1 parent 1893e10 commit 12d6537
Show file tree
Hide file tree
Showing 19 changed files with 1,815 additions and 1,571 deletions.
4 changes: 4 additions & 0 deletions core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ type Stream interface {
}

type SyncResultReceiver interface {
// OnUpdate is called each time a new cookie is available for a stream
OnUpdate(r *StreamAndCookie)
// OnSyncError is called when a sync subscription failed unrecoverable
OnSyncError(err error)
// OnStreamSyncDown is called when updates for a stream could not be given.
OnStreamSyncDown(StreamId)
}

// TODO: refactor interfaces.
Expand Down
5 changes: 5 additions & 0 deletions core/node/events/stream_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {
type testStreamCacheViewEvictionSub struct {
receivedStreamAndCookies []*protocol.StreamAndCookie
receivedErrors []error
streamErrors []shared.StreamId
}

func (sub *testStreamCacheViewEvictionSub) OnUpdate(sac *protocol.StreamAndCookie) {
Expand All @@ -209,6 +210,10 @@ func (sub *testStreamCacheViewEvictionSub) OnSyncError(err error) {
sub.receivedErrors = append(sub.receivedErrors, err)
}

func (sub *testStreamCacheViewEvictionSub) OnStreamSyncDown(streamID shared.StreamId) {
sub.streamErrors = append(sub.streamErrors, streamID)
}

func (sub *testStreamCacheViewEvictionSub) eventsReceived() int {
count := 0
for _, sac := range sub.receivedStreamAndCookies {
Expand Down
404 changes: 209 additions & 195 deletions core/node/protocol/protocol.pb.go

Large diffs are not rendered by default.

15 changes: 7 additions & 8 deletions core/node/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ import (

"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/common"
"github.com/rs/cors"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/auth"
. "github.com/river-build/river/core/node/base"
Expand All @@ -32,8 +27,13 @@ import (
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/protocol/protocolconnect"
"github.com/river-build/river/core/node/registries"
"github.com/river-build/river/core/node/rpc/sync"
"github.com/river-build/river/core/node/storage"
"github.com/river-build/river/core/xchain/entitlement"
"github.com/rs/cors"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
)

const (
Expand Down Expand Up @@ -502,11 +502,10 @@ func (s *Service) initCacheAndSync() error {

s.mbProducer = events.NewMiniblockProducer(s.serverCtx, s.cache, nil)

s.syncHandler = NewSyncHandler(
s.wallet,
s.syncHandler = sync.NewHandler(
s.wallet.Address,
s.cache,
s.nodeRegistry,
s.streamRegistry,
)

return nil
Expand Down
3 changes: 2 additions & 1 deletion core/node/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
river_sync "github.com/river-build/river/core/node/rpc/sync"
"log/slog"
"net"
"net/http"
Expand Down Expand Up @@ -44,7 +45,7 @@ type Service struct {
// Streams
cache events.StreamCache
mbProducer events.MiniblockProducer
syncHandler SyncHandler
syncHandler river_sync.Handler

// River chain
riverChain *crypto.Blockchain
Expand Down
55 changes: 55 additions & 0 deletions core/node/rpc/service_sync_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package rpc

import (
"connectrpc.com/connect"
"context"
. "github.com/river-build/river/core/node/protocol"
)

// TODO: wire metrics.
// var (
// syncStreamsRequests = infra.NewSuccessMetrics("sync_streams_requests", serviceRequests)
// syncStreamsResultSize = infra.NewCounter("sync_streams_result_size", "The total number of events returned by sync streams")
// )

// func addUpdatesToCounter(updates []*StreamAndCookie) {
// for _, stream := range updates {
// syncStreamsResultSize.Add(float64(len(stream.Events)))
// }
// }

func (s *Service) SyncStreams(
ctx context.Context,
req *connect.Request[SyncStreamsRequest],
res *connect.ServerStream[SyncStreamsResponse],
) error {
return s.syncHandler.SyncStreams(ctx, req, res)
}

func (s *Service) AddStreamToSync(
ctx context.Context,
req *connect.Request[AddStreamToSyncRequest],
) (*connect.Response[AddStreamToSyncResponse], error) {
return s.syncHandler.AddStreamToSync(ctx, req)
}

func (s *Service) RemoveStreamFromSync(
ctx context.Context,
req *connect.Request[RemoveStreamFromSyncRequest],
) (*connect.Response[RemoveStreamFromSyncResponse], error) {
return s.syncHandler.RemoveStreamFromSync(ctx, req)
}

func (s *Service) CancelSync(
ctx context.Context,
req *connect.Request[CancelSyncRequest],
) (*connect.Response[CancelSyncResponse], error) {
return s.syncHandler.CancelSync(ctx, req)
}

func (s *Service) PingSync(
ctx context.Context,
req *connect.Request[PingSyncRequest],
) (*connect.Response[PingSyncResponse], error) {
return s.syncHandler.PingSync(ctx, req)
}
Loading

0 comments on commit 12d6537

Please sign in to comment.