diff --git a/src/hpr_routing.erl b/src/hpr_routing.erl index e6c54591..5b9b7d4e 100644 --- a/src/hpr_routing.erl +++ b/src/hpr_routing.erl @@ -11,6 +11,9 @@ -define(GATEWAY_THROTTLE, hpr_gateway_rate_limit). -define(DEFAULT_GATEWAY_THROTTLE, 25). +-define(PACKET_THROTTLE, hpr_packet_rate_limit). +-define(DEFAULT_PACKET_THROTTLE, 1). + -ifdef(TEST). -define(SKF_UPDATE, timer:seconds(2)). -else. @@ -27,6 +30,8 @@ init() -> GatewayRateLimit = application:get_env(?APP, gateway_rate_limit, ?DEFAULT_GATEWAY_THROTTLE), ok = throttle:setup(?GATEWAY_THROTTLE, GatewayRateLimit, per_second), + PacketRateLimit = application:get_env(?APP, packet_rate_limit, ?DEFAULT_PACKET_THROTTLE), + ok = throttle:setup(?PACKET_THROTTLE, PacketRateLimit, per_second), ok. -spec handle_packet(PacketUp :: hpr_packet_up:packet(), Opts :: map()) -> @@ -41,7 +46,8 @@ handle_packet(PacketUp, Opts) -> {fun packet_type_check/1, [], invalid_packet_type}, {fun gateway_check/2, [Gateway], wrong_gateway}, {fun packet_session_check/2, [SessionKey], bad_signature}, - {fun throttle_check/1, [], gateway_limit_exceeded} + {fun gateway_throttle_check/1, [], gateway_limit_exceeded}, + {fun packet_throttle_check/1, [], packet_limit_exceeded} ], PacketUpType = hpr_packet_up:type(PacketUp), case execute_checks(PacketUp, Checks) of @@ -385,15 +391,25 @@ packet_session_check(PacketUp, undefined) -> packet_session_check(PacketUp, SessionKey) -> hpr_packet_up:verify(PacketUp, SessionKey). --spec throttle_check(PacketUp :: hpr_packet_up:packet()) -> +-spec gateway_throttle_check(PacketUp :: hpr_packet_up:packet()) -> boolean(). -throttle_check(PacketUp) -> +gateway_throttle_check(PacketUp) -> Gateway = hpr_packet_up:gateway(PacketUp), case throttle:check(?GATEWAY_THROTTLE, Gateway) of {limit_exceeded, _, _} -> false; _ -> true end. +-spec packet_throttle_check(PacketUp :: hpr_packet_up:packet()) -> + boolean(). +packet_throttle_check(PacketUp) -> + Gateway = hpr_packet_up:gateway(PacketUp), + PHash = hpr_packet_up:phash(PacketUp), + case throttle:check(?PACKET_THROTTLE, {Gateway, PHash}) of + {limit_exceeded, _, _} -> false; + _ -> true + end. + -spec execute_checks(PacketUp :: hpr_packet_up:packet(), [{fun(), list(any()), any()}]) -> ok | {error, any()}. execute_checks(_PacketUp, []) -> diff --git a/test/hpr_routing_SUITE.erl b/test/hpr_routing_SUITE.erl index c1e35435..fa1d3b8a 100644 --- a/test/hpr_routing_SUITE.erl +++ b/test/hpr_routing_SUITE.erl @@ -12,6 +12,7 @@ -export([ gateway_limit_exceeded_test/1, + packet_limit_exceeded_test/1, invalid_packet_type_test/1, wrong_gateway_test/1, bad_signature_test/1, @@ -42,6 +43,7 @@ all() -> [ gateway_limit_exceeded_test, + packet_limit_exceeded_test, invalid_packet_type_test, wrong_gateway_test, bad_signature_test, @@ -81,15 +83,15 @@ gateway_limit_exceeded_test(_Config) -> #{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519), SigFun = libp2p_crypto:mk_sig_fun(PrivKey), Gateway = libp2p_crypto:pubkey_to_bin(PubKey), - JoinPacketUpValid = test_utils:join_packet_up(#{ - gateway => Gateway, sig_fun => SigFun - }), Self = self(), lists:foreach( - fun(_) -> + fun(I) -> erlang:spawn( fun() -> - R = hpr_routing:handle_packet(JoinPacketUpValid, #{gateway => Gateway}), + PacketUpValid = test_utils:uplink_packet_up(#{ + gateway => Gateway, sig_fun => SigFun, fcnt => I + }), + R = hpr_routing:handle_packet(PacketUpValid, #{gateway => Gateway}), Self ! {gateway_limit_exceeded_test, R} end ) @@ -99,6 +101,31 @@ gateway_limit_exceeded_test(_Config) -> ?assertEqual({25, 1}, receive_gateway_limit_exceeded_test({0, 0})), ok. +packet_limit_exceeded_test(_Config) -> + %% Limit is DEFAULT_PACKET_THROTTLE = 1 per second + Limit = 1, + #{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519), + SigFun = libp2p_crypto:mk_sig_fun(PrivKey), + Gateway = libp2p_crypto:pubkey_to_bin(PubKey), + PacketUpValid = test_utils:uplink_packet_up(#{ + gateway => Gateway, sig_fun => SigFun, fcnt => 0 + }), + Self = self(), + lists:foreach( + fun(_) -> + erlang:spawn( + fun() -> + %% We send the same packet + R = hpr_routing:handle_packet(PacketUpValid, #{gateway => Gateway}), + Self ! {packet_limit_exceeded_test, R} + end + ) + end, + lists:seq(1, Limit + 1) + ), + ?assertEqual({1, 1}, receive_packet_limit_exceeded_test({0, 0})), + ok. + invalid_packet_type_test(_Config) -> #{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519), SigFun = libp2p_crypto:mk_sig_fun(PrivKey), @@ -147,13 +174,17 @@ mic_check_test(_Config) -> AppSessionKey = crypto:strong_rand_bytes(16), NwkSessionKey = crypto:strong_rand_bytes(16), DevAddr = 16#00000001, - PacketUp = test_utils:uplink_packet_up(#{ - app_session_key => AppSessionKey, - nwk_session_key => NwkSessionKey, - devaddr => DevAddr, - gateway => Gateway, - sig_fun => SigFun - }), + + PacketUp = fun(FCnt) -> + test_utils:uplink_packet_up(#{ + app_session_key => AppSessionKey, + nwk_session_key => NwkSessionKey, + devaddr => DevAddr, + gateway => Gateway, + sig_fun => SigFun, + fcnt => FCnt + }) + end, %% TEST 1: Join always works JoinPacketUpValid = test_utils:join_packet_up(#{ @@ -162,7 +193,7 @@ mic_check_test(_Config) -> ?assertEqual(ok, hpr_routing:handle_packet(JoinPacketUpValid, #{gateway => Gateway})), %% TEST 2: No SFK for devaddr - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(0), #{gateway => Gateway})), %% TEST 3: Bad key and route exist Route = hpr_route:test_new(#{ @@ -205,7 +236,9 @@ mic_check_test(_Config) -> end ), - ?assertEqual({error, invalid_mic}, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})), + ?assertEqual( + {error, invalid_mic}, hpr_routing:handle_packet(PacketUp(1), #{gateway => Gateway}) + ), ok = hpr_route_ets:delete_skf(SKFBadKeyAndRouteExitst), @@ -232,7 +265,7 @@ mic_check_test(_Config) -> end ), - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(2), #{gateway => Gateway})), %% TEST 5: Good key and route exist %% Adding a bad key to make sure it still works @@ -257,7 +290,7 @@ mic_check_test(_Config) -> end ), - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp, #{gateway => Gateway})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(3), #{gateway => Gateway})), ok. @@ -897,22 +930,26 @@ active_locked_route_test(_Config) -> SigFun1 = libp2p_crypto:mk_sig_fun(PrivKey1), Gateway1 = libp2p_crypto:pubkey_to_bin(PubKey1), - UplinkPacketUp1 = test_utils:uplink_packet_up(#{ - gateway => Gateway1, - sig_fun => SigFun1, - devaddr => DevAddr, - fcnt => 1, - app_session_key => AppSessionKey, - nwk_session_key => NwkSessionKey - }), + PacketUp = fun(FCnt) -> + test_utils:uplink_packet_up(#{ + gateway => Gateway1, + sig_fun => SigFun1, + devaddr => DevAddr, + fcnt => FCnt, + app_session_key => AppSessionKey, + nwk_session_key => NwkSessionKey + }) + end, - ?assertEqual(ok, hpr_routing:handle_packet(UplinkPacketUp1, #{gateway => Gateway1})), + PacketUp0 = PacketUp(0), + + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp0, #{gateway => Gateway1})), Self = self(), Received1 = {Self, {hpr_protocol_router, send, [ - UplinkPacketUp1, + PacketUp0, Route1 ]}, ok}, @@ -934,7 +971,7 @@ active_locked_route_test(_Config) -> locked => false }), ok = hpr_route_ets:insert_route(Route2), - ?assertEqual(ok, hpr_routing:handle_packet(UplinkPacketUp1, #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(1), #{gateway => Gateway1})), ?assertEqual([], meck:history(hpr_protocol_router)), @@ -952,7 +989,7 @@ active_locked_route_test(_Config) -> locked => true }), ok = hpr_route_ets:insert_route(Route3), - ?assertEqual(ok, hpr_routing:handle_packet(UplinkPacketUp1, #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(2), #{gateway => Gateway1})), ?assertEqual([], meck:history(hpr_protocol_router)), @@ -998,35 +1035,37 @@ in_cooldown_route_test(_Config) -> SigFun1 = libp2p_crypto:mk_sig_fun(PrivKey1), Gateway1 = libp2p_crypto:pubkey_to_bin(PubKey1), - PacketUp1 = test_utils:uplink_packet_up(#{ - gateway => Gateway1, - sig_fun => SigFun1, - devaddr => DevAddr, - fcnt => 1, - app_session_key => AppSessionKey, - nwk_session_key => NwkSessionKey - }), + PacketUp = fun(FCnt) -> + test_utils:uplink_packet_up(#{ + gateway => Gateway1, + sig_fun => SigFun1, + devaddr => DevAddr, + fcnt => FCnt, + app_session_key => AppSessionKey, + nwk_session_key => NwkSessionKey + }) + end, %% We setup protocol router to fail every call meck:new(hpr_protocol_router, [passthrough]), meck:expect(hpr_protocol_router, send, fun(_, _) -> {error, not_implemented} end), %% We send first packet a call should be made to hpr_protocol_router - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(0), #{gateway => Gateway1})), ?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 2)), %% We send second packet NO call should be made to hpr_protocol_router as the route would be in cooldown - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(1), #{gateway => Gateway1})), ?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 2)), %% We wait the initial first timeout 1s %% Send another packet and watch another call made to hpr_protocol_router timer:sleep(1000), - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(2), #{gateway => Gateway1})), ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 2)), %% We send couple more packets and check that we still only 2 calls to hpr_protocol_router - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})), - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(3), #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(4), #{gateway => Gateway1})), ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 2)), %% We check the route and make sure that the backoff is setup properly @@ -1039,7 +1078,7 @@ in_cooldown_route_test(_Config) -> timer:sleep(2000), meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), %% Sending another packet should trigger a new call to hpr_protocol_router - ?assertEqual(ok, hpr_routing:handle_packet(PacketUp1, #{gateway => Gateway1})), + ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(5), #{gateway => Gateway1})), ?assertEqual(3, meck:num_calls(hpr_protocol_router, send, 2)), %% The route backoff should be back to undefined @@ -1551,3 +1590,15 @@ receive_gateway_limit_exceeded_test({OK, Error} = Acc) -> after 100 -> Acc end. + +-spec receive_packet_limit_exceeded_test(Acc :: {non_neg_integer(), non_neg_integer()}) -> + {non_neg_integer(), non_neg_integer()}. +receive_packet_limit_exceeded_test({OK, Error} = Acc) -> + receive + {packet_limit_exceeded_test, {error, packet_limit_exceeded}} -> + receive_packet_limit_exceeded_test({OK, Error + 1}); + {packet_limit_exceeded_test, ok} -> + receive_packet_limit_exceeded_test({OK + 1, Error}) + after 100 -> + Acc + end.