Skip to content

Commit

Permalink
Allow nextgenrepl to real-time replicate reaps (#6)
Browse files Browse the repository at this point in the history
* Allow nextgenrepl to real-time replicate reaps

This is to address the issue of reaping across sync'd clusters.  Without this feature it is necessary to disable full-sync whilst independently replicating on each cluster.

Now if reaping via riak_kv_reaper the reap will be replicated assuming the `riak_kv.repl_reap` flag has been enabled.  At the receiving cluster the reap will not be replicated any further.

There are some API changes to support this.  The `find_tombs` aae_fold will now return Keys/Clocks and not Keys/DeleteHash.  The ReapReference for riak_kv_repaer will now expect a clock (version vector) not a DeleteHash, and will also now expect an additional boolean to indicate if this repl is a replication candidate (it will be false for all pushed reaps).

The object encoding for nextgenrepl now has a flag to indicate a reap, with a special encoding for reap references.

* Update riak_object.erl

Clarify specs

* Take timestamp at correct point (after push)

* Updates following review

* Update rebar.config
  • Loading branch information
martinsumner authored Sep 14, 2023
1 parent 1c87b3e commit ca26ab2
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 145 deletions.
9 changes: 8 additions & 1 deletion priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1337,13 +1337,20 @@
{default, "q1_ttaaefs:block_rtq"}
]}.

%% @doc Enable this node zlib compress objects over the wire
%% @doc Enable this node to zlib compress objects over the wire
{mapping, "replrtq_compressonwire", "riak_kv.replrtq_compressonwire", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to replicate reap requests to other clusters
{mapping, "repl_reap", "riak_kv.repl_reap", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to act as a sink and consume from a src cluster
{mapping, "replrtq_enablesink", "riak_kv.replrtq_enablesink", [
{datatype, {flag, enabled, disabled}},
Expand Down
47 changes: 36 additions & 11 deletions src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ replrtq_reset_all_workercounts(WorkerC, PerPeerL) ->
{ok, riak_object:riak_object()} |
{ok, queue_empty} |
{ok, {deleted, vclock:vclock(), riak_object:riak_object()}} |
{ok, {reap, {riak_object:bucket(), riak_object:key(), vclock:vclock(), erlang:timestamp()}}}|
{error, timeout} |
{error, not_yet_implemented} |
{error, Err :: term()}.
Expand Down Expand Up @@ -223,10 +224,11 @@ fetch(QueueName, {?MODULE, [Node, _ClientId]}) ->
-spec push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{ok, reap} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
push(RObjMaybeBin, IsDeleted, Opts, RiakClient) ->
RObj =
case riak_object:is_robject(RObjMaybeBin) of
% May get pushed a riak object, or a riak object as a binary, but
Expand All @@ -236,6 +238,25 @@ push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
false ->
riak_object:nextgenrepl_decode(RObjMaybeBin)
end,
case RObj of
{reap, {B, K, TC, LMD}} ->
{repl_reap(B, K, TC), LMD};
RObj ->
repl_push(RObj, IsDeleted, Opts, RiakClient)
end.

-spec repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock()) -> ok.
repl_reap(B, K, TC) ->
riak_kv_reaper:request_reap({{B, K}, TC, false}).

-spec repl_push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
repl_push(RObj, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
Bucket = riak_object:bucket(RObj),
Key = riak_object:key(RObj),
Me = self(),
Expand Down Expand Up @@ -579,26 +600,30 @@ consistent_delete(Bucket, Key, Options, _Timeout, {?MODULE, [Node, _ClientId]})
end.


-spec reap(riak_object:bucket(), riak_object:key(), riak_client())
-> boolean().
-spec reap(
riak_object:bucket(), riak_object:key(), riak_client()) -> boolean().
reap(Bucket, Key, Client) ->
case normal_get(Bucket, Key, [deletedvclock], Client) of
{error, {deleted, TombstoneVClock}} ->
DeleteHash = riak_object:delete_hash(TombstoneVClock),
reap(Bucket, Key, DeleteHash, Client);
reap(Bucket, Key, TombstoneVClock, Client);
_Unexpected ->
false
end.

-spec reap(riak_object:bucket(), riak_object:key(), pos_integer(),
riak_client()) -> boolean().
reap(Bucket, Key, DeleteHash, {?MODULE, [Node, _ClientId]}) ->
-spec reap(
riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client())
-> boolean().
reap(Bucket, Key, TombClock, {?MODULE, [Node, _ClientId]}) ->
case node() of
Node ->
riak_kv_reaper:direct_reap({{Bucket, Key}, DeleteHash});
riak_kv_reaper:direct_reap({{Bucket, Key}, TombClock, true});
_ ->
riak_core_util:safe_rpc(Node, riak_kv_reaper, direct_reap,
[{{Bucket, Key}, DeleteHash}])
riak_core_util:safe_rpc(
Node,
riak_kv_reaper,
direct_reap,
[{{Bucket, Key}, TombClock, true}]
)
end.

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) ->
Expand Down
12 changes: 7 additions & 5 deletions src/riak_kv_clusteraae_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ json_encode_results(find_keys, Result) ->
Keys = {struct, [{<<"results">>, [{struct, encode_find_key(Key, Int)} || {_Bucket, Key, Int} <- Result]}
]},
mochijson2:encode(Keys);
json_encode_results(find_tombs, Result) ->
json_encode_results(find_keys, Result);
json_encode_results(find_tombs, KeysNClocks) ->
encode_keys_and_clocks(KeysNClocks);
json_encode_results(reap_tombs, Count) ->
mochijson2:encode({struct, [{<<"dispatched_count">>, Count}]});
json_encode_results(erase_keys, Count) ->
Expand Down Expand Up @@ -616,7 +616,7 @@ pb_encode_results(merge_branch_nval, _QD, Branches) ->
level_two = L2
};
pb_encode_results(fetch_clocks_nval, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(merge_tree_range, QD, Tree) ->
Expand Down Expand Up @@ -662,8 +662,10 @@ pb_encode_results(find_keys, _QD, Results) ->
end,
#rpbaaefoldkeycountresp{response_type = <<"find_keys">>,
keys_count = lists:map(KeyCountMap, Results)};
pb_encode_results(find_tombs, QD, Results) ->
pb_encode_results(find_keys, QD, Results);
pb_encode_results(find_tombs, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(reap_tombs, _QD, Count) ->
#rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>,
keys_count =
Expand Down
7 changes: 7 additions & 0 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ queue_fetch(timeout, StateData) ->
Msg = {ReqID, {ok, {deleted, ExpectedClock, Obj}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData};
{Bucket, Key, TombClock, {reap, LMD}} ->
% A reap request was queued - so there is no need to fetch
% A tombstone was queued - so there is no need to fetch
Msg = {ReqID, {ok, {reap, {Bucket, Key, TombClock, LMD}}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData}
end.

Expand Down
70 changes: 44 additions & 26 deletions src/riak_kv_pb_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,19 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
case Result of
{ok, queue_empty} ->
{reply, #rpbfetchresp{queue_empty = true}, State};
{ok, {reap, {B, K, TC, LMD}}} ->
EncObj =
riak_object:nextgenrepl_encode(
repl_v1, {reap, {B, K, TC, LMD}}, false),
CRC32 = erlang:crc32(EncObj),
Resp =
#rpbfetchresp{
queue_empty = false,
replencoded_object = EncObj,
crc_check = CRC32},
{reply,
encode_nextgenrepl_response(Encoding, Resp, {B, K, TC}),
State};
{ok, {deleted, TombClock, RObj}} ->
% Never bother compressing tombstones, they're practically empty
EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, false),
Expand All @@ -212,18 +225,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
replencoded_object = EncObj,
crc_check = CRC32,
deleted_vclock = pbify_rpbvc(TombClock)},
case Encoding of
internal ->
{reply, Resp, State};
internal_aaehash ->
BK = make_binarykey(RObj),
{SegID, SegHash} =
leveled_tictac:tictac_hash(BK, lists:sort(TombClock)),
{reply,
Resp#rpbfetchresp{segment_id = SegID,
segment_hash = SegHash},
State}
end;
{reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State};
{ok, RObj} ->
EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, ToCompress),
CRC32 = erlang:crc32(EncObj),
Expand All @@ -232,19 +234,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
deleted = false,
replencoded_object = EncObj,
crc_check = CRC32},
case Encoding of
internal ->
{reply, Resp, State};
internal_aaehash ->
BK = make_binarykey(RObj),
Clock = lists:sort(riak_object:vclock(RObj)),
{SegID, SegHash} =
leveled_tictac:tictac_hash(BK, Clock),
{reply,
Resp#rpbfetchresp{segment_id = SegID,
segment_hash = SegHash},
State}
end;
{reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State};
{error, Reason} ->
{error, {format, Reason}, State}
end;
Expand Down Expand Up @@ -443,7 +433,35 @@ process_stream(_,_,State) ->
%% Internal functions
%% ===================================================================

-spec make_binarykey(riak_object:riak_object()) -> binary().
-spec encode_nextgenrepl_response(
intenal|internal_aaehash,
#rpbfetchresp{},
riak_object:riak_object()|
{riak_object:bucket(), riak_object:key(), vclock:vclock()})
-> #rpbfetchresp{}.
encode_nextgenrepl_response(Encoding, Resp, RObj) ->
case Encoding of
internal ->
Resp;
internal_aaehash ->
{SegID, SegHash} =
case RObj of
{B, K, TC} ->
BK = make_binarykey({B, K}),
leveled_tictac:tictac_hash(BK, lists:sort(TC));
RObj ->
BK = make_binarykey(RObj),
leveled_tictac:tictac_hash(
BK, lists:sort(riak_object:vclock(RObj)))
end,
Resp#rpbfetchresp{segment_id = SegID, segment_hash = SegHash}
end.

-spec make_binarykey(
riak_object:riak_object()|{riak_object:bucket(), riak_object:key()})
-> binary().
make_binarykey({B, K}) ->
make_binarykey(B, K);
make_binarykey(RObj) ->
make_binarykey(riak_object:bucket(RObj), riak_object:key(RObj)).

Expand Down
28 changes: 25 additions & 3 deletions src/riak_kv_reaper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@
redo/0]).

-type reap_reference() ::
{{riak_object:bucket(), riak_object:key()}, non_neg_integer()}.
{{riak_object:bucket(), riak_object:key()}, vclock:vclock(), boolean()}.
%% the reap reference is {Bucket, Key, Clock (of tombstone), Forward}. The
%% Forward boolean() indicates if this reap should be replicated if
%% riak_kv.repl_reap is true. When a reap is received via replication
%% Forward should be set to false, to prevent reaps from perpetually
%% circulating
-type job_id() :: pos_integer().

-export_type([reap_reference/0, job_id/0]).
Expand Down Expand Up @@ -149,7 +154,7 @@ get_limits() ->
%% we will not redo - redo is only to handle the failure related to unavailable
%% primaries
-spec action(reap_reference(), boolean()) -> boolean().
action({{Bucket, Key}, DeleteHash}, Redo) ->
action({{Bucket, Key}, TombClock, ToRepl}, Redo) ->
BucketProps = riak_core_bucket:get_bucket(Bucket),
DocIdx = riak_core_util:chash_key({Bucket, Key}, BucketProps),
{n_val, N} = lists:keyfind(n_val, 1, BucketProps),
Expand All @@ -160,7 +165,11 @@ action({{Bucket, Key}, DeleteHash}, Redo) ->
PL0 = lists:map(fun({Target, primary}) -> Target end, PrefList),
case check_all_mailboxes(PL0) of
ok ->
riak_kv_vnode:reap(PL0, {Bucket, Key}, DeleteHash),
riak_kv_vnode:reap(
PL0,
{Bucket, Key},
riak_object:delete_hash(TombClock)),
maybe_repl_reap(Bucket, Key, TombClock, ToRepl),
timer:sleep(TombPause),
true;
soft_loaded ->
Expand All @@ -171,6 +180,7 @@ action({{Bucket, Key}, DeleteHash}, Redo) ->
if Redo -> false; true -> true end
end.


-spec redo() -> boolean().
redo() -> true.

Expand All @@ -180,6 +190,18 @@ redo() -> true.

-type preflist_entry() :: {non_neg_integer(), node()}.

-spec maybe_repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock(), boolean()) -> ok.
maybe_repl_reap(Bucket, Key, TombClock, ToReap) ->
case application:get_env(riak_kv, repl_reap, false) and ToReap of
true ->
riak_kv_replrtq_src:replrtq_reap(
Bucket, Key, TombClock, os:timestamp());
false ->
ok
end.


%% Protect against overloading the system when not reaping should any
%% mailbox be in soft overload state
-spec check_all_mailboxes(list(preflist_entry())) -> ok|soft_loaded.
Expand Down
30 changes: 19 additions & 11 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@
% Modified time by bucket - second, minute, hour, day, longer}

-type reply_tuple() ::
{queue_empty, non_neg_integer()} |
{tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()} |
{object, non_neg_integer(), non_neg_integer(), non_neg_integer()} |
{error, any(), any()}.
{queue_empty, non_neg_integer()}|
{tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()}|
{object, non_neg_integer(), non_neg_integer(), non_neg_integer()}|
{error, any(), any()}.

-export_type([peer_info/0, queue_name/0]).

Expand Down Expand Up @@ -723,9 +723,9 @@ repl_fetcher(WorkItem) ->
SWFetched = os:timestamp(),
{ok, LMD} = riak_client:push(RObj, false, [], LocalClient),
SWPushed = os:timestamp(),
ModSplit = timer:now_diff(SWPushed, LMD),
FetchSplit = timer:now_diff(SWFetched, SW),
PushSplit = timer:now_diff(SWPushed, SWFetched),
ModSplit = timer:now_diff(SWPushed, LMD),
ok = riak_kv_stat:update(ngrrepl_object),
done_work(WorkItem, true,
{object, FetchSplit, PushSplit, ModSplit});
Expand All @@ -750,9 +750,16 @@ repl_fetcher(WorkItem) ->
done_work(UpdWorkItem, false, {error, error, remote_error})
end
catch
Type:Exception ->
lager:warning("Snk worker failed at Peer ~w due to ~w error ~w",
[Peer, Type, Exception]),
Type:Exception:Stk ->
lager:warning(
"Snk worker failed at Peer ~w due to ~w error ~w",
[Peer, Type, Exception]),
case app_helper:get_env(riak_kv, log_snk_stacktrace, false) of
true ->
lager:warning("Snk worker failed due to ~p", [Stk]);
_ ->
ok
end,
RemoteFun(close),
UpdWorkItem0 = setelement(3, WorkItem, RenewClientFun()),
ok = riak_kv_stat:update(ngrrepl_error),
Expand Down Expand Up @@ -797,8 +804,9 @@ add_success({{success, Success}, F, FT, PT, RT, MT}) ->
add_failure({S, {failure, Failure}, FT, PT, RT, MT}) ->
{S, {failure, Failure + 1}, FT, PT, RT, MT}.

-spec add_repltime(queue_stats(),
{integer(), integer(), integer()}) -> queue_stats().
-spec add_repltime(
queue_stats(), {non_neg_integer(), non_neg_integer(), non_neg_integer()})
-> queue_stats().
add_repltime({S,
F,
{replfetch_time, FT}, {replpush_time, PT}, {replmod_time, RT},
Expand All @@ -810,7 +818,7 @@ add_repltime({S,
{replmod_time, RT + RT0},
MT}.

-spec add_modtime(queue_stats(), integer()) -> queue_stats().
-spec add_modtime(queue_stats(), non_neg_integer()) -> queue_stats().
add_modtime({S, F, FT, PT, RT, MT}, ModTime) ->
E = mod_split_element(ModTime div 1000) + 1,
C = element(E, MT),
Expand Down
Loading

0 comments on commit ca26ab2

Please sign in to comment.