From a5bb6f29a5a7844c1daa7260cd8120759059ccdd Mon Sep 17 00:00:00 2001 From: muzarski Date: Tue, 20 Aug 2024 17:00:43 +0200 Subject: [PATCH 01/33] conn_pool: narrow return error type to io::Error Previously, all functions related to selecting a connection, would return a QueryError::IoError. There is no need to return such broad error type as QueryError, when only std::io::Error can appear. --- scylla/src/transport/connection_pool.rs | 40 +++++++++++++------------ scylla/src/transport/iterator.rs | 2 +- scylla/src/transport/node.rs | 10 +++---- scylla/src/transport/session.rs | 2 +- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index 849ef8cb8d..2cde4cd462 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -234,7 +234,10 @@ impl NodeConnectionPool { .unwrap_or(None) } - pub(crate) fn connection_for_shard(&self, shard: Shard) -> Result, QueryError> { + pub(crate) fn connection_for_shard( + &self, + shard: Shard, + ) -> Result, std::io::Error> { trace!(shard = shard, "Selecting connection for shard"); self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => { @@ -257,7 +260,7 @@ impl NodeConnectionPool { }) } - pub(crate) fn random_connection(&self) -> Result, QueryError> { + pub(crate) fn random_connection(&self) -> Result, std::io::Error> { trace!("Selecting random connection"); self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => { @@ -341,7 +344,7 @@ impl NodeConnectionPool { } } - pub(crate) fn get_working_connections(&self) -> Result>, QueryError> { + pub(crate) fn get_working_connections(&self) -> Result>, std::io::Error> { self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => conns.clone(), PoolConnections::Sharded { connections, .. } => { @@ -370,25 +373,24 @@ impl NodeConnectionPool { } } - fn with_connections(&self, f: impl FnOnce(&PoolConnections) -> T) -> Result { + fn with_connections( + &self, + f: impl FnOnce(&PoolConnections) -> T, + ) -> Result { let conns = self.conns.load_full(); match &*conns { MaybePoolConnections::Ready(pool_connections) => Ok(f(pool_connections)), - MaybePoolConnections::Broken(err) => { - Err(QueryError::IoError(Arc::new(std::io::Error::new( - ErrorKind::Other, - format!( - "No connections in the pool; last connection failed with: {}", - err - ), - )))) - } - MaybePoolConnections::Initializing => { - Err(QueryError::IoError(Arc::new(std::io::Error::new( - ErrorKind::Other, - "No connections in the pool, pool is still being initialized", - )))) - } + MaybePoolConnections::Broken(err) => Err(std::io::Error::new( + ErrorKind::Other, + format!( + "No connections in the pool; last connection failed with: {}", + err + ), + )), + MaybePoolConnections::Initializing => Err(std::io::Error::new( + ErrorKind::Other, + "No connections in the pool, pool is still being initialized", + )), } } } diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index cb5a8141c8..ace9989ee9 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -535,7 +535,7 @@ where error = %e, "Choosing connection failed" ); - last_error = e; + last_error = e.into(); // Broken connection doesn't count as a failed query, don't log in metrics continue 'nodes_in_plan; } diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index 02ca247bc5..e698820a82 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -157,7 +157,7 @@ impl Node { pub(crate) async fn connection_for_shard( &self, shard: Shard, - ) -> Result, QueryError> { + ) -> Result, std::io::Error> { self.get_pool()?.connection_for_shard(shard) } @@ -186,7 +186,7 @@ impl Node { Ok(()) } - pub(crate) fn get_working_connections(&self) -> Result>, QueryError> { + pub(crate) fn get_working_connections(&self) -> Result>, std::io::Error> { self.get_pool()?.get_working_connections() } @@ -196,13 +196,13 @@ impl Node { } } - fn get_pool(&self) -> Result<&NodeConnectionPool, QueryError> { + fn get_pool(&self) -> Result<&NodeConnectionPool, std::io::Error> { self.pool.as_ref().ok_or_else(|| { - QueryError::IoError(Arc::new(std::io::Error::new( + std::io::Error::new( std::io::ErrorKind::Other, "No connections in the pool: the node has been disabled \ by the host filter", - ))) + ) }) } } diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 52a2f9cbe4..9ad92b53e7 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -1873,7 +1873,7 @@ impl Session { error = %e, "Choosing connection failed" ); - last_error = Some(e); + last_error = Some(e.into()); // Broken connection doesn't count as a failed query, don't log in metrics continue 'nodes_in_plan; } From 46c26a9068bd0a9d41f8922ed469a95c6bc53375 Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 22 Aug 2024 16:47:33 +0200 Subject: [PATCH 02/33] connection: increase log level to error for non-event response Receiving a non-event response on a -1 stream should be considered an error. --- scylla/src/transport/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 456f08d386..311e16764f 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1677,7 +1677,7 @@ impl Connection { let event = match response { Response::Event(e) => e, _ => { - warn!("Expected to receive Event response, got {:?}", response); + error!("Expected to receive Event response, got {:?}", response); return Ok(()); } }; From 215ada4a6cecaa88d08a767216a100556113655b Mon Sep 17 00:00:00 2001 From: muzarski Date: Tue, 17 Sep 2024 19:53:17 +0200 Subject: [PATCH 03/33] errors: introduce CqlResponseKind This type is introduced to provide more context to error types introduced later. --- scylla-cql/src/errors.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index fa96b2880c..509b6fd004 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -366,6 +366,37 @@ pub enum BadQuery { Other(String), } +/// Possible CQL responses received from the server +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub enum CqlResponseKind { + Error, + Ready, + Authenticate, + Supported, + Result, + Event, + AuthChallenge, + AuthSuccess, +} + +impl std::fmt::Display for CqlResponseKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let kind_str = match self { + CqlResponseKind::Error => "ERROR", + CqlResponseKind::Ready => "READY", + CqlResponseKind::Authenticate => "AUTHENTICATE", + CqlResponseKind::Supported => "SUPPORTED", + CqlResponseKind::Result => "RESULT", + CqlResponseKind::Event => "EVENT", + CqlResponseKind::AuthChallenge => "AUTH_CHALLENGE", + CqlResponseKind::AuthSuccess => "AUTH_SUCCESS", + }; + + f.write_str(kind_str) + } +} + /// Error that occurred during session creation #[derive(Error, Debug, Clone)] pub enum NewSessionError { From 78f3073df00f97896f3c32f8b8890a18680696a9 Mon Sep 17 00:00:00 2001 From: muzarski Date: Tue, 17 Sep 2024 19:56:40 +0200 Subject: [PATCH 04/33] response: get response kind from response and resp parse errors This utility functions will be used to create error types with more context. --- scylla-cql/src/frame/frame_errors.rs | 15 ++++++++++++++ scylla-cql/src/frame/response/mod.rs | 29 +++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/scylla-cql/src/frame/frame_errors.rs b/scylla-cql/src/frame/frame_errors.rs index 155491ef91..eef6d0ab47 100644 --- a/scylla-cql/src/frame/frame_errors.rs +++ b/scylla-cql/src/frame/frame_errors.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use super::TryFromPrimitiveError; use crate::cql_to_rust::CqlTypeError; +use crate::errors::CqlResponseKind; use crate::frame::value::SerializeValuesError; use crate::types::deserialize::{DeserializationError, TypeCheckError}; use crate::types::serialize::SerializationError; @@ -78,6 +79,20 @@ pub enum CqlResponseParseError { CqlResultParseError(#[from] CqlResultParseError), } +impl CqlResponseParseError { + pub fn to_response_kind(&self) -> CqlResponseKind { + match self { + CqlResponseParseError::CqlErrorParseError(_) => CqlResponseKind::Error, + CqlResponseParseError::CqlAuthChallengeParseError(_) => CqlResponseKind::AuthChallenge, + CqlResponseParseError::CqlAuthSuccessParseError(_) => CqlResponseKind::AuthSuccess, + CqlResponseParseError::CqlAuthenticateParseError(_) => CqlResponseKind::Authenticate, + CqlResponseParseError::CqlSupportedParseError(_) => CqlResponseKind::Supported, + CqlResponseParseError::CqlEventParseError(_) => CqlResponseKind::Event, + CqlResponseParseError::CqlResultParseError(_) => CqlResponseKind::Result, + } + } +} + /// An error type returned when deserialization of ERROR response fails. #[non_exhaustive] #[derive(Error, Debug, Clone)] diff --git a/scylla-cql/src/frame/response/mod.rs b/scylla-cql/src/frame/response/mod.rs index d084eb71c9..4cceb2197d 100644 --- a/scylla-cql/src/frame/response/mod.rs +++ b/scylla-cql/src/frame/response/mod.rs @@ -10,7 +10,7 @@ use std::sync::Arc; pub use error::Error; pub use supported::Supported; -use crate::errors::QueryError; +use crate::errors::{CqlResponseKind, QueryError}; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::response::result::ResultMetadata; use crate::frame::TryFromPrimitiveError; @@ -64,6 +64,19 @@ pub enum Response { } impl Response { + pub fn to_response_kind(&self) -> CqlResponseKind { + match self { + Response::Error(_) => CqlResponseKind::Error, + Response::Ready => CqlResponseKind::Ready, + Response::Result(_) => CqlResponseKind::Result, + Response::Authenticate(_) => CqlResponseKind::Authenticate, + Response::AuthSuccess(_) => CqlResponseKind::AuthSuccess, + Response::AuthChallenge(_) => CqlResponseKind::AuthChallenge, + Response::Supported(_) => CqlResponseKind::Supported, + Response::Event(_) => CqlResponseKind::Event, + } + } + pub fn deserialize( features: &ProtocolFeatures, opcode: ResponseOpcode, @@ -118,3 +131,17 @@ pub enum NonErrorResponse { Supported(Supported), Event(event::Event), } + +impl NonErrorResponse { + pub fn to_response_kind(&self) -> CqlResponseKind { + match self { + NonErrorResponse::Ready => CqlResponseKind::Ready, + NonErrorResponse::Result(_) => CqlResponseKind::Result, + NonErrorResponse::Authenticate(_) => CqlResponseKind::Authenticate, + NonErrorResponse::AuthSuccess(_) => CqlResponseKind::AuthSuccess, + NonErrorResponse::AuthChallenge(_) => CqlResponseKind::AuthChallenge, + NonErrorResponse::Supported(_) => CqlResponseKind::Supported, + NonErrorResponse::Event(_) => CqlResponseKind::Event, + } + } +} From dd6d32f4fdc9b02ba417ae165538a60f6112f1b0 Mon Sep 17 00:00:00 2001 From: muzarski Date: Tue, 20 Aug 2024 19:53:00 +0200 Subject: [PATCH 05/33] errors: BrokenConnectionError Introduced a BrokenConnectionError. Since users probably won't really be interested in enum-matching the cause of this error, we hide the kind of broken connection error behind Arc. Other reason for that is there will be a cycle of dependency between BrokenConnectionError and RequestError types (once RequestError is introduced later in this PR). As of now, the cycle is also visible, because of QueryError. We don't want to expose a cyclic error types to the users. The cause of the cycle is: - Request can fail due to connection being broken (RequestError -> BrokenConnectionError) - A keepaliver sends a request, which can fail. If keepaliver, returns an error, we close the connection. The cause of broken connection is now a keepaliver's request error (BrokenConnection -> RequestError) --- scylla-cql/src/errors.rs | 44 +++++++++++++ scylla/src/transport/connection.rs | 65 ++++++++++--------- .../src/transport/load_balancing/default.rs | 1 + 3 files changed, 78 insertions(+), 32 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 509b6fd004..c2b5635491 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -7,7 +7,9 @@ use crate::types::deserialize::{DeserializationError, TypeCheckError}; use crate::types::serialize::SerializationError; use crate::Consistency; use bytes::Bytes; +use std::error::Error; use std::io::ErrorKind; +use std::net::IpAddr; use std::sync::Arc; use thiserror::Error; @@ -45,6 +47,9 @@ pub enum QueryError { #[error("Too many orphaned stream ids: {0}")] TooManyOrphanedStreamIds(u16), + #[error(transparent)] + BrokenConnection(#[from] BrokenConnectionError), + #[error("Unable to allocate stream id")] UnableToAllocStreamId, @@ -440,6 +445,9 @@ pub enum NewSessionError { #[error("Too many orphaned stream ids: {0}")] TooManyOrphanedStreamIds(u16), + #[error(transparent)] + BrokenConnection(#[from] BrokenConnectionError), + #[error("Unable to allocate stream id")] UnableToAllocStreamId, @@ -469,6 +477,41 @@ pub enum BadKeyspaceName { IllegalCharacter(String, char), } +#[derive(Error, Debug, Clone)] +#[error("Connection broken, reason: {0}")] +pub struct BrokenConnectionError(Arc); + +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum BrokenConnectionErrorKind { + #[error("Timed out while waiting for response to keepalive request on connection to node {0}")] + KeepaliveTimeout(IpAddr), + #[error("Failed to execute keepalive query: {0}")] + KeepaliveQueryError(QueryError), + #[error("Failed to deserialize frame: {0}")] + FrameError(FrameError), + #[error("Failed to handle server event: {0}")] + CqlEventHandlingError(QueryError), + #[error("Received a server frame with unexpected stream id: {0}")] + UnexpectedStreamId(i16), + #[error("Failed to write data: {0}")] + WriteError(std::io::Error), + #[error("Too many orphaned stream ids: {0}")] + TooManyOrphanedStreamIds(u16), + #[error( + "Failed to send/receive data needed to perform a request via tokio channel. + It implies that other half of the channel has been dropped. + The connection was already broken for some other reason." + )] + ChannelError, +} + +impl From for BrokenConnectionError { + fn from(value: BrokenConnectionErrorKind) -> Self { + BrokenConnectionError(Arc::new(value)) + } +} + impl std::fmt::Display for WriteType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{:?}", self) @@ -542,6 +585,7 @@ impl From for NewSessionError { QueryError::TooManyOrphanedStreamIds(ids) => { NewSessionError::TooManyOrphanedStreamIds(ids) } + QueryError::BrokenConnection(e) => NewSessionError::BrokenConnection(e), QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId, QueryError::RequestTimeout(msg) => NewSessionError::RequestTimeout(msg), QueryError::TranslationError(e) => NewSessionError::TranslationError(e), diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 311e16764f..650ea7a99d 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use futures::{future::RemoteHandle, FutureExt}; -use scylla_cql::errors::TranslationError; +use scylla_cql::errors::{BrokenConnectionError, BrokenConnectionErrorKind, TranslationError}; use scylla_cql::frame::request::options::{self, Options}; use scylla_cql::frame::response::result::{ResultMetadata, TableSpec}; use scylla_cql::frame::response::Error; @@ -133,18 +133,12 @@ impl RouterHandle { response_handler, }) .await - .map_err(|_| { - QueryError::IoError(Arc::new(std::io::Error::new( - ErrorKind::Other, - "Connection broken", - ))) + .map_err(|_| -> BrokenConnectionError { + BrokenConnectionErrorKind::ChannelError.into() })?; - let task_response = receiver.await.map_err(|_| { - QueryError::IoError(Arc::new(std::io::Error::new( - ErrorKind::Other, - "Connection broken", - ))) + let task_response = receiver.await.map_err(|_| -> BrokenConnectionError { + BrokenConnectionErrorKind::ChannelError.into() })?; // Response was successfully received, so it's time to disable @@ -1421,7 +1415,7 @@ impl Connection { let error: QueryError = match result { Ok(_) => return, // Connection was dropped, we can return - Err(err) => err, + Err(err) => err.into(), }; // Respond to all pending requests with the error @@ -1441,9 +1435,11 @@ impl Connection { mut read_half: (impl AsyncRead + Unpin), handler_map: &StdMutex, config: ConnectionConfig, - ) -> Result<(), QueryError> { + ) -> Result<(), BrokenConnectionError> { loop { - let (params, opcode, body) = frame::read_response_frame(&mut read_half).await?; + let (params, opcode, body) = frame::read_response_frame(&mut read_half) + .await + .map_err(BrokenConnectionErrorKind::FrameError)?; let response = TaskResponse { params, opcode, @@ -1459,7 +1455,9 @@ impl Connection { } Ordering::Equal => { if let Some(event_sender) = config.event_sender.as_ref() { - Self::handle_event(response, config.compression, event_sender).await?; + Self::handle_event(response, config.compression, event_sender) + .await + .map_err(BrokenConnectionErrorKind::CqlEventHandlingError)? } continue; } @@ -1488,9 +1486,7 @@ impl Connection { "Received response with unexpected StreamId {}", params.stream ); - return Err(QueryError::ProtocolError( - "Received response with unexpected StreamId", - )); + return Err(BrokenConnectionErrorKind::UnexpectedStreamId(params.stream).into()); } Orphaned => { // Do nothing, handler was freed because this stream_id has @@ -1524,7 +1520,7 @@ impl Connection { handler_map: &StdMutex, mut task_receiver: mpsc::Receiver, enable_write_coalescing: bool, - ) -> Result<(), QueryError> { + ) -> Result<(), BrokenConnectionError> { // When the Connection object is dropped, the sender half // of the channel will be dropped, this task will return an error // and the whole worker will be stopped @@ -1537,7 +1533,10 @@ impl Connection { let req_data: &[u8] = req.get_data(); total_sent += req_data.len(); num_requests += 1; - write_half.write_all(req_data).await?; + write_half + .write_all(req_data) + .await + .map_err(BrokenConnectionErrorKind::WriteError)?; task = match task_receiver.try_recv() { Ok(t) => t, Err(_) if enable_write_coalescing => { @@ -1554,7 +1553,10 @@ impl Connection { } } trace!("Sending {} requests; {} bytes", num_requests, total_sent); - write_half.flush().await?; + write_half + .flush() + .await + .map_err(BrokenConnectionErrorKind::WriteError)?; } Ok(()) @@ -1567,7 +1569,7 @@ impl Connection { async fn orphaner( handler_map: &StdMutex, mut orphan_receiver: mpsc::UnboundedReceiver, - ) -> Result<(), QueryError> { + ) -> Result<(), BrokenConnectionError> { let mut interval = tokio::time::interval(OLD_AGE_ORPHAN_THRESHOLD); loop { tokio::select! { @@ -1581,7 +1583,7 @@ impl Connection { "Too many old orphaned stream ids: {}", old_orphan_count, ); - return Err(QueryError::TooManyOrphanedStreamIds(old_orphan_count as u16)) + return Err(BrokenConnectionErrorKind::TooManyOrphanedStreamIds(old_orphan_count as u16).into()) } } Some(request_id) = orphan_receiver.recv() => { @@ -1604,12 +1606,15 @@ impl Connection { keepalive_interval: Option, keepalive_timeout: Option, node_address: IpAddr, // This address is only used to enrich the log messages - ) -> Result<(), QueryError> { - async fn issue_keepalive_query(router_handle: &RouterHandle) -> Result<(), QueryError> { + ) -> Result<(), BrokenConnectionError> { + async fn issue_keepalive_query( + router_handle: &RouterHandle, + ) -> Result<(), BrokenConnectionError> { router_handle .send_request(&Options, None, false) .await .map(|_| ()) + .map_err(|q_err| BrokenConnectionErrorKind::KeepaliveQueryError(q_err).into()) } if let Some(keepalive_interval) = keepalive_interval { @@ -1631,13 +1636,9 @@ impl Connection { "Timed out while waiting for response to keepalive request on connection to node {}", node_address ); - return Err(QueryError::IoError(Arc::new(std::io::Error::new( - std::io::ErrorKind::Other, - format!( - "Timed out while waiting for response to keepalive request on connection to node {}", - node_address - ) - )))); + return Err( + BrokenConnectionErrorKind::KeepaliveTimeout(node_address).into() + ); } } } else { diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 962cbd2440..5115a747e4 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2837,6 +2837,7 @@ mod latency_awareness { match error { // "fast" errors, i.e. ones that are returned quickly after the query begins QueryError::BadQuery(_) + | QueryError::BrokenConnection(_) | QueryError::TooManyOrphanedStreamIds(_) | QueryError::UnableToAllocStreamId | QueryError::DbError(DbError::IsBootstrapping, _) From 632c76515805ca8863c8324994e6ad3a7cd19279 Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 22 Aug 2024 17:02:07 +0200 Subject: [PATCH 06/33] errors: CqlEventHandlingError Introduced an error returned by `handle_event` method. In addition, adjusted the `handle_event` method's logic, so we propagate the error in case of receiving invalid response (other than EVENT) on stream -1. This way, we assume that this connection should be closed, and so the error will be handled by pool refiller. --- scylla-cql/src/errors.rs | 19 ++++++++++++++++++- scylla/src/transport/connection.rs | 21 +++++++++++---------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index c2b5635491..20336a689b 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -491,7 +491,7 @@ pub enum BrokenConnectionErrorKind { #[error("Failed to deserialize frame: {0}")] FrameError(FrameError), #[error("Failed to handle server event: {0}")] - CqlEventHandlingError(QueryError), + CqlEventHandlingError(#[from] CqlEventHandlingError), #[error("Received a server frame with unexpected stream id: {0}")] UnexpectedStreamId(i16), #[error("Failed to write data: {0}")] @@ -506,6 +506,23 @@ pub enum BrokenConnectionErrorKind { ChannelError, } +/// Failed to handle a CQL event received on a stream -1. +/// Possible error kinds are: +/// - failed to deserialize server response +/// - received invalid server response +/// - failed to send an event info via channel (connection is probably broken) +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum CqlEventHandlingError { + // FIXME: QueryError -> CqlEventParseError + #[error(transparent)] + QueryError(#[from] QueryError), + #[error("Received unexpected server response on stream -1: {0}. Expected EVENT response")] + UnexpectedResponse(CqlResponseKind), + #[error("Failed to send event info via channel. The channel is probably closed, which is caused by connection being broken")] + SendError, +} + impl From for BrokenConnectionError { fn from(value: BrokenConnectionErrorKind) -> Self { BrokenConnectionError(Arc::new(value)) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 650ea7a99d..fce796e61f 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1,6 +1,8 @@ use bytes::Bytes; use futures::{future::RemoteHandle, FutureExt}; -use scylla_cql::errors::{BrokenConnectionError, BrokenConnectionErrorKind, TranslationError}; +use scylla_cql::errors::{ + BrokenConnectionError, BrokenConnectionErrorKind, CqlEventHandlingError, TranslationError, +}; use scylla_cql::frame::request::options::{self, Options}; use scylla_cql::frame::response::result::{ResultMetadata, TableSpec}; use scylla_cql::frame::response::Error; @@ -31,7 +33,6 @@ use crate::authentication::AuthenticatorProvider; use scylla_cql::frame::response::authenticate::Authenticate; use std::collections::{BTreeSet, HashMap, HashSet}; use std::convert::TryFrom; -use std::io::ErrorKind; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::sync::Mutex as StdMutex; @@ -1662,7 +1663,7 @@ impl Connection { task_response: TaskResponse, compression: Option, event_sender: &mpsc::Sender, - ) -> Result<(), QueryError> { + ) -> Result<(), CqlEventHandlingError> { // Protocol features are negotiated during connection handshake. // However, the router is already created and sent to a different tokio // task before the handshake begins, therefore it's hard to cleanly @@ -1679,16 +1680,16 @@ impl Connection { Response::Event(e) => e, _ => { error!("Expected to receive Event response, got {:?}", response); - return Ok(()); + return Err(CqlEventHandlingError::UnexpectedResponse( + response.to_response_kind(), + )); } }; - event_sender.send(event).await.map_err(|_| { - QueryError::IoError(Arc::new(std::io::Error::new( - ErrorKind::Other, - "Connection broken", - ))) - }) + event_sender + .send(event) + .await + .map_err(|_| CqlEventHandlingError::SendError) } pub(crate) fn get_shard_info(&self) -> &Option { From a56a9b055a4885bc9a92fd10a0bab918bf1f2062 Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 22 Aug 2024 16:18:37 +0200 Subject: [PATCH 07/33] errors: ResponseParseError Introduced an error type returned by Connection::parse_response. This error type is introduced only to narrow the return type of this function. --- scylla-cql/src/errors.rs | 24 ++++++++++++++++++++++-- scylla/src/transport/connection.rs | 11 +++++++---- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 20336a689b..89c7fc444a 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -514,21 +514,41 @@ pub enum BrokenConnectionErrorKind { #[derive(Error, Debug)] #[non_exhaustive] pub enum CqlEventHandlingError { - // FIXME: QueryError -> CqlEventParseError + // FIXME: ResponseParseError -> CqlEventParseError #[error(transparent)] - QueryError(#[from] QueryError), + QueryError(#[from] ResponseParseError), #[error("Received unexpected server response on stream -1: {0}. Expected EVENT response")] UnexpectedResponse(CqlResponseKind), #[error("Failed to send event info via channel. The channel is probably closed, which is caused by connection being broken")] SendError, } +/// An error type returned from Connection::parse_response. +/// This is driver's internal type. +#[derive(Error, Debug)] +pub enum ResponseParseError { + #[error(transparent)] + FrameError(#[from] FrameError), + #[error(transparent)] + CqlResponseParseError(#[from] CqlResponseParseError), +} + impl From for BrokenConnectionError { fn from(value: BrokenConnectionErrorKind) -> Self { BrokenConnectionError(Arc::new(value)) } } +// FIXME: remove later +impl From for QueryError { + fn from(value: ResponseParseError) -> Self { + match value { + ResponseParseError::FrameError(e) => e.into(), + ResponseParseError::CqlResponseParseError(e) => e.into(), + } + } +} + impl std::fmt::Display for WriteType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{:?}", self) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index fce796e61f..24594eb539 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1,7 +1,8 @@ use bytes::Bytes; use futures::{future::RemoteHandle, FutureExt}; use scylla_cql::errors::{ - BrokenConnectionError, BrokenConnectionErrorKind, CqlEventHandlingError, TranslationError, + BrokenConnectionError, BrokenConnectionErrorKind, CqlEventHandlingError, ResponseParseError, + TranslationError, }; use scylla_cql::frame::request::options::{self, Options}; use scylla_cql::frame::response::result::{ResultMetadata, TableSpec}; @@ -1281,12 +1282,14 @@ impl Connection { .send_request(request, compression, tracing) .await?; - Self::parse_response( + let response = Self::parse_response( task_response, self.config.compression, &self.features.protocol_features, cached_metadata, - ) + )?; + + Ok(response) } fn parse_response( @@ -1294,7 +1297,7 @@ impl Connection { compression: Option, features: &ProtocolFeatures, cached_metadata: Option<&Arc>, - ) -> Result { + ) -> Result { let body_with_ext = frame::parse_response_body_extensions( task_response.params.flags, compression, From 8347bad281c8536e1439dc7e35544465f2abefc8 Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 22 Aug 2024 17:19:03 +0200 Subject: [PATCH 08/33] conn: transform errors in handle_event This commit improves error handling in handle_event. --- scylla-cql/src/errors.rs | 14 ++++++++----- scylla/src/transport/connection.rs | 32 +++++++++++++++++++++--------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 89c7fc444a..91e8d2d5df 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -1,6 +1,8 @@ //! This module contains various errors which can be returned by `scylla::Session` -use crate::frame::frame_errors::{CqlResponseParseError, FrameError, ParseError}; +use crate::frame::frame_errors::{ + CqlEventParseError, CqlResponseParseError, FrameError, ParseError, +}; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::value::SerializeValuesError; use crate::types::deserialize::{DeserializationError, TypeCheckError}; @@ -508,17 +510,19 @@ pub enum BrokenConnectionErrorKind { /// Failed to handle a CQL event received on a stream -1. /// Possible error kinds are: -/// - failed to deserialize server response +/// - failed to deserialize response's frame header +/// - failed to deserialize CQL event response /// - received invalid server response /// - failed to send an event info via channel (connection is probably broken) #[derive(Error, Debug)] #[non_exhaustive] pub enum CqlEventHandlingError { - // FIXME: ResponseParseError -> CqlEventParseError - #[error(transparent)] - QueryError(#[from] ResponseParseError), + #[error("Failed to deserialize EVENT response: {0}")] + CqlEventParseError(#[from] CqlEventParseError), #[error("Received unexpected server response on stream -1: {0}. Expected EVENT response")] UnexpectedResponse(CqlResponseKind), + #[error("Failed to deserialize a header of frame received on stream -1: {0}")] + FrameError(#[from] FrameError), #[error("Failed to send event info via channel. The channel is probably closed, which is caused by connection being broken")] SendError, } diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 24594eb539..8432629c0c 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -4,6 +4,7 @@ use scylla_cql::errors::{ BrokenConnectionError, BrokenConnectionErrorKind, CqlEventHandlingError, ResponseParseError, TranslationError, }; +use scylla_cql::frame::frame_errors::CqlResponseParseError; use scylla_cql::frame::request::options::{self, Options}; use scylla_cql::frame::response::result::{ResultMetadata, TableSpec}; use scylla_cql::frame::response::Error; @@ -1678,15 +1679,28 @@ impl Connection { // future implementers. let features = ProtocolFeatures::default(); // TODO: Use the right features - let response = Self::parse_response(task_response, compression, &features, None)?.response; - let event = match response { - Response::Event(e) => e, - _ => { - error!("Expected to receive Event response, got {:?}", response); - return Err(CqlEventHandlingError::UnexpectedResponse( - response.to_response_kind(), - )); - } + let event = match Self::parse_response(task_response, compression, &features, None) { + Ok(r) => match r.response { + Response::Event(event) => event, + _ => { + error!("Expected to receive Event response, got {:?}", r.response); + return Err(CqlEventHandlingError::UnexpectedResponse( + r.response.to_response_kind(), + )); + } + }, + Err(e) => match e { + ResponseParseError::FrameError(e) => return Err(e.into()), + ResponseParseError::CqlResponseParseError(e) => match e { + CqlResponseParseError::CqlEventParseError(e) => return Err(e.into()), + // Received a response other than EVENT, but failed to deserialize it. + _ => { + return Err(CqlEventHandlingError::UnexpectedResponse( + e.to_response_kind(), + )) + } + }, + }, }; event_sender From c89db3c753bb44b0989d9e2edf2c8e97079bdbf7 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 12:39:38 +0200 Subject: [PATCH 09/33] errors: RequestError Introduced a RequestError, which was previously mentioned when introducing a BrokenConnectionError. This error type will be returned by Connection::send_request and RouterHandle::send_request functions. Again, this is an error type introduced to narrow the return error type from QueryError. --- scylla-cql/src/errors.rs | 38 +++++++++++++++++++++++++++--- scylla/src/transport/connection.rs | 34 +++++++++++++++----------- 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 91e8d2d5df..e0d1bcd4a5 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -489,7 +489,7 @@ pub enum BrokenConnectionErrorKind { #[error("Timed out while waiting for response to keepalive request on connection to node {0}")] KeepaliveTimeout(IpAddr), #[error("Failed to execute keepalive query: {0}")] - KeepaliveQueryError(QueryError), + KeepaliveQueryError(RequestError), #[error("Failed to deserialize frame: {0}")] FrameError(FrameError), #[error("Failed to handle server event: {0}")] @@ -537,14 +537,35 @@ pub enum ResponseParseError { CqlResponseParseError(#[from] CqlResponseParseError), } +/// An error that occurred when performing a request. +/// +/// Possible error kinds: +/// - Connection is broken +/// - Response's frame header deserialization error +/// - CQL response (frame body) deserialization error +/// - Driver was unable to allocate a stream id for a request +/// +/// This error type is only destined to narrow the return error type +/// of some functions that would previously return [`crate::errors::QueryError`]. +#[derive(Error, Debug)] +pub enum RequestError { + #[error(transparent)] + FrameError(#[from] FrameError), + #[error(transparent)] + CqlResponseParseError(#[from] CqlResponseParseError), + #[error(transparent)] + BrokenConnection(#[from] BrokenConnectionError), + #[error("Unable to allocate a stream id")] + UnableToAllocStreamId, +} + impl From for BrokenConnectionError { fn from(value: BrokenConnectionErrorKind) -> Self { BrokenConnectionError(Arc::new(value)) } } -// FIXME: remove later -impl From for QueryError { +impl From for RequestError { fn from(value: ResponseParseError) -> Self { match value { ResponseParseError::FrameError(e) => e.into(), @@ -607,6 +628,17 @@ impl From for QueryError { } } +impl From for QueryError { + fn from(value: RequestError) -> Self { + match value { + RequestError::FrameError(e) => e.into(), + RequestError::CqlResponseParseError(e) => e.into(), + RequestError::BrokenConnection(e) => e.into(), + RequestError::UnableToAllocStreamId => QueryError::UnableToAllocStreamId, + } + } +} + impl From for NewSessionError { fn from(io_error: std::io::Error) -> NewSessionError { NewSessionError::IoError(Arc::new(io_error)) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 8432629c0c..37c908200b 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1,8 +1,8 @@ use bytes::Bytes; use futures::{future::RemoteHandle, FutureExt}; use scylla_cql::errors::{ - BrokenConnectionError, BrokenConnectionErrorKind, CqlEventHandlingError, ResponseParseError, - TranslationError, + BrokenConnectionError, BrokenConnectionErrorKind, CqlEventHandlingError, RequestError, + ResponseParseError, TranslationError, }; use scylla_cql::frame::frame_errors::CqlResponseParseError; use scylla_cql::frame::request::options::{self, Options}; @@ -115,7 +115,7 @@ impl RouterHandle { request: &impl SerializableRequest, compression: Option, tracing: bool, - ) -> Result { + ) -> Result { let serialized_request = SerializedRequest::make(request, compression, tracing)?; let request_id = self.allocate_request_id(); @@ -162,7 +162,7 @@ pub(crate) struct ConnectionFeatures { type RequestId = u64; struct ResponseHandler { - response_sender: oneshot::Sender>, + response_sender: oneshot::Sender>, request_id: RequestId, } @@ -815,8 +815,11 @@ impl Connection { &self, response: Option>, ) -> Result { - self.send_request(&request::AuthResponse { response }, false, false, None) - .await + let response = self + .send_request(&request::AuthResponse { response }, false, false, None) + .await?; + + Ok(response) } #[allow(dead_code)] @@ -916,8 +919,11 @@ impl Connection { }, }; - self.send_request(&query_frame, true, query.config.tracing, None) - .await + let response = self + .send_request(&query_frame, true, query.config.tracing, None) + .await?; + + Ok(response) } #[allow(dead_code)] @@ -1271,7 +1277,7 @@ impl Connection { compress: bool, tracing: bool, cached_metadata: Option<&Arc>, - ) -> Result { + ) -> Result { let compression = if compress { self.config.compression } else { @@ -1418,9 +1424,9 @@ impl Connection { let result = futures::try_join!(r, w, o, k); - let error: QueryError = match result { + let error: BrokenConnectionError = match result { Ok(_) => return, // Connection was dropped, we can return - Err(err) => err.into(), + Err(err) => err, }; // Respond to all pending requests with the error @@ -1429,11 +1435,11 @@ impl Connection { for (_, handler) in response_handlers { // Ignore sending error, request was dropped - let _ = handler.response_sender.send(Err(error.clone())); + let _ = handler.response_sender.send(Err(error.clone().into())); } // If someone is listening for connection errors notify them - let _ = error_sender.send(error); + let _ = error_sender.send(error.into()); } async fn reader( @@ -1514,7 +1520,7 @@ impl Connection { error!("Could not allocate stream id"); let _ = response_handler .response_sender - .send(Err(QueryError::UnableToAllocStreamId)); + .send(Err(RequestError::UnableToAllocStreamId)); None } } From 8381692ed03b04da3b4cf6d13a5dba10a423df27 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 13:50:49 +0200 Subject: [PATCH 10/33] connection: introduce ConnectionError Yet another error type introduced. This time, as name suggests, we introduce an error type corresponding to errors that happened around a specific connection. Notice, that this error type will not be accessed by the user. It will be handled by PoolRefiller. Each error will obviously be logged, but this logic is already implemented in PoolRefiller. This error type is introduced to narrow the type of errors handled by PoolRefiller (which happen on connection-level). We also implement a `is_address_available_for_use` method which is used during source port selection for a given shard. In a following commit, this method will be removed from `QueryError`. --- scylla-cql/src/errors.rs | 37 +++++++++++++++++++++++++ scylla/src/transport/connection.rs | 30 +++++++++++--------- scylla/src/transport/connection_pool.rs | 18 ++++++------ 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index e0d1bcd4a5..f5a0b9a9c2 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -479,6 +479,43 @@ pub enum BadKeyspaceName { IllegalCharacter(String, char), } +// FIXME: this should be moved to scylla crate. +/// An error that appeared on a connection level. +/// It indicated that connection can no longer be used +/// and should be dropped. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ConnectionError { + #[error("Connect timeout elapsed")] + ConnectTimeout, + #[error(transparent)] + IoError(#[from] std::io::Error), + #[error("Could not find free source port for shard {0}")] + NoSourcePortForShard(u32), + #[error("Address translation failed: {0}")] + TranslationError(#[from] TranslationError), + // TODO: remove it or change it later. + #[error(transparent)] + QueryError(#[from] QueryError), +} + +impl ConnectionError { + /// Checks if this error indicates that a chosen source port/address cannot be bound. + /// This is caused by one of the following: + /// - The source address is already used by another socket, + /// - The source address is reserved and the process does not have sufficient privileges to use it. + pub fn is_address_unavailable_for_use(&self) -> bool { + if let ConnectionError::IoError(io_error) = self { + match io_error.kind() { + ErrorKind::AddrInUse | ErrorKind::PermissionDenied => return true, + _ => {} + } + } + + false + } +} + #[derive(Error, Debug, Clone)] #[error("Connection broken, reason: {0}")] pub struct BrokenConnectionError(Arc); diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 37c908200b..61f1a0e5ae 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1,8 +1,8 @@ use bytes::Bytes; use futures::{future::RemoteHandle, FutureExt}; use scylla_cql::errors::{ - BrokenConnectionError, BrokenConnectionErrorKind, CqlEventHandlingError, RequestError, - ResponseParseError, TranslationError, + BrokenConnectionError, BrokenConnectionErrorKind, ConnectionError, CqlEventHandlingError, + RequestError, ResponseParseError, TranslationError, }; use scylla_cql::frame::frame_errors::CqlResponseParseError; use scylla_cql::frame::request::options::{self, Options}; @@ -632,7 +632,7 @@ impl Connection { addr: SocketAddr, source_port: Option, config: ConnectionConfig, - ) -> Result<(Self, ErrorReceiver), QueryError> { + ) -> Result<(Self, ErrorReceiver), ConnectionError> { let stream_connector = match source_port { Some(p) => { tokio::time::timeout(config.connect_timeout, connect_with_source_port(addr, p)) @@ -641,9 +641,9 @@ impl Connection { None => tokio::time::timeout(config.connect_timeout, TcpStream::connect(addr)).await, }; let stream = match stream_connector { - Ok(stream) => stream?, + Ok(stream) => stream.map_err(ConnectionError::IoError)?, Err(_) => { - return Err(QueryError::TimeoutError); + return Err(ConnectionError::ConnectTimeout); } }; stream.set_nodelay(config.tcp_nodelay)?; @@ -673,7 +673,8 @@ impl Connection { router_handle.clone(), addr.ip(), ) - .await?; + .await + .map_err(ConnectionError::IoError)?; let connection = Connection { _worker_handle, @@ -1806,7 +1807,7 @@ pub(crate) async fn open_connection( endpoint: UntranslatedEndpoint, source_port: Option, config: &ConnectionConfig, -) -> Result<(Connection, ErrorReceiver), QueryError> { +) -> Result<(Connection, ErrorReceiver), ConnectionError> { /* Translate the address, if applicable. */ let addr = maybe_translated_addr(endpoint, config.address_translator.as_deref()).await?; @@ -1826,11 +1827,14 @@ pub(crate) async fn open_connection( let mut supported = match options_result { Response::Supported(supported) => supported, - Response::Error(Error { error, reason }) => return Err(QueryError::DbError(error, reason)), + Response::Error(Error { error, reason }) => { + return Err(QueryError::DbError(error, reason).into()) + } _ => { return Err(QueryError::ProtocolError( "Wrong response to OPTIONS message was received", - )); + ) + .into()); } }; @@ -1899,11 +1903,11 @@ pub(crate) async fn open_connection( Response::Authenticate(authenticate) => { perform_authenticate(&mut connection, &authenticate).await?; } - Response::Error(Error { error, reason }) => return Err(QueryError::DbError(error, reason)), + Response::Error(Error { error, reason }) => { + return Err(QueryError::DbError(error, reason).into()) + } _ => { - return Err(QueryError::ProtocolError( - "Unexpected response to STARTUP message", - )) + return Err(QueryError::ProtocolError("Unexpected response to STARTUP message").into()) } } diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index 2cde4cd462..e7b0b5158b 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -19,6 +19,7 @@ use super::NodeAddr; use arc_swap::ArcSwap; use futures::{future::RemoteHandle, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use rand::Rng; +use scylla_cql::errors::ConnectionError; use std::convert::TryInto; use std::io::ErrorKind; use std::num::NonZeroUsize; @@ -79,7 +80,7 @@ enum MaybePoolConnections { // The pool is empty because either initial filling failed or all connections // became broken; will be asynchronously refilled. Contains an error // from the last connection attempt. - Broken(QueryError), + Broken(ConnectionError), // The pool has some connections which are usable (or will be removed soon) Ready(PoolConnections), @@ -573,7 +574,7 @@ impl PoolRefiller { evt = self.connection_errors.select_next_some(), if !self.connection_errors.is_empty() => { if let Some(conn) = evt.connection.upgrade() { debug!("[{}] Got error for connection {:p}: {:?}", self.endpoint_description(), Arc::as_ptr(&conn), evt.error); - self.remove_connection(conn, evt.error); + self.remove_connection(conn, evt.error.into()); } } @@ -991,7 +992,7 @@ impl PoolRefiller { // Updates `shared_conns` based on `conns`. // `last_error` must not be `None` if there is a possibility of the pool // being empty. - fn update_shared_conns(&mut self, last_error: Option) { + fn update_shared_conns(&mut self, last_error: Option) { let new_conns = if !self.has_connections() { Arc::new(MaybePoolConnections::Broken(last_error.unwrap())) } else { @@ -1017,7 +1018,7 @@ impl PoolRefiller { // Removes given connection from the pool. It looks both into active // connections and excess connections. - fn remove_connection(&mut self, connection: Arc, last_error: QueryError) { + fn remove_connection(&mut self, connection: Arc, last_error: ConnectionError) { let ptr = Arc::as_ptr(&connection); let maybe_remove_in_vec = |v: &mut Vec>| -> bool { @@ -1225,7 +1226,7 @@ async fn wait_for_error( } struct OpenedConnectionEvent { - result: Result<(Connection, ErrorReceiver), QueryError>, + result: Result<(Connection, ErrorReceiver), ConnectionError>, requested_shard: Option, keyspace_name: Option, } @@ -1235,7 +1236,7 @@ async fn open_connection_to_shard_aware_port( shard: Shard, sharder: Sharder, connection_config: &ConnectionConfig, -) -> Result<(Connection, ErrorReceiver), QueryError> { +) -> Result<(Connection, ErrorReceiver), ConnectionError> { // Create iterator over all possible source ports for this shard let source_port_iter = sharder.iter_source_ports_for_shard(shard); @@ -1250,10 +1251,7 @@ async fn open_connection_to_shard_aware_port( } // Tried all source ports for that shard, give up - Err(QueryError::IoError(Arc::new(std::io::Error::new( - std::io::ErrorKind::AddrInUse, - "Could not find free source port for shard", - )))) + Err(ConnectionError::NoSourcePortForShard(shard)) } #[cfg(test)] From 264fe7856fc5558a0403b1e075a9b75bb915ca13 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 14:04:43 +0200 Subject: [PATCH 11/33] connection: include BrokenConnectionError in ConnectionError --- scylla-cql/src/errors.rs | 8 ++++++++ scylla/src/transport/connection.rs | 17 ++++++++++++----- scylla/src/transport/connection_pool.rs | 11 ++++------- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index f5a0b9a9c2..ad75dd2ab9 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -494,6 +494,8 @@ pub enum ConnectionError { NoSourcePortForShard(u32), #[error("Address translation failed: {0}")] TranslationError(#[from] TranslationError), + #[error(transparent)] + BrokenConnection(#[from] BrokenConnectionError), // TODO: remove it or change it later. #[error(transparent)] QueryError(#[from] QueryError), @@ -520,6 +522,12 @@ impl ConnectionError { #[error("Connection broken, reason: {0}")] pub struct BrokenConnectionError(Arc); +impl BrokenConnectionError { + pub fn get_inner(&self) -> &Arc { + &self.0 + } +} + #[derive(Error, Debug)] #[non_exhaustive] pub enum BrokenConnectionErrorKind { diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 61f1a0e5ae..1b6a6472ce 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -622,7 +622,7 @@ impl ConnectionConfig { } // Used to listen for fatal error in connection -pub(crate) type ErrorReceiver = tokio::sync::oneshot::Receiver; +pub(crate) type ErrorReceiver = tokio::sync::oneshot::Receiver; impl Connection { // Returns new connection and ErrorReceiver which can be used to wait for a fatal error @@ -1338,7 +1338,7 @@ impl Connection { config: ConnectionConfig, stream: TcpStream, receiver: mpsc::Receiver, - error_sender: tokio::sync::oneshot::Sender, + error_sender: tokio::sync::oneshot::Sender, orphan_notification_receiver: mpsc::UnboundedReceiver, router_handle: Arc, node_address: IpAddr, @@ -1381,7 +1381,7 @@ impl Connection { config: ConnectionConfig, stream: (impl AsyncRead + AsyncWrite), receiver: mpsc::Receiver, - error_sender: tokio::sync::oneshot::Sender, + error_sender: tokio::sync::oneshot::Sender, orphan_notification_receiver: mpsc::UnboundedReceiver, router_handle: Arc, node_address: IpAddr, @@ -2221,7 +2221,6 @@ impl VerifiedKeyspaceName { #[cfg(test)] mod tests { use assert_matches::assert_matches; - use scylla_cql::errors::QueryError; use scylla_cql::frame::protocol_features::{ LWT_OPTIMIZATION_META_BIT_MASK_KEY, SCYLLA_LWT_ADD_METADATA_MARK_EXTENSION, }; @@ -2570,6 +2569,8 @@ mod tests { #[ntest::timeout(20000)] #[cfg(not(scylla_cloud_tests))] async fn connection_is_closed_on_no_response_to_keepalives() { + use scylla_cql::errors::BrokenConnectionErrorKind; + setup_tracing(); let proxy_addr = SocketAddr::new(scylla_proxy::get_exclusive_local_address(), 9042); @@ -2631,7 +2632,13 @@ mod tests { // Wait until keepaliver gots impatient and terminates router. // Then, the error from keepaliver will be propagated to the error receiver. let err = error_receiver.await.unwrap(); - assert_matches!(err, QueryError::IoError(_)); + let err_inner: &BrokenConnectionErrorKind = match err { + crate::transport::connection::ConnectionError::BrokenConnection(ref e) => { + e.get_inner().downcast_ref().unwrap() + } + _ => panic!("Bad error type. Expected keepalive timeout."), + }; + assert_matches!(err_inner, BrokenConnectionErrorKind::KeepaliveTimeout(_)); // As the router is invalidated, all further queries should immediately // return error. diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index e7b0b5158b..a539941dfd 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -19,7 +19,7 @@ use super::NodeAddr; use arc_swap::ArcSwap; use futures::{future::RemoteHandle, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use rand::Rng; -use scylla_cql::errors::ConnectionError; +use scylla_cql::errors::{BrokenConnectionErrorKind, ConnectionError}; use std::convert::TryInto; use std::io::ErrorKind; use std::num::NonZeroUsize; @@ -574,7 +574,7 @@ impl PoolRefiller { evt = self.connection_errors.select_next_some(), if !self.connection_errors.is_empty() => { if let Some(conn) = evt.connection.upgrade() { debug!("[{}] Got error for connection {:p}: {:?}", self.endpoint_description(), Arc::as_ptr(&conn), evt.error); - self.remove_connection(conn, evt.error.into()); + self.remove_connection(conn, evt.error); } } @@ -1207,7 +1207,7 @@ impl PoolRefiller { struct BrokenConnectionEvent { connection: Weak, - error: QueryError, + error: ConnectionError, } async fn wait_for_error( @@ -1217,10 +1217,7 @@ async fn wait_for_error( BrokenConnectionEvent { connection, error: error_receiver.await.unwrap_or_else(|_| { - QueryError::IoError(Arc::new(std::io::Error::new( - ErrorKind::Other, - "Connection broken", - ))) + ConnectionError::BrokenConnection(BrokenConnectionErrorKind::ChannelError.into()) }), } } From 939d6f2fd12fc51ce8e1b7fc688a73578ebfbf2e Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 14:07:38 +0200 Subject: [PATCH 12/33] errors: remove TranslationError from Query/NewSessionError It appears, that it was not possible for the user to receive a TranslationError variant at all. Notice that in previous commit, we moved TranslationError to ConnectionError, which is not included in neither Query nor NewSessionError. --- scylla-cql/src/errors.rs | 9 --------- scylla/src/transport/load_balancing/default.rs | 1 - 2 files changed, 10 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index ad75dd2ab9..0a4a211c14 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -58,10 +58,6 @@ pub enum QueryError { /// Client timeout occurred before any response arrived #[error("Request timeout: {0}")] RequestTimeout(String), - - /// Address translation failed - #[error("Address translation failed: {0}")] - TranslationError(#[from] TranslationError), } /// An error sent from the database in response to a query @@ -457,10 +453,6 @@ pub enum NewSessionError { /// during `Session` creation. #[error("Client timeout: {0}")] RequestTimeout(String), - - /// Address translation failed - #[error("Address translation failed: {0}")] - TranslationError(#[from] TranslationError), } /// Invalid keyspace name given to `Session::use_keyspace()` @@ -706,7 +698,6 @@ impl From for NewSessionError { QueryError::BrokenConnection(e) => NewSessionError::BrokenConnection(e), QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId, QueryError::RequestTimeout(msg) => NewSessionError::RequestTimeout(msg), - QueryError::TranslationError(e) => NewSessionError::TranslationError(e), } } } diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 5115a747e4..07e4b58033 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2843,7 +2843,6 @@ mod latency_awareness { | QueryError::DbError(DbError::IsBootstrapping, _) | QueryError::DbError(DbError::Unavailable { .. }, _) | QueryError::DbError(DbError::Unprepared { .. }, _) - | QueryError::TranslationError(_) | QueryError::DbError(DbError::Overloaded { .. }, _) | QueryError::DbError(DbError::RateLimitReached { .. }, _) => false, From 3396efa2a89d27507691f5781435ceb607324b9e Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 14:12:39 +0200 Subject: [PATCH 13/33] errors: remove QueryError::is_address_available_for_use This was moved to `ConnectionError`, and is handled by connection pool. --- scylla-cql/src/errors.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 0a4a211c14..f8b0fc7613 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -708,23 +708,6 @@ impl From for QueryError { } } -impl QueryError { - /// Checks if this error indicates that a chosen source port/address cannot be bound. - /// This is caused by one of the following: - /// - The source address is already used by another socket, - /// - The source address is reserved and the process does not have sufficient privileges to use it. - pub fn is_address_unavailable_for_use(&self) -> bool { - if let QueryError::IoError(io_error) = self { - match io_error.kind() { - ErrorKind::AddrInUse | ErrorKind::PermissionDenied => return true, - _ => {} - } - } - - false - } -} - impl From for OperationType { fn from(operation_type: u8) -> OperationType { match operation_type { From 076330ab4591cfb5b40c098b25c9631b0e093439 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 14:46:53 +0200 Subject: [PATCH 14/33] conn: move ssl check below options extraction Code is more readable this way IMO. --- scylla/src/transport/connection.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 1b6a6472ce..d854ae5bc1 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1820,11 +1820,6 @@ pub(crate) async fn open_connection( // Get OPTIONS SUPPORTED by the cluster. let options_result = connection.get_options().await?; - let shard_aware_port_key = match config.is_ssl() { - true => options::SCYLLA_SHARD_AWARE_PORT_SSL, - false => options::SCYLLA_SHARD_AWARE_PORT, - }; - let mut supported = match options_result { Response::Supported(supported) => supported, Response::Error(Error { error, reason }) => { @@ -1838,6 +1833,11 @@ pub(crate) async fn open_connection( } }; + let shard_aware_port_key = match config.is_ssl() { + true => options::SCYLLA_SHARD_AWARE_PORT_SSL, + false => options::SCYLLA_SHARD_AWARE_PORT, + }; + // If this is ScyllaDB that we connected to, we received sharding information. let shard_info = ShardInfo::try_from(&supported.options).ok(); let supported_compression = supported From bd81495b5c8e18d9574220ef7098acd942028f56 Mon Sep 17 00:00:00 2001 From: muzarski Date: Tue, 17 Sep 2024 17:37:57 +0200 Subject: [PATCH 15/33] errors: CqlRequestErrorKind Add an enum representing possible CQL requests. This enum is useful for creating strongly typed error types (e.g. ConnectionSetupRequestError introduced in a following commit). --- scylla-cql/src/errors.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index f8b0fc7613..40508cc18a 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -341,6 +341,37 @@ pub enum WriteType { Other(String), } +/// Possible requests sent by the client. +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub enum CqlRequestKind { + Startup, + AuthResponse, + Options, + Query, + Prepare, + Execute, + Batch, + Register, +} + +impl std::fmt::Display for CqlRequestKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let kind_str = match self { + CqlRequestKind::Startup => "STARTUP", + CqlRequestKind::AuthResponse => "AUTH_RESPONSE", + CqlRequestKind::Options => "OPTIONS", + CqlRequestKind::Query => "QUERY", + CqlRequestKind::Prepare => "PREPARE", + CqlRequestKind::Execute => "EXECUTE", + CqlRequestKind::Batch => "BATCH", + CqlRequestKind::Register => "REGISTER", + }; + + f.write_str(kind_str) + } +} + /// Error caused by caller creating an invalid query #[derive(Error, Debug, Clone)] #[error("Invalid query passed to Session")] From 3529b6e384878cead4def613780b46d6ca47cf03 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 15:48:17 +0200 Subject: [PATCH 16/33] conn: introduce ConnectionSetupRequestError Introduced new error type designed to be used for the requests performed during connection setup - e.g. OPTIONS or STARTUP. In this commit, we also adjust Connection::get_options method. It will now extract the supported options, and return appropriate error in case of failure. --- scylla-cql/src/errors.rs | 42 ++++++++++++++++++- scylla/src/transport/connection.rs | 67 +++++++++++++++++++++--------- 2 files changed, 88 insertions(+), 21 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 40508cc18a..65c1af015a 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -1,7 +1,8 @@ //! This module contains various errors which can be returned by `scylla::Session` use crate::frame::frame_errors::{ - CqlEventParseError, CqlResponseParseError, FrameError, ParseError, + CqlErrorParseError, CqlEventParseError, CqlResponseParseError, CqlSupportedParseError, + FrameError, ParseError, }; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::value::SerializeValuesError; @@ -519,6 +520,8 @@ pub enum ConnectionError { TranslationError(#[from] TranslationError), #[error(transparent)] BrokenConnection(#[from] BrokenConnectionError), + #[error(transparent)] + ConnectionSetupRequestError(#[from] ConnectionSetupRequestError), // TODO: remove it or change it later. #[error(transparent)] QueryError(#[from] QueryError), @@ -541,6 +544,43 @@ impl ConnectionError { } } +/// An error that occurred during connection setup request execution. +/// It indicates that request needed to initiate a connection failed. +#[derive(Error, Debug)] +#[error("Failed to perform a connection setup request. Request: {request_kind}, reason: {error}")] +pub struct ConnectionSetupRequestError { + request_kind: CqlRequestKind, + error: ConnectionSetupRequestErrorKind, +} + +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ConnectionSetupRequestErrorKind { + #[error(transparent)] + FrameError(#[from] FrameError), + #[error("Unable to allocate stream id")] + UnableToAllocStreamId, + #[error(transparent)] + BrokenConnection(#[from] BrokenConnectionError), + #[error("Database returned an error: {0}, Error message: {1}")] + DbError(DbError, String), + #[error("Received unexpected response from the server: {0}")] + UnexpectedResponse(CqlResponseKind), + #[error("Failed to deserialize SUPPORTED response: {0}")] + CqlSupportedParseError(#[from] CqlSupportedParseError), + #[error("Failed to deserialize ERROR response: {0}")] + CqlErrorParseError(#[from] CqlErrorParseError), +} + +impl ConnectionSetupRequestError { + pub fn new(request_kind: CqlRequestKind, error: ConnectionSetupRequestErrorKind) -> Self { + ConnectionSetupRequestError { + request_kind, + error, + } + } +} + #[derive(Error, Debug, Clone)] #[error("Connection broken, reason: {0}")] pub struct BrokenConnectionError(Arc); diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index d854ae5bc1..5acba43d0d 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1,11 +1,13 @@ use bytes::Bytes; use futures::{future::RemoteHandle, FutureExt}; use scylla_cql::errors::{ - BrokenConnectionError, BrokenConnectionErrorKind, ConnectionError, CqlEventHandlingError, - RequestError, ResponseParseError, TranslationError, + BrokenConnectionError, BrokenConnectionErrorKind, ConnectionError, ConnectionSetupRequestError, + ConnectionSetupRequestErrorKind, CqlEventHandlingError, CqlRequestKind, RequestError, + ResponseParseError, TranslationError, }; use scylla_cql::frame::frame_errors::CqlResponseParseError; use scylla_cql::frame::request::options::{self, Options}; +use scylla_cql::frame::response; use scylla_cql::frame::response::result::{ResultMetadata, TableSpec}; use scylla_cql::frame::response::Error; use scylla_cql::frame::types::SerialConsistency; @@ -749,11 +751,49 @@ impl Connection { .response) } - pub(crate) async fn get_options(&self) -> Result { - Ok(self + pub(crate) async fn get_options( + &self, + ) -> Result { + let err = |kind: ConnectionSetupRequestErrorKind| { + ConnectionSetupRequestError::new(CqlRequestKind::Options, kind) + }; + + let req_result = self .send_request(&request::Options {}, false, false, None) - .await? - .response) + .await; + + // Extract the supported options and tidy up the errors. + let supported = match req_result { + Ok(r) => match r.response { + Response::Supported(supported) => supported, + Response::Error(Error { error, reason }) => { + return Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason))) + } + _ => { + return Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + r.response.to_response_kind(), + ))) + } + }, + Err(e) => match e { + RequestError::FrameError(e) => return Err(err(e.into())), + RequestError::CqlResponseParseError(e) => match e { + CqlResponseParseError::CqlSupportedParseError(e) => return Err(err(e.into())), + CqlResponseParseError::CqlErrorParseError(e) => return Err(err(e.into())), + _ => { + return Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + e.to_response_kind(), + ))) + } + }, + RequestError::BrokenConnection(e) => return Err(err(e.into())), + RequestError::UnableToAllocStreamId => { + return Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId)) + } + }, + }; + + Ok(supported) } pub(crate) async fn prepare(&self, query: &Query) -> Result { @@ -1818,20 +1858,7 @@ pub(crate) async fn open_connection( /* Perform OPTIONS/SUPPORTED/STARTUP handshake. */ // Get OPTIONS SUPPORTED by the cluster. - let options_result = connection.get_options().await?; - - let mut supported = match options_result { - Response::Supported(supported) => supported, - Response::Error(Error { error, reason }) => { - return Err(QueryError::DbError(error, reason).into()) - } - _ => { - return Err(QueryError::ProtocolError( - "Wrong response to OPTIONS message was received", - ) - .into()); - } - }; + let mut supported = connection.get_options().await?; let shard_aware_port_key = match config.is_ssl() { true => options::SCYLLA_SHARD_AWARE_PORT_SSL, From bf46f018f6027ae452deba188f86975d53038637 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 16:01:10 +0200 Subject: [PATCH 17/33] conn: adjust startup request logic Now error filtering happens in Connection::startup method. Introduced NonErrorStartupResponse to represent one of two possible non-error responses to STARTUP request. --- scylla-cql/src/errors.rs | 6 ++- scylla/src/transport/connection.rs | 69 ++++++++++++++++++++++++------ 2 files changed, 59 insertions(+), 16 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 65c1af015a..a60a82f0d8 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -1,8 +1,8 @@ //! This module contains various errors which can be returned by `scylla::Session` use crate::frame::frame_errors::{ - CqlErrorParseError, CqlEventParseError, CqlResponseParseError, CqlSupportedParseError, - FrameError, ParseError, + CqlAuthenticateParseError, CqlErrorParseError, CqlEventParseError, CqlResponseParseError, + CqlSupportedParseError, FrameError, ParseError, }; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::value::SerializeValuesError; @@ -568,6 +568,8 @@ pub enum ConnectionSetupRequestErrorKind { UnexpectedResponse(CqlResponseKind), #[error("Failed to deserialize SUPPORTED response: {0}")] CqlSupportedParseError(#[from] CqlSupportedParseError), + #[error("Failed to deserialize AUTHENTICATE response: {0}")] + CqlAuthenticateParseError(#[from] CqlAuthenticateParseError), #[error("Failed to deserialize ERROR response: {0}")] CqlErrorParseError(#[from] CqlErrorParseError), } diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 5acba43d0d..f6afaa6071 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -306,6 +306,12 @@ impl NonErrorQueryResponse { Ok(result) } } + +pub(crate) enum NonErrorStartupResponse { + Ready, + Authenticate(response::authenticate::Authenticate), +} + #[cfg(feature = "ssl")] mod ssl_config { use openssl::{ @@ -744,11 +750,52 @@ impl Connection { pub(crate) async fn startup( &self, options: HashMap, Cow<'_, str>>, - ) -> Result { - Ok(self + ) -> Result { + let err = |kind: ConnectionSetupRequestErrorKind| { + ConnectionSetupRequestError::new(CqlRequestKind::Startup, kind) + }; + + let req_result = self .send_request(&request::Startup { options }, false, false, None) - .await? - .response) + .await; + + // Extract the response to STARTUP request and tidy up the errors. + let response = match req_result { + Ok(r) => match r.response { + Response::Ready => NonErrorStartupResponse::Ready, + Response::Authenticate(auth) => NonErrorStartupResponse::Authenticate(auth), + Response::Error(Error { error, reason }) => { + return Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason))) + } + _ => { + return Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + r.response.to_response_kind(), + ))) + } + }, + Err(e) => match e { + RequestError::FrameError(e) => return Err(err(e.into())), + RequestError::CqlResponseParseError(e) => match e { + // Parsing of READY response cannot fail, since its body is empty. + // Remaining valid responses are AUTHENTICATE and ERROR. + CqlResponseParseError::CqlAuthenticateParseError(e) => { + return Err(err(e.into())) + } + CqlResponseParseError::CqlErrorParseError(e) => return Err(err(e.into())), + _ => { + return Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + e.to_response_kind(), + ))) + } + }, + RequestError::BrokenConnection(e) => return Err(err(e.into())), + RequestError::UnableToAllocStreamId => { + return Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId)) + } + }, + }; + + Ok(response) } pub(crate) async fn get_options( @@ -1924,18 +1971,12 @@ pub(crate) async fn open_connection( } /* Send the STARTUP frame with all the requested options. */ - let result = connection.startup(options).await?; - match result { - Response::Ready => {} - Response::Authenticate(authenticate) => { + let startup_result = connection.startup(options).await?; + match startup_result { + NonErrorStartupResponse::Ready => {} + NonErrorStartupResponse::Authenticate(authenticate) => { perform_authenticate(&mut connection, &authenticate).await?; } - Response::Error(Error { error, reason }) => { - return Err(QueryError::DbError(error, reason).into()) - } - _ => { - return Err(QueryError::ProtocolError("Unexpected response to STARTUP message").into()) - } } /* If this is a control connection, REGISTER to receive all event types. */ From fd694da0df1c18ebff33b6f720eaae12aeba2089 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 16:08:09 +0200 Subject: [PATCH 18/33] conn: handle errors in register --- scylla/src/transport/connection.rs | 41 ++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index f6afaa6071..47154350bb 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1322,21 +1322,40 @@ impl Connection { async fn register( &self, event_types_to_register_for: Vec, - ) -> Result<(), QueryError> { + ) -> Result<(), ConnectionSetupRequestError> { + let err = |kind: ConnectionSetupRequestErrorKind| { + ConnectionSetupRequestError::new(CqlRequestKind::Register, kind) + }; + let register_frame = register::Register { event_types_to_register_for, }; - match self - .send_request(®ister_frame, true, false, None) - .await? - .response - { - Response::Ready => Ok(()), - Response::Error(err) => Err(err.into()), - _ => Err(QueryError::ProtocolError( - "Unexpected response to REGISTER message", - )), + // Extract the response and tidy up the errors. + match self.send_request(®ister_frame, true, false, None).await { + Ok(r) => match r.response { + Response::Ready => Ok(()), + Response::Error(Error { error, reason }) => { + Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason))) + } + _ => Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + r.response.to_response_kind(), + ))), + }, + Err(e) => match e { + RequestError::FrameError(e) => Err(err(e.into())), + RequestError::CqlResponseParseError(e) => match e { + // Parsing the READY response cannot fail. Only remaining valid response is ERROR. + CqlResponseParseError::CqlErrorParseError(e) => Err(err(e.into())), + _ => Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + e.to_response_kind(), + ))), + }, + RequestError::BrokenConnection(e) => Err(err(e.into())), + RequestError::UnableToAllocStreamId => { + Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId)) + } + }, } } From 0ef7cc2601eb72391ee6eb4450d68d1c7d353134 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 16:38:34 +0200 Subject: [PATCH 19/33] conn: handle errors during authentication --- scylla-cql/src/errors.rs | 19 +++++- scylla/src/transport/connection.rs | 104 +++++++++++++++++++++-------- 2 files changed, 92 insertions(+), 31 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index a60a82f0d8..5bff0331d8 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -1,8 +1,9 @@ //! This module contains various errors which can be returned by `scylla::Session` use crate::frame::frame_errors::{ - CqlAuthenticateParseError, CqlErrorParseError, CqlEventParseError, CqlResponseParseError, - CqlSupportedParseError, FrameError, ParseError, + CqlAuthChallengeParseError, CqlAuthSuccessParseError, CqlAuthenticateParseError, + CqlErrorParseError, CqlEventParseError, CqlResponseParseError, CqlSupportedParseError, + FrameError, ParseError, }; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::value::SerializeValuesError; @@ -553,6 +554,8 @@ pub struct ConnectionSetupRequestError { error: ConnectionSetupRequestErrorKind, } +type AuthError = String; + #[derive(Error, Debug)] #[non_exhaustive] pub enum ConnectionSetupRequestErrorKind { @@ -570,8 +573,20 @@ pub enum ConnectionSetupRequestErrorKind { CqlSupportedParseError(#[from] CqlSupportedParseError), #[error("Failed to deserialize AUTHENTICATE response: {0}")] CqlAuthenticateParseError(#[from] CqlAuthenticateParseError), + #[error("Failed to deserialize AUTH_SUCCESS response: {0}")] + CqlAuthSuccessParseError(#[from] CqlAuthSuccessParseError), + #[error("Failed to deserialize AUTH_CHALLENGE response: {0}")] + CqlAuthChallengeParseError(#[from] CqlAuthChallengeParseError), #[error("Failed to deserialize ERROR response: {0}")] CqlErrorParseError(#[from] CqlErrorParseError), + #[error("Failed to start client's auth session: {0}")] + StartAuthSessionError(AuthError), + #[error("Failed to evaluate auth challenge on client side: {0}")] + AuthChallengeEvaluationError(AuthError), + #[error("Failed to finish auth challenge on client side: {0}")] + AuthFinishError(AuthError), + #[error("Authentication is required. You can use SessionBuilder::user(\"user\", \"pass\") to provide credentials or SessionBuilder::authenticator_provider to provide custom authenticator")] + MissingAuthentication, } impl ConnectionSetupRequestError { diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 47154350bb..a7b730f5be 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -312,6 +312,11 @@ pub(crate) enum NonErrorStartupResponse { Authenticate(response::authenticate::Authenticate), } +pub(crate) enum NonErrorAuthResponse { + AuthChallenge(response::authenticate::AuthChallenge), + AuthSuccess(response::authenticate::AuthSuccess), +} + #[cfg(feature = "ssl")] mod ssl_config { use openssl::{ @@ -902,10 +907,55 @@ impl Connection { pub(crate) async fn authenticate_response( &self, response: Option>, - ) -> Result { - let response = self + ) -> Result { + let err = |kind: ConnectionSetupRequestErrorKind| { + ConnectionSetupRequestError::new(CqlRequestKind::AuthResponse, kind) + }; + + let req_result = self .send_request(&request::AuthResponse { response }, false, false, None) - .await?; + .await; + + // Extract non-error response to AUTH_RESPONSE request and tidy up errors. + let response = match req_result { + Ok(r) => match r.response { + Response::AuthSuccess(auth_success) => { + NonErrorAuthResponse::AuthSuccess(auth_success) + } + Response::AuthChallenge(auth_challenge) => { + NonErrorAuthResponse::AuthChallenge(auth_challenge) + } + Response::Error(Error { error, reason }) => { + return Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason))) + } + _ => { + return Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + r.response.to_response_kind(), + ))) + } + }, + Err(e) => match e { + RequestError::FrameError(e) => return Err(err(e.into())), + RequestError::CqlResponseParseError(e) => match e { + CqlResponseParseError::CqlAuthSuccessParseError(e) => { + return Err(err(e.into())) + } + CqlResponseParseError::CqlAuthChallengeParseError(e) => { + return Err(err(e.into())) + } + CqlResponseParseError::CqlErrorParseError(e) => return Err(err(e.into())), + _ => { + return Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse( + e.to_response_kind(), + ))) + } + }, + RequestError::BrokenConnection(e) => return Err(err(e.into())), + RequestError::UnableToAllocStreamId => { + return Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId)) + } + }, + }; Ok(response) } @@ -2014,7 +2064,11 @@ pub(crate) async fn open_connection( async fn perform_authenticate( connection: &mut Connection, authenticate: &Authenticate, -) -> Result<(), QueryError> { +) -> Result<(), ConnectionSetupRequestError> { + let err = |kind: ConnectionSetupRequestErrorKind| { + ConnectionSetupRequestError::new(CqlRequestKind::AuthResponse, kind) + }; + let authenticator = &authenticate.authenticator_name as &str; match connection.config.authenticator { @@ -2022,43 +2076,35 @@ async fn perform_authenticate( let (mut response, mut auth_session) = authenticator_provider .start_authentication_session(authenticator) .await - .map_err(QueryError::InvalidMessage)?; + .map_err(|e| err(ConnectionSetupRequestErrorKind::StartAuthSessionError(e)))?; loop { - match connection - .authenticate_response(response) - .await?.response - { - Response::AuthChallenge(challenge) => { + match connection.authenticate_response(response).await? { + NonErrorAuthResponse::AuthChallenge(challenge) => { response = auth_session - .evaluate_challenge( - challenge.authenticate_message.as_deref(), - ) + .evaluate_challenge(challenge.authenticate_message.as_deref()) .await - .map_err(QueryError::InvalidMessage)?; + .map_err(|e| { + err( + ConnectionSetupRequestErrorKind::AuthChallengeEvaluationError( + e, + ), + ) + })?; } - Response::AuthSuccess(success) => { + NonErrorAuthResponse::AuthSuccess(success) => { auth_session .success(success.success_message.as_deref()) .await - .map_err(QueryError::InvalidMessage)?; + .map_err(|e| { + err(ConnectionSetupRequestErrorKind::AuthFinishError(e)) + })?; break; } - Response::Error(err) => { - return Err(err.into()); - } - _ => { - return Err(QueryError::ProtocolError( - "Unexpected response to Authenticate Response message", - )) - } } } - }, - None => return Err(QueryError::InvalidMessage( - "Authentication is required. You can use SessionBuilder::user(\"user\", \"pass\") to provide credentials \ - or SessionBuilder::authenticator_provider to provide custom authenticator".to_string(), - )), + } + None => return Err(err(ConnectionSetupRequestErrorKind::MissingAuthentication)), } Ok(()) From c7eacd63cbeeaae8614dcd34fe9499c084f87616 Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 21 Aug 2024 16:39:26 +0200 Subject: [PATCH 20/33] conn: remove QueryError from ConnectionError Since we tidied up the error types for connection setup requests, so now they return the ConnectionSetupRequestError, we can remove the QueryError dependency from ConnectionError. --- scylla-cql/src/errors.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 5bff0331d8..a8ad55b09f 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -523,9 +523,6 @@ pub enum ConnectionError { BrokenConnection(#[from] BrokenConnectionError), #[error(transparent)] ConnectionSetupRequestError(#[from] ConnectionSetupRequestError), - // TODO: remove it or change it later. - #[error(transparent)] - QueryError(#[from] QueryError), } impl ConnectionError { From 026b2514e0de6d4d57b8a9895c716a5576055303 Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 22 Aug 2024 17:42:38 +0200 Subject: [PATCH 21/33] errors: introduce UserRequestError This is an error that appears case of failure of user request (requests triggered via public API, i.e. PREPARE, EXECUTE, QUERY, BATCH). Its only purpose is to filter out impossible error variants from `CqlResponseParseError` that appear in `RequestError` type. For example, it should not be possible to receive `CqlSupportedParseError` when parsing the response to user request. It means that the server sent an invalid response to the corresponding request. The From implementation will be adjusted in a following commits. Firstly, we need to narrow the type of corresponding functions and methods to UserRequestError. --- scylla-cql/src/errors.rs | 54 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index a8ad55b09f..30f9d46368 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -2,8 +2,8 @@ use crate::frame::frame_errors::{ CqlAuthChallengeParseError, CqlAuthSuccessParseError, CqlAuthenticateParseError, - CqlErrorParseError, CqlEventParseError, CqlResponseParseError, CqlSupportedParseError, - FrameError, ParseError, + CqlErrorParseError, CqlEventParseError, CqlResponseParseError, CqlResultParseError, + CqlSupportedParseError, FrameError, ParseError, }; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::value::SerializeValuesError; @@ -62,6 +62,33 @@ pub enum QueryError { RequestTimeout(String), } +/// An error type that occurred when executing one of: +/// - QUERY +/// - PREPARE +/// - EXECUTE +/// - BATCH +/// +/// requests. +#[derive(Error, Debug)] +pub enum UserRequestError { + #[error("Database returned an error: {0}, Error message: {1}")] + DbError(DbError, String), + #[error(transparent)] + CqlResultParseError(#[from] CqlResultParseError), + #[error("Failed to deserialize ERROR response: {0}")] + CqlErrorParseError(#[from] CqlErrorParseError), + #[error( + "Received unexpected response from the server: {0}. Expected RESULT or ERROR response." + )] + UnexpectedResponse(CqlResponseKind), + #[error(transparent)] + BrokenConnectionError(#[from] BrokenConnectionError), + #[error(transparent)] + FrameError(#[from] FrameError), + #[error("Unable to allocate stream id")] + UnableToAllocStreamId, +} + /// An error sent from the database in response to a query /// as described in the [specification](https://github.com/apache/cassandra/blob/5ed5e84613ef0e9664a774493db7d2604e3596e0/doc/native_protocol_v4.spec#L1029)\ #[derive(Error, Debug, Clone, PartialEq, Eq)] @@ -761,6 +788,29 @@ impl From for QueryError { } } +impl From for QueryError { + fn from(value: UserRequestError) -> Self { + match value { + UserRequestError::DbError(err, msg) => QueryError::DbError(err, msg), + UserRequestError::CqlResultParseError(e) => { + // FIXME: change later + CqlResponseParseError::CqlResultParseError(e).into() + } + UserRequestError::CqlErrorParseError(e) => { + // FIXME: change later + CqlResponseParseError::CqlErrorParseError(e).into() + } + UserRequestError::BrokenConnectionError(e) => e.into(), + UserRequestError::UnexpectedResponse(_) => { + // FIXME: make it typed. It needs to wait for ProtocolError refactor. + QueryError::ProtocolError("Received unexpected response from the server. Expected RESULT or ERROR response.") + } + UserRequestError::FrameError(e) => e.into(), + UserRequestError::UnableToAllocStreamId => QueryError::UnableToAllocStreamId, + } + } +} + impl From for NewSessionError { fn from(io_error: std::io::Error) -> NewSessionError { NewSessionError::IoError(Arc::new(io_error)) From f3240a6acc40eb2482e271fefb98eff5a5fb9a03 Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 15:02:51 +0200 Subject: [PATCH 22/33] errors: From for UserRequestError As commit title states, this commit implements a transition from RequestError to UserRequestError. The only accepted responses for user requests are RESULT and ERROR. If we started deserialization of some other response, but failed, we will treat it as unexpected response error. --- scylla-cql/src/errors.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 30f9d46368..5879223dec 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -708,6 +708,23 @@ pub enum RequestError { UnableToAllocStreamId, } +impl From for UserRequestError { + fn from(value: RequestError) -> Self { + match value { + RequestError::FrameError(e) => e.into(), + RequestError::CqlResponseParseError(e) => match e { + // Only possible responses are RESULT and ERROR. If we failed parsing + // other response, treat it as unexpected response. + CqlResponseParseError::CqlErrorParseError(e) => e.into(), + CqlResponseParseError::CqlResultParseError(e) => e.into(), + _ => UserRequestError::UnexpectedResponse(e.to_response_kind()), + }, + RequestError::BrokenConnection(e) => e.into(), + RequestError::UnableToAllocStreamId => UserRequestError::UnableToAllocStreamId, + } + } +} + impl From for BrokenConnectionError { fn from(value: BrokenConnectionErrorKind) -> Self { BrokenConnectionError(Arc::new(value)) From f8eb4abd402dae66eb15ffa7ad543e5e373fbca9 Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 15:08:55 +0200 Subject: [PATCH 23/33] connection: return UserRequestError in connection::prepare It just narrows the error type from QueryError to UserRequestError --- scylla/src/transport/connection.rs | 17 +++++++++++------ scylla/src/transport/session.rs | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index a7b730f5be..4ae607f0b0 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -3,13 +3,13 @@ use futures::{future::RemoteHandle, FutureExt}; use scylla_cql::errors::{ BrokenConnectionError, BrokenConnectionErrorKind, ConnectionError, ConnectionSetupRequestError, ConnectionSetupRequestErrorKind, CqlEventHandlingError, CqlRequestKind, RequestError, - ResponseParseError, TranslationError, + ResponseParseError, TranslationError, UserRequestError, }; use scylla_cql::frame::frame_errors::CqlResponseParseError; use scylla_cql::frame::request::options::{self, Options}; -use scylla_cql::frame::response; use scylla_cql::frame::response::result::{ResultMetadata, TableSpec}; use scylla_cql::frame::response::Error; +use scylla_cql::frame::response::{self, error}; use scylla_cql::frame::types::SerialConsistency; use scylla_cql::types::serialize::batch::{BatchValues, BatchValuesIterator}; use scylla_cql::types::serialize::raw_batch::RawBatchValuesAdapter; @@ -848,7 +848,10 @@ impl Connection { Ok(supported) } - pub(crate) async fn prepare(&self, query: &Query) -> Result { + pub(crate) async fn prepare( + &self, + query: &Query, + ) -> Result { let query_response = self .send_request( &request::Prepare { @@ -861,7 +864,9 @@ impl Connection { .await?; let mut prepared_statement = match query_response.response { - Response::Error(err) => return Err(err.into()), + Response::Error(error::Error { error, reason }) => { + return Err(UserRequestError::DbError(error, reason)) + } Response::Result(result::Result::Prepared(p)) => PreparedStatement::new( p.id, self.features @@ -874,8 +879,8 @@ impl Connection { query.config.clone(), ), _ => { - return Err(QueryError::ProtocolError( - "PREPARE: Unexpected server response", + return Err(UserRequestError::UnexpectedResponse( + query_response.response.to_response_kind(), )) } }; diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 9ad92b53e7..ef85411aa7 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -76,7 +76,7 @@ pub use crate::transport::connection_pool::PoolSize; use crate::authentication::AuthenticatorProvider; #[cfg(feature = "ssl")] use openssl::ssl::SslContext; -use scylla_cql::errors::BadQuery; +use scylla_cql::errors::{BadQuery, UserRequestError}; pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192; @@ -975,7 +975,7 @@ impl Session { // Safety: there is at least one node in the cluster, and `Cluster::iter_working_connections()` // returns either an error or an iterator with at least one connection, so there will be at least one result. - let first_ok: Result = + let first_ok: Result = results.by_ref().find_or_first(Result::is_ok).unwrap(); let mut prepared: PreparedStatement = first_ok?; From ac44d8433f54e098713875c42b092e79f24aa127 Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 15:22:26 +0200 Subject: [PATCH 24/33] connection: narrow error return type of Connection::reprepare --- scylla-cql/src/errors.rs | 5 +++++ scylla/src/transport/connection.rs | 6 ++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 5879223dec..eaaf12b148 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -87,6 +87,8 @@ pub enum UserRequestError { FrameError(#[from] FrameError), #[error("Unable to allocate stream id")] UnableToAllocStreamId, + #[error("Prepared statement Id changed, md5 sum should stay the same")] + RepreparedIdChanged, } /// An error sent from the database in response to a query @@ -824,6 +826,9 @@ impl From for QueryError { } UserRequestError::FrameError(e) => e.into(), UserRequestError::UnableToAllocStreamId => QueryError::UnableToAllocStreamId, + UserRequestError::RepreparedIdChanged => QueryError::ProtocolError( + "Prepared statement Id changed, md5 sum should stay the same", + ), } } } diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 4ae607f0b0..dac614cc9e 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -895,15 +895,13 @@ impl Connection { &self, query: impl Into, previous_prepared: &PreparedStatement, - ) -> Result<(), QueryError> { + ) -> Result<(), UserRequestError> { let reprepare_query: Query = query.into(); let reprepared = self.prepare(&reprepare_query).await?; // Reprepared statement should keep its id - it's the md5 sum // of statement contents if reprepared.get_id() != previous_prepared.get_id() { - Err(QueryError::ProtocolError( - "Prepared statement Id changed, md5 sum should stay the same", - )) + Err(UserRequestError::RepreparedIdChanged) } else { Ok(()) } From da48224701a206a6d324e8259b11bb850a750cb7 Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 15:17:22 +0200 Subject: [PATCH 25/33] conn: narrow return type for execute and query Unfortunately, I didn't know how to decouple this commit into two separate commits. This is due to the trait bounds on e.g. RowIteratorWorker::work implementation. --- scylla/src/transport/connection.rs | 10 ++++++---- scylla/src/transport/iterator.rs | 8 +++++--- scylla/src/transport/session.rs | 3 +++ 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index dac614cc9e..5dc0fc3978 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1018,6 +1018,7 @@ impl Connection { self.query_raw_unpaged(&query, PagingState::start()) .await + .map_err(Into::into) .and_then(QueryResponse::into_query_result) } @@ -1025,7 +1026,7 @@ impl Connection { &self, query: &Query, paging_state: PagingState, - ) -> Result { + ) -> Result { // This method is used only for driver internal queries, so no need to consult execution profile here. self.query_raw_with_consistency( query, @@ -1046,7 +1047,7 @@ impl Connection { serial_consistency: Option, page_size: Option, paging_state: PagingState, - ) -> Result { + ) -> Result { let query_frame = query::Query { contents: Cow::Borrowed(&query.contents), parameters: query::QueryParameters { @@ -1076,6 +1077,7 @@ impl Connection { // This method is used only for driver internal queries, so no need to consult execution profile here. self.execute_raw_unpaged(prepared, values, PagingState::start()) .await + .map_err(Into::into) .and_then(QueryResponse::into_query_result) } @@ -1085,7 +1087,7 @@ impl Connection { prepared: &PreparedStatement, values: SerializedValues, paging_state: PagingState, - ) -> Result { + ) -> Result { // This method is used only for driver internal queries, so no need to consult execution profile here. self.execute_raw_with_consistency( prepared, @@ -1108,7 +1110,7 @@ impl Connection { serial_consistency: Option, page_size: Option, paging_state: PagingState, - ) -> Result { + ) -> Result { let execute_frame = execute::Execute { id: prepared_statement.get_id().to_owned(), parameters: query::QueryParameters { diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index ace9989ee9..04aab06f13 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use futures::Stream; +use scylla_cql::errors::UserRequestError; use scylla_cql::frame::response::NonErrorResponse; use scylla_cql::types::serialize::row::SerializedValues; use std::result::Result; @@ -479,7 +480,7 @@ struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> { sender: ProvingSender>, // Closure used to perform a single page query - // AsyncFn(Arc, Option>) -> Result + // AsyncFn(Arc, Option>) -> Result page_query: QueryFunc, statement_info: RoutingInfo<'a>, @@ -502,7 +503,7 @@ struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> { impl RowIteratorWorker<'_, QueryFunc, SpanCreator> where QueryFunc: Fn(Arc, Consistency, PagingState) -> QueryFut, - QueryFut: Future>, + QueryFut: Future>, SpanCreator: Fn() -> RequestSpan, { // Contract: this function MUST send at least one item through self.sender @@ -656,6 +657,7 @@ where let query_response = (self.page_query)(connection.clone(), consistency, self.paging_state.clone()) .await + .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response); let elapsed = query_start.elapsed(); @@ -826,7 +828,7 @@ struct SingleConnectionRowIteratorWorker { impl SingleConnectionRowIteratorWorker where Fetcher: Fn(PagingState) -> FetchFut + Send + Sync, - FetchFut: Future> + Send, + FetchFut: Future> + Send, { async fn work(mut self) -> PageSendAttemptedProof { match self.do_work().await { diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index ef85411aa7..7478af1cbf 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -773,6 +773,7 @@ impl Session { paging_state_ref.clone(), ) .await + .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response) } else { let prepared = connection.prepare(query_ref).await?; @@ -788,6 +789,7 @@ impl Session { paging_state_ref.clone(), ) .await + .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response) } } @@ -1226,6 +1228,7 @@ impl Session { paging_state_ref.clone(), ) .await + .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response) } }, From e9078cf05709872edbdf6234224bd8ea88102900 Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 16:12:55 +0200 Subject: [PATCH 26/33] non_error_response: narrow return type Narrowed return error type of functions that convert QueryResponse into NonErrorQueryResponse and QueryResult. --- scylla-cql/src/frame/response/mod.rs | 14 +++++++++----- scylla/src/transport/connection.rs | 16 +++++++++------- scylla/src/transport/iterator.rs | 2 +- scylla/src/transport/session.rs | 6 +++--- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/scylla-cql/src/frame/response/mod.rs b/scylla-cql/src/frame/response/mod.rs index 4cceb2197d..516f7d24e9 100644 --- a/scylla-cql/src/frame/response/mod.rs +++ b/scylla-cql/src/frame/response/mod.rs @@ -10,7 +10,7 @@ use std::sync::Arc; pub use error::Error; pub use supported::Supported; -use crate::errors::{CqlResponseKind, QueryError}; +use crate::errors::{CqlResponseKind, UserRequestError}; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::response::result::ResultMetadata; use crate::frame::TryFromPrimitiveError; @@ -106,9 +106,11 @@ impl Response { Ok(response) } - pub fn into_non_error_response(self) -> Result { - Ok(match self { - Response::Error(err) => return Err(QueryError::from(err)), + pub fn into_non_error_response(self) -> Result { + let non_error_response = match self { + Response::Error(error::Error { error, reason }) => { + return Err(UserRequestError::DbError(error, reason)) + } Response::Ready => NonErrorResponse::Ready, Response::Result(res) => NonErrorResponse::Result(res), Response::Authenticate(auth) => NonErrorResponse::Authenticate(auth), @@ -116,7 +118,9 @@ impl Response { Response::AuthChallenge(auth_chal) => NonErrorResponse::AuthChallenge(auth_chal), Response::Supported(sup) => NonErrorResponse::Supported(sup), Response::Event(eve) => NonErrorResponse::Event(eve), - }) + }; + + Ok(non_error_response) } } diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 5dc0fc3978..00163ca5ca 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -228,7 +228,9 @@ pub(crate) struct NonErrorQueryResponse { } impl QueryResponse { - pub(crate) fn into_non_error_query_response(self) -> Result { + pub(crate) fn into_non_error_query_response( + self, + ) -> Result { Ok(NonErrorQueryResponse { response: self.response.into_non_error_response()?, tracing_id: self.tracing_id, @@ -238,7 +240,7 @@ impl QueryResponse { pub(crate) fn into_query_result_and_paging_state( self, - ) -> Result<(QueryResult, PagingStateResponse), QueryError> { + ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> { self.into_non_error_query_response()? .into_query_result_and_paging_state() } @@ -265,7 +267,7 @@ impl NonErrorQueryResponse { pub(crate) fn into_query_result_and_paging_state( self, - ) -> Result<(QueryResult, PagingStateResponse), QueryError> { + ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> { let (rows, paging_state, metadata, serialized_size) = match self.response { NonErrorResponse::Result(result::Result::Rows(rs)) => ( Some(rs.rows), @@ -275,8 +277,8 @@ impl NonErrorQueryResponse { ), NonErrorResponse::Result(_) => (None, PagingStateResponse::NoMorePages, None, 0), _ => { - return Err(QueryError::ProtocolError( - "Unexpected server response, expected Result or Error", + return Err(UserRequestError::UnexpectedResponse( + self.response.to_response_kind(), )) } }; @@ -968,7 +970,7 @@ impl Connection { &self, query: impl Into, paging_state: PagingState, - ) -> Result<(QueryResult, PagingStateResponse), QueryError> { + ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> { let query: Query = query.into(); // This method is used only for driver internal queries, so no need to consult execution profile here. @@ -993,7 +995,7 @@ impl Connection { paging_state: PagingState, consistency: Consistency, serial_consistency: Option, - ) -> Result<(QueryResult, PagingStateResponse), QueryError> { + ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> { let query: Query = query.into(); let page_size = query.get_validated_page_size(); diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 04aab06f13..6dfcb622b6 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -657,7 +657,6 @@ where let query_response = (self.page_query)(connection.clone(), consistency, self.paging_state.clone()) .await - .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response); let elapsed = query_start.elapsed(); @@ -707,6 +706,7 @@ where Ok(ControlFlow::Continue(())) } Err(err) => { + let err = err.into(); self.metrics.inc_failed_paged_queries(); self.execution_profile .load_balancing_policy diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 7478af1cbf..ea11174d44 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -773,8 +773,8 @@ impl Session { paging_state_ref.clone(), ) .await - .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response) + .map_err(Into::into) } else { let prepared = connection.prepare(query_ref).await?; let serialized = prepared.serialize_values(values_ref)?; @@ -789,8 +789,8 @@ impl Session { paging_state_ref.clone(), ) .await - .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response) + .map_err(Into::into) } } }, @@ -1228,8 +1228,8 @@ impl Session { paging_state_ref.clone(), ) .await - .map_err(Into::into) .and_then(QueryResponse::into_non_error_query_response) + .map_err(Into::into) } }, &span, From 1128cd4f55809794a14a4a94f98fffecb3f12403 Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 16:24:08 +0200 Subject: [PATCH 27/33] conn: map sub-error type to UserRequestError Ultimately, we want to get rid of From for QueryError implementation. This is because, RequestError contains a CqlResponseParseError which is overloaded with variants such as CqlAuthChallengeParseError which should not be returned to the user who uses only BATCH, QUERY, EXECUTE and PREPARE requests. This is why, we make two transitions in error types in this place. The first transition is RequestError -> UserRequestError (map_err()), which filters out variants such as CqlAuthChallengeParseError, and leaves only either CqlErrorParseError or CqlResultParseError. The second transition (? operator) is UserRequestError -> QueryError which makes use of From for QueryError implementation. --- scylla/src/transport/connection.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 00163ca5ca..48423ed08f 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1265,7 +1265,8 @@ impl Connection { loop { let query_response = self .send_request(&batch_frame, true, batch.config.tracing, None) - .await?; + .await + .map_err(UserRequestError::from)?; return match query_response.response { Response::Error(err) => match err.error { From 0d0d267db642edbc64f436e3c4273b16ecb6f09a Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 16:30:08 +0200 Subject: [PATCH 28/33] errors: remove From for QueryError Since we introduced a transition error type for user requests (UserRequestError), and adjusted the code to it, we can now revert the temporary From implementation. --- scylla-cql/src/errors.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index eaaf12b148..f310a8bd33 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -796,17 +796,6 @@ impl From for QueryError { } } -impl From for QueryError { - fn from(value: RequestError) -> Self { - match value { - RequestError::FrameError(e) => e.into(), - RequestError::CqlResponseParseError(e) => e.into(), - RequestError::BrokenConnection(e) => e.into(), - RequestError::UnableToAllocStreamId => QueryError::UnableToAllocStreamId, - } - } -} - impl From for QueryError { fn from(value: UserRequestError) -> Self { match value { From 09b06fc78b31fddcb79ce059851405b90eddb9c3 Mon Sep 17 00:00:00 2001 From: muzarski Date: Mon, 26 Aug 2024 16:36:21 +0200 Subject: [PATCH 29/33] errors: replace CqlResponseParseError variant in QueryError Since requests triggered via user API, cannot receive a response other than ERROR or RESULT, we need to narrow the response parsing errors accordingly. Before this commit, we would hold a CqlResponseParseError which contains a parsing errors for responses that should never be returned by the server for user requests. After hard work from previous commits, we can finally replace this overloaded variant with two variants - CqlResultParseError -> failed to deserialize RESULT response - CqlErrorParseError -> failed to deserialize ERROR response In case of failure of parsing other response, we will return a QueryError::ProtocolError saying that server returned an unexpected response. I think we should add more context to this error, but let's leave it for other PR which will be focused on ProtocolError refactor. --- scylla-cql/src/errors.rs | 29 ++++++++++--------- .../src/transport/load_balancing/default.rs | 3 +- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index f310a8bd33..39864502f4 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -28,9 +28,13 @@ pub enum QueryError { #[error(transparent)] BadQuery(#[from] BadQuery), - /// Failed to deserialize a CQL response from the server. + /// Received a RESULT server response, but failed to deserialize it. #[error(transparent)] - CqlResponseParseError(#[from] CqlResponseParseError), + CqlResultParseError(#[from] CqlResultParseError), + + /// Received an ERROR server response, but failed to deserialize it. + #[error("Failed to deserialize ERROR response: {0}")] + CqlErrorParseError(#[from] CqlErrorParseError), /// Input/Output error has occurred, connection broken etc. #[error("IO Error: {0}")] @@ -482,9 +486,13 @@ pub enum NewSessionError { #[error(transparent)] BadQuery(#[from] BadQuery), - /// Failed to deserialize a CQL response from the server. + /// Received a RESULT server response, but failed to deserialize it. #[error(transparent)] - CqlResponseParseError(#[from] CqlResponseParseError), + CqlResultParseError(#[from] CqlResultParseError), + + /// Received an ERROR server response, but failed to deserialize it. + #[error("Failed to deserialize ERROR response: {0}")] + CqlErrorParseError(#[from] CqlErrorParseError), /// Input/Output error has occurred, connection broken etc. #[error("IO Error: {0}")] @@ -800,14 +808,8 @@ impl From for QueryError { fn from(value: UserRequestError) -> Self { match value { UserRequestError::DbError(err, msg) => QueryError::DbError(err, msg), - UserRequestError::CqlResultParseError(e) => { - // FIXME: change later - CqlResponseParseError::CqlResultParseError(e).into() - } - UserRequestError::CqlErrorParseError(e) => { - // FIXME: change later - CqlResponseParseError::CqlErrorParseError(e).into() - } + UserRequestError::CqlResultParseError(e) => e.into(), + UserRequestError::CqlErrorParseError(e) => e.into(), UserRequestError::BrokenConnectionError(e) => e.into(), UserRequestError::UnexpectedResponse(_) => { // FIXME: make it typed. It needs to wait for ProtocolError refactor. @@ -833,7 +835,8 @@ impl From for NewSessionError { match query_error { QueryError::DbError(e, msg) => NewSessionError::DbError(e, msg), QueryError::BadQuery(e) => NewSessionError::BadQuery(e), - QueryError::CqlResponseParseError(e) => NewSessionError::CqlResponseParseError(e), + QueryError::CqlResultParseError(e) => NewSessionError::CqlResultParseError(e), + QueryError::CqlErrorParseError(e) => NewSessionError::CqlErrorParseError(e), QueryError::IoError(e) => NewSessionError::IoError(e), QueryError::ProtocolError(m) => NewSessionError::ProtocolError(m), QueryError::InvalidMessage(m) => NewSessionError::InvalidMessage(m), diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 07e4b58033..38eb6915ec 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2848,7 +2848,8 @@ mod latency_awareness { // "slow" errors, i.e. ones that are returned after considerable time of query being run QueryError::DbError(_, _) - | QueryError::CqlResponseParseError(_) + | QueryError::CqlResultParseError(_) + | QueryError::CqlErrorParseError(_) | QueryError::InvalidMessage(_) | QueryError::IoError(_) | QueryError::ProtocolError(_) From b242c60d7ecfc62fb68c141b8751df445b9602cc Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 29 Aug 2024 16:11:03 +0200 Subject: [PATCH 30/33] conn: make ConnectionError clonable In a following commit, we will introduce a ConnectionPoolError, which will contain a variant with a ConnectionError (the error registered for last connection). At the place of creating this variant, we only have access to the reference of ConnectionError. Thus, to make it owned, cloning is required. --- scylla-cql/src/errors.rs | 23 ++++++++++++++++++----- scylla/src/transport/connection.rs | 5 ++--- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 39864502f4..418ab6258e 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -545,13 +545,13 @@ pub enum BadKeyspaceName { /// An error that appeared on a connection level. /// It indicated that connection can no longer be used /// and should be dropped. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum ConnectionError { #[error("Connect timeout elapsed")] ConnectTimeout, #[error(transparent)] - IoError(#[from] std::io::Error), + IoError(Arc), #[error("Could not find free source port for shard {0}")] NoSourcePortForShard(u32), #[error("Address translation failed: {0}")] @@ -562,6 +562,12 @@ pub enum ConnectionError { ConnectionSetupRequestError(#[from] ConnectionSetupRequestError), } +impl From for ConnectionError { + fn from(value: std::io::Error) -> Self { + ConnectionError::IoError(Arc::new(value)) + } +} + impl ConnectionError { /// Checks if this error indicates that a chosen source port/address cannot be bound. /// This is caused by one of the following: @@ -581,7 +587,7 @@ impl ConnectionError { /// An error that occurred during connection setup request execution. /// It indicates that request needed to initiate a connection failed. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[error("Failed to perform a connection setup request. Request: {request_kind}, reason: {error}")] pub struct ConnectionSetupRequestError { request_kind: CqlRequestKind, @@ -590,11 +596,12 @@ pub struct ConnectionSetupRequestError { type AuthError = String; -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum ConnectionSetupRequestErrorKind { + // TODO: Make FrameError clonable. #[error(transparent)] - FrameError(#[from] FrameError), + FrameError(Arc), #[error("Unable to allocate stream id")] UnableToAllocStreamId, #[error(transparent)] @@ -623,6 +630,12 @@ pub enum ConnectionSetupRequestErrorKind { MissingAuthentication, } +impl From for ConnectionSetupRequestErrorKind { + fn from(value: FrameError) -> Self { + ConnectionSetupRequestErrorKind::FrameError(Arc::new(value)) + } +} + impl ConnectionSetupRequestError { pub fn new(request_kind: CqlRequestKind, error: ConnectionSetupRequestErrorKind) -> Self { ConnectionSetupRequestError { diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 48423ed08f..9709f682eb 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -656,7 +656,7 @@ impl Connection { None => tokio::time::timeout(config.connect_timeout, TcpStream::connect(addr)).await, }; let stream = match stream_connector { - Ok(stream) => stream.map_err(ConnectionError::IoError)?, + Ok(stream) => stream?, Err(_) => { return Err(ConnectionError::ConnectTimeout); } @@ -688,8 +688,7 @@ impl Connection { router_handle.clone(), addr.ip(), ) - .await - .map_err(ConnectionError::IoError)?; + .await?; let connection = Connection { _worker_handle, From 8e48c50a18066708d1c5a937cef9098c453d8b4c Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 28 Aug 2024 17:11:15 +0200 Subject: [PATCH 31/33] node: ConnectionPoolError Introduced a ConnectionPoolError which appears when we were unable to select a connection from the connection pool. --- scylla-cql/src/errors.rs | 24 ++++++++++++++++ scylla/src/transport/connection_pool.rs | 28 ++++++++----------- .../src/transport/load_balancing/default.rs | 1 + scylla/src/transport/node.rs | 19 ++++++------- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 418ab6258e..f0bd2e9845 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -40,6 +40,10 @@ pub enum QueryError { #[error("IO Error: {0}")] IoError(Arc), + /// Selected node's connection pool is in invalid state. + #[error("No connections in the pool: {0}")] + ConnectionPoolError(#[from] ConnectionPoolError), + /// Unexpected message received #[error("Protocol Error: {0}")] ProtocolError(&'static str), @@ -498,6 +502,10 @@ pub enum NewSessionError { #[error("IO Error: {0}")] IoError(Arc), + /// Selected node's connection pool is in invalid state. + #[error("No connections in the pool: {0}")] + ConnectionPoolError(#[from] ConnectionPoolError), + /// Unexpected message received #[error("Protocol Error: {0}")] ProtocolError(&'static str), @@ -645,6 +653,21 @@ impl ConnectionSetupRequestError { } } +/// An error that occurred when selecting a node connection +/// to perform a request on. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum ConnectionPoolError { + #[error("The pool is broken; Last connection failed with: {last_connection_error}")] + Broken { + last_connection_error: ConnectionError, + }, + #[error("Pool is still being initialized")] + Initializing, + #[error("The node has been disabled by a host filter")] + NodeDisabledByHostFilter, +} + #[derive(Error, Debug, Clone)] #[error("Connection broken, reason: {0}")] pub struct BrokenConnectionError(Arc); @@ -851,6 +874,7 @@ impl From for NewSessionError { QueryError::CqlResultParseError(e) => NewSessionError::CqlResultParseError(e), QueryError::CqlErrorParseError(e) => NewSessionError::CqlErrorParseError(e), QueryError::IoError(e) => NewSessionError::IoError(e), + QueryError::ConnectionPoolError(e) => NewSessionError::ConnectionPoolError(e), QueryError::ProtocolError(m) => NewSessionError::ProtocolError(m), QueryError::InvalidMessage(m) => NewSessionError::InvalidMessage(m), QueryError::TimeoutError => NewSessionError::TimeoutError, diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index a539941dfd..54921aee1e 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -19,9 +19,8 @@ use super::NodeAddr; use arc_swap::ArcSwap; use futures::{future::RemoteHandle, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use rand::Rng; -use scylla_cql::errors::{BrokenConnectionErrorKind, ConnectionError}; +use scylla_cql::errors::{BrokenConnectionErrorKind, ConnectionError, ConnectionPoolError}; use std::convert::TryInto; -use std::io::ErrorKind; use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::{Arc, RwLock, Weak}; @@ -238,7 +237,7 @@ impl NodeConnectionPool { pub(crate) fn connection_for_shard( &self, shard: Shard, - ) -> Result, std::io::Error> { + ) -> Result, ConnectionPoolError> { trace!(shard = shard, "Selecting connection for shard"); self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => { @@ -261,7 +260,7 @@ impl NodeConnectionPool { }) } - pub(crate) fn random_connection(&self) -> Result, std::io::Error> { + pub(crate) fn random_connection(&self) -> Result, ConnectionPoolError> { trace!("Selecting random connection"); self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => { @@ -345,7 +344,9 @@ impl NodeConnectionPool { } } - pub(crate) fn get_working_connections(&self) -> Result>, std::io::Error> { + pub(crate) fn get_working_connections( + &self, + ) -> Result>, ConnectionPoolError> { self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => conns.clone(), PoolConnections::Sharded { connections, .. } => { @@ -377,21 +378,14 @@ impl NodeConnectionPool { fn with_connections( &self, f: impl FnOnce(&PoolConnections) -> T, - ) -> Result { + ) -> Result { let conns = self.conns.load_full(); match &*conns { MaybePoolConnections::Ready(pool_connections) => Ok(f(pool_connections)), - MaybePoolConnections::Broken(err) => Err(std::io::Error::new( - ErrorKind::Other, - format!( - "No connections in the pool; last connection failed with: {}", - err - ), - )), - MaybePoolConnections::Initializing => Err(std::io::Error::new( - ErrorKind::Other, - "No connections in the pool, pool is still being initialized", - )), + MaybePoolConnections::Broken(err) => Err(ConnectionPoolError::Broken { + last_connection_error: err.clone(), + }), + MaybePoolConnections::Initializing => Err(ConnectionPoolError::Initializing), } } } diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 38eb6915ec..35d1f926b3 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2838,6 +2838,7 @@ mod latency_awareness { // "fast" errors, i.e. ones that are returned quickly after the query begins QueryError::BadQuery(_) | QueryError::BrokenConnection(_) + | QueryError::ConnectionPoolError(_) | QueryError::TooManyOrphanedStreamIds(_) | QueryError::UnableToAllocStreamId | QueryError::DbError(DbError::IsBootstrapping, _) diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index e698820a82..79895d65c0 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -1,3 +1,4 @@ +use scylla_cql::errors::ConnectionPoolError; use tokio::net::lookup_host; use tracing::warn; use uuid::Uuid; @@ -157,7 +158,7 @@ impl Node { pub(crate) async fn connection_for_shard( &self, shard: Shard, - ) -> Result, std::io::Error> { + ) -> Result, ConnectionPoolError> { self.get_pool()?.connection_for_shard(shard) } @@ -186,7 +187,9 @@ impl Node { Ok(()) } - pub(crate) fn get_working_connections(&self) -> Result>, std::io::Error> { + pub(crate) fn get_working_connections( + &self, + ) -> Result>, ConnectionPoolError> { self.get_pool()?.get_working_connections() } @@ -196,14 +199,10 @@ impl Node { } } - fn get_pool(&self) -> Result<&NodeConnectionPool, std::io::Error> { - self.pool.as_ref().ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::Other, - "No connections in the pool: the node has been disabled \ - by the host filter", - ) - }) + fn get_pool(&self) -> Result<&NodeConnectionPool, ConnectionPoolError> { + self.pool + .as_ref() + .ok_or(ConnectionPoolError::NodeDisabledByHostFilter) } } From 885115b52e88d2044389e5335673239817bc49ba Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 29 Aug 2024 16:03:51 +0200 Subject: [PATCH 32/33] error: remove From for QueryError This conversion is no longer used, as IO errors are already handled by transition error types. --- scylla-cql/src/errors.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index f0bd2e9845..26195009a3 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -792,12 +792,6 @@ impl std::fmt::Display for WriteType { } } -impl From for QueryError { - fn from(io_error: std::io::Error) -> QueryError { - QueryError::IoError(Arc::new(io_error)) - } -} - impl From for QueryError { fn from(serialized_err: SerializeValuesError) -> QueryError { QueryError::BadQuery(BadQuery::SerializeValuesError(serialized_err)) From 9259deaba6cd23fd0a707fa4068816d117300531 Mon Sep 17 00:00:00 2001 From: muzarski Date: Thu, 29 Aug 2024 16:53:31 +0200 Subject: [PATCH 33/33] f_errors: Enrich CqlResultParseError messages with response kind Appended `RESULT:` prefix to RESULT response types. --- scylla-cql/src/frame/frame_errors.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scylla-cql/src/frame/frame_errors.rs b/scylla-cql/src/frame/frame_errors.rs index eef6d0ab47..d5e9fa859d 100644 --- a/scylla-cql/src/frame/frame_errors.rs +++ b/scylla-cql/src/frame/frame_errors.rs @@ -149,15 +149,15 @@ pub enum CqlResultParseError { ResultIdParseError(LowLevelDeserializationError), #[error("Unknown RESULT response id: {0}")] UnknownResultId(i32), - #[error("'Set_keyspace' response deserialization failed: {0}")] + #[error("RESULT:Set_keyspace response deserialization failed: {0}")] SetKeyspaceParseError(#[from] SetKeyspaceParseError), // This is an error returned during deserialization of // `RESULT::Schema_change` response, and not `EVENT` response. - #[error("'Schema_change' response deserialization failed: {0}")] + #[error("RESULT:Schema_change response deserialization failed: {0}")] SchemaChangeParseError(#[from] SchemaChangeEventParseError), - #[error("'Prepared' response deserialization failed: {0}")] + #[error("RESULT:Prepared response deserialization failed: {0}")] PreparedParseError(#[from] PreparedParseError), - #[error("'Rows' response deserialization failed: {0}")] + #[error("RESULT:Rows response deserialization failed: {0}")] RowsParseError(#[from] RowsParseError), }