diff --git a/compiler_base/parallel/src/executor/tests.rs b/compiler_base/parallel/src/executor/tests.rs index 2e4d21584..ae0fa9b6d 100644 --- a/compiler_base/parallel/src/executor/tests.rs +++ b/compiler_base/parallel/src/executor/tests.rs @@ -124,7 +124,11 @@ mod test_timeout_executor { tasks.push(MyTask { id: i }) } - let executor = TimeoutExecutor::new_with_thread_count(thread_count); + let executor = TimeoutExecutor::new_with_thread_count_and_timeout( + thread_count, + Instant::now() + Duration::from_secs(120), + ); + let mut events_collector = Arc::new(Mutex::new(EventsCollector::default())); let expected_events = diff --git a/compiler_base/parallel/src/executor/timeout.rs b/compiler_base/parallel/src/executor/timeout.rs index 8dce4d1f7..8d9bf0682 100644 --- a/compiler_base/parallel/src/executor/timeout.rs +++ b/compiler_base/parallel/src/executor/timeout.rs @@ -25,6 +25,7 @@ pub(crate) struct TimeoutSituation { pub struct TimeoutExecutor { timeout_queue: VecDeque, capacity: usize, + timeout: Option, } impl TimeoutExecutor { @@ -37,6 +38,20 @@ impl TimeoutExecutor { TimeoutExecutor { timeout_queue: VecDeque::default(), capacity: thread_count, + timeout: Some(default_deadline_60_seconds()), + } + } + + /// New a [`TimeoutExecutor`] with [`thread_count`] and [`timeout`]. + pub fn new_with_thread_count_and_timeout(thread_count: usize, timeout: Instant) -> Self { + debug_assert!( + thread_count > 0, + "At least one thread is required to execute the task." + ); + TimeoutExecutor { + timeout_queue: VecDeque::default(), + capacity: thread_count, + timeout: Some(timeout), } } @@ -98,7 +113,11 @@ impl Executor for TimeoutExecutor { let tinfo = task.info(); // Calculate the deadline. - let deadline = default_deadline_60_seconds(); + let deadline = if let Some(timeout) = self.timeout { + timeout + } else { + default_deadline_60_seconds() + }; // Notify the log that the [`Task`] is waiting to be executed. let event = TaskEvent::wait(task.info());