Skip to content

Commit

Permalink
node: store streams in local db and start from deltas instead of fet…
Browse files Browse the repository at this point in the history
…ching all streams on boot
  • Loading branch information
bas-vk committed Dec 11, 2024
1 parent 88fb0bc commit 13ab76a
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 25 deletions.
222 changes: 200 additions & 22 deletions core/node/events/stream_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package events

import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"math/big"
"slices"
"sync/atomic"
"time"
Expand Down Expand Up @@ -112,29 +115,18 @@ func NewStreamCache(
}

func (s *streamCacheImpl) Start(ctx context.Context) error {
// schedule sync tasks for all streams that are local to this node.
// these tasks sync up the local db with the latest block in the registry.
var localStreamResults []*registries.GetStreamResult
err := s.params.Registry.ForAllStreams(
ctx,
s.params.AppliedBlockNum,
func(stream *registries.GetStreamResult) bool {
if slices.Contains(stream.Nodes, s.params.Wallet.Address) {
localStreamResults = append(localStreamResults, stream)
}
return true
},
)
retrievedStreams, err := s.retrieveStreams(ctx)
if err != nil {
return err
}

// load local streams in-memory cache
// load local streams into the in-memory cache and, if stream reconciliation is enabled,
// create a sync task that syncs the local DB stream data with the streams registry.
initialSyncWorkerPool := workerpool.New(s.params.Config.StreamReconciliation.InitialWorkerPoolSize)
for _, stream := range localStreamResults {
for streamID, stream := range retrievedStreams {
si := &streamImpl{
params: s.params,
streamId: stream.StreamId,
streamId: streamID,
lastAppliedBlockNum: s.params.AppliedBlockNum,
local: &localStreamState{},
}
Expand All @@ -144,10 +136,10 @@ func (s *streamCacheImpl) Start(ctx context.Context) error {
s.submitSyncStreamTask(
ctx,
initialSyncWorkerPool,
stream.StreamId,
streamID,
&MiniblockRef{
Hash: stream.LastMiniblockHash,
Num: int64(stream.LastMiniblockNum),
Hash: stream.MiniblockHash,
Num: stream.MiniblockNumber,
},
)
}
Expand All @@ -156,9 +148,7 @@ func (s *streamCacheImpl) Start(ctx context.Context) error {
s.appliedBlockNum.Store(uint64(s.params.AppliedBlockNum))

// Close initial worker pool after all tasks are executed.
go func() {
initialSyncWorkerPool.StopWait()
}()
go initialSyncWorkerPool.StopWait()

// TODO: add buffered channel to avoid blocking ChainMonitor
s.params.RiverChain.ChainMonitor.OnBlockWithLogs(
Expand All @@ -177,6 +167,192 @@ func (s *streamCacheImpl) Start(ctx context.Context) error {
return nil
}

// retrieveStreams returns the metadata for all streams local to this node.
// It loads the snapshot from persistent storage and applies deltas since the
// last integrated block, or — when there is no local state yet — fetches the
// full set from the stream registry at s.params.AppliedBlockNum. The refreshed
// snapshot is persisted before being returned.
func (s *streamCacheImpl) retrieveStreams(ctx context.Context) (map[StreamId]*storage.StreamMetadata, error) {
	// try to fetch the latest streams state from the DB
	streamsMetaData, lastBlock, err := s.params.Storage.AllStreamsMetaData(ctx)
	if err != nil {
		return nil, err
	}

	var removed []StreamId // streams replaced away from this node

	if lastBlock == 0 { // first time, fetch from River chain
		streamsMetaData, err = s.retrieveFromRiverChain(ctx)
	} else { // retrieve stream updates since lastBlock and apply to streamsMetaData
		removed, err = s.applyDeltas(ctx, lastBlock, streamsMetaData)
	}

	if err != nil {
		return nil, err
	}

	// persist the refreshed snapshot so the next boot can resume from deltas
	if err := s.params.Storage.UpdateStreamsMetaData(ctx, streamsMetaData, removed); err != nil {
		return nil, WrapRiverError(Err_DB_OPERATION_FAILURE, err).
			Func("retrieveStreams").
			Message("Unable to update stream metadata records in DB")
	}

	return streamsMetaData, nil
}

// retrieveFromRiverChain builds the local stream metadata set by walking every
// stream registered in the stream registry at s.params.AppliedBlockNum and
// keeping only the streams placed on this node.
func (s *streamCacheImpl) retrieveFromRiverChain(ctx context.Context) (map[StreamId]*storage.StreamMetadata, error) {
	localStreams := make(map[StreamId]*storage.StreamMetadata)

	onStream := func(stream *registries.GetStreamResult) bool {
		// ignore streams not placed on this node
		if !slices.Contains(stream.Nodes, s.params.Wallet.Address) {
			return true
		}
		localStreams[stream.StreamId] = &storage.StreamMetadata{
			StreamId:                stream.StreamId,
			RiverChainBlockNumber:   s.params.AppliedBlockNum.AsUint64(),
			RiverChainBlockLogIndex: storage.RiverChainBlockLogIndexUnspecified,
			Nodes:                   stream.Nodes,
			MiniblockHash:           stream.LastMiniblockHash,
			MiniblockNumber:         int64(stream.LastMiniblockNum),
		}
		return true
	}

	if err := s.params.Registry.ForAllStreams(ctx, s.params.AppliedBlockNum, onStream); err != nil {
		return nil, err
	}

	return localStreams, nil
}

// applyDeltas applies deltas on the given streams between [lastBlock, params.AppliedBlockNum]
// from RiverChain streams registry. It returns a list of streams that are allocated or replaced
// to this node and a list of streams that are removed from this node.
func (s *streamCacheImpl) applyDeltas(
ctx context.Context,
lastBlock int64,
streams map[StreamId]*storage.StreamMetadata,
) (removals []StreamId, err error) {
if lastBlock > int64(s.params.AppliedBlockNum.AsUint64()) {
return nil, RiverError(Err_BAD_BLOCK_NUMBER, "Local database is ahead of River Chain").
Func("loadStreamsUpdatesFromRiverChain").
Tags("riverChainLastBlock", lastBlock, "appliedBlockNum", s.params.AppliedBlockNum)
}

// fetch and apply changes that happened since latest sync
var (
streamRegistryContract = s.params.Registry.StreamRegistry.BoundContract()
from = lastBlock
to = int64(s.params.AppliedBlockNum.AsUint64())
query = ethereum.FilterQuery{
Addresses: []common.Address{s.params.Registry.Address},
Topics: [][]common.Hash{{
s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamAllocated].ID,
s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamLastMiniblockUpdated].ID,
s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamPlacementUpdated].ID,
}},
}
maxBlockRange = int64(2000)
)

for from <= to {
toBlock := min(from+maxBlockRange, to)

query.FromBlock = big.NewInt(from)
query.ToBlock = big.NewInt(toBlock)

logs, err := s.params.RiverChain.Client.FilterLogs(ctx, query)
if err != nil {
return nil, WrapRiverError(Err_CANNOT_CALL_CONTRACT, err).
Message("Unable to fetch stream changes").
Tags("from", from, "to", toBlock).
Func("retrieveFromDeltas")
}

for _, log := range logs {
if len(log.Topics) == 0 {
continue
}

switch log.Topics[0] {
case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamAllocated].ID:
event := new(river.StreamRegistryV1StreamAllocated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamAllocated, log); err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("transaction", log.TxHash, "logIdx", log.Index).
Message("Unable to unpack stream allocated log").
Func("retrieveFromDeltas")
}

if slices.Contains(event.Nodes, s.params.Wallet.Address) {
streams[event.StreamId] = &storage.StreamMetadata{
StreamId: event.StreamId,
RiverChainBlockNumber: log.BlockNumber,
RiverChainBlockLogIndex: log.Index,
Nodes: event.Nodes,
MiniblockHash: event.GenesisMiniblockHash,
MiniblockNumber: 0,
IsSealed: false,
}
}

case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamLastMiniblockUpdated].ID:
event := new(river.StreamRegistryV1StreamLastMiniblockUpdated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamLastMiniblockUpdated, log); err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("transaction", log.TxHash, "logIdx", log.Index).
Message("Unable to unpack stream last miniblock updated log").
Func("retrieveFromDeltas")
}

if stream, ok := streams[event.StreamId]; ok {
stream.MiniblockHash = common.BytesToHash(event.LastMiniblockHash[:])
stream.MiniblockNumber = int64(event.LastMiniblockNum)
stream.IsSealed = event.IsSealed
}

case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamPlacementUpdated].ID:
event := new(river.StreamRegistryV1StreamPlacementUpdated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamPlacementUpdated, log); err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("transaction", log.TxHash, "logIdx", log.Index).
Message("Unable to unpack stream placement updated log").
Func("retrieveFromDeltas")
}

if s.params.Wallet.Address == event.NodeAddress {
if event.IsAdded { // stream was replaced to this node
retrievedStream, err := s.params.Registry.GetStream(ctx, event.StreamId, s.params.AppliedBlockNum)
if err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("stream", event.StreamId, "transaction", log.TxHash, "logIdx", log.Index).
Message("Unable to retrieve replaced stream").
Func("retrieveFromDeltas")
}

streams[event.StreamId] = &storage.StreamMetadata{
StreamId: event.StreamId,
RiverChainBlockNumber: log.BlockNumber,
RiverChainBlockLogIndex: log.Index,
Nodes: retrievedStream.Nodes,
MiniblockHash: retrievedStream.LastMiniblockHash,
MiniblockNumber: int64(retrievedStream.LastMiniblockNum),
IsSealed: false,
}

slices.DeleteFunc(removals, func(streamID StreamId) bool {

Check failure on line 340 in core/node/events/stream_cache.go

View workflow job for this annotation

GitHub Actions / Common_CI

unusedresult: result of slices.DeleteFunc call not used (govet)
return streamID == event.StreamId
})
} else { // stream was replaced away from this node
removals = append(removals, event.StreamId)
}
}
}
}

from = toBlock + 1
}

return removals, nil
}

func (s *streamCacheImpl) onBlockWithLogs(ctx context.Context, blockNum crypto.BlockNumber, logs []*types.Log) {
streamEvents, errs := s.params.Registry.FilterStreamEvents(ctx, logs)
// Process parsed stream events even if some failed to parse
Expand All @@ -200,6 +376,8 @@ func (s *streamCacheImpl) onBlockWithLogs(ctx context.Context, blockNum crypto.B
}

s.appliedBlockNum.Store(uint64(blockNum))

// TODO(BvK): update last block in DB for delta sync
}

func (s *streamCacheImpl) onStreamAllocated(
Expand Down
1 change: 0 additions & 1 deletion core/node/events/stream_sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/gammazero/workerpool"

. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/dlog"
. "github.com/river-build/river/core/node/protocol"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Down migration: drop the per-node stream registry snapshot table.
DROP TABLE IF EXISTS streams_metadata;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Per-node snapshot of stream registry state, used to resume from deltas
-- on boot instead of re-reading all streams from the River chain.
CREATE TABLE IF NOT EXISTS streams_metadata (
stream_id CHAR(64) NOT NULL, -- stream id (64 chars; presumably hex of 32 bytes — confirm encoding)
riverblock_num BIGINT NOT NULL, -- River chain block this row was last updated from
riverblock_log_index BIGINT NOT NULL, -- log index within that block
nodes CHAR(20)[] NOT NULL, -- node addresses; NOTE(review): a hex-encoded 20-byte address needs 40 chars — confirm width/encoding
miniblock_hash CHAR(64) NOT NULL, -- last known miniblock hash
miniblock_num BIGINT NOT NULL, -- last known miniblock number
is_sealed BOOL NOT NULL, -- whether the stream is sealed
PRIMARY KEY (stream_id)
);
Loading

0 comments on commit 13ab76a

Please sign in to comment.