Skip to content

Commit

Permalink
config streaming (#254)
Browse files Browse the repository at this point in the history
* allow longer for updating routes with lots of data

* split apart initial adding of skf and updating

SKF tables do not have write_concurrency because of the memory overhead.
The process of "adding" a SKF involves removing other copies from the
table with a select_delete/2. For sufficiently large tables of SKF this
would bottleneck quite badly and slam all of the processing available.

Instead, when the connection starts up we don't try to remove any SKFs:
we know we'll receive only `adds` from the config service for the
initial dump of the db, and there's a fairly consistent stream of
`removes` coming in. Once we hit one of those, it can be assumed we're
done with the initial loading of data, and we can go back to removing
SKFs as they come in to be updated.

* fix eunit complaining
  • Loading branch information
michaeldjeffrey authored Aug 31, 2023
1 parent 86123f5 commit 2a36195
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 43 deletions.
77 changes: 46 additions & 31 deletions src/grpc/iot_config/hpr_route_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
lookup_devaddr_range/1,

insert_skf/1,
insert_new_skf/1,
update_skf/4,
delete_skf/1,
lookup_skf/2,
Expand Down Expand Up @@ -305,12 +306,33 @@ lookup_devaddr_range(DevAddr) ->
-spec insert_skf(SKF :: hpr_skf:skf()) -> ok.
insert_skf(SKF) ->
RouteID = hpr_skf:route_id(SKF),
MD = skf_md(RouteID, SKF),
case ?MODULE:lookup_route(RouteID) of
[#hpr_route_ets{skf_ets = SKFETS}] ->
do_insert_skf(RouteID, SKFETS, SKF);
Deleted = do_remove_skf(SKFETS, SKF),
do_insert_skf(SKFETS, SKF),
case Deleted of
0 -> lager:debug(MD, "inserted SKF");
_ -> lager:debug([{deleted, Deleted} | MD], "updated SKF")
end;
_Other ->
lager:error("failed to insert skf table not found ~p for ~s", [
_Other, RouteID
lager:error(MD, "failed to insert skf table not found ~p", [
_Other
])
end,
ok.

%% Inserts an SKF into the SKF table of its route without first calling
%% do_remove_skf/2 (unlike insert_skf/1, which removes existing copies before
%% inserting).
%%
%% Always returns ok: when the route lookup does not yield exactly one route
%% entry, the failure is logged and the SKF is not inserted.
-spec insert_new_skf(SKF :: hpr_skf:skf()) -> ok.
insert_new_skf(SKF) ->
    RouteID = hpr_skf:route_id(SKF),
    MD = skf_md(RouteID, SKF),
    case ?MODULE:lookup_route(RouteID) of
        [#hpr_route_ets{skf_ets = SKFETS}] ->
            do_insert_skf(SKFETS, SKF),
            lager:debug(MD, "inserted SKF");
        Other ->
            %% Other is whatever lookup_route/1 returned instead of a single
            %% route entry; it is included in the log for diagnosis.
            lager:error(MD, "failed to insert new skf, table not found ~p", [
                Other
            ])
    end,
    ok.
Expand Down Expand Up @@ -445,7 +467,7 @@ replace_route_skfs(RouteID, NewSKFs) ->
{write_concurrency, true},
{heir, erlang:whereis(?SKF_HEIR), RouteID}
]),
lists:foreach(fun(SKF) -> do_insert_skf(RouteID, NewTab, SKF) end, NewSKFs),
lists:foreach(fun(SKF) -> do_insert_skf(NewTab, SKF) end, NewSKFs),
true = ets:insert(?ETS_ROUTES, Route#hpr_route_ets{skf_ets = NewTab}),

OldSize = ets:info(OldTab, size),
Expand Down Expand Up @@ -534,41 +556,34 @@ skfs_count_for_route(RouteID) ->
%% Internal Function Definitions
%% ------------------------------------------------------------------

-spec do_insert_skf(hpr_route:id(), ets:table(), hpr_skf:skf()) -> ok.
do_insert_skf(RouteID, SKFETS, SKF) ->
-spec do_remove_skf(ets:table(), hpr_skf:skf()) ->
DeletedCount :: non_neg_integer().
do_remove_skf(SKFETS, SKF) ->
DevAddr = hpr_skf:devaddr(SKF),
SessionKey = hpr_skf:session_key(SKF),
MaxCopies = hpr_skf:max_copies(SKF),
MS = [{{{'_', hpr_utils:hex_to_bin(SessionKey)}, {DevAddr, '_'}}, [], [true]}],
Deleted = ets:select_delete(SKFETS, MS),
ets:select_delete(SKFETS, MS).

-spec do_insert_skf(ets:table(), hpr_skf:skf()) -> ok.
do_insert_skf(SKFETS, SKF) ->
DevAddr = hpr_skf:devaddr(SKF),
SessionKey = hpr_skf:session_key(SKF),
MaxCopies = hpr_skf:max_copies(SKF),
%% This is negative to make newest time on top
Now = erlang:system_time(millisecond) * -1,
true = ets:insert(SKFETS, {
{Now, hpr_utils:hex_to_bin(SessionKey)}, {DevAddr, MaxCopies}
}),
case Deleted of
0 ->
lager:debug(
[
{route_id, RouteID},
{devaddr, hpr_utils:int_to_hex_string(DevAddr)},
{session_key, SessionKey},
{max_copies, MaxCopies}
],
"inserted SKF"
);
X ->
lager:debug(
[
{route_id, RouteID},
{devaddr, hpr_utils:int_to_hex_string(DevAddr)},
{session_key, SessionKey},
{max_copies, MaxCopies}
],
"updated SKF (~w)",
[X]
)
end.
ok.

%% Builds the metadata proplist passed to lager for SKF log lines: the route
%% id, the devaddr rendered via hpr_utils:int_to_hex_string/1, the session key
%% and the max copies of the SKF.
-spec skf_md(hpr_route:id(), hpr_skf:skf()) -> proplists:proplist().
skf_md(RouteID, SKF) ->
    DevAddrHex = hpr_utils:int_to_hex_string(hpr_skf:devaddr(SKF)),
    SessionKey = hpr_skf:session_key(SKF),
    MaxCopies = hpr_skf:max_copies(SKF),
    [
        {route_id, RouteID},
        {devaddr, DevAddrHex},
        {session_key, SessionKey},
        {max_copies, MaxCopies}
    ].

%% ------------------------------------------------------------------
%% EUnit tests
Expand Down
70 changes: 59 additions & 11 deletions src/grpc/iot_config/hpr_route_stream_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,28 @@
devaddr_added := non_neg_integer()
}.

%% There is no indicator for the stream that tells us everything from the
%% database has been loaded and the data has moved to updates. But we do know
%% that the db will send everything as an `add' action. Meaning the first
%% `remove' can be assumed as the start of the updating part of the stream.
-type stream_status() ::
% worker has started but the stream has not been initialized yet
undefined
% stream initialized successfully; no data message handled yet
| awaiting_data
% first data message received; initial loading of configuration
| loading
% first `remove' received while loading; switched to applying updates
| updating
% `iot_config_channel' is not defined or not started; no reconnect attempted
| undefined_channel
% could not get a stream from the config service; retry scheduled with backoff
| error_connecting.

-record(state, {
stream :: grpcbox_client:stream() | undefined,
conn_backoff :: backoff:backoff(),
stream_awaiting_data = false :: boolean()
stream_status :: stream_status()
}).

-define(SERVER, ?MODULE).
Expand All @@ -120,7 +138,7 @@ start_link(Args) ->

-spec refresh_route(hpr_route:id()) -> {ok, refresh_map()} | {error, any()}.
refresh_route(RouteID) ->
gen_server:call(?MODULE, {refresh_route, RouteID}).
gen_server:call(?MODULE, {refresh_route, RouteID}, timer:seconds(120)).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
Expand All @@ -133,7 +151,7 @@ init(Args) ->
{ok, #state{
stream = undefined,
conn_backoff = Backoff,
stream_awaiting_data = false
stream_status = undefined
}}.

handle_call({refresh_route, RouteID}, _From, State) ->
Expand Down Expand Up @@ -186,31 +204,44 @@ handle_info(?INIT_STREAM, #state{conn_backoff = Backoff0} = State) ->
lager:info("stream initialized"),
{_, Backoff1} = backoff:succeed(Backoff0),
{noreply, State#state{
stream = Stream, conn_backoff = Backoff1, stream_awaiting_data = true
stream = Stream, conn_backoff = Backoff1, stream_status = awaiting_data
}};
{error, undefined_channel} ->
lager:error(
"`iot_config_channel` is not defined, or not started. Not attempting to reconnect."
),
{noreply, State};
{noreply, State#state{stream_status = undefined_channel}};
{error, _E} ->
{Delay, Backoff1} = backoff:fail(Backoff0),
lager:error("failed to get stream sleeping ~wms : ~p", [Delay, _E]),
_ = erlang:send_after(Delay, self(), ?INIT_STREAM),
{noreply, State#state{conn_backoff = Backoff1}}
{noreply, State#state{conn_backoff = Backoff1, stream_status = error_connecting}}
end;
%% GRPC stream callbacks
handle_info({data, _, _} = Msg, #state{stream_awaiting_data = true} = State) ->
handle_info({data, _, _} = Msg, #state{stream_status = awaiting_data} = State) ->
%% Only delete route_ets when we start receiving data.
ok = hpr_route_ets:delete_all(),
?MODULE:handle_info(Msg, State#state{stream_awaiting_data = false});
handle_info({data, _StreamID, RouteStreamRes}, State) ->
ok = start_loading(),
?MODULE:handle_info(Msg, State#state{stream_status = loading});
handle_info(
{data, _StreamID, RouteStreamRes},
#state{stream_status = StreamStatus0} = State
) ->
Action = hpr_route_stream_res:action(RouteStreamRes),
Data = hpr_route_stream_res:data(RouteStreamRes),
{Type, _} = Data,
StreamStatus1 =
case {Action, StreamStatus0} of
{remove, loading} ->
lager:info("first remove, done loading"),
ok = done_loading(),
updating;
_ ->
StreamStatus0
end,
lager:debug([{action, Action}, {type, Type}], "got route stream update"),
_ = erlang:spawn(fun() -> ok = process_route_stream_res(Action, Data) end),
{noreply, State};
{noreply, State#state{stream_status = StreamStatus1}};
handle_info({headers, _StreamID, _Headers}, State) ->
%% noop on headers
{noreply, State};
Expand Down Expand Up @@ -284,7 +315,12 @@ process_route_stream_res(add, {eui_pair, EUIPair}) ->
process_route_stream_res(add, {devaddr_range, DevAddrRange}) ->
hpr_route_ets:insert_devaddr_range(DevAddrRange);
process_route_stream_res(add, {skf, SKF}) ->
hpr_route_ets:insert_skf(SKF);
case is_done_with_initial_loading() of
true ->
hpr_route_ets:insert_skf(SKF);
false ->
hpr_route_ets:insert_new_skf(SKF)
end;
process_route_stream_res(remove, {route, Route}) ->
hpr_route_ets:delete_route(Route);
process_route_stream_res(remove, {eui_pair, EUIPair}) ->
Expand Down Expand Up @@ -430,3 +466,15 @@ do_recv_from_stream(stream_finished, _Stream, Acc) ->
do_recv_from_stream(Msg, _Stream, _Acc) ->
lager:warning("unhandled msg from stream: ~p", [Msg]),
{error, {unhandled_message, Msg}}.

%% Marks the initial config load as in progress by storing `true' under the
%% `config_loading' persistent term.
-spec start_loading() -> ok.
start_loading() ->
    Loading = true,
    ok = persistent_term:put(config_loading, Loading).

%% Marks the initial config load as finished by storing `false' under the
%% `config_loading' persistent term.
-spec done_loading() -> ok.
done_loading() ->
    Loading = false,
    ok = persistent_term:put(config_loading, Loading).

%% Returns true once the initial config dump is no longer being loaded.
%%
%% The `config_loading' persistent term is set to true by start_loading/0 and
%% to false by done_loading/0, so "done" is the negation of the stored flag.
%% Returning the flag itself (as this function previously did) reported "done"
%% while loading and "not done" afterwards.
%%
%% When the flag has never been set no load is in progress, so this returns
%% true.
-spec is_done_with_initial_loading() -> boolean().
is_done_with_initial_loading() ->
    not persistent_term:get(config_loading, false).
2 changes: 1 addition & 1 deletion src/grpc/packet_router/hpr_packet_router_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ route_packet_test() ->
meck:new(hpr_routing, [passthrough]),
PacketUp = hpr_packet_up:test_new(#{}),
EnvUp = hpr_envelope_up:new(PacketUp),
meck:expect(hpr_routing, handle_packet, [PacketUp, undefined], ok),
meck:expect(hpr_routing, handle_packet, fun(_PacketUp, _Opts) -> ok end),

StreamState = grpcbox_stream:stream_handler_state(
#state{}, #handler_state{}
Expand Down

0 comments on commit 2a36195

Please sign in to comment.