Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry code refactor. #3948

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 77 additions & 39 deletions src/oc_erchef/apps/chef_telemetry/src/chef_telemetry_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
]).

-record(current_scan, {
total_nodes = 0,
active_nodes = 0,
company_name = <<"">>,
fqdns = [],
license_id,
scan_start_time,
scan_end_time,
total_nodes,
active_nodes,
company_name
scan_end_time
}).

-record(state, {
Expand Down Expand Up @@ -91,6 +93,10 @@ handle_cast(send_data, State) ->
try send_data(State) of
State1 -> State1
catch
_:_ when State#state.ctl_command /= "Hab infra server" andalso State#state.ctl_command /= "chef-server-ctl" ->
timer:apply_after(60 * 1000, gen_server, cast, [self(), send_data]),
sqerl:execute(<<"delete from telemetry where property = 'last_send'">>),
State;
_:_ ->
State
end,
Expand Down Expand Up @@ -145,16 +151,17 @@ send_data(State) ->
case chef_telemetry:is_enabled() of
true ->
State1 = init_req(State),
Hostname = get_fqdn(),
case check_send(Hostname) of
NodeName = to_binary("NODE:" ++ binary:bin_to_list(envy:get(oc_chef_wm, actions_fqdn, <<"">>, binary))),
case check_send(NodeName) of
true ->
[{_Server, ServerVersion, _, _}] = release_handler:which_releases(permanent),
State2 = get_nodes(State1),
State3 = get_company_name(State2),
State4 = get_api_fqdn(State3),
Req = generate_request(ServerVersion, State4),
send_req(Req, State3),
State3;
Funs = [fun get_total_nodes/1, fun get_active_nodes/1, fun get_company_name/1, fun get_api_fqdn/1, fun determine_license_id/1],
Pid = self(),
Res = [ erlang:spawn_monitor(runner(Pid, State1, Fun)) || Fun <- Funs ],
Current_scan = gather_res(Res, State1#state.current_scan, length(Funs)),
Req = generate_request(ServerVersion, State1#state{current_scan = Current_scan}),
send_req(Req, State1),
State1;
_ ->
State1
end;
Expand All @@ -163,15 +170,15 @@ send_data(State) ->
end,
State6.

get_api_fqdn(State) ->
sqerl:execute(<<"delete from telemetry where property like 'FQDN:%' and event_timestamp < (current_timestamp - interval '86700')">>),
case sqerl:execute(<<"select trim(property) as property from telemetry where property like 'FQDN:%'">>) of
get_api_fqdn(_State) ->
sqerl:execute(<<"delete from telemetry where property like 'NODE:%' and event_timestamp < (current_timestamp - interval '86700')">>),
case sqerl:execute(<<"select trim(property) as property from telemetry where property like 'NODE:%'">>) of
{ok, Rows} when is_list(Rows) ->
FQDNs = [binary:part(FQDN, 5, size(FQDN) -5) || [{<<"property">>, FQDN}] <- Rows],
FQDNs1 = mask(FQDNs),
State#state{fqdns = FQDNs1};
FQDNs1;
_ ->
State
[]
end.

get_org_nodes(OrgName, Query1, ReqId, DbContext) ->
Expand Down Expand Up @@ -208,7 +215,7 @@ get_license_company_name()->
{_Lic, _Type, _GracePeriod, _ExpDate, _Msg, CN,_LID} = chef_license:get_license(),
CN.

determine_license_id()->
determine_license_id(_State)->
{_Lic, _Type, _GracePeriod, _ExpDate, _Msg, _CN, LicenseID} = chef_license:get_license(),
case LicenseID of
undefined ->
Expand All @@ -221,8 +228,7 @@ determine_license_id()->
LicenseID
end.


get_company_name(State) ->
get_company_name(_State) ->
CompanyName =
case get_license_company_name() of
CN when CN =:= undefined; CN=:= <<"">>; CN =:= "" ->
Expand Down Expand Up @@ -250,9 +256,7 @@ get_company_name(State) ->
end;
CN -> CN
end,
CurrentScan = State#state.current_scan,
State#state{
current_scan = CurrentScan#current_scan{company_name = CompanyName}}.
CompanyName.

get_most_occuring(List) ->
FirstElement = lists:nth(1, List),
Expand All @@ -274,17 +278,20 @@ get_most_occuring(List) ->
Res1 = maps:fold(Fun1, {FirstElement, 0}, Map1),
element(1, Res1).

get_nodes(#state{req_id = ReqId, db_context = DbContext} = State) ->
get_total_nodes(State) ->
Count =
case chef_db:count_nodes(State#state.db_context) of
Count1 when is_integer(Count1) -> Count1;
Error -> throw({db_error, Error})
end,
Count.

get_active_nodes(#state{req_id = ReqId, db_context = DbContext} = State) ->
CurrentScan = State#state.current_scan,
ScanStartTime = CurrentScan#current_scan.scan_start_time,
ScanEndTime = CurrentScan#current_scan.scan_end_time,
QueryString = lists:flatten(io_lib:format("ohai_time:{~p TO ~p}", [ScanStartTime, ScanEndTime])),
Query1 = chef_index:query_from_params("node", QueryString, "0", "10000"),
Count =
case chef_db:count_nodes(DbContext) of
Count1 when is_integer(Count1) -> Count1;
Error -> throw({db_error, Error})
end,
Orgs =
case chef_db:list(#oc_chef_organization{}, DbContext) of
Orgs1 when is_list(Orgs1) -> Orgs1;
Expand All @@ -295,15 +302,12 @@ get_nodes(#state{req_id = ReqId, db_context = DbContext} = State) ->
Sum + Nodes
end,
ActiveNodes = lists:foldl(Fun, 0, Stats),
State#state{
current_scan = CurrentScan#current_scan{
total_nodes = Count,
active_nodes = ActiveNodes}}.
ActiveNodes.

generate_request(ServerVersion, State) ->
CurrentScan = State#state.current_scan,
Res = jiffy:encode({[
{<<"licenseId">>, determine_license_id()},
{<<"licenseId">>, CurrentScan#current_scan.license_id},
{<<"customerName">>, State#state.current_scan#current_scan.company_name},
{<<"periods">>, [
{[
Expand Down Expand Up @@ -337,7 +341,7 @@ generate_request(ServerVersion, State) ->
{<<"Infra Server">>, {[
{<<"deploymentType">>, <<"">>},
{<<"instanceId">>, <<"">>},
{<<"fqdn">>, State#state.fqdns},
{<<"fqdn">>, CurrentScan#current_scan.fqdns},
{<<"config_location">>, to_binary(State#state.running_file)},
{<<"binary_location">>, to_binary(State#state.ctl_command)}
]}}
Expand Down Expand Up @@ -373,10 +377,6 @@ check_send(Hostname) ->
Error
end.

get_fqdn() ->
HostName = binary:bin_to_list(envy:get(oc_chef_wm, actions_fqdn, <<"">>, binary)),
to_binary("FQDN:" ++ HostName).

mask(FQDNs) ->
Join = fun(Elements, Separator) ->
[H | T] = Elements,
Expand Down Expand Up @@ -439,3 +439,41 @@ mask(FQDNs) ->
end
end,
lists:map(Fun, FQDNs).

runner(Parent, State, Fun) ->
fun() ->
Res = Fun(State),
Parent ! {result, self(), Res}
end.

gather_res(_Ids, Res, Count) when Count =< 0->
Res;

gather_res(Ids, Res, Count) ->
Fun = fun(Id) ->
fun({Id1,_}) ->
Id =/= Id1
end
end,
receive
{result, Id, Ret} ->
case lists:search(Fun(Id), Ids) of
{value, _} ->
Res1 = erlang:setelement(length(lists:takewhile(Fun(Id), Ids)) + 2, Res, Ret),
gather_res(Ids, Res1, Count - 1);
_ ->
gather_res(Ids, Res, Count)
end;
{'DOWN', _Ref, process, _Id, normal} ->
gather_res(Ids, Res, Count);
{'DOWN', _Ref, process, Id, _Reason} ->
case lists:search(Fun(Id), Ids) of
{value, _} ->
gather_res(Ids, Res, Count - 1);
_ ->
gather_res(Ids, Res, Count)
end
after
60000 ->
Res
end.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ enable_flag_test() ->
execute(State, Expected, Env) ->
set_env([{chef_telemetry, reporting_url, "http://127.0.0.1:9001/esi/payload:io"}] ++ Env),
application:start(ibrowse),
put(state, State),
ets:new(telemetry_worker_test, [set, public, named_table]),
ets:insert(telemetry_worker_test, {state, State}),
setup(),
chef_telemetry_test_utils:start_server([]),
register(telemetry_mock_consumer, self()),
Expand All @@ -91,28 +92,28 @@ setup() ->
meck:expect(release_handler, which_releases, fun(_) -> [{"chef_server", "15.9.38", [], []}] end),
meck:expect(stats_hero, ctime, fun(_, _, Fun) -> Fun() end).

get_execute(<<"select trim(property) as property from telemetry where property like 'FQDN:%'">>) ->
State = get(state),
get_execute(<<"select trim(property) as property from telemetry where property like 'NODE:%'">>) ->
[{state, State}] = ets:lookup(telemetry_worker_test, state),
State#state.fqdn_select;

get_execute(<<"select telemetry_check_send('", _/binary>>) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
State#state.should_send;

get_execute(_) ->
ok.

adhoc_select([<<"email">>], <<"users">>, all) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
{ok, State#state.user_emails}.

count_nodes(_Context) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
State#state.nodes_count.

chef_db_list(Record, _context) ->
RecordName = element(1, Record),
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
case RecordName of
oc_chef_organization -> State#state.organizations;
_ -> []
Expand All @@ -123,10 +124,10 @@ org_metadata(_context, OrgName) ->
{OrgName1, OrgName1}.

index_search(_) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
[Nodes | Rest] = State#state.index_search,
State1 = State#state{index_search = Rest},
put(state, State1),
ets:insert(telemetry_worker_test, {state, State1}),
{ok, 0, length(Nodes), Nodes}.

trigger_send_data() ->
Expand Down
Loading