diff --git a/core/node/events/stream.go b/core/node/events/stream.go
index 7e9d90ebd..35a0de178 100644
--- a/core/node/events/stream.go
+++ b/core/node/events/stream.go
@@ -191,6 +191,7 @@ func (s *streamImpl) ApplyMiniblock(ctx context.Context, miniblock *MiniblockInf
 // importMiniblocks imports the given miniblocks.
 func (s *streamImpl) importMiniblocks(
 	ctx context.Context,
+	nodes []common.Address,
 	miniblocks []*MiniblockInfo,
 ) error {
 	if len(miniblocks) == 0 {
@@ -199,11 +200,12 @@ func (s *streamImpl) importMiniblocks(
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	return s.importMiniblocksLocked(ctx, miniblocks)
+	return s.importMiniblocksLocked(ctx, nodes, miniblocks)
 }
 
 func (s *streamImpl) importMiniblocksLocked(
 	ctx context.Context,
+	nodes []common.Address,
 	miniblocks []*MiniblockInfo,
 ) error {
 	firstMbNum := miniblocks[0].Ref.Num
@@ -222,7 +224,7 @@ func (s *streamImpl) importMiniblocksLocked(
 	if s.view() == nil {
 		// Do we have genesis miniblock?
 		if miniblocks[0].Header().MiniblockNum == 0 {
-			err := s.initFromGenesis(ctx, miniblocks[0], blocksToWriteToStorage[0].Data)
+			err := s.initFromGenesis(ctx, nodes, miniblocks[0], blocksToWriteToStorage[0].Data)
 			if err != nil {
 				return err
 			}
@@ -413,6 +415,7 @@ func (s *streamImpl) schedulePromotionLocked(ctx context.Context, mb *MiniblockR
 func (s *streamImpl) initFromGenesis(
 	ctx context.Context,
+	nodes []common.Address,
 	genesisInfo *MiniblockInfo,
 	genesisBytes []byte,
 ) error {
@@ -432,7 +435,7 @@ func (s *streamImpl) initFromGenesis(
 			Func("initFromGenesis")
 	}
 
-	if err := s.params.Storage.CreateStreamStorage(ctx, s.streamId, genesisBytes); err != nil {
+	if err := s.params.Storage.CreateStreamStorage(ctx, s.streamId, nodes, registeredGenesisHash, genesisBytes); err != nil {
		// TODO: this error is not handled correctly here: if stream is in storage, caller of this initFromGenesis
		// should read it from storage.
		if AsRiverError(err).Code != Err_ALREADY_EXISTS {
@@ -459,7 +462,7 @@ func (s *streamImpl) initFromGenesis(
 
 func (s *streamImpl) initFromBlockchain(ctx context.Context) error {
 	// TODO: move this call out of the lock
-	record, _, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, s.streamId)
+	record, mbHash, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, s.streamId)
 	if err != nil {
 		return err
 	}
@@ -486,7 +489,7 @@ func (s *streamImpl) initFromBlockchain(ctx context.Context) error {
 		)
 	}
 
-	err = s.params.Storage.CreateStreamStorage(ctx, s.streamId, mb)
+	err = s.params.Storage.CreateStreamStorage(ctx, s.streamId, record.Nodes, mbHash, mb)
 	if err != nil {
 		return err
 	}
@@ -946,7 +949,7 @@ func (s *streamImpl) tryApplyCandidate(ctx context.Context, mb *MiniblockInfo) (
 	if len(s.local.pendingCandidates) > 0 {
 		pending := s.local.pendingCandidates[0]
 		if mb.Ref.Num == pending.Num && mb.Ref.Hash == pending.Hash {
-			err = s.importMiniblocksLocked(ctx, []*MiniblockInfo{mb})
+			err = s.importMiniblocksLocked(ctx, s.nodesLocked.GetNodes(), []*MiniblockInfo{mb})
 			if err != nil {
 				return false, err
 			}
@@ -971,7 +974,7 @@ func (s *streamImpl) tryReadAndApplyCandidateLocked(ctx context.Context, mbRef *
 	if err == nil {
 		miniblock, err := NewMiniblockInfoFromBytes(miniblockBytes, mbRef.Num)
 		if err == nil {
-			err = s.importMiniblocksLocked(ctx, []*MiniblockInfo{miniblock})
+			err = s.importMiniblocksLocked(ctx, s.nodesLocked.GetNodes(), []*MiniblockInfo{miniblock})
 			if err == nil {
 				return true
 			}
@@ -1035,6 +1038,9 @@ func (s *streamImpl) applyStreamEvents(
 			if err != nil {
 				dlog.FromCtx(ctx).Error("applyStreamEventsNoLock: failed to update nodes", "err", err, "streamId", s.streamId)
 			}
+			if s.nodesLocked.IsLocal() {
+				// TODO: update streams metadata table
+			}
 		default:
 			dlog.FromCtx(ctx).Error("applyStreamEventsNoLock: unknown event", "event", event, "streamId", s.streamId)
 		}
diff --git a/core/node/events/stream_cache.go b/core/node/events/stream_cache.go
index 59896f2ce..dc02ad6d2 100644
--- a/core/node/events/stream_cache.go
+++ b/core/node/events/stream_cache.go
@@ -2,18 +2,17 @@ package events
 
 import (
 	"context"
-	"github.com/ethereum/go-ethereum"
-	"github.com/ethereum/go-ethereum/common"
 	"math/big"
 	"slices"
 	"sync/atomic"
 	"time"
 
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/gammazero/workerpool"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/puzpuzpuz/xsync/v3"
-
 	"github.com/river-build/river/core/config"
 	"github.com/river-build/river/core/contracts/river"
 	. "github.com/river-build/river/core/node/base"
@@ -204,12 +203,10 @@ func (s *streamCacheImpl) retrieveFromRiverChain(ctx context.Context) (map[Strea
 		func(stream *registries.GetStreamResult) bool {
 			if slices.Contains(stream.Nodes, s.params.Wallet.Address) {
 				streams[stream.StreamId] = &storage.StreamMetadata{
-					StreamId:                stream.StreamId,
-					RiverChainBlockNumber:   s.params.AppliedBlockNum.AsUint64(),
-					RiverChainBlockLogIndex: storage.RiverChainBlockLogIndexUnspecified,
-					Nodes:                   stream.Nodes,
-					MiniblockHash:           stream.LastMiniblockHash,
-					MiniblockNumber:         int64(stream.LastMiniblockNum),
+					StreamId:        stream.StreamId,
+					Nodes:           stream.Nodes,
+					MiniblockHash:   stream.LastMiniblockHash,
+					MiniblockNumber: int64(stream.LastMiniblockNum),
 				}
 			}
 			return true
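Reviewer note: the applyDeltas hunks below replace the fail-fast FilterLogs call with fixed-size block ranges plus bounded retries. A minimal sketch of that pattern, assuming a bare ethclient.Client and a hypothetical registryAddr; the production code goes through s.params.RiverChain.Client and wraps the final error with WrapRiverError:

```go
package deltasketch

import (
	"context"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

// scanLogs fetches logs in bounded block ranges, retrying transient RPC
// failures with a fixed delay instead of aborting the whole sync.
func scanLogs(
	ctx context.Context,
	client *ethclient.Client,
	registryAddr common.Address,
	from, to int64,
) ([]types.Log, error) {
	const maxBlockRange = 2000 // cap the range so one call doesn't return too many events
	const maxRetries = 40

	var all []types.Log
	retries := 0
	for from <= to {
		toBlock := min(from+maxBlockRange, to)
		logs, err := client.FilterLogs(ctx, ethereum.FilterQuery{
			Addresses: []common.Address{registryAddr},
			FromBlock: big.NewInt(from),
			ToBlock:   big.NewInt(toBlock),
		})
		if err != nil {
			retries++
			if retries > maxRetries {
				return nil, err
			}
			select {
			case <-time.After(3 * time.Second):
				continue // retry the same range
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		all = append(all, logs...)
		retries = 0 // a successful range resets the retry budget
		from = toBlock + 1
	}
	return all, nil
}
```

Resetting the counter after each successful range, as the hunk below also does, makes the retry budget per range rather than per sync.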
@@ -227,19 +224,20 @@ func (s *streamCacheImpl) retrieveFromRiverChain(ctx context.Context) (map[Strea
 // to this node and a list of streams that are removed from this node.
 func (s *streamCacheImpl) applyDeltas(
 	ctx context.Context,
-	lastBlock int64,
+	lastDBBlock int64,
 	streams map[StreamId]*storage.StreamMetadata,
 ) (removals []StreamId, err error) {
-	if lastBlock > int64(s.params.AppliedBlockNum.AsUint64()) {
+	if lastDBBlock > int64(s.params.AppliedBlockNum.AsUint64()) {
 		return nil, RiverError(Err_BAD_BLOCK_NUMBER, "Local database is ahead of River Chain").
 			Func("loadStreamsUpdatesFromRiverChain").
-			Tags("riverChainLastBlock", lastBlock, "appliedBlockNum", s.params.AppliedBlockNum)
+			Tags("riverChainLastBlock", lastDBBlock, "appliedBlockNum", s.params.AppliedBlockNum)
 	}
 
 	// fetch and apply changes that happened since latest sync
 	var (
+		log                    = dlog.FromCtx(ctx)
 		streamRegistryContract = s.params.Registry.StreamRegistry.BoundContract()
-		from                   = lastBlock
+		from                   = lastDBBlock + 1
 		to                     = int64(s.params.AppliedBlockNum.AsUint64())
 		query                  = ethereum.FilterQuery{
 			Addresses: []common.Address{s.params.Registry.Address},
@@ -249,104 +247,110 @@ func (s *streamCacheImpl) applyDeltas(
 				s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamPlacementUpdated].ID,
 			}},
 		}
-		maxBlockRange = int64(2000)
+		maxBlockRange = int64(2000) // cap the range so a single RPC call doesn't return too many events
+		retryCounter  = 0
 	)
 
 	for from <= to {
 		toBlock := min(from+maxBlockRange, to)
-
-		query.FromBlock = big.NewInt(from)
-		query.ToBlock = big.NewInt(toBlock)
+		query.FromBlock, query.ToBlock = big.NewInt(from), big.NewInt(toBlock)
 
 		logs, err := s.params.RiverChain.Client.FilterLogs(ctx, query)
 		if err != nil {
-			return nil, WrapRiverError(Err_CANNOT_CALL_CONTRACT, err).
-				Message("Unable to fetch stream changes").
-				Tags("from", from, "to", toBlock).
-				Func("retrieveFromDeltas")
+			log.Error("Unable to retrieve logs from RiverChain", "retry", retryCounter, "err", err)
+
+			retryCounter++
+			if retryCounter > 40 {
+				return nil, WrapRiverError(Err_CANNOT_CALL_CONTRACT, err).
+					Message("Unable to fetch stream changes").
+					Tags("from", from, "to", toBlock).
+					Func("retrieveFromDeltas")
+			}
+
+			select {
+			case <-time.After(3 * time.Second):
+				continue
			case <-ctx.Done():
				return nil, ctx.Err()
			}
 		}
 
-		for _, log := range logs {
-			if len(log.Topics) == 0 {
+		for _, event := range logs {
+			if len(event.Topics) == 0 {
 				continue
 			}
 
-			switch log.Topics[0] {
+			switch event.Topics[0] {
 			case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamAllocated].ID:
-				event := new(river.StreamRegistryV1StreamAllocated)
-				if err := streamRegistryContract.UnpackLog(event, river.Event_StreamAllocated, log); err != nil {
-					return nil, WrapRiverError(Err_BAD_EVENT, err).
-						Tags("transaction", log.TxHash, "logIdx", log.Index).
-						Message("Unable to unpack stream allocated log").
- Func("retrieveFromDeltas") + streamAllocatedEvent := new(river.StreamRegistryV1StreamAllocated) + if err := streamRegistryContract.UnpackLog(event, river.Event_StreamAllocated, event); err != nil { + log.Error("Unable to unpack StreamRegistryV1StreamAllocated event", + "transaction", event.TxHash, "logIdx", event.Index, "err", err) + continue } - if slices.Contains(event.Nodes, s.params.Wallet.Address) { - streams[event.StreamId] = &storage.StreamMetadata{ - StreamId: event.StreamId, - RiverChainBlockNumber: log.BlockNumber, - RiverChainBlockLogIndex: log.Index, - Nodes: event.Nodes, - MiniblockHash: event.GenesisMiniblockHash, - MiniblockNumber: 0, - IsSealed: false, + if slices.Contains(streamAllocatedEvent.Nodes, s.params.Wallet.Address) { + streams[streamAllocatedEvent.StreamId] = &storage.StreamMetadata{ + StreamId: streamAllocatedEvent.StreamId, + Nodes: streamAllocatedEvent.Nodes, + MiniblockHash: streamAllocatedEvent.GenesisMiniblockHash, + MiniblockNumber: 0, + IsSealed: false, } } case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamLastMiniblockUpdated].ID: - event := new(river.StreamRegistryV1StreamLastMiniblockUpdated) - if err := streamRegistryContract.UnpackLog(event, river.Event_StreamLastMiniblockUpdated, log); err != nil { - return nil, WrapRiverError(Err_BAD_EVENT, err). - Tags("transaction", log.TxHash, "logIdx", log.Index). - Message("Unable to unpack stream last miniblock updated log"). - Func("retrieveFromDeltas") + lastMiniblockUpdatedEvent := new(river.StreamRegistryV1StreamLastMiniblockUpdated) + if err := streamRegistryContract.UnpackLog(event, river.Event_StreamLastMiniblockUpdated, event); err != nil { + log.Error("Unable to unpack StreamRegistryV1StreamLastMiniblockUpdated event", + "transaction", event.TxHash, "logIdx", event.Index, "err", err) + continue } - if stream, ok := streams[event.StreamId]; ok { - stream.MiniblockHash = common.BytesToHash(event.LastMiniblockHash[:]) - stream.MiniblockNumber = int64(event.LastMiniblockNum) - stream.IsSealed = event.IsSealed + if stream, ok := streams[lastMiniblockUpdatedEvent.StreamId]; ok { + stream.MiniblockHash = common.BytesToHash(lastMiniblockUpdatedEvent.LastMiniblockHash[:]) + stream.MiniblockNumber = int64(lastMiniblockUpdatedEvent.LastMiniblockNum) + stream.IsSealed = lastMiniblockUpdatedEvent.IsSealed } case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamPlacementUpdated].ID: - event := new(river.StreamRegistryV1StreamPlacementUpdated) - if err := streamRegistryContract.UnpackLog(event, river.Event_StreamPlacementUpdated, log); err != nil { - return nil, WrapRiverError(Err_BAD_EVENT, err). - Tags("transaction", log.TxHash, "logIdx", log.Index). - Message("Unable to unpack stream placement updated log"). 
- Func("retrieveFromDeltas") + streamPlacementUpdatedEvent := new(river.StreamRegistryV1StreamPlacementUpdated) + if err := streamRegistryContract.UnpackLog(event, river.Event_StreamPlacementUpdated, event); err != nil { + log.Error("Unable to unpack StreamRegistryV1StreamPlacementUpdated event", + "transaction", event.TxHash, "logIdx", event.Index, "err", err) + continue } - if s.params.Wallet.Address == event.NodeAddress { - if event.IsAdded { // stream was replaced to this node - retrievedStream, err := s.params.Registry.GetStream(ctx, event.StreamId, s.params.AppliedBlockNum) + if s.params.Wallet.Address == streamPlacementUpdatedEvent.NodeAddress { + if streamPlacementUpdatedEvent.IsAdded { // stream was replaced to this node + retrievedStream, err := s.params.Registry.GetStream( + ctx, streamPlacementUpdatedEvent.StreamId, s.params.AppliedBlockNum) if err != nil { return nil, WrapRiverError(Err_BAD_EVENT, err). - Tags("stream", event.StreamId, "transaction", log.TxHash, "logIdx", log.Index). + Tags("stream", streamPlacementUpdatedEvent.StreamId, "transaction", event.TxHash, "logIdx", event.Index). Message("Unable to retrieve replaced stream"). Func("retrieveFromDeltas") } - streams[event.StreamId] = &storage.StreamMetadata{ - StreamId: event.StreamId, - RiverChainBlockNumber: log.BlockNumber, - RiverChainBlockLogIndex: log.Index, - Nodes: retrievedStream.Nodes, - MiniblockHash: retrievedStream.LastMiniblockHash, - MiniblockNumber: int64(retrievedStream.LastMiniblockNum), - IsSealed: false, + streams[streamPlacementUpdatedEvent.StreamId] = &storage.StreamMetadata{ + StreamId: streamPlacementUpdatedEvent.StreamId, + Nodes: retrievedStream.Nodes, + MiniblockHash: retrievedStream.LastMiniblockHash, + MiniblockNumber: int64(retrievedStream.LastMiniblockNum), + IsSealed: false, } slices.DeleteFunc(removals, func(streamID StreamId) bool { - return streamID == event.StreamId + return streamID == streamPlacementUpdatedEvent.StreamId }) } else { // stream was replaced away from this node - removals = append(removals, event.StreamId) + removals = append(removals, streamPlacementUpdatedEvent.StreamId) } } } } + retryCounter = 0 from = toBlock + 1 } @@ -394,10 +398,11 @@ func (s *streamCacheImpl) onStreamAllocated( lastAccessedTime: time.Now(), local: &localStreamState{}, } + stream.nodesLocked.Reset(event.Nodes, s.params.Wallet.Address) - stream, created, err := s.createStreamStorage(ctx, stream, event.GenesisMiniblock) + stream, created, err := s.createStreamStorage(ctx, stream, event.Nodes, event.GenesisMiniblockHash, event.GenesisMiniblock) if err != nil { - dlog.FromCtx(ctx).Error("Failed to allocate stream", "err", err, "streamId", stream.streamId) + dlog.FromCtx(ctx).Error("Failed to allocate stream", "err", err, "streamId", StreamId(event.StreamId)) } if created && len(otherEvents) > 0 { stream.applyStreamEvents(ctx, otherEvents, blockNum) @@ -477,7 +482,7 @@ func (s *streamCacheImpl) tryLoadStreamRecord( // Blockchain record is already created, but this fact is not reflected yet in local storage. // This may happen if somebody observes record allocation on blockchain and tries to get stream // while local storage is being initialized. 
@@ -477,7 +482,7 @@ func (s *streamCacheImpl) tryLoadStreamRecord(
 	// Blockchain record is already created, but this fact is not reflected yet in local storage.
 	// This may happen if somebody observes record allocation on blockchain and tries to get stream
 	// while local storage is being initialized.
-	record, _, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, streamId)
+	record, mbHash, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, streamId)
 	if err != nil {
 		if !waitForLocal {
 			return nil, err
@@ -535,13 +540,15 @@ func (s *streamCacheImpl) tryLoadStreamRecord(
 		)
 	}
 
-	stream, _, err = s.createStreamStorage(ctx, stream, mb)
+	stream, _, err = s.createStreamStorage(ctx, stream, record.Nodes, mbHash, mb)
 	return stream, err
 }
 
 func (s *streamCacheImpl) createStreamStorage(
 	ctx context.Context,
 	stream *streamImpl,
+	nodes []common.Address,
+	mbHash common.Hash,
 	mb []byte,
 ) (*streamImpl, bool, error) {
 	// Lock stream, so parallel creators have to wait for the stream to be initialized.
@@ -552,7 +559,7 @@ func (s *streamCacheImpl) createStreamStorage(
 	// TODO: delete entry on failures below?
 
 	// Our stream won the race, put into storage.
-	err := s.params.Storage.CreateStreamStorage(ctx, stream.streamId, mb)
+	err := s.params.Storage.CreateStreamStorage(ctx, stream.streamId, nodes, mbHash, mb)
 	if err != nil {
 		if AsRiverError(err).Code == Err_ALREADY_EXISTS {
 			// Attempt to load stream from storage. Might as well do it while under lock.
diff --git a/core/node/events/stream_sync_task.go b/core/node/events/stream_sync_task.go
index c1fccde70..b600fbeb3 100644
--- a/core/node/events/stream_sync_task.go
+++ b/core/node/events/stream_sync_task.go
@@ -123,7 +123,7 @@ func (s *streamCacheImpl) syncStreamFromSinglePeer(
 		mbs[i] = mb
 	}
 
-	err = stream.importMiniblocks(ctx, mbs)
+	err = stream.importMiniblocks(ctx, stream.nodesLocked.GetNodes(), mbs)
 	if err != nil {
 		return currentFromInclusive, err
 	}
diff --git a/core/node/storage/migrations/000007_create_streams_metadata_table.up.sql b/core/node/storage/migrations/000007_create_streams_metadata_table.up.sql
index aaf36ecec..272db8e29 100644
--- a/core/node/storage/migrations/000007_create_streams_metadata_table.up.sql
+++ b/core/node/storage/migrations/000007_create_streams_metadata_table.up.sql
@@ -1,8 +1,6 @@
 CREATE TABLE IF NOT EXISTS streams_metadata (
     stream_id CHAR(64) NOT NULL,
-    riverblock_num BIGINT NOT NULL,
-    riverblock_log_index BIGINT NOT NULL,
-    nodes CHAR(20)[] NOT NULL,
+    nodes CHAR(40)[] NOT NULL,
     miniblock_hash CHAR(64) NOT NULL,
     miniblock_num BIGINT NOT NULL,
     is_sealed BOOL NOT NULL,
diff --git a/core/node/storage/pg_stream_store.go b/core/node/storage/pg_stream_store.go
index 9091c36d3..7e5f6ef72 100644
--- a/core/node/storage/pg_stream_store.go
+++ b/core/node/storage/pg_stream_store.go
@@ -254,6 +254,8 @@ func (s *PostgresStreamStore) sqlForStream(sql string, streamId StreamId) string
 func (s *PostgresStreamStore) CreateStreamStorage(
 	ctx context.Context,
 	streamId StreamId,
+	nodes []common.Address,
+	genesisMiniblockHash common.Hash,
 	genesisMiniblock []byte,
 ) error {
 	return s.txRunnerWithUUIDCheck(
 		ctx,
 		"CreateStreamStorage",
 		pgx.ReadWrite,
 		func(ctx context.Context, tx pgx.Tx) error {
-			return s.createStreamStorageTx(ctx, tx, streamId, genesisMiniblock)
+			return s.createStreamStorageTx(ctx, tx, streamId, nodes, genesisMiniblockHash, genesisMiniblock)
 		},
 		nil,
 		"streamId", streamId,
@@ -305,16 +307,28 @@ func (s *PostgresStreamStore) createStreamStorageTx(
 	ctx context.Context,
 	tx pgx.Tx,
 	streamId StreamId,
+	nodes []common.Address,
+	genesisMiniblockHash common.Hash,
 	genesisMiniblock []byte,
 ) error {
 	sql := s.sqlForStream(
 		`
 			INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated) VALUES ($1, 0, true);
-			INSERT INTO {{miniblocks}} (stream_id, seq_num, blockdata) VALUES ($1, 0, $2);
-			INSERT INTO {{minipools}} (stream_id, generation, slot_num) VALUES ($1, 1, -1);`,
+			INSERT INTO {{miniblocks}} (stream_id, seq_num, blockdata) VALUES ($1, 0, $4);
+			INSERT INTO {{minipools}} (stream_id, generation, slot_num) VALUES ($1, 1, -1);
+			INSERT INTO streams_metadata (stream_id, nodes, miniblock_hash, miniblock_num, is_sealed) VALUES ($1, $2, $3, 0, false) ON CONFLICT (stream_id) DO NOTHING;`,
+		streamId,
 	)
-	_, err := tx.Exec(ctx, sql, streamId, genesisMiniblock)
+
+	nodeAddrs := make([]string, 0, len(nodes))
+	for _, node := range nodes {
+		nodeAddrs = append(nodeAddrs, hex.EncodeToString(node[:]))
+	}
+
+	hash := hex.EncodeToString(genesisMiniblockHash[:])
+
+	_, err := tx.Exec(ctx, sql, streamId, nodeAddrs, hash, genesisMiniblock)
 	if err != nil {
 		if pgerr, ok := err.(*pgconn.PgError); ok && pgerr.Code == pgerrcode.UniqueViolation {
 			return WrapRiverError(Err_ALREADY_EXISTS, err).Message("stream already exists")
@@ -1189,15 +1203,21 @@ func (s *PostgresStreamStore) writeMiniblocksTx(
 		)
 	}
 
+	lastMbNumberInMiniblocks := miniblocks[len(miniblocks)-1].Number
+	lastMbHashInMiniblocks := miniblocks[len(miniblocks)-1].Hash
+
 	// Insert -1 marker and all new minipool events into minipool.
 	_, err = tx.Exec(
 		ctx,
 		s.sqlForStream(
-			"INSERT INTO {{minipools}} (stream_id, generation, slot_num) VALUES ($1, $2, -1)",
+			`INSERT INTO {{minipools}} (stream_id, generation, slot_num) VALUES ($1, $2, -1);
+			UPDATE streams_metadata SET miniblock_hash=$3, miniblock_num=$4 WHERE stream_id = $1`,
 			streamId,
 		),
 		streamId,
 		newMinipoolGeneration,
+		hex.EncodeToString(lastMbHashInMiniblocks[:]),
+		lastMbNumberInMiniblocks,
 	)
 	if err != nil {
 		return err
@@ -1778,7 +1798,7 @@ func (s *PostgresStreamStore) allStreamsMetaDataTx(
 ) (map[StreamId]*StreamMetadata, int64, error) {
 	streamMetaDataRows, err := tx.Query(
 		ctx,
-		`SELECT stream_id, riverblock_num, riverblock_log_index, nodes, miniblock_hash, miniblock_num FROM streams_metadata`,
+		`SELECT stream_id, nodes, miniblock_hash, miniblock_num FROM streams_metadata`,
 	)
 	if err != nil {
 		return nil, 0, err
 	}
@@ -1820,12 +1840,10 @@ func (s *PostgresStreamStore) allStreamsMetaDataTx(
 		lastBlock = max(lastBlock, riverBlockNum)
 
 		results[streamID] = &StreamMetadata{
-			StreamId:                streamID,
-			RiverChainBlockNumber:   uint64(riverBlockNum),
-			RiverChainBlockLogIndex: uint(riverBlockLogIndex),
-			Nodes:                   nodes,
-			MiniblockHash:           common.HexToHash(miniBlockHashStr),
-			MiniblockNumber:         miniBlockNum,
+			StreamId:        streamID,
+			Nodes:           nodes,
+			MiniblockHash:   common.HexToHash(miniBlockHashStr),
+			MiniblockNumber: miniBlockNum,
 		}
 	}
 
@@ -1854,32 +1872,30 @@ func (s *PostgresStreamStore) updateStreamsMetaDataTx(
 	streams map[StreamId]*StreamMetadata,
 	removals []StreamId,
 ) error {
-	upserts := `INSERT INTO streams_metadata (stream_id, riverblock_num, riverblock_log_index, nodes, miniblock_hash, miniblock_num)
-		VALUES (@stream_id, @riverblock_num, @riverblock_log_index, @nodes, @miniblock_hash, @miniblock_num)
-		ON CONFLICT (stream_id) DO UPDATE SET riverblock_num = @riverblock_num, riverblock_log_index=@riverblock_log_index, nodes=@nodes, miniblock_hash=@miniblock_hash, miniblock_num=@miniblock_num`
+	upserts := `INSERT INTO streams_metadata (stream_id, nodes, miniblock_hash, miniblock_num, is_sealed)
+		VALUES (@stream_id, @nodes, @miniblock_hash, @miniblock_num, @is_sealed)
+		ON CONFLICT (stream_id) DO UPDATE SET nodes=@nodes, miniblock_hash=@miniblock_hash, miniblock_num=@miniblock_num, is_sealed=@is_sealed`
 
 	remove := `DELETE FROM streams_metadata WHERE stream_id = $1`
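Reviewer note: the CHAR(20)[] to CHAR(40)[] migration earlier in this diff and the hex encoding in the loop below go together: a 20-byte common.Address hex-encodes to exactly 40 characters, and a slice like node[:0] would encode zero bytes into an empty string, which is why the loop must slice the full address. A tiny standalone check of that assumption:

```go
package main

import (
	"encoding/hex"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	addr := common.HexToAddress("0x00000000000000000000000000000000DeaDBeef")

	enc := hex.EncodeToString(addr[:]) // all 20 bytes, no "0x" prefix
	fmt.Println(enc, len(enc))         // 00000000000000000000000000000000deadbeef 40

	// Slicing zero bytes encodes to an empty string, which would silently
	// store blank-padded rows in a CHAR(40)[] column:
	fmt.Println(len(hex.EncodeToString(addr[:0]))) // 0
}
```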
 
 	batch := &pgx.Batch{}
 	for _, stream := range streams {
-		var nodes []string
-		for addr := range slices.Values(nodes) {
-			nodes = append(nodes, addr)
+		nodes := make([]string, len(stream.Nodes))
+		for i, node := range stream.Nodes {
+			nodes[i] = hex.EncodeToString(node[:])
 		}
-
 		args := pgx.NamedArgs{
-			"stream_id":            stream.StreamId.String(),
-			"riverblock_num":       stream.RiverChainBlockNumber,
-			"riverblock_log_index": stream.RiverChainBlockLogIndex,
-			"nodes":                nodes,
-			"miniblock_hash":       stream.MiniblockHash.String(),
-			"miniblock_num":        stream.MiniblockNumber,
+			"stream_id":      stream.StreamId,
+			"nodes":          nodes,
+			"miniblock_hash": hex.EncodeToString(stream.MiniblockHash[:]),
+			"miniblock_num":  stream.MiniblockNumber,
+			"is_sealed":      stream.IsSealed,
 		}
 		batch.Queue(upserts, args)
 	}
 
 	for _, streamID := range removals {
-		args := pgx.NamedArgs{"stream_id": streamID.String()}
+		args := pgx.NamedArgs{"stream_id": streamID}
 		batch.Queue(remove, args)
 	}
diff --git a/core/node/storage/pg_stream_store_test.go b/core/node/storage/pg_stream_store_test.go
index 189eef811..11361385a 100644
--- a/core/node/storage/pg_stream_store_test.go
+++ b/core/node/storage/pg_stream_store_test.go
@@ -8,9 +8,6 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/stretchr/testify/require"
-	"golang.org/x/exp/rand"
-
 	"github.com/river-build/river/core/config"
 	. "github.com/river-build/river/core/node/base"
 	"github.com/river-build/river/core/node/base/test"
@@ -19,6 +16,8 @@ import (
 	. "github.com/river-build/river/core/node/shared"
 	"github.com/river-build/river/core/node/testutils"
 	"github.com/river-build/river/core/node/testutils/dbtestutils"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/rand"
 )
 
 type testStreamStoreParams struct {
@@ -128,7 +127,7 @@ func TestPostgresStreamStore(t *testing.T) {
 
 	// Test that created stream will have proper genesis miniblock
 	genesisMiniblock := []byte("genesisMiniblock")
-	err = pgStreamStore.CreateStreamStorage(ctx, streamId1, genesisMiniblock)
+	err = pgStreamStore.CreateStreamStorage(ctx, streamId1, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 	require.NoError(err)
 
 	streamsNumber, err = pgStreamStore.GetStreamsNumber(ctx)
@@ -147,11 +146,11 @@ func TestPostgresStreamStore(t *testing.T) {
 
 	// Test that we cannot add second stream with same id
 	genesisMiniblock2 := []byte("genesisMiniblock2")
-	err = pgStreamStore.CreateStreamStorage(ctx, streamId1, genesisMiniblock2)
+	err = pgStreamStore.CreateStreamStorage(ctx, streamId1, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock2)
 	require.Error(err)
 
 	// Test that we can add second stream and then GetStreams will return both
-	err = pgStreamStore.CreateStreamStorage(ctx, streamId2, genesisMiniblock2)
+	err = pgStreamStore.CreateStreamStorage(ctx, streamId2, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock2)
 	require.NoError(err)
 
 	streams, err := pgStreamStore.GetStreams(ctx)
@@ -160,7 +159,7 @@ func TestPostgresStreamStore(t *testing.T) {
 
 	// Test that we can delete stream and proper stream will be deleted
 	genesisMiniblock3 := []byte("genesisMiniblock3")
-	err = pgStreamStore.CreateStreamStorage(ctx, streamId3, genesisMiniblock3)
+	err = pgStreamStore.CreateStreamStorage(ctx, streamId3, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock3)
 	require.NoError(err)
 
 	err = pgStreamStore.DeleteStream(ctx, streamId2)
@@ -256,7 +255,7 @@ func TestPromoteMiniblockCandidate(t *testing.T) {
 
 	// Add candidate from another stream. This candidate should be untouched by the delete when a
	// candidate from the first stream is promoted.
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId2, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId2, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	err = pgStreamStore.WriteMiniblockCandidate(ctx, streamId2, candidateHash, 1, []byte("some bytes"))
 	require.NoError(err)
@@ -316,7 +315,7 @@ func TestPromoteMiniblockCandidate(t *testing.T) {
 
 func prepareTestDataForAddEventConsistencyCheck(ctx context.Context, s *PostgresStreamStore, streamId StreamId) {
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = s.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = s.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 	_ = s.WriteEvent(ctx, streamId, 1, 0, []byte("event1"))
 	_ = s.WriteEvent(ctx, streamId, 1, 1, []byte("event2"))
 	_ = s.WriteEvent(ctx, streamId, 1, 2, []byte("event3"))
@@ -428,7 +427,7 @@ func TestCreateBlockProposalConsistencyChecksProperNewMinipoolGeneration(t *test
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
@@ -460,7 +459,7 @@ func TestPromoteBlockConsistencyChecksProperNewMinipoolGeneration(t *testing.T)
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
@@ -505,7 +504,7 @@ func TestCreateBlockProposalNoSuchStreamError(t *testing.T) {
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	_, _ = pgStreamStore.pool.Exec(
 		ctx,
@@ -537,7 +536,7 @@ func TestPromoteBlockNoSuchStreamError(t *testing.T) {
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
@@ -571,7 +570,7 @@ func TestExitIfSecondStorageCreated(t *testing.T) {
 	genesisMiniblock := []byte("genesisMiniblock")
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
-	err := pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	err := pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 	require.NoError(err)
 
 	pool, err := CreateAndValidatePgxPool(
@@ -616,7 +615,7 @@ func TestGetStreamFromLastSnapshotConsistencyChecksMissingBlockFailure(t *testin
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
 	var testEnvelopes2 [][]byte
@@ -700,7 +699,7 @@ func TestGetStreamFromLastSnapshotConsistencyCheckWrongEnvelopeGeneration(t *tes
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
@@ -765,7 +764,7 @@ func TestGetStreamFromLastSnapshotConsistencyCheckNoZeroIndexEnvelope(t *testing
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
@@ -831,7 +830,7 @@ func TestGetStreamFromLastSnapshotConsistencyCheckGapInEnvelopesIndexes(t *testi
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
@@ -897,7 +896,7 @@ func TestGetMiniblocksConsistencyChecks(t *testing.T) {
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 
 	genesisMiniblock := []byte("genesisMiniblock")
-	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	_ = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 
 	var testEnvelopes1 [][]byte
 	testEnvelopes1 = append(testEnvelopes1, []byte("event1"))
@@ -980,10 +979,10 @@ func TestAlreadyExists(t *testing.T) {
 	streamId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
 	genesisMiniblock := []byte("genesisMiniblock")
-	err := pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	err := pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 	require.NoError(err)
 
-	err = pgStreamStore.CreateStreamStorage(ctx, streamId, genesisMiniblock)
+	err = pgStreamStore.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genesisMiniblock)
 	require.Equal(Err_ALREADY_EXISTS, AsRiverError(err).Code)
 }
@@ -1070,7 +1069,7 @@ func TestReadStreamFromLastSnapshot(t *testing.T) {
 	genMB, _ := dataMaker.mb()
 	mbs := [][]byte{genMB}
-	require.NoError(store.CreateStreamStorage(ctx, streamId, genMB))
+	require.NoError(store.CreateStreamStorage(ctx, streamId, []common.Address{}, common.Hash{1, 2, 3}, genMB))
 
 	mb1, h1 := dataMaker.mb()
 	mbs = append(mbs, mb1)
@@ -1139,8 +1138,8 @@ func TestQueryPlan(t *testing.T) {
 	var candHash common.Hash
 	for range 20 {
 		streamId = testutils.FakeStreamId(STREAM_CHANNEL_BIN)
-		genMB, _ := dataMaker.mb()
-		require.NoError(store.CreateStreamStorage(ctx, streamId, genMB))
+		genMB, genMBHash := dataMaker.mb()
+		require.NoError(store.CreateStreamStorage(ctx, streamId, []common.Address{}, genMBHash, genMB))
 		require.NoError(store.WriteMiniblocks(ctx, streamId, dataMaker.mbs(1, 10), 11,
			dataMaker.events(10), 1, 0))
diff --git a/core/node/storage/storage.go b/core/node/storage/storage.go
index 5b2feb311..619b5cf33 100644
--- a/core/node/storage/storage.go
+++ b/core/node/storage/storage.go
@@ -25,7 +25,7 @@ type StreamStorage interface {
 	// CreateStreamStorage creates a new stream with the given genesis miniblock at index 0.
 	// Last snapshot miniblock index is set to 0.
 	// Minipool is set to generation number 1 (i.e. the number of the miniblock that is going to be produced next) and is empty.
-	CreateStreamStorage(ctx context.Context, streamId StreamId, genesisMiniblock []byte) error
+	CreateStreamStorage(ctx context.Context, streamId StreamId, nodes []common.Address, genesisMiniblockHash common.Hash, genesisMiniblock []byte) error
 
 	// ReadStreamFromLastSnapshot reads last stream miniblocks and guarantees that last snapshot miniblock is included.
 	// It attempts to read at least numToRead miniblocks, but may return less if there are not enough miniblocks in storage,
@@ -175,14 +175,6 @@ type DebugReadStreamDataResult struct {
 type StreamMetadata struct {
 	// StreamId is the unique stream identifier
 	StreamId StreamId
-	// RiverChainBlockNumber contains the river chain block number from which the nodes and
-	// miniblock data is taken.
-	RiverChainBlockNumber uint64
-	// RiverChainBlockLogIndex contains the transaction index from which the nodes and
-	// miniblock data is taken. Can be 9_999_999 in case the record was created for the
-	// first time from the state at RiverChainBlockNumber and not through an update from
-	// an event/log.
-	RiverChainBlockLogIndex uint
 	// Nodes that this stream is managed by.
 	Nodes []common.Address
 	// MiniblockHash contains the latest miniblock hash for this stream as locally known.
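Reviewer note: a follow-up test along these lines would pin down the new invariant that writeMiniblocksTx keeps streams_metadata in step with the miniblocks table. A sketch only, in the style of pg_stream_store_test.go above; store, dataMaker, and streamId are assumed to be set up as in TestQueryPlan, and the direct streams_metadata query is illustrative:

```go
// After WriteMiniblocks, the streams_metadata row should point at the
// last miniblock written.
genMB, genMBHash := dataMaker.mb()
require.NoError(store.CreateStreamStorage(ctx, streamId, []common.Address{}, genMBHash, genMB))
require.NoError(store.WriteMiniblocks(ctx, streamId, dataMaker.mbs(1, 10), 11, dataMaker.events(10), 1, 0))

var gotHash string
var gotNum int64
require.NoError(store.pool.QueryRow(ctx,
	`SELECT miniblock_hash, miniblock_num FROM streams_metadata WHERE stream_id = $1`,
	streamId,
).Scan(&gotHash, &gotNum))
require.EqualValues(10, gotNum) // number of the last miniblock written above
require.Len(gotHash, 64)        // 32-byte hash stored as 64-char hex
```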