
history: replace remaining "query" mentions with "request"
muzarski committed Dec 24, 2024
1 parent de3c0e2 · commit 71539ec
Showing 3 changed files with 79 additions and 79 deletions.
132 changes: 66 additions & 66 deletions scylla/src/history.rs
@@ -1,4 +1,4 @@
//! Collecting history of query executions - retries, speculative, etc.
//! Collecting history of request executions - retries, speculative, etc.
use std::{
collections::BTreeMap,
fmt::{Debug, Display},
@@ -19,7 +19,7 @@ use tracing::warn;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct RequestId(pub usize);

/// Id of a single attempt within a query, a single request sent on some connection.
/// Id of a single attempt within a request run - a single request sent on some connection.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct AttemptId(pub usize);

@@ -35,17 +35,17 @@ pub struct SpeculativeId(pub usize);
/// `Query`, `PreparedStatement`, etc...\
/// The listener has to generate unique IDs for new requests, attempts and speculative fibers.
/// These ids are then used by the caller to identify them.\
/// It's important to note that even after a query is finished there still might come events related to it.
/// These events come from speculative futures that didn't notice the query is done already.
/// It's important to note that even after a request is finished there still might come events related to it.
/// These events come from speculative futures that didn't notice the request is done already.
pub trait HistoryListener: Debug + Send + Sync {
/// Log that a query has started on query start - right after the call to Session::{query,execute}_*/batch.
fn log_query_start(&self) -> RequestId;
/// Log that a request has started on request start - right after the call to Session::{query,execute}_*/batch.
fn log_request_start(&self) -> RequestId;

/// Log that query was successful - called right before returning the result from Session::query_*, execute_*, etc.
fn log_query_success(&self, request_id: RequestId);
/// Log that request was successful - called right before returning the result from Session::query_*, execute_*, etc.
fn log_request_success(&self, request_id: RequestId);

/// Log that query ended with an error - called right before returning the error from Session::query_*, execute_*, etc.
fn log_query_error(&self, request_id: RequestId, error: &TimeoutableRequestError);
/// Log that request ended with an error - called right before returning the error from Session::query_*, execute_*, etc.
fn log_request_error(&self, request_id: RequestId, error: &TimeoutableRequestError);

/// Log that a new speculative fiber has started.
fn log_new_speculative_fiber(&self, request_id: RequestId) -> SpeculativeId;
@@ -72,7 +72,7 @@ pub trait HistoryListener: Debug + Send + Sync {

pub type TimePoint = DateTime<Utc>;

/// HistoryCollector can be used as HistoryListener to collect all the query history events.
/// HistoryCollector can be used as HistoryListener to collect all the request history events.
/// Each event is marked with an UTC timestamp.
#[derive(Debug, Default)]
pub struct HistoryCollector {
@@ -89,9 +89,9 @@ pub struct HistoryCollectorData {

#[derive(Debug, Clone)]
pub enum HistoryEvent {
NewQuery(RequestId),
QuerySuccess(RequestId),
QueryError(RequestId, TimeoutableRequestError),
NewRequest(RequestId),
RequestSuccess(RequestId),
RequestError(RequestId, TimeoutableRequestError),
NewSpeculativeFiber(SpeculativeId, RequestId),
NewAttempt(AttemptId, RequestId, Option<SpeculativeId>, SocketAddr),
AttemptSuccess(AttemptId),
@@ -132,7 +132,7 @@ impl HistoryCollector {
}

/// Takes the data out of the collector. The collected events are cleared.\
/// It's possible that after finishing a query and taking out the events
/// It's possible that after finishing a request and taking out the events
/// new ones will still come - from requests that haven't been cancelled yet.
pub fn take_collected(&self) -> HistoryCollectorData {
self.do_with_data(|data| {
@@ -176,24 +176,24 @@ impl HistoryCollector {
}

impl HistoryListener for HistoryCollector {
fn log_query_start(&self) -> RequestId {
fn log_request_start(&self) -> RequestId {
self.do_with_data(|data| {
let new_request_id: RequestId = data.next_request_id;
data.next_request_id.0 += 1;
data.add_event(HistoryEvent::NewQuery(new_request_id));
data.add_event(HistoryEvent::NewRequest(new_request_id));
new_request_id
})
}

fn log_query_success(&self, request_id: RequestId) {
fn log_request_success(&self, request_id: RequestId) {
self.do_with_data(|data| {
data.add_event(HistoryEvent::QuerySuccess(request_id));
data.add_event(HistoryEvent::RequestSuccess(request_id));
})
}

fn log_query_error(&self, request_id: RequestId, error: &TimeoutableRequestError) {
fn log_request_error(&self, request_id: RequestId, error: &TimeoutableRequestError) {
self.do_with_data(|data| {
data.add_event(HistoryEvent::QueryError(request_id, error.clone()))
data.add_event(HistoryEvent::RequestError(request_id, error.clone()))
})
}

@@ -325,7 +325,7 @@ impl From<&HistoryCollectorData> for StructuredHistory {
None => warn!("StructuredHistory - attempt with id {:?} finished with an error but not created", attempt_id)
}
}
HistoryEvent::NewQuery(request_id) => {
HistoryEvent::NewRequest(request_id) => {
requests.insert(
*request_id,
RequestHistory {
@@ -339,14 +339,14 @@ impl From<&HistoryCollectorData> for StructuredHistory {
},
);
}
HistoryEvent::QuerySuccess(request_id) => {
if let Some(query) = requests.get_mut(request_id) {
query.result = Some(RequestHistoryResult::Success(*event_time));
HistoryEvent::RequestSuccess(request_id) => {
if let Some(request) = requests.get_mut(request_id) {
request.result = Some(RequestHistoryResult::Success(*event_time));
}
}
HistoryEvent::QueryError(request_id, error) => {
if let Some(query) = requests.get_mut(request_id) {
query.result =
HistoryEvent::RequestError(request_id, error) => {
if let Some(request) = requests.get_mut(request_id) {
request.result =
Some(RequestHistoryResult::Error(*event_time, error.clone()));
}
}
@@ -373,8 +373,8 @@ impl From<&HistoryCollectorData> for StructuredHistory {
}
}
None => {
if let Some(query) = requests.get_mut(request_id) {
query.non_speculative_fiber.attempts.push(attempt);
if let Some(request) = requests.get_mut(request_id) {
request.non_speculative_fiber.attempts.push(attempt);
}
}
}
@@ -386,8 +386,8 @@ impl From<&HistoryCollectorData> for StructuredHistory {
for (event, _) in &data.events {
if let HistoryEvent::NewSpeculativeFiber(speculative_id, request_id) = event {
if let Some(fiber) = fibers.remove(speculative_id) {
if let Some(query) = requests.get_mut(request_id) {
query.speculative_fibers.push(fiber);
if let Some(request) = requests.get_mut(request_id) {
request.speculative_fibers.push(fiber);
}
}
}
@@ -399,24 +399,24 @@ impl From<&HistoryCollectorData> for StructuredHistory {
}
}

/// StructuredHistory should be used for printing query history.
/// StructuredHistory should be used for printing request history.
impl Display for StructuredHistory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Requests History:")?;
for (i, query) in self.requests.iter().enumerate() {
for (i, request) in self.requests.iter().enumerate() {
writeln!(f, "=== Request #{} ===", i)?;
writeln!(f, "| start_time: {}", query.start_time)?;
writeln!(f, "| start_time: {}", request.start_time)?;
writeln!(f, "| Non-speculative attempts:")?;
write_fiber_attempts(&query.non_speculative_fiber, f)?;
for (spec_i, speculative_fiber) in query.speculative_fibers.iter().enumerate() {
write_fiber_attempts(&request.non_speculative_fiber, f)?;
for (spec_i, speculative_fiber) in request.speculative_fibers.iter().enumerate() {
writeln!(f, "|")?;
writeln!(f, "|")?;
writeln!(f, "| > Speculative fiber #{}", spec_i)?;
writeln!(f, "| fiber start time: {}", speculative_fiber.start_time)?;
write_fiber_attempts(speculative_fiber, f)?;
}
writeln!(f, "|")?;
match &query.result {
match &request.result {
Some(RequestHistoryResult::Success(succ_time)) => {
writeln!(f, "| Request successful at {}", succ_time)?;
}
@@ -485,16 +485,16 @@ mod tests {
Utc,
);

for query in &mut history.requests {
query.start_time = the_time;
match &mut query.result {
for request in &mut history.requests {
request.start_time = the_time;
match &mut request.result {
Some(RequestHistoryResult::Success(succ_time)) => *succ_time = the_time,
Some(RequestHistoryResult::Error(err_time, _)) => *err_time = the_time,
None => {}
};

for fiber in std::iter::once(&mut query.non_speculative_fiber)
.chain(query.speculative_fibers.iter_mut())
for fiber in std::iter::once(&mut request.non_speculative_fiber)
.chain(request.speculative_fibers.iter_mut())
{
fiber.start_time = the_time;
for attempt in &mut fiber.attempts {
@@ -556,11 +556,11 @@ mod tests {
}

#[test]
fn empty_query() {
fn empty_request() {
setup_tracing();
let history_collector = HistoryCollector::new();

let _request_id: RequestId = history_collector.log_query_start();
let _request_id: RequestId = history_collector.log_request_start();

let history: StructuredHistory = history_collector.clone_structured_history();

@@ -588,11 +588,11 @@ mod tests {
setup_tracing();
let history_collector = HistoryCollector::new();

let request_id: RequestId = history_collector.log_query_start();
let request_id: RequestId = history_collector.log_request_start();
let attempt_id: AttemptId =
history_collector.log_attempt_start(request_id, None, node1_addr());
history_collector.log_attempt_success(attempt_id);
history_collector.log_query_success(request_id);
history_collector.log_request_success(request_id);

let history: StructuredHistory = history_collector.clone_structured_history();

@@ -623,7 +623,7 @@ mod tests {
setup_tracing();
let history_collector = HistoryCollector::new();

let request_id: RequestId = history_collector.log_query_start();
let request_id: RequestId = history_collector.log_request_start();

let attempt_id: AttemptId =
history_collector.log_attempt_start(request_id, None, node1_addr());
@@ -641,7 +641,7 @@
&RetryDecision::DontRetry,
);

history_collector.log_query_error(
history_collector.log_request_error(
request_id,
&TimeoutableRequestError::RequestFailure(RetriableRequestError::RequestFailure(
unavailable_error(),
@@ -679,7 +679,7 @@ mod tests {
setup_tracing();
let history_collector = HistoryCollector::new();

let request_id: RequestId = history_collector.log_query_start();
let request_id: RequestId = history_collector.log_request_start();
history_collector.log_new_speculative_fiber(request_id);
history_collector.log_new_speculative_fiber(request_id);
history_collector.log_new_speculative_fiber(request_id);
@@ -730,7 +730,7 @@ mod tests {
setup_tracing();
let history_collector = HistoryCollector::new();

let request_id: RequestId = history_collector.log_query_start();
let request_id: RequestId = history_collector.log_request_start();

let attempt1: AttemptId =
history_collector.log_attempt_start(request_id, None, node1_addr());
@@ -774,7 +774,7 @@ mod tests {
history_collector.log_attempt_start(request_id, Some(speculative4), node2_addr());

history_collector.log_attempt_success(spec2_attempt2);
history_collector.log_query_success(request_id);
history_collector.log_request_success(request_id);

let history: StructuredHistory = history_collector.clone_structured_history();

@@ -836,24 +836,24 @@ mod tests {
setup_tracing();
let history_collector = HistoryCollector::new();

let query1_id: RequestId = history_collector.log_query_start();
let query1_attempt1: AttemptId =
history_collector.log_attempt_start(query1_id, None, node1_addr());
let request1_id: RequestId = history_collector.log_request_start();
let request1_attempt1: AttemptId =
history_collector.log_attempt_start(request1_id, None, node1_addr());
history_collector.log_attempt_error(
query1_attempt1,
request1_attempt1,
&unexpected_response(CqlResponseKind::Supported),
&RetryDecision::RetryNextNode(Some(Consistency::Quorum)),
);
let query1_attempt2: AttemptId =
history_collector.log_attempt_start(query1_id, None, node2_addr());
history_collector.log_attempt_success(query1_attempt2);
history_collector.log_query_success(query1_id);

let query2_id: RequestId = history_collector.log_query_start();
let query2_attempt1: AttemptId =
history_collector.log_attempt_start(query2_id, None, node1_addr());
history_collector.log_attempt_success(query2_attempt1);
history_collector.log_query_success(query2_id);
let request1_attempt2: AttemptId =
history_collector.log_attempt_start(request1_id, None, node2_addr());
history_collector.log_attempt_success(request1_attempt2);
history_collector.log_request_success(request1_id);

let request2_id: RequestId = history_collector.log_request_start();
let request2_attempt1: AttemptId =
history_collector.log_attempt_start(request2_id, None, node1_addr());
history_collector.log_attempt_success(request2_attempt1);
history_collector.log_request_success(request2_id);

let history: StructuredHistory = history_collector.clone_structured_history();

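For reference, the renamed collector API above can be exercised much like the tests do. The sketch below is illustrative rather than part of this commit: it assumes the types are importable from `scylla::history` (the module lives in scylla/src/history.rs) and uses a placeholder node address in place of the tests' `node1_addr()` helper.

```rust
use std::net::SocketAddr;

// Assumed import path; adjust to however the crate re-exports the history module.
use scylla::history::{AttemptId, HistoryCollector, HistoryListener, RequestId, StructuredHistory};

fn main() {
    let collector = HistoryCollector::new();

    // One request run with a single, successful attempt against one node.
    let request_id: RequestId = collector.log_request_start();
    let node_addr: SocketAddr = "127.0.0.1:9042".parse().unwrap(); // placeholder address
    let attempt_id: AttemptId = collector.log_attempt_start(request_id, None, node_addr);
    collector.log_attempt_success(attempt_id);
    collector.log_request_success(request_id);

    // Convert the collected events into the printable form used by the tests above.
    let history: StructuredHistory = collector.clone_structured_history();
    println!("{}", history);
}
```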
20 changes: 10 additions & 10 deletions scylla/src/transport/iterator.rs
@@ -171,7 +171,7 @@ where
let mut last_error: RetriableRequestError = RetriableRequestError::EmptyPlan;
let mut current_consistency: Consistency = self.query_consistency;

self.log_query_start();
self.log_request_start();

'nodes_in_plan: for (node, shard) in query_plan {
let span =
@@ -267,7 +267,7 @@ where
}

let error: TimeoutableRequestError = last_error.into();
self.log_query_error(&error);
self.log_request_error(&error);
let (proof, _) = self.sender.send(Err(error.into_query_error())).await;
proof
}
@@ -330,7 +330,7 @@ where
}) => {
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_query_success();
self.log_request_success();
self.execution_profile
.load_balancing_policy
.on_request_success(&self.statement_info, elapsed, node);
@@ -358,7 +358,7 @@ where

// Query succeeded, reset retry policy for future retries
self.retry_session.reset();
self.log_query_start();
self.log_request_start();

Ok(ControlFlow::Continue(()))
}
@@ -393,16 +393,16 @@ where
}
}

fn log_query_start(&mut self) {
fn log_request_start(&mut self) {
let history_listener: &dyn HistoryListener = match &self.history_listener {
Some(hl) => &**hl,
None => return,
};

self.current_request_id = Some(history_listener.log_query_start());
self.current_request_id = Some(history_listener.log_request_start());
}

fn log_query_success(&mut self) {
fn log_request_success(&mut self) {
let history_listener: &dyn HistoryListener = match &self.history_listener {
Some(hl) => &**hl,
None => return,
@@ -413,10 +413,10 @@ where
None => return,
};

history_listener.log_query_success(request_id);
history_listener.log_request_success(request_id);
}

fn log_query_error(&mut self, error: &TimeoutableRequestError) {
fn log_request_error(&mut self, error: &TimeoutableRequestError) {
let history_listener: &dyn HistoryListener = match &self.history_listener {
Some(hl) => &**hl,
None => return,
@@ -427,7 +427,7 @@ where
None => return,
};

history_listener.log_query_error(request_id, error);
history_listener.log_request_error(request_id, error);
}

fn log_attempt_start(&mut self, node_addr: SocketAddr) {
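The renamed `log_request_*` helpers in the iterator worker above all follow the same guard pattern: silently return when no history listener is configured, or when no request id is currently being tracked. Below is a minimal standalone sketch of that pattern; `Listener`, `PrintingListener`, and `Worker` are illustrative stand-ins, not the driver's actual types.

```rust
use std::sync::Arc;

// Illustrative stand-in for the request-level half of the HistoryListener trait.
trait Listener {
    fn log_request_start(&self) -> usize;
    fn log_request_success(&self, request_id: usize);
}

struct PrintingListener;

impl Listener for PrintingListener {
    fn log_request_start(&self) -> usize {
        println!("request started");
        0
    }
    fn log_request_success(&self, request_id: usize) {
        println!("request {} succeeded", request_id);
    }
}

// Illustrative stand-in for the worker's optional listener plus tracked request id.
struct Worker {
    history_listener: Option<Arc<dyn Listener>>,
    current_request_id: Option<usize>,
}

impl Worker {
    fn log_request_start(&mut self) {
        let listener = match &self.history_listener {
            Some(l) => &**l,
            None => return, // history collection disabled: do nothing
        };
        self.current_request_id = Some(listener.log_request_start());
    }

    fn log_request_success(&mut self) {
        let listener = match &self.history_listener {
            Some(l) => &**l,
            None => return,
        };
        let request_id = match self.current_request_id {
            Some(id) => id,
            None => return, // no request currently tracked
        };
        listener.log_request_success(request_id);
    }
}

fn main() {
    let mut worker = Worker {
        history_listener: Some(Arc::new(PrintingListener)),
        current_request_id: None,
    };
    worker.log_request_start();
    worker.log_request_success();
}
```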
