Skip to content

Commit

Permalink
Cancel runs when the manifest generation times out, and show a messag…
Browse files Browse the repository at this point in the history
…e when a run is cancelled
  • Loading branch information
TAGraves committed Oct 3, 2024
1 parent df991d0 commit dbfa8a6
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 71 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ rand_chacha = "0.3.1"

blake3 = "=1.4.0"

tracing = { version = "0.1.37", features = ["release_max_level_info"] }
tracing = { version = "0.1.37", features = ["release_max_level_debug"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] }
tracing-appender = "0.2.2"

Expand Down
2 changes: 2 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[default.extend-words]
acception = "acception"
38 changes: 20 additions & 18 deletions crates/abq_cli/src/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,28 @@ async fn do_shutdown(

let (suite_result, errors) = finalized_reporters.finish(&completed_summary);

for error in errors {
eprintln!("{error}");
}
if let WorkersExitStatus::Completed { .. } = status {
for error in errors {
eprintln!("{error}");
}

print!("\n\n");
suite_result
.write_short_summary_lines(&mut stdout, ShortSummaryGrouping::Runner)
.unwrap();
println!("\n");
if execution_mode == ExecutionMode::WriteNormal {
println!("Run the following command to replay these tests locally:");
println!("\n");
println!(
"\tabq test --run-id {} --worker {} --num {} -- <your-test-command>",
run_id,
worker_tag.index(),
num_runners,
);
print!("\n\n");
suite_result
.write_short_summary_lines(&mut stdout, ShortSummaryGrouping::Runner)
.unwrap();
println!("\n");
println!("Specify your Access Token with the RWX_ACCESS_TOKEN env variable, passing --access-token, or running `abq login`.");
if execution_mode == ExecutionMode::WriteNormal {
println!("Run the following command to replay these tests locally:");
println!("\n");
println!(
"\tabq test --run-id {} --worker {} --num {} -- <your-test-command>",
run_id,
worker_tag.index(),
num_runners,
);
println!("\n");
println!("Specify your Access Token with the RWX_ACCESS_TOKEN env variable, passing --access-token, or running `abq login`.");
}
}

// If the workers didn't fault, exit with whatever status the test suite run is at; otherwise,
Expand Down
59 changes: 9 additions & 50 deletions crates/abq_queue/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,12 @@ enum RunState {
#[derive(Debug)]
enum ManifestPersistence {
Persisted(ManifestPersistedCell),
ManifestNeverReceived,
EmptyManifest,
}

#[derive(Debug)]
enum ResultsPersistence {
Persisted(ResultsPersistedCell),
ManifestNeverReceived,
}

const MAX_BATCH_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(100) };
Expand Down Expand Up @@ -283,8 +281,6 @@ enum WriteResultsError {
RunNotFound,
#[error("attempting to write results before manifest received")]
WaitingForManifest,
#[error("attempting to write results when manifest failed to be generated")]
ManifestNeverReceived,
#[error("attempting to write results for cancelled run")]
RunCancelled,
}
Expand All @@ -295,8 +291,6 @@ enum ReadResultsError {
RunNotFound,
#[error("results cannot be read before manifest is received")]
WaitingForManifest,
#[error("a manifest failed to be generated")]
ManifestNeverReceived,
#[error("the run was cancelled before all test results were received")]
RunCancelled,
}
Expand Down Expand Up @@ -504,9 +498,7 @@ impl AllRuns {
AssignedRunStatus::AlreadyDone { exit_code }
}
}
RunState::Cancelled { .. } => AssignedRunStatus::AlreadyDone {
exit_code: ExitCode::CANCELLED,
},
RunState::Cancelled { reason } => AssignedRunStatus::Cancelled { reason: *reason },
}
}

Expand Down Expand Up @@ -938,7 +930,6 @@ impl AllRuns {
ResultsPersistence::Persisted(cell) => {
Ok((cell.clone(), EligibleForRemoteDump::Yes))
}
ResultsPersistence::ManifestNeverReceived => Err(ManifestNeverReceived),
},
RunState::Cancelled { .. } => Err(RunCancelled),
}
Expand Down Expand Up @@ -990,7 +981,6 @@ impl AllRuns {
Ok(ReadResultsState::RunInProgress { active_runners })
}
}
ResultsPersistence::ManifestNeverReceived => Err(ManifestNeverReceived),
}
}
RunState::Cancelled { .. } => Err(RunCancelled),
Expand Down Expand Up @@ -1050,9 +1040,6 @@ impl AllRuns {
RetryManifestState::NotYetPersisted
}
}
ManifestPersistence::ManifestNeverReceived => {
RetryManifestState::Error(RetryManifestError::ManifestNeverReceived)
}
ManifestPersistence::EmptyManifest => {
// Ship the empty manifest over.
RetryManifestState::Manifest(NextWorkBundle {
Expand Down Expand Up @@ -1214,37 +1201,24 @@ impl AllRuns {

let mut run = runs.get(&run_id).expect("no run recorded").write();

let test_command_hash = match run.state {
RunState::WaitingForManifest {
test_command_hash, ..
} => {
// okay
test_command_hash
match run.state {
RunState::WaitingForManifest { .. } => {
run.state = RunState::Cancelled {
reason: CancelReason::ManifestNeverReceived,
};
// NB: Always sub last for conversative estimation.
self.num_active.fetch_sub(1, atomic::ORDERING);
}
RunState::Cancelled { .. } => {
// No-op, since the run was already cancelled.
return;
}
RunState::HasWork { .. } | RunState::InitialManifestDone { .. } => {
illegal_state!(
"attempting to mark failed to receive manifest after manifest was received",
?run_id
);
return;
}
};

run.state = RunState::InitialManifestDone {
new_worker_exit_code: ExitCode::FAILURE,
init_metadata: Default::default(),
seen_workers: Default::default(),
manifest_persistence: ManifestPersistence::ManifestNeverReceived,
results_persistence: ResultsPersistence::ManifestNeverReceived,
test_command_hash: Some(test_command_hash),
};

// NB: Always sub last for conversative estimation.
self.num_active.fetch_sub(1, atomic::ORDERING);
}

/// Marks a run as complete because it had the trivial manifest.
Expand Down Expand Up @@ -2221,7 +2195,7 @@ impl QueueServer {
mut stream: Box<dyn net_async::ServerStream>,
) -> OpaqueResult<()> {
// If a worker failed to generate a manifest, or the manifest is empty,
// we're going to immediately end the test run.
// we're going to immediately cancel the test run.
//
// In the former case this indicates a failure in the underlying test runners,
// and in the latter case we have nothing to do.
Expand Down Expand Up @@ -4978,21 +4952,6 @@ mod persist_results {
Ok(ReadResultsState::RunInProgress { active_runners }) if active_runners == &[Tag::runner(2, 1)]
}

get_read_results_cell! {
get_read_results_cell_when_done_with_manifest_never_received,
{
RunState::InitialManifestDone {
new_worker_exit_code: ExitCode::SUCCESS,
init_metadata: Default::default(),
seen_workers: Default::default(),
results_persistence: ResultsPersistence::ManifestNeverReceived,
manifest_persistence: ManifestPersistence::EmptyManifest,
test_command_hash: Some(TestCommandHash::random()),
}
},
Err(ReadResultsError::ManifestNeverReceived)
}

get_read_results_cell! {
get_read_results_cell_when_cancelled,
{
Expand Down
2 changes: 2 additions & 0 deletions crates/abq_utils/src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,8 @@ pub mod queue {
User,
/// Timed out because no progress was made popping items off the manifest.
ManifestHadNoProgress,
/// Timed out because the manifest was never received
ManifestNeverReceived,
}

/// A request sent to the queue.
Expand Down
8 changes: 7 additions & 1 deletion crates/abq_workers/src/assigned_run.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use abq_utils::net_protocol::{entity::Entity, queue::InvokeWork};
use abq_utils::net_protocol::{
entity::Entity,
queue::{CancelReason, InvokeWork},
};
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};

Expand Down Expand Up @@ -27,6 +30,9 @@ pub enum AssignedRunStatus {
AlreadyDone {
exit_code: abq_utils::exit::ExitCode,
},
Cancelled {
reason: CancelReason,
},
FatalError(String),
}

Expand Down
37 changes: 36 additions & 1 deletion crates/abq_workers/src/negotiate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use abq_utils::{
entity::{Entity, WorkerTag},
meta::DeprecationRecord,
publicize_addr,
queue::{InvokeWork, NegotiatorInfo},
queue::{CancelReason, InvokeWork, NegotiatorInfo},
workers::{RunId, RunnerKind},
},
results_handler::SharedResultsHandler,
Expand Down Expand Up @@ -81,6 +81,9 @@ enum MessageFromQueueNegotiator {
RunAlreadyCompleted {
exit_code: ExitCode,
},
RunCancelled {
reason: CancelReason,
},
/// The context a worker set should execute a run with.
ExecutionContext(ExecutionContext),
RunUnknown,
Expand Down Expand Up @@ -141,6 +144,8 @@ pub struct WorkersNegotiator(Box<dyn net::ClientStream>, WorkerContext);
pub enum NegotiatedWorkers {
/// No more workers were created, because there is no more work to be done.
Redundant { exit_code: ExitCode },
/// No more workers were created because the run is cancelled
Cancelled { error: String },
/// A pool of workers were created.
Pool(WorkerPool),
}
Expand All @@ -155,13 +160,23 @@ impl NegotiatedWorkers {
process_outputs: Default::default(),
native_runner_info: None,
},
NegotiatedWorkers::Cancelled { error } => WorkersExit {
status: WorkersExitStatus::Error {
errors: vec![error.to_string()],
},
manifest_generation_output: None,
final_stdio_outputs: Default::default(),
process_outputs: Default::default(),
native_runner_info: None,
},
NegotiatedWorkers::Pool(pool) => pool.shutdown().await,
}
}

pub async fn cancel(&mut self) {
match self {
NegotiatedWorkers::Redundant { .. } => {}
NegotiatedWorkers::Cancelled { .. } => {}
NegotiatedWorkers::Pool(pool) => pool.cancel().await,
}
}
Expand All @@ -170,13 +185,15 @@ impl NegotiatedWorkers {
pub async fn wait(&mut self) {
match self {
NegotiatedWorkers::Redundant { .. } => {}
NegotiatedWorkers::Cancelled { .. } => {}
NegotiatedWorkers::Pool(pool) => pool.wait().await,
}
}

pub fn workers_alive(&self) -> bool {
match self {
NegotiatedWorkers::Redundant { .. } => false,
NegotiatedWorkers::Cancelled { .. } => false,
NegotiatedWorkers::Pool(pool) => pool.workers_alive(),
}
}
Expand Down Expand Up @@ -325,6 +342,20 @@ async fn wait_for_execution_context(

let worker_set_decision = match net_protocol::async_read(&mut conn).await? {
MessageFromQueueNegotiator::ExecutionContext(ctx) => Ok(ctx),
MessageFromQueueNegotiator::RunCancelled { reason } => {
let error_suffix = match reason {
CancelReason::User => {
"a worker received a cancellation signal while still working on tests."
}
CancelReason::ManifestHadNoProgress => {
"the run timed out before any tests were completed."
}
CancelReason::ManifestNeverReceived => {
"the run timed out before the test manifest was received."
}
};
Err(NegotiatedWorkers::Cancelled { error: format!("{}{}", "Error: This ABQ run was cancelled. When an ABQ run is cancelled, it can no longer be retried. You must start a run with a new run ID instead.\nThis run was cancelled because ", error_suffix) })
}
MessageFromQueueNegotiator::RunAlreadyCompleted { exit_code } => {
Err(NegotiatedWorkers::Redundant { exit_code })
}
Expand Down Expand Up @@ -588,6 +619,10 @@ impl QueueNegotiator {
tracing::debug!(?run_id, "run already completed");
MessageFromQueueNegotiator::RunAlreadyCompleted { exit_code }
}
Cancelled { reason } => {
tracing::debug!(?run_id, "run cancelled");
MessageFromQueueNegotiator::RunCancelled { reason }
}
RunUnknown => {
tracing::debug!(?run_id, "run not yet known");
MessageFromQueueNegotiator::RunUnknown
Expand Down

0 comments on commit dbfa8a6

Please sign in to comment.