diff --git a/lib/kubetruth/etl.rb b/lib/kubetruth/etl.rb index c240835..d9c096f 100644 --- a/lib/kubetruth/etl.rb +++ b/lib/kubetruth/etl.rb @@ -26,6 +26,31 @@ def interruptible_sleep(interval) Kernel.sleep interval end + def watch_crds_to_interrupt(&block) + begin + begin + watcher = kubeapi.watch_project_mappings + + thr = Thread.new do + logger.debug "Created watcher for CRD" + watcher.each do |notice| + logger.debug {"Interrupting polling sleep, CRD watcher woke up for: #{notice}"} + interrupt_sleep + break + end + logger.debug "CRD watcher exiting" + end + + block.call + ensure + watcher.finish + thr.join(5) + end + rescue => e + logger.log_exception(e, "Failure in watch/polling logic") + end + end + def interrupt_sleep Thread.new { @sleeper&.run } end @@ -33,40 +58,20 @@ def interrupt_sleep def with_polling(interval, &block) while true - begin - watcher = kubeapi.watch_project_mappings - + run_time = Benchmark.measure do begin - thr = Thread.new do - logger.debug "Created watcher for CRD" - watcher.each do |notice| - logger.debug {"Interrupting polling sleep, CRD watcher woke up for: #{notice}"} - interrupt_sleep - break - end - logger.debug "CRD watcher exiting" - end - - run_time = Benchmark.measure do - begin - block.call - rescue Kubetruth::Template::Error => e - logger.error e.message - rescue => e - logger.log_exception(e, "Failure while applying config transforms") - end - end - logger.info "Benchmark: #{run_time}" - - logger.info("Poller sleeping for #{interval}") - interruptible_sleep(interval) - ensure - watcher.finish - thr.join(5) + block.call + rescue Kubetruth::Template::Error => e + logger.error e.message + rescue => e + logger.log_exception(e, "Failure while applying config transforms") end + end + logger.info "Benchmark: #{run_time}" - rescue => e - logger.log_exception(e, "Failure in watch/polling logic") + watch_crds_to_interrupt do + logger.info("Poller sleeping for #{interval}") + interruptible_sleep(interval) end end