Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nextgenrepl to real-time replicate reaps #6

Merged
merged 5 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@
{riak_api, {git, "https://github.com/nhs-riak/riak_api.git", {branch, "nhse-develop-3.0"}}},
{hyper, {git, "https://github.com/nhs-riak/hyper", {branch, "nhse-develop-3.0"}}},
{kv_index_tictactree, {git, "https://github.com/nhs-riak/kv_index_tictactree.git", {branch, "nhse-develop-3.0"}}},
{riakhttpc, {git, "https://github.com/nhs-riak/riak-erlang-http-client", {branch, "nhse-develop-3.0"}}}
{riakhttpc, {git, "https://github.com/nhs-riak/riak-erlang-http-client", {branch, "nhse-d30-nhskv5"}}}
]}.
47 changes: 36 additions & 11 deletions src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ replrtq_reset_all_workercounts(WorkerC, PerPeerL) ->
{ok, riak_object:riak_object()} |
{ok, queue_empty} |
{ok, {deleted, vclock:vclock(), riak_object:riak_object()}} |
{ok, {reap, {riak_object:bucket(), riak_object:key(), vclock:vclock(), erlang:timestamp()}}}|
{error, timeout} |
{error, not_yet_implemented} |
{error, Err :: term()}.
Expand Down Expand Up @@ -223,10 +224,11 @@ fetch(QueueName, {?MODULE, [Node, _ClientId]}) ->
-spec push(riak_object:riak_object()|binary(),
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{ok, reap} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
push(RObjMaybeBin, IsDeleted, Opts, RiakClient) ->
RObj =
case riak_object:is_robject(RObjMaybeBin) of
% May get pushed a riak object, or a riak object as a binary, but
Expand All @@ -236,6 +238,25 @@ push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
false ->
riak_object:nextgenrepl_decode(RObjMaybeBin)
end,
case RObj of
{reap, {B, K, TC, LMD}} ->
{repl_reap(B, K, TC), LMD};
RObj ->
repl_push(RObj, IsDeleted, Opts, RiakClient)
end.

-spec repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock()) -> ok.
repl_reap(B, K, TC) ->
riak_kv_reaper:request_reap({{B, K}, TC, false}).

-spec repl_push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
repl_push(RObj, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
Bucket = riak_object:bucket(RObj),
Key = riak_object:key(RObj),
Me = self(),
Expand Down Expand Up @@ -579,26 +600,30 @@ consistent_delete(Bucket, Key, Options, _Timeout, {?MODULE, [Node, _ClientId]})
end.


-spec reap(riak_object:bucket(), riak_object:key(), riak_client())
-> boolean().
-spec reap(
riak_object:bucket(), riak_object:key(), riak_client()) -> boolean().
reap(Bucket, Key, Client) ->
case normal_get(Bucket, Key, [deletedvclock], Client) of
{error, {deleted, TombstoneVClock}} ->
DeleteHash = riak_object:delete_hash(TombstoneVClock),
reap(Bucket, Key, DeleteHash, Client);
reap(Bucket, Key, TombstoneVClock, Client);
_Unexpected ->
false
end.

-spec reap(riak_object:bucket(), riak_object:key(), pos_integer(),
riak_client()) -> boolean().
reap(Bucket, Key, DeleteHash, {?MODULE, [Node, _ClientId]}) ->
-spec reap(
riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client())
-> boolean().
reap(Bucket, Key, TombClock, {?MODULE, [Node, _ClientId]}) ->
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
case node() of
Node ->
riak_kv_reaper:direct_reap({{Bucket, Key}, DeleteHash});
riak_kv_reaper:direct_reap({{Bucket, Key}, TombClock, true});
_ ->
riak_core_util:safe_rpc(Node, riak_kv_reaper, direct_reap,
[{{Bucket, Key}, DeleteHash}])
riak_core_util:safe_rpc(
Node,
riak_kv_reaper,
direct_reap,
[{{Bucket, Key}, TombClock, true}]
)
end.

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) ->
Expand Down
12 changes: 7 additions & 5 deletions src/riak_kv_clusteraae_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ json_encode_results(find_keys, Result) ->
Keys = {struct, [{<<"results">>, [{struct, encode_find_key(Key, Int)} || {_Bucket, Key, Int} <- Result]}
]},
mochijson2:encode(Keys);
json_encode_results(find_tombs, Result) ->
json_encode_results(find_keys, Result);
json_encode_results(find_tombs, KeysNClocks) ->
encode_keys_and_clocks(KeysNClocks);
json_encode_results(reap_tombs, Count) ->
mochijson2:encode({struct, [{<<"dispatched_count">>, Count}]});
json_encode_results(erase_keys, Count) ->
Expand Down Expand Up @@ -616,7 +616,7 @@ pb_encode_results(merge_branch_nval, _QD, Branches) ->
level_two = L2
};
pb_encode_results(fetch_clocks_nval, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(merge_tree_range, QD, Tree) ->
Expand Down Expand Up @@ -662,8 +662,10 @@ pb_encode_results(find_keys, _QD, Results) ->
end,
#rpbaaefoldkeycountresp{response_type = <<"find_keys">>,
keys_count = lists:map(KeyCountMap, Results)};
pb_encode_results(find_tombs, QD, Results) ->
pb_encode_results(find_keys, QD, Results);
pb_encode_results(find_tombs, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(reap_tombs, _QD, Count) ->
#rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>,
keys_count =
Expand Down
7 changes: 7 additions & 0 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ queue_fetch(timeout, StateData) ->
Msg = {ReqID, {ok, {deleted, ExpectedClock, Obj}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData};
{Bucket, Key, TombClock, {reap, LMD}} ->
% A reap request was queued - so there is no need to fetch
% A tombstone was queued - so there is no need to fetch
Msg = {ReqID, {ok, {reap, {Bucket, Key, TombClock, LMD}}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData}
end.

Expand Down
70 changes: 44 additions & 26 deletions src/riak_kv_pb_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,19 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
case Result of
{ok, queue_empty} ->
{reply, #rpbfetchresp{queue_empty = true}, State};
{ok, {reap, {B, K, TC, LMD}}} ->
EncObj =
riak_object:nextgenrepl_encode(
repl_v1, {reap, {B, K, TC, LMD}}, false),
CRC32 = erlang:crc32(EncObj),
Resp =
#rpbfetchresp{
queue_empty = false,
replencoded_object = EncObj,
crc_check = CRC32},
{reply,
encode_nextgenrepl_response(Encoding, Resp, {B, K, TC}),
State};
{ok, {deleted, TombClock, RObj}} ->
% Never bother compressing tombstones, they're practically empty
EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, false),
Expand All @@ -212,18 +225,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
replencoded_object = EncObj,
crc_check = CRC32,
deleted_vclock = pbify_rpbvc(TombClock)},
case Encoding of
internal ->
{reply, Resp, State};
internal_aaehash ->
BK = make_binarykey(RObj),
{SegID, SegHash} =
leveled_tictac:tictac_hash(BK, lists:sort(TombClock)),
{reply,
Resp#rpbfetchresp{segment_id = SegID,
segment_hash = SegHash},
State}
end;
{reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State};
{ok, RObj} ->
EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, ToCompress),
CRC32 = erlang:crc32(EncObj),
Expand All @@ -232,19 +234,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
deleted = false,
replencoded_object = EncObj,
crc_check = CRC32},
case Encoding of
internal ->
{reply, Resp, State};
internal_aaehash ->
BK = make_binarykey(RObj),
Clock = lists:sort(riak_object:vclock(RObj)),
{SegID, SegHash} =
leveled_tictac:tictac_hash(BK, Clock),
{reply,
Resp#rpbfetchresp{segment_id = SegID,
segment_hash = SegHash},
State}
end;
{reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State};
{error, Reason} ->
{error, {format, Reason}, State}
end;
Expand Down Expand Up @@ -443,7 +433,35 @@ process_stream(_,_,State) ->
%% Internal functions
%% ===================================================================

-spec make_binarykey(riak_object:riak_object()) -> binary().
-spec encode_nextgenrepl_response(
intenal|internal_aaehash,
#rpbfetchresp{},
riak_object:riak_object()|
{riak_object:bucket(), riak_object:key(), vclock:vclock()})
-> #rpbfetchresp{}.
encode_nextgenrepl_response(Encoding, Resp, RObj) ->
case Encoding of
internal ->
Resp;
internal_aaehash ->
{SegID, SegHash} =
case RObj of
{B, K, TC} ->
BK = make_binarykey({B, K}),
leveled_tictac:tictac_hash(BK, lists:sort(TC));
RObj ->
BK = make_binarykey(RObj),
leveled_tictac:tictac_hash(
BK, lists:sort(riak_object:vclock(RObj)))
end,
Resp#rpbfetchresp{segment_id = SegID, segment_hash = SegHash}
end.

-spec make_binarykey(
riak_object:riak_object()|{riak_object:bucket(), riak_object:key()})
-> binary().
make_binarykey({B, K}) ->
make_binarykey(B, K);
make_binarykey(RObj) ->
make_binarykey(riak_object:bucket(RObj), riak_object:key(RObj)).

Expand Down
23 changes: 20 additions & 3 deletions src/riak_kv_reaper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
redo/0]).

-type reap_reference() ::
{{riak_object:bucket(), riak_object:key()}, non_neg_integer()}.
{{riak_object:bucket(), riak_object:key()}, vclock:vclock(), boolean()}.
-type job_id() :: pos_integer().

-export_type([reap_reference/0, job_id/0]).
Expand Down Expand Up @@ -149,7 +149,7 @@ get_limits() ->
%% we will not redo - redo is only to handle the failure related to unavailable
%% primaries
-spec action(reap_reference(), boolean()) -> boolean().
action({{Bucket, Key}, DeleteHash}, Redo) ->
action({{Bucket, Key}, TombClock, ToRepl}, Redo) ->
BucketProps = riak_core_bucket:get_bucket(Bucket),
DocIdx = riak_core_util:chash_key({Bucket, Key}, BucketProps),
{n_val, N} = lists:keyfind(n_val, 1, BucketProps),
Expand All @@ -160,7 +160,11 @@ action({{Bucket, Key}, DeleteHash}, Redo) ->
PL0 = lists:map(fun({Target, primary}) -> Target end, PrefList),
case check_all_mailboxes(PL0) of
ok ->
riak_kv_vnode:reap(PL0, {Bucket, Key}, DeleteHash),
riak_kv_vnode:reap(
PL0,
{Bucket, Key},
riak_object:delete_hash(TombClock)),
maybe_repl_reap(Bucket, Key, TombClock, ToRepl),
timer:sleep(TombPause),
true;
soft_loaded ->
Expand All @@ -171,6 +175,7 @@ action({{Bucket, Key}, DeleteHash}, Redo) ->
if Redo -> false; true -> true end
end.


-spec redo() -> boolean().
redo() -> true.

Expand All @@ -180,6 +185,18 @@ redo() -> true.

-type preflist_entry() :: {non_neg_integer(), node()}.

-spec maybe_repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock(), boolean()) -> ok.
maybe_repl_reap(Bucket, Key, TombClock, ToReap) ->
case application:get_env(riak_kv, repl_reap, false) and ToReap of
true ->
riak_kv_replrtq_src:replrtq_reap(
Bucket, Key, TombClock, os:timestamp());
false ->
ok
end.


%% Protect against overloading the system when not reaping should any
%% mailbox be in soft overload state
-spec check_all_mailboxes(list(preflist_entry())) -> ok|soft_loaded.
Expand Down
32 changes: 20 additions & 12 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@
% Modified time by bucket - second, minute, hour, day, longer}

-type reply_tuple() ::
{queue_empty, non_neg_integer()} |
{tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()} |
{object, non_neg_integer(), non_neg_integer(), non_neg_integer()} |
{error, any(), any()}.
{queue_empty, non_neg_integer()}|
{tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()}|
{object, non_neg_integer(), non_neg_integer(), non_neg_integer()}|
{error, any(), any()}.

-export_type([peer_info/0, queue_name/0]).

Expand Down Expand Up @@ -721,11 +721,11 @@ repl_fetcher(WorkItem) ->
{tomb, FetchSplit, PushSplit, ModSplit});
{ok, RObj} ->
SWFetched = os:timestamp(),
{ok, LMD} = riak_client:push(RObj, false, [], LocalClient),
SWPushed = os:timestamp(),
ModSplit = timer:now_diff(SWPushed, LMD),
FetchSplit = timer:now_diff(SWFetched, SW),
PushSplit = timer:now_diff(SWPushed, SWFetched),
{ok, LMD} = riak_client:push(RObj, false, [], LocalClient),
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
ModSplit =timer:now_diff(SWPushed, LMD),
ok = riak_kv_stat:update(ngrrepl_object),
done_work(WorkItem, true,
{object, FetchSplit, PushSplit, ModSplit});
Expand All @@ -750,9 +750,16 @@ repl_fetcher(WorkItem) ->
done_work(UpdWorkItem, false, {error, error, remote_error})
end
catch
Type:Exception ->
lager:warning("Snk worker failed at Peer ~w due to ~w error ~w",
[Peer, Type, Exception]),
Type:Exception:Stk ->
lager:warning(
"Snk worker failed at Peer ~w due to ~w error ~w",
[Peer, Type, Exception]),
case app_helper:get_env(riak_kv, log_snk_stacktrace, false) of
true ->
lager:warning("Snk worker failed due to ~p", [Stk]);
_ ->
ok
end,
RemoteFun(close),
UpdWorkItem0 = setelement(3, WorkItem, RenewClientFun()),
ok = riak_kv_stat:update(ngrrepl_error),
Expand Down Expand Up @@ -797,8 +804,9 @@ add_success({{success, Success}, F, FT, PT, RT, MT}) ->
add_failure({S, {failure, Failure}, FT, PT, RT, MT}) ->
{S, {failure, Failure + 1}, FT, PT, RT, MT}.

-spec add_repltime(queue_stats(),
{integer(), integer(), integer()}) -> queue_stats().
-spec add_repltime(
queue_stats(), {non_neg_integer(), non_neg_integer(), non_neg_integer()})
-> queue_stats().
add_repltime({S,
F,
{replfetch_time, FT}, {replpush_time, PT}, {replmod_time, RT},
Expand All @@ -810,7 +818,7 @@ add_repltime({S,
{replmod_time, RT + RT0},
MT}.

-spec add_modtime(queue_stats(), integer()) -> queue_stats().
-spec add_modtime(queue_stats(), non_neg_integer()) -> queue_stats().
add_modtime({S, F, FT, PT, RT, MT}, ModTime) ->
E = mod_split_element(ModTime div 1000) + 1,
C = element(E, MT),
Expand Down
Loading