Skip to content

Commit

Permalink
Instruct runners to always pull from online queue after getting their…
Browse files Browse the repository at this point in the history
… server-side retry manifest (#81)
  • Loading branch information
ayazhafiz authored Dec 5, 2023
1 parent a1942ce commit e9d4285
Show file tree
Hide file tree
Showing 11 changed files with 455 additions and 82 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
## 1.7.0

ABQ 1.7.0 is a minor release.

This release improves ABQ's behavior when an ABQ runner is terminated before
being assigned all applicable test in a run manifest. In previous versions of
ABQ, retrying such a runner would only retry the tests the runner was assigned
before it terminated. Starting with ABQ 1.7.0, a runner that connects for a run
ID after it was terminated will run all tests it ran on its first connection,
and then pull tests from the run queue.

ABQ continues to cancel runs when a runner is terminated with SIGTERM, SIGINT,
or SIGQUIT. The changes in 1.7.0 apply to runners terminated in other ways, for
example via SIGKILL.

## 1.6.4

ABQ 1.6.4 is a patch release.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/abq_cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "abq"
version = "1.6.4"
version = "1.7.0"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/abq_cli/src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ async fn wait_for_results_help(
}
RunInProgress { active_runners } => {
if active_runners.is_empty() {
bail!("this ABQ run has not assigned all tests in your test suite, but there are no active runners to assign them to. Please either add more runners, or launch a new run.")
bail!("this ABQ run has not assigned all tests in your test suite, but there are no active runners to assign them to. Please retry a runner, add more runners, or launch a new run.")
}

let active_runners = active_runners
Expand Down
162 changes: 158 additions & 4 deletions crates/abq_cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,15 @@ fn wait_for_live_worker(worker_stderr: &mut ChildStderr) {

// Waits for the debug line "starting execution of all tests" in the worker.
fn wait_for_worker_executing(worker_stderr: &mut ChildStderr) {
let mut worker_reader = BufReader::new(worker_stderr).lines();
// Spin until we know the worker0 is UP
wait_for_line(worker_stderr, "starting execution of all tests");
}

fn wait_for_line<R: std::io::Read>(reader: R, output: &str) {
let mut reader = BufReader::new(reader).lines();
loop {
if let Some(line) = worker_reader.next() {
if let Some(line) = reader.next() {
let line = line.expect("line is not a string");
if line.contains("starting execution of all tests") {
if line.contains(output) {
break;
}
}
Expand Down Expand Up @@ -5017,3 +5020,154 @@ fn write_partial_rwx_v1_json_results_on_early_runner_termination() {
}
"###);
}

#[test]
#[with_protocol_version]
#[serial]
fn retry_continued_manifest_read_on_worker_death() {
let name = "retry_continued_manifest_read_on_worker_death";
let conf = CSConfigOptions {
use_auth_token: true,
tls: true,
};

let (_queue_proc, queue_addr) = setup_queue!(name, conf);

let proto = AbqProtocolVersion::V0_2.get_supported_witness().unwrap();

let manifest = (0..4)
.map(|i| {
TestOrGroup::test(Test::new(
proto,
format!("test{}", i),
[],
Default::default(),
))
})
.collect::<Vec<_>>();

let manifest = ManifestMessage::new(Manifest::new(manifest, Default::default()));

let make_simulation = |i| {
[
Connect,
//
// Write spawn message
OpaqueWrite(pack(legal_spawned_message(proto))),
//
// Write the manifest if we need to.
// Otherwise handle the one test.
IfGenerateManifest {
then_do: vec![OpaqueWrite(pack(&manifest))],
else_do: {
let mut actions = vec![
//
// Read init context message + write ACK
OpaqueRead,
OpaqueWrite(pack(InitSuccessMessage::new(proto))),
// Read first test, write okay
IfAliveReadAndWriteFake(Status::Success),
Stdout("finished running first test\n".into()),
];
if i == 1 {
// First run: sleep forever, we will kill the worker.
actions.push(Sleep(Duration::from_secs(600)));
} else {
for _ in 0..3 {
actions.push(IfAliveReadAndWriteFake(Status::Success));
}
}
actions
},
},
//
// Finish
Exit(0),
]
};

let simulation1 = make_simulation(1);
let simulation2 = make_simulation(2);

let packed = pack_msgs_to_disk(simulation1);

let run_id = "test-run-id";

let test_args = {
let simulator = native_runner_simulation_bin();
let simfile_path = packed.path.display().to_string();
let args = vec![
format!("test"),
format!("--worker=0"),
format!("--queue-addr={queue_addr}"),
format!("--run-id={run_id}"),
format!("--batch-size=1"),
];
let mut args = conf.extend_args_for_client(args);
args.extend([s!("--"), simulator, simfile_path]);
args
};

let report_args = {
let args = vec![
format!("report"),
format!("--reporter=dot"),
format!("--queue-addr={queue_addr}"),
format!("--run-id={run_id}"),
format!("--color=never"),
];
conf.extend_args_for_client(args)
};

let mut worker0_attempt1 = Abq::new(format!("{name}_worker0_attempt1"))
.args(&test_args)
.spawn();

let attempt1_stdout = worker0_attempt1.stdout.as_mut().unwrap();
wait_for_line(attempt1_stdout, "finished running first test");

// Kill the worker.
worker0_attempt1.kill().unwrap();

{
let CmdOutput { exit_status, .. } = Abq::new(name.to_string() + "_report1")
.args(&report_args)
.run();
assert!(!exit_status.success());
}

std::fs::write(packed.path, pack_msgs(simulation2)).unwrap();

{
let CmdOutput {
stdout,
stderr,
exit_status,
} = Abq::new(format!("{name}_worker0_attempt1"))
.args(test_args)
.run();
assert!(
exit_status.success(),
"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
);
assert!(
stdout.contains("4 tests, 0 failures"),
"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
);
}

{
let CmdOutput {
stdout,
stderr,
exit_status,
} = Abq::new(name.to_string() + "_report2")
.args(report_args)
.run();
assert!(exit_status.success());
assert!(
stdout.contains("4 tests, 0 failures"),
"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
);
}
}
91 changes: 73 additions & 18 deletions crates/abq_queue/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ impl AllRuns {
runner_test_command_differs,
})
}
Some((old_entity, old_finished_state)) => {
Some((old_entity, old_finished_time)) => {
if old_entity.id == entity.id {
// The same worker entity is connecting twice - this implies that the
// same worker process is asking to find a run more than once, which
Expand Down Expand Up @@ -453,13 +453,13 @@ impl AllRuns {
// itself, and if this is done we could hand out the manifest right here,
// rather than asking the runner to reconnect to retrieve the manifest.
tracing::info!(
?old_finished_state,
?old_finished_time,
?entity,
"worker reconnecting for out-of-process retry manifest during active run"
);

AssignedRunStatus::Run(AssignedRun {
kind: AssignedRunKind::Retry,
kind: AssignedRunKind::RetryAndContinue,
runner_test_command_differs,
})
}
Expand Down Expand Up @@ -488,7 +488,7 @@ impl AllRuns {
}

AssignedRunStatus::Run(AssignedRun {
kind: AssignedRunKind::Retry,
kind: AssignedRunKind::RetryAndContinue,
runner_test_command_differs,
})
} else {
Expand Down Expand Up @@ -1305,21 +1305,11 @@ impl AllRuns {
// legal cancellation states
}
RunState::InitialManifestDone { seen_workers, .. } => {
// Since we already have issued the full manifest out, don't mark this run as
// cancelled; this might be a stragling worker or a worker that cancelled an
// out-of-process retry.
tracing::info!(
?run_id,
"refusing to cancel run whose manifest has already been exhausted"
);
// Mark the worker as now-inactive.
let old_tag = seen_workers.write().insert_by_tag(entity, false);
log_assert!(
old_tag.is_some(),
?entity,
?run_id,
"entity was not seen before it marked cancellation"
);
seen_workers.write().insert_by_tag(entity, false);
return;
}
}
Expand Down Expand Up @@ -3995,7 +3985,7 @@ mod test {
assert_eq!(
assigned,
AssignedRunStatus::Run(AssignedRun {
kind: AssignedRunKind::Retry,
kind: AssignedRunKind::RetryAndContinue,
runner_test_command_differs: false
})
);
Expand Down Expand Up @@ -4331,7 +4321,7 @@ mod persistence_on_end_of_manifest {

#[tokio::test]
#[with_protocol_version]
async fn worker_told_to_pull_retry_manifest() {
async fn worker_told_to_pull_retry_manifest_and_continue() {
let queues = SharedRuns::default();
let remote = remote::NoopPersister::new().into();

Expand Down Expand Up @@ -4388,10 +4378,75 @@ mod persistence_on_end_of_manifest {
.build(),
)
.await;

assert_eq!(
assigned,
AssignedRunStatus::Run(AssignedRun {
kind: AssignedRunKind::RetryAndContinue,
runner_test_command_differs: false
})
);
}

#[tokio::test]
#[with_protocol_version]
async fn worker_told_to_pull_retry_manifest_no_continue() {
let queues = SharedRuns::default();
let remote = remote::NoopPersister::new().into();

let run_id = RunId::unique();

let worker0 = Entity::runner(1, 1);
let worker0_shadow = Entity::runner(1, 1);
assert_ne!(worker0.id, worker0_shadow.id);
assert_eq!(worker0.tag, worker0_shadow.tag);

let test1 = fake_test_spec(proto);
let test2 = fake_test_spec(proto);
let test3 = fake_test_spec(proto);

let test_command_hash = TestCommandHash::random();

// Create run, add manifest by worker0
{
let run_params = RunParamsBuilder::new(&run_id, &remote)
.entity(worker0)
.runner_test_command_hash(test_command_hash)
.build();
let manifest = vec![
(test1.clone(), GroupId::new()),
(test2, GroupId::new()),
(test3, GroupId::new()),
];
let _ = queues.find_or_create_run(run_params).await;
let _ = queues.add_manifest(&run_id, manifest, Default::default());
}

// worker0 pulls tests
{
let NextWorkResult { bundle, .. } = queues.next_work(worker0, &run_id);
assert_eq!(
bundle.work,
vec![WorkerTest::new(test1.clone(), INIT_RUN_NUMBER)]
);
}

queues.mark_worker_complete(&run_id, worker0, std::time::Instant::now());

// Suppose worker0 re-runs.
let assigned = queues
.find_or_create_run(
RunParamsBuilder::new(&run_id, &remote)
.entity(worker0_shadow)
.runner_test_command_hash(test_command_hash)
.build(),
)
.await;

assert_eq!(
assigned,
AssignedRunStatus::Run(AssignedRun {
kind: AssignedRunKind::Retry,
kind: AssignedRunKind::RetryAndContinue,
runner_test_command_differs: false
})
);
Expand Down
4 changes: 2 additions & 2 deletions crates/abq_workers/src/assigned_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use serde_derive::{Deserialize, Serialize};
pub enum AssignedRunKind {
/// This worker is connecting for a fresh run, and should fetch tests online.
Fresh { should_generate_manifest: bool },
/// This worker is connecting for a retry, and should fetch its manifest from the queue once.
Retry,
/// This worker should pull its retry manifest, and then continue to fetch tests online.
RetryAndContinue,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion crates/abq_workers/src/negotiate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl WorkersNegotiator {
AssignedRunKind::Fresh {
should_generate_manifest,
} => should_generate_manifest,
AssignedRunKind::Retry => false,
AssignedRunKind::RetryAndContinue => false,
};

let runner_strategy_generator = RunnerStrategyGenerator::new(
Expand Down
Loading

0 comments on commit e9d4285

Please sign in to comment.