From 8ffcf60f308e49a0003b40ff82a962e4fab89d25 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 24 Jun 2024 14:58:46 +1000 Subject: [PATCH] cassandra 5.0 vector type CREATE/INSERT support makes progress towards: https://github.com/scylladb/scylla-rust-driver/issues/1014 The vector type is introduced by the currently in beta cassandra 5. See: https://cassandra.apache.org/doc/latest/cassandra/reference/vector-data-type.html Scylla does not support vector types and so the tests are setup to only compile/run with a new cassandra_tests config. This commit does not add support for retrieving the data via a SELECT. That was omitted to reduce scope and will be implemented in follow up work. --- .github/workflows/cassandra5.yml | 42 +++++++++ scylla/src/transport/session_test.rs | 102 +++++++++++++++++++++ scylla/src/transport/topology.rs | 47 ++++++++++ scylla/src/utils/parse.rs | 15 +++ test/cluster/cassandra5/docker-compose.yml | 63 +++++++++++++ 5 files changed, 269 insertions(+) create mode 100644 .github/workflows/cassandra5.yml create mode 100644 test/cluster/cassandra5/docker-compose.yml diff --git a/.github/workflows/cassandra5.yml b/.github/workflows/cassandra5.yml new file mode 100644 index 0000000000..2e23e1f5a5 --- /dev/null +++ b/.github/workflows/cassandra5.yml @@ -0,0 +1,42 @@ +# This workflow runs cassandra5 specific tests # TODO: When cassandra 5.0.0 releases, delete this workflow and move RUSTFLAGS="--cfg cassandra_tests" into the cassandra.yml workflow +name: Cassandra 5 beta tests + +on: + push: + branches: + - main + - 'branch-*' + pull_request: + branches: + - main + - 'branch-*' + +env: + CARGO_TERM_COLOR: always + RUST_BACKTRACE: full + +jobs: + build: + runs-on: ubuntu-latest + timeout-minutes: 60 + steps: + - uses: actions/checkout@v3 + - name: Setup 3-node Cassandra cluster + run: | + docker compose -f test/cluster/cassandra5/docker-compose.yml up -d --wait + # A separate step for building to separate measuring time of compilation and testing 
+ - name: Update rust toolchain + run: rustup update + - name: Build the project + run: cargo build --verbose --tests --features "full-serialization" + - name: Run tests on cassandra 5 beta + run: | + CDC='disabled' RUSTFLAGS="--cfg cassandra_tests" RUST_LOG=trace SCYLLA_URI=172.42.0.2:9042 SCYLLA_URI2=172.42.0.3:9042 SCYLLA_URI3=172.42.0.4:9042 cargo test --verbose --features "full-serialization" -- --skip test_views_in_schema_info --skip test_large_batch_statements + - name: Stop the cluster + if: ${{ always() }} + run: docker compose -f test/cluster/cassandra5/docker-compose.yml stop + - name: Print the cluster logs + if: ${{ always() }} + run: docker compose -f test/cluster/cassandra5/docker-compose.yml logs + - name: Remove cluster + run: docker compose -f test/cluster/cassandra5/docker-compose.yml down diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index 22ce02ae84..0cba562353 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -2936,3 +2936,105 @@ async fn test_manual_primary_key_computation() { .await; } } + +#[cfg(cassandra_tests)] +#[tokio::test] +async fn test_vector_type_metadata() { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session + .query( + format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int PRIMARY KEY, b vector<int, 4>, c vector<text, 2>)", + ks + ), + &[], + ) + .await + .unwrap(); + + session.refresh_metadata().await.unwrap(); + let metadata = session.get_cluster_data(); + let columns = &metadata.keyspaces[&ks].tables["t"].columns; + assert_eq!( + columns["b"].type_, + CqlType::Vector { + type_: Box::new(CqlType::Native(NativeType::Int)), + dimensions: 4, + }, + ); + assert_eq!( + columns["c"].type_, + CqlType::Vector { +
type_: Box::new(CqlType::Native(NativeType::Text)), + dimensions: 2, + }, + ); +} + +#[cfg(cassandra_tests)] +#[tokio::test] +async fn test_vector_type_unprepared() { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session + .query( + format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int PRIMARY KEY, b vector<int, 4>, c vector<text, 2>)", + ks + ), + &[], + ) + .await + .unwrap(); + + session + .query( + format!( + "INSERT INTO {}.t (a, b, c) VALUES (1, [1, 2, 3, 4], ['foo', 'bar'])", + ks + ), + &[], + ) + .await + .unwrap(); + + // TODO: Implement and test SELECT statements and bind values (`?`) +} + +#[cfg(cassandra_tests)] +#[tokio::test] +async fn test_vector_type_prepared() { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session + .query( + format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int PRIMARY KEY, b vector<int, 4>, c vector<text, 2>)", + ks + ), + &[], + ) + .await + .unwrap(); + + let prepared_statement = session + .prepare(format!( + "INSERT INTO {}.t (a, b, c) VALUES (?, [11, 12, 13, 14], ['afoo', 'abar'])", + ks + )) + .await + .unwrap(); + session.execute(&prepared_statement, &(2,)).await.unwrap(); + + // TODO: Implement and test SELECT statements and bind values (`?`) +} diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index b468050c0b..696bbe0c1a 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -184,6 +184,12 @@ enum PreCqlType { type_: PreCollectionType, }, Tuple(Vec<PreCqlType>), + Vector { + type_: Box<PreCqlType>, + /// matches the datatype
used by the java driver: + /// + dimensions: i32, + }, UserDefinedType { frozen: bool, name: String, @@ -207,6 +213,10 @@ impl PreCqlType { .map(|t| t.into_cql_type(keyspace_name, udts)) .collect(), ), + PreCqlType::Vector { type_, dimensions } => CqlType::Vector { + type_: Box::new(type_.into_cql_type(keyspace_name, udts)), + dimensions, + }, PreCqlType::UserDefinedType { frozen, name } => { let definition = match udts .get(keyspace_name) @@ -232,6 +242,12 @@ pub enum CqlType { type_: CollectionType, }, Tuple(Vec<CqlType>), + Vector { + type_: Box<CqlType>, + /// matches the datatype used by the java driver: + /// + dimensions: i32, + }, UserDefinedType { frozen: bool, // Using Arc here in order not to have many copies of the same definition @@ -1093,6 +1109,7 @@ fn topo_sort_udts(udts: &mut Vec<UdtRowWithParsedFieldTypes>) -> Result<(), Quer PreCqlType::Tuple(types) => types .iter() .for_each(|type_| do_with_referenced_udts(what, type_)), + PreCqlType::Vector { type_, .. } => do_with_referenced_udts(what, type_), PreCqlType::UserDefinedType { name, ..
} => what(name), } } @@ -1602,6 +1619,22 @@ fn parse_cql_type(p: ParserState<'_>) -> ParseResult<(PreCqlType, ParserState<'_ })?; Ok((PreCqlType::Tuple(types), p)) + } else if let Ok(p) = p.accept("vector<") { + let (inner_type, p) = parse_cql_type(p)?; + + let p = p.skip_white(); + let p = p.accept(",")?; + let p = p.skip_white(); + let (size, p) = p.parse_i32()?; + let p = p.skip_white(); + let p = p.accept(">")?; + + let typ = PreCqlType::Vector { + type_: Box::new(inner_type), + dimensions: size, + }; + + Ok((typ, p)) } else if let Ok((typ, p)) = parse_native_type(p) { Ok((PreCqlType::Native(typ), p)) } else if let Ok((name, p)) = parse_user_defined_type(p) { @@ -1787,6 +1820,20 @@ mod tests { PreCqlType::Native(NativeType::Varint), ]), ), + ( + "vector<int, 5>", + PreCqlType::Vector { + type_: Box::new(PreCqlType::Native(NativeType::Int)), + dimensions: 5, + }, + ), + ( + "vector<text, 1234>", + PreCqlType::Vector { + type_: Box::new(PreCqlType::Native(NativeType::Text)), + dimensions: 1234, + }, + ), ( "com.scylladb.types.AwesomeType", PreCqlType::UserDefinedType { diff --git a/scylla/src/utils/parse.rs b/scylla/src/utils/parse.rs index 1c5e59ecb7..96aa7976d7 100644 --- a/scylla/src/utils/parse.rs +++ b/scylla/src/utils/parse.rs @@ -87,6 +87,21 @@ impl<'s> ParserState<'s> { me } + /// Parses a sequence of digits and '-' as an integer. + /// Consumes characters until it finds a character that is not a digit or '-'.
+ /// + /// An error is returned if: + /// * The first character is not a digit or '-' + /// * The integer is larger than i32 + pub(crate) fn parse_i32(self) -> ParseResult<(i32, Self)> { + let (digits, p) = self.take_while(|c| c.is_ascii_digit() || c == '-'); + if let Ok(value) = digits.parse() { + Ok((value, p)) + } else { + Err(p.error(ParseErrorCause::Expected("integer of max length 2**32"))) + } + } + /// Skips characters from the beginning while they satisfy given predicate /// and returns new parser state which pub(crate) fn take_while(self, mut pred: impl FnMut(char) -> bool) -> (&'s str, Self) { diff --git a/test/cluster/cassandra5/docker-compose.yml b/test/cluster/cassandra5/docker-compose.yml new file mode 100644 index 0000000000..2c60afc3b2 --- /dev/null +++ b/test/cluster/cassandra5/docker-compose.yml @@ -0,0 +1,63 @@ +# TODO: when cassandra 5.0.0 releases, remove this file and use cluster/cassandra/docker-compose.yml instead + +version: '2.4' # 2.4 is the last version that supports depends_on conditions for service health + +networks: + public: + name: scylla_rust_driver_public + driver: bridge + ipam: + driver: default + config: + - subnet: 172.42.0.0/16 +services: + cassandra1: + image: cassandra:5.0-beta1 + healthcheck: + test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ] + interval: 5s + timeout: 5s + retries: 60 + networks: + public: + ipv4_address: 172.42.0.2 + environment: + - CASSANDRA_BROADCAST_ADDRESS=172.42.0.2 + - HEAP_NEWSIZE=512M + - MAX_HEAP_SIZE=2048M + cassandra2: + image: cassandra:5.0-beta1 + healthcheck: + test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ] + interval: 5s + timeout: 5s + retries: 60 + networks: + public: + ipv4_address: 172.42.0.3 + environment: + - CASSANDRA_BROADCAST_ADDRESS=172.42.0.3 + - CASSANDRA_SEEDS=172.42.0.2 + - HEAP_NEWSIZE=512M + - MAX_HEAP_SIZE=2048M + depends_on: + cassandra1: + condition: service_healthy + cassandra3: + image: cassandra:5.0-beta1 + healthcheck: + test: [ "CMD", "cqlsh", "-e", 
"describe keyspaces" ] + interval: 5s + timeout: 5s + retries: 60 + networks: + public: + ipv4_address: 172.42.0.4 + environment: + - CASSANDRA_BROADCAST_ADDRESS=172.42.0.4 + - CASSANDRA_SEEDS=172.42.0.2,172.42.0.3 + - HEAP_NEWSIZE=512M + - MAX_HEAP_SIZE=2048M + depends_on: + cassandra2: + condition: service_healthy