diff --git a/core/client/syncer/checker/checker.go b/core/client/syncer/checker/checker.go new file mode 100644 index 000000000..d59c447ae --- /dev/null +++ b/core/client/syncer/checker/checker.go @@ -0,0 +1,311 @@ +package checker + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + + "github.com/river-build/river/core/client/syncer" + "github.com/river-build/river/core/config" + "github.com/river-build/river/core/contracts/river" + "github.com/river-build/river/core/node/crypto" + "github.com/river-build/river/core/node/dlog" + "github.com/river-build/river/core/node/infra" + "github.com/river-build/river/core/node/nodes" + . "github.com/river-build/river/core/node/protocol" + "github.com/river-build/river/core/node/registries" + . "github.com/river-build/river/core/node/shared" + "github.com/river-build/river/core/utils/syncmap" +) + +type streamState struct { + streamId StreamId + + mu sync.Mutex + bcMbNum int64 + bcMbHash common.Hash + syncMbNum int64 + syncMbHash common.Hash +} + +func newStreamState(streamId StreamId) *streamState { + return &streamState{ + streamId: streamId, + bcMbNum: -1, + syncMbNum: -1, + } +} + +func (s *streamState) onUpdate(ctx context.Context, bcMbHash common.Hash, bcMbNum uint64, stats *streamCheckerStats) { + s.mu.Lock() + defer s.mu.Unlock() + + s.bcMbHash = bcMbHash + s.bcMbNum = int64(bcMbNum) + + s.compare(ctx, "blockchain", stats) +} + +func (s *streamState) onSync(ctx context.Context, update *syncer.SyncUpdate, stats *streamCheckerStats) { + s.mu.Lock() + defer s.mu.Unlock() + + s.syncMbNum = update.Stream.GetNextSyncCookie().GetMinipoolGen() - 1 + s.syncMbHash = common.Hash(update.Stream.GetNextSyncCookie().GetPrevMiniblockHash()) + + s.compare(ctx, "sync", stats) +} + +func withinOne(a, b int64) bool { + diff := a - b + return diff >= -1 && diff <= 1 +} + +func (s *streamState) compare(ctx context.Context, lastUpdate string, stats *streamCheckerStats) { + log := dlog.FromCtx(ctx) + + if s.bcMbNum != -1 && s.syncMbNum != -1 && !withinOne(s.bcMbNum, s.syncMbNum) { + log.Error( + "Miniblock number out of sync", + "streamId", + s.streamId, + "bcMbNum", + s.bcMbNum, + "syncMbNum", + s.syncMbNum, + "lastUpdate", + lastUpdate, + ) + stats.outOfSync.Add(1) + } else if s.bcMbNum != -1 && s.bcMbNum == s.syncMbNum && s.bcMbHash != s.syncMbHash { + log.Error( + "Miniblock hash mismatch", + "streamId", s.streamId, "bcMbNum", s.bcMbNum, "syncMbNum", s.syncMbNum, + "bcHash", s.bcMbHash, "syncHash", s.syncMbHash, + "lastUpdate", lastUpdate) + stats.hashMismatch.Add(1) + } else if s.bcMbNum == -1 || s.syncMbNum == -1 { + log.Debug("Waiting for initial miniblock", "streamId", s.streamId, "bcMbNum", s.bcMbNum, "syncMbNum", s.syncMbNum, "lastUpdate", lastUpdate) + stats.waitingForInit.Add(1) + } else { + log.Debug("Miniblock number in sync", "streamId", s.streamId, "bcMbNum", s.bcMbNum, "syncMbNum", s.syncMbNum, "lastUpdate", lastUpdate) + stats.inSync.Add(1) + } +} + +type streamCheckerStats struct { + bcUpdates atomic.Uint64 + syncUpdates atomic.Uint64 + streams atomic.Uint64 + outOfSync atomic.Uint64 + hashMismatch atomic.Uint64 + waitingForInit atomic.Uint64 + inSync atomic.Uint64 + down atomic.Uint64 + up atomic.Uint64 + added atomic.Uint64 +} + +type streamChecker struct { + config *config.Config + blockchain *crypto.Blockchain + registry *registries.RiverRegistryContract + + updates chan *syncer.SyncUpdate + + syncReceiver syncer.SyncReceiver + + streams syncmap.Typed[StreamId, *streamState] + + stats streamCheckerStats +} + +func StartStreamChecker( + ctx context.Context, + config *config.Config, + node common.Address, + onExit chan<- error, +) error { + checker := &streamChecker{ + config: config, + updates: make(chan *syncer.SyncUpdate, 100), + } + + var err error + checker.blockchain, err = crypto.NewBlockchain( + ctx, + &config.RiverChain, + nil, + infra.NewMetricsFactory(nil, "river", "cmdline"), + nil, + ) + if err != nil { + return err + } + + checker.blockchain.StartChainMonitor(ctx) + + checker.registry, err = registries.NewRiverRegistryContract(ctx, checker.blockchain, &config.RegistryContract) + if err != nil { + return err + } + + err = checker.registry.OnStreamEvent( + ctx, + checker.blockchain.InitialBlockNum, + checker.onAllocated, + checker.onLastMiniblockUpdated, + checker.onPlacementUpdated, + ) + if err != nil { + return err + } + + nodeRegistry, err := nodes.LoadNodeRegistry( + ctx, + checker.registry, + common.Address{}, + checker.blockchain.InitialBlockNum, + checker.blockchain.ChainMonitor, + nil, + ) + if err != nil { + return err + } + + stub, err := nodeRegistry.GetStreamServiceClientForAddress(node) + if err != nil { + return err + } + + checker.syncReceiver, err = syncer.StartSyncReceiver(ctx, stub, onExit) + if err != nil { + return err + } + + go checker.run(ctx) + + return nil +} + +func (c *streamChecker) onAllocated(ctx context.Context, event *river.StreamRegistryV1StreamAllocated) { + streamId := StreamId(event.StreamId) + state := newStreamState(streamId) + c.streams.Store(streamId, state) + c.stats.streams.Add(1) + c.stats.bcUpdates.Add(1) + go c.addToSync(ctx, state) +} + +func (c *streamChecker) onLastMiniblockUpdated( + ctx context.Context, + event *river.StreamRegistryV1StreamLastMiniblockUpdated, +) { + c.stats.bcUpdates.Add(1) + + streamId := StreamId(event.StreamId) + state, loaded := c.streams.Load(streamId) + if !loaded { + state = newStreamState(streamId) + c.streams.Store(streamId, state) + c.stats.streams.Add(1) + go c.addToSync(ctx, state) + return + } + + state.onUpdate(ctx, common.Hash(event.LastMiniblockHash), event.LastMiniblockNum, &c.stats) +} + +func (c *streamChecker) onPlacementUpdated(ctx context.Context, event *river.StreamRegistryV1StreamPlacementUpdated) { + // Do nothing +} + +func (c *streamChecker) addToSync(ctx context.Context, state *streamState) { + info, err := c.registry.GetStream(ctx, state.streamId) + if err != nil { + dlog.FromCtx(ctx).Error("addToSync: Failed to get stream", "error", err) + return + } + state.onUpdate(ctx, info.LastMiniblockHash, info.LastMiniblockNum, &c.stats) + cookie := &SyncCookie{ + NodeAddress: info.Nodes[0][:], + StreamId: state.streamId[:], + MinipoolGen: int64(info.LastMiniblockNum + 1), + PrevMiniblockHash: info.LastMiniblockHash[:], + } + + err = c.syncReceiver.AddStream(ctx, state.streamId, cookie, c.updates) + if err != nil { + dlog.FromCtx(ctx).Error("addToSync: Failed to add stream", "error", err) + } +} + +func (c *streamChecker) run(ctx context.Context) { + log := dlog.FromCtx(ctx) + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case update := <-c.updates: + c.handleUpdate(ctx, update) + case <-ticker.C: + log.Info( + "Stream checker stats", + "bcUpdates", + c.stats.bcUpdates.Load(), + "syncUpdates", + c.stats.syncUpdates.Load(), + "streams", + c.stats.streams.Load(), + "outOfSync", + c.stats.outOfSync.Load(), + "hashMismatch", + c.stats.hashMismatch.Load(), + "waitingForInit", + c.stats.waitingForInit.Load(), + "inSync", + c.stats.inSync.Load(), + "down", + c.stats.down.Load(), + "up", + c.stats.up.Load(), + "added", + c.stats.added.Load(), + ) + } + } +} + +func (c *streamChecker) handleUpdate(ctx context.Context, update *syncer.SyncUpdate) { + c.stats.syncUpdates.Add(1) + + streamId, err := StreamIdFromBytes(update.Stream.GetNextSyncCookie().GetStreamId()) + if err != nil { + dlog.FromCtx(ctx).Error("handleUpdate: Failed to parse stream id", "error", err) + return + } + s, loaded := c.streams.Load(streamId) + if !loaded { + dlog.FromCtx(ctx).Error("handleUpdate:Stream not found", "streamId", streamId) + return + } + + if update.Status != syncer.SyncUpdate_Down { + s.onSync(ctx, update, &c.stats) + + switch update.Status { //nolint:exhaustive + case syncer.SyncUpdate_Up: + c.stats.up.Add(1) + case syncer.SyncUpdate_Added: + c.stats.added.Add(1) + } + } else { + c.stats.down.Add(1) + } +} diff --git a/core/client/syncer/receiver.go b/core/client/syncer/receiver.go new file mode 100644 index 000000000..f96e85d6c --- /dev/null +++ b/core/client/syncer/receiver.go @@ -0,0 +1,340 @@ +package syncer + +import ( + "context" + "sync" + "time" + + "connectrpc.com/connect" + + . "github.com/river-build/river/core/node/base" + "github.com/river-build/river/core/node/dlog" + . "github.com/river-build/river/core/node/protocol" + "github.com/river-build/river/core/node/protocol/protocolconnect" + . "github.com/river-build/river/core/node/shared" +) + +type SyncUpdateStatus int + +const ( + SyncUpdate_Update SyncUpdateStatus = iota + SyncUpdate_Down + SyncUpdate_Up + SyncUpdate_Added +) + +type SyncUpdate struct { + Status SyncUpdateStatus + Id StreamId + Stream *StreamAndCookie +} + +type SyncReceiver interface { + AddStream(ctx context.Context, stream StreamId, cookie *SyncCookie, c chan<- *SyncUpdate) error +} + +func StartSyncReceiver( + ctx context.Context, + stub protocolconnect.StreamServiceClient, + onSyncExit chan<- error, +) (SyncReceiver, error) { + resp, err := stub.SyncStreams(ctx, connect.NewRequest(&SyncStreamsRequest{})) + if err != nil { + return nil, err + } + + // Receive syncId + received := resp.Receive() + if !received { + return nil, resp.Err() + } + + msg := resp.Msg() + if msg.SyncOp != SyncOp_SYNC_NEW { + defer resp.Close() + return nil, RiverError(Err_BAD_SYNC_COOKIE, "expected new sync", "syncOp", msg.SyncOp) + } + + receiver := &syncReceiver{ + syncId: msg.SyncId, + stub: stub, + streams: make(map[StreamId]*streamInfo), + } + + go receiver.receive(ctx, resp, onSyncExit) + + return receiver, nil +} + +type streamInfoStatus int + +const ( + streamInfoStatus_Ok streamInfoStatus = iota + streamInfoStatus_Added + streamInfoStatus_Down +) + +type streamInfo struct { + cookie *SyncCookie + status streamInfoStatus + ch chan<- *SyncUpdate +} + +type syncReceiver struct { + syncId string + stub protocolconnect.StreamServiceClient + + mu sync.Mutex + streams map[StreamId]*streamInfo +} + +var _ SyncReceiver = &syncReceiver{} + +func (s *syncReceiver) receive( + ctx context.Context, + resp *connect.ServerStreamForClient[SyncStreamsResponse], + onSyncExit chan<- error, +) { + log := dlog.FromCtx(ctx) + defer resp.Close() + + for { + select { + case <-ctx.Done(): + onSyncExit <- ctx.Err() + return + default: + received := resp.Receive() + if !received { + onSyncExit <- resp.Err() + return + } + + msg := resp.Msg() + log.Debug("received sync message", "syncId", s.syncId, "msg", msg) + switch msg.SyncOp { + case SyncOp_SYNC_NEW: + onSyncExit <- RiverError(Err_BAD_SYNC_COOKIE, "only one SYNC_NEW is expected", "syncId", s.syncId).LogError(log) + return + case SyncOp_SYNC_CLOSE: + log.Info("received sync close", "syncId", s.syncId) + onSyncExit <- nil + return + case SyncOp_SYNC_UPDATE: + s.handleUpdate(ctx, msg) + case SyncOp_SYNC_PONG: + s.handlePong(ctx, msg) + case SyncOp_SYNC_DOWN: + s.handleDown(ctx, msg) + case SyncOp_SYNC_UNSPECIFIED: + fallthrough + default: + log.Error("unknown sync op", "syncId", s.syncId, "syncOp", msg.SyncOp) + } + } + } +} + +func (s *syncReceiver) handleUpdate(ctx context.Context, msg *SyncStreamsResponse) { + log := dlog.FromCtx(ctx) + + ch, update, err := s.handleUpdateImpl(ctx, msg) + if err != nil { + log.Error("error handling update", "syncId", s.syncId, "error", err) + return + } + + ch <- update +} + +func (s *syncReceiver) handleUpdateImpl( + ctx context.Context, + msg *SyncStreamsResponse, +) (chan<- *SyncUpdate, *SyncUpdate, error) { + id, err := StreamIdFromBytes(msg.Stream.GetNextSyncCookie().GetStreamId()) + if err != nil { + return nil, nil, err + } + + s.mu.Lock() + defer s.mu.Unlock() + + stream, ok := s.streams[id] + if !ok { + return nil, nil, RiverError( + Err_BAD_SYNC_COOKIE, + "stream not found in sync", + "streamId", + stream, + "syncId", + s.syncId, + ) + } + + stream.cookie = msg.Stream.GetNextSyncCookie() + + upd := SyncUpdate_Update + if stream.status == streamInfoStatus_Down { + upd = SyncUpdate_Up + // TODO: cancel retries + } else if stream.status == streamInfoStatus_Added { + upd = SyncUpdate_Added + } + stream.status = streamInfoStatus_Ok + + return stream.ch, &SyncUpdate{ + Status: upd, + Id: id, + Stream: msg.Stream, + }, nil +} + +func (s *syncReceiver) handlePong(ctx context.Context, msg *SyncStreamsResponse) { + log := dlog.FromCtx(ctx) + + log.Info("received pong", "syncId", s.syncId, "pong", msg.PongNonce) + + // TODO: handle pong +} + +func (s *syncReceiver) handleDown(ctx context.Context, msg *SyncStreamsResponse) { + log := dlog.FromCtx(ctx) + + log.Info("received down", "syncId", s.syncId, "streamId", msg.StreamId) + + ch, update, err := s.handleDownImpl(ctx, msg) + if err != nil { + log.Error("error handling down", "syncId", s.syncId, "error", err) + return + } + + ch <- update + + go s.retryDownStream(ctx, update.Id) +} + +func (s *syncReceiver) handleDownImpl( + ctx context.Context, + msg *SyncStreamsResponse, +) (chan<- *SyncUpdate, *SyncUpdate, error) { + id, err := StreamIdFromBytes(msg.GetStreamId()) + if err != nil { + return nil, nil, err + } + + s.mu.Lock() + defer s.mu.Unlock() + + stream, ok := s.streams[id] + if !ok { + return nil, nil, RiverError( + Err_BAD_SYNC_COOKIE, + "stream not found in sync", + "streamId", + stream, + "syncId", + s.syncId, + ) + } + + if stream.status == streamInfoStatus_Down { + dlog.FromCtx(ctx).Error("stream already down", "streamId", id, "syncId", s.syncId) + } + stream.status = streamInfoStatus_Down + + return stream.ch, &SyncUpdate{ + Status: SyncUpdate_Down, + Id: id, + }, nil +} + +func (s *syncReceiver) AddStream( + ctx context.Context, + streamId StreamId, + cookie *SyncCookie, + c chan<- *SyncUpdate, +) error { + dlog.FromCtx(ctx).Debug("adding stream to sync", "syncId", s.syncId, "cookie", cookie) + + s.insertStream(ctx, streamId, cookie, c) + + _, err := s.stub.AddStreamToSync(ctx, connect.NewRequest(&AddStreamToSyncRequest{ + SyncId: s.syncId, + SyncPos: cookie, + })) + if err != nil { + // Notice that this error can be a transport error, and stream still can be successfully added + // in this case, because of this it can't be removed from the map here. + return err + } + + // TODO: add monitor for this stream to become added. + return nil +} + +func (s *syncReceiver) insertStream( + ctx context.Context, + streamId StreamId, + cookie *SyncCookie, + c chan<- *SyncUpdate, +) { + s.mu.Lock() + defer s.mu.Unlock() + + _, exists := s.streams[streamId] + if exists { + dlog.FromCtx(ctx). + Warn("stream already added to sync, maybe this is a retry on transport error", "streamId", streamId, "syncId", s.syncId) + return + } + + s.streams[streamId] = &streamInfo{ + cookie: cookie, + status: streamInfoStatus_Added, + ch: c, + } +} + +func (s *syncReceiver) getRetryCookie(streamId StreamId) *SyncCookie { + s.mu.Lock() + defer s.mu.Unlock() + + stream, ok := s.streams[streamId] + if !ok || stream.status != streamInfoStatus_Down { + return nil + } + return stream.cookie +} + +func (s *syncReceiver) retryDownStream(ctx context.Context, streamId StreamId) { + dlog.FromCtx(ctx).Debug("retrying down stream", "streamId", streamId, "syncId", s.syncId) + + duration := 1 * time.Second + timer := time.NewTimer(duration) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + cookie := s.getRetryCookie(streamId) + if cookie == nil { + return + } + + _, err := s.stub.AddStreamToSync(ctx, connect.NewRequest(&AddStreamToSyncRequest{ + SyncId: s.syncId, + SyncPos: cookie, + })) + if err == nil { + dlog.FromCtx(ctx).Debug("stream added back to sync", "streamId", streamId, "syncId", s.syncId) + // TODO: add monitor for this stream to become up. + return + } + + duration := max(duration*2, 30*time.Second) + timer.Reset(duration) + } + } +} diff --git a/core/cmd/debug_cmd.go b/core/cmd/debug_cmd.go new file mode 100644 index 000000000..a0563d78b --- /dev/null +++ b/core/cmd/debug_cmd.go @@ -0,0 +1,55 @@ +package cmd + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/ethereum/go-ethereum/common" + "github.com/spf13/cobra" + + "github.com/river-build/river/core/client/syncer/checker" + "github.com/river-build/river/core/config" +) + +func runDebugSync(ctx context.Context, cfg *config.Config, nodeAddr string) error { + node := common.HexToAddress(nodeAddr) + + onExit := make(chan error, 1) + err := checker.StartStreamChecker(ctx, cfg, node, onExit) + if err != nil { + return err + } + + // Wait for either Ctrl-C or onExit + osSignal := make(chan os.Signal, 1) + signal.Notify(osSignal, syscall.SIGINT, syscall.SIGTERM) + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-onExit: + return err + case <-osSignal: + return nil + } +} + +func init() { + cmd := &cobra.Command{ + Use: "debug", + Short: "Debug commands", + } + + cmd.AddCommand(&cobra.Command{ + Use: "sync ", + Short: "Sync streams from a specified node", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + nodeAddr := args[0] + return runDebugSync(cmd.Context(), cmdConfig, nodeAddr) + }, + }) + + rootCmd.AddCommand(cmd) +} diff --git a/core/utils/syncmap/typed.go b/core/utils/syncmap/typed.go new file mode 100644 index 000000000..d174fd923 --- /dev/null +++ b/core/utils/syncmap/typed.go @@ -0,0 +1,54 @@ +package syncmap + +import ( + "sync" +) + +type Typed[K any, V any] struct { + sync.Map +} + +func (m *Typed[K, V]) CompareAndDelete(key K, old V) (deleted bool) { + return m.Map.CompareAndDelete(key, old) +} + +func (m *Typed[K, V]) CompareAndSwap(key K, old, new V) (swapped bool) { + return m.Map.CompareAndSwap(key, old, new) +} + +func (m *Typed[K, V]) Delete(key K) { + m.Map.Delete(key) +} + +func (m *Typed[K, V]) Load(key K) (V, bool) { + v, ok := m.Map.Load(key) + if !ok { + var zero V + return zero, false + } + return v.(V), ok +} + +func (m *Typed[K, V]) LoadOrStore(key K, value V) (V, bool) { + v, loaded := m.Map.LoadOrStore(key, value) + return v.(V), loaded +} + +func (m *Typed[K, V]) Range(f func(key K, value V) bool) { + m.Map.Range(func(key, value any) bool { + return f(key.(K), value.(V)) + }) +} + +func (m *Typed[K, V]) Store(key K, value V) { + m.Map.Store(key, value) +} + +func (m *Typed[K, V]) Swap(key K, value V) (V, bool) { + v, loaded := m.Map.Swap(key, value) + if loaded { + return v.(V), loaded + } + var zero V + return zero, false +}