Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use non-interrupting scheduler in ELUpdater #31

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/scala/units/ConsensusClientDependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ConsensusClientDependencies(val config: ClientConfig) extends AutoCloseabl
Schedulers.singleThread(s"block-observer-${config.chainContract}", reporter = { e => log.warn("Error in BlockObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService =
Scheduler.singleThread(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) })
NonInterruptingScheduledExecutor(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) })

private val httpClientBackend = HttpClientSyncBackend()
private val maybeAuthenticatedBackend = config.jwtSecretFile match {
Expand Down
80 changes: 80 additions & 0 deletions src/main/scala/units/NonInterruptingScheduledExecutor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package units

import monix.execution.schedulers.ExecutorScheduler
import monix.execution.{Cancelable, Features, UncaughtExceptionReporter, ExecutionModel as ExecModel}

import java.util.concurrent.*
import scala.concurrent.duration.TimeUnit

abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, factory: ThreadFactory) extends ScheduledThreadPoolExecutor(corePoolSize, factory) {
def reportFailure(t: Throwable): Unit

override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
var exception: Throwable = t

if ((exception eq null) && r.isInstanceOf[Future[?]]) {
try {
val future = r.asInstanceOf[Future[?]]
if (future.isDone) future.get()
} catch {
case ex: ExecutionException =>
exception = ex.getCause
case _: InterruptedException =>
// ignore/reset
Thread.currentThread().interrupt()
case _: CancellationException =>
() // ignore
}
}

if (exception ne null) reportFailure(exception)
}
}

class NonInterruptingScheduledExecutor(
s: ScheduledExecutorService,
r: UncaughtExceptionReporter,
override val executionModel: ExecModel,
override val features: Features
) extends ExecutorScheduler(s, r) {
override def executor: ScheduledExecutorService = s

def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
if (initialDelay <= 0) {
execute(r)
Cancelable.empty
} else {
val task = s.schedule(r, initialDelay, unit)
Cancelable(() => { task.cancel(false); () })
}
}

override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
val task = s.scheduleWithFixedDelay(r, initialDelay, delay, unit)
Cancelable(() => { task.cancel(false); () })
}

override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = {
val task = s.scheduleAtFixedRate(r, initialDelay, period, unit)
Cancelable(() => { task.cancel(false); () })
}
}

object NonInterruptingScheduledExecutor {
def apply(name: String, reporter: UncaughtExceptionReporter): NonInterruptingScheduledExecutor = {
val atpe = new AdaptedThreadPoolExecutor(
1,
(r: Runnable) => {
val t = new Thread(r)
t.setDaemon(true)
t.setName(name)
t.setUncaughtExceptionHandler(reporter.asJava)
t
}
) {
override def reportFailure(t: Throwable): Unit = reporter.reportFailure(t)
}
new NonInterruptingScheduledExecutor(atpe, reporter, ExecModel.AlwaysAsyncExecution, Features.empty)
}
}
Loading