From 58d07f1f7b142f2013841f0c8a43bfe4f0885067 Mon Sep 17 00:00:00 2001 From: Irach Ramos Date: Wed, 2 Oct 2024 14:10:39 +0200 Subject: [PATCH] code review changes --- .../src/storage/cassandra.rs | 481 ++---------------- .../src/storage/keyvalue/cassandra.rs | 297 ++++++++++- .../tests/key_value_storage.rs | 2 +- 3 files changed, 322 insertions(+), 458 deletions(-) diff --git a/golem-worker-executor-base/src/storage/cassandra.rs b/golem-worker-executor-base/src/storage/cassandra.rs index f6b219d7f..af51d0275 100644 --- a/golem-worker-executor-base/src/storage/cassandra.rs +++ b/golem-worker-executor-base/src/storage/cassandra.rs @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use bytes::Bytes; use futures::StreamExt; - use golem_common::config::CassandraConfig; use golem_common::metrics::db::{record_db_failure, record_db_success}; use scylla::batch::{Batch, BatchType}; @@ -24,18 +22,15 @@ use scylla::serialize::row::SerializeRow; use scylla::transport::errors::QueryError; use scylla::FromRow; use scylla::{transport::session::PoolSize, Session, SessionBuilder}; -use serde::Deserialize; -use std::collections::HashMap; use std::fmt::Debug; -use std::iter::repeat; use std::time::Instant; use std::{num::NonZeroUsize, sync::Arc}; #[derive(Debug, Clone)] pub struct CassandraSession { pub session: Arc, - keyspace: String, - set_tracing: bool, + pub keyspace: String, + pub set_tracing: bool, } impl CassandraSession { @@ -70,7 +65,7 @@ impl CassandraSession { }) } - pub async fn create_docker_schema(&self) -> Result<(), String> { + pub async fn create_schema(&self) -> Result<(), String> { self.session.query_unpaged( Query::new( format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};", self.keyspace), @@ -137,50 +132,14 @@ impl CassandraSession { CassandraLabelledApi { svc_name, api_name, - keyspace: self.keyspace.clone(), cassandra: self.clone(), } } } -#[derive(FromRow, Debug, Deserialize)] -struct ValueRow { - value: Vec, -} - -impl ValueRow { - fn into_bytes(self) -> Bytes { - Bytes::from(self.value) - } -} - -#[derive(FromRow, Debug, Deserialize)] -struct KeyValueRow { - key: String, - value: Vec, -} - -impl KeyValueRow { - fn into_pair(self) -> (String, Bytes) { - (self.key, Bytes::from(self.value)) - } -} -#[derive(FromRow, Debug, Deserialize)] -struct ScoreValueRow { - score: f64, - value: Vec, -} - -impl ScoreValueRow { - fn into_pair(self) -> (f64, Bytes) { - (self.score, Bytes::from(self.value)) - } -} - pub struct CassandraLabelledApi { svc_name: &'static str, api_name: &'static str, - keyspace: String, pub cassandra: CassandraSession, } @@ -216,37 +175,34 @@ impl CassandraLabelledApi { statement } - pub async fn set(&self, namespace: &str, key: &str, value: &[u8]) -> Result<(), QueryError> { - let query = format!( - "INSERT INTO {}.kv_store (namespace, key, value) VALUES (?, ?, ?);", - self.keyspace - ); - + pub async fn perform_query( + &self, + cmd_name: &'static str, + query: String, + values: impl SerializeRow, + ) -> Result<(), QueryError> { let start = Instant::now(); self.record( start, - "set", + cmd_name, self.cassandra .session - .execute_unpaged(&self.statement(&query).await, (namespace, key, value)) + .execute_unpaged(&self.statement(&query).await, values) .await, ) .map(|_| ()) } - pub async fn set_many( + pub async fn perform_batch( &self, - namespace: &str, - pairs: &[(&str, &[u8])], + cmd_name: &'static str, + query: String, + values: Vec, ) -> Result<(), QueryError> { - let query = format!( - "INSERT INTO {}.kv_store (namespace, key, value) VALUES (?, ?, ?)", - self.keyspace - ); let mut batch: Batch = Batch::new(BatchType::Logged); let start = Instant::now(); - for _ in 1..=pairs.len() { + for _ in 1..=values.len() { batch.append_statement(self.statement(&query).await); } @@ -254,384 +210,66 @@ impl CassandraLabelledApi { batch.set_tracing(self.cassandra.set_tracing); - let values = pairs - .iter() - .map(|(field_key, field_value)| (namespace, *field_key, *field_value)) - .collect::>(); - self.record( start, - "set_many", + cmd_name, self.cassandra.session.batch(&batch, &values).await, ) .map(|_| ()) } - pub async fn set_if_not_exists( + pub async fn maybe_row( &self, - namespace: &str, - key: &str, - value: &[u8], - ) -> Result { - let existing = self - .cassandra - .session - .execute_unpaged( - &self - .statement(&format!( - "SELECT key FROM {}.kv_store WHERE namespace = ? AND key = ?;", - self.keyspace - )) - .await, - (namespace, key), - ) - .await? - .maybe_first_row_typed::<(String,)>() - .map_err(|e| QueryError::InvalidMessage(e.to_string()))?; - - let query = format!( - "INSERT INTO {}.kv_store (namespace, key, value) VALUES (?, ?, ?) IF NOT EXISTS;", - self.keyspace - ); - - let start = Instant::now(); - self.record( - start, - "set_if_not_exists", - self.cassandra - .session - .execute_unpaged(&self.statement(&query).await, (namespace, key, value)) - .await, - ) - .map(|_| existing.is_none()) - } - - pub async fn get(&self, namespace: &str, key: &str) -> Result, QueryError> { - let query = format!( - "SELECT value FROM {}.kv_store WHERE namespace = ? AND key = ?;", - self.keyspace - ); - + cmd_name: &'static str, + query: String, + values: impl SerializeRow, + map_to_value: F, + ) -> Result, QueryError> + where + RowT: FromRow, + T: Debug, + F: FnOnce(RowT) -> T, + { let start = Instant::now(); self.record( start, - "get", + cmd_name, self.cassandra .session - .execute_unpaged(&self.statement(&query).await, (namespace, key)) + .execute_unpaged(&self.statement(&query).await, values) .await? - .maybe_first_row_typed::() + .maybe_first_row_typed::() .map_err(|e| QueryError::InvalidMessage(e.to_string())) - .map(|opt_row| opt_row.map(|row| row.into_bytes())), + .map(|opt_row| opt_row.map(map_to_value)), ) } - pub async fn get_many( + pub async fn get_rows( &self, - namespace: &str, - keys: Vec, - ) -> Result>, QueryError> { - let placeholders: String = repeat("?").take(keys.len()).collect::>().join(", "); - let query = format!( - "SELECT key, value FROM {}.kv_store WHERE namespace = ? AND key IN ({});", - self.keyspace, placeholders - ); - - let start = Instant::now(); - let parameters: Vec = vec![namespace.to_string()] - .into_iter() - .chain(keys) - .collect(); - let mut rows = self - .cassandra - .session - .execute_iter(self.statement(&query).await, ¶meters) - .await? - .into_typed::(); - - let keys = parameters[1..].to_vec(); - - let mut result = Vec::new(); - while let Some(row) = rows.next().await { - match row { - Ok(row) => result.push(row.into_pair()), - Err(err) => { - return self.record( - start, - "get_many", - Err(QueryError::InvalidMessage(err.to_string())), - ) - } - } - } - let result = self.record(start, "get_many", Ok(result)).unwrap(); - - let mut result_map = result.into_iter().collect::>(); - - let values = keys - .into_iter() - .map(|key| result_map.remove(&key)) - .collect::>>(); - - Ok(values) - } - - pub async fn del(&self, namespace: &str, key: &str) -> Result<(), QueryError> { - let query = format!( - "DELETE FROM {}.kv_store WHERE namespace = ? AND key = ?;", - self.keyspace - ); - - let start = Instant::now(); - self.record( - start, - "del", - self.cassandra - .session - .execute_unpaged(&self.statement(&query).await, (namespace, key)) - .await, - ) - .map(|_| ()) - } - - pub async fn del_many(&self, namespace: &str, keys: Vec) -> Result<(), QueryError> { - let placeholders: String = repeat("?").take(keys.len()).collect::>().join(", "); - let query = format!( - "DELETE FROM {}.kv_store WHERE namespace = ? AND key IN ({});", - self.keyspace, placeholders - ); - - let start = Instant::now(); - let parameters: Vec = vec![namespace.to_string()] - .into_iter() - .chain(keys) - .collect(); - - self.record( - start, - "del_many", - self.cassandra - .session - .execute_unpaged(&self.statement(&query).await, ¶meters) - .await, - ) - .map(|_| ()) - } - - pub async fn exists(&self, namespace: &str, key: &str) -> Result { - let query = format!( - "SELECT value FROM {}.kv_store WHERE namespace = ? AND key = ? LIMIT 1;", - self.keyspace - ); - - let start = Instant::now(); - let rows = self - .record( - start, - "exists", - self.cassandra - .session - .execute_unpaged(&self.statement(&query).await, (namespace, key)) - .await, - )? - .rows; - Ok(rows.map_or(false, |rows| !rows.is_empty())) - } - - pub async fn keys(&self, namespace: &str) -> Result, QueryError> { - let query = format!( - "SELECT key FROM {}.kv_store WHERE namespace = ?;", - self.keyspace - ); - let mut result = Vec::new(); - - let start = Instant::now(); - let mut rows = self - .cassandra - .session - .execute_iter(self.statement(&query).await, &(namespace,)) - .await? - .into_typed::<(String,)>(); - - while let Some(row) = rows.next().await { - match row { - Ok(row) => result.push(row.0), - Err(err) => return Err(QueryError::InvalidMessage(err.to_string())), - } - } - self.record(start, "keys", Ok(result)) - } - - pub async fn add_to_set( - &self, - namespace: &str, - key: &str, - value: &[u8], - ) -> Result<(), QueryError> { - let query = format!( - "INSERT INTO {}.kv_sets (namespace, key, value) VALUES (?, ?, ?);", - self.keyspace - ); - - let start = Instant::now(); - self.record( - start, - "add_to_set", - self.cassandra - .session - .execute_unpaged(&self.statement(&query).await, (namespace, key, value)) - .await, - ) - .map(|_| ()) - } - - pub async fn remove_from_set( - &self, - namespace: &str, - key: &str, - value: &[u8], - ) -> Result<(), QueryError> { - let query = format!( - "DELETE FROM {}.kv_sets WHERE namespace = ? AND key = ? AND value = ?;", - self.keyspace - ); - - let start = Instant::now(); - self.record( - start, - "del", - self.cassandra - .session - .execute_unpaged(&self.statement(&query).await, (namespace, key, value)) - .await, - ) - .map(|_| ()) - } - - pub async fn members_of_set( - &self, - namespace: &str, - key: &str, - ) -> Result, QueryError> { - let query = format!( - "SELECT value FROM {}.kv_sets WHERE namespace = ? AND key = ?;", - self.keyspace - ); - - let start = Instant::now(); - let mut rows = self - .cassandra - .session - .execute_iter(self.statement(&query).await, (namespace, key)) - .await? - .into_typed::(); - - let mut result = Vec::new(); - while let Some(row) = rows.next().await { - match row { - Ok(row) => result.push(row.into_bytes()), - Err(err) => { - return self.record( - start, - "members_of_set", - Err(QueryError::InvalidMessage(err.to_string())), - ) - } - } - } - self.record(start, "members_of_set", Ok(result)) - } - - pub async fn add_to_sorted_set( - &self, - namespace: &str, - key: &str, - score: f64, - value: &[u8], - ) -> Result<(), QueryError> { - self.remove_from_sorted_set(namespace, key, value).await?; - let insert_statement = format!( - "INSERT INTO {}.kv_sorted_sets (namespace, key, score, value) VALUES (?, ?, ?, ?);", - self.keyspace - ); - - let start = Instant::now(); - self.record( - start, - "add_to_sorted_set", - self.cassandra - .session - .execute_unpaged( - &self.statement(&insert_statement).await, - (namespace, key, score, value), - ) - .await, - ) - .map(|_| ()) - } - - pub async fn remove_from_sorted_set( - &self, - namespace: &str, - key: &str, - value: &[u8], - ) -> Result<(), QueryError> { - let get_score = format!( - "SELECT SCORE FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? AND value = ? ALLOW FILTERING;", - self.keyspace - ); - let start = Instant::now(); - match self - .cassandra - .session - .execute_unpaged(&self.statement(&get_score).await, &(namespace, key, value)) - .await? - .maybe_first_row_typed::<(f64,)>() - { - Ok(None) => self.record(start, "remove_from_sorted_set", Ok(())), - Ok(Some((score,))) => { - let delete_statement = format!("DELETE FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? AND score = ? AND value = ?;", self.keyspace); - self.record( - start, - "remove_from_sorted_set", - self.cassandra - .session - .execute_unpaged( - &self.statement(&delete_statement).await, - &(namespace, key, score, value), - ) - .await, - ) - .map(|_| ()) - } - Err(err) => self.record( - start, - "remove_from_sorted_set", - Err(QueryError::InvalidMessage(err.to_string())), - ), - } - } - - async fn execute_query( - &self, - statement: String, - values: impl SerializeRow, cmd_name: &'static str, - ) -> Result, QueryError> { + query: String, + values: impl SerializeRow, + mut map_to_value: F, + ) -> Result, QueryError> + where + RowT: FromRow, + T: Debug, + F: FnMut(RowT) -> T, + { let start = Instant::now(); let mut rows = self .cassandra .session - .execute_iter(self.statement(&statement).await, values) + .execute_iter(self.statement(&query).await, &values) .await? - .into_typed::(); + .into_typed::(); let mut result = Vec::new(); while let Some(row) = rows.next().await { match row { - Ok(row) => result.push(row.into_pair()), + Ok(row) => result.push(map_to_value(row)), Err(err) => { return self.record( start, @@ -643,33 +281,4 @@ impl CassandraLabelledApi { } self.record(start, cmd_name, Ok(result)) } - - pub async fn get_sorted_set( - &self, - namespace: &str, - key: &str, - ) -> Result, QueryError> { - let query = format!( - "SELECT score, value FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? ORDER BY score ASC;", - self.keyspace - ); - - self.execute_query(query, (namespace, key), "get_sorted_set") - .await - } - - pub async fn query_sorted_set( - &self, - namespace: &str, - key: &str, - min: f64, - max: f64, - ) -> Result, QueryError> { - let query = format!( - "SELECT score, value FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? AND score >= ? AND score <= ? ORDER BY score ASC;", - self.keyspace - ); - self.execute_query(query, (namespace, key, min, max), "query_sorted_set") - .await - } } diff --git a/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs b/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs index 6c30354d7..972c724fc 100644 --- a/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs +++ b/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs @@ -18,6 +18,13 @@ use crate::storage::{ }; use async_trait::async_trait; use bytes::Bytes; +use scylla::{ + prepared_statement::PreparedStatement, serialize::row::SerializeRow, + transport::errors::QueryError, FromRow, +}; +use serde::Deserialize; +use std::fmt::Debug; +use std::{collections::HashMap, iter::repeat}; #[derive(Debug)] pub struct CassandraKeyValueStorage { @@ -39,6 +46,66 @@ impl CassandraKeyValueStorage { } } } + + async fn statement(&self, query_text: &str) -> PreparedStatement { + let mut statement = self.session.session.prepare(query_text).await.unwrap(); + statement.set_tracing(self.session.set_tracing); + statement + } + + async fn maybe_row( + &self, + query: String, + values: impl SerializeRow, + map_to_value: F, + ) -> Result, QueryError> + where + RowT: FromRow, + T: Debug, + F: FnOnce(RowT) -> T, + { + self.session + .session + .execute_unpaged(&self.statement(&query).await, values) + .await? + .maybe_first_row_typed::() + .map_err(|e| QueryError::InvalidMessage(e.to_string())) + .map(|opt_row| opt_row.map(map_to_value)) + } +} + +#[derive(FromRow, Debug, Deserialize)] +struct ValueRow { + value: Vec, +} + +impl ValueRow { + fn into_bytes(self) -> Bytes { + Bytes::from(self.value) + } +} + +#[derive(FromRow, Debug, Deserialize)] +struct KeyValueRow { + key: String, + value: Vec, +} + +impl KeyValueRow { + fn into_pair(self) -> (String, Bytes) { + (self.key, Bytes::from(self.value)) + } +} +#[derive(FromRow, Debug, Deserialize)] +struct ScoreValueRow { + score: f64, + value: Vec, +} + +impl ScoreValueRow { + fn into_pair(self) -> (f64, Bytes) { + (self.score, Bytes::from(self.value)) + } } #[async_trait] @@ -52,9 +119,13 @@ impl KeyValueStorage for CassandraKeyValueStorage { key: &str, value: &[u8], ) -> Result<(), String> { + let query = format!( + "INSERT INTO {}.kv_store (namespace, key, value) VALUES (?, ?, ?);", + self.session.keyspace.clone() + ); self.session .with(svc_name, api_name) - .set(&self.to_string(namespace), key, value) + .perform_query("set", query, (&self.to_string(namespace), key, value)) .await .map_err(|e| e.to_string()) } @@ -67,9 +138,19 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, pairs: &[(&str, &[u8])], ) -> Result<(), String> { + let query = format!( + "INSERT INTO {}.kv_store (namespace, key, value) VALUES (?, ?, ?)", + self.session.keyspace + ); + let namespace = self.to_string(namespace); + let values = pairs + .iter() + .map(|(field_key, field_value)| (&namespace, *field_key, *field_value)) + .collect::>(); + self.session .with(svc_name, api_name) - .set_many(&self.to_string(namespace), pairs) + .perform_batch("set_many", query, values) .await .map_err(|e| e.to_string()) } @@ -83,11 +164,27 @@ impl KeyValueStorage for CassandraKeyValueStorage { key: &str, value: &[u8], ) -> Result { + let exists_query = format!( + "SELECT value FROM {}.kv_store WHERE namespace = ? AND key = ? LIMIT 1;", + self.session.keyspace + ); + let namespace = self.to_string(namespace); + let not_exists = self + .maybe_row(exists_query, (&namespace, key), |r: ValueRow| r) + .await + .map_or(true, |opt| opt.is_none()); + + let insert_query = format!( + "INSERT INTO {}.kv_store (namespace, key, value) VALUES (?, ?, ?) IF NOT EXISTS;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .set_if_not_exists(&self.to_string(namespace), key, value) + .perform_query("set_if_not_exists", insert_query, (&namespace, key, value)) .await .map_err(|e| e.to_string()) + .map(|_| not_exists) } async fn get( @@ -98,9 +195,19 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, key: &str, ) -> Result, String> { + let query = format!( + "SELECT value FROM {}.kv_store WHERE namespace = ? AND key = ?;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .get(&self.to_string(namespace), key) + .maybe_row( + "get", + query, + (self.to_string(namespace), key), + |row: ValueRow| row.into_bytes(), + ) .await .map_err(|e| e.to_string()) } @@ -113,11 +220,36 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, keys: Vec, ) -> Result>, String> { - self.session + let placeholders: String = repeat("?").take(keys.len()).collect::>().join(", "); + let query = format!( + "SELECT key, value FROM {}.kv_store WHERE namespace = ? AND key IN ({});", + self.session.keyspace, placeholders + ); + + let parameters: Vec = vec![self.to_string(namespace)] + .into_iter() + .chain(keys) + .collect(); + + let result = self + .session .with(svc_name, api_name) - .get_many(&self.to_string(namespace), keys) + .get_rows("get_many", query, ¶meters, |row: KeyValueRow| { + row.into_pair() + }) .await .map_err(|e| e.to_string()) + .unwrap(); + + let mut result_map = result.into_iter().collect::>(); + + let keys = parameters[1..].to_vec(); + let values = keys + .into_iter() + .map(|key| result_map.remove(&key)) + .collect::>>(); + + Ok(values) } async fn del( @@ -127,9 +259,14 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, key: &str, ) -> Result<(), String> { + let query = format!( + "DELETE FROM {}.kv_store WHERE namespace = ? AND key = ?;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .del(&self.to_string(namespace), key) + .perform_query("del", query, (&self.to_string(namespace), key)) .await .map_err(|e| e.to_string()) } @@ -141,9 +278,20 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, keys: Vec, ) -> Result<(), String> { + let placeholders: String = repeat("?").take(keys.len()).collect::>().join(", "); + let query = format!( + "DELETE FROM {}.kv_store WHERE namespace = ? AND key IN ({});", + self.session.keyspace, placeholders + ); + + let parameters: Vec = vec![self.to_string(namespace)] + .into_iter() + .chain(keys) + .collect(); + self.session .with(svc_name, api_name) - .del_many(&self.to_string(namespace), keys) + .perform_query("del_many", query, ¶meters) .await .map_err(|e| e.to_string()) } @@ -155,10 +303,21 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, key: &str, ) -> Result { + let query = format!( + "SELECT value FROM {}.kv_store WHERE namespace = ? AND key = ? LIMIT 1;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .exists(&self.to_string(namespace), key) + .maybe_row( + "exists", + query, + (self.to_string(namespace), key), + |row: ValueRow| row, + ) .await + .map(|opt| opt.is_some()) .map_err(|e| e.to_string()) } @@ -168,9 +327,19 @@ impl KeyValueStorage for CassandraKeyValueStorage { api_name: &'static str, namespace: KeyValueStorageNamespace, ) -> Result, String> { + let query = format!( + "SELECT key FROM {}.kv_store WHERE namespace = ?;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .keys(&self.to_string(namespace)) + .get_rows( + "keys", + query, + (self.to_string(namespace),), + |row: (String,)| row.0, + ) .await .map_err(|e| e.to_string()) } @@ -184,9 +353,18 @@ impl KeyValueStorage for CassandraKeyValueStorage { key: &str, value: &[u8], ) -> Result<(), String> { + let query = format!( + "INSERT INTO {}.kv_sets (namespace, key, value) VALUES (?, ?, ?);", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .add_to_set(&self.to_string(namespace), key, value) + .perform_query( + "add_to_set", + query, + (&self.to_string(namespace), key, value), + ) .await .map_err(|e| e.to_string()) } @@ -200,9 +378,18 @@ impl KeyValueStorage for CassandraKeyValueStorage { key: &str, value: &[u8], ) -> Result<(), String> { + let query = format!( + "DELETE FROM {}.kv_sets WHERE namespace = ? AND key = ? AND value = ?;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .remove_from_set(&self.to_string(namespace), key, value) + .perform_query( + "remove_from_set", + query, + (&self.to_string(namespace), key, value), + ) .await .map_err(|e| e.to_string()) } @@ -215,9 +402,19 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, key: &str, ) -> Result, String> { + let query = format!( + "SELECT value FROM {}.kv_sets WHERE namespace = ? AND key = ?;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .members_of_set(&self.to_string(namespace), key) + .get_rows( + "members_of_set", + query, + (&self.to_string(namespace), key), + |row: ValueRow| row.into_bytes(), + ) .await .map_err(|e| e.to_string()) } @@ -226,15 +423,33 @@ impl KeyValueStorage for CassandraKeyValueStorage { &self, svc_name: &'static str, api_name: &'static str, - _entity_name: &'static str, + entity_name: &'static str, namespace: KeyValueStorageNamespace, key: &str, score: f64, value: &[u8], ) -> Result<(), String> { + self.remove_from_sorted_set( + svc_name, + api_name, + entity_name, + namespace.clone(), + key, + value, + ) + .await?; + let insert_statement = format!( + "INSERT INTO {}.kv_sorted_sets (namespace, key, score, value) VALUES (?, ?, ?, ?);", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .add_to_sorted_set(&self.to_string(namespace), key, score, value) + .perform_query( + "add_to_sorted_set", + insert_statement, + (&self.to_string(namespace), key, score, value), + ) .await .map_err(|e| e.to_string()) } @@ -248,11 +463,32 @@ impl KeyValueStorage for CassandraKeyValueStorage { key: &str, value: &[u8], ) -> Result<(), String> { - self.session - .with(svc_name, api_name) - .remove_from_sorted_set(&self.to_string(namespace), key, value) + let get_score = format!( + "SELECT score, value FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? AND value = ? ALLOW FILTERING;", + self.session.keyspace + ); + let namespace = self.to_string(namespace); + match self + .maybe_row(get_score, (&namespace, key, value), |row: ScoreValueRow| { + row.score + }) .await - .map_err(|e| e.to_string()) + .map_err(|e| e.to_string())? + { + None => Ok(()), + Some(score) => { + let delete_statement = format!("DELETE FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? AND score = ? AND value = ?;", self.session.keyspace); + self.session + .with(svc_name, api_name) + .perform_query( + "remove_from_sorted_set", + delete_statement, + (&namespace, key, score, value), + ) + .await + .map_err(|e| e.to_string()) + } + } } async fn get_sorted_set( @@ -263,9 +499,19 @@ impl KeyValueStorage for CassandraKeyValueStorage { namespace: KeyValueStorageNamespace, key: &str, ) -> Result, String> { + let query = format!( + "SELECT score, value FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? ORDER BY score ASC;", + self.session.keyspace + ); + self.session .with(svc_name, api_name) - .get_sorted_set(&self.to_string(namespace), key) + .get_rows( + "get_sorted_set", + query, + (&self.to_string(namespace), key), + |row: ScoreValueRow| row.into_pair(), + ) .await .map_err(|e| e.to_string()) } @@ -280,9 +526,18 @@ impl KeyValueStorage for CassandraKeyValueStorage { min: f64, max: f64, ) -> Result, String> { + let query = format!( + "SELECT score, value FROM {}.kv_sorted_sets WHERE namespace = ? AND key = ? AND score >= ? AND score <= ? ORDER BY score ASC;", + self.session.keyspace + ); self.session .with(svc_name, api_name) - .query_sorted_set(&self.to_string(namespace), key, min, max) + .get_rows( + "query_sorted_set", + query, + (&self.to_string(namespace), key, min, max), + |row: ScoreValueRow| row.into_pair(), + ) .await .map_err(|e| e.to_string()) } diff --git a/golem-worker-executor-base/tests/key_value_storage.rs b/golem-worker-executor-base/tests/key_value_storage.rs index 313ca6d5f..a8cc901db 100644 --- a/golem-worker-executor-base/tests/key_value_storage.rs +++ b/golem-worker-executor-base/tests/key_value_storage.rs @@ -129,7 +129,7 @@ pub(crate) async fn cassandra_storage() -> impl GetKeyValueStorage { let test_keyspace = format!("golem_test_{}", &Uuid::new_v4().to_string()[..8]); let session = cassandra.get_session(None).await; let cassandra_session = CassandraSession::new(session, true, &test_keyspace); - if let Err(err_msg) = cassandra_session.create_docker_schema().await { + if let Err(err_msg) = cassandra_session.create_schema().await { cassandra.kill(); panic!("Cannot create schema : {}", err_msg); }