Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129759: rac2: implement handle raft event r=sumeerbhola a=kvoli

This commit implements `HandleRaftEvent` on the `RangeController`. When `HandleRaftEvent` is called by the `Processor` on the leader, the `RangeController` will perform local replica state management for the range and potentially return tokens if admitted has advanced. Additionally, any entries subject to admission control will have corresponding tokens deducted and tracked.

Resolves: cockroachdb#129668
Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed Aug 29, 2024
2 parents 65fdb8a + 891e8ce commit 91faa66
Show file tree
Hide file tree
Showing 6 changed files with 1,047 additions and 229 deletions.
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
"//pkg/raft/tracker",
"//pkg/roachpb",
Expand All @@ -38,14 +39,20 @@ go_test(
embed = [":rac2"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
"//pkg/raft/tracker",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_dustin_go_humanize//:go-humanize",
Expand Down
277 changes: 257 additions & 20 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ package rac2
import (
"cmp"
"context"
"fmt"
"reflect"
"slices"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -102,6 +106,8 @@ type FollowerStateInfo struct {
// (Match, Next) is in-flight.
Match uint64
Next uint64
// TODO(kvoli): Find a better home for this, we need it for token return.
Term uint64
// Invariant: Admitted[i] <= Match.
Admitted [raftpb.NumPriorities]uint64
}
Expand Down Expand Up @@ -309,7 +315,27 @@ retry:
//
// Requires replica.raftMu to be held.
func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
	// Refresh the tracked state for every replica. A state transition
	// (e.g. into or out of StateReplicate) may change the outcome for
	// requests waiting to evaluate.
	shouldWaitChange := false
	for r, rs := range rc.replicaMap {
		info := rc.opts.RaftInterface.FollowerState(r)
		shouldWaitChange = shouldWaitChange || rs.handleReadyState(ctx, info)
	}
	// If there was a quorum change, update the voter sets, triggering the
	// refresh channel for any requests waiting for eval tokens.
	if shouldWaitChange {
		rc.updateVoterSets()
	}

	// Compute the flow control state for each entry. We do this once here,
	// instead of decoding each entry multiple times for all replicas.
	entryStates := make([]entryFCState, len(e.Entries))
	for i, entry := range e.Entries {
		entryStates[i] = getEntryFCStateOrFatal(ctx, entry)
	}
	// Deduct and track tokens for the new entries on every replica stream.
	for _, rs := range rc.replicaMap {
		rs.handleReadyEntries(ctx, entryStates)
	}
	return nil
}

// HandleSchedulerEventRaftMuLocked processes an event scheduled by the
Expand Down Expand Up @@ -406,12 +432,10 @@ func (rc *rangeController) updateVoterSets() {
// Is a voter.
rs := rc.replicaMap[r.ReplicaID]
vsfw := voterStateForWaiters{
replicaID: r.ReplicaID,
isLeader: r.ReplicaID == rc.opts.LocalReplicaID,
isLeaseHolder: r.ReplicaID == rc.leaseholder,
// TODO(rac2): Once the send stream is added, check that the send stream
// is initialized here as well.
isStateReplicate: rs.connectedState.shouldWaitForElasticEvalTokens(),
replicaID: r.ReplicaID,
isLeader: r.ReplicaID == rc.opts.LocalReplicaID,
isLeaseHolder: r.ReplicaID == rc.leaseholder,
isStateReplicate: rs.isStateReplicate(),
evalTokenCounter: rs.evalTokenCounter,
}
if isOld {
Expand All @@ -431,10 +455,11 @@ type replicaState struct {
// stream aggregates across the streams for the same (tenant, store). This
// is the identity that is used to deduct tokens or wait for tokens to be
// positive.
stream kvflowcontrol.Stream
evalTokenCounter TokenCounter
desc roachpb.ReplicaDescriptor
connectedState connectedState
stream kvflowcontrol.Stream
evalTokenCounter, sendTokenCounter TokenCounter
desc roachpb.ReplicaDescriptor

sendStream *replicaSendStream
}

func NewReplicaState(
Expand All @@ -445,21 +470,215 @@ func NewReplicaState(
parent: parent,
stream: stream,
evalTokenCounter: parent.opts.SSTokenCounter.Eval(stream),
sendTokenCounter: parent.opts.SSTokenCounter.Send(stream),
desc: desc,
}

// TODO(rac2): Construct the sendStream state here if the replica is in state
// replicate.
state := parent.opts.RaftInterface.FollowerState(desc.ReplicaID)
switch state.State {
case tracker.StateReplicate:
rs.connectedState = replicate
if state.State == tracker.StateReplicate {
rs.createReplicaSendStream()
}

return rs
}

// replicaSendStream tracks the token-deduction state of a single replica's
// send stream. All mutable state is guarded by mu, since token tracking
// (handleReadyEntries) and token return (makeConsistentInStateReplicate,
// changeToStateSnapshot) may be interleaved.
type replicaSendStream struct {
	// parent is the replicaState this stream belongs to; used to reach the
	// eval/send token counters when returning tokens.
	parent *replicaState

	mu struct {
		syncutil.Mutex
		// connectedState is one of replicate, probeRecentlyReplicate, or
		// snapshot.
		connectedState connectedState
		// tracker records the tokens deducted per (term, index, priority) so
		// they can be returned when entries are admitted (Untrack) or the
		// stream transitions to snapshot (UntrackAll).
		tracker Tracker
		// closed is set by replicaState.closeSendStream when the stream is
		// abandoned.
		closed bool
	}
}

// createReplicaSendStream constructs and installs the send stream for this
// replica. The replica must be in StateReplicate when this is called.
func (rs *replicaState) createReplicaSendStream() {
	stream := &replicaSendStream{parent: rs}
	stream.mu.tracker.Init(rs.stream)
	stream.mu.connectedState = replicate
	stream.mu.closed = false
	rs.sendStream = stream
}

// isStateReplicate reports whether this replica has a send stream whose
// connected state should cause elastic evaluation to wait for tokens.
func (rs *replicaState) isStateReplicate() bool {
	stream := rs.sendStream
	if stream == nil {
		return false
	}
	stream.mu.Lock()
	defer stream.mu.Unlock()
	return stream.mu.connectedState.shouldWaitForElasticEvalTokens()
}

// entryFCState is the decoded flow control state of a single raft entry,
// computed once per raft event so it can be shared across all replica
// streams.
type entryFCState struct {
	// term and index identify the entry's position in the raft log.
	term, index uint64
	// usesFlowControl is true when the entry's encoding is subject to
	// admission control, i.e. tokens must be deducted and tracked for it.
	usesFlowControl bool
	// tokens is the entry's data size expressed as flow tokens.
	tokens kvflowcontrol.Tokens
	// pri is the entry's raft priority. For v1 encodings this is forced to
	// LowPri (see getEntryFCStateOrFatal).
	pri raftpb.Priority
}

// getEntryFCStateOrFatal returns the given entry's flow control state. If the
// entry encoding cannot be determined, a fatal is logged.
func getEntryFCStateOrFatal(ctx context.Context, entry raftpb.Entry) entryFCState {
	enc, pri, err := raftlog.EncodingOf(entry)
	if err != nil {
		log.Fatalf(ctx, "error getting encoding of entry: %v", err)
	}

	switch enc {
	case raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC:
		// When the entry is encoded with the v1 encoding, we don't have access to
		// the priority via the priority bit and would need to decode the admission
		// metadata. Instead, assume the priority is low priority, which is the
		// only sane flow control priority enforcement level in v1 (elastic only).
		pri = raftpb.LowPri
	}

	return entryFCState{
		term:            entry.Term,
		index:           entry.Index,
		usesFlowControl: enc.UsesAdmissionControl(),
		tokens:          kvflowcontrol.Tokens(len(entry.Data)),
		pri:             pri,
	}
}

// handleReadyEntries deducts eval and send tokens for every entry subject to
// flow control and records each deduction in the stream's tracker so the
// tokens can later be returned on admission.
func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryFCState) {
	stream := rs.sendStream
	if stream == nil {
		// No send stream (e.g. follower not in StateReplicate); nothing to track.
		return
	}
	stream.mu.Lock()
	defer stream.mu.Unlock()

	for _, e := range entries {
		if !e.usesFlowControl {
			continue
		}
		wc := WorkClassFromRaftPriority(e.pri)
		stream.mu.tracker.Track(ctx, e.term, e.index, e.pri, e.tokens)
		rs.evalTokenCounter.Deduct(ctx, wc, e.tokens)
		rs.sendTokenCounter.Deduct(ctx, wc, e.tokens)
	}
}

// handleReadyState handles state management for the replica based on the
// provided follower state information. If the state changes in a way that
// affects requests waiting for evaluation, returns true.
func (rs *replicaState) handleReadyState(
	ctx context.Context, info FollowerStateInfo,
) (shouldWaitChange bool) {
	switch info.State {
	case tracker.StateProbe:
		// The follower is being probed; stop deducting tokens for it.
		if rs.sendStream != nil {
			// TODO(kvoli): delay this by 1s.
			rs.closeSendStream(ctx)
			shouldWaitChange = true
		}
	case tracker.StateReplicate:
		if rs.sendStream == nil {
			rs.createReplicaSendStream()
			shouldWaitChange = true
		} else {
			shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx, info)
		}
	case tracker.StateSnapshot:
		if rs.sendStream != nil {
			// Read the connected state under the stream's mutex before deciding
			// how to transition.
			state := func() connectedState {
				rs.sendStream.mu.Lock()
				defer rs.sendStream.mu.Unlock()
				return rs.sendStream.mu.connectedState
			}()
			switch state {
			case replicate:
				rs.sendStream.changeToStateSnapshot(ctx)
				shouldWaitChange = true
			case probeRecentlyReplicate:
				rs.closeSendStream(ctx)
				shouldWaitChange = true
			case snapshot:
				// Already in snapshot state; nothing to do.
			}
		}
	}
	return shouldWaitChange
}

// closeSendStream closes and discards the replica's send stream, first
// returning all tracked tokens via the snapshot-state transition, since there
// is no longer any guarantee the tokens would be returned through admission.
//
// Receiver renamed rss -> rs for consistency: this file uses rs for
// *replicaState and reserves rss for *replicaSendStream receivers.
func (rs *replicaState) closeSendStream(ctx context.Context) {
	rs.sendStream.mu.Lock()
	defer rs.sendStream.mu.Unlock()

	if rs.sendStream.mu.connectedState != snapshot {
		// changeToStateSnapshot returns all tokens, as we have no liveness
		// guarantee of their return with the send stream now closed.
		rs.sendStream.changeToStateSnapshotLocked(ctx)
	}
	rs.sendStream.mu.closed = true
	rs.sendStream = nil
}

// makeConsistentInStateReplicate reconciles the send stream with the latest
// follower state while the follower remains in StateReplicate: tokens for
// entries the follower has admitted are returned, and the connected state is
// moved (back) to replicate. Returns true when the transition (snapshot ->
// replicate) affects requests waiting for evaluation.
func (rss *replicaSendStream) makeConsistentInStateReplicate(
	ctx context.Context, info FollowerStateInfo,
) (shouldWaitChange bool) {
	rss.mu.Lock()
	defer rss.mu.Unlock()
	// NB: the defer's argument is evaluated here, so entries up to
	// info.Admitted are untracked immediately; only the return of the tokens
	// to the counters is deferred. Defers run LIFO, so returnTokens executes
	// before the mutex is released.
	defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(info.Term, info.Admitted))

	// The leader is always in state replicate.
	if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID {
		if rss.mu.connectedState != replicate {
			log.Fatalf(ctx, "%v", errors.AssertionFailedf(
				"leader should always be in state replicate but found in %v",
				rss.mu.connectedState))
		}
		return false
	}

	// Follower replica case. Update the connected state.
	switch rss.mu.connectedState {
	case replicate:
		// Already consistent.
	case probeRecentlyReplicate:
		rss.mu.connectedState = replicate
	case snapshot:
		// Leaving snapshot state means waiters may now proceed.
		rss.mu.connectedState = replicate
		shouldWaitChange = true
	}
	return shouldWaitChange
}

// changeToStateSnapshot changes the connected state to snapshot and returns
// all tracked entries' tokens. It is the locking wrapper around
// changeToStateSnapshotLocked; rss.mu must not be held.
func (rss *replicaSendStream) changeToStateSnapshot(ctx context.Context) {
	rss.mu.Lock()
	defer rss.mu.Unlock()

	rss.changeToStateSnapshotLocked(ctx)
}

// changeToStateSnapshotLocked changes the connected state to snapshot and
// returns all tracked entries' tokens.
//
// Requires rss.mu to be held.
func (rss *replicaSendStream) changeToStateSnapshotLocked(ctx context.Context) {
	rss.mu.connectedState = snapshot
	// Since the replica is now in StateSnapshot, there is no need for Raft to
	// send MsgApp pings to discover what has been missed. So there is no
	// liveness guarantee on when these tokens will be returned, and therefore we
	// return all tokens in the tracker.
	rss.returnTokens(ctx, rss.mu.tracker.UntrackAll())
}

// returnTokens takes the tokens untracked by the tracker and returns them to
// the eval and send token counters.
//
// The original text contained a stray "return rs" (diff residue) in this
// no-result function, which has been removed.
func (rss *replicaSendStream) returnTokens(
	ctx context.Context, returned [raftpb.NumPriorities]kvflowcontrol.Tokens,
) {
	for pri, tokens := range returned {
		pri := raftpb.Priority(pri)
		if tokens > 0 {
			rss.parent.evalTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens)
			rss.parent.sendTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens)
		}
	}
}

type connectedState uint32
Expand Down Expand Up @@ -505,3 +724,21 @@ const (
// shouldWaitForElasticEvalTokens reports whether elastic work evaluating on
// this stream should wait for eval tokens in the given connected state.
func (cs connectedState) shouldWaitForElasticEvalTokens() bool {
	switch cs {
	case replicate, probeRecentlyReplicate:
		return true
	default:
		return false
	}
}

// String implements the fmt.Stringer interface, rendering the state without
// redaction markers.
func (cs connectedState) String() string {
	return redact.StringWithoutMarkers(cs)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (cs connectedState) SafeFormat(w redact.SafePrinter, _ rune) {
	switch cs {
	case replicate:
		w.SafeString("replicate")
	case probeRecentlyReplicate:
		w.SafeString("probeRecentlyReplicate")
	case snapshot:
		w.SafeString("snapshot")
	default:
		// Format the raw value with %d. Using %v here would invoke String,
		// which calls back into SafeFormat, recursing without bound before the
		// panic could propagate.
		panic(fmt.Sprintf("unknown connectedState %d", uint32(cs)))
	}
}
Loading

0 comments on commit 91faa66

Please sign in to comment.