From 14b1829a65617186e0c8398c1e987e0a5d7180b9 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Mon, 9 Dec 2024 15:43:01 +0300 Subject: [PATCH] use non-interrupting scheduler --- .../units/ConsensusClientDependencies.scala | 2 +- .../NonInterruptingScheduledExecutor.scala | 80 +++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/units/NonInterruptingScheduledExecutor.scala diff --git a/src/main/scala/units/ConsensusClientDependencies.scala b/src/main/scala/units/ConsensusClientDependencies.scala index 292c2057..c8588375 100644 --- a/src/main/scala/units/ConsensusClientDependencies.scala +++ b/src/main/scala/units/ConsensusClientDependencies.scala @@ -24,7 +24,7 @@ class ConsensusClientDependencies(val config: ClientConfig) extends AutoCloseabl Schedulers.singleThread(s"block-observer-${config.chainContract}", reporter = { e => log.warn("Error in BlockObserver", e) }) val globalScheduler: Scheduler = monix.execution.Scheduler.global val eluScheduler: SchedulerService = - Scheduler.singleThread(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) }) + NonInterruptingScheduledExecutor(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) }) private val httpClientBackend = HttpClientSyncBackend() private val maybeAuthenticatedBackend = config.jwtSecretFile match { diff --git a/src/main/scala/units/NonInterruptingScheduledExecutor.scala b/src/main/scala/units/NonInterruptingScheduledExecutor.scala new file mode 100644 index 00000000..c81c248d --- /dev/null +++ b/src/main/scala/units/NonInterruptingScheduledExecutor.scala @@ -0,0 +1,80 @@ +package units + +import monix.execution.schedulers.ExecutorScheduler +import monix.execution.{Cancelable, Features, UncaughtExceptionReporter, ExecutionModel as ExecModel} + +import java.util.concurrent.* +import scala.concurrent.duration.TimeUnit + +abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, factory: ThreadFactory) extends ScheduledThreadPoolExecutor(corePoolSize, factory) { + def reportFailure(t: Throwable): Unit + + override def afterExecute(r: Runnable, t: Throwable): Unit = { + super.afterExecute(r, t) + var exception: Throwable = t + + if ((exception eq null) && r.isInstanceOf[Future[?]]) { + try { + val future = r.asInstanceOf[Future[?]] + if (future.isDone) future.get() + } catch { + case ex: ExecutionException => + exception = ex.getCause + case _: InterruptedException => + // ignore/reset + Thread.currentThread().interrupt() + case _: CancellationException => + () // ignore + } + } + + if (exception ne null) reportFailure(exception) + } +} + +class NonInterruptingScheduledExecutor( + s: ScheduledExecutorService, + r: UncaughtExceptionReporter, + override val executionModel: ExecModel, + override val features: Features +) extends ExecutorScheduler(s, r) { + override def executor: ScheduledExecutorService = s + + def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = { + if (initialDelay <= 0) { + execute(r) + Cancelable.empty + } else { + val task = s.schedule(r, initialDelay, unit) + Cancelable(() => { task.cancel(false); () }) + } + } + + override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = { + val task = s.scheduleWithFixedDelay(r, initialDelay, delay, unit) + Cancelable(() => { task.cancel(false); () }) + } + + override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = { + val task = s.scheduleAtFixedRate(r, initialDelay, period, unit) + Cancelable(() => { task.cancel(false); () }) + } +} + +object NonInterruptingScheduledExecutor { + def apply(name: String, reporter: UncaughtExceptionReporter): NonInterruptingScheduledExecutor = { + val atpe = new AdaptedThreadPoolExecutor( + 1, + (r: Runnable) => { + val t = new Thread(r) + t.setDaemon(true) + t.setName(name) + t.setUncaughtExceptionHandler(reporter.asJava) + t + } + ) { + override def reportFailure(t: Throwable): Unit = reporter.reportFailure(t) + } + new NonInterruptingScheduledExecutor(atpe, reporter, ExecModel.AlwaysAsyncExecution, Features.empty) + } +}