Skip to content

Commit

Permalink
Extract more telemetry helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 15, 2023
1 parent 33f8bb0 commit 49ac503
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 48 deletions.
14 changes: 9 additions & 5 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ Events related to internals of the throttle processes, these might expose unstab
might want to log or reconfigure:
```erlang
event_name: [amoc, throttle, process]
measurements: #{msg := binary(), process := pid()}
metadata: #{monotonic_time := integer(), name := atom(), printable_state => map()}
measurements: #{logger:level() => 1}
metadata: #{log_level := logger:level(),
msg := binary(),
rate => non_neg_integer(),
state => map(),
_ => _}
```

## Coordinator
Expand All @@ -105,8 +109,8 @@ metadata: #{monotonic_time := integer(), name := atom()}
These are events related to bad configuration; they might deserve logging:
```erlang
event_name: [amoc, config, get | verify | env]
measurements: #{}
metadata: #{log_class => syslog_level(), _ => _}
measurements: #{logger:level() => 1}
metadata: #{log_level => logger:level(), setting => atom(), msg => binary(), _ => _}
```

## Cluster
Expand All @@ -116,5 +120,5 @@ There are related to clustering events
```erlang
event_name: [amoc, cluster, connect_nodes | nodedown | master_node_down]
measurements: #{count => non_neg_integer()}
metadata: #{node => node(), nodes => nodes(), state => map()}
metadata: #{nodes => nodes(), state => map()}
```
11 changes: 5 additions & 6 deletions src/amoc_config/amoc_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ get(Name) ->
get(Name, Default) when is_atom(Name) ->
case ets:lookup(amoc_config, Name) of
[] ->
telemetry:execute([amoc, config, get], #{},
#{log_class => error, msg => <<"no scenario setting">>,
scenario => Name}),
amoc_telemetry:execute_log(
error, [config, get], #{setting => Name}, <<"no scenario setting">>),
throw({invalid_setting, Name});
[#module_parameter{name = Name, value = undefined}] ->
Default;
[#module_parameter{name = Name, value = Value}] ->
Value;
InvalidLookupRet ->
telemetry:execute([amoc, config, get], #{},
#{log_class => error, msg => <<"invalid lookup return value">>,
scenario => Name, return => InvalidLookupRet}),
amoc_telemetry:execute_log(
error, [config, get], #{setting => Name, return => InvalidLookupRet},
<<"invalid lookup return value">>),
throw({invalid_lookup_ret_value, InvalidLookupRet})
end.
10 changes: 5 additions & 5 deletions src/amoc_config/amoc_config_env.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ get_os_env(Name, Default) ->
case parse_value(Value, Default) of
{ok, Term} -> Term;
{error, Error} ->
telemetry:execute(
[amoc, config, env], #{error => 1},
#{log_class => error, error => Error, variable_name => EnvName,
variable_value => Value, default_value => Default,
msg => <<"cannot parse environment variable, using default value">>}),
amoc_telemetry:execute_log(
error, [config, env],
#{error => Error, variable_name => EnvName,
variable_value => Value, default_value => Default},
<<"cannot parse environment variable, using default value">>),
Default
end.

Expand Down
18 changes: 9 additions & 9 deletions src/amoc_config/amoc_config_verification.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ verify(Fun, Value) ->
{true, NewValue} -> {true, NewValue};
{false, Reason} -> {false, {verification_failed, Reason}};
Ret ->
telemetry:execute([amoc, config, verify], #{error => 1},
#{log_class => error, verification_method => Fun,
verification_arg => Value, verification_return => Ret,
msg => <<"invalid verification method">>}),
amoc_telemetry:execute_log(
error, [config, verify],
#{verification_method => Fun, verification_arg => Value, verification_return => Ret},
<<"invalid verification method">>),
{false, {invalid_verification_return_value, Ret}}
catch
C:E:S ->
telemetry:execute([amoc, config, verify], #{error => 1},
#{log_class => error, verification_method => Fun,
verification_arg => Value,
kind => C, reason => E, stacktrace => S,
msg => <<"invalid verification method">>}),
amoc_telemetry:execute_log(
error, [config, verify],
#{verification_method => Fun, verification_arg => Value,
kind => C, reason => E, stacktrace => S},
<<"invalid verification method">>),
{false, {exception_during_verification, {C, E, S}}}
end.
15 changes: 8 additions & 7 deletions src/amoc_distribution/amoc_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ handle_call(_Request, _From, State) ->

-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast({connect_nodes, Nodes}, State) ->
telemetry:execute([amoc, cluster, connect_nodes], #{count => length(Nodes)},
#{nodes => Nodes, state => state_to_map(State)}),
execute_nodes(connect_nodes, Nodes, state_to_map(State)),
NewState = handle_connect_nodes(Nodes, State),
schedule_timer(NewState),
{noreply, NewState};
Expand All @@ -148,14 +147,11 @@ handle_info(timeout, State) ->
schedule_timer(NewState),
{noreply, NewState};
handle_info({nodedown, Node}, #state{master = Node} = State) ->
telemetry:execute([amoc, cluster, master_node_down],
#{count => 1},
#{node => Node, state => state_to_map(State)}),
execute_nodes(master_node_down, [Node], state_to_map(State)),
erlang:halt(),
{noreply, State};
handle_info({nodedown, Node}, State) ->
telemetry:execute([amoc, cluster, nodedown], #{count => 1},
#{node => Node, state => state_to_map(State)}),
execute_nodes(nodedown, [Node], state_to_map(State)),
{noreply, merge(connection_lost, [Node], State)};
handle_info(_Info, State) ->
{noreply, State}.
Expand Down Expand Up @@ -284,3 +280,8 @@ maybe_set_master(Node, #state{new_connection_action = Action}) ->
%% to avoid a possibility of the amoc_cluster deadlock while
%% running the Action call set_master_node/2 asynchronously
spawn(fun() -> set_master_node(Node, Action) end).

%% @doc Emits the `[amoc, cluster, Name]' telemetry event for a list of nodes.
%% The `count' measurement is the length of `Nodes'; the metadata carries the
%% node list itself and the state map supplied by the caller.
-spec execute_nodes(atom(), [node()], #{any() => any()}) -> ok.
execute_nodes(Name, Nodes, State) ->
    Measurements = #{count => length(Nodes)},
    Metadata = #{nodes => Nodes, state => State},
    telemetry:execute([amoc, cluster, Name], Measurements, Metadata).
16 changes: 13 additions & 3 deletions src/amoc_telemetry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@
%% @copyright 2023 Erlang Solutions Ltd.
-module(amoc_telemetry).

-export([execute/3]).
-export([execute/3, execute_log/4]).

-spec execute(EventName, Measurements, Metadata) -> ok when
EventName :: telemetry:event_name(),
Measurements :: telemetry:event_measurements(),
Metadata :: telemetry:event_metadata().
execute(Name, Measurements, Metadata) ->
TimeStamp = erlang:monotonic_time(),
NameWithAmocPrefix = [amoc | Name],
PrefixedName = [amoc | Name],
MetadataWithTS = Metadata#{monotonic_time => TimeStamp},
telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS).
telemetry:execute(PrefixedName, Measurements, MetadataWithTS).

%% @doc Emits a log-like telemetry event under the `amoc' prefix.
%%
%% The only measurement is `#{Level => 1}'. The keys `log_level' and `msg' are
%% put into the metadata, overriding any values already stored under them.
%% The call is delegated to execute/3, so the event name is prefixed with
%% `amoc' and the metadata receives the `monotonic_time' timestamp in exactly
%% the same way as for events emitted through execute/3 directly.
-spec execute_log(Level, EventName, Metadata, Msg) -> ok when
      Level :: logger:level(),
      EventName :: telemetry:event_name(),
      Metadata :: telemetry:event_metadata(),
      Msg :: binary().
execute_log(Level, Name, Metadata, Message) ->
    MetadataWithLog = Metadata#{log_level => Level, msg => Message},
    execute(Name, #{Level => 1}, MetadataWithLog).
29 changes: 16 additions & 13 deletions src/amoc_throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,18 @@ format_status(_Opt, [_PDict, State]) ->
initial_state(Name, Interval, Rate) when Rate >= 0 ->
NewRate = case {Rate =:= 0, Rate < 5} of
{true, _} ->
internal_event(<<"invalid rate, must be higher than zero">>, Name),
internal_error(<<"invalid rate, must be higher than zero">>, Name, Rate),
1;
{_, true} ->
internal_event(<<"too low rate, please reduce NoOfProcesses">>, Name),
internal_error(<<"too low rate, please reduce NoOfProcesses">>, Name, Rate),
Rate;
{_, false} ->
Rate
end,
Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of
{0, _, _} -> 0; %% limit only No of simultaneous executions
{_, I, _} when I < 10 ->
internal_event(<<"too high rate, please increase NoOfProcesses">>, Name),
internal_error(<<"too high rate, please increase NoOfProcesses">>, Name, Rate),
10;
{_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions;
{_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1
Expand Down Expand Up @@ -202,26 +202,29 @@ async_runner(Fun) ->
timeout(State) ->
State#state.interval + ?DEFAULT_MSG_TIMEOUT.

inc_n(#state{n = N, max_n = MaxN} = State) ->
inc_n(#state{name = Name, n = N, max_n = MaxN} = State) ->
NewN = N + 1,
case MaxN < NewN of
true ->
internal_event(<<"throttle proccess has invalid N">>, State),
PrintableState = printable_state(State),
Msg = <<"throttle proccess has invalid N">>,
amoc_telemetry:execute_log(
error, [throttle, process], #{name => Name, n => NewN, state => PrintableState}, Msg),
State#state{n = MaxN};
false ->
State#state{n = NewN}
end.

-spec internal_event(binary(), state()) -> any().
internal_event(Msg, #state{name = Name} = State) ->
PrintableState = printable_state(State),
telemetry:execute([amoc, throttle, process],
#{msg => Msg, process => self()},
#{printable_state => PrintableState,
monotonic_time => erlang:monotonic_time(), name => Name});
internal_event(Msg, Name) when is_atom(Name) ->
telemetry:execute([amoc, throttle, process],
#{msg => Msg, process => self()},
#{monotonic_time => erlang:monotonic_time(), name => Name}).
amoc_telemetry:execute_log(
debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg).

%% @doc Reports a throttle rate problem as an `error'-level
%% `[throttle, process]' log event; the throttle name and the rate are
%% passed along in the event metadata.
-spec internal_error(binary(), atom(), non_neg_integer()) -> any().
internal_error(Msg, Name, Rate) ->
    Metadata = #{name => Name, rate => Rate},
    amoc_telemetry:execute_log(error, [throttle, process], Metadata, Msg).

printable_state(#state{} = State) ->
Fields = record_info(fields, state),
Expand Down

0 comments on commit 49ac503

Please sign in to comment.