From 5592437ab46324ac7a7f98d2104f94ad19d66e5c Mon Sep 17 00:00:00 2001 From: Mira Radeva Date: Tue, 16 Jul 2024 13:09:42 -0400 Subject: [PATCH] storeliveness: supporter and requester models and transitions This commit introduces the Store Liveness algorithm implementation, including the data structures to store the `supportFor` and `supportFrom` state, and the functions to handle Store Liveness messages that update these data structures. Each of the support data structures can be safely updated in batch, which includes writing the changes to disk (not implemented yet) first, and then reflecting the changes in the in-memory data structures. During these batch updates read access to the data structures is not blocked (by holding mutexes or by synchronously waiting for a disk write), enabling quick responses to `SupportFor` and `SupportFrom` calls from the client. Fixes: #125060 Release note: None --- pkg/kv/kvserver/storeliveness/BUILD.bazel | 10 +- .../kvserver/storeliveness/requester_state.go | 308 ++++++++++++++++++ .../storeliveness/store_liveness_test.go | 212 ++++++++++++ .../kvserver/storeliveness/supporter_state.go | 240 ++++++++++++++ pkg/kv/kvserver/storeliveness/testdata/basic | 54 +++ .../storeliveness/testdata/liveness_interval | 63 ++++ .../storeliveness/testdata/multi-store | 77 +++++ .../storeliveness/testdata/requester_state | 142 ++++++++ .../kvserver/storeliveness/testdata/restart | 81 +++++ .../storeliveness/testdata/supporter_state | 107 ++++++ 10 files changed, 1293 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvserver/storeliveness/requester_state.go create mode 100644 pkg/kv/kvserver/storeliveness/store_liveness_test.go create mode 100644 pkg/kv/kvserver/storeliveness/supporter_state.go create mode 100644 pkg/kv/kvserver/storeliveness/testdata/basic create mode 100644 pkg/kv/kvserver/storeliveness/testdata/liveness_interval create mode 100644 pkg/kv/kvserver/storeliveness/testdata/multi-store create mode 100644 pkg/kv/kvserver/storeliveness/testdata/requester_state create mode 100644 pkg/kv/kvserver/storeliveness/testdata/restart create mode 100644 pkg/kv/kvserver/storeliveness/testdata/supporter_state diff --git a/pkg/kv/kvserver/storeliveness/BUILD.bazel b/pkg/kv/kvserver/storeliveness/BUILD.bazel index 40d8e08e174f..3d6786ffb1b0 100644 --- a/pkg/kv/kvserver/storeliveness/BUILD.bazel +++ b/pkg/kv/kvserver/storeliveness/BUILD.bazel @@ -4,6 +4,8 @@ go_library( name = "storeliveness", srcs = [ "fabric.go", + "requester_state.go", + "supporter_state.go", "transport.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness", @@ -25,7 +27,11 @@ go_library( go_test( name = "storeliveness_test", - srcs = ["transport_test.go"], + srcs = [ + "store_liveness_test.go", + "transport_test.go", + ], + data = glob(["testdata/**"]), embed = [":storeliveness"], deps = [ "//pkg/gossip", @@ -35,6 +41,7 @@ go_test( "//pkg/rpc/nodedialer", "//pkg/settings/cluster", "//pkg/testutils", + "//pkg/testutils/datapathutils", "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", @@ -42,6 +49,7 @@ go_test( "//pkg/util/metric", "//pkg/util/netutil", "//pkg/util/stop", + "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/kv/kvserver/storeliveness/requester_state.go b/pkg/kv/kvserver/storeliveness/requester_state.go new file mode 100644 index 000000000000..357f752a1ea2 --- /dev/null +++ b/pkg/kv/kvserver/storeliveness/requester_state.go @@ -0,0 +1,308 @@ +// 
Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storeliveness + +import ( + "sync/atomic" + "time" + + slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// requesterState stores the core data structures for requesting support. +type requesterState struct { + // meta stores the RequesterMeta, including the max timestamp and max epoch at + // which this store has requested support. + meta slpb.RequesterMeta + // supportFrom stores the SupportState for each remote store from which this + // store has received support. + supportFrom map[slpb.StoreIdent]slpb.SupportState +} + +// requesterStateHandler is the main interface for handling support from other +// stores. The typical interactions with requesterStateHandler are: +// - getSupportFrom(id slpb.StoreIdent) +// - addStore(id slpb.StoreIdent) +// - removeStore(id slpb.StoreIdent) +// - rsfu := checkOutUpdate() +// rsfu.getHeartbeatsToSend(now hlc.Timestamp, interval time.Duration) +// checkInUpdate(rsfu) +// - rsfu := checkOutUpdate() +// rsfu.handleHeartbeatResponse(msg slpb.Message) +// checkInUpdate(rsfu) +// +// Only one update can be in progress to ensure that multiple mutation methods +// are not run concurrently. Adding or removing a store while an update is in +// progress is not allowed. +type requesterStateHandler struct { + // requesterState is the source of truth for requested support. + requesterState requesterState + // mu controls access to requesterState. The access pattern to requesterState + // is single writer, multi reader. Concurrent reads come from API calls to + // SupportFrom; these require RLocking mu. Updates to requesterState are done + // from a single goroutine; this requires Locking mu when writing the updates. + // These updates also read from requesterState but there is no need to RLock + // mu during these reads (since there are no concurrent writes). + mu syncutil.RWMutex + // update is a reference to an in-progress change in requesterStateForUpdate. + // A non-nil update implies there is no ongoing update; i.e. the referenced + // requesterStateForUpdate is available to be checked out. + update atomic.Pointer[requesterStateForUpdate] +} + +func newRequesterStateHandler() *requesterStateHandler { + rsh := &requesterStateHandler{ + requesterState: requesterState{ + meta: slpb.RequesterMeta{MaxEpoch: 1}, + supportFrom: make(map[slpb.StoreIdent]slpb.SupportState), + }, + } + rsh.update.Store( + &requesterStateForUpdate{ + checkedIn: &rsh.requesterState, + inProgress: requesterState{ + meta: slpb.RequesterMeta{}, + supportFrom: make(map[slpb.StoreIdent]slpb.SupportState), + }, + }, + ) + return rsh +} + +// requesterStateForUpdate is a helper struct that facilitates updates to +// requesterState. It is necessary only for batch updates where the individual +// updates need to see each other's changes, while concurrent calls to +// SupportFrom see the persisted-to-disk view until the in-progress batch is +// successfully persisted. 
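+//
+// For example: while a batch of heartbeat responses is being applied through
+// handleHeartbeatResponse, the resulting changes accumulate in the inProgress
+// view, concurrent SupportFrom calls keep reading the checkedIn view, and
+// checkInUpdate copies the inProgress changes into the checkedIn view once
+// the batch is persisted (disk persistence is not implemented yet).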
+type requesterStateForUpdate struct { + // checkedIn is a reference to the original requesterState struct stored in + // requesterStateHandler. It is used to respond to calls to SupportFrom (while + // an update is in progress) to provide a response consistent with the state + // persisted on disk. + checkedIn *requesterState + // inProgress holds all the updates to requesterState that are in progress and + // have not yet been reflected in the checkedIn view. The inProgress view + // ensures that ongoing updates from the same batch see each other's changes. + inProgress requesterState +} + +// getSupportFrom returns the SupportState corresponding to the given store in +// requesterState.supportFrom. The returned boolean indicates whether the given +// store is present in the supportFrom map; it does NOT indicate whether support +// from that store is provided. +func (rsh *requesterStateHandler) getSupportFrom(id slpb.StoreIdent) (slpb.SupportState, bool) { + rsh.mu.RLock() + defer rsh.mu.RUnlock() + ss, ok := rsh.requesterState.supportFrom[id] + return ss, ok +} + +// addStore adds a store to the requesterState.supportFrom map, if not present. +func (rsh *requesterStateHandler) addStore(id slpb.StoreIdent) { + // Adding a store doesn't require persisting anything to disk, so it doesn't + // need to go through the full checkOut/checkIn process. However, we still + // check out the update to ensure that there are no concurrent updates. + defer rsh.checkInUpdate(rsh.checkOutUpdate()) + rsh.mu.Lock() + defer rsh.mu.Unlock() + if _, ok := rsh.requesterState.supportFrom[id]; !ok { + ss := slpb.SupportState{Target: id, Epoch: rsh.requesterState.meta.MaxEpoch} + rsh.requesterState.supportFrom[id] = ss + } +} + +// removeStore removes a store from the requesterState.supportFrom map. +func (rsh *requesterStateHandler) removeStore(id slpb.StoreIdent) { + // Removing a store doesn't require persisting anything to disk, so it doesn't + // need to go through the full checkOut/checkIn process. However, we still + // check out the update to ensure that there are no concurrent updates. + defer rsh.checkInUpdate(rsh.checkOutUpdate()) + rsh.mu.Lock() + defer rsh.mu.Unlock() + delete(rsh.requesterState.supportFrom, id) +} + +// Functions for handling requesterState updates. + +// getMeta returns the RequesterMeta from the inProgress view; if not present, +// it falls back to the RequesterMeta from the checkedIn view. +func (rsfu *requesterStateForUpdate) getMeta() slpb.RequesterMeta { + if rsfu.inProgress.meta != (slpb.RequesterMeta{}) { + return rsfu.inProgress.meta + } + return rsfu.checkedIn.meta +} + +// getSupportFrom returns the SupportState from the inProgress view; if not +// present, it falls back to the SupportState from the checkedIn view. The +// returned boolean indicates whether the store is present in the supportFrom +// map; it does NOT indicate whether support from that store is provided. +func (rsfu *requesterStateForUpdate) getSupportFrom( + storeID slpb.StoreIdent, +) (slpb.SupportState, bool) { + ss, ok := rsfu.inProgress.supportFrom[storeID] + if !ok { + ss, ok = rsfu.checkedIn.supportFrom[storeID] + } + return ss, ok +} + +// reset clears the inProgress view of requesterStateForUpdate. 
+func (rsfu *requesterStateForUpdate) reset() { + rsfu.inProgress.meta = slpb.RequesterMeta{} + clear(rsfu.inProgress.supportFrom) +} + +// checkOutUpdate returns the requesterStateForUpdate referenced in +// requesterStateHandler.update and replaces it with a nil pointer to ensure it +// cannot be checked out concurrently as part of another mutation. +func (rsh *requesterStateHandler) checkOutUpdate() *requesterStateForUpdate { + rsfu := rsh.update.Swap(nil) + if rsfu == nil { + panic("unsupported concurrent update") + } + return rsfu +} + +// checkInUpdate updates the checkedIn view of requesterStateForUpdate with any +// updates from the inProgress view. It clears the inProgress view, and swaps it +// back in requesterStateHandler.update to be checked out by future updates. +func (rsh *requesterStateHandler) checkInUpdate(rsfu *requesterStateForUpdate) { + defer func() { + rsfu.reset() + rsh.update.Swap(rsfu) + }() + if rsfu.inProgress.meta == (slpb.RequesterMeta{}) && len(rsfu.inProgress.supportFrom) == 0 { + return + } + rsh.mu.Lock() + defer rsh.mu.Unlock() + if rsfu.inProgress.meta != (slpb.RequesterMeta{}) { + if !rsfu.inProgress.meta.MaxRequested.IsEmpty() { + rsfu.checkedIn.meta.MaxRequested = rsfu.inProgress.meta.MaxRequested + } + if rsfu.inProgress.meta.MaxEpoch != 0 { + rsfu.checkedIn.meta.MaxEpoch = rsfu.inProgress.meta.MaxEpoch + } + } + for storeID, ss := range rsfu.inProgress.supportFrom { + rsfu.checkedIn.supportFrom[storeID] = ss + } +} + +// Functions for generating heartbeats. + +// getHeartbeatsToSend updates MaxRequested and generates heartbeats. These +// heartbeats must not be sent before the MaxRequested update is persisted to +// disk. +func (rsfu *requesterStateForUpdate) getHeartbeatsToSend( + from slpb.StoreIdent, now hlc.Timestamp, interval time.Duration, +) []slpb.Message { + rsfu.updateMaxRequested(now, interval) + return rsfu.generateHeartbeats(from) +} + +// updateMaxRequested forwards the current MaxRequested timestamp to now + +// interval, where now is the node's clock timestamp and interval is the +// liveness interval. +func (rsfu *requesterStateForUpdate) updateMaxRequested(now hlc.Timestamp, interval time.Duration) { + newMaxRequested := now.Add(interval.Nanoseconds(), 0) + if rsfu.getMeta().MaxRequested.Less(newMaxRequested) { + rsfu.inProgress.meta.MaxRequested.Forward(newMaxRequested) + } +} + +func (rsfu *requesterStateForUpdate) generateHeartbeats(from slpb.StoreIdent) []slpb.Message { + heartbeats := make([]slpb.Message, 0, len(rsfu.checkedIn.supportFrom)) + // It's ok to read store IDs directly from rsfu.checkedIn.supportFrom since + // adding and removing stores is not allowed while there's an update in + // progress. + maxRequested := rsfu.getMeta().MaxRequested + // Assert that there are no updates in rsfu.inProgress.supportFrom to make + // sure we can iterate over rsfu.checkedIn.supportFrom in the loop below. + assert( + len(rsfu.inProgress.supportFrom) == 0, "reading from requesterStateForUpdate."+ + "checkedIn.supportFrom while requesterStateForUpdate.inProgress.supportFrom is not empty", + ) + for _, ss := range rsfu.checkedIn.supportFrom { + heartbeat := slpb.Message{ + Type: slpb.MsgHeartbeat, + From: from, + To: ss.Target, + Epoch: ss.Epoch, + Expiration: maxRequested, + } + heartbeats = append(heartbeats, heartbeat) + } + return heartbeats +} + +// Functions for handling heartbeat responses. + +// handleHeartbeatResponse handles a single heartbeat response message. 
It +// updates the inProgress view of requesterStateForUpdate only if there are any +// changes. +func (rsfu *requesterStateForUpdate) handleHeartbeatResponse(msg slpb.Message) { + from := msg.From + meta := rsfu.getMeta() + ss, ok := rsfu.getSupportFrom(from) + if !ok { + ss = slpb.SupportState{Target: from} + } + metaNew, ssNew := handleHeartbeatResponse(meta, ss, msg) + if meta != metaNew { + rsfu.inProgress.meta = metaNew + } + if ss != ssNew { + rsfu.inProgress.supportFrom[from] = ssNew + } +} + +// handleHeartbeatResponse contains the core logic for updating the epoch and +// expiration for a support provider upon receiving a heartbeat response. +func handleHeartbeatResponse( + rm slpb.RequesterMeta, ss slpb.SupportState, msg slpb.Message, +) (slpb.RequesterMeta, slpb.SupportState) { + if rm.MaxEpoch < msg.Epoch { + rm.MaxEpoch = msg.Epoch + } + if ss.Epoch == msg.Epoch { + ss.Expiration.Forward(msg.Expiration) + } else if ss.Epoch < msg.Epoch { + assert( + ss.Epoch == msg.Epoch-1, + "the supporter epoch leads the requester epoch by more than 1", + ) + ss.Epoch = msg.Epoch + assert( + msg.Expiration == hlc.Timestamp{}, + "the supporter responded with an incremented epoch but non-zero timestamp", + ) + ss.Expiration = msg.Expiration + } + return rm, ss +} + +// Functions for incrementing MaxEpoch. + +// incrementMaxEpoch increments the inProgress view of MaxEpoch. +func (rsfu *requesterStateForUpdate) incrementMaxEpoch() { + currentEpoch := rsfu.getMeta().MaxEpoch + rsfu.inProgress.meta.MaxEpoch = currentEpoch + 1 +} + +func assert(condition bool, msg string) { + if !condition { + panic(msg) + } +} diff --git a/pkg/kv/kvserver/storeliveness/store_liveness_test.go b/pkg/kv/kvserver/storeliveness/store_liveness_test.go new file mode 100644 index 000000000000..71b1e8b9cf54 --- /dev/null +++ b/pkg/kv/kvserver/storeliveness/store_liveness_test.go @@ -0,0 +1,212 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package storeliveness + +import ( + "context" + "fmt" + "slices" + "strings" + "testing" + "time" + + slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/datadriven" +) + +func TestStoreLiveness(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + storeID := slpb.StoreIdent{NodeID: roachpb.NodeID(1), StoreID: roachpb.StoreID(1)} + + datadriven.Walk( + t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { + ss := newSupporterStateHandler() + rs := newRequesterStateHandler() + datadriven.RunTest( + t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "add-store": + remoteID := parseStoreID(t, d, "node-id", "store-id") + rs.addStore(remoteID) + return "" + + case "remove-store": + remoteID := parseStoreID(t, d, "node-id", "store-id") + rs.removeStore(remoteID) + return "" + + case "support-from": + remoteID := parseStoreID(t, d, "node-id", "store-id") + supportState, _ := rs.getSupportFrom(remoteID) + return fmt.Sprintf("requester state: %+v", supportState) + + case "support-for": + remoteID := parseStoreID(t, d, "node-id", "store-id") + supportState := ss.getSupportFor(remoteID) + return fmt.Sprintf("supporter state: %+v", supportState) + + case "send-heartbeats": + now := parseTimestamp(t, d, "now") + var interval string + d.ScanArgs(t, "liveness-interval", &interval) + livenessInterval, err := time.ParseDuration(interval) + if err != nil { + t.Errorf("can't parse liveness interval duration %s; error: %v", interval, err) + } + rsfu := rs.checkOutUpdate() + heartbeats := rsfu.getHeartbeatsToSend(storeID, now, livenessInterval) + rs.checkInUpdate(rsfu) + return fmt.Sprintf("heartbeats:\n%s", printMsgs(heartbeats)) + + case "handle-messages": + msgs := parseMsgs(t, d, storeID) + var responses []slpb.Message + rsfu := rs.checkOutUpdate() + ssfu := ss.checkOutUpdate() + for _, msg := range msgs { + switch msg.Type { + case slpb.MsgHeartbeat: + responses = append(responses, ssfu.handleHeartbeat(msg)) + case slpb.MsgHeartbeatResp: + rsfu.handleHeartbeatResponse(msg) + default: + log.Errorf(context.Background(), "unexpected message type: %v", msg.Type) + } + } + rs.checkInUpdate(rsfu) + ss.checkInUpdate(ssfu) + if len(responses) > 0 { + return fmt.Sprintf("responses:\n%s", printMsgs(responses)) + } else { + return "" + } + + case "withdraw-support": + now := parseTimestamp(t, d, "now") + ssfu := ss.checkOutUpdate() + ssfu.withdrawSupport(hlc.ClockTimestamp(now)) + ss.checkInUpdate(ssfu) + return "" + + case "restart": + // TODO(mira): wipe out all in-memory state properly, once we have + // real disk persistence. 
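+						// This mirrors the intended restart semantics: supportFrom is kept
+						// only in memory, while RequesterMeta (including MaxEpoch) and the
+						// supporter state are meant to be persisted and to survive restarts.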
+ rs.requesterState.supportFrom = make(map[slpb.StoreIdent]slpb.SupportState) + rsfu := rs.checkOutUpdate() + rsfu.incrementMaxEpoch() + rs.checkInUpdate(rsfu) + return "" + + case "debug-requester-state": + return fmt.Sprintf( + "meta:\n%+v\nsupport from:\n%+v", rs.requesterState.meta, + printSupportMap(rs.requesterState.supportFrom), + ) + + case "debug-supporter-state": + return fmt.Sprintf( + "meta:\n%+v\nsupport for:\n%+v", ss.supporterState.meta, + printSupportMap(ss.supporterState.supportFor), + ) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }, + ) + }, + ) +} + +func printMsgs(msgs []slpb.Message) string { + var sortedMsgs []string + for _, msg := range msgs { + sortedMsgs = append(sortedMsgs, fmt.Sprintf("%+v", msg)) + } + // Sort the messages for a deterministic output. + slices.Sort(sortedMsgs) + return strings.Join(sortedMsgs, "\n") +} + +func printSupportMap(m map[slpb.StoreIdent]slpb.SupportState) string { + var sortedSupportMap []string + for _, support := range m { + sortedSupportMap = append(sortedSupportMap, fmt.Sprintf("%+v", support)) + } + slices.Sort(sortedSupportMap) + return strings.Join(sortedSupportMap, "\n") +} + +func parseStoreID( + t *testing.T, d *datadriven.TestData, nodeStr string, storeStr string, +) slpb.StoreIdent { + var nodeID int64 + d.ScanArgs(t, nodeStr, &nodeID) + var storeID int64 + d.ScanArgs(t, storeStr, &storeID) + return slpb.StoreIdent{ + NodeID: roachpb.NodeID(nodeID), + StoreID: roachpb.StoreID(storeID), + } +} + +func parseTimestamp(t *testing.T, d *datadriven.TestData, timeStr string) hlc.Timestamp { + var wallTimeSecs int64 + d.ScanArgs(t, timeStr, &wallTimeSecs) + wallTime := wallTimeSecs * int64(time.Second) + return hlc.Timestamp{WallTime: wallTime} +} + +func parseMsgs(t *testing.T, d *datadriven.TestData, storeIdent slpb.StoreIdent) []slpb.Message { + var msgs []slpb.Message + lines := strings.Split(d.Input, "\n") + for _, line := range lines { + var err error + d.Cmd, d.CmdArgs, err = datadriven.ParseLine(line) + if err != nil { + d.Fatalf(t, "error parsing message: %v", err) + } + if d.Cmd != "msg" { + d.Fatalf(t, "expected \"msg\", found %s", d.Cmd) + } + var msgTypeStr string + d.ScanArgs(t, "type", &msgTypeStr) + var msgType slpb.MessageType + switch msgTypeStr { + case slpb.MsgHeartbeat.String(): + msgType = slpb.MsgHeartbeat + case slpb.MsgHeartbeatResp.String(): + msgType = slpb.MsgHeartbeatResp + default: + d.Fatalf(t, "unexpected \"type\", found %s", msgTypeStr) + } + remoteID := parseStoreID(t, d, "from-node-id", "from-store-id") + var epoch int64 + d.ScanArgs(t, "epoch", &epoch) + expiration := parseTimestamp(t, d, "expiration") + msg := slpb.Message{ + Type: msgType, + From: remoteID, + To: storeIdent, + Epoch: slpb.Epoch(epoch), + Expiration: expiration, + } + msgs = append(msgs, msg) + } + return msgs +} diff --git a/pkg/kv/kvserver/storeliveness/supporter_state.go b/pkg/kv/kvserver/storeliveness/supporter_state.go new file mode 100644 index 000000000000..a3f17d4a1b6c --- /dev/null +++ b/pkg/kv/kvserver/storeliveness/supporter_state.go @@ -0,0 +1,240 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package storeliveness
+
+import (
+	"sync/atomic"
+
+	slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+// supporterState stores the core data structures for providing support.
+type supporterState struct {
+	// meta stores the SupporterMeta, including the max timestamp at which this
+	// store has withdrawn support.
+	meta slpb.SupporterMeta
+	// supportFor stores the SupportState for each remote store for which this
+	// store has provided support.
+	supportFor map[slpb.StoreIdent]slpb.SupportState
+}
+
+// supporterStateHandler is the main interface for handling support for other
+// stores. The typical interactions with supporterStateHandler are:
+//   - getSupportFor(id slpb.StoreIdent)
+//   - ssfu := checkOutUpdate()
+//     ssfu.handleHeartbeat(msg slpb.Message)
+//     checkInUpdate(ssfu)
+//   - ssfu := checkOutUpdate()
+//     ssfu.withdrawSupport(now hlc.ClockTimestamp)
+//     checkInUpdate(ssfu)
+//
+// Only one update can be in progress to ensure that multiple mutation methods
+// are not run concurrently.
+//
+// Adding a store to support is done automatically when a heartbeat from that
+// store is first received. Currently, a store is never removed.
+type supporterStateHandler struct {
+	// supporterState is the source of truth for provided support.
+	supporterState supporterState
+	// mu controls access to supporterState. The access pattern to supporterState
+	// is single writer, multi reader. Concurrent reads come from API calls to
+	// SupportFor; these require RLocking mu. Updates to supporterState are done
+	// from a single goroutine; these require Locking mu when writing the updates.
+	// These updates also read from supporterState but there is no need to RLock
+	// mu during these reads (since there are no concurrent writes).
+	mu syncutil.RWMutex
+	// update is a reference to an in-progress change in supporterStateForUpdate.
+	// A non-nil update implies there is no ongoing update; i.e. the referenced
+	// supporterStateForUpdate is available to be checked out.
+	update atomic.Pointer[supporterStateForUpdate]
+}
+
+func newSupporterStateHandler() *supporterStateHandler {
+	ssh := &supporterStateHandler{
+		supporterState: supporterState{
+			meta: slpb.SupporterMeta{},
+			supportFor: make(map[slpb.StoreIdent]slpb.SupportState),
+		},
+	}
+	ssh.update.Store(
+		&supporterStateForUpdate{
+			checkedIn: &ssh.supporterState,
+			inProgress: supporterState{
+				meta: slpb.SupporterMeta{},
+				supportFor: make(map[slpb.StoreIdent]slpb.SupportState),
+			},
+		},
+	)
+	return ssh
+}
+
+// supporterStateForUpdate is a helper struct that facilitates updates to
+// supporterState. It is necessary only for batch updates where the individual
+// updates need to see each other's changes, while concurrent calls to
+// SupportFor see the persisted-to-disk view until the in-progress batch is
+// successfully persisted.
+type supporterStateForUpdate struct {
+	// checkedIn is a reference to the original supporterState struct stored in
+	// supporterStateHandler. It is used to respond to calls to SupportFor (while
+	// an update is in progress) to provide a response consistent with the state
+	// persisted on disk.
+	checkedIn *supporterState
+	// inProgress holds all the updates to supporterState that are in progress and
+	// have not yet been reflected in the checkedIn view.
The inProgress view + // ensures that ongoing updates from the same batch see each other's changes. + inProgress supporterState +} + +// getSupportFor returns the SupportState corresponding to the given store in +// supporterState.supportFor. +func (ssh *supporterStateHandler) getSupportFor(id slpb.StoreIdent) slpb.SupportState { + ssh.mu.RLock() + defer ssh.mu.RUnlock() + return ssh.supporterState.supportFor[id] +} + +// Functions for handling supporterState updates. + +// getMeta returns the SupporterMeta from the inProgress view; if not present, +// it falls back to the SupporterMeta from the checkedIn view. +func (ssfu *supporterStateForUpdate) getMeta() slpb.SupporterMeta { + if ssfu.inProgress.meta != (slpb.SupporterMeta{}) { + return ssfu.inProgress.meta + } + return ssfu.checkedIn.meta +} + +// getSupportFor returns the SupportState from the inProgress view; if not +// present, it falls back to the SupportState from the checkedIn view. +// The returned boolean indicates whether the store is present in the supportFor +// map; it does NOT indicate whether support is provided. +func (ssfu *supporterStateForUpdate) getSupportFor( + storeID slpb.StoreIdent, +) (slpb.SupportState, bool) { + ss, ok := ssfu.inProgress.supportFor[storeID] + if !ok { + ss, ok = ssfu.checkedIn.supportFor[storeID] + } + return ss, ok +} + +// reset clears the inProgress view of supporterStateForUpdate. +func (ssfu *supporterStateForUpdate) reset() { + ssfu.inProgress.meta = slpb.SupporterMeta{} + clear(ssfu.inProgress.supportFor) +} + +// checkOutUpdate returns the supporterStateForUpdate referenced in +// supporterStateHandler.update and replaces it with a nil pointer to ensure it +// cannot be checked out concurrently as part of another mutation. +func (ssh *supporterStateHandler) checkOutUpdate() *supporterStateForUpdate { + ssfu := ssh.update.Swap(nil) + if ssfu == nil { + panic("unsupported concurrent update") + } + return ssfu +} + +// checkInUpdate updates the checkedIn view of supporterStateForUpdate with any +// updates from the inProgress view. It clears the inProgress view, and swaps it +// back in supporterStateHandler.update to be checked out by future updates. +func (ssh *supporterStateHandler) checkInUpdate(ssfu *supporterStateForUpdate) { + defer func() { + ssfu.reset() + ssh.update.Swap(ssfu) + }() + if ssfu.inProgress.meta == (slpb.SupporterMeta{}) && len(ssfu.inProgress.supportFor) == 0 { + return + } + ssh.mu.Lock() + defer ssh.mu.Unlock() + if ssfu.inProgress.meta != (slpb.SupporterMeta{}) { + if !ssfu.inProgress.meta.MaxWithdrawn.IsEmpty() { + ssfu.checkedIn.meta.MaxWithdrawn = ssfu.inProgress.meta.MaxWithdrawn + } + } + for storeID, ss := range ssfu.inProgress.supportFor { + ssfu.checkedIn.supportFor[storeID] = ss + } +} + +// Functions for handling heartbeats. + +// handleHeartbeat handles a single heartbeat message. It updates the inProgress +// view of supporterStateForUpdate only if there are any changes, and returns +// a heartbeat response message. 
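+//
+// In summary: a heartbeat at the currently supported epoch can only forward
+// the expiration, a heartbeat at a newer epoch adopts both the new epoch and
+// its expiration, and a heartbeat at an older epoch leaves the state
+// unchanged; the response echoes the resulting epoch and expiration.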
+func (ssfu *supporterStateForUpdate) handleHeartbeat(msg slpb.Message) slpb.Message { + from := msg.From + ss, ok := ssfu.getSupportFor(from) + if !ok { + ss = slpb.SupportState{Target: from} + } + ssNew := handleHeartbeat(ss, msg) + if ss != ssNew { + ssfu.inProgress.supportFor[from] = ssNew + } + return slpb.Message{ + Type: slpb.MsgHeartbeatResp, + From: msg.To, + To: msg.From, + Epoch: ssNew.Epoch, + Expiration: ssNew.Expiration, + } +} + +// handleHeartbeat contains the core logic for updating the epoch and expiration +// of a support requester upon receiving a heartbeat. +func handleHeartbeat(ss slpb.SupportState, msg slpb.Message) slpb.SupportState { + if ss.Epoch == msg.Epoch { + ss.Expiration.Forward(msg.Expiration) + } else if ss.Epoch < msg.Epoch { + assert( + ss.Expiration.Less(msg.Expiration), "support expiration regression across epochs", + ) + ss.Epoch = msg.Epoch + ss.Expiration = msg.Expiration + } + return ss +} + +// Functions for withdrawing support. + +// withdrawSupport handles a single support withdrawal. It updates the +// inProgress view of supporterStateForUpdate only if there are any changes. +func (ssfu *supporterStateForUpdate) withdrawSupport(now hlc.ClockTimestamp) { + // Assert that there are no updates in ssfu.inProgress.supportFor to make + // sure we can iterate over ssfu.checkedIn.supportFor in the loop below. + assert( + len(ssfu.inProgress.supportFor) == 0, "reading from supporterStateForUpdate."+ + "checkedIn.supportFor while supporterStateForUpdate.inProgress.supportFor is not empty", + ) + for id, ss := range ssfu.checkedIn.supportFor { + ssNew := maybeWithdrawSupport(ss, now) + if ss != ssNew { + ssfu.inProgress.supportFor[id] = ssNew + if ssfu.getMeta().MaxWithdrawn.Less(now) { + ssfu.inProgress.meta.MaxWithdrawn.Forward(now) + } + } + } +} + +// maybeWithdrawSupport contains the core logic for updating the epoch and +// expiration of a support requester when withdrawing support. +func maybeWithdrawSupport(ss slpb.SupportState, now hlc.ClockTimestamp) slpb.SupportState { + if !ss.Expiration.IsEmpty() && ss.Expiration.LessEq(now.ToTimestamp()) { + ss.Epoch++ + ss.Expiration = hlc.Timestamp{} + } + return ss +} diff --git a/pkg/kv/kvserver/storeliveness/testdata/basic b/pkg/kv/kvserver/storeliveness/testdata/basic new file mode 100644 index 000000000000..632b2132a173 --- /dev/null +++ b/pkg/kv/kvserver/storeliveness/testdata/basic @@ -0,0 +1,54 @@ +# ------------------------------------------------------------- +# A basic test that includes store (n1, s1): +# - requesting support from another store, +# - providing support for another store, +# - querying supportFor and supportFrom, +# - withdrawing support. 
+# ------------------------------------------------------------- + +add-store node-id=2 store-id=2 +---- + +send-heartbeats now=100 liveness-interval=10s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + +handle-messages + msg type=MsgHeartbeat from-node-id=2 from-store-id=2 epoch=2 expiration=200 +---- +responses: +{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:200.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=110 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + +support-for node-id=2 store-id=2 +---- +supporter state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:200.000000000,0} + +withdraw-support now=201 +---- + +support-for node-id=2 store-id=2 +---- +supporter state: {Target:{NodeID:2 StoreID:2} Epoch:3 Expiration:0,0} + +debug-requester-state +---- +meta: +{MaxEpoch:1 MaxRequested:110.000000000,0} +support from: +{Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + +debug-supporter-state +---- +meta: +{MaxWithdrawn:201.000000000,0} +support for: +{Target:{NodeID:2 StoreID:2} Epoch:3 Expiration:0,0} diff --git a/pkg/kv/kvserver/storeliveness/testdata/liveness_interval b/pkg/kv/kvserver/storeliveness/testdata/liveness_interval new file mode 100644 index 000000000000..867256a9e4a5 --- /dev/null +++ b/pkg/kv/kvserver/storeliveness/testdata/liveness_interval @@ -0,0 +1,63 @@ +# ------------------------------------------------------------- +# In this test a store (n1, s1) requests support with different +# values for the liveness interval. +# ------------------------------------------------------------- + +add-store node-id=2 store-id=2 +---- + +# ------------------------------------------------------------- +# Store (n1, s1) requests and receives support with +# liveness-interval=10s. +# ------------------------------------------------------------- + +send-heartbeats now=100 liveness-interval=10s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=110 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + + +# ------------------------------------------------------------- +# Store (n1, s1) requests support with liveness-interval=20s +# and successfully extends support from (n2, s2). +# ------------------------------------------------------------- + +send-heartbeats now=101 liveness-interval=20s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:121.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=121 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:121.000000000,0} + + +# ------------------------------------------------------------- +# Store (n1, s1) requests support with liveness-interval=5s. +# The support expiration does not regress. 
+# ------------------------------------------------------------- + +send-heartbeats now=102 liveness-interval=5s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:121.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=121 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:121.000000000,0} diff --git a/pkg/kv/kvserver/storeliveness/testdata/multi-store b/pkg/kv/kvserver/storeliveness/testdata/multi-store new file mode 100644 index 000000000000..b102287b38d4 --- /dev/null +++ b/pkg/kv/kvserver/storeliveness/testdata/multi-store @@ -0,0 +1,77 @@ +# ------------------------------------------------------------- +# A basic test that includes store (n1, s1): +# - requesting support from many stores, +# - providing support for many stores, +# - withdrawing support. +# ------------------------------------------------------------- + +add-store node-id=1 store-id=2 +---- + +add-store node-id=2 store-id=3 +---- + +add-store node-id=2 store-id=4 +---- + +send-heartbeats now=100 liveness-interval=10s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:1 StoreID:2} Epoch:1 Expiration:110.000000000,0} +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:3} Epoch:1 Expiration:110.000000000,0} +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:4} Epoch:1 Expiration:110.000000000,0} + +handle-messages + msg type=MsgHeartbeat from-node-id=1 from-store-id=2 epoch=2 expiration=102 + msg type=MsgHeartbeat from-node-id=2 from-store-id=3 epoch=3 expiration=103 + msg type=MsgHeartbeat from-node-id=2 from-store-id=4 epoch=4 expiration=104 +---- +responses: +{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:1 StoreID:2} Epoch:2 Expiration:102.000000000,0} +{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:3} Epoch:3 Expiration:103.000000000,0} +{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:4} Epoch:4 Expiration:104.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=1 from-store-id=2 epoch=1 expiration=110 + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=3 epoch=2 expiration=0 + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=4 epoch=1 expiration=110 +---- + +debug-requester-state +---- +meta: +{MaxEpoch:2 MaxRequested:110.000000000,0} +support from: +{Target:{NodeID:1 StoreID:2} Epoch:1 Expiration:110.000000000,0} +{Target:{NodeID:2 StoreID:3} Epoch:2 Expiration:0,0} +{Target:{NodeID:2 StoreID:4} Epoch:1 Expiration:110.000000000,0} + +debug-supporter-state +---- +meta: +{MaxWithdrawn:0,0} +support for: +{Target:{NodeID:1 StoreID:2} Epoch:2 Expiration:102.000000000,0} +{Target:{NodeID:2 StoreID:3} Epoch:3 Expiration:103.000000000,0} +{Target:{NodeID:2 StoreID:4} Epoch:4 Expiration:104.000000000,0} + +withdraw-support now=103 +---- + +debug-requester-state +---- +meta: +{MaxEpoch:2 MaxRequested:110.000000000,0} +support from: +{Target:{NodeID:1 StoreID:2} Epoch:1 Expiration:110.000000000,0} +{Target:{NodeID:2 StoreID:3} Epoch:2 Expiration:0,0} +{Target:{NodeID:2 StoreID:4} Epoch:1 Expiration:110.000000000,0} + +debug-supporter-state +---- +meta: +{MaxWithdrawn:103.000000000,0} +support for: +{Target:{NodeID:1 StoreID:2} Epoch:3 Expiration:0,0} +{Target:{NodeID:2 StoreID:3} Epoch:4 Expiration:0,0} +{Target:{NodeID:2 StoreID:4} Epoch:4 Expiration:104.000000000,0} diff --git 
a/pkg/kv/kvserver/storeliveness/testdata/requester_state b/pkg/kv/kvserver/storeliveness/testdata/requester_state new file mode 100644 index 000000000000..0858cb2f9e7a --- /dev/null +++ b/pkg/kv/kvserver/storeliveness/testdata/requester_state @@ -0,0 +1,142 @@ +# ------------------------------------------------------------- +# In this test a store (n1, s1) acts as a requester of +# support from another store (n2, s2). +# ------------------------------------------------------------- + +add-store node-id=2 store-id=2 +---- + +# ------------------------------------------------------------- +# Store (n1, s1) successfully establishes support. +# ------------------------------------------------------------- + +send-heartbeats now=100 liveness-interval=10s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=110 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + +debug-requester-state +---- +meta: +{MaxEpoch:1 MaxRequested:110.000000000,0} +support from: +{Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0} + + +# ------------------------------------------------------------- +# Store (n1, s1) successfully extends support. +# ------------------------------------------------------------- + +send-heartbeats now=200 liveness-interval=10s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:210.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=210 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:210.000000000,0} + + +# ------------------------------------------------------------- +# Store (n1, s1) loses support. +# ------------------------------------------------------------- + +send-heartbeats now=300 liveness-interval=10s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:310.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=2 expiration=0 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:0,0} + +debug-requester-state +---- +meta: +{MaxEpoch:2 MaxRequested:310.000000000,0} +support from: +{Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:0,0} + + +# ------------------------------------------------------------- +# Store (n1, s1) re-establishes support at a higher epoch. +# ------------------------------------------------------------- + +send-heartbeats now=400 liveness-interval=10s +---- +heartbeats: +{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:410.000000000,0} + +handle-messages + msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=2 expiration=410 +---- + +support-from node-id=2 store-id=2 +---- +requester state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:410.000000000,0} + + +# ------------------------------------------------------------- +# Store (n1, s1) does not regress support epoch or expiration. 
+# -------------------------------------------------------------
+
+handle-messages
+  msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=500
+----
+
+support-from node-id=2 store-id=2
+----
+requester state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:410.000000000,0}
+
+handle-messages
+  msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=0
+----
+
+support-from node-id=2 store-id=2
+----
+requester state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:410.000000000,0}
+
+handle-messages
+  msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=2 expiration=400
+----
+
+support-from node-id=2 store-id=2
+----
+requester state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:410.000000000,0}
+
+
+# -------------------------------------------------------------
+# Store (n1, s1) requests support but receives no response.
+# -------------------------------------------------------------
+
+send-heartbeats now=500 liveness-interval=10s
+----
+heartbeats:
+{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:510.000000000,0}
+
+support-from node-id=2 store-id=2
+----
+requester state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:410.000000000,0}
+
+debug-requester-state
+----
+meta:
+{MaxEpoch:2 MaxRequested:510.000000000,0}
+support from:
+{Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:410.000000000,0}
diff --git a/pkg/kv/kvserver/storeliveness/testdata/restart b/pkg/kv/kvserver/storeliveness/testdata/restart
new file mode 100644
index 000000000000..7ca7def0c849
--- /dev/null
+++ b/pkg/kv/kvserver/storeliveness/testdata/restart
@@ -0,0 +1,81 @@
+# -------------------------------------------------------------
+# In this test (n1, s1) is restarted and loses all supportFrom
+# records, but keeps all metadata and supportFor records.
+# -------------------------------------------------------------
+
+add-store node-id=2 store-id=2
+----
+
+# -------------------------------------------------------------
+# Store (n1, s1) establishes support for and from (n2, s2).
+# -------------------------------------------------------------
+
+send-heartbeats now=100 liveness-interval=10s
+----
+heartbeats:
+{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0}
+
+handle-messages
+  msg type=MsgHeartbeat from-node-id=2 from-store-id=2 epoch=2 expiration=200
+----
+responses:
+{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:200.000000000,0}
+
+handle-messages
+  msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=110
+----
+
+debug-requester-state
+----
+meta:
+{MaxEpoch:1 MaxRequested:110.000000000,0}
+support from:
+{Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:110.000000000,0}
+
+debug-supporter-state
+----
+meta:
+{MaxWithdrawn:0,0}
+support for:
+{Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:200.000000000,0}
+
+# -------------------------------------------------------------
+# Store (n1, s1) restarts.
+# -------------------------------------------------------------
+
+restart
+----
+
+debug-requester-state
+----
+meta:
+{MaxEpoch:2 MaxRequested:110.000000000,0}
+support from:
+
+debug-supporter-state
+----
+meta:
+{MaxWithdrawn:0,0}
+support for:
+{Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:200.000000000,0}
+
+# -------------------------------------------------------------
+# Store (n1, s1) attempts to send heartbeats, but it has forgotten
+# about (n2, s2), so no heartbeats are sent.
+# -------------------------------------------------------------
+
+send-heartbeats now=200 liveness-interval=10s
+----
+heartbeats:
+
+add-store node-id=2 store-id=2
+----
+
+# -------------------------------------------------------------
+# Store (n1, s1) sends heartbeats with an incremented epoch.
+# -------------------------------------------------------------
+
+send-heartbeats now=200 liveness-interval=10s
+----
+heartbeats:
+{Type:MsgHeartbeat From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:210.000000000,0}
diff --git a/pkg/kv/kvserver/storeliveness/testdata/supporter_state b/pkg/kv/kvserver/storeliveness/testdata/supporter_state
new file mode 100644
index 000000000000..c60c349255b8
--- /dev/null
+++ b/pkg/kv/kvserver/storeliveness/testdata/supporter_state
@@ -0,0 +1,107 @@
+# -------------------------------------------------------------
+# In this test a store (n1, s1) acts as a provider of
+# support for another store (n2, s2).
+# -------------------------------------------------------------
+
+# -------------------------------------------------------------
+# Store (n1, s1) provides support.
+# -------------------------------------------------------------
+
+handle-messages
+  msg type=MsgHeartbeat from-node-id=2 from-store-id=2 epoch=1 expiration=100
+----
+responses:
+{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:100.000000000,0}
+
+support-for node-id=2 store-id=2
+----
+supporter state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:100.000000000,0}
+
+
+# -------------------------------------------------------------
+# Store (n1, s1) extends support.
+# -------------------------------------------------------------
+
+handle-messages
+  msg type=MsgHeartbeat from-node-id=2 from-store-id=2 epoch=1 expiration=200
+----
+responses:
+{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:1 Expiration:200.000000000,0}
+
+support-for node-id=2 store-id=2
+----
+supporter state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:200.000000000,0}
+
+
+# -------------------------------------------------------------
+# Store (n1, s1) attempts to withdraw support but doesn't.
+# -------------------------------------------------------------
+
+withdraw-support now=199
+----
+
+support-for node-id=2 store-id=2
+----
+supporter state: {Target:{NodeID:2 StoreID:2} Epoch:1 Expiration:200.000000000,0}
+
+
+# -------------------------------------------------------------
+# Store (n1, s1) withdraws support.
+# -------------------------------------------------------------
+
+withdraw-support now=201
+----
+
+support-for node-id=2 store-id=2
+----
+supporter state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:0,0}
+
+debug-supporter-state
+----
+meta:
+{MaxWithdrawn:201.000000000,0}
+support for:
+{Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:0,0}
+
+
+# -------------------------------------------------------------
+# Store (n1, s1) provides support at a higher epoch.
+# TODO(mira): the expiration can regress here w.r.t. the previously
+# supported expiration, but this won't be possible once clocks are
+# integrated into this test.
+# ------------------------------------------------------------- + +handle-messages + msg type=MsgHeartbeat from-node-id=2 from-store-id=2 epoch=2 expiration=300 +---- +responses: +{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:300.000000000,0} + +support-for node-id=2 store-id=2 +---- +supporter state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:300.000000000,0} + + +# ------------------------------------------------------------- +# Store (n1, s1) does not regress support epoch or expiration. +# ------------------------------------------------------------- + +handle-messages + msg type=MsgHeartbeat from-node-id=2 from-store-id=2 epoch=1 expiration=301 +---- +responses: +{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:300.000000000,0} + +support-for node-id=2 store-id=2 +---- +supporter state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:300.000000000,0} + +handle-messages + msg type=MsgHeartbeat from-node-id=2 from-store-id=2 epoch=2 expiration=299 +---- +responses: +{Type:MsgHeartbeatResp From:{NodeID:1 StoreID:1} To:{NodeID:2 StoreID:2} Epoch:2 Expiration:300.000000000,0} + +support-for node-id=2 store-id=2 +---- +supporter state: {Target:{NodeID:2 StoreID:2} Epoch:2 Expiration:300.000000000,0}