
input workers exception handling #11603

Open
colinsurprenant opened this issue Feb 13, 2020 · 2 comments
@colinsurprenant
Contributor

There are cases of Logstash crashing completely due to an exception raised from inside an input sub-thread (a thread created by the input plugin to add concurrency to its input duties).

This is troubling because the pipeline inputworker wraps plugin.run in a begin ... rescue => e block, so any unrescued exception in an input plugin should always be caught at the pipeline inputworker.

I was able to reproduce this behaviour when an input exception is triggered from within one of its sub-threads. The interesting part is that it depends on the sub-threads' join order.

Here's the reproduction code, which mimics the Logstash thread initialization sequence and exception handling:

Thread.abort_on_exception = true
Thread.report_on_exception = false

def plugin_worker2
  begin
    puts("plugin_worker2 start sleep")
    sleep(60)
    puts("plugin_worker2 end sleep")
  ensure
    puts("plugin_worker2 ensure")
  end
end

def plugin_worker1
  begin
    sleep(1)
    puts("plugin_worker1 raising exception")
    raise("foo")
  ensure
    puts("plugin_worker1 ensure")
  end
end

def plugin_run
  worker_thread1 = Thread.new { plugin_worker1 }
  worker_thread2 = Thread.new { plugin_worker2 }

  worker_thread2.join
  worker_thread1.join
end

def input_worker
  begin
    plugin_thread = Thread.new { plugin_run }
    plugin_thread.join
  rescue => e
    puts("input_worker rescue #{e.inspect}")
  ensure
    puts("input_worker ensure")
  end
end

def pipeline
  input_thread = Thread.new { input_worker }
  input_thread.join
end


begin
  pipeline_thread = Thread.new { pipeline }
  pipeline_thread.join
rescue => e
  puts("main rescue #{e.inspect}")
ensure
  puts("main ensure")
end

When running this it produces:

plugin_worker2 start sleep
plugin_worker1 raising exception
plugin_worker1 ensure
main rescue #<RuntimeError: foo>
main ensure

We see that the input worker's exception handling was skipped and Logstash crashed (main rescue #<RuntimeError: foo>). Because plugin_run was blocked joining worker_thread2, nothing was joining worker_thread1 at the moment it raised, so with Thread.abort_on_exception = true the exception was re-raised directly in the main thread, bypassing the rescue in input_worker.
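This delivery path can be demonstrated in isolation. With Thread.abort_on_exception = true, a thread that dies while no other thread is joining it has its exception re-raised in the main thread, skipping the rescue blocks of every intermediate thread. A minimal sketch (not Logstash code):

```ruby
Thread.abort_on_exception = true   # re-raise any thread's exception in the main thread
Thread.report_on_exception = false # silence the default stderr report

begin
  Thread.new { raise "boom" }  # no other thread ever joins this thread
  sleep(5)                     # the exception is injected here, interrupting the sleep
rescue => e
  puts("main rescue #{e.inspect}")  # => main rescue #<RuntimeError: boom>
end
```

Any rescue sitting in an intermediate thread between the failing thread and main never runs, because the exception is not propagated through joins at all.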

But simply swapping the join order of the workers to

  worker_thread1.join
  worker_thread2.join

makes it behave correctly:

plugin_worker2 start sleep
plugin_worker1 raising exception
plugin_worker1 ensure
input_worker rescue #<RuntimeError: foo>
input_worker ensure
main rescue #<RuntimeError: foo>
main ensure

We see that the input worker was able to rescue the exception (input_worker rescue #<RuntimeError: foo>), and in Logstash this would have actually restarted the input plugin.

So this logic works only if the first sub-thread joined is the one raising the exception, which is not very practical. I will investigate how to improve this; I will look into the ThreadsWait class.
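ThreadsWait would wait on whichever thread terminates first. The same effect can be sketched with plain Thread#join and a timeout, which re-raises the first failure in the joining thread no matter which sub-thread dies first. This is an illustrative sketch, not the eventual fix; join_in_any_order is a hypothetical helper:

```ruby
# Hypothetical helper (illustrative, not Logstash code): join sub-threads in
# whatever order they terminate by polling with a join timeout.
# Thread#join(limit) returns the thread once it has terminated, re-raising
# any exception it died with, or nil if the limit expires first.
def join_in_any_order(threads)
  pending = threads.dup
  pending.reject! { |t| t.join(0.1) } until pending.empty?
end

slow    = Thread.new { sleep(60) }                 # stand-in for plugin_worker2
failing = Thread.new { sleep(1); raise("foo") }    # stand-in for plugin_worker1
failing.report_on_exception = false

begin
  join_in_any_order([slow, failing])  # the "wrong" fixed order is now harmless
rescue => e
  puts("rescued #{e.inspect}")        # => rescued #<RuntimeError: foo>
ensure
  slow.kill                           # clean up the still-sleeping thread
end
```

With this shape, plugin_run in the reproduction above could join both workers through the helper and the input worker's rescue would see the exception promptly instead of waiting behind the long-sleeping thread.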

@colinsurprenant
Contributor Author

The kafka input is especially prone to this problem because it basically spawns a number of consumer threads that do the work in the run method. I created a specific kafka issue for that: logstash-plugins/logstash-integration-kafka#15

@kaisecheng
Contributor

Relates to #10612
