Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic in speculative execution #1086

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions scylla/src/transport/speculative_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,17 @@ where
}
}
res = async_tasks.select_next_some() => {
match res {
Some(r) => {
if !can_be_ignored(&r) {
return r;
} else {
last_error = Some(r)
}
},
None => {
if async_tasks.is_empty() && retries_remaining == 0 {
return last_error.unwrap_or({
Err(EMPTY_PLAN_ERROR)
});
}
},
if let Some(r) = res {
if !can_be_ignored(&r) {
return r;
} else {
last_error = Some(r)
}
}
if async_tasks.is_empty() && retries_remaining == 0 {
return last_error.unwrap_or({
Err(EMPTY_PLAN_ERROR)
});
}
}
}
Expand Down
68 changes: 68 additions & 0 deletions scylla/tests/integration/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,71 @@ async fn retries_occur() {
Err(err) => panic!("{}", err),
}
}

// See https://github.com/scylladb/scylla-rust-driver/issues/1085
#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn speculative_execution_panic_regression_test() {
use scylla_proxy::RunningProxy;

setup_tracing();
let test = |proxy_uris: [String; 3], translation_map, mut running_proxy: RunningProxy| async move {
let se = SimpleSpeculativeExecutionPolicy {
max_retry_count: 2,
retry_interval: Duration::from_millis(1),
};
let profile = ExecutionProfile::builder()
.speculative_execution_policy(Some(Arc::new(se)))
.retry_policy(Box::new(FallthroughRetryPolicy))
.build();
// DB preparation phase
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.default_execution_profile_handle(profile.into_handle())
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.query_unpaged("CREATE TABLE t (a int primary key)", &[])
.await
.unwrap();

let mut q = session
.prepare("INSERT INTO t (a) VALUES (?)")
.await
.unwrap();
q.set_is_idempotent(true); // this is to allow speculative execution to fire
let id: &[u8] = q.get_id();

let drop_connection_on_execute = RequestRule(
Condition::RequestOpcode(RequestOpcode::Execute)
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent))
.and(Condition::BodyContainsCaseSensitive(id.into())),
RequestReaction::drop_connection(),
);

running_proxy.running_nodes[0]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));
running_proxy.running_nodes[1]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));
running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));

let _result = session.execute_unpaged(&q, (2,)).await.unwrap_err();

running_proxy
};
let res = test_with_3_node_cluster(ShardAwareness::QueryNode, test).await;

match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}
Loading