Skip to content

Commit

Permalink
Merge pull request #1086 from Lorak-mmk/fix-speculative-execution-race
Browse files Browse the repository at this point in the history
Fix panic in speculative execution
  • Loading branch information
Lorak-mmk authored Oct 10, 2024
2 parents 4aa7305 + 630f8c3 commit afa91f2
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 15 deletions.
26 changes: 11 additions & 15 deletions scylla/src/transport/speculative_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,17 @@ where
}
}
res = async_tasks.select_next_some() => {
match res {
Some(r) => {
if !can_be_ignored(&r) {
return r;
} else {
last_error = Some(r)
}
},
None => {
if async_tasks.is_empty() && retries_remaining == 0 {
return last_error.unwrap_or({
Err(EMPTY_PLAN_ERROR)
});
}
},
if let Some(r) = res {
if !can_be_ignored(&r) {
return r;
} else {
last_error = Some(r)
}
}
if async_tasks.is_empty() && retries_remaining == 0 {
return last_error.unwrap_or({
Err(EMPTY_PLAN_ERROR)
});
}
}
}
Expand Down
68 changes: 68 additions & 0 deletions scylla/tests/integration/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,71 @@ async fn retries_occur() {
Err(err) => panic!("{}", err),
}
}

// See https://github.com/scylladb/scylla-rust-driver/issues/1085
//
// Regression test for a panic in speculative execution: every node drops the
// connection when it receives the EXECUTE for our prepared statement, so all
// attempts of the request fail. The driver is expected to surface an error to
// the caller instead of panicking.
#[tokio::test]
#[ntest::timeout(30000)]
// Compiled out when the `scylla_cloud_tests` cfg is set.
#[cfg(not(scylla_cloud_tests))]
async fn speculative_execution_panic_regression_test() {
use scylla_proxy::RunningProxy;

setup_tracing();
// Test body run against the proxied cluster; it takes ownership of the running
// proxy and hands it back at the end.
let test = |proxy_uris: [String; 3], translation_map, mut running_proxy: RunningProxy| async move {
// Speculative execution with up to 2 extra attempts and a 1 ms interval
// between them (presumably short enough that attempts overlap — confirm).
let se = SimpleSpeculativeExecutionPolicy {
max_retry_count: 2,
retry_interval: Duration::from_millis(1),
};
// NOTE(review): FallthroughRetryPolicy presumably never retries, leaving
// speculative execution as the only source of extra attempts — confirm.
let profile = ExecutionProfile::builder()
.speculative_execution_policy(Some(Arc::new(se)))
.retry_policy(Box::new(FallthroughRetryPolicy))
.build();
// DB preparation phase
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.default_execution_profile_handle(profile.into_handle())
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.query_unpaged("CREATE TABLE t (a int primary key)", &[])
.await
.unwrap();

// The statement is prepared before any proxy rule is installed; the rules
// below only react to EXECUTE requests carrying this statement's id.
let mut q = session
.prepare("INSERT INTO t (a) VALUES (?)")
.await
.unwrap();
q.set_is_idempotent(true); // this is to allow speculative execution to fire
let id: &[u8] = q.get_id();

// Proxy rule: drop the connection on an EXECUTE request whose body contains
// the prepared statement id, but only on connections that have not
// registered for any event (presumably this spares the control connection —
// confirm).
let drop_connection_on_execute = RequestRule(
Condition::RequestOpcode(RequestOpcode::Execute)
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent))
.and(Condition::BodyContainsCaseSensitive(id.into())),
RequestReaction::drop_connection(),
);

// Install the same rule on each of the three proxied nodes so that no
// attempt can succeed.
running_proxy.running_nodes[0]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));
running_proxy.running_nodes[1]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));
running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));

// The request must fail with an error; `unwrap_err` panics if it
// unexpectedly succeeds. The error value itself is not inspected.
let _result = session.execute_unpaged(&q, (2,)).await.unwrap_err();

// Hand the proxy back to the harness.
running_proxy
};
let res = test_with_3_node_cluster(ShardAwareness::QueryNode, test).await;

// A clean run and a "driver disconnected" worker error are both accepted;
// any other proxy error fails the test.
match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}

0 comments on commit afa91f2

Please sign in to comment.