Skip to content

Commit

Permalink
[4.3] Listener federators and memory improvements (#6297)
Browse files Browse the repository at this point in the history
* [4.3] Listener federators and memory improvements

Prior, when a gen_listener needed to federate its queue and bindings
to other zones, it would start_link the listener_federator (and trap
exits). However, should a listener_federator process actually exit,
the parent gen_listener would receive the EXIT message but failed to
have a handle_info clause to restart the process.

Introduce a sofo supervisor for starting listener_federator processes
and restart them if they die unexpectedly (non-normal exits).

Secondly, introduce memory consumption checks and GC gen_listeners
when they exceed 100K in total_heap_size. In addition, copy the binary
payloads as they arrive off the AMQP channel has shown to decelerate
the memory growth as the original binary can be garbage collected
sooner and not count against the gen_listener's "old" heap.

All told, these manual GC runs appear to be minimal and only affect
long-running and busy gen_listeners; even fewer runs needed to release
memory after the binary:copy/1 introduction.

* edoc

* just ok

* fun police

* monitor parent and terminate if parent dies
  • Loading branch information
jamesaimonetti authored Feb 10, 2020
1 parent ff5c058 commit b918e2b
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 83 deletions.
191 changes: 140 additions & 51 deletions core/kazoo_amqp/src/gen_listener.erl

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions core/kazoo_amqp/src/kz_amqp_federated_listeners_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
%%%-----------------------------------------------------------------------------
%%% @copyright (C) 2012-2020, 2600Hz
%%% @doc
%%% @end
%%%-----------------------------------------------------------------------------
-module(kz_amqp_federated_listeners_sup).

-behaviour(supervisor).

-export([start_link/0
,start_child/3
,init/1
]).

-include("kz_amqp_util.hrl").

-define(CHILDREN, [?WORKER_TYPE('listener_federator', 'transient')]).

-spec start_link() -> kz_types:startlink_ret().
start_link() ->
supervisor:start_link({'local', ?MODULE}, ?MODULE, []).

-spec start_child(pid(), kz_term:ne_binary(), kz_term:proplist()) -> kz_types:sup_startchild_ret().
start_child(Parent, Broker, FederateParams) ->
ParentCallId = kz_util:get_callid(),
supervisor:start_child(?MODULE, [Parent, ParentCallId, Broker, FederateParams]).

-spec init(any()) -> kz_types:sup_init_ret().
init([]) ->
RestartStrategy = 'simple_one_for_one',
MaxRestarts = 5,
MaxSecondsBetweenRestarts = 10,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},

{'ok', {SupFlags, ?CHILDREN}}.
2 changes: 1 addition & 1 deletion core/kazoo_amqp/src/kz_amqp_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

-define(CHILDREN, [?WORKER('kz_amqp_connections')
,?SUPER('kz_amqp_connection_sup')
,?SUPER('kz_amqp_federated_listeners_sup')
,?WORKER('kz_amqp_assignments')
,?WORKER('kz_amqp_bootstrap')
]).
Expand Down Expand Up @@ -134,7 +135,6 @@ pool_pid(Pool) ->
[P | _] -> P
end.


%%==============================================================================
%% Supervisor callbacks
%%==============================================================================
Expand Down
28 changes: 19 additions & 9 deletions core/kazoo_amqp/src/listener_federator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

-behaviour(gen_listener).

-export([start_link/3
-export([start_link/4
,stop/1
,broker/1
]).
Expand All @@ -26,7 +26,7 @@

-define(SERVER, ?MODULE).

-record(state, {parent :: pid()
-record(state, {parent :: {pid(), reference()}
,broker :: kz_term:ne_binary()
,self_binary = kz_term:to_binary(pid_to_list(self())) :: kz_term:ne_binary()
,zone :: kz_term:ne_binary()
Expand All @@ -41,9 +41,8 @@
%% @doc Starts the server.
%% @end
%%------------------------------------------------------------------------------
-spec start_link(pid(), kz_term:ne_binary(), kz_term:proplist()) -> kz_types:startlink_ret().
start_link(Parent, Broker, Params) ->
ParentCallId = kz_util:get_callid(),
-spec start_link(pid(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:proplist()) -> kz_types:startlink_ret().
start_link(Parent, ParentCallId, Broker, Params) ->
gen_listener:start_link(?SERVER, Params, [Parent, ParentCallId, Broker]).

-spec broker(kz_types:server_ref()) -> kz_term:ne_binary().
Expand Down Expand Up @@ -71,7 +70,9 @@ init([Parent, ParentCallId, Broker]=L) ->
CallId = kz_binary:join([ParentCallId, Zone], <<"-">>),
kz_util:put_callid(CallId),

{'ok', #state{parent=Parent
gen_listener:notify_of_federator_listener(Parent, {Broker, self()}),

{'ok', #state{parent={Parent, monitor('process', Parent)}
,broker=Broker
,zone=Zone
}}.
Expand All @@ -81,7 +82,7 @@ init([Parent, ParentCallId, Broker]=L) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_call(any(), any(), state()) -> kz_types:handle_call_ret_state(state()).
handle_call({'stop', Parent}, _From, #state{parent=Parent}=State) ->
handle_call({'stop', Parent}, _From, #state{parent={Parent, _Ref}}=State) ->
{'stop', 'normal', 'ok', State};
handle_call('get_broker', _From, #state{broker=Broker}=State) ->
{'reply', Broker, State};
Expand All @@ -95,7 +96,11 @@ handle_call(_Request, _From, State) ->
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast({'gen_listener', {'created_queue', _}}, State) ->
{'noreply', State};
handle_cast({'gen_listener', {'is_consuming', 'true'}}, #state{parent=Parent, broker=Broker}=State) ->
handle_cast({'gen_listener', {'is_consuming', 'true'}}
,#state{parent={Parent, _Ref}
,broker=Broker
}=State
) ->
gen_server:cast(Parent, {'federator_is_consuming', Broker, 'true'}),
{'noreply', State};
handle_cast(_Msg, State) ->
Expand All @@ -107,12 +112,17 @@ handle_cast(_Msg, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()).
handle_info({'DOWN', Ref, 'process', Parent, _Reason}
,#state{parent={Parent, Ref}}=State
) ->
lager:info("parent gen_listener ~p down: ~p", [Parent, _Reason]),
{'stop', 'normal', State};
handle_info(_Info, State) ->
lager:info("unhandled message: ~p", [_Info]),
{'noreply', State}.

-spec handle_event(kz_json:object(), gen_listener:basic_deliver(), amqp_basic(), state()) -> gen_listener:handle_event_return().
handle_event(JObj, BasicDeliver, BasicData, #state{parent=Parent
handle_event(JObj, BasicDeliver, BasicData, #state{parent={Parent, _Ref}
,broker=Broker
,self_binary=Self
,zone=Zone
Expand Down
16 changes: 9 additions & 7 deletions core/kazoo_apps/src/kapps_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ fix_services_tree(AccountId, Tree) ->
Services = kz_services:fetch(AccountId),
fix_services_tree(Services, Tree, kz_services:services_jobj(Services)).

-spec fix_services_tree(kz_term:ne_binary(), kz_term:ne_binaries(), kzd_services:doc()) -> 'ok'.
-spec fix_services_tree(kz_services:services(), kz_term:ne_binaries(), kzd_services:doc()) -> 'ok'.
fix_services_tree(Services, Tree, ServicesJObj) ->
case kzd_services:tree(ServicesJObj) =:= Tree of
'true' -> 'ok';
Expand Down Expand Up @@ -2188,19 +2188,21 @@ check_release() ->

-spec run_check(fun()) -> 'ok'.
run_check(CheckFun) ->
StartTimeMs = kz_time:now_ms(),
{Pid, Ref} = kz_util:spawn_monitor(CheckFun, []),
wait_for_check(Pid, Ref).
wait_for_check(Pid, Ref, StartTimeMs).

-spec wait_for_check(pid(), reference()) -> 'ok'.
wait_for_check(Pid, Ref) ->
-spec wait_for_check(pid(), reference(), pos_integer()) -> 'ok'.
wait_for_check(Pid, Ref, StartTimeMs) ->
receive
{'DOWN', Ref, 'process', Pid, 'normal'} -> 'ok';
{'DOWN', Ref, 'process', Pid, 'normal'} ->
lager:info("check finished in ~pms", [kz_time:elapsed_ms(StartTimeMs)]);
{'DOWN', Ref, 'process', Pid, Reason} ->
lager:error("check in ~p failed to run: ~p", [Pid, Reason]),
lager:error("check in ~p failed to run after ~pms: ~p", [Pid, kz_time:elapsed_ms(StartTimeMs), Reason]),
throw(Reason)
after 5 * ?MILLISECONDS_IN_MINUTE ->
lager:error("check in ~p timed out", [Pid]),
exit(Pid, 'timeout'),
exit(Pid, 'kill'),
throw('timeout')
end.

Expand Down
13 changes: 8 additions & 5 deletions core/kazoo_proper/src/kazoo_proper_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,17 @@ run_seq_modules() ->

-spec run_seq_module(atom() | kz_term:ne_binary()) -> 'no_return'.
run_seq_module(Module) when is_atom(Module) ->
_ = [Module:Function()
|| Function <- ['seq'],
kz_module:is_exported(Module, Function, 0)
],
'no_return';
run_seq_module(Module, kz_module:is_exported(Module, 'seq', 0));
run_seq_module(ModuleBin) ->
run_seq_module(kz_term:to_atom(ModuleBin)).

run_seq_module(_Module, 'false') -> 'no_return';
run_seq_module(Module, 'true') ->
StartTimeMs = kz_time:now_ms(),
Module:seq(),
?SUP_LOG_DEBUG("~s:seq() ran in ~pms", [Module, kz_time:elapsed_ms(StartTimeMs)]),
'no_return'.

-spec modules() -> [module()].
modules() ->
case application:get_key('kazoo_proper', 'modules') of
Expand Down
17 changes: 7 additions & 10 deletions core/kazoo_services/src/kz_services.erl
Original file line number Diff line number Diff line change
Expand Up @@ -947,18 +947,15 @@ handle_fetch_options(Services, ['skip_cache'|Options]) ->
%% @doc
%% @end
%%------------------------------------------------------------------------------
-spec commit_updates(kz_term:ne_binary()
,kz_services_quantities:billables() | kz_services_quantities:billable()
,kz_services_quantities:billables() | kz_services_quantities:billable()
) -> services().
-type billable_updates() :: 'undefined' | kz_services_quantities:billables() | kz_services_quantities:billable().

-spec commit_updates(kz_term:ne_binary(), billable_updates(), billable_updates()) ->
services().
commit_updates(Account, Current, Proposed) ->
commit_updates(Account, Current, Proposed, kz_json:new()).

-spec commit_updates(kz_term:ne_binary()
,kz_services_quantities:billables() | kz_services_quantities:billable()
,kz_services_quantities:billables() | kz_services_quantities:billable()
,kz_json:object()
) -> services().
-spec commit_updates(kz_term:ne_binary(), billable_updates(), billable_updates(), kz_json:object()) ->
services().
commit_updates(Account, Current, Proposed, AuditLog) ->
AccountId = kz_util:format_account_id(Account),
FetchOptions = [{'updates', AccountId, to_billables(Current), to_billables(Proposed)}
Expand All @@ -974,7 +971,7 @@ commit_updates(Account, Current, Proposed, AuditLog) ->
commit_updates(Services, FetchOptions)
end.

-spec to_billables(kz_services_quantities:billables() | kz_services_quantities:billable()) -> kz_services_quantities:billables().
-spec to_billables(billable_updates()) -> kz_services_quantities:billables().
to_billables('undefined') -> [];
to_billables(Bs) when is_list(Bs) -> Bs;
to_billables(B) -> [B].
Expand Down

0 comments on commit b918e2b

Please sign in to comment.