Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIVER-1858] Node and corrupt stream metrics #1857

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e178d8c
Early draft checkin.
clemire Dec 17, 2024
96c3c54
Merge branch 'main' into crystal/archiver-node-metrics
clemire Dec 18, 2024
7c0c573
Mark a stream as corrupt if unavailable for any reason after the grac…
clemire Dec 18, 2024
e8516e5
refactor isCorrupt to have a single path for determining the corrupti…
clemire Dec 18, 2024
deb7a83
fix comment.
clemire Dec 18, 2024
b71d8c8
Revert debug handler stub.
clemire Dec 18, 2024
0292e69
Merge branch 'main' into crystal/archiver-node-metrics
clemire Dec 18, 2024
b621c98
isBehind
clemire Dec 18, 2024
99b8f2f
Convert to strategy of considering a stream corrupt after a maximum n…
clemire Dec 18, 2024
b36b386
Move placement of clearing consecutive failures. Add some logging.
clemire Dec 18, 2024
8207bd1
Reset consecutive update failures count if the stream is up to date.
clemire Dec 18, 2024
6ce7274
Re-establish prod settings.
clemire Dec 18, 2024
91794cc
Do not remove streams from rotation when marking corrupt, allowing th…
clemire Dec 19, 2024
e3d27d7
Revert change.
clemire Dec 19, 2024
af61f32
Comment.
clemire Dec 19, 2024
fa4e5b7
Lint fix - remove unused struct.
clemire Dec 19, 2024
103dade
Merge branch 'main' into crystal/archiver-node-metrics
clemire Dec 19, 2024
4fe83ff
Merge branch 'main' into crystal/archiver-node-metrics
clemire Dec 19, 2024
520a428
Add a replicated test case to validate retry logic with unavailable n…
clemire Dec 21, 2024
cf088c7
Comment for the test case.
clemire Dec 21, 2024
ae458cf
Remove a stream from the task queue if it is marked corrupt from fail…
clemire Dec 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ type DebugEndpointsConfig struct {
Stacks bool
StacksMaxSizeKb int
TxPool bool
Archive bool

// Make storage statistics available via debug endpoints. This may involve running queries
// on the underlying database.
Expand Down
109 changes: 97 additions & 12 deletions core/node/rpc/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -23,11 +24,23 @@ import (
"github.com/river-build/river/core/node/storage"
)

// nodeUpdateGracePeriod describes the maximum delay we would expect to see
// when a stream's miniblocks updates in the registry before we consider a
// node behind if it does not have the latest miniblock.
// Two blocks plus change
var nodeUpdateGracePeriod = 5 * time.Second
var (
	// nodeUpdateGracePeriod describes the maximum delay we would expect to see
	// between a stream's miniblocks updating in the registry and the hosting
	// node serving the latest miniblock. Past this delay we consider the node
	// behind and attempt to advance to the next peer to retrieve the miniblock.
	// Two blocks plus change.
	nodeUpdateGracePeriod = 5 * time.Second

	// staleStreamGracePeriod is the grace period we give a stream that cannot
	// be updated to the current contract state before we consider it corrupt.
	// By the time a stream is this out of date with the contract state, we should have
	// cycled through every node that hosts the replicated stream.
	staleStreamGracePeriod = 100 * time.Second

	// maxBlocksBehind bounds how far local storage may lag the contract state:
	// if the contract state continues to advance but we are unable to
	// download miniblocks past a certain point, consider the stream corrupt.
	maxBlocksBehind = 50
)

type contractState struct {
// Everything in the registry state is protected by this mutex.
Expand Down Expand Up @@ -62,7 +75,7 @@ type ArchiveStream struct {
registryState contractState
numBlocksInDb atomic.Int64 // -1 means not loaded

stale atomic.Bool
corrupt atomic.Bool

// Mutex is used so only one archive operation is performed at a time.
mu sync.Mutex
Expand Down Expand Up @@ -219,6 +232,30 @@ func (a *Archiver) setupStatisticsMetrics(factory infra.MetricsFactory) {
)
}

// getCorruptStreams iterates over all streams in the in-memory cache and
// collects the streams currently marked corrupt, keyed by stream id.
//
// Note: sync.Map.Range is safe for concurrent use, but it does not provide a
// consistent snapshot of the cache at any single point in time. For the
// purposes of generating a periodic report of corrupt streams, this is good
// enough.
func (a *Archiver) getCorruptStreams(ctx context.Context) map[StreamId]*ArchiveStream {
	corruptStreams := make(map[StreamId]*ArchiveStream)

	a.streams.Range(
		func(key, value any) bool {
			stream, ok := value.(*ArchiveStream)
			if !ok {
				// Should never happen; indicates the cache was populated with
				// an unexpected type somewhere else.
				dlog.FromCtx(ctx).
					Error("Unexpected value stored in stream cache (not an ArchiveStream)", "value", value)
				return true
			}
			if stream.corrupt.Load() {
				corruptStreams[stream.streamId] = stream
			}
			return true
		},
	)

	return corruptStreams
}

func (a *Archiver) addNewStream(
ctx context.Context,
streamId StreamId,
Expand All @@ -238,12 +275,11 @@ func (a *Archiver) addNewStream(
a.streamsExamined.Add(1)
}

// Consider this node behind for this stream and advance the node pointer
clemire marked this conversation as resolved.
Show resolved Hide resolved
func (a *Archiver) onNodeBehind(
stream *ArchiveStream,
nodeAddr common.Address,
) {
// Mark the stream as stale and advance the node pointer.
stream.stale.Store(true)
// We now consider this node behind for this stream. Let's advance it.
if a.nodeAdvances != nil {
nodeAddress := prometheus.Labels{"node_address": nodeAddr.String()}
Expand All @@ -253,6 +289,15 @@ func (a *Archiver) onNodeBehind(
stream.nodes.AdvanceStickyPeer(nodeAddr)
}

// isCorrupt determines whether we consider a stream corrupt. A stream is
// considered corrupt if either:
//   - the stream's contract state last advanced more than
//     staleStreamGracePeriod ago, or
//   - local storage has fallen more than maxBlocksBehind miniblocks behind
//     the contract state.
//
// By the time either condition triggers, we should have cycled through every
// replica that hosts the stream without being able to advance it locally.
//
// NOTE(review): callers invoke this only on fetch failures, but the first
// condition looks only at the age of the last contract update — a stream that
// is idle on-chain for longer than the grace period and then hits a transient
// fetch failure would be flagged immediately. Confirm this is intended.
//
// lastContractUpdate is the instant of the most recent contract-state update
// for the stream (not a duration since that update).
func isCorrupt(mbsInContract int64, mbsInDb int64, lastContractUpdate time.Time) bool {
	return time.Since(lastContractUpdate) > staleStreamGracePeriod ||
		mbsInContract-mbsInDb > int64(maxBlocksBehind)
}

// ArchiveStream attempts to add all new miniblocks seen, according to the registry contract,
// since the last time the stream was archived into storage. It creates a new stream for
// streams that have not yet been seen.
Expand Down Expand Up @@ -333,16 +378,36 @@ func (a *Archiver) ArchiveStream(ctx context.Context, stream *ArchiveStream) err
err,
"streamId",
stream.streamId,
"node",
nodeAddr.Hex(),
)
if a.nodeAdvances != nil {
a.nodeAdvances.With(prometheus.Labels{"node_address": nodeAddr.String()}).Inc()
}
stream.nodes.AdvanceStickyPeer(nodeAddr)

if isCorrupt(mbsInContract, mbsInDb, contractMbsUpdated) {
// Mark this stream as corrupt
stream.corrupt.Store(true)
} else {
// We remove the stream from the rotation when it passes the grace period for stale
// streams. Keeping it here gives us a chance to fetch the stream if a node is booting
// or otherwise unavailable for an intermittent period, or fetch from another node if
// only a subset of nodes are unavailable.
time.AfterFunc(5*time.Second, func() {
a.tasks <- stream.streamId
})
}

return err
}

if (err != nil && AsRiverError(err).Code == Err_NOT_FOUND) || resp.Msg == nil || len(resp.Msg.Miniblocks) == 0 {
if time.Since(contractMbsUpdated) > nodeUpdateGracePeriod {
if isCorrupt(mbsInContract, mbsInDb, contractMbsUpdated) {
stream.corrupt.Store(true)
// Do not re-insert this stream back into the task queue, it is now considered un-updatable.
return nil
} else if time.Since(contractMbsUpdated) > nodeUpdateGracePeriod {
a.onNodeBehind(
stream,
nodeAddr,
Expand Down Expand Up @@ -403,14 +468,34 @@ func (a *Archiver) ArchiveStream(ctx context.Context, stream *ArchiveStream) err
a.miniblocksProcessed.Add(uint64(len(serialized)))
}

// All blocks processed, mark stream as current
stream.stale.Store(false)

return nil
}

func (a *Archiver) emitPeriodicCorruptStreamReport(ctx context.Context) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the debug endpoints will not be available until Kerem gets back in the office and sets it up in AWS, so for now let's periodically print a report to logs

ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
corruptStreams := a.getCorruptStreams(ctx)

var builder strings.Builder
for _, as := range corruptStreams {
builder.WriteString(as.streamId.String())
builder.WriteString("\n")
}
dlog.FromCtx(ctx).
Info("Corrupt streams report", "total", len(corruptStreams), "streams", builder.String())
}
}
}

func (a *Archiver) Start(ctx context.Context, once bool, metrics infra.MetricsFactory, exitSignal chan<- error) {
defer a.startedWG.Done()
go a.emitPeriodicCorruptStreamReport(ctx)
err := a.startImpl(ctx, once, metrics)
if err != nil {
exitSignal <- err
Expand Down
Loading