diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
index fed4caeead8e..df2a993e1017 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/kv/kvserver/kvflowcontrol",
+        "//pkg/kv/kvserver/raftlog",
         "//pkg/raft/raftpb",
         "//pkg/raft/tracker",
         "//pkg/roachpb",
@@ -38,14 +39,20 @@ go_test(
     embed = [":rac2"],
     deps = [
         "//pkg/kv/kvserver/kvflowcontrol",
+        "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
+        "//pkg/kv/kvserver/kvserverbase",
+        "//pkg/kv/kvserver/kvserverpb",
+        "//pkg/kv/kvserver/raftlog",
         "//pkg/raft/raftpb",
         "//pkg/raft/tracker",
         "//pkg/roachpb",
         "//pkg/settings/cluster",
+        "//pkg/testutils/datapathutils",
         "//pkg/util/admission/admissionpb",
         "//pkg/util/humanizeutil",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/protoutil",
         "//pkg/util/syncutil",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_dustin_go_humanize//:go-humanize",
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
index 967ee2197811..e62f65217c9b 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -13,15 +13,19 @@ package rac2
 import (
 	"cmp"
 	"context"
+	"fmt"
 	"reflect"
 	"slices"

 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
 	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
 	"github.com/cockroachdb/cockroach/pkg/raft/tracker"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
 )
@@ -102,6 +106,8 @@ type FollowerStateInfo struct {
 	// (Match, Next) is in-flight.
 	Match uint64
 	Next uint64
+	// TODO(kvoli): Find a better home for this; we need it for token return.
+	Term uint64
 	// Invariant: Admitted[i] <= Match.
 	Admitted [raftpb.NumPriorities]uint64
 }
@@ -309,7 +315,27 @@ retry:
 //
 // Requires replica.raftMu to be held.
 func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
-	panic("unimplemented")
+	shouldWaitChange := false
+	for r, rs := range rc.replicaMap {
+		info := rc.opts.RaftInterface.FollowerState(r)
+		shouldWaitChange = shouldWaitChange || rs.handleReadyState(ctx, info)
+	}
+	// If any replica's connected state changed, update the voter sets,
+	// triggering the refresh channel for any requests waiting for eval tokens.
+	if shouldWaitChange {
+		rc.updateVoterSets()
+	}
+
+	// Compute the flow control state for each entry. We do this once here,
+	// instead of decoding each entry multiple times for all replicas.
+	entryStates := make([]entryFCState, len(e.Entries))
+	for i, entry := range e.Entries {
+		entryStates[i] = getEntryFCStateOrFatal(ctx, entry)
+	}
+	for _, rs := range rc.replicaMap {
+		rs.handleReadyEntries(ctx, entryStates)
+	}
+	return nil
 }

 // HandleSchedulerEventRaftMuLocked processes an event scheduled by the
@@ -406,12 +432,10 @@ func (rc *rangeController) updateVoterSets() {
 			// Is a voter.
rs := rc.replicaMap[r.ReplicaID] vsfw := voterStateForWaiters{ - replicaID: r.ReplicaID, - isLeader: r.ReplicaID == rc.opts.LocalReplicaID, - isLeaseHolder: r.ReplicaID == rc.leaseholder, - // TODO(rac2): Once the send stream is added, check that the send stream - // is initialized here as well. - isStateReplicate: rs.connectedState.shouldWaitForElasticEvalTokens(), + replicaID: r.ReplicaID, + isLeader: r.ReplicaID == rc.opts.LocalReplicaID, + isLeaseHolder: r.ReplicaID == rc.leaseholder, + isStateReplicate: rs.isStateReplicate(), evalTokenCounter: rs.evalTokenCounter, } if isOld { @@ -431,10 +455,11 @@ type replicaState struct { // stream aggregates across the streams for the same (tenant, store). This // is the identity that is used to deduct tokens or wait for tokens to be // positive. - stream kvflowcontrol.Stream - evalTokenCounter TokenCounter - desc roachpb.ReplicaDescriptor - connectedState connectedState + stream kvflowcontrol.Stream + evalTokenCounter, sendTokenCounter TokenCounter + desc roachpb.ReplicaDescriptor + + sendStream *replicaSendStream } func NewReplicaState( @@ -445,21 +470,215 @@ func NewReplicaState( parent: parent, stream: stream, evalTokenCounter: parent.opts.SSTokenCounter.Eval(stream), + sendTokenCounter: parent.opts.SSTokenCounter.Send(stream), desc: desc, } - - // TODO(rac2): Construct the sendStream state here if the replica is in state - // replicate. state := parent.opts.RaftInterface.FollowerState(desc.ReplicaID) - switch state.State { - case tracker.StateReplicate: - rs.connectedState = replicate + if state.State == tracker.StateReplicate { + rs.createReplicaSendStream() + } + + return rs +} + +type replicaSendStream struct { + parent *replicaState + + mu struct { + syncutil.Mutex + connectedState connectedState + tracker Tracker + closed bool + } +} + +func (rs *replicaState) createReplicaSendStream() { + // Must be in StateReplicate on creation. + rs.sendStream = &replicaSendStream{ + parent: rs, + } + rs.sendStream.mu.tracker.Init(rs.stream) + rs.sendStream.mu.connectedState = replicate + rs.sendStream.mu.closed = false +} + +func (rs *replicaState) isStateReplicate() bool { + if rs.sendStream == nil { + return false + } + rs.sendStream.mu.Lock() + defer rs.sendStream.mu.Unlock() + + return rs.sendStream.mu.connectedState.shouldWaitForElasticEvalTokens() +} + +type entryFCState struct { + term, index uint64 + usesFlowControl bool + tokens kvflowcontrol.Tokens + pri raftpb.Priority +} + +// getEntryFCStateOrFatal returns the given entry's flow control state. If the +// entry encoding cannot be determined, a fatal is logged. +func getEntryFCStateOrFatal(ctx context.Context, entry raftpb.Entry) entryFCState { + enc, pri, err := raftlog.EncodingOf(entry) + if err != nil { + log.Fatalf(ctx, "error getting encoding of entry: %v", err) + } + + if enc == raftlog.EntryEncodingStandardWithAC || enc == raftlog.EntryEncodingSideloadedWithAC { + // When the entry is encoded with the v1 encoding, we don't have access to + // the priority via the priority bit and would need to decode the admission + // metadata. Instead, assume the priority is low priority, which is the + // only sane flow control priority enforcement level in v1 (elastic only). 
+ pri = raftpb.LowPri + } + + return entryFCState{ + index: entry.Index, + term: entry.Term, + usesFlowControl: enc.UsesAdmissionControl(), + tokens: kvflowcontrol.Tokens(len(entry.Data)), + pri: pri, + } +} + +func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryFCState) { + if rs.sendStream == nil { + return + } + rs.sendStream.mu.Lock() + defer rs.sendStream.mu.Unlock() + + for _, entry := range entries { + if !entry.usesFlowControl { + continue + } + rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, entry.tokens) + rs.evalTokenCounter.Deduct( + ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens) + rs.sendTokenCounter.Deduct( + ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens) + } +} + +// handleReadyState handles state management for the replica based on the +// provided follower state information. If the state changes in a way that +// affects requests waiting for evaluation, returns true. +func (rs *replicaState) handleReadyState( + ctx context.Context, info FollowerStateInfo, +) (shouldWaitChange bool) { + switch info.State { case tracker.StateProbe: - rs.connectedState = probeRecentlyReplicate + if rs.sendStream != nil { + // TODO(kvoli): delay this by 1s. + rs.closeSendStream(ctx) + shouldWaitChange = true + } + case tracker.StateReplicate: + if rs.sendStream == nil { + rs.createReplicaSendStream() + shouldWaitChange = true + } else { + shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx, info) + } case tracker.StateSnapshot: - rs.connectedState = snapshot + if rs.sendStream != nil { + switch func() connectedState { + rs.sendStream.mu.Lock() + defer rs.sendStream.mu.Unlock() + return rs.sendStream.mu.connectedState + }() { + case replicate: + rs.sendStream.changeToStateSnapshot(ctx) + shouldWaitChange = true + case probeRecentlyReplicate: + rs.closeSendStream(ctx) + shouldWaitChange = true + case snapshot: + } + } + } + return shouldWaitChange +} + +func (rss *replicaState) closeSendStream(ctx context.Context) { + rss.sendStream.mu.Lock() + defer rss.sendStream.mu.Unlock() + + if rss.sendStream.mu.connectedState != snapshot { + // changeToStateSnapshot returns all tokens, as we have no liveness + // guarantee of their return with the send stream now closed. + rss.sendStream.changeToStateSnapshotLocked(ctx) + } + rss.sendStream.mu.closed = true + rss.sendStream = nil +} + +func (rss *replicaSendStream) makeConsistentInStateReplicate( + ctx context.Context, info FollowerStateInfo, +) (shouldWaitChange bool) { + rss.mu.Lock() + defer rss.mu.Unlock() + defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(info.Term, info.Admitted)) + + // The leader is always in state replicate. + if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID { + if rss.mu.connectedState != replicate { + log.Fatalf(ctx, "%v", errors.AssertionFailedf( + "leader should always be in state replicate but found in %v", + rss.mu.connectedState)) + } + return false + } + + // Follower replica case. Update the connected state. + switch rss.mu.connectedState { + case replicate: + case probeRecentlyReplicate: + rss.mu.connectedState = replicate + case snapshot: + rss.mu.connectedState = replicate + shouldWaitChange = true + } + return shouldWaitChange +} + +// changeToStateSnapshot changes the connected state to snapshot and returns +// all tracked entries' tokens. 
+func (rss *replicaSendStream) changeToStateSnapshot(ctx context.Context) { + rss.mu.Lock() + defer rss.mu.Unlock() + + rss.changeToStateSnapshotLocked(ctx) +} + +// changeToStateSnapshot changes the connected state to snapshot and returns +// all tracked entries' tokens. +// +// Requires rs.mu to be held. +func (rss *replicaSendStream) changeToStateSnapshotLocked(ctx context.Context) { + rss.mu.connectedState = snapshot + // Since the replica is now in StateSnapshot, there is no need for Raft to + // send MsgApp pings to discover what has been missed. So there is no + // liveness guarantee on when these tokens will be returned, and therefore we + // return all tokens in the tracker. + rss.returnTokens(ctx, rss.mu.tracker.UntrackAll()) +} + +// returnTokens takes the tokens untracked by the tracker and returns them to +// the eval and send token counters. +func (rss *replicaSendStream) returnTokens( + ctx context.Context, returned [raftpb.NumPriorities]kvflowcontrol.Tokens, +) { + for pri, tokens := range returned { + pri := raftpb.Priority(pri) + if tokens > 0 { + rss.parent.evalTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens) + rss.parent.sendTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens) + } } - return rs } type connectedState uint32 @@ -505,3 +724,21 @@ const ( func (cs connectedState) shouldWaitForElasticEvalTokens() bool { return cs == replicate || cs == probeRecentlyReplicate } + +func (cs connectedState) String() string { + return redact.StringWithoutMarkers(cs) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (cs connectedState) SafeFormat(w redact.SafePrinter, _ rune) { + switch cs { + case replicate: + w.SafeString("replicate") + case probeRecentlyReplicate: + w.SafeString("probeRecentlyReplicate") + case snapshot: + w.SafeString("snapshot") + default: + panic(fmt.Sprintf("unknown connectedState %v", cs)) + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 6113715bcb52..cbbf949346b9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -20,11 +20,19 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" + "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" @@ -84,6 +92,31 @@ func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPrior }() } +func (r *testingRCRange) admit( + ctx context.Context, + t *testing.T, + storeID roachpb.StoreID, + term uint64, + toIndex uint64, + pri admissionpb.WorkPriority, +) { + r.mu.Lock() + + for _, replica := range 
r.mu.r.replicaSet { + if replica.desc.StoreID == storeID { + replica := replica + replica.info.Admitted[AdmissionToRaftPriority(pri)] = toIndex + replica.info.Term = term + r.mu.r.replicaSet[replica.desc.ReplicaID] = replica + break + } + } + + r.mu.Unlock() + // Send an empty raft event in order to trigger potential token return. + require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{})) +} + type testingRange struct { rangeID roachpb.RangeID tenantID roachpb.TenantID @@ -233,12 +266,59 @@ func parsePriority(t *testing.T, input string) admissionpb.WorkPriority { case "HighPri": return admissionpb.HighPri default: - require.Fail(t, "unknown work class") + require.Failf(t, "unknown work class", "%v", input) return admissionpb.WorkPriority(-1) } } -// TestRangeControllerWaitForEval tests the RangeController WaitForEval method. +type entryInfo struct { + term uint64 + index uint64 + enc raftlog.EntryEncoding + pri raftpb.Priority + tokens kvflowcontrol.Tokens +} + +func testingCreateEntry(t *testing.T, info entryInfo) raftpb.Entry { + cmdID := kvserverbase.CmdIDKey("11111111") + var metaBuf []byte + if info.enc.UsesAdmissionControl() { + meta := kvflowcontrolpb.RaftAdmissionMeta{ + AdmissionPriority: int32(info.pri), + } + var err error + metaBuf, err = protoutil.Marshal(&meta) + require.NoError(t, err) + } + cmdBufPrefix := raftlog.EncodeCommandBytes(info.enc, cmdID, nil, info.pri) + paddingLen := int(info.tokens) - len(cmdBufPrefix) - len(metaBuf) + // Padding also needs to decode as part of the RaftCommand proto, so we + // abuse the WriteBatch.Data field which is a byte slice. Since it is a + // nested field it consumes two tags plus two lengths. We'll approximate + // this as needing a maximum of 15 bytes, to be on the safe side. + require.LessOrEqual(t, 15, paddingLen) + cmd := kvserverpb.RaftCommand{ + WriteBatch: &kvserverpb.WriteBatch{Data: make([]byte, paddingLen)}} + // Shrink by 1 on each iteration. This doesn't give us a guarantee that we + // will get exactly paddingLen since the length of data affects the encoded + // lengths, but it should usually work, and cause fewer questions when + // looking at the testdata file. + for cmd.Size() > paddingLen { + cmd.WriteBatch.Data = cmd.WriteBatch.Data[:len(cmd.WriteBatch.Data)-1] + } + cmdBuf, err := protoutil.Marshal(&cmd) + require.NoError(t, err) + data := append(cmdBufPrefix, metaBuf...) + data = append(data, cmdBuf...) + return raftpb.Entry{ + Term: info.term, + Index: info.index, + Type: raftpb.EntryNormal, + Data: data, + } +} + +// TestRangeController tests the RangeController's various methods. // // - init: Initializes the range controller with the given ranges. // range_id= tenant_id= local_replica_id= @@ -264,245 +344,567 @@ func parsePriority(t *testing.T, input string) admissionpb.WorkPriority { // // - set_leaseholder: Sets the leaseholder for the given range. // range_id= replica_id= -func TestRangeControllerWaitForEval(t *testing.T) { +// +// - close_rcs: Closes all range controllers. +// +// - admit: Admits the given store to the given range. +// range_id= +// store_id= term= to_index= pri= +// ... +// +// - raft_event: Simulates a raft event on the given rangeStateProbe, calling +// HandleRaftEvent. +// range_id= +// term= index= pri= size= +// ... +// +// - stream_state: Prints the state of the stream(s) for the given range's +// replicas. 
+// range_id= +func TestRangeController(t *testing.T) { + defer leaktest.AfterTest(t)() ctx := context.Background() - settings := cluster.MakeTestingClusterSettings() - ranges := make(map[roachpb.RangeID]*testingRCRange) - ssTokenCounter := NewStreamTokenCounterProvider(settings) - - // Eval will only wait on a positive token amount, set the limit to 1 in - // order to simplify testing. - kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 1) - kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 1) - // We will initialize each token counter to 0 tokens initially. The map is - // used to do so exactly once per stream. - zeroedTokenCounters := make(map[kvflowcontrol.Stream]struct{}) - - rangeStateString := func() string { - - var b strings.Builder - - // Sort the ranges by rangeID to ensure deterministic output. - sortedRanges := make([]*testingRCRange, 0, len(ranges)) - for _, testRC := range ranges { - sortedRanges = append(sortedRanges, testRC) - // We retain the lock until the end of the function call. - testRC.mu.Lock() - defer testRC.mu.Unlock() - } - sort.Slice(sortedRanges, func(i, j int) bool { - return sortedRanges[i].mu.r.rangeID < sortedRanges[j].mu.r.rangeID - }) - for _, testRC := range sortedRanges { - replicaIDs := make([]int, 0, len(testRC.mu.r.replicaSet)) - for replicaID := range testRC.mu.r.replicaSet { - replicaIDs = append(replicaIDs, int(replicaID)) + datadriven.Walk(t, datapathutils.TestDataPath(t, "range_controller"), func(t *testing.T, path string) { + settings := cluster.MakeTestingClusterSettings() + ranges := make(map[roachpb.RangeID]*testingRCRange) + ssTokenCounter := NewStreamTokenCounterProvider(settings) + + // setTokenCounters is used to ensure that we only set the initial token + // counts once per counter. + setTokenCounters := make(map[kvflowcontrol.Stream]struct{}) + initialRegularTokens, initialElasticTokens := int64(-1), int64(-1) + + sortRanges := func() []*testingRCRange { + sorted := make([]*testingRCRange, 0, len(ranges)) + for _, testRC := range ranges { + sorted = append(sorted, testRC) } - sort.Ints(replicaIDs) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].mu.r.rangeID < sorted[j].mu.r.rangeID + }) + return sorted + } + + rangeStateString := func() string { + var b strings.Builder - fmt.Fprintf(&b, "r%d: [", testRC.mu.r.rangeID) - for i, replicaID := range replicaIDs { - replica := testRC.mu.r.replicaSet[roachpb.ReplicaID(replicaID)] - if i > 0 { - fmt.Fprintf(&b, ",") + // Sort the ranges by rangeID to ensure deterministic output. + sortedRanges := sortRanges() + for _, testRC := range sortedRanges { + // We retain the lock until the end of the function call. 
+ testRC.mu.Lock() + defer testRC.mu.Unlock() + + replicaIDs := make([]int, 0, len(testRC.mu.r.replicaSet)) + for replicaID := range testRC.mu.r.replicaSet { + replicaIDs = append(replicaIDs, int(replicaID)) } - fmt.Fprintf(&b, "%v", replica.desc) - if replica.desc.ReplicaID == testRC.rc.leaseholder { - fmt.Fprint(&b, "*") + sort.Ints(replicaIDs) + + fmt.Fprintf(&b, "r%d: [", testRC.mu.r.rangeID) + for i, replicaID := range replicaIDs { + replica := testRC.mu.r.replicaSet[roachpb.ReplicaID(replicaID)] + if i > 0 { + fmt.Fprintf(&b, ",") + } + fmt.Fprintf(&b, "%v", replica.desc) + if replica.desc.ReplicaID == testRC.rc.leaseholder { + fmt.Fprint(&b, "*") + } } + fmt.Fprintf(&b, "]\n") } - fmt.Fprintf(&b, "]\n") + return b.String() } - return b.String() - } - tokenCountsString := func() string { - var b strings.Builder - streams := make([]kvflowcontrol.Stream, 0, len(ssTokenCounter.mu.evalCounters)) - for stream := range ssTokenCounter.mu.evalCounters { - streams = append(streams, stream) - } - sort.Slice(streams, func(i, j int) bool { - return streams[i].StoreID < streams[j].StoreID - }) - for _, stream := range streams { - fmt.Fprintf(&b, "%v: %v\n", stream, ssTokenCounter.Eval(stream)) + tokenCountsString := func() string { + var b strings.Builder + streams := make([]kvflowcontrol.Stream, 0, len(ssTokenCounter.mu.evalCounters)) + for stream := range ssTokenCounter.mu.evalCounters { + streams = append(streams, stream) + } + sort.Slice(streams, func(i, j int) bool { + return streams[i].StoreID < streams[j].StoreID + }) + for _, stream := range streams { + fmt.Fprintf(&b, "%v: %v\n", stream, ssTokenCounter.Eval(stream)) + } + + return b.String() } - return b.String() - } + evalStateString := func() string { + time.Sleep(100 * time.Millisecond) + var b strings.Builder - evalStateString := func() string { - time.Sleep(100 * time.Millisecond) - var b strings.Builder - - // Sort the ranges by rangeID to ensure deterministic output. - sortedRanges := make([]*testingRCRange, 0, len(ranges)) - for _, testRC := range ranges { - sortedRanges = append(sortedRanges, testRC) - // We retain the lock until the end of the function call. - testRC.mu.Lock() - defer testRC.mu.Unlock() - } - sort.Slice(sortedRanges, func(i, j int) bool { - return sortedRanges[i].mu.r.rangeID < sortedRanges[j].mu.r.rangeID - }) + // Sort the ranges by rangeID to ensure deterministic output. + sortedRanges := sortRanges() + for _, testRC := range sortedRanges { + // We retain the lock until the end of the function call. + testRC.mu.Lock() + defer testRC.mu.Unlock() - for _, testRC := range sortedRanges { - fmt.Fprintf(&b, "range_id=%d tenant_id=%d local_replica_id=%d\n", - testRC.mu.r.rangeID, testRC.mu.r.tenantID, testRC.mu.r.localReplicaID) - // Sort the evals by name to ensure deterministic output. - evals := make([]string, 0, len(testRC.mu.evals)) - for name := range testRC.mu.evals { - evals = append(evals, name) - } - sort.Strings(evals) - for _, name := range evals { - eval := testRC.mu.evals[name] - fmt.Fprintf(&b, " name=%s pri=%-8v done=%-5t waited=%-5t err=%v\n", name, eval.pri, - eval.done, eval.waited, eval.err) + fmt.Fprintf(&b, "range_id=%d tenant_id=%d local_replica_id=%d\n", + testRC.mu.r.rangeID, testRC.mu.r.tenantID, testRC.mu.r.localReplicaID) + // Sort the evals by name to ensure deterministic output. 
+ evals := make([]string, 0, len(testRC.mu.evals)) + for name := range testRC.mu.evals { + evals = append(evals, name) + } + sort.Strings(evals) + for _, name := range evals { + eval := testRC.mu.evals[name] + fmt.Fprintf(&b, " name=%s pri=%-8v done=%-5t waited=%-5t err=%v\n", name, eval.pri, + eval.done, eval.waited, eval.err) + } } + return b.String() } - return b.String() - } - maybeZeroTokenCounters := func(r testingRange) { - for _, replica := range r.replicaSet { - stream := kvflowcontrol.Stream{ - StoreID: replica.desc.StoreID, - TenantID: r.tenantID, + sendStreamString := func(rangeID roachpb.RangeID) string { + var b strings.Builder + + testRC := ranges[rangeID] + var replicaIDs []int + for replicaID := range testRC.mu.r.replicaSet { + replicaIDs = append(replicaIDs, int(replicaID)) } - if _, ok := zeroedTokenCounters[stream]; !ok { - zeroedTokenCounters[stream] = struct{}{} - ssTokenCounter.Eval(stream).(*tokenCounter).adjust(ctx, admissionpb.RegularWorkClass, -1) + sort.Ints(replicaIDs) + for _, replicaID := range replicaIDs { + replica := testRC.rc.replicaMap[roachpb.ReplicaID(replicaID)] + fmt.Fprintf(&b, "%v: ", replica.desc) + if replica.sendStream == nil { + fmt.Fprintf(&b, "closed\n") + continue + } + replica.sendStream.mu.Lock() + defer replica.sendStream.mu.Unlock() + + fmt.Fprintf(&b, "state=%v closed=%v\n", + replica.sendStream.mu.connectedState, replica.sendStream.mu.closed) + b.WriteString(formatTrackerState(&replica.sendStream.mu.tracker)) + b.WriteString("++++\n") } + return b.String() } - } - getOrInitRange := func(r testingRange) *testingRCRange { - testRC, ok := ranges[r.rangeID] - if !ok { - testRC = &testingRCRange{} - testRC.mu.r = r - testRC.mu.evals = make(map[string]*testingRCEval) - options := RangeControllerOptions{ - RangeID: r.rangeID, - TenantID: r.tenantID, - LocalReplicaID: r.localReplicaID, - SSTokenCounter: ssTokenCounter, - RaftInterface: testRC, + maybeSetInitialTokens := func(r testingRange) { + for _, replica := range r.replicaSet { + stream := kvflowcontrol.Stream{ + StoreID: replica.desc.StoreID, + TenantID: r.tenantID, + } + if _, ok := setTokenCounters[stream]; !ok { + setTokenCounters[stream] = struct{}{} + if initialRegularTokens != -1 { + ssTokenCounter.Eval(stream).(*tokenCounter).testingSetTokens(ctx, + admissionpb.RegularWorkClass, kvflowcontrol.Tokens(initialRegularTokens)) + ssTokenCounter.Send(stream).(*tokenCounter).testingSetTokens(ctx, + admissionpb.RegularWorkClass, kvflowcontrol.Tokens(initialRegularTokens)) + } + if initialElasticTokens != -1 { + ssTokenCounter.Eval(stream).(*tokenCounter).testingSetTokens(ctx, + admissionpb.ElasticWorkClass, kvflowcontrol.Tokens(initialElasticTokens)) + ssTokenCounter.Send(stream).(*tokenCounter).testingSetTokens(ctx, + admissionpb.ElasticWorkClass, kvflowcontrol.Tokens(initialElasticTokens)) + } + } } + } + + getOrInitRange := func(r testingRange) *testingRCRange { + testRC, ok := ranges[r.rangeID] + if !ok { + testRC = &testingRCRange{} + testRC.mu.r = r + testRC.mu.evals = make(map[string]*testingRCEval) + options := RangeControllerOptions{ + RangeID: r.rangeID, + TenantID: r.tenantID, + LocalReplicaID: r.localReplicaID, + SSTokenCounter: ssTokenCounter, + RaftInterface: testRC, + } - init := RangeControllerInitState{ - ReplicaSet: r.replicas(), - Leaseholder: r.localReplicaID, + init := RangeControllerInitState{ + ReplicaSet: r.replicas(), + Leaseholder: r.localReplicaID, + } + testRC.rc = NewRangeController(ctx, options, init) + ranges[r.rangeID] = testRC } - testRC.rc = 
NewRangeController(ctx, options, init) - ranges[r.rangeID] = testRC + maybeSetInitialTokens(r) + return testRC } - maybeZeroTokenCounters(r) - return testRC - } - datadriven.RunTest(t, "testdata/range_controller_wait_for_eval", func(t *testing.T, d *datadriven.TestData) string { - switch d.Cmd { - case "init": - for _, r := range scanRanges(t, d.Input) { - getOrInitRange(r) - } - return rangeStateString() + tokenCountsString() - - case "wait_for_eval": - var rangeID int - var name, priString string - d.ScanArgs(t, "range_id", &rangeID) - d.ScanArgs(t, "name", &name) - d.ScanArgs(t, "pri", &priString) - testRC := ranges[roachpb.RangeID(rangeID)] - testRC.startWaitForEval(name, parsePriority(t, priString)) - return evalStateString() - - case "check_state": - return evalStateString() - - case "adjust_tokens": - for _, line := range strings.Split(d.Input, "\n") { - parts := strings.Fields(line) - parts[0] = strings.TrimSpace(parts[0]) - require.True(t, strings.HasPrefix(parts[0], "store_id=")) - parts[0] = strings.TrimPrefix(parts[0], "store_id=") - store, err := strconv.Atoi(parts[0]) - require.NoError(t, err) - - parts[1] = strings.TrimSpace(parts[1]) - require.True(t, strings.HasPrefix(parts[1], "pri=")) - pri := parsePriority(t, strings.TrimPrefix(parts[1], "pri=")) - - parts[2] = strings.TrimSpace(parts[2]) - require.True(t, strings.HasPrefix(parts[2], "tokens=")) - tokenString := strings.TrimPrefix(parts[2], "tokens=") - tokens, err := humanizeutil.ParseBytes(tokenString) - require.NoError(t, err) - - ssTokenCounter.Eval(kvflowcontrol.Stream{ - StoreID: roachpb.StoreID(store), - TenantID: roachpb.SystemTenantID, - }).(*tokenCounter).adjust(ctx, - admissionpb.WorkClassFromPri(pri), - kvflowcontrol.Tokens(tokens)) - } + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + var regularInitString, elasticInitString string + var regularLimitString, elasticLimitString string + d.MaybeScanArgs(t, "regular_init", ®ularInitString) + d.MaybeScanArgs(t, "elastic_init", &elasticInitString) + d.MaybeScanArgs(t, "regular_limit", ®ularLimitString) + d.MaybeScanArgs(t, "elastic_limit", &elasticLimitString) + // If the test specifies different token limits or initial token counts + // (default is the limit), then we override the default limit and also + // store the initial token count. tokenCounters are created + // dynamically, so we update them on the fly as well. 
+ if regularLimitString != "" { + regularLimit, err := humanizeutil.ParseBytes(regularLimitString) + require.NoError(t, err) + kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, regularLimit) + } + if elasticLimitString != "" { + elasticLimit, err := humanizeutil.ParseBytes(elasticLimitString) + require.NoError(t, err) + kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, elasticLimit) + } + if regularInitString != "" { + regularInit, err := humanizeutil.ParseBytes(regularInitString) + require.NoError(t, err) + initialRegularTokens = regularInit + } - return tokenCountsString() + if elasticInitString != "" { + elasticInit, err := humanizeutil.ParseBytes(elasticInitString) + require.NoError(t, err) + initialElasticTokens = elasticInit + } - case "cancel_context": - var rangeID int - var name string + for _, r := range scanRanges(t, d.Input) { + getOrInitRange(r) + } + return rangeStateString() + tokenCountsString() + + case "wait_for_eval": + var rangeID int + var name, priString string + d.ScanArgs(t, "range_id", &rangeID) + d.ScanArgs(t, "name", &name) + d.ScanArgs(t, "pri", &priString) + testRC := ranges[roachpb.RangeID(rangeID)] + testRC.startWaitForEval(name, parsePriority(t, priString)) + return evalStateString() + + case "check_state": + return evalStateString() + + case "adjust_tokens": + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + parts[0] = strings.TrimSpace(parts[0]) + require.True(t, strings.HasPrefix(parts[0], "store_id=")) + parts[0] = strings.TrimPrefix(parts[0], "store_id=") + store, err := strconv.Atoi(parts[0]) + require.NoError(t, err) + + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "pri=")) + pri := parsePriority(t, strings.TrimPrefix(parts[1], "pri=")) + + parts[2] = strings.TrimSpace(parts[2]) + require.True(t, strings.HasPrefix(parts[2], "tokens=")) + tokenString := strings.TrimPrefix(parts[2], "tokens=") + tokens, err := humanizeutil.ParseBytes(tokenString) + require.NoError(t, err) + + ssTokenCounter.Eval(kvflowcontrol.Stream{ + StoreID: roachpb.StoreID(store), + TenantID: roachpb.SystemTenantID, + }).(*tokenCounter).adjust(ctx, + admissionpb.WorkClassFromPri(pri), + kvflowcontrol.Tokens(tokens)) + } - d.ScanArgs(t, "range_id", &rangeID) - d.ScanArgs(t, "name", &name) - testRC := ranges[roachpb.RangeID(rangeID)] - func() { - testRC.mu.Lock() - defer testRC.mu.Unlock() - testRC.mu.evals[name].cancel() - }() + return tokenCountsString() - return evalStateString() + case "cancel_context": + var rangeID int + var name string - case "set_replicas": - for _, r := range scanRanges(t, d.Input) { - testRC := getOrInitRange(r) + d.ScanArgs(t, "range_id", &rangeID) + d.ScanArgs(t, "name", &name) + testRC := ranges[roachpb.RangeID(rangeID)] func() { testRC.mu.Lock() defer testRC.mu.Unlock() - testRC.mu.r = r + testRC.mu.evals[name].cancel() }() - err := testRC.rc.SetReplicasRaftMuLocked(ctx, r.replicas()) - require.NoError(t, err) - } - return rangeStateString() - - case "set_leaseholder": - var rangeID, replicaID int - d.ScanArgs(t, "range_id", &rangeID) - d.ScanArgs(t, "replica_id", &replicaID) - testRC := ranges[roachpb.RangeID(rangeID)] - testRC.rc.SetLeaseholderRaftMuLocked(ctx, roachpb.ReplicaID(replicaID)) - return rangeStateString() - - case "close_rcs": - for _, r := range ranges { - r.rc.CloseRaftMuLocked(ctx) - } - evalStr := evalStateString() - for k := range ranges { - delete(ranges, k) - } - return evalStr - default: - panic(fmt.Sprintf("unknown command: 
%s", d.Cmd)) - } + return evalStateString() + + case "set_replicas": + for _, r := range scanRanges(t, d.Input) { + testRC := getOrInitRange(r) + func() { + testRC.mu.Lock() + defer testRC.mu.Unlock() + testRC.mu.r = r + }() + require.NoError(t, testRC.rc.SetReplicasRaftMuLocked(ctx, r.replicas())) + // Send an empty raft event in order to trigger any potential + // connectedState changes. + require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{})) + } + return rangeStateString() + + case "set_leaseholder": + var rangeID, replicaID int + d.ScanArgs(t, "range_id", &rangeID) + d.ScanArgs(t, "replica_id", &replicaID) + testRC := ranges[roachpb.RangeID(rangeID)] + testRC.rc.SetLeaseholderRaftMuLocked(ctx, roachpb.ReplicaID(replicaID)) + return rangeStateString() + + case "close_rcs": + for _, r := range ranges { + r.rc.CloseRaftMuLocked(ctx) + } + evalStr := evalStateString() + for k := range ranges { + delete(ranges, k) + } + return evalStr + + case "admit": + var lastRangeID roachpb.RangeID + for _, line := range strings.Split(d.Input, "\n") { + var ( + rangeID int + storeID int + term int + to_index int + err error + ) + parts := strings.Fields(line) + parts[0] = strings.TrimSpace(parts[0]) + + if strings.HasPrefix(parts[0], "range_id=") { + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "range_id=") + rangeID, err = strconv.Atoi(parts[0]) + require.NoError(t, err) + lastRangeID = roachpb.RangeID(rangeID) + } else { + parts[0] = strings.TrimSpace(parts[0]) + require.True(t, strings.HasPrefix(parts[0], "store_id=")) + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "store_id=") + storeID, err = strconv.Atoi(parts[0]) + require.NoError(t, err) + + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "term=")) + parts[1] = strings.TrimPrefix(strings.TrimSpace(parts[1]), "term=") + term, err = strconv.Atoi(parts[1]) + require.NoError(t, err) + + parts[2] = strings.TrimSpace(parts[2]) + require.True(t, strings.HasPrefix(parts[2], "to_index=")) + parts[2] = strings.TrimPrefix(strings.TrimSpace(parts[2]), "to_index=") + to_index, err = strconv.Atoi(parts[2]) + require.NoError(t, err) + + parts[3] = strings.TrimSpace(parts[3]) + require.True(t, strings.HasPrefix(parts[3], "pri=")) + parts[3] = strings.TrimPrefix(strings.TrimSpace(parts[3]), "pri=") + pri := parsePriority(t, parts[3]) + ranges[lastRangeID].admit(ctx, t, roachpb.StoreID(storeID), uint64(term), uint64(to_index), pri) + } + } + return tokenCountsString() + + case "raft_event": + var lastRangeID roachpb.RangeID + init := false + var buf []entryInfo + + propRangeEntries := func() { + event := RaftEvent{ + Entries: make([]raftpb.Entry, len(buf)), + } + for i, state := range buf { + event.Entries[i] = testingCreateEntry(t, state) + } + err := ranges[lastRangeID].rc.HandleRaftEventRaftMuLocked(ctx, event) + require.NoError(t, err) + } + + for _, line := range strings.Split(d.Input, "\n") { + var ( + rangeID, term, index int + size int64 + err error + pri admissionpb.WorkPriority + ) + + parts := strings.Fields(line) + parts[0] = strings.TrimSpace(parts[0]) + if strings.HasPrefix(parts[0], "range_id=") { + if init { + // We are moving to another range, if a previous range has entries + // created then create the raft event and call handle raft ready + // using all the entries added so far. 
+ propRangeEntries() + init = false + } + + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "range_id=") + rangeID, err = strconv.Atoi(parts[0]) + require.NoError(t, err) + lastRangeID = roachpb.RangeID(rangeID) + } else { + require.True(t, strings.HasPrefix(parts[0], "term=")) + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "term=") + term, err = strconv.Atoi(parts[0]) + require.NoError(t, err) + + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "index=")) + parts[1] = strings.TrimPrefix(strings.TrimSpace(parts[1]), "index=") + index, err = strconv.Atoi(parts[1]) + require.NoError(t, err) + + parts[2] = strings.TrimSpace(parts[2]) + require.True(t, strings.HasPrefix(parts[2], "pri=")) + parts[2] = strings.TrimPrefix(strings.TrimSpace(parts[2]), "pri=") + pri = parsePriority(t, parts[2]) + + parts[3] = strings.TrimSpace(parts[3]) + require.True(t, strings.HasPrefix(parts[3], "size=")) + parts[3] = strings.TrimPrefix(strings.TrimSpace(parts[3]), "size=") + size, err = humanizeutil.ParseBytes(parts[3]) + require.NoError(t, err) + + init = true + buf = append(buf, entryInfo{ + term: uint64(term), + index: uint64(index), + enc: raftlog.EntryEncodingStandardWithACAndPriority, + tokens: kvflowcontrol.Tokens(size), + pri: AdmissionToRaftPriority(pri), + }) + } + } + if init { + propRangeEntries() + } + return tokenCountsString() + + case "stream_state": + var rangeID int + d.ScanArgs(t, "range_id", &rangeID) + return sendStreamString(roachpb.RangeID(rangeID)) + + default: + panic(fmt.Sprintf("unknown command: %s", d.Cmd)) + } + }) }) } + +func TestGetEntryFCState(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + for _, tc := range []struct { + name string + entryInfo entryInfo + expectedFCState entryFCState + }{ + { + // V1 encoded entries with AC should end up with LowPri and otherwise + // matching entry information. + name: "v1_entry_with_ac", + entryInfo: entryInfo{ + term: 1, + index: 1, + enc: raftlog.EntryEncodingStandardWithAC, + pri: raftpb.NormalPri, + tokens: 100, + }, + expectedFCState: entryFCState{ + term: 1, + index: 1, + pri: raftpb.LowPri, + usesFlowControl: true, + tokens: 100, + }, + }, + { + // Likewise for V1 sideloaded entries with AC enabled. + name: "v1_entry_with_ac_sideloaded", + entryInfo: entryInfo{ + term: 2, + index: 2, + enc: raftlog.EntryEncodingSideloadedWithAC, + pri: raftpb.HighPri, + tokens: 200, + }, + expectedFCState: entryFCState{ + term: 2, + index: 2, + pri: raftpb.LowPri, + usesFlowControl: true, + tokens: 200, + }, + }, + { + name: "entry_without_ac", + entryInfo: entryInfo{ + term: 3, + index: 3, + enc: raftlog.EntryEncodingStandardWithoutAC, + tokens: 300, + }, + expectedFCState: entryFCState{ + term: 3, + index: 3, + usesFlowControl: false, + tokens: 300, + }, + }, + { + // V2 encoded entries with AC should end up with their original priority. + name: "v2_entry_with_ac", + entryInfo: entryInfo{ + term: 4, + index: 4, + enc: raftlog.EntryEncodingStandardWithACAndPriority, + pri: raftpb.NormalPri, + tokens: 400, + }, + expectedFCState: entryFCState{ + term: 4, + index: 4, + pri: raftpb.NormalPri, + usesFlowControl: true, + tokens: 400, + }, + }, + { + // Likewise for V2 sideloaded entries with AC enabled. 
+			name: "v2_entry_with_ac_sideloaded",
+			entryInfo: entryInfo{
+				term: 5,
+				index: 5,
+				enc: raftlog.EntryEncodingSideloadedWithACAndPriority,
+				pri: raftpb.AboveNormalPri,
+				tokens: 500,
+			},
+			expectedFCState: entryFCState{
+				term: 5,
+				index: 5,
+				pri: raftpb.AboveNormalPri,
+				usesFlowControl: true,
+				tokens: 500,
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			entry := testingCreateEntry(t, tc.entryInfo)
+			fcState := getEntryFCStateOrFatal(ctx, entry)
+			require.Equal(t, tc.expectedFCState, fcState)
+		})
+	}
+}
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/handle_raft_event b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/handle_raft_event
new file mode 100644
index 000000000000..68151ae00dcc
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/handle_raft_event
@@ -0,0 +1,160 @@
+# Initialize a range with voters on s1,s2 and s3. The local replica and
+# leaseholder will be s1. The leaseholder is denoted by the '*' suffix.
+init
+range_id=1 tenant_id=1 local_replica_id=1
+  store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate
+  store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate
+  store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate
+----
+r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]
+t1/s1: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+t1/s2: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+
+# There should be no tracked entries for the range.
+stream_state range_id=1
+----
+(n1,s1):1: state=replicate closed=false
+++++
+(n2,s2):2: state=replicate closed=false
+++++
+(n3,s3):3: state=replicate closed=false
+++++
+
+# Simulate a call to `HandleRaftEventRaftMuLocked` on s1 (leader/local
+# replica). The event will have three entries, each 1MiB in size. Following, we
+# see there are 3MiB of tokens deducted from each replica stream (both elastic
+# and regular, as regular entries deduct from the elastic stream as well).
+raft_event
+range_id=1
+  term=1 index=1 pri=NormalPri size=1MiB
+  term=1 index=2 pri=NormalPri size=1MiB
+  term=1 index=3 pri=NormalPri size=1MiB
+----
+t1/s1: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
+t1/s2: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
+t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
+
+# The tracker should be tracking the three entries at indices 1..3, for each
+# replica stream (1,2,3).
+stream_state range_id=1
+----
+(n1,s1):1: state=replicate closed=false
+NormalPri:
+  term=1 index=1 tokens=1048576
+  term=1 index=2 tokens=1048576
+  term=1 index=3 tokens=1048576
+++++
+(n2,s2):2: state=replicate closed=false
+NormalPri:
+  term=1 index=1 tokens=1048576
+  term=1 index=2 tokens=1048576
+  term=1 index=3 tokens=1048576
+++++
+(n3,s3):3: state=replicate closed=false
+NormalPri:
+  term=1 index=1 tokens=1048576
+  term=1 index=2 tokens=1048576
+  term=1 index=3 tokens=1048576
+++++
+
+# Simulate the admitted index advancing to 3 for the same leader term (1) on a
+# quorum of replicas. This should result in all of the tracked tokens (3MiB)
+# being returned for s1,s2 and their trackers emptied.
+admit
+range_id=1
+  store_id=1 term=1 to_index=3 pri=NormalPri
+  store_id=2 term=1 to_index=3 pri=NormalPri
+----
+t1/s1: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+t1/s2: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
+
+stream_state range_id=1
+----
+(n1,s1):1: state=replicate closed=false
+++++
+(n2,s2):2: state=replicate closed=false
+++++
+(n3,s3):3: state=replicate closed=false
+NormalPri:
+  term=1 index=1 tokens=1048576
+  term=1 index=2 tokens=1048576
+  term=1 index=3 tokens=1048576
+++++
+
+# Change the tracker state of s3 to StateProbe. This should trigger token
+# return for s3, untracking all entries and closing the send stream.
+set_replicas
+range_id=1 tenant_id=1 local_replica_id=1
+  store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate
+  store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate
+  store_id=3 replica_id=3 type=VOTER_FULL state=StateProbe
+----
+r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]
+
+stream_state range_id=1
+----
+(n1,s1):1: state=replicate closed=false
+++++
+(n2,s2):2: state=replicate closed=false
+++++
+(n3,s3):3: closed
+
+# Next, start a WaitForEval operation. We will update the state of s3 to be
+# Replicate, which should trigger the WaitForEval to refresh. First, deduct all
+# the tokens from s2 so that the operation is forced to wait.
+raft_event
+range_id=1
+  term=1 index=4 pri=NormalPri size=16MiB
+----
+t1/s1: reg=+0 B/+16 MiB ela=-8.0 MiB/+8.0 MiB
+t1/s2: reg=+0 B/+16 MiB ela=-8.0 MiB/+8.0 MiB
+t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+
+wait_for_eval name=a range_id=1 pri=LowPri
+----
+range_id=1 tenant_id={1} local_replica_id=1
+  name=a pri=low-pri done=false waited=false err=
+
+admit
+range_id=1
+  store_id=1 term=1 to_index=4 pri=NormalPri
+----
+t1/s1: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+t1/s2: reg=+0 B/+16 MiB ela=-8.0 MiB/+8.0 MiB
+t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
+
+# The operation should still be waiting, as it requires all replicas which are
+# in state replicate to have tokens available; s1 does but s2 doesn't.
+check_state
+----
+range_id=1 tenant_id={1} local_replica_id=1
+  name=a pri=low-pri done=false waited=false err=
+
+# Change the state of s3 to StateReplicate and s2 to StateSnapshot. This should
+# trigger the operation to refresh, ignore s2 now that it is in StateSnapshot,
+# and check s3 for available tokens as it is now in StateReplicate.
+set_replicas
+range_id=1 tenant_id=1 local_replica_id=1
+  store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate
+  store_id=2 replica_id=2 type=VOTER_FULL state=StateSnapshot
+  store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate
+----
+r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]
+
+raft_event
+range_id=1
+  term=1 index=5 pri=NormalPri size=1MiB
+  term=1 index=6 pri=NormalPri size=1MiB
+  term=1 index=7 pri=NormalPri size=1MiB
+----
+t1/s1: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
+t1/s2: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
+t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
+
+# The operation should now be done and have waited for s1 and s3.
+check_state
+----
+range_id=1 tenant_id={1} local_replica_id=1
+  name=a pri=low-pri done=true waited=true err=
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller_wait_for_eval b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval
similarity index 98%
rename from pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller_wait_for_eval
rename to pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval
index c534c45ee05d..a5d9b2ea6e4f 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller_wait_for_eval
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval
@@ -1,6 +1,8 @@
 # Intialize a range with voters on s1,s2 and s3. The local replica and
-# leaseholder will be s1. The leaseholder is denoted by the '*' suffix.
-init
+# leaseholder will be s1. The leaseholder is denoted by the '*' suffix. Also
+# set all streams to initially have 0 tokens and a limit of 1 token to simplify
+# the test, as evaluation requests only wait for positive tokens.
+init regular_limit=1 regular_init=0 elastic_limit=1 elastic_init=0
 range_id=1 tenant_id=1 local_replica_id=1
   store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate
   store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go
index 4e0fbb141575..3a83f6dcd20d 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go
@@ -460,3 +460,13 @@ func (t *tokenCounter) adjustLocked(
 		t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta)
 	}
 }
+
+// testingSetTokens is used in tests to set the tokens for a given work class,
+// ignoring any adjustments.
+func (t *tokenCounter) testingSetTokens(
+	ctx context.Context, wc admissionpb.WorkClass, tokens kvflowcontrol.Tokens,
+) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.mu.counters[wc].adjustTokensLocked(ctx, tokens-t.mu.counters[wc].tokens)
+}