Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Dec 11, 2024
1 parent 13ab76a commit 589d13c
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 144 deletions.
20 changes: 13 additions & 7 deletions core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (s *streamImpl) ApplyMiniblock(ctx context.Context, miniblock *MiniblockInf
// importMiniblocks imports the given miniblocks.
func (s *streamImpl) importMiniblocks(
ctx context.Context,
nodes []common.Address,
miniblocks []*MiniblockInfo,
) error {
if len(miniblocks) == 0 {
Expand All @@ -199,11 +200,12 @@ func (s *streamImpl) importMiniblocks(

s.mu.Lock()
defer s.mu.Unlock()
return s.importMiniblocksLocked(ctx, miniblocks)
return s.importMiniblocksLocked(ctx, nodes, miniblocks)
}

func (s *streamImpl) importMiniblocksLocked(
ctx context.Context,
nodes []common.Address,
miniblocks []*MiniblockInfo,
) error {
firstMbNum := miniblocks[0].Ref.Num
Expand All @@ -222,7 +224,7 @@ func (s *streamImpl) importMiniblocksLocked(
if s.view() == nil {
// Do we have genesis miniblock?
if miniblocks[0].Header().MiniblockNum == 0 {
err := s.initFromGenesis(ctx, miniblocks[0], blocksToWriteToStorage[0].Data)
err := s.initFromGenesis(ctx, nodes, miniblocks[0], blocksToWriteToStorage[0].Data)
if err != nil {
return err
}
Expand Down Expand Up @@ -413,6 +415,7 @@ func (s *streamImpl) schedulePromotionLocked(ctx context.Context, mb *MiniblockR

func (s *streamImpl) initFromGenesis(
ctx context.Context,
nodes []common.Address,
genesisInfo *MiniblockInfo,
genesisBytes []byte,
) error {
Expand All @@ -432,7 +435,7 @@ func (s *streamImpl) initFromGenesis(
Func("initFromGenesis")
}

if err := s.params.Storage.CreateStreamStorage(ctx, s.streamId, genesisBytes); err != nil {
if err := s.params.Storage.CreateStreamStorage(ctx, s.streamId, nodes, registeredGenesisHash, genesisBytes); err != nil {
// TODO: this error is not handle correctly here: if stream is in storage, caller of this initFromGenesis
// should read it from storage.
if AsRiverError(err).Code != Err_ALREADY_EXISTS {
Expand All @@ -459,7 +462,7 @@ func (s *streamImpl) initFromGenesis(

func (s *streamImpl) initFromBlockchain(ctx context.Context) error {
// TODO: move this call out of the lock
record, _, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, s.streamId)
record, mbHash, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, s.streamId)
if err != nil {
return err
}
Expand All @@ -486,7 +489,7 @@ func (s *streamImpl) initFromBlockchain(ctx context.Context) error {
)
}

err = s.params.Storage.CreateStreamStorage(ctx, s.streamId, mb)
err = s.params.Storage.CreateStreamStorage(ctx, s.streamId, record.Nodes, mbHash, mb)
if err != nil {
return err
}
Expand Down Expand Up @@ -946,7 +949,7 @@ func (s *streamImpl) tryApplyCandidate(ctx context.Context, mb *MiniblockInfo) (
if len(s.local.pendingCandidates) > 0 {
pending := s.local.pendingCandidates[0]
if mb.Ref.Num == pending.Num && mb.Ref.Hash == pending.Hash {
err = s.importMiniblocksLocked(ctx, []*MiniblockInfo{mb})
err = s.importMiniblocksLocked(ctx, s.nodesLocked.GetNodes(), []*MiniblockInfo{mb})
if err != nil {
return false, err
}
Expand All @@ -971,7 +974,7 @@ func (s *streamImpl) tryReadAndApplyCandidateLocked(ctx context.Context, mbRef *
if err == nil {
miniblock, err := NewMiniblockInfoFromBytes(miniblockBytes, mbRef.Num)
if err == nil {
err = s.importMiniblocksLocked(ctx, []*MiniblockInfo{miniblock})
err = s.importMiniblocksLocked(ctx, s.nodesLocked.GetNodes(), []*MiniblockInfo{miniblock})
if err == nil {
return true
}
Expand Down Expand Up @@ -1035,6 +1038,9 @@ func (s *streamImpl) applyStreamEvents(
if err != nil {
dlog.FromCtx(ctx).Error("applyStreamEventsNoLock: failed to update nodes", "err", err, "streamId", s.streamId)
}
if s.nodesLocked.IsLocal() {
// TODO: update streams metadata table
}
default:
dlog.FromCtx(ctx).Error("applyStreamEventsNoLock: unknown event", "event", event, "streamId", s.streamId)
}
Expand Down
155 changes: 81 additions & 74 deletions core/node/events/stream_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ package events

import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"math/big"
"slices"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/gammazero/workerpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/puzpuzpuz/xsync/v3"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/contracts/river"
. "github.com/river-build/river/core/node/base"
Expand Down Expand Up @@ -204,12 +203,10 @@ func (s *streamCacheImpl) retrieveFromRiverChain(ctx context.Context) (map[Strea
func(stream *registries.GetStreamResult) bool {
if slices.Contains(stream.Nodes, s.params.Wallet.Address) {
streams[stream.StreamId] = &storage.StreamMetadata{
StreamId: stream.StreamId,
RiverChainBlockNumber: s.params.AppliedBlockNum.AsUint64(),
RiverChainBlockLogIndex: storage.RiverChainBlockLogIndexUnspecified,
Nodes: stream.Nodes,
MiniblockHash: stream.LastMiniblockHash,
MiniblockNumber: int64(stream.LastMiniblockNum),
StreamId: stream.StreamId,
Nodes: stream.Nodes,
MiniblockHash: stream.LastMiniblockHash,
MiniblockNumber: int64(stream.LastMiniblockNum),
}
}
return true
Expand All @@ -227,19 +224,20 @@ func (s *streamCacheImpl) retrieveFromRiverChain(ctx context.Context) (map[Strea
// to this node and a list of streams that are removed from this node.
func (s *streamCacheImpl) applyDeltas(
ctx context.Context,
lastBlock int64,
lastDBBlock int64,
streams map[StreamId]*storage.StreamMetadata,
) (removals []StreamId, err error) {
if lastBlock > int64(s.params.AppliedBlockNum.AsUint64()) {
if lastDBBlock > int64(s.params.AppliedBlockNum.AsUint64()) {
return nil, RiverError(Err_BAD_BLOCK_NUMBER, "Local database is ahead of River Chain").
Func("loadStreamsUpdatesFromRiverChain").
Tags("riverChainLastBlock", lastBlock, "appliedBlockNum", s.params.AppliedBlockNum)
Tags("riverChainLastBlock", lastDBBlock, "appliedBlockNum", s.params.AppliedBlockNum)
}

// fetch and apply changes that happened since latest sync
var (
log = dlog.FromCtx(ctx)
streamRegistryContract = s.params.Registry.StreamRegistry.BoundContract()
from = lastBlock
from = lastDBBlock + 1
to = int64(s.params.AppliedBlockNum.AsUint64())
query = ethereum.FilterQuery{
Addresses: []common.Address{s.params.Registry.Address},
Expand All @@ -249,104 +247,110 @@ func (s *streamCacheImpl) applyDeltas(
s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamPlacementUpdated].ID,
}},
}
maxBlockRange = int64(2000)
maxBlockRange = int64(2000) // if too large the number of events in a single rpc call can become too big
retryCounter = 0
)

for from <= to {
toBlock := min(from+maxBlockRange, to)

query.FromBlock = big.NewInt(from)
query.ToBlock = big.NewInt(toBlock)
query.FromBlock, query.ToBlock = big.NewInt(from), big.NewInt(toBlock)

logs, err := s.params.RiverChain.Client.FilterLogs(ctx, query)
if err != nil {
return nil, WrapRiverError(Err_CANNOT_CALL_CONTRACT, err).
Message("Unable to fetch stream changes").
Tags("from", from, "to", toBlock).
Func("retrieveFromDeltas")
log.Error("Unable to retrieve logs from RiverChain", "retry", retryCounter, "err", err)

retryCounter++
if retryCounter > 40 {
return nil, WrapRiverError(Err_CANNOT_CALL_CONTRACT, err).
Message("Unable to fetch stream changes").
Tags("from", from, "to", toBlock).
Func("retrieveFromDeltas")
}

select {
case <-time.After(3 * time.Second):
continue
case <-ctx.Done():
return nil, ctx.Err()
}
}

for _, log := range logs {
if len(log.Topics) == 0 {
for _, event := range logs {
if len(event.Topics) == 0 {
continue
}

switch log.Topics[0] {
switch event.Topics[0] {
case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamAllocated].ID:
event := new(river.StreamRegistryV1StreamAllocated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamAllocated, log); err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("transaction", log.TxHash, "logIdx", log.Index).
Message("Unable to unpack stream allocated log").
Func("retrieveFromDeltas")
streamAllocatedEvent := new(river.StreamRegistryV1StreamAllocated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamAllocated, event); err != nil {
log.Error("Unable to unpack StreamRegistryV1StreamAllocated event",
"transaction", event.TxHash, "logIdx", event.Index, "err", err)
continue
}

if slices.Contains(event.Nodes, s.params.Wallet.Address) {
streams[event.StreamId] = &storage.StreamMetadata{
StreamId: event.StreamId,
RiverChainBlockNumber: log.BlockNumber,
RiverChainBlockLogIndex: log.Index,
Nodes: event.Nodes,
MiniblockHash: event.GenesisMiniblockHash,
MiniblockNumber: 0,
IsSealed: false,
if slices.Contains(streamAllocatedEvent.Nodes, s.params.Wallet.Address) {
streams[streamAllocatedEvent.StreamId] = &storage.StreamMetadata{
StreamId: streamAllocatedEvent.StreamId,
Nodes: streamAllocatedEvent.Nodes,
MiniblockHash: streamAllocatedEvent.GenesisMiniblockHash,
MiniblockNumber: 0,
IsSealed: false,
}
}

case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamLastMiniblockUpdated].ID:
event := new(river.StreamRegistryV1StreamLastMiniblockUpdated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamLastMiniblockUpdated, log); err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("transaction", log.TxHash, "logIdx", log.Index).
Message("Unable to unpack stream last miniblock updated log").
Func("retrieveFromDeltas")
lastMiniblockUpdatedEvent := new(river.StreamRegistryV1StreamLastMiniblockUpdated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamLastMiniblockUpdated, event); err != nil {
log.Error("Unable to unpack StreamRegistryV1StreamLastMiniblockUpdated event",
"transaction", event.TxHash, "logIdx", event.Index, "err", err)
continue
}

if stream, ok := streams[event.StreamId]; ok {
stream.MiniblockHash = common.BytesToHash(event.LastMiniblockHash[:])
stream.MiniblockNumber = int64(event.LastMiniblockNum)
stream.IsSealed = event.IsSealed
if stream, ok := streams[lastMiniblockUpdatedEvent.StreamId]; ok {
stream.MiniblockHash = common.BytesToHash(lastMiniblockUpdatedEvent.LastMiniblockHash[:])
stream.MiniblockNumber = int64(lastMiniblockUpdatedEvent.LastMiniblockNum)
stream.IsSealed = lastMiniblockUpdatedEvent.IsSealed
}

case s.params.Registry.StreamRegistryAbi.Events[river.Event_StreamPlacementUpdated].ID:
event := new(river.StreamRegistryV1StreamPlacementUpdated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamPlacementUpdated, log); err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("transaction", log.TxHash, "logIdx", log.Index).
Message("Unable to unpack stream placement updated log").
Func("retrieveFromDeltas")
streamPlacementUpdatedEvent := new(river.StreamRegistryV1StreamPlacementUpdated)
if err := streamRegistryContract.UnpackLog(event, river.Event_StreamPlacementUpdated, event); err != nil {
log.Error("Unable to unpack StreamRegistryV1StreamPlacementUpdated event",
"transaction", event.TxHash, "logIdx", event.Index, "err", err)
continue
}

if s.params.Wallet.Address == event.NodeAddress {
if event.IsAdded { // stream was replaced to this node
retrievedStream, err := s.params.Registry.GetStream(ctx, event.StreamId, s.params.AppliedBlockNum)
if s.params.Wallet.Address == streamPlacementUpdatedEvent.NodeAddress {
if streamPlacementUpdatedEvent.IsAdded { // stream was replaced to this node
retrievedStream, err := s.params.Registry.GetStream(
ctx, streamPlacementUpdatedEvent.StreamId, s.params.AppliedBlockNum)
if err != nil {
return nil, WrapRiverError(Err_BAD_EVENT, err).
Tags("stream", event.StreamId, "transaction", log.TxHash, "logIdx", log.Index).
Tags("stream", streamPlacementUpdatedEvent.StreamId, "transaction", event.TxHash, "logIdx", event.Index).
Message("Unable to retrieve replaced stream").
Func("retrieveFromDeltas")
}

streams[event.StreamId] = &storage.StreamMetadata{
StreamId: event.StreamId,
RiverChainBlockNumber: log.BlockNumber,
RiverChainBlockLogIndex: log.Index,
Nodes: retrievedStream.Nodes,
MiniblockHash: retrievedStream.LastMiniblockHash,
MiniblockNumber: int64(retrievedStream.LastMiniblockNum),
IsSealed: false,
streams[streamPlacementUpdatedEvent.StreamId] = &storage.StreamMetadata{
StreamId: streamPlacementUpdatedEvent.StreamId,
Nodes: retrievedStream.Nodes,
MiniblockHash: retrievedStream.LastMiniblockHash,
MiniblockNumber: int64(retrievedStream.LastMiniblockNum),
IsSealed: false,
}

slices.DeleteFunc(removals, func(streamID StreamId) bool {

Check failure on line 343 in core/node/events/stream_cache.go

View workflow job for this annotation

GitHub Actions / Common_CI

unusedresult: result of slices.DeleteFunc call not used (govet)
return streamID == event.StreamId
return streamID == streamPlacementUpdatedEvent.StreamId
})
} else { // stream was replaced away from this node
removals = append(removals, event.StreamId)
removals = append(removals, streamPlacementUpdatedEvent.StreamId)
}
}
}
}

retryCounter = 0
from = toBlock + 1
}

Expand Down Expand Up @@ -394,10 +398,11 @@ func (s *streamCacheImpl) onStreamAllocated(
lastAccessedTime: time.Now(),
local: &localStreamState{},
}

stream.nodesLocked.Reset(event.Nodes, s.params.Wallet.Address)
stream, created, err := s.createStreamStorage(ctx, stream, event.GenesisMiniblock)
stream, created, err := s.createStreamStorage(ctx, stream, event.Nodes, event.GenesisMiniblockHash, event.GenesisMiniblock)
if err != nil {
dlog.FromCtx(ctx).Error("Failed to allocate stream", "err", err, "streamId", stream.streamId)
dlog.FromCtx(ctx).Error("Failed to allocate stream", "err", err, "streamId", StreamId(event.StreamId))
}
if created && len(otherEvents) > 0 {
stream.applyStreamEvents(ctx, otherEvents, blockNum)
Expand Down Expand Up @@ -477,7 +482,7 @@ func (s *streamCacheImpl) tryLoadStreamRecord(
// Blockchain record is already created, but this fact is not reflected yet in local storage.
// This may happen if somebody observes record allocation on blockchain and tries to get stream
// while local storage is being initialized.
record, _, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, streamId)
record, mbHash, mb, blockNum, err := s.params.Registry.GetStreamWithGenesis(ctx, streamId)
if err != nil {
if !waitForLocal {
return nil, err
Expand Down Expand Up @@ -535,13 +540,15 @@ func (s *streamCacheImpl) tryLoadStreamRecord(
)
}

stream, _, err = s.createStreamStorage(ctx, stream, mb)
stream, _, err = s.createStreamStorage(ctx, stream, record.Nodes, mbHash, mb)
return stream, err
}

func (s *streamCacheImpl) createStreamStorage(
ctx context.Context,
stream *streamImpl,
nodes []common.Address,
mbHash common.Hash,
mb []byte,
) (*streamImpl, bool, error) {
// Lock stream, so parallel creators have to wait for the stream to be intialized.
Expand All @@ -552,7 +559,7 @@ func (s *streamCacheImpl) createStreamStorage(
// TODO: delete entry on failures below?

// Our stream won the race, put into storage.
err := s.params.Storage.CreateStreamStorage(ctx, stream.streamId, mb)
err := s.params.Storage.CreateStreamStorage(ctx, stream.streamId, nodes, mbHash, mb)
if err != nil {
if AsRiverError(err).Code == Err_ALREADY_EXISTS {
// Attempt to load stream from storage. Might as well do it while under lock.
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *streamCacheImpl) syncStreamFromSinglePeer(
mbs[i] = mb
}

err = stream.importMiniblocks(ctx, mbs)
err = stream.importMiniblocks(ctx, stream.nodesLocked.GetNodes(), mbs)
if err != nil {
return currentFromInclusive, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
CREATE TABLE IF NOT EXISTS streams_metadata (
stream_id CHAR(64) NOT NULL,
riverblock_num BIGINT NOT NULL,
riverblock_log_index BIGINT NOT NULL,
nodes CHAR(20)[] NOT NULL,
nodes CHAR(40)[] NOT NULL,
miniblock_hash CHAR(64) NOT NULL,
miniblock_num BIGINT NOT NULL,
is_sealed BOOL NOT NULL,
Expand Down
Loading

0 comments on commit 589d13c

Please sign in to comment.