Skip to content

Commit

Permalink
Merge pull request #635 from SergeTupchiy/fix-batch-processor-max-que…
Browse files Browse the repository at this point in the history
…ue-size

fix(otel_batch_processor): don't divide `max_queue_size` by word-size
  • Loading branch information
bryannaegele authored Oct 29, 2023
2 parents b51f206 + 22f3417 commit 0fb4e6c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 14 deletions.
27 changes: 15 additions & 12 deletions apps/opentelemetry/src/otel_batch_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,7 @@ init([Args=#{reg_name := RegName}]) ->
exporter_config=ExporterConfig,
resource = Resource,
handed_off_table=undefined,
max_queue_size=case SizeLimit of
infinity -> infinity;
_ -> SizeLimit div erlang:system_info(wordsize)
end,
max_queue_size=SizeLimit,
exporting_timeout_ms=ExportingTimeout,
check_table_size_ms=CheckTableSize,
scheduled_delay_ms=ScheduledDelay,
Expand All @@ -193,11 +190,17 @@ callback_mode() ->
idle(enter, _OldState, Data=#data{exporter=undefined,
exporter_config=ExporterConfig,
scheduled_delay_ms=SendInterval,
check_table_size_ms=CheckInterval,
reg_name=RegName}) ->
Exporter = init_exporter(RegName, ExporterConfig),
{keep_state, Data#data{exporter=Exporter}, [{{timeout, export_spans}, SendInterval, export_spans}]};
idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) ->
{keep_state_and_data, [{{timeout, export_spans}, SendInterval, export_spans}]};
{keep_state, Data#data{exporter=Exporter},
[{{timeout, export_spans}, SendInterval, export_spans},
{{timeout, check_table_size}, CheckInterval, check_table_size}]};
idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval,
check_table_size_ms=CheckInterval}) ->
{keep_state_and_data,
[{{timeout, export_spans}, SendInterval, export_spans},
{{timeout, check_table_size}, CheckInterval, check_table_size}]};
idle(_, export_spans, Data=#data{exporter=undefined,
exporter_config=ExporterConfig,
reg_name=RegName}) ->
Expand Down Expand Up @@ -274,15 +277,15 @@ handle_event_(_State, _, force_flush, Data) ->
handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) ->
keep_state_and_data;
handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=MaxQueueSize,
check_table_size_ms=CheckInterval,
reg_name=RegName}) ->
case ets:info(?CURRENT_TABLE(RegName), size) of
M when M >= MaxQueueSize ->
disable(RegName),
keep_state_and_data;
disable(RegName);
_ ->
enable(RegName),
keep_state_and_data
end;
enable(RegName)
end,
{keep_state_and_data, [{{timeout, check_table_size}, CheckInterval, check_table_size}]};
handle_event_(_, {call, From}, {set_exporter, ExporterConfig}, Data=#data{exporter=OldExporter,
reg_name=RegName}) ->
otel_exporter:shutdown(OldExporter),
Expand Down
44 changes: 42 additions & 2 deletions apps/opentelemetry/test/otel_batch_processor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").

-include("otel_span.hrl").
-include_lib("opentelemetry_api/include/opentelemetry.hrl").

all() ->
[exporting_timeout_test].
[exporting_timeout_test,
check_table_size_test].

%% verifies that, after the runner has been killed for taking too long,
%% everything is still functional and the exporter does not crash
exporting_timeout_test(_Config) ->
process_flag(trap_exit, true),

{ok, Pid, _} = otel_batch_processor:start_link(#{reg_name => test_processor,
{ok, Pid, _} = otel_batch_processor:start_link(#{name => test_processor,
resource => otel_resource:create([]),
exporter => ?MODULE,
exporting_timeout_ms => 1,
Expand All @@ -30,6 +32,34 @@ exporting_timeout_test(_Config) ->
ok
end.

%% Checks the queue-size guard of the batch processor: spans are accepted
%% while the table is below `max_queue_size', `on_end/2' answers `dropped'
%% once the limit has been reached and a size check has run, and spans are
%% accepted again after the table has been flushed.
check_table_size_test(_Config) ->
    QueueLimit = 10,
    CheckIntervalMs = 1,
    ProcessorOpts = #{name => test_processor_check_size_test,
                      resource => otel_resource:create([]),
                      exporter => ?MODULE,
                      exporting_timeout_ms => timer:minutes(10),
                      %% far longer than the test runs, so the scheduled
                      %% export never fires on its own
                      scheduled_delay_ms => timer:minutes(10),
                      check_table_size_ms => CheckIntervalMs,
                      max_queue_size => QueueLimit},
    {ok, _Pid, #{reg_name := RegName}} = otel_batch_processor:start_link(ProcessorOpts),
    OnEndConfig = #{reg_name => RegName},

    %% the table is still below the limit, so the span is accepted
    true = otel_batch_processor:on_end(generate_span(), OnEndConfig),

    %% push the table up to (and past) the configured limit
    _ = [otel_batch_processor:on_end(generate_span(), OnEndConfig)
         || _ <- lists:seq(1, QueueLimit)],

    %% sleep for several check intervals so that at least one
    %% check_table_size timeout has fired
    timer:sleep(CheckIntervalMs * 5),
    dropped = otel_batch_processor:on_end(generate_span(), OnEndConfig),

    otel_batch_processor:force_flush(OnEndConfig),
    %% force_flush is asynchronous, so wait long enough for the flush and a
    %% subsequent size check to happen before inserting again
    timer:sleep(CheckIntervalMs * 10),
    true = otel_batch_processor:on_end(generate_span(), OnEndConfig).

%% exporter behaviour

init(_) ->
Expand All @@ -40,3 +70,13 @@ export(_, _) ->

%% Exporter callback used by this suite: ignores its argument and returns `ok'.
shutdown(_ExporterState) ->
    ok.

%% helpers

%% Builds a `#span{}' record for the tests, with freshly generated trace and
%% span ids, a fixed name, and a "test" instrumentation scope.
generate_span() ->
    TraceId = otel_id_generator:generate_trace_id(),
    SpanId = otel_id_generator:generate_span_id(),
    Scope = #instrumentation_scope{name = "test"},
    #span{trace_id = TraceId,
          span_id = SpanId,
          name = "test_span",
          trace_flags = 1,
          is_recording = true,
          instrumentation_scope = Scope}.

0 comments on commit 0fb4e6c

Please sign in to comment.