Skip to content

Commit

Permalink
Refactor stream cache to use xsync for improved performance and safet…
Browse files Browse the repository at this point in the history
…y. (#1570)
  • Loading branch information
sergekh2 authored Nov 19, 2024
1 parent de586d0 commit 890a725
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 62 deletions.
39 changes: 17 additions & 22 deletions core/node/events/stream_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package events
import (
"context"
"slices"
"sync"
"time"

"github.com/gammazero/workerpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/puzpuzpuz/xsync/v3"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/contracts/river"
Expand Down Expand Up @@ -50,7 +50,7 @@ type streamCacheImpl struct {
// streamId -> *streamImpl
// cache is populated by getting all streams that should be on local node from River chain.
// streamImpl can be in unloaded state, in which case it will be loaded on first GetStream call.
cache sync.Map
cache *xsync.MapOf[StreamId, *streamImpl]

chainConfig crypto.OnChainConfiguration

Expand All @@ -68,6 +68,7 @@ func NewStreamCache(
) (*streamCacheImpl, error) {
s := &streamCacheImpl{
params: params,
cache: xsync.NewMapOf[StreamId, *streamImpl](),
streamCacheSizeGauge: params.Metrics.NewGaugeVecEx(
"stream_cache_size", "Number of streams in stream cache",
"chain_id", "address",
Expand Down Expand Up @@ -160,14 +161,12 @@ func (s *streamCacheImpl) onStreamLastMiniblockUpdated(
ctx context.Context,
event *river.StreamRegistryV1StreamLastMiniblockUpdated,
) {
entry, _ := s.cache.Load(StreamId(event.StreamId))
if entry == nil {
stream, _ := s.cache.Load(StreamId(event.StreamId))
if stream == nil {
// Stream is not local, ignore.
return
}

stream := entry.(*streamImpl)

view, err := stream.getView(ctx)
if err != nil {
dlog.FromCtx(ctx).Error("onStreamLastMiniblockUpdated: failed to get stream view", "err", err)
Expand Down Expand Up @@ -230,10 +229,10 @@ func (s *streamCacheImpl) CacheCleanup(ctx context.Context, enabled bool, expira

// TODO: add data structure that supports to loop over streams that have their view loaded instead of
// looping over all streams.
s.cache.Range(func(streamID, streamVal any) bool {
s.cache.Range(func(streamID StreamId, stream *streamImpl) bool {
result.TotalStreams++
if enabled {
if stream := streamVal.(*streamImpl); stream.tryCleanup(expiration) {
if stream.tryCleanup(expiration) {
result.UnloadedStreams++
log.Debug("stream view is unloaded from cache", "streamId", stream.streamId)
}
Expand Down Expand Up @@ -273,9 +272,9 @@ func (s *streamCacheImpl) tryLoadStreamRecord(
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
entry, _ := s.cache.Load(streamId)
if entry != nil {
return entry.(*streamImpl), nil
stream, _ := s.cache.Load(streamId)
if stream != nil {
return stream, nil
}
record, _, mb, err = s.params.Registry.GetStreamWithGenesis(ctx, streamId)
if err == nil {
Expand Down Expand Up @@ -361,8 +360,7 @@ func (s *streamCacheImpl) createStreamStorage(
if entry == nil {
return nil, RiverError(Err_INTERNAL, "tryLoadStreamRecord: Cache corruption", "streamId", stream.streamId)
}
stream = entry.(*streamImpl)
return stream, nil
return entry, nil
}
}

Expand All @@ -375,25 +373,23 @@ func (s *streamCacheImpl) GetStream(ctx context.Context, streamId StreamId) (Syn
}

func (s *streamCacheImpl) getStreamImpl(ctx context.Context, streamId StreamId) (*streamImpl, error) {
entry, _ := s.cache.Load(streamId)
if entry == nil {
stream, _ := s.cache.Load(streamId)
if stream == nil {
return s.tryLoadStreamRecord(ctx, streamId)
}
return entry.(*streamImpl), nil
return stream, nil
}

func (s *streamCacheImpl) ForceFlushAll(ctx context.Context) {
s.cache.Range(func(key, value interface{}) bool {
stream := value.(*streamImpl)
s.cache.Range(func(streamID StreamId, stream *streamImpl) bool {
stream.ForceFlush(ctx)
return true
})
}

func (s *streamCacheImpl) GetLoadedViews(ctx context.Context) []StreamView {
var result []StreamView
s.cache.Range(func(key, value interface{}) bool {
stream := value.(*streamImpl)
s.cache.Range(func(streamID StreamId, stream *streamImpl) bool {
view := stream.tryGetView()
if view != nil {
result = append(result, view)
Expand All @@ -405,8 +401,7 @@ func (s *streamCacheImpl) GetLoadedViews(ctx context.Context) []StreamView {

func (s *streamCacheImpl) GetMbCandidateStreams(ctx context.Context) []*streamImpl {
var candidates []*streamImpl
s.cache.Range(func(key, value interface{}) bool {
stream := value.(*streamImpl)
s.cache.Range(func(streamID StreamId, stream *streamImpl) bool {
if stream.canCreateMiniblock() {
candidates = append(candidates, stream)
}
Expand Down
64 changes: 24 additions & 40 deletions core/node/events/stream_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ func TestStreamCacheViewEviction(t *testing.T) {

streamCache := tc.initCache(0, nil)

streamCache.cache.Range(func(key, value any) bool {
require.Fail("stream cache must be empty")
return true
})
require.Zero(streamCache.cache.Size(), "stream cache must be empty")

node := tc.getBC()
streamID := testutils.FakeStreamId(STREAM_SPACE_BIN)
Expand All @@ -44,9 +41,8 @@ func TestStreamCacheViewEviction(t *testing.T) {
// stream just loaded and should be with view in cache
streamWithoutLoadedView := 0
streamWithLoadedViewCount := 0
streamCache.cache.Range(func(key, value any) bool {
stream := value.(*streamImpl)
if stream.view() == nil {
streamCache.cache.Range(func(key StreamId, value *streamImpl) bool {
if value.view() == nil {
streamWithoutLoadedView++
} else {
streamWithLoadedViewCount++
Expand All @@ -71,9 +67,8 @@ func TestStreamCacheViewEviction(t *testing.T) {
// cache must have view dropped even there is a subscriber
streamWithoutLoadedView = 0
streamWithLoadedViewCount = 0
streamCache.cache.Range(func(key, value any) bool {
stream := value.(*streamImpl)
if stream.view() == nil {
streamCache.cache.Range(func(key StreamId, value *streamImpl) bool {
if value.view() == nil {
streamWithoutLoadedView++
} else {
streamWithLoadedViewCount++
Expand All @@ -94,9 +89,8 @@ func TestStreamCacheViewEviction(t *testing.T) {

streamWithoutLoadedView = 0
streamWithLoadedViewCount = 0
streamCache.cache.Range(func(key, value any) bool {
stream := value.(*streamImpl)
if stream.view() == nil {
streamCache.cache.Range(func(key StreamId, value *streamImpl) bool {
if value.view() == nil {
streamWithoutLoadedView++
} else {
streamWithLoadedViewCount++
Expand All @@ -113,9 +107,8 @@ func TestStreamCacheViewEviction(t *testing.T) {
require.NoError(err, "get view")
streamWithoutLoadedView = 0
streamWithLoadedViewCount = 0
streamCache.cache.Range(func(key, value any) bool {
stream := value.(*streamImpl)
if stream.view() == nil {
streamCache.cache.Range(func(key StreamId, value *streamImpl) bool {
if value.view() == nil {
streamWithoutLoadedView++
} else {
streamWithLoadedViewCount++
Expand All @@ -135,10 +128,7 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {

streamCache := tc.initCache(0, nil)

streamCache.cache.Range(func(key, value any) bool {
require.Fail("stream cache must be empty")
return true
})
require.Zero(streamCache.cache.Size(), "stream cache must be empty")

node := tc.getBC()
streamID := testutils.FakeStreamId(STREAM_SPACE_BIN)
Expand All @@ -154,9 +144,8 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {
// stream just loaded and should have view loaded
streamWithoutLoadedView := 0
streamWithLoadedViewCount := 0
streamCache.cache.Range(func(key, value any) bool {
stream := value.(*streamImpl)
if stream.view() == nil {
streamCache.cache.Range(func(key StreamId, value *streamImpl) bool {
if value.view() == nil {
streamWithoutLoadedView++
} else {
streamWithLoadedViewCount++
Expand All @@ -172,7 +161,7 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {
streamCache.CacheCleanup(ctxShort, true, time.Millisecond)
cancelShort()
loadedStream, _ := streamCache.cache.Load(streamID)
require.Nil(loadedStream.(*streamImpl).view(), "view not unloaded")
require.Nil(loadedStream.view(), "view not unloaded")

// try to create a miniblock, pool is empty so it should not fail but also should not create a miniblock
_ = tc.makeMiniblock(0, streamID, false)
Expand All @@ -193,7 +182,7 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {
streamCache.CacheCleanup(ctxShort, true, time.Millisecond)
cancelShort()
loadedStream, _ = streamCache.cache.Load(streamID)
require.NotNil(loadedStream.(*streamImpl).view(), "view unloaded")
require.NotNil(loadedStream.view(), "view unloaded")

// now it should be possible to create a miniblock
mbRef := tc.makeMiniblock(0, streamID, false)
Expand All @@ -206,7 +195,7 @@ func TestCacheEvictionWithFilledMiniBlockPool(t *testing.T) {
streamCache.CacheCleanup(ctxShort, true, time.Millisecond)
cancelShort()
loadedStream, _ = streamCache.cache.Load(streamID)
require.Nil(loadedStream.(*streamImpl).view(), "view loaded in cache")
require.Nil(loadedStream.view(), "view loaded in cache")
}

type testStreamCacheViewEvictionSub struct {
Expand Down Expand Up @@ -246,7 +235,7 @@ func TestStreamMiniblockBatchProduction(t *testing.T) {

streamCache := tc.initCache(0, nil)

streamCache.cache.Range(func(key, value any) bool {
streamCache.cache.Range(func(key StreamId, value *streamImpl) bool {
require.Fail("stream cache must be empty")
return true
})
Expand All @@ -265,12 +254,12 @@ func TestStreamMiniblockBatchProduction(t *testing.T) {
go func(streamID StreamId, genesis *Miniblock) {
defer wg.Done()

streamSync, err := streamCache.GetStream(ctx, streamID)
streamSync, err := streamCache.getStreamImpl(ctx, streamID)
require.NoError(err, "get stream")

// unload view for half of the streams
if streamID[1]%2 == 1 {
ss := streamSync.(*streamImpl)
ss := streamSync
ss.tryCleanup(time.Duration(0))
}

Expand Down Expand Up @@ -339,34 +328,29 @@ func TestStreamMiniblockBatchProduction(t *testing.T) {
}

func isCacheEmpty(streamCache *streamCacheImpl) bool {
empty := true
streamCache.cache.Range(func(key, value any) bool {
empty = false
return false
})
return empty
return streamCache.cache.Size() == 0
}

func cleanUpCache(streamCache *streamCacheImpl) bool {
cleanedUp := true
streamCache.cache.Range(func(key, streamVal any) bool {
cleanedUp = cleanedUp && streamVal.(*streamImpl).tryCleanup(0)
streamCache.cache.Range(func(key StreamId, streamVal *streamImpl) bool {
cleanedUp = cleanedUp && streamVal.tryCleanup(0)
return true
})
return cleanedUp
}

func areAllViewsDropped(streamCache *streamCacheImpl) bool {
allDropped := true
streamCache.cache.Range(func(key, streamVal any) bool {
st := streamVal.(*streamImpl).getStatus()
streamCache.cache.Range(func(key StreamId, streamVal *streamImpl) bool {
st := streamVal.getStatus()
allDropped = allDropped && !st.loaded
return true
})
return allDropped
}

// TODO: temp disable flacky test. Passes locally, often fails on CI.
// TODO: temp disable flaky test. Passes locally, often fails on CI.
func Disabled_TestStreamUnloadWithSubscribers(t *testing.T) {
require := require.New(t)
ctx, tc := makeCacheTestContext(t, testParams{})
Expand Down

0 comments on commit 890a725

Please sign in to comment.