Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check that session init was done 5min after offer was sent #259

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/grpc/iot_config/hpr_route_stream_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
devaddr_added := non_neg_integer()
}.


-record(state, {
stream :: grpcbox_client:stream() | undefined,
conn_backoff :: backoff:backoff()
Expand Down
42 changes: 36 additions & 6 deletions src/grpc/packet_router/hpr_packet_router_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
-define(REG_KEY(Gateway), {?MODULE, Gateway}).
-define(SESSION_TIMER, timer:minutes(30)).
-define(SESSION_RESET, session_reset).
-ifdef(TEST).
-define(SESSION_OFFER_RES_TIMEOUT, timer:seconds(2)).
-else.
-define(SESSION_OFFER_RES_TIMEOUT, timer:minutes(5)).
-endif.
-define(SESSION_OFFER_TIMEOUT, session_offer_timeout).

-record(handler_state, {
started :: non_neg_integer(),
pubkey_bin :: undefined | binary(),
nonce :: undefined | binary(),
session_key :: undefined | binary(),
session_timer :: undefined | reference()
session_timer :: undefined | reference(),
session_offer_timer :: undefined | reference()
}).

-spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t().
Expand Down Expand Up @@ -71,6 +78,23 @@ handle_info({give_away, NewPid, PubKeyBin}, StreamState) ->
handle_info(?SESSION_RESET, StreamState0) ->
{EnvDown, StreamState1} = create_session_offer(StreamState0),
grpcbox_stream:send(false, EnvDown, StreamState1);
handle_info({?SESSION_OFFER_TIMEOUT, OldSessionKey}, StreamState) ->
lager:debug("got session offer timeout ~s", [hpr_utils:bin_to_hex_string(OldSessionKey)]),
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
CurrSessionKey = HandlerState#handler_state.session_key,
case CurrSessionKey =/= undefined andalso CurrSessionKey =/= OldSessionKey of
false ->
lager:debug("session offer timeout triggered but session_key is set and new"),
StreamState;
true ->
lager:warning(
"session offer ~s timeout triggered, closing stream", [
hpr_utils:bin_to_hex_string(HandlerState#handler_state.nonce)
]
),
EnvDown = hpr_envelope_down:new(undefined),
grpcbox_stream:send(true, EnvDown, StreamState)
end;
handle_info(_Msg, StreamState) ->
StreamState.

Expand Down Expand Up @@ -191,6 +215,9 @@ handle_session_init(SessionInit, StreamState) ->
HandlerState0#handler_state.session_timer
)
},
ok = hpr_utils:cancel_timer(
HandlerState0#handler_state.session_offer_timer
),
{ok, grpcbox_stream:stream_handler_state(StreamState, HandlerState1)}
end
end.
Expand All @@ -200,20 +227,23 @@ handle_session_init(SessionInit, StreamState) ->
create_session_offer(StreamState0) ->
HandlerState0 = grpcbox_stream:stream_handler_state(StreamState0),
Nonce = crypto:strong_rand_bytes(32),
Timer = erlang:send_after(
?SESSION_OFFER_RES_TIMEOUT,
self(),
{?SESSION_OFFER_TIMEOUT, HandlerState0#handler_state.session_key}
),
EnvDown = hpr_envelope_down:new(hpr_session_offer:new(Nonce)),
StreamState1 = grpcbox_stream:stream_handler_state(
StreamState0, HandlerState0#handler_state{nonce = Nonce}
StreamState0, HandlerState0#handler_state{nonce = Nonce, session_offer_timer = Timer}
),
lager:debug("session offer ~s", [
hpr_utils:bin_to_hex_string(Nonce)
]),
{EnvDown, StreamState1}.

-spec schedule_session_reset(OldTimer :: undefined | reference()) -> reference().
schedule_session_reset(OldTimer) when is_reference(OldTimer) ->
_ = erlang:cancel_timer(OldTimer),
erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET);
schedule_session_reset(_OldTimer) ->
schedule_session_reset(OldTimer) ->
_ = hpr_utils:cancel_timer(OldTimer),
erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET).

%% ------------------------------------------------------------------
Expand Down
18 changes: 7 additions & 11 deletions src/hpr_routing_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,11 @@ terminate(_Reason, _State = #state{}) ->
do_crawl_routing(Window) ->
Now = erlang:system_time(millisecond) - Window,
%% MS = ets:fun2ms(fun(#routing_entry{time = Time}) when Time < 1234 -> true end).
MS = [{
{routing_entry,
'_',
'$1',
'_',
'_',
'_'
},
[{'<', '$1', Now}],
[true]
}],
MS = [
{
{routing_entry, '_', '$1', '_', '_', '_'},
[{'<', '$1', Now}],
[true]
}
],
ets:select_delete(?ROUTING_ETS, MS).
102 changes: 55 additions & 47 deletions src/hpr_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
-endif.

-export([
cancel_timer/1,
gateway_name/1,
gateway_mac/1,
int_to_hex_string/1,
Expand All @@ -37,53 +38,12 @@

-type trace() :: packet_gateway | stream_gateway | devaddr | app_eui | dev_eui.

-spec load_key(KeyFileName :: string()) -> ok.
load_key(KeyFileName) ->
{PubKey, SigFun} =
Key =
case libp2p_crypto:load_keys(KeyFileName) of
{ok, #{secret := PrivKey, public := PubKey0}} ->
{PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)};
{error, enoent} ->
KeyMap =
#{secret := PrivKey, public := PubKey0} = libp2p_crypto:generate_keys(
ed25519
),
ok = libp2p_crypto:save_keys(KeyMap, KeyFileName),
{PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)}
end,

PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey),
B58 = libp2p_crypto:bin_to_b58(PubKeyBin),
ok = persistent_term:put(?HPR_PUBKEY_BIN, PubKeyBin),

%% Keep as binary for http protocol jsx encoding/decoding
SenderNSID =
case application:get_env(hpr, http_roaming_sender_nsid, erlang:list_to_binary(B58)) of
<<"">> -> erlang:list_to_binary(B58);
Val -> Val
end,
ok = persistent_term:put(?HPR_SENDER_NSID, SenderNSID),

ok = persistent_term:put(?HPR_B58, B58),
ok = persistent_term:put(?HPR_SIG_FUN, SigFun),
ok = persistent_term:put(?HPR_KEY, Key).

-spec pubkey_bin() -> libp2p_crypto:pubkey_bin().
pubkey_bin() ->
persistent_term:get(?HPR_PUBKEY_BIN, undefined).

-spec sig_fun() -> libp2p_crypto:sig_fun().
sig_fun() ->
persistent_term:get(?HPR_SIG_FUN, undefined).

-spec sender_nsid() -> string().
sender_nsid() ->
persistent_term:get(?HPR_SENDER_NSID, undefined).

-spec b58() -> binary().
b58() ->
persistent_term:get(?HPR_B58, undefined).
-spec cancel_timer(Timer :: reference() | any()) -> ok.
cancel_timer(Timer) when is_reference(Timer) ->
_ = erlang:cancel_timer(Timer),
ok;
cancel_timer(_Timer) ->
ok.

-spec gateway_name(PubKeyBin :: libp2p_crypto:pubkey_bin() | string()) -> string().
gateway_name(PubKeyBin) when is_binary(PubKeyBin) ->
Expand Down Expand Up @@ -229,6 +189,54 @@ get_env_int(Key, Default) ->
I -> I
end.

-spec load_key(KeyFileName :: string()) -> ok.
load_key(KeyFileName) ->
{PubKey, SigFun} =
Key =
case libp2p_crypto:load_keys(KeyFileName) of
{ok, #{secret := PrivKey, public := PubKey0}} ->
{PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)};
{error, enoent} ->
KeyMap =
#{secret := PrivKey, public := PubKey0} = libp2p_crypto:generate_keys(
ed25519
),
ok = libp2p_crypto:save_keys(KeyMap, KeyFileName),
{PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)}
end,

PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey),
B58 = libp2p_crypto:bin_to_b58(PubKeyBin),
ok = persistent_term:put(?HPR_PUBKEY_BIN, PubKeyBin),

%% Keep as binary for http protocol jsx encoding/decoding
SenderNSID =
case application:get_env(hpr, http_roaming_sender_nsid, erlang:list_to_binary(B58)) of
<<"">> -> erlang:list_to_binary(B58);
Val -> Val
end,
ok = persistent_term:put(?HPR_SENDER_NSID, SenderNSID),

ok = persistent_term:put(?HPR_B58, B58),
ok = persistent_term:put(?HPR_SIG_FUN, SigFun),
ok = persistent_term:put(?HPR_KEY, Key).

-spec pubkey_bin() -> libp2p_crypto:pubkey_bin().
pubkey_bin() ->
persistent_term:get(?HPR_PUBKEY_BIN, undefined).

-spec sig_fun() -> libp2p_crypto:sig_fun().
sig_fun() ->
persistent_term:get(?HPR_SIG_FUN, undefined).

-spec sender_nsid() -> string().
sender_nsid() ->
persistent_term:get(?HPR_SENDER_NSID, undefined).

-spec b58() -> binary().
b58() ->
persistent_term:get(?HPR_B58, undefined).

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
Expand Down
102 changes: 100 additions & 2 deletions test/hpr_packet_router_service_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
]).

-export([
session_test/1
session_test/1,
timeout_test/1
]).

-include_lib("eunit/include/eunit.hrl").
Expand All @@ -24,7 +25,8 @@
%%--------------------------------------------------------------------
all() ->
[
session_test
session_test,
timeout_test
michaeldjeffrey marked this conversation as resolved.
Show resolved Hide resolved
].

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -137,6 +139,102 @@ session_test(_Config) ->

ok.

timeout_test(_Config) ->
RouteID = "8d502f32-4d58-4746-965e-8c7dfdcfc625",
Route = hpr_route:test_new(#{
id => RouteID,
net_id => 0,
oui => 4020,
server => #{
host => "127.0.0.1",
port => 8082,
protocol => {packet_router, #{}}
},
max_copies => 2
}),
EUIPairs = [
hpr_eui_pair:test_new(#{
route_id => RouteID, app_eui => 802041902051071031, dev_eui => 8942655256770396549
})
],
DevAddrRanges = [
hpr_devaddr_range:test_new(#{
route_id => RouteID, start_addr => 16#00000000, end_addr => 16#00000010
})
],

%% Normal test
{ok, GatewayPid1} = hpr_test_gateway:start(#{
forward => self(),
route => Route,
eui_pairs => EUIPairs,
devaddr_ranges => DevAddrRanges,
ignore_session_offer => false
}),

{ok, _} = hpr_test_gateway:receive_session_init(GatewayPid1, timer:seconds(1)),
{error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid1),

SessionKey1 = hpr_test_gateway:session_key(GatewayPid1),

PubKeyBin1 = hpr_test_gateway:pubkey_bin(GatewayPid1),
{ok, Pid1} = hpr_packet_router_service:locate(PubKeyBin1),

%% Checking that session keys are the same
?assertEqual(SessionKey1, session_key_from_stream(Pid1)),

gen_server:stop(GatewayPid1),

%% Testing with session key offer disabled, should result in closing of stream
{ok, GatewayPid2} = hpr_test_gateway:start(#{
forward => self(),
route => Route,
eui_pairs => EUIPairs,
devaddr_ranges => DevAddrRanges,
ignore_session_offer => true
}),

{error, timeout} = hpr_test_gateway:receive_session_init(GatewayPid2, timer:seconds(3)),
ok = hpr_test_gateway:receive_stream_down(GatewayPid2),

gen_server:stop(GatewayPid2),

%% Normal test with session reset
{ok, GatewayPid3} = hpr_test_gateway:start(#{
forward => self(),
route => Route,
eui_pairs => EUIPairs,
devaddr_ranges => DevAddrRanges,
ignore_session_offer => false
}),

{ok, _} = hpr_test_gateway:receive_session_init(GatewayPid3, timer:seconds(1)),
{error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid3),

SessionKey3 = hpr_test_gateway:session_key(GatewayPid3),
PubKeyBin3 = hpr_test_gateway:pubkey_bin(GatewayPid3),
{ok, Pid3} = hpr_packet_router_service:locate(PubKeyBin3),

%% Checking that session keys are the same
?assertEqual(SessionKey3, session_key_from_stream(Pid3)),
Pid3 ! session_reset,

{ok, _} = hpr_test_gateway:receive_session_init(GatewayPid3, timer:seconds(1)),
{error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid3),

SessionKey4 = hpr_test_gateway:session_key(GatewayPid3),
?assert(SessionKey3 =/= SessionKey4),
?assertEqual(SessionKey4, session_key_from_stream(Pid3)),

ok.

%% ===================================================================
%% Helpers
%% ===================================================================

session_key_from_stream(Pid) ->
State = sys:get_state(Pid),
StreamState = erlang:element(2, State),
CallbackState = erlang:element(20, StreamState),
HandlerState = erlang:element(3, CallbackState),
erlang:element(5, HandlerState).
28 changes: 14 additions & 14 deletions test/hpr_protocol_router_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,20 @@ gateway_disconnect_test(_Config) ->
_ConnPid = h2_stream_set:connection(StreamSet),

ok = gen_server:stop(GatewayPid),
ok =
receive
{hpr_test_gateway, GatewayPid,
{terminate, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}}} ->
GatewayConnPid = h2_stream_set:connection(GatewayStreamSet),
ok = test_utils:wait_until(
fun() ->
false == erlang:is_process_alive(GatewayConnPid) andalso
false == erlang:is_process_alive(GatewayStreamPid) andalso
false == erlang:is_process_alive(GatewayPid)
end
)
after timer:seconds(3) -> ct:fail(no_terminate_rcvd)
end,

case hpr_test_gateway:receive_terminate(GatewayPid) of
{error, timeout} ->
ct:fail(no_terminate_rcvd);
{ok, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}} ->
GatewayConnPid = h2_stream_set:connection(GatewayStreamSet),
ok = test_utils:wait_until(
fun() ->
false == erlang:is_process_alive(GatewayConnPid) andalso
false == erlang:is_process_alive(GatewayStreamPid) andalso
false == erlang:is_process_alive(GatewayPid)
end
)
end,

ok = test_utils:wait_until(
fun() ->
Expand Down
Loading
Loading