diff --git a/scylla/src/history.rs b/scylla/src/history.rs index c358eac97..6844bef19 100644 --- a/scylla/src/history.rs +++ b/scylla/src/history.rs @@ -1,4 +1,4 @@ -//! Collecting history of query executions - retries, speculative, etc. +//! Collecting history of request executions - retries, speculative, etc. use std::{ collections::BTreeMap, fmt::{Debug, Display}, @@ -19,7 +19,7 @@ use tracing::warn; #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] pub struct RequestId(pub usize); -/// Id of a single attempt within a query, a single request sent on some connection. +/// Id of a single attempt within a request run - a single request sent on some connection. #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] pub struct AttemptId(pub usize); @@ -35,17 +35,17 @@ pub struct SpeculativeId(pub usize); /// `Query`, `PreparedStatement`, etc...\ /// The listener has to generate unique IDs for new requests, attempts and speculative fibers. /// These ids are then used by the caller to identify them.\ -/// It's important to note that even after a query is finished there still might come events related to it. -/// These events come from speculative futures that didn't notice the query is done already. +/// It's important to note that even after a request is finished there still might come events related to it. +/// These events come from speculative futures that didn't notice the request is done already. pub trait HistoryListener: Debug + Send + Sync { - /// Log that a query has started on query start - right after the call to Session::{query,execute}_*/batch. - fn log_query_start(&self) -> RequestId; + /// Log that a request has started on request start - right after the call to Session::{query,execute}_*/batch. + fn log_request_start(&self) -> RequestId; - /// Log that query was successful - called right before returning the result from Session::query_*, execute_*, etc. - fn log_query_success(&self, request_id: RequestId); + /// Log that request was successful - called right before returning the result from Session::query_*, execute_*, etc. + fn log_request_success(&self, request_id: RequestId); - /// Log that query ended with an error - called right before returning the error from Session::query_*, execute_*, etc. - fn log_query_error(&self, request_id: RequestId, error: &TimeoutableRequestError); + /// Log that request ended with an error - called right before returning the error from Session::query_*, execute_*, etc. + fn log_request_error(&self, request_id: RequestId, error: &TimeoutableRequestError); /// Log that a new speculative fiber has started. fn log_new_speculative_fiber(&self, request_id: RequestId) -> SpeculativeId; @@ -72,7 +72,7 @@ pub trait HistoryListener: Debug + Send + Sync { pub type TimePoint = DateTime; -/// HistoryCollector can be used as HistoryListener to collect all the query history events. +/// HistoryCollector can be used as HistoryListener to collect all the request history events. /// Each event is marked with an UTC timestamp. #[derive(Debug, Default)] pub struct HistoryCollector { @@ -89,9 +89,9 @@ pub struct HistoryCollectorData { #[derive(Debug, Clone)] pub enum HistoryEvent { - NewQuery(RequestId), - QuerySuccess(RequestId), - QueryError(RequestId, TimeoutableRequestError), + NewRequest(RequestId), + RequestSuccess(RequestId), + RequestError(RequestId, TimeoutableRequestError), NewSpeculativeFiber(SpeculativeId, RequestId), NewAttempt(AttemptId, RequestId, Option, SocketAddr), AttemptSuccess(AttemptId), @@ -132,7 +132,7 @@ impl HistoryCollector { } /// Takes the data out of the collector. The collected events are cleared.\ - /// It's possible that after finishing a query and taking out the events + /// It's possible that after finishing a request and taking out the events /// new ones will still come - from requests that haven't been cancelled yet. pub fn take_collected(&self) -> HistoryCollectorData { self.do_with_data(|data| { @@ -176,24 +176,24 @@ impl HistoryCollector { } impl HistoryListener for HistoryCollector { - fn log_query_start(&self) -> RequestId { + fn log_request_start(&self) -> RequestId { self.do_with_data(|data| { let new_request_id: RequestId = data.next_request_id; data.next_request_id.0 += 1; - data.add_event(HistoryEvent::NewQuery(new_request_id)); + data.add_event(HistoryEvent::NewRequest(new_request_id)); new_request_id }) } - fn log_query_success(&self, request_id: RequestId) { + fn log_request_success(&self, request_id: RequestId) { self.do_with_data(|data| { - data.add_event(HistoryEvent::QuerySuccess(request_id)); + data.add_event(HistoryEvent::RequestSuccess(request_id)); }) } - fn log_query_error(&self, request_id: RequestId, error: &TimeoutableRequestError) { + fn log_request_error(&self, request_id: RequestId, error: &TimeoutableRequestError) { self.do_with_data(|data| { - data.add_event(HistoryEvent::QueryError(request_id, error.clone())) + data.add_event(HistoryEvent::RequestError(request_id, error.clone())) }) } @@ -325,7 +325,7 @@ impl From<&HistoryCollectorData> for StructuredHistory { None => warn!("StructuredHistory - attempt with id {:?} finished with an error but not created", attempt_id) } } - HistoryEvent::NewQuery(request_id) => { + HistoryEvent::NewRequest(request_id) => { requests.insert( *request_id, RequestHistory { @@ -339,14 +339,14 @@ impl From<&HistoryCollectorData> for StructuredHistory { }, ); } - HistoryEvent::QuerySuccess(request_id) => { - if let Some(query) = requests.get_mut(request_id) { - query.result = Some(RequestHistoryResult::Success(*event_time)); + HistoryEvent::RequestSuccess(request_id) => { + if let Some(request) = requests.get_mut(request_id) { + request.result = Some(RequestHistoryResult::Success(*event_time)); } } - HistoryEvent::QueryError(request_id, error) => { - if let Some(query) = requests.get_mut(request_id) { - query.result = + HistoryEvent::RequestError(request_id, error) => { + if let Some(request) = requests.get_mut(request_id) { + request.result = Some(RequestHistoryResult::Error(*event_time, error.clone())); } } @@ -373,8 +373,8 @@ impl From<&HistoryCollectorData> for StructuredHistory { } } None => { - if let Some(query) = requests.get_mut(request_id) { - query.non_speculative_fiber.attempts.push(attempt); + if let Some(request) = requests.get_mut(request_id) { + request.non_speculative_fiber.attempts.push(attempt); } } } @@ -386,8 +386,8 @@ impl From<&HistoryCollectorData> for StructuredHistory { for (event, _) in &data.events { if let HistoryEvent::NewSpeculativeFiber(speculative_id, request_id) = event { if let Some(fiber) = fibers.remove(speculative_id) { - if let Some(query) = requests.get_mut(request_id) { - query.speculative_fibers.push(fiber); + if let Some(request) = requests.get_mut(request_id) { + request.speculative_fibers.push(fiber); } } } @@ -399,16 +399,16 @@ impl From<&HistoryCollectorData> for StructuredHistory { } } -/// StructuredHistory should be used for printing query history. +/// StructuredHistory should be used for printing request history. impl Display for StructuredHistory { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "Requests History:")?; - for (i, query) in self.requests.iter().enumerate() { + for (i, request) in self.requests.iter().enumerate() { writeln!(f, "=== Request #{} ===", i)?; - writeln!(f, "| start_time: {}", query.start_time)?; + writeln!(f, "| start_time: {}", request.start_time)?; writeln!(f, "| Non-speculative attempts:")?; - write_fiber_attempts(&query.non_speculative_fiber, f)?; - for (spec_i, speculative_fiber) in query.speculative_fibers.iter().enumerate() { + write_fiber_attempts(&request.non_speculative_fiber, f)?; + for (spec_i, speculative_fiber) in request.speculative_fibers.iter().enumerate() { writeln!(f, "|")?; writeln!(f, "|")?; writeln!(f, "| > Speculative fiber #{}", spec_i)?; @@ -416,7 +416,7 @@ impl Display for StructuredHistory { write_fiber_attempts(speculative_fiber, f)?; } writeln!(f, "|")?; - match &query.result { + match &request.result { Some(RequestHistoryResult::Success(succ_time)) => { writeln!(f, "| Request successful at {}", succ_time)?; } @@ -485,16 +485,16 @@ mod tests { Utc, ); - for query in &mut history.requests { - query.start_time = the_time; - match &mut query.result { + for request in &mut history.requests { + request.start_time = the_time; + match &mut request.result { Some(RequestHistoryResult::Success(succ_time)) => *succ_time = the_time, Some(RequestHistoryResult::Error(err_time, _)) => *err_time = the_time, None => {} }; - for fiber in std::iter::once(&mut query.non_speculative_fiber) - .chain(query.speculative_fibers.iter_mut()) + for fiber in std::iter::once(&mut request.non_speculative_fiber) + .chain(request.speculative_fibers.iter_mut()) { fiber.start_time = the_time; for attempt in &mut fiber.attempts { @@ -556,11 +556,11 @@ mod tests { } #[test] - fn empty_query() { + fn empty_request() { setup_tracing(); let history_collector = HistoryCollector::new(); - let _request_id: RequestId = history_collector.log_query_start(); + let _request_id: RequestId = history_collector.log_request_start(); let history: StructuredHistory = history_collector.clone_structured_history(); @@ -588,11 +588,11 @@ mod tests { setup_tracing(); let history_collector = HistoryCollector::new(); - let request_id: RequestId = history_collector.log_query_start(); + let request_id: RequestId = history_collector.log_request_start(); let attempt_id: AttemptId = history_collector.log_attempt_start(request_id, None, node1_addr()); history_collector.log_attempt_success(attempt_id); - history_collector.log_query_success(request_id); + history_collector.log_request_success(request_id); let history: StructuredHistory = history_collector.clone_structured_history(); @@ -623,7 +623,7 @@ mod tests { setup_tracing(); let history_collector = HistoryCollector::new(); - let request_id: RequestId = history_collector.log_query_start(); + let request_id: RequestId = history_collector.log_request_start(); let attempt_id: AttemptId = history_collector.log_attempt_start(request_id, None, node1_addr()); @@ -641,7 +641,7 @@ mod tests { &RetryDecision::DontRetry, ); - history_collector.log_query_error( + history_collector.log_request_error( request_id, &TimeoutableRequestError::RequestFailure(RetriableRequestError::RequestFailure( unavailable_error(), @@ -679,7 +679,7 @@ mod tests { setup_tracing(); let history_collector = HistoryCollector::new(); - let request_id: RequestId = history_collector.log_query_start(); + let request_id: RequestId = history_collector.log_request_start(); history_collector.log_new_speculative_fiber(request_id); history_collector.log_new_speculative_fiber(request_id); history_collector.log_new_speculative_fiber(request_id); @@ -730,7 +730,7 @@ mod tests { setup_tracing(); let history_collector = HistoryCollector::new(); - let request_id: RequestId = history_collector.log_query_start(); + let request_id: RequestId = history_collector.log_request_start(); let attempt1: AttemptId = history_collector.log_attempt_start(request_id, None, node1_addr()); @@ -774,7 +774,7 @@ mod tests { history_collector.log_attempt_start(request_id, Some(speculative4), node2_addr()); history_collector.log_attempt_success(spec2_attempt2); - history_collector.log_query_success(request_id); + history_collector.log_request_success(request_id); let history: StructuredHistory = history_collector.clone_structured_history(); @@ -836,24 +836,24 @@ mod tests { setup_tracing(); let history_collector = HistoryCollector::new(); - let query1_id: RequestId = history_collector.log_query_start(); - let query1_attempt1: AttemptId = - history_collector.log_attempt_start(query1_id, None, node1_addr()); + let request1_id: RequestId = history_collector.log_request_start(); + let request1_attempt1: AttemptId = + history_collector.log_attempt_start(request1_id, None, node1_addr()); history_collector.log_attempt_error( - query1_attempt1, + request1_attempt1, &unexpected_response(CqlResponseKind::Supported), &RetryDecision::RetryNextNode(Some(Consistency::Quorum)), ); - let query1_attempt2: AttemptId = - history_collector.log_attempt_start(query1_id, None, node2_addr()); - history_collector.log_attempt_success(query1_attempt2); - history_collector.log_query_success(query1_id); - - let query2_id: RequestId = history_collector.log_query_start(); - let query2_attempt1: AttemptId = - history_collector.log_attempt_start(query2_id, None, node1_addr()); - history_collector.log_attempt_success(query2_attempt1); - history_collector.log_query_success(query2_id); + let request1_attempt2: AttemptId = + history_collector.log_attempt_start(request1_id, None, node2_addr()); + history_collector.log_attempt_success(request1_attempt2); + history_collector.log_request_success(request1_id); + + let request2_id: RequestId = history_collector.log_request_start(); + let request2_attempt1: AttemptId = + history_collector.log_attempt_start(request2_id, None, node1_addr()); + history_collector.log_attempt_success(request2_attempt1); + history_collector.log_request_success(request2_id); let history: StructuredHistory = history_collector.clone_structured_history(); diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 3df21a26e..f2976a062 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -171,7 +171,7 @@ where let mut last_error: RetriableRequestError = RetriableRequestError::EmptyPlan; let mut current_consistency: Consistency = self.query_consistency; - self.log_query_start(); + self.log_request_start(); 'nodes_in_plan: for (node, shard) in query_plan { let span = @@ -267,7 +267,7 @@ where } let error: TimeoutableRequestError = last_error.into(); - self.log_query_error(&error); + self.log_request_error(&error); let (proof, _) = self.sender.send(Err(error.into_query_error())).await; proof } @@ -330,7 +330,7 @@ where }) => { let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); self.log_attempt_success(); - self.log_query_success(); + self.log_request_success(); self.execution_profile .load_balancing_policy .on_request_success(&self.statement_info, elapsed, node); @@ -358,7 +358,7 @@ where // Query succeeded, reset retry policy for future retries self.retry_session.reset(); - self.log_query_start(); + self.log_request_start(); Ok(ControlFlow::Continue(())) } @@ -393,16 +393,16 @@ where } } - fn log_query_start(&mut self) { + fn log_request_start(&mut self) { let history_listener: &dyn HistoryListener = match &self.history_listener { Some(hl) => &**hl, None => return, }; - self.current_request_id = Some(history_listener.log_query_start()); + self.current_request_id = Some(history_listener.log_request_start()); } - fn log_query_success(&mut self) { + fn log_request_success(&mut self) { let history_listener: &dyn HistoryListener = match &self.history_listener { Some(hl) => &**hl, None => return, @@ -413,10 +413,10 @@ where None => return, }; - history_listener.log_query_success(request_id); + history_listener.log_request_success(request_id); } - fn log_query_error(&mut self, error: &TimeoutableRequestError) { + fn log_request_error(&mut self, error: &TimeoutableRequestError) { let history_listener: &dyn HistoryListener = match &self.history_listener { Some(hl) => &**hl, None => return, @@ -427,7 +427,7 @@ where None => return, }; - history_listener.log_query_error(request_id, error); + history_listener.log_request_error(request_id, error); } fn log_attempt_start(&mut self, node_addr: SocketAddr) { diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index ca14a0abd..c03ff4af0 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -1946,7 +1946,7 @@ where statement_config .history_listener .as_ref() - .map(|hl| (&**hl, hl.log_query_start())); + .map(|hl| (&**hl, hl.log_request_start())); let load_balancer = &execution_profile.load_balancing_policy; @@ -2078,8 +2078,8 @@ where if let Some((history_listener, request_id)) = history_listener_and_id { match &result { - Ok(_) => history_listener.log_query_success(request_id), - Err(e) => history_listener.log_query_error(request_id, e), + Ok(_) => history_listener.log_request_success(request_id), + Err(e) => history_listener.log_request_error(request_id, e), } }