Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
127262: storeliveness: supporter and requester models and transitions r=nvanbenschoten a=miraradeva

This commit introduces the Store Liveness algorithm implementation, including the data structures to store the `supportFor` and `supportFrom` state, and the functions to handle Store Liveness messages that update these data structures.

Each of the support data structures can be safely "checked out" for batch update and then "checked in", which allows to make multiple changes, write them to disk (not implemented yet) in batch first, and then reflect the changes in the in-memory data structures. During these batch updates read access to the data structures is not blocked (by holding mutexes or by synchronously waiting for a disk write), enabling quick responses to `SupportFor` and `SupportFrom` calls from the client.

Fixes: cockroachdb#125060

Release note: None

Co-authored-by: Mira Radeva <[email protected]>
  • Loading branch information
craig[bot] and miraradeva committed Aug 13, 2024
2 parents ed0cad0 + 5592437 commit 3ec1740
Show file tree
Hide file tree
Showing 11 changed files with 1,313 additions and 20 deletions.
39 changes: 20 additions & 19 deletions docs/tla-plus/StoreLiveness/StoreLiveness.tla
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ define
\* Has i ever supported j for the current epoch?
SupportForEpochValid(i, j) == EpochValid(support_for, i, j)

EpochSupportExpired(map, i, j, supporter_time) == map[i][j].expiration < supporter_time
EpochSupportExpired(map, i, j, supporter_time) == map[i][j].expiration <= supporter_time
\* Is i's support from j (according to i's support_from map) expired (according to j's clock)?
SupportFromExpired(i, j) == EpochSupportExpired(support_from, i, j, clocks[j])
\* Is i's support for j (according to i's support_for map) expired (according to i's clock)?
Expand Down Expand Up @@ -127,14 +127,15 @@ end macro
macro send_heartbeat(to)
begin
with interval \in HeartbeatIntervals do
forward(max_requested[self], clocks[self] + interval);
send_msg(to, [
type |-> MsgHeartbeat,
from |-> self,
epoch |-> support_from[self][to].epoch,
expiration |-> clocks[self] + interval,
expiration |-> max_requested[self],
now |-> clocks[self]
]);
forward(max_requested[self], clocks[self] + interval);

end with;
end macro

Expand Down Expand Up @@ -244,7 +245,7 @@ begin Loop:
\* increasing the epoch and forgetting the previous epoch's expiration.
\* We check the expiration wrt clocks[self] because, by the propagation
\* of clocks via messages, we know that clocks[self] <= clock[msg.from].
assert support_from[self][msg.from].expiration < clocks[self];
assert support_from[self][msg.from].expiration <= clocks[self];
support_from[self][msg.from].epoch := msg.epoch ||
support_from[self][msg.from].expiration := msg.expiration;
end if;
Expand All @@ -255,7 +256,7 @@ begin Loop:
end while;
end process;
end algorithm; *)
\* BEGIN TRANSLATION (chksum(pcal) = "95fcd15e" /\ chksum(tla) = "5ae5f9d3")
\* BEGIN TRANSLATION (chksum(pcal) = "891e8ae9" /\ chksum(tla) = "38884a6c")
VARIABLES max_epoch, max_requested, max_withdrawn, support_from, support_for,
clocks, network, pc

Expand All @@ -275,7 +276,7 @@ SupportFromEpochValid(i, j) == EpochValid(support_from, i, j)

SupportForEpochValid(i, j) == EpochValid(support_for, i, j)

EpochSupportExpired(map, i, j, supporter_time) == map[i][j].expiration < supporter_time
EpochSupportExpired(map, i, j, supporter_time) == map[i][j].expiration <= supporter_time

SupportFromExpired(i, j) == EpochSupportExpired(support_from, i, j, clocks[j])

Expand Down Expand Up @@ -378,7 +379,7 @@ Loop(self) == /\ pc[self] = "Loop"
ELSE /\ IF msg'[self].type = MsgHeartbeatResp
THEN /\ pc' = [pc EXCEPT ![self] = "ReceiveHeartbeatResp"]
ELSE /\ Assert(FALSE,
"Failure of assertion at line 252, column 9.")
"Failure of assertion at line 253, column 9.")
/\ pc' = [pc EXCEPT ![self] = "Loop"]
/\ UNCHANGED << max_epoch, max_requested, max_withdrawn,
support_from, support_for, restarts >>
Expand All @@ -394,25 +395,25 @@ TickClockAndSendHeartbeats(self) == /\ pc[self] = "TickClockAndSendHeartbeats"
/\ clocks' = [clocks EXCEPT ![self] = clocks[self] + 1]
/\ \E i \in Nodes \ {self}:
\E interval \in HeartbeatIntervals:
/\ IF (max_requested[self]) < (clocks'[self] + interval)
THEN /\ max_requested' = [max_requested EXCEPT ![self] = clocks'[self] + interval]
ELSE /\ TRUE
/\ UNCHANGED max_requested
/\ IF AllowMsgReordering
THEN /\ network' = [network EXCEPT ![i] = network[i] \union {( [
type |-> MsgHeartbeat,
from |-> self,
epoch |-> support_from[self][i].epoch,
expiration |-> clocks'[self] + interval,
expiration |-> max_requested'[self],
now |-> clocks'[self]
])}]
ELSE /\ network' = [network EXCEPT ![i] = Append(network[i], ( [
type |-> MsgHeartbeat,
from |-> self,
epoch |-> support_from[self][i].epoch,
expiration |-> clocks'[self] + interval,
expiration |-> max_requested'[self],
now |-> clocks'[self]
]))]
/\ IF (max_requested[self]) < (clocks'[self] + interval)
THEN /\ max_requested' = [max_requested EXCEPT ![self] = clocks'[self] + interval]
ELSE /\ TRUE
/\ UNCHANGED max_requested
/\ pc' = [pc EXCEPT ![self] = "Loop"]
/\ UNCHANGED << max_epoch, max_withdrawn,
support_from, support_for,
Expand Down Expand Up @@ -450,9 +451,9 @@ ReceiveHeartbeat(self) == /\ pc[self] = "ReceiveHeartbeat"
/\ UNCHANGED support_for
ELSE /\ IF support_for[self][msg[self].from].epoch < msg[self].epoch
THEN /\ Assert(support_for[self][msg[self].from].expiration < msg[self].expiration,
"Failure of assertion at line 219, column 13.")
"Failure of assertion at line 220, column 13.")
/\ Assert(support_for[self][msg[self].from].expiration < clocks[self],
"Failure of assertion at line 223, column 13.")
"Failure of assertion at line 224, column 13.")
/\ support_for' = [support_for EXCEPT ![self][msg[self].from].epoch = msg[self].epoch,
![self][msg[self].from].expiration = msg[self].expiration]
ELSE /\ TRUE
Expand Down Expand Up @@ -489,11 +490,11 @@ ReceiveHeartbeatResp(self) == /\ pc[self] = "ReceiveHeartbeatResp"
/\ UNCHANGED support_from
ELSE /\ IF support_from[self][msg[self].from].epoch < msg[self].epoch
THEN /\ Assert(support_from[self][msg[self].from].epoch = msg[self].epoch - 1,
"Failure of assertion at line 240, column 13.")
/\ Assert(msg[self].expiration = 0,
"Failure of assertion at line 241, column 13.")
/\ Assert(support_from[self][msg[self].from].expiration < clocks[self],
"Failure of assertion at line 247, column 13.")
/\ Assert(msg[self].expiration = 0,
"Failure of assertion at line 242, column 13.")
/\ Assert(support_from[self][msg[self].from].expiration <= clocks[self],
"Failure of assertion at line 248, column 13.")
/\ support_from' = [support_from EXCEPT ![self][msg[self].from].epoch = msg[self].epoch,
![self][msg[self].from].expiration = msg[self].expiration]
ELSE /\ TRUE
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/storeliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go_library(
name = "storeliveness",
srcs = [
"fabric.go",
"requester_state.go",
"supporter_state.go",
"transport.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness",
Expand All @@ -25,7 +27,11 @@ go_library(

go_test(
name = "storeliveness_test",
srcs = ["transport_test.go"],
srcs = [
"store_liveness_test.go",
"transport_test.go",
],
data = glob(["testdata/**"]),
embed = [":storeliveness"],
deps = [
"//pkg/gossip",
Expand All @@ -35,13 +41,15 @@ go_test(
"//pkg/rpc/nodedialer",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/netutil",
"//pkg/util/stop",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
Loading

0 comments on commit 3ec1740

Please sign in to comment.