From 13ab76a59ccffd5c9f0247945cb019c86f172476 Mon Sep 17 00:00:00 2001
From: Bas van Kervel
Date: Wed, 11 Dec 2024 08:45:19 +0100
Subject: [PATCH] node: store streams in local db and start from deltas instead of fetching all streams on boot

---
 core/node/events/stream_cache.go              | 222 ++++++++++++++++--
 core/node/events/stream_sync_task.go          |   1 -
 ...007_create_streams_metadata_table.down.sql |   1 +
 ...00007_create_streams_metadata_table.up.sql |  10 +
 core/node/storage/pg_stream_store.go          | 140 +++++++++++
 core/node/storage/storage.go                  |  46 +++-
 6 files changed, 395 insertions(+), 25 deletions(-)
 create mode 100644 core/node/storage/migrations/000007_create_streams_metadata_table.down.sql
 create mode 100644 core/node/storage/migrations/000007_create_streams_metadata_table.up.sql

diff --git a/core/node/events/stream_cache.go b/core/node/events/stream_cache.go
index 1f2f08a1e..59896f2ce 100644
--- a/core/node/events/stream_cache.go
+++ b/core/node/events/stream_cache.go
@@ -2,6 +2,9 @@ package events
 
 import (
 	"context"
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common"
+	"math/big"
 	"slices"
 	"sync/atomic"
 	"time"
@@ -112,29 +115,18 @@
 }
 
 func (s *streamCacheImpl) Start(ctx context.Context) error {
-	// schedule sync tasks for all streams that are local to this node.
-	// these tasks sync up the local db with the latest block in the registry.
-	var localStreamResults []*registries.GetStreamResult
-	err := s.params.Registry.ForAllStreams(
-		ctx,
-		s.params.AppliedBlockNum,
-		func(stream *registries.GetStreamResult) bool {
-			if slices.Contains(stream.Nodes, s.params.Wallet.Address) {
-				localStreamResults = append(localStreamResults, stream)
-			}
-			return true
-		},
-	)
+	retrievedStreams, err := s.retrieveStreams(ctx)
 	if err != nil {
 		return err
 	}
 
-	// load local streams in-memory cache
+	// load local streams into the in-memory cache and, if stream reconciliation is
+	// enabled, create a sync task that syncs the local DB stream data with the streams registry.
 	initialSyncWorkerPool := workerpool.New(s.params.Config.StreamReconciliation.InitialWorkerPoolSize)
-	for _, stream := range localStreamResults {
+	for streamID, stream := range retrievedStreams {
 		si := &streamImpl{
 			params:              s.params,
-			streamId:            stream.StreamId,
+			streamId:            streamID,
 			lastAppliedBlockNum: s.params.AppliedBlockNum,
 			local:               &localStreamState{},
 		}
@@ -144,10 +136,10 @@ func (s *streamCacheImpl) Start(ctx context.Context) error {
 		s.submitSyncStreamTask(
 			ctx,
 			initialSyncWorkerPool,
-			stream.StreamId,
+			streamID,
 			&MiniblockRef{
-				Hash: stream.LastMiniblockHash,
-				Num:  int64(stream.LastMiniblockNum),
+				Hash: stream.MiniblockHash,
+				Num:  stream.MiniblockNumber,
 			},
 		)
 	}
@@ -156,9 +148,7 @@
 	s.appliedBlockNum.Store(uint64(s.params.AppliedBlockNum))
 
 	// Close initial worker pool after all tasks are executed.
-	go func() {
-		initialSyncWorkerPool.StopWait()
-	}()
+	go initialSyncWorkerPool.StopWait()
 
 	// TODO: add buffered channel to avoid blocking ChainMonitor
 	s.params.RiverChain.ChainMonitor.OnBlockWithLogs(
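The boot path above fans the per-stream reconciliation work out over a bounded worker pool and then stops the pool asynchronously so startup is not blocked on the backlog. A minimal sketch of that gammazero/workerpool pattern, with an illustrative pool size and task body rather than the node's actual sync task:

```go
package main

import (
	"fmt"

	"github.com/gammazero/workerpool"
)

func main() {
	// Bounded concurrency: at most 8 tasks run at once, the rest queue up.
	wp := workerpool.New(8)

	for i := 0; i < 100; i++ {
		wp.Submit(func() {
			// Stand-in for syncing one stream's local DB state with the registry.
			fmt.Println("sync stream", i)
		})
	}

	// StopWait stops accepting new tasks but runs everything already queued.
	// Start() calls this in a goroutine, so boot continues while the initial
	// sync drains in the background.
	wp.StopWait()
}
```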
@@ -177,6 +167,192 @@ func (s *streamCacheImpl) Start(ctx context.Context) error {
 	return nil
 }
 
+// retrieveStreams loads the streams that are local to this node, either from persistent
+// storage with the deltas since the last integrated block applied on top, or from the
+// stream registry at s.params.AppliedBlockNum if there is no local state in persistent
+// storage.
+func (s *streamCacheImpl) retrieveStreams(ctx context.Context) (map[StreamId]*storage.StreamMetadata, error) {
+	// try to fetch the latest streams state from the DB
+	streamsMetaData, lastBlock, err := s.params.Storage.AllStreamsMetaData(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var removed []StreamId // streams moved away from this node
+
+	if lastBlock == 0 { // first time, fetch from River chain
+		streamsMetaData, err = s.retrieveFromRiverChain(ctx)
+	} else { // retrieve stream updates since lastBlock and apply them to streamsMetaData
+		removed, err = s.applyDeltas(ctx, lastBlock, streamsMetaData)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	if err := s.params.Storage.UpdateStreamsMetaData(ctx, streamsMetaData, removed); err != nil {
+		return nil, WrapRiverError(Err_DB_OPERATION_FAILURE, err).
+			Func("retrieveStreams").
+			Message("Unable to update stream metadata records in DB")
+	}
+
+	return streamsMetaData, nil
+}
+
+func (s *streamCacheImpl) retrieveFromRiverChain(ctx context.Context) (map[StreamId]*storage.StreamMetadata, error) {
+	streams := make(map[StreamId]*storage.StreamMetadata)
+
+	err := s.params.Registry.ForAllStreams(ctx, s.params.AppliedBlockNum,
+		func(stream *registries.GetStreamResult) bool {
+			if slices.Contains(stream.Nodes, s.params.Wallet.Address) {
+				streams[stream.StreamId] = &storage.StreamMetadata{
+					StreamId:                stream.StreamId,
+					RiverChainBlockNumber:   s.params.AppliedBlockNum.AsUint64(),
+					RiverChainBlockLogIndex: storage.RiverChainBlockLogIndexUnspecified,
+					Nodes:                   stream.Nodes,
+					MiniblockHash:           stream.LastMiniblockHash,
+					MiniblockNumber:         int64(stream.LastMiniblockNum),
+				}
+			}
+			return true
+		})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return streams, nil
+}
+
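applyDeltas below watches three registry events through a single eth_getLogs filter. In go-ethereum's ethereum.FilterQuery, Topics is position-based: the outer slice indexes the topic position and each inner slice is an OR-list, so one inner slice holding all three event signature IDs matches any of them. A hedged, self-contained illustration of that construction (the address and IDs are placeholders supplied by the caller):

```go
package main

import (
	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
)

func buildStreamEventsQuery(registry common.Address, eventIDs []common.Hash) ethereum.FilterQuery {
	// Topics[0] constrains the first topic, which is the event signature hash.
	// Listing several IDs in that inner slice ORs them together, so a single
	// FilterLogs call returns allocated / last-miniblock / placement logs
	// interleaved in block and log-index order.
	return ethereum.FilterQuery{
		Addresses: []common.Address{registry},
		Topics:    [][]common.Hash{eventIDs},
	}
}
```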
+ Func("retrieveFromDeltas") + } + + for _, log := range logs { + if len(log.Topics) == 0 { + continue + } + + switch log.Topics[0] { + case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamAllocated].ID: + event := new(river.StreamRegistryV1StreamAllocated) + if err := streamRegistryContract.UnpackLog(event, river.Event_StreamAllocated, log); err != nil { + return nil, WrapRiverError(Err_BAD_EVENT, err). + Tags("transaction", log.TxHash, "logIdx", log.Index). + Message("Unable to unpack stream allocated log"). + Func("retrieveFromDeltas") + } + + if slices.Contains(event.Nodes, s.params.Wallet.Address) { + streams[event.StreamId] = &storage.StreamMetadata{ + StreamId: event.StreamId, + RiverChainBlockNumber: log.BlockNumber, + RiverChainBlockLogIndex: log.Index, + Nodes: event.Nodes, + MiniblockHash: event.GenesisMiniblockHash, + MiniblockNumber: 0, + IsSealed: false, + } + } + + case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamLastMiniblockUpdated].ID: + event := new(river.StreamRegistryV1StreamLastMiniblockUpdated) + if err := streamRegistryContract.UnpackLog(event, river.Event_StreamLastMiniblockUpdated, log); err != nil { + return nil, WrapRiverError(Err_BAD_EVENT, err). + Tags("transaction", log.TxHash, "logIdx", log.Index). + Message("Unable to unpack stream last miniblock updated log"). + Func("retrieveFromDeltas") + } + + if stream, ok := streams[event.StreamId]; ok { + stream.MiniblockHash = common.BytesToHash(event.LastMiniblockHash[:]) + stream.MiniblockNumber = int64(event.LastMiniblockNum) + stream.IsSealed = event.IsSealed + } + + case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamPlacementUpdated].ID: + event := new(river.StreamRegistryV1StreamPlacementUpdated) + if err := streamRegistryContract.UnpackLog(event, river.Event_StreamPlacementUpdated, log); err != nil { + return nil, WrapRiverError(Err_BAD_EVENT, err). + Tags("transaction", log.TxHash, "logIdx", log.Index). + Message("Unable to unpack stream placement updated log"). + Func("retrieveFromDeltas") + } + + if s.params.Wallet.Address == event.NodeAddress { + if event.IsAdded { // stream was replaced to this node + retrievedStream, err := s.params.Registry.GetStream(ctx, event.StreamId, s.params.AppliedBlockNum) + if err != nil { + return nil, WrapRiverError(Err_BAD_EVENT, err). + Tags("stream", event.StreamId, "transaction", log.TxHash, "logIdx", log.Index). + Message("Unable to retrieve replaced stream"). 
+ Func("retrieveFromDeltas") + } + + streams[event.StreamId] = &storage.StreamMetadata{ + StreamId: event.StreamId, + RiverChainBlockNumber: log.BlockNumber, + RiverChainBlockLogIndex: log.Index, + Nodes: retrievedStream.Nodes, + MiniblockHash: retrievedStream.LastMiniblockHash, + MiniblockNumber: int64(retrievedStream.LastMiniblockNum), + IsSealed: false, + } + + slices.DeleteFunc(removals, func(streamID StreamId) bool { + return streamID == event.StreamId + }) + } else { // stream was replaced away from this node + removals = append(removals, event.StreamId) + } + } + } + } + + from = toBlock + 1 + } + + return removals, nil +} + func (s *streamCacheImpl) onBlockWithLogs(ctx context.Context, blockNum crypto.BlockNumber, logs []*types.Log) { streamEvents, errs := s.params.Registry.FilterStreamEvents(ctx, logs) // Process parsed stream events even if some failed to parse @@ -200,6 +376,8 @@ func (s *streamCacheImpl) onBlockWithLogs(ctx context.Context, blockNum crypto.B } s.appliedBlockNum.Store(uint64(blockNum)) + + // TODO(BvK): update last block in DB for delta sync } func (s *streamCacheImpl) onStreamAllocated( diff --git a/core/node/events/stream_sync_task.go b/core/node/events/stream_sync_task.go index 9b1941125..c1fccde70 100644 --- a/core/node/events/stream_sync_task.go +++ b/core/node/events/stream_sync_task.go @@ -5,7 +5,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/gammazero/workerpool" - . "github.com/river-build/river/core/node/base" "github.com/river-build/river/core/node/dlog" . "github.com/river-build/river/core/node/protocol" diff --git a/core/node/storage/migrations/000007_create_streams_metadata_table.down.sql b/core/node/storage/migrations/000007_create_streams_metadata_table.down.sql new file mode 100644 index 000000000..7df64942f --- /dev/null +++ b/core/node/storage/migrations/000007_create_streams_metadata_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS streams_metadata; diff --git a/core/node/storage/migrations/000007_create_streams_metadata_table.up.sql b/core/node/storage/migrations/000007_create_streams_metadata_table.up.sql new file mode 100644 index 000000000..aaf36ecec --- /dev/null +++ b/core/node/storage/migrations/000007_create_streams_metadata_table.up.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS streams_metadata ( + stream_id CHAR(64) NOT NULL, + riverblock_num BIGINT NOT NULL, + riverblock_log_index BIGINT NOT NULL, + nodes CHAR(20)[] NOT NULL, + miniblock_hash CHAR(64) NOT NULL, + miniblock_num BIGINT NOT NULL, + is_sealed BOOL NOT NULL, + PRIMARY KEY (stream_id) +); diff --git a/core/node/storage/pg_stream_store.go b/core/node/storage/pg_stream_store.go index 663ff9f9d..9091c36d3 100644 --- a/core/node/storage/pg_stream_store.go +++ b/core/node/storage/pg_stream_store.go @@ -1747,6 +1747,146 @@ func (s *PostgresStreamStore) getLastMiniblockNumberTx( return maxSeqNum, nil } +func (s *PostgresStreamStore) AllStreamsMetaData(ctx context.Context) (map[StreamId]*StreamMetadata, int64, error) { + var ( + ret map[StreamId]*StreamMetadata + lastBlock int64 + ) + + err := s.txRunnerWithUUIDCheck( + ctx, + "AllStreamsMetaData", + pgx.ReadOnly, + func(ctx context.Context, tx pgx.Tx) error { + var err error + ret, lastBlock, err = s.allStreamsMetaDataTx(ctx, tx) + return err + }, + nil, + ) + + if err != nil { + return nil, 0, err + } + + return ret, lastBlock, nil +} + +func (s *PostgresStreamStore) allStreamsMetaDataTx( + ctx context.Context, + tx pgx.Tx, +) (map[StreamId]*StreamMetadata, int64, error) { + streamMetaDataRows, err := 
diff --git a/core/node/storage/pg_stream_store.go b/core/node/storage/pg_stream_store.go
index 663ff9f9d..9091c36d3 100644
--- a/core/node/storage/pg_stream_store.go
+++ b/core/node/storage/pg_stream_store.go
@@ -1747,6 +1747,146 @@ func (s *PostgresStreamStore) getLastMiniblockNumberTx(
 	return maxSeqNum, nil
 }
 
+func (s *PostgresStreamStore) AllStreamsMetaData(ctx context.Context) (map[StreamId]*StreamMetadata, int64, error) {
+	var (
+		ret       map[StreamId]*StreamMetadata
+		lastBlock int64
+	)
+
+	err := s.txRunnerWithUUIDCheck(
+		ctx,
+		"AllStreamsMetaData",
+		pgx.ReadOnly,
+		func(ctx context.Context, tx pgx.Tx) error {
+			var err error
+			ret, lastBlock, err = s.allStreamsMetaDataTx(ctx, tx)
+			return err
+		},
+		nil,
+	)
+
+	if err != nil {
+		return nil, 0, err
+	}
+
+	return ret, lastBlock, nil
+}
+
+func (s *PostgresStreamStore) allStreamsMetaDataTx(
+	ctx context.Context,
+	tx pgx.Tx,
+) (map[StreamId]*StreamMetadata, int64, error) {
+	streamMetaDataRows, err := tx.Query(
+		ctx,
+		`SELECT stream_id, riverblock_num, riverblock_log_index, nodes, miniblock_hash, miniblock_num, is_sealed FROM streams_metadata`,
+	)
+	if err != nil {
+		return nil, 0, err
+	}
+	defer streamMetaDataRows.Close()
+
+	var (
+		lastBlock int64
+		results   = make(map[StreamId]*StreamMetadata)
+	)
+
+	for streamMetaDataRows.Next() {
+		var (
+			streamIDStr        string
+			riverBlockNum      int64
+			riverBlockLogIndex int64
+			nodesStr           []string
+			miniBlockHashStr   string
+			miniBlockNum       int64
+			isSealed           bool
+		)
+
+		if err := streamMetaDataRows.Scan(&streamIDStr, &riverBlockNum,
+			&riverBlockLogIndex, &nodesStr, &miniBlockHashStr, &miniBlockNum, &isSealed); err != nil {
+			return nil, 0, err
+		}
+
+		streamID, err := StreamIdFromString(streamIDStr)
+		if err != nil {
+			return nil, 0, WrapRiverError(Err_BAD_STREAM_ID, err).
+				Tag("streamId", streamIDStr).
+				Message("corrupt record in streams_metadata table")
+		}
+
+		nodes := make([]common.Address, len(nodesStr))
+		for i, nodeStr := range nodesStr {
+			nodes[i] = common.HexToAddress(nodeStr)
+		}
+
+		lastBlock = max(lastBlock, riverBlockNum)
+
+		results[streamID] = &StreamMetadata{
+			StreamId:                streamID,
+			RiverChainBlockNumber:   uint64(riverBlockNum),
+			RiverChainBlockLogIndex: uint(riverBlockLogIndex),
+			Nodes:                   nodes,
+			MiniblockHash:           common.HexToHash(miniBlockHashStr),
+			MiniblockNumber:         miniBlockNum,
+			IsSealed:                isSealed,
+		}
+	}
+
+	return results, lastBlock, nil
+}
+
+func (s *PostgresStreamStore) UpdateStreamsMetaData(
+	ctx context.Context,
+	streams map[StreamId]*StreamMetadata,
+	removals []StreamId,
+) error {
+	return s.txRunnerWithUUIDCheck(
+		ctx,
+		"UpdateStreamsMetaData",
+		pgx.ReadWrite,
+		func(ctx context.Context, tx pgx.Tx) error {
+			return s.updateStreamsMetaDataTx(ctx, tx, streams, removals)
+		},
+		nil,
+	)
+}
+
+func (s *PostgresStreamStore) updateStreamsMetaDataTx(
+	ctx context.Context,
+	tx pgx.Tx,
+	streams map[StreamId]*StreamMetadata,
+	removals []StreamId,
+) error {
+	upserts := `INSERT INTO streams_metadata (stream_id, riverblock_num, riverblock_log_index, nodes, miniblock_hash, miniblock_num, is_sealed)
+		VALUES (@stream_id, @riverblock_num, @riverblock_log_index, @nodes, @miniblock_hash, @miniblock_num, @is_sealed)
+		ON CONFLICT (stream_id) DO UPDATE SET riverblock_num = @riverblock_num, riverblock_log_index = @riverblock_log_index, nodes = @nodes, miniblock_hash = @miniblock_hash, miniblock_num = @miniblock_num, is_sealed = @is_sealed`
+	remove := `DELETE FROM streams_metadata WHERE stream_id = @stream_id`
+
+	batch := &pgx.Batch{}
+	for _, stream := range streams {
+		nodes := make([]string, len(stream.Nodes))
+		for i, addr := range stream.Nodes {
+			nodes[i] = addr.String()
+		}
+
+		args := pgx.NamedArgs{
+			"stream_id":            stream.StreamId.String(),
+			"riverblock_num":       stream.RiverChainBlockNumber,
+			"riverblock_log_index": stream.RiverChainBlockLogIndex,
+			"nodes":                nodes,
+			"miniblock_hash":       stream.MiniblockHash.String(),
+			"miniblock_num":        stream.MiniblockNumber,
+			"is_sealed":            stream.IsSealed,
+		}
+
+		batch.Queue(upserts, args)
+	}
+
+	for _, streamID := range removals {
+		args := pgx.NamedArgs{"stream_id": streamID.String()}
+		batch.Queue(remove, args)
+	}
+
+	results := tx.SendBatch(ctx, batch)
+	return results.Close()
+}
+
 func getCurrentNodeProcessInfo(currentSchemaName string) string {
 	currentHostname, err := os.Hostname()
 	if err != nil {
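updateStreamsMetaDataTx leans on two pgx v5 features worth noting: named arguments (pgx.NamedArgs rewrites @name placeholders, so one argument map can serve a statement that repeats a name in its ON CONFLICT clause) and batching (all upserts and deletes travel in a single round trip, and BatchResults.Close drains the queue and surfaces the first error). A minimal self-contained sketch of both; the table, columns, and DSN are placeholders:

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()

	conn, err := pgx.Connect(ctx, "postgres://localhost:5432/example") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	batch := &pgx.Batch{}

	// @id and @val are rewritten to positional parameters; repeating @val in
	// the UPDATE clause reuses the same bound value.
	batch.Queue(
		`INSERT INTO kv (id, val) VALUES (@id, @val)
		 ON CONFLICT (id) DO UPDATE SET val = @val`,
		pgx.NamedArgs{"id": 1, "val": "x"},
	)
	batch.Queue(`DELETE FROM kv WHERE id = @id`, pgx.NamedArgs{"id": 2})

	// One round trip; Close executes anything not yet read and returns the
	// first error encountered by any statement in the batch.
	if err := conn.SendBatch(ctx, batch).Close(); err != nil {
		log.Fatal(err)
	}
}
```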
NotificationStorageTypePostgres = "postgres" + RiverChainBlockLogIndexUnspecified = 9_999_999 ) type ReadStreamFromLastSnapshotResult struct { @@ -117,6 +118,23 @@ type StreamStorage interface { // GetLastMiniblockNumber returns the last miniblock number for the given stream from storage. GetLastMiniblockNumber(ctx context.Context, streamID StreamId) (int64, error) + // AllStreamsMetaData returns all available stream metadata. Stream metadata is a local copy + // of streams data as stored in the River chain stream registry. Its purpose is to provide a + // fast local cache of stream data allowing the node to only fetch changes that happened since + // the last time this local copy was updated. Each metadata record contains the River chain + // block number and log index that indicates when the last time the record was updated from + // chain. + AllStreamsMetaData(ctx context.Context) (map[StreamId]*StreamMetadata, int64, error) + + // UpdateStreamsMetaData updates for all given streams their metadata. + // This performs an upsert on the given `upserts` allowing for new streams to be inserted and + // existing streams to be updated. Records in the given `removals` collection are deleted. + UpdateStreamsMetaData( + ctx context.Context, + streams map[StreamId]*StreamMetadata, + removals []StreamId, + ) error + Close(ctx context.Context) } @@ -152,3 +170,27 @@ type DebugReadStreamDataResult struct { Events []EventDescriptor MbCandidates []MiniblockDescriptor } + +// StreamMetadata represents stream metadata. +type StreamMetadata struct { + // StreamId is the unique stream identifier + StreamId StreamId + // RiverChainBlockNumber contains the river chain block number from which the nodes and + // miniblock data is taken. + RiverChainBlockNumber uint64 + // RiverChainBlockLogIndex contains the transaction index from which the nodes and + // miniblock data is taken. Can be 9_999_999 in case the record was created for the + // first time from the state at RiverChainBlockNumber and not through an update from + // an event/log. + RiverChainBlockLogIndex uint + // Nodes that this stream is managed by. + Nodes []common.Address + // MiniblockHash contains the latest miniblock hash for this stream as locally known. + MiniblockHash common.Hash + // MiniblockNumber contains the latest miniblock number for this stream as locally known. + MiniblockNumber int64 + // IsSealed indicates that no more blocks can be added to the stream. + IsSealed bool + // dirty is an internal bool indicating if the record was changed + dirty bool +}