Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle based on packet hash #252

Merged
merged 2 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/hpr_routing.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
-define(GATEWAY_THROTTLE, hpr_gateway_rate_limit).
-define(DEFAULT_GATEWAY_THROTTLE, 25).

-define(PACKET_THROTTLE, hpr_packet_rate_limit).
-define(DEFAULT_PACKET_THROTTLE, 1).

-ifdef(TEST).
-define(SKF_UPDATE, timer:seconds(2)).
-else.
Expand All @@ -27,6 +30,8 @@
init() ->
GatewayRateLimit = application:get_env(?APP, gateway_rate_limit, ?DEFAULT_GATEWAY_THROTTLE),
ok = throttle:setup(?GATEWAY_THROTTLE, GatewayRateLimit, per_second),
PacketRateLimit = application:get_env(?APP, packet_rate_limit, ?DEFAULT_PACKET_THROTTLE),
ok = throttle:setup(?PACKET_THROTTLE, PacketRateLimit, per_second),
ok.

-spec handle_packet(PacketUp :: hpr_packet_up:packet(), Opts :: map()) ->
Expand All @@ -41,7 +46,8 @@ handle_packet(PacketUp, Opts) ->
{fun packet_type_check/1, [], invalid_packet_type},
{fun gateway_check/2, [Gateway], wrong_gateway},
{fun packet_session_check/2, [SessionKey], bad_signature},
{fun throttle_check/1, [], gateway_limit_exceeded}
{fun gateway_throttle_check/1, [], gateway_limit_exceeded},
{fun packet_throttle_check/1, [], packet_limit_exceeded}
],
PacketUpType = hpr_packet_up:type(PacketUp),
case execute_checks(PacketUp, Checks) of
Expand Down Expand Up @@ -385,15 +391,25 @@ packet_session_check(PacketUp, undefined) ->
packet_session_check(PacketUp, SessionKey) ->
hpr_packet_up:verify(PacketUp, SessionKey).

-spec throttle_check(PacketUp :: hpr_packet_up:packet()) ->
-spec gateway_throttle_check(PacketUp :: hpr_packet_up:packet()) ->
boolean().
throttle_check(PacketUp) ->
gateway_throttle_check(PacketUp) ->
Gateway = hpr_packet_up:gateway(PacketUp),
case throttle:check(?GATEWAY_THROTTLE, Gateway) of
{limit_exceeded, _, _} -> false;
_ -> true
end.

-spec packet_throttle_check(PacketUp :: hpr_packet_up:packet()) ->
boolean().
packet_throttle_check(PacketUp) ->
Gateway = hpr_packet_up:gateway(PacketUp),
PHash = hpr_packet_up:phash(PacketUp),
case throttle:check(?PACKET_THROTTLE, {Gateway, PHash}) of
{limit_exceeded, _, _} -> false;
_ -> true
end.

-spec execute_checks(PacketUp :: hpr_packet_up:packet(), [{fun(), list(any()), any()}]) ->
ok | {error, any()}.
execute_checks(_PacketUp, []) ->
Expand Down
135 changes: 93 additions & 42 deletions test/hpr_routing_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

-export([
gateway_limit_exceeded_test/1,
packet_limit_exceeded_test/1,
invalid_packet_type_test/1,
wrong_gateway_test/1,
bad_signature_test/1,
Expand Down Expand Up @@ -42,6 +43,7 @@
all() ->
[
gateway_limit_exceeded_test,
packet_limit_exceeded_test,
invalid_packet_type_test,
wrong_gateway_test,
bad_signature_test,
Expand Down Expand Up @@ -81,15 +83,15 @@ gateway_limit_exceeded_test(_Config) ->
#{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519),
SigFun = libp2p_crypto:mk_sig_fun(PrivKey),
Gateway = libp2p_crypto:pubkey_to_bin(PubKey),
JoinPacketUpValid = test_utils:join_packet_up(#{
gateway => Gateway, sig_fun => SigFun
}),
Self = self(),
lists:foreach(
fun(_) ->
fun(I) ->
erlang:spawn(
fun() ->
R = hpr_routing:handle_packet(JoinPacketUpValid, #{gateway => Gateway}),
PacketUpValid = test_utils:uplink_packet_up(#{
gateway => Gateway, sig_fun => SigFun, fcnt => I
}),
R = hpr_routing:handle_packet(PacketUpValid, #{gateway => Gateway}),
Self ! {gateway_limit_exceeded_test, R}
end
)
Expand All @@ -99,6 +101,31 @@ gateway_limit_exceeded_test(_Config) ->
?assertEqual({25, 1}, receive_gateway_limit_exceeded_test({0, 0})),
ok.

packet_limit_exceeded_test(_Config) ->
%% Limit is DEFAULT_PACKET_THROTTLE = 1 per second
Limit = 1,
#{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519),
SigFun = libp2p_crypto:mk_sig_fun(PrivKey),
Gateway = libp2p_crypto:pubkey_to_bin(PubKey),
PacketUpValid = test_utils:uplink_packet_up(#{
gateway => Gateway, sig_fun => SigFun, fcnt => 0
}),
Self = self(),
lists:foreach(
fun(_) ->
erlang:spawn(
fun() ->
%% We send the same packet
R = hpr_routing:handle_packet(PacketUpValid, #{gateway => Gateway}),
Self ! {packet_limit_exceeded_test, R}
end
)
end,
lists:seq(1, Limit + 1)
),
?assertEqual({1, 1}, receive_packet_limit_exceeded_test({0, 0})),
ok.

invalid_packet_type_test(_Config) ->
#{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519),
SigFun = libp2p_crypto:mk_sig_fun(PrivKey),
Expand Down Expand Up @@ -147,13 +174,17 @@ mic_check_test(_Config) ->
AppSessionKey = crypto:strong_rand_bytes(16),
NwkSessionKey = crypto:strong_rand_bytes(16),
DevAddr = 16#00000001,
PacketUp = test_utils:uplink_packet_up(#{
app_session_key => AppSessionKey,
nwk_session_key => NwkSessionKey,
devaddr => DevAddr,
gateway => Gateway,
sig_fun => SigFun
}),

PacketUp = fun(FCnt) ->
test_utils:uplink_packet_up(#{
app_session_key => AppSessionKey,
nwk_session_key => NwkSessionKey,
devaddr => DevAddr,
gateway => Gateway,
sig_fun => SigFun,
fcnt => FCnt
})
end,

%% TEST 1: Join always works
JoinPacketUpValid = test_utils:join_packet_up(#{
Expand All @@ -162,7 +193,7 @@ mic_check_test(_Config) ->
?assertEqual(ok, hpr_routing:handle_packet(JoinPacketUpValid, #{gateway => Gateway})),

%% TEST 2: No SFK for devaddr
?assertEqual(ok, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(0), #{gateway => Gateway})),

%% TEST 3: Bad key and route exist
Route = hpr_route:test_new(#{
Expand Down Expand Up @@ -205,7 +236,9 @@ mic_check_test(_Config) ->
end
),

?assertEqual({error, invalid_mic}, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})),
?assertEqual(
{error, invalid_mic}, hpr_routing:handle_packet(PacketUp(1), #{gateway => Gateway})
),

ok = hpr_route_ets:delete_skf(SKFBadKeyAndRouteExitst),

Expand All @@ -232,7 +265,7 @@ mic_check_test(_Config) ->
end
),

?assertEqual(ok, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(2), #{gateway => Gateway})),

%% TEST 5: Good key and route exist
%% Adding a bad key to make sure it still works
Expand All @@ -257,7 +290,7 @@ mic_check_test(_Config) ->
end
),

?assertEqual(ok, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(3), #{gateway => Gateway})),

ok.

Expand Down Expand Up @@ -897,22 +930,26 @@ active_locked_route_test(_Config) ->
SigFun1 = libp2p_crypto:mk_sig_fun(PrivKey1),
Gateway1 = libp2p_crypto:pubkey_to_bin(PubKey1),

UplinkPacketUp1 = test_utils:uplink_packet_up(#{
gateway => Gateway1,
sig_fun => SigFun1,
devaddr => DevAddr,
fcnt => 1,
app_session_key => AppSessionKey,
nwk_session_key => NwkSessionKey
}),
PacketUp = fun(FCnt) ->
test_utils:uplink_packet_up(#{
gateway => Gateway1,
sig_fun => SigFun1,
devaddr => DevAddr,
fcnt => FCnt,
app_session_key => AppSessionKey,
nwk_session_key => NwkSessionKey
})
end,

?assertEqual(ok, hpr_routing:handle_packet(UplinkPacketUp1, #{gateway => Gateway1})),
PacketUp0 = PacketUp(0),

?assertEqual(ok, hpr_routing:handle_packet(PacketUp0, #{gateway => Gateway1})),

Self = self(),
Received1 =
{Self,
{hpr_protocol_router, send, [
UplinkPacketUp1,
PacketUp0,
Route1
]},
ok},
Expand All @@ -934,7 +971,7 @@ active_locked_route_test(_Config) ->
locked => false
}),
ok = hpr_route_ets:insert_route(Route2),
?assertEqual(ok, hpr_routing:handle_packet(UplinkPacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(1), #{gateway => Gateway1})),

?assertEqual([], meck:history(hpr_protocol_router)),

Expand All @@ -952,7 +989,7 @@ active_locked_route_test(_Config) ->
locked => true
}),
ok = hpr_route_ets:insert_route(Route3),
?assertEqual(ok, hpr_routing:handle_packet(UplinkPacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(2), #{gateway => Gateway1})),

?assertEqual([], meck:history(hpr_protocol_router)),

Expand Down Expand Up @@ -998,35 +1035,37 @@ in_cooldown_route_test(_Config) ->
SigFun1 = libp2p_crypto:mk_sig_fun(PrivKey1),
Gateway1 = libp2p_crypto:pubkey_to_bin(PubKey1),

PacketUp1 = test_utils:uplink_packet_up(#{
gateway => Gateway1,
sig_fun => SigFun1,
devaddr => DevAddr,
fcnt => 1,
app_session_key => AppSessionKey,
nwk_session_key => NwkSessionKey
}),
PacketUp = fun(FCnt) ->
test_utils:uplink_packet_up(#{
gateway => Gateway1,
sig_fun => SigFun1,
devaddr => DevAddr,
fcnt => FCnt,
app_session_key => AppSessionKey,
nwk_session_key => NwkSessionKey
})
end,

%% We setup protocol router to fail every call
meck:new(hpr_protocol_router, [passthrough]),
meck:expect(hpr_protocol_router, send, fun(_, _) -> {error, not_implemented} end),

%% We send first packet a call should be made to hpr_protocol_router
?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(0), #{gateway => Gateway1})),
?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 2)),
%% We send second packet NO call should be made to hpr_protocol_router as the route would be in cooldown
?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(1), #{gateway => Gateway1})),
?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 2)),

%% We wait the initial first timeout 1s
%% Send another packet and watch another call made to hpr_protocol_router
timer:sleep(1000),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(2), #{gateway => Gateway1})),
?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 2)),

%% We send couple more packets and check that we still only 2 calls to hpr_protocol_router
?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(3), #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(4), #{gateway => Gateway1})),
?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 2)),

%% We check the route and make sure that the backoff is setup properly
Expand All @@ -1039,7 +1078,7 @@ in_cooldown_route_test(_Config) ->
timer:sleep(2000),
meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end),
%% Sending another packet should trigger a new call to hpr_protocol_router
?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})),
?assertEqual(ok, hpr_routing:handle_packet(PacketUp(5), #{gateway => Gateway1})),
?assertEqual(3, meck:num_calls(hpr_protocol_router, send, 2)),

%% The route backoff should be back to undefined
Expand Down Expand Up @@ -1551,3 +1590,15 @@ receive_gateway_limit_exceeded_test({OK, Error} = Acc) ->
after 100 ->
Acc
end.

-spec receive_packet_limit_exceeded_test(Acc :: {non_neg_integer(), non_neg_integer()}) ->
{non_neg_integer(), non_neg_integer()}.
receive_packet_limit_exceeded_test({OK, Error} = Acc) ->
receive
{packet_limit_exceeded_test, {error, packet_limit_exceeded}} ->
receive_packet_limit_exceeded_test({OK, Error + 1});
{packet_limit_exceeded_test, ok} ->
receive_packet_limit_exceeded_test({OK + 1, Error})
after 100 ->
Acc
end.
Loading