Skip to content

Commit

Permalink
location improvement (#278)
Browse files Browse the repository at this point in the history
* * Location now has its own channel
* Requests are done in order

* Add log

* fixup gwmp location test, wait for location request

* Fix test

* Improve test

---------

Co-authored-by: Michael Jeffrey <[email protected]>
  • Loading branch information
macpie and michaeldjeffrey authored Dec 5, 2023
1 parent f6cda77 commit af9e3af
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 35 deletions.
1 change: 1 addition & 0 deletions include/hpr.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
-define(DATA_DIR, "/var/data").

-define(IOT_CONFIG_CHANNEL, iot_config_channel).
-define(LOCATION_CHANNEL, location_channel).
-define(DOWNLINK_CHANNEL, downlink_channel).
-define(MULTI_BUY_CHANNEL, multi_buy_channel).

Expand Down
145 changes: 116 additions & 29 deletions src/grpc/iot_config/hpr_gateway_location.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,67 @@
%%%-------------------------------------------------------------------
-module(hpr_gateway_location).

-behaviour(gen_server).

-include("hpr.hrl").
-include("../autogen/iot_config_pb.hrl").

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([
start_link/1,
init/0,
get/1,
update_location/1,
expire_locations/0
]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).

-define(SERVER, ?MODULE).
-define(ETS, hpr_gateway_location_ets).
-define(DETS, hpr_gateway_location_dets).
-define(DEFAULT_DETS_FILE, "hpr_gateway_location_dets").
-define(CLEANUP_INTERVAL, timer:hours(1)).
-define(CACHE_TIME, timer:hours(24)).
-define(ERROR_CACHE_TIME, timer:hours(1)).
-define(NOT_FOUND, not_found).
-define(REQUESTED, requested).

-record(state, {}).

-record(location, {
status :: ok | ?NOT_FOUND | error | ?REQUESTED,
gateway :: libp2p_crypto:pubkey_bin(),
timestamp :: non_neg_integer(),
h3_index :: h3:index() | undefined,
lat :: float() | undefined,
long :: float() | undefined
h3_index = undefined :: h3:index() | undefined,
lat = undefined :: float() | undefined,
long = undefined :: float() | undefined
}).

-type state() :: #state{}.
-type loc() :: {h3:index(), float(), float()} | undefined.

-export_type([loc/0]).

%% ------------------------------------------------------------------
%%% API Function Definitions
%% ------------------------------------------------------------------

-spec start_link(map()) -> any().
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?SERVER, Args, []).

-spec init() -> ok.
init() ->
?ETS = ets:new(?ETS, [
Expand All @@ -54,62 +87,110 @@ init() ->

-spec get(libp2p_crypto:pubkey_bin()) -> {ok, h3:index(), float(), float()} | {error, any()}.
get(PubKeyBin) ->
Yesterday = erlang:system_time(millisecond) - ?CACHE_TIME,
Now = erlang:system_time(millisecond),
Yesterday = Now - ?CACHE_TIME,
LastHour = Now - ?ERROR_CACHE_TIME,
case ets:lookup(?ETS, PubKeyBin) of
[] ->
update_location(PubKeyBin);
[#location{timestamp = T}] when T < Yesterday ->
update_location(PubKeyBin);
[#location{h3_index = undefined}] ->
ok = ?MODULE:update_location(PubKeyBin),
{error, ?NOT_FOUND};
[#location{status = ok, timestamp = T, h3_index = H3Index, lat = Lat, long = Long}] when
T < Yesterday
->
ok = ?MODULE:update_location(PubKeyBin),
{ok, H3Index, Lat, Long};
[#location{status = _, timestamp = T}] when T < Yesterday ->
ok = ?MODULE:update_location(PubKeyBin),
{error, ?NOT_FOUND};
[#location{h3_index = H3Index, lat = Lat, long = Long}] ->
[#location{status = error, timestamp = T}] when T < LastHour ->
ok = ?MODULE:update_location(PubKeyBin),
{error, undefined};
[#location{status = error}] ->
{error, undefined};
[#location{status = requested, timestamp = T, gateway = PubKeyBin}] when T < LastHour ->
GatewayName = hpr_utils:gateway_name(PubKeyBin),
lager:warning("got an old request for ~p ~s", [PubKeyBin, GatewayName]),
ok = ?MODULE:update_location(PubKeyBin),
{error, ?REQUESTED};
[#location{status = requested}] ->
{error, ?REQUESTED};
[#location{status = ?NOT_FOUND}] ->
{error, ?NOT_FOUND};
[#location{status = ok, h3_index = H3Index, lat = Lat, long = Long}] ->
{ok, H3Index, Lat, Long}
end.

-spec update_location(libp2p_crypto:pubkey_bin()) -> ok.
update_location(PubKeyBin) ->
gen_server:cast(?SERVER, {update_location, PubKeyBin}).

-spec expire_locations() -> ok.
expire_locations() ->
Time = erlang:system_time(millisecond) - ?CACHE_TIME,
DETSDeleted = dets:select_delete(?DETS, [
{{'_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]}
{{'_', '_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]}
]),
lager:info("expiring ~w dets keys", [DETSDeleted]).

% ------------------------------------------------------------------
%%% gen_server Function Definitions
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
-spec update_location(libp2p_crypto:pubkey_bin()) ->
{ok, h3:index(), float(), float()} | {error, any()}.
update_location(PubKeyBin) ->
Start = erlang:system_time(millisecond),
-spec init(map()) -> {ok, state()}.
init(_Args) ->
{ok, #state{}}.

handle_call(_Msg, _From, State) ->
{reply, ok, State}.

handle_cast({update_location, PubKeyBin}, State) ->
NewLoc = #location{
status = ?REQUESTED,
gateway = PubKeyBin,
timestamp = erlang:system_time(millisecond),
h3_index = undefined,
lat = undefined,
long = undefined
timestamp = erlang:system_time(millisecond)
},
true = ets:insert(?ETS, NewLoc),
Start = erlang:system_time(millisecond),
case get_location_from_ics(PubKeyBin) of
{error, ?NOT_FOUND} ->
hpr_metrics:observe_gateway_location(Start, ?NOT_FOUND),
GatewayName = hpr_utils:gateway_name(PubKeyBin),
lager:info(
"fail to get_location_from_ics ~p for ~s",
[?NOT_FOUND, GatewayName]
),
ok = insert(NewLoc#location{status = ?NOT_FOUND});
{error, Reason} ->
hpr_metrics:observe_gateway_location(Start, error),
GatewauName = hpr_utils:gateway_name(PubKeyBin),
GatewayName = hpr_utils:gateway_name(PubKeyBin),
lager:warning(
"fail to get_location_from_ics ~p for ~s",
[Reason, GatewauName]
[Reason, GatewayName]
),
ok = insert(NewLoc),
{error, not_found};
ok = insert(NewLoc#location{status = error});
{ok, H3IndexString} ->
hpr_metrics:observe_gateway_location(Start, ok),
H3Index = h3:from_string(H3IndexString),
{Lat, Long} = h3:to_geo(H3Index),
ok = insert(NewLoc#location{
status = ok,
h3_index = H3Index,
lat = Lat,
long = Long
}),
{ok, H3Index, Lat, Long}
end.
})
end,
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, _state) ->
ok.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
-spec insert(Loc :: #location{}) -> ok.
insert(Loc) ->
true = ets:insert(?ETS, Loc),
Expand All @@ -131,11 +212,17 @@ get_location_from_ics(PubKeyBin) ->
SignedReq = Req#iot_config_gateway_location_req_v1_pb{signature = SigFun(EncodedReq)},
case
helium_iot_config_gateway_client:location(SignedReq, #{
channel => ?IOT_CONFIG_CHANNEL
channel => ?LOCATION_CHANNEL
})
of
{error, {Status, Reason}, _} when is_binary(Status) ->
{error, {grpcbox_utils:status_to_string(Status), Reason}};
StringStatus = grpcbox_utils:status_to_string(Status),
case StringStatus of
<<"NOT_FOUND">> ->
{error, ?NOT_FOUND};
_ ->
{error, {StringStatus, Reason}}
end;
{grpc_error, Reason} ->
{error, Reason};
{error, Reason} ->
Expand Down
3 changes: 3 additions & 0 deletions src/hpr_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ init([]) ->
%% Starting config service client channel here because of the way we get
%% .env vars into the app.
_ = maybe_start_channel(ConfigServiceConfig, ?IOT_CONFIG_CHANNEL),
_ = maybe_start_channel(ConfigServiceConfig, ?LOCATION_CHANNEL),
_ = maybe_start_channel(DownlinkServiceConfig, ?DOWNLINK_CHANNEL),
_ = maybe_start_channel(MultiBuyServiceConfig, ?MULTI_BUY_CHANNEL),

Expand All @@ -84,6 +85,8 @@ init([]) ->

?WORKER(hpr_packet_reporter, [PacketReporterConfig]),

?WORKER(hpr_gateway_location, [#{}]),

?WORKER(hpr_route_stream_worker, [#{}]),

?WORKER(hpr_protocol_router, [#{}]),
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/hpr_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ ics_update(Type, Action) ->

-spec observe_gateway_location(
Start :: non_neg_integer(),
Status :: ok | error
Status :: ok | error | not_found
) -> ok.
observe_gateway_location(Start, Status) ->
prometheus_histogram:observe(
Expand Down
26 changes: 22 additions & 4 deletions test/hpr_gateway_location_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
main_test/1
]).

-define(NOT_FOUND, not_found).
-define(REQUESTED, requested).

-record(location, {
status :: ok | ?NOT_FOUND | error | ?REQUESTED,
gateway :: libp2p_crypto:pubkey_bin(),
timestamp :: non_neg_integer(),
h3_index :: h3:index() | undefined,
lat :: float() | undefined,
long :: float() | undefined
h3_index = undefined :: h3:index() | undefined,
lat = undefined :: float() | undefined,
long = undefined :: float() | undefined
}).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -65,8 +69,20 @@ main_test(_Config) ->
),
{ExpectedLat, ExpectedLong} = h3:to_geo(ExpectedIndex),

%% Make request to get gateway location
%% The location update is now async
Before = erlang:system_time(millisecond) - 1,

%% Wait until the location has been fetched
ok = test_utils:wait_until(fun() ->
case hpr_gateway_location:get(PubKeyBin1) of
{ok, _, _, _} ->
true;
Other ->
{false, Other}
end
end),

%% Make request to get gateway location
?assertEqual(
{ok, ExpectedIndex, ExpectedLat, ExpectedLong}, hpr_gateway_location:get(PubKeyBin1)
),
Expand All @@ -77,6 +93,7 @@ main_test(_Config) ->

%% Verify ETS data
[ETSLocationRec] = ets:lookup(hpr_gateway_location_ets, PubKeyBin1),
?assertEqual(ok, ETSLocationRec#location.status),
?assertEqual(PubKeyBin1, ETSLocationRec#location.gateway),
?assertEqual(ExpectedIndex, ETSLocationRec#location.h3_index),
?assertEqual(ExpectedLat, ETSLocationRec#location.lat),
Expand All @@ -86,6 +103,7 @@ main_test(_Config) ->

%% Verify DETS data
[DETSLocationRec] = dets:lookup(hpr_gateway_location_dets, PubKeyBin1),
?assertEqual(ok, DETSLocationRec#location.status),
?assertEqual(PubKeyBin1, DETSLocationRec#location.gateway),
?assertEqual(ExpectedIndex, DETSLocationRec#location.h3_index),
?assertEqual(ExpectedLat, DETSLocationRec#location.lat),
Expand Down
2 changes: 1 addition & 1 deletion test/hpr_metrics_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ main_test(_Config) ->
fun() ->
undefined =/=
prometheus_histogram:value(?METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, [
error
not_found
])
end
),
Expand Down
11 changes: 11 additions & 0 deletions test/hpr_protocol_gwmp_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ with_location_test(_Config) ->
h3_index_str => IndexString
}),

%% Wait until the location has been fetched
PubKeyBin = hpr_test_gateway:pubkey_bin(GatewayPid),
ok = test_utils:wait_until(fun() ->
case hpr_gateway_location:get(PubKeyBin) of
{ok, _, _, _} ->
true;
Other ->
{false, Other}
end
end),

%% Send packet and route directly through interface
ok = hpr_test_gateway:send_packet(GatewayPid, #{}),

Expand Down
10 changes: 10 additions & 0 deletions test/hpr_protocol_http_roaming_packet_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,16 @@ uplink_with_gateway_location_test(_Config) ->
IndexString
),

%% Trigger an update location and wait until the location has been fetched
ok = test_utils:wait_until(fun() ->
case hpr_gateway_location:get(PubKeyBin) of
{ok, _, _, _} ->
true;
Other ->
{false, Other}
end
end),

ok = start_uplink_listener(),

SendPacketFun = fun(DevAddr) ->
Expand Down

0 comments on commit af9e3af

Please sign in to comment.