Skip to content

Commit

Permalink
storeliveness: supporter and requester models and transitions
Browse files Browse the repository at this point in the history
This commit introduces the Store Liveness algorithm implementation,
including the data structures to store the `supportFor` and
`supportFrom` state, and the functions to handle Store Liveness
messages that update these data structures.

Each of the support data structures can be safely updated in batch,
which includes writing the changes to disk (not implemented yet) first,
and then reflecting the changes in the in-memory data structures.
During these batch updates read access to the data structures is not
blocked (by holding mutexes or by synchronously waiting for a disk
write), enabling quick responses to `SupportFor` and `SupportFrom`
calls from the client.

Fixes: cockroachdb#125060

Release note: None
  • Loading branch information
miraradeva committed Aug 13, 2024
1 parent 0318e55 commit 5592437
Show file tree
Hide file tree
Showing 10 changed files with 1,293 additions and 1 deletion.
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/storeliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go_library(
name = "storeliveness",
srcs = [
"fabric.go",
"requester_state.go",
"supporter_state.go",
"transport.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness",
Expand All @@ -25,7 +27,11 @@ go_library(

go_test(
name = "storeliveness_test",
srcs = ["transport_test.go"],
srcs = [
"store_liveness_test.go",
"transport_test.go",
],
data = glob(["testdata/**"]),
embed = [":storeliveness"],
deps = [
"//pkg/gossip",
Expand All @@ -35,13 +41,15 @@ go_test(
"//pkg/rpc/nodedialer",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/netutil",
"//pkg/util/stop",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
308 changes: 308 additions & 0 deletions pkg/kv/kvserver/storeliveness/requester_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storeliveness

import (
"sync/atomic"
"time"

slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// requesterState stores the core data structures for requesting support.
type requesterState struct {
// meta stores the RequesterMeta, including the max timestamp and max epoch at
// which this store has requested support.
meta slpb.RequesterMeta
// supportFrom stores the SupportState for each remote store from which this
// store has received support.
supportFrom map[slpb.StoreIdent]slpb.SupportState
}

// requesterStateHandler is the main interface for handling support from other
// stores. The typical interactions with requesterStateHandler are:
// - getSupportFrom(id slpb.StoreIdent)
// - addStore(id slpb.StoreIdent)
// - removeStore(id slpb.StoreIdent)
// - rsfu := checkOutUpdate()
// rsfu.getHeartbeatsToSend(now hlc.Timestamp, interval time.Duration)
// checkInUpdate(rsfu)
// - rsfu := checkOutUpdate()
// rsfu.handleHeartbeatResponse(msg slpb.Message)
// checkInUpdate(rsfu)
//
// Only one update can be in progress to ensure that multiple mutation methods
// are not run concurrently. Adding or removing a store while an update is in
// progress is not allowed.
type requesterStateHandler struct {
// requesterState is the source of truth for requested support.
requesterState requesterState
// mu controls access to requesterState. The access pattern to requesterState
// is single writer, multi reader. Concurrent reads come from API calls to
// SupportFrom; these require RLocking mu. Updates to requesterState are done
// from a single goroutine; this requires Locking mu when writing the updates.
// These updates also read from requesterState but there is no need to RLock
// mu during these reads (since there are no concurrent writes).
mu syncutil.RWMutex
// update is a reference to an in-progress change in requesterStateForUpdate.
// A non-nil update implies there is no ongoing update; i.e. the referenced
// requesterStateForUpdate is available to be checked out.
update atomic.Pointer[requesterStateForUpdate]
}

func newRequesterStateHandler() *requesterStateHandler {
rsh := &requesterStateHandler{
requesterState: requesterState{
meta: slpb.RequesterMeta{MaxEpoch: 1},
supportFrom: make(map[slpb.StoreIdent]slpb.SupportState),
},
}
rsh.update.Store(
&requesterStateForUpdate{
checkedIn: &rsh.requesterState,
inProgress: requesterState{
meta: slpb.RequesterMeta{},
supportFrom: make(map[slpb.StoreIdent]slpb.SupportState),
},
},
)
return rsh
}

// requesterStateForUpdate is a helper struct that facilitates updates to
// requesterState. It is necessary only for batch updates where the individual
// updates need to see each other's changes, while concurrent calls to
// SupportFrom see the persisted-to-disk view until the in-progress batch is
// successfully persisted.
type requesterStateForUpdate struct {
// checkedIn is a reference to the original requesterState struct stored in
// requesterStateHandler. It is used to respond to calls to SupportFrom (while
// an update is in progress) to provide a response consistent with the state
// persisted on disk.
checkedIn *requesterState
// inProgress holds all the updates to requesterState that are in progress and
// have not yet been reflected in the checkedIn view. The inProgress view
// ensures that ongoing updates from the same batch see each other's changes.
inProgress requesterState
}

// getSupportFrom returns the SupportState corresponding to the given store in
// requesterState.supportFrom. The returned boolean indicates whether the given
// store is present in the supportFrom map; it does NOT indicate whether support
// from that store is provided.
func (rsh *requesterStateHandler) getSupportFrom(id slpb.StoreIdent) (slpb.SupportState, bool) {
rsh.mu.RLock()
defer rsh.mu.RUnlock()
ss, ok := rsh.requesterState.supportFrom[id]
return ss, ok
}

// addStore adds a store to the requesterState.supportFrom map, if not present.
func (rsh *requesterStateHandler) addStore(id slpb.StoreIdent) {
// Adding a store doesn't require persisting anything to disk, so it doesn't
// need to go through the full checkOut/checkIn process. However, we still
// check out the update to ensure that there are no concurrent updates.
defer rsh.checkInUpdate(rsh.checkOutUpdate())
rsh.mu.Lock()
defer rsh.mu.Unlock()
if _, ok := rsh.requesterState.supportFrom[id]; !ok {
ss := slpb.SupportState{Target: id, Epoch: rsh.requesterState.meta.MaxEpoch}
rsh.requesterState.supportFrom[id] = ss
}
}

// removeStore removes a store from the requesterState.supportFrom map.
func (rsh *requesterStateHandler) removeStore(id slpb.StoreIdent) {
// Removing a store doesn't require persisting anything to disk, so it doesn't
// need to go through the full checkOut/checkIn process. However, we still
// check out the update to ensure that there are no concurrent updates.
defer rsh.checkInUpdate(rsh.checkOutUpdate())
rsh.mu.Lock()
defer rsh.mu.Unlock()
delete(rsh.requesterState.supportFrom, id)
}

// Functions for handling requesterState updates.

// getMeta returns the RequesterMeta from the inProgress view; if not present,
// it falls back to the RequesterMeta from the checkedIn view.
func (rsfu *requesterStateForUpdate) getMeta() slpb.RequesterMeta {
if rsfu.inProgress.meta != (slpb.RequesterMeta{}) {
return rsfu.inProgress.meta
}
return rsfu.checkedIn.meta
}

// getSupportFrom returns the SupportState from the inProgress view; if not
// present, it falls back to the SupportState from the checkedIn view. The
// returned boolean indicates whether the store is present in the supportFrom
// map; it does NOT indicate whether support from that store is provided.
func (rsfu *requesterStateForUpdate) getSupportFrom(
storeID slpb.StoreIdent,
) (slpb.SupportState, bool) {
ss, ok := rsfu.inProgress.supportFrom[storeID]
if !ok {
ss, ok = rsfu.checkedIn.supportFrom[storeID]
}
return ss, ok
}

// reset clears the inProgress view of requesterStateForUpdate.
func (rsfu *requesterStateForUpdate) reset() {
rsfu.inProgress.meta = slpb.RequesterMeta{}
clear(rsfu.inProgress.supportFrom)
}

// checkOutUpdate returns the requesterStateForUpdate referenced in
// requesterStateHandler.update and replaces it with a nil pointer to ensure it
// cannot be checked out concurrently as part of another mutation.
func (rsh *requesterStateHandler) checkOutUpdate() *requesterStateForUpdate {
rsfu := rsh.update.Swap(nil)
if rsfu == nil {
panic("unsupported concurrent update")
}
return rsfu
}

// checkInUpdate updates the checkedIn view of requesterStateForUpdate with any
// updates from the inProgress view. It clears the inProgress view, and swaps it
// back in requesterStateHandler.update to be checked out by future updates.
func (rsh *requesterStateHandler) checkInUpdate(rsfu *requesterStateForUpdate) {
defer func() {
rsfu.reset()
rsh.update.Swap(rsfu)
}()
if rsfu.inProgress.meta == (slpb.RequesterMeta{}) && len(rsfu.inProgress.supportFrom) == 0 {
return
}
rsh.mu.Lock()
defer rsh.mu.Unlock()
if rsfu.inProgress.meta != (slpb.RequesterMeta{}) {
if !rsfu.inProgress.meta.MaxRequested.IsEmpty() {
rsfu.checkedIn.meta.MaxRequested = rsfu.inProgress.meta.MaxRequested
}
if rsfu.inProgress.meta.MaxEpoch != 0 {
rsfu.checkedIn.meta.MaxEpoch = rsfu.inProgress.meta.MaxEpoch
}
}
for storeID, ss := range rsfu.inProgress.supportFrom {
rsfu.checkedIn.supportFrom[storeID] = ss
}
}

// Functions for generating heartbeats.

// getHeartbeatsToSend updates MaxRequested and generates heartbeats. These
// heartbeats must not be sent before the MaxRequested update is persisted to
// disk.
func (rsfu *requesterStateForUpdate) getHeartbeatsToSend(
from slpb.StoreIdent, now hlc.Timestamp, interval time.Duration,
) []slpb.Message {
rsfu.updateMaxRequested(now, interval)
return rsfu.generateHeartbeats(from)
}

// updateMaxRequested forwards the current MaxRequested timestamp to now +
// interval, where now is the node's clock timestamp and interval is the
// liveness interval.
func (rsfu *requesterStateForUpdate) updateMaxRequested(now hlc.Timestamp, interval time.Duration) {
newMaxRequested := now.Add(interval.Nanoseconds(), 0)
if rsfu.getMeta().MaxRequested.Less(newMaxRequested) {
rsfu.inProgress.meta.MaxRequested.Forward(newMaxRequested)
}
}

func (rsfu *requesterStateForUpdate) generateHeartbeats(from slpb.StoreIdent) []slpb.Message {
heartbeats := make([]slpb.Message, 0, len(rsfu.checkedIn.supportFrom))
// It's ok to read store IDs directly from rsfu.checkedIn.supportFrom since
// adding and removing stores is not allowed while there's an update in
// progress.
maxRequested := rsfu.getMeta().MaxRequested
// Assert that there are no updates in rsfu.inProgress.supportFrom to make
// sure we can iterate over rsfu.checkedIn.supportFrom in the loop below.
assert(
len(rsfu.inProgress.supportFrom) == 0, "reading from requesterStateForUpdate."+
"checkedIn.supportFrom while requesterStateForUpdate.inProgress.supportFrom is not empty",
)
for _, ss := range rsfu.checkedIn.supportFrom {
heartbeat := slpb.Message{
Type: slpb.MsgHeartbeat,
From: from,
To: ss.Target,
Epoch: ss.Epoch,
Expiration: maxRequested,
}
heartbeats = append(heartbeats, heartbeat)
}
return heartbeats
}

// Functions for handling heartbeat responses.

// handleHeartbeatResponse handles a single heartbeat response message. It
// updates the inProgress view of requesterStateForUpdate only if there are any
// changes.
func (rsfu *requesterStateForUpdate) handleHeartbeatResponse(msg slpb.Message) {
from := msg.From
meta := rsfu.getMeta()
ss, ok := rsfu.getSupportFrom(from)
if !ok {
ss = slpb.SupportState{Target: from}
}
metaNew, ssNew := handleHeartbeatResponse(meta, ss, msg)
if meta != metaNew {
rsfu.inProgress.meta = metaNew
}
if ss != ssNew {
rsfu.inProgress.supportFrom[from] = ssNew
}
}

// handleHeartbeatResponse contains the core logic for updating the epoch and
// expiration for a support provider upon receiving a heartbeat response.
func handleHeartbeatResponse(
rm slpb.RequesterMeta, ss slpb.SupportState, msg slpb.Message,
) (slpb.RequesterMeta, slpb.SupportState) {
if rm.MaxEpoch < msg.Epoch {
rm.MaxEpoch = msg.Epoch
}
if ss.Epoch == msg.Epoch {
ss.Expiration.Forward(msg.Expiration)
} else if ss.Epoch < msg.Epoch {
assert(
ss.Epoch == msg.Epoch-1,
"the supporter epoch leads the requester epoch by more than 1",
)
ss.Epoch = msg.Epoch
assert(
msg.Expiration == hlc.Timestamp{},
"the supporter responded with an incremented epoch but non-zero timestamp",
)
ss.Expiration = msg.Expiration
}
return rm, ss
}

// Functions for incrementing MaxEpoch.

// incrementMaxEpoch increments the inProgress view of MaxEpoch.
func (rsfu *requesterStateForUpdate) incrementMaxEpoch() {
currentEpoch := rsfu.getMeta().MaxEpoch
rsfu.inProgress.meta.MaxEpoch = currentEpoch + 1
}

func assert(condition bool, msg string) {
if !condition {
panic(msg)
}
}
Loading

0 comments on commit 5592437

Please sign in to comment.