From 441bacf53d1ea6bdceeae3fa56429dd5e0d9f7b0 Mon Sep 17 00:00:00 2001 From: etorreborre Date: Tue, 10 Dec 2024 18:30:53 +0100 Subject: [PATCH] refactor(rust): make the auto-retry an implementation detail of repositories --- Cargo.lock | 1 + examples/rust/get_started/src/token.rs | 2 +- .../storage/resource_policy_repository.rs | 40 + .../storage/resource_policy_repository_sql.rs | 17 +- .../src/policy/storage/resource_repository.rs | 20 + .../policy/storage/resource_repository_sql.rs | 17 +- .../resource_type_policy_repository.rs | 40 + .../resource_type_policy_repository_sql.rs | 19 +- .../authenticator/enrollment_tokens/issuer.rs | 2 +- .../authority_enrollment_token_repository.rs | 17 + ...thority_enrollment_token_repository_sql.rs | 46 +- .../storage/authority_members_repository.rs | 30 + .../authority_members_repository_sql.rs | 18 +- .../ockam_api/src/authority_node/authority.rs | 14 +- .../ockam_api/src/cli_state/enrollments.rs | 4 +- .../src/cli_state/identities_attributes.rs | 6 +- .../ockam/ockam_api/src/cli_state/policies.rs | 13 +- .../ockam_api/src/cli_state/repositories.rs | 144 +-- .../ockam_api/src/cli_state/resources.rs | 6 +- .../src/cli_state/secure_channels.rs | 6 +- .../src/cli_state/storage/auto_retry.rs | 821 ------------------ .../storage/enrollments_repository.rs | 30 +- .../storage/enrollments_repository_sql.rs | 20 +- .../storage/identities_repository.rs | 72 ++ .../storage/identities_repository_sql.rs | 15 +- .../cli_state/storage/journeys_repository.rs | 29 + .../storage/journeys_repository_sql.rs | 11 + .../ockam_api/src/cli_state/storage/mod.rs | 2 - .../src/cli_state/storage/nodes_repository.rs | 89 +- .../cli_state/storage/nodes_repository_sql.rs | 19 +- .../cli_state/storage/projects_repository.rs | 36 +- .../storage/projects_repository_sql.rs | 22 +- .../cli_state/storage/spaces_repository.rs | 33 + .../storage/spaces_repository_sql.rs | 11 + .../storage/tcp_portals_repository.rs | 37 + .../storage/tcp_portals_repository_sql.rs | 10 + .../src/cli_state/storage/users_repository.rs | 29 + .../cli_state/storage/users_repository_sql.rs | 14 +- .../cli_state/storage/vaults_repository.rs | 29 + .../storage/vaults_repository_sql.rs | 17 +- .../src/nodes/service/secure_channel.rs | 5 +- .../ockam/ockam_api/tests/token_enrollment.rs | 15 +- .../rust/ockam/ockam_app_lib/Cargo.toml | 1 + .../src/state/model_state_repository.rs | 13 + .../storage/change_history_repository.rs | 37 +- .../storage/change_history_repository_sql.rs | 17 +- .../storage/credential_repository.rs | 34 + .../storage/credential_repository_sql.rs | 19 +- .../storage/identity_attributes_repository.rs | 24 + .../identity_attributes_repository_sql.rs | 20 +- .../storage/purpose_keys_repository.rs | 42 +- .../storage/purpose_keys_repository_sql.rs | 21 +- .../storage/secure_channel_repository.rs | 23 + .../storage/secure_channel_repository_sql.rs | 11 + .../src/storage/database/auto_retry.rs | 40 + .../ockam_node/src/storage/database/mod.rs | 2 + .../src/storage/database/sqlx_database.rs | 9 + .../src/storage/secrets_repository.rs | 76 ++ .../src/storage/secrets_repository_sql.rs | 14 +- 59 files changed, 1136 insertions(+), 1095 deletions(-) delete mode 100644 implementations/rust/ockam/ockam_api/src/cli_state/storage/auto_retry.rs create mode 100644 implementations/rust/ockam/ockam_node/src/storage/database/auto_retry.rs diff --git a/Cargo.lock b/Cargo.lock index 20230e6fb2e..a9146e204ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4801,6 +4801,7 @@ dependencies = [ "ockam_api", "ockam_core", "ockam_multiaddr", + "ockam_node", "serde", "serde_json", "sqlx", diff --git a/examples/rust/get_started/src/token.rs b/examples/rust/get_started/src/token.rs index 9cc798abbf0..3d232a01a57 100644 --- a/examples/rust/get_started/src/token.rs +++ b/examples/rust/get_started/src/token.rs @@ -30,7 +30,7 @@ pub async fn create_token(attribute_name: &str, attribute_value: &str) -> Result ExportedEnrollmentTicket::from_str(token_string).map_err(|e| error(format!("could not decode token: {e}")))?; let ticket = decoded.import().await?; - Ok(ticket.one_time_code.clone()) + Ok(ticket.one_time_code) } fn error(message: String) -> Error { diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository.rs index 004692f3dbd..4a367e61f08 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository.rs @@ -3,6 +3,10 @@ use ockam_core::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::compat::vec::Vec; use ockam_core::Result; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; /// This repository stores policies for resources. /// A policy is an expression which can be evaluated against an environment (a list of attribute @@ -36,3 +40,39 @@ pub trait ResourcePoliciesRepository: Send + Sync + 'static { /// Delete the policy associated to a given resource name and action async fn delete_policy(&self, resource_name: &ResourceName, action: &Action) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl ResourcePoliciesRepository for AutoRetry { + async fn store_policy( + &self, + resource_name: &ResourceName, + action: &Action, + expression: &Expr, + ) -> Result<()> { + retry!(self.wrapped.store_policy(resource_name, action, expression)) + } + + async fn get_policy( + &self, + resource_name: &ResourceName, + action: &Action, + ) -> Result> { + retry!(self.wrapped.get_policy(resource_name, action)) + } + + async fn get_policies(&self) -> Result> { + retry!(self.wrapped.get_policies()) + } + + async fn get_policies_by_resource_name( + &self, + resource_name: &ResourceName, + ) -> Result> { + retry!(self.wrapped.get_policies_by_resource_name(resource_name)) + } + + async fn delete_policy(&self, resource_name: &ResourceName, action: &Action) -> Result<()> { + retry!(self.wrapped.delete_policy(resource_name, action)) + } +} diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs index 7d7c7ed62a5..35a5fec3484 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs @@ -1,14 +1,15 @@ use core::str::FromStr; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::{Action, Expr, ResourceName, ResourcePoliciesRepository, ResourcePolicy}; use ockam_core::async_trait; use ockam_core::compat::vec::Vec; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::{Action, Expr, ResourceName, ResourcePoliciesRepository, ResourcePolicy}; - #[derive(Clone)] pub struct ResourcePolicySqlxDatabase { database: SqlxDatabase, @@ -25,6 +26,18 @@ impl ResourcePolicySqlxDatabase { } } + /// Create a repository + pub fn make_repository( + database: SqlxDatabase, + node_name: &str, + ) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database, node_name))) + } else { + Arc::new(Self::new(database, node_name)) + } + } + /// Create a new in-memory database for policies pub async fn create() -> Result { Ok(Self::new( diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository.rs index 3d757787841..cb8a024ad9a 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository.rs @@ -2,6 +2,10 @@ use crate::{Resource, ResourceName}; use ockam_core::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::Result; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; /// This repository stores resources. #[async_trait] @@ -15,3 +19,19 @@ pub trait ResourcesRepository: Send + Sync + 'static { /// Delete all the entries for the given resource name async fn delete_resource(&self, resource_name: &ResourceName) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl ResourcesRepository for AutoRetry { + async fn store_resource(&self, resource: &Resource) -> Result<()> { + retry!(self.wrapped.store_resource(resource)) + } + + async fn get_resource(&self, resource_name: &ResourceName) -> Result> { + retry!(self.wrapped.get_resource(resource_name)) + } + + async fn delete_resource(&self, resource_name: &ResourceName) -> Result<()> { + retry!(self.wrapped.delete_resource(resource_name)) + } +} diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs index 29354dd82d7..b92828e028b 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs @@ -3,14 +3,15 @@ use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::{Resource, ResourceName, ResourceType, ResourcesRepository}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::{Resource, ResourceName, ResourceType, ResourcesRepository}; - #[derive(Clone)] pub struct ResourcesSqlxDatabase { database: SqlxDatabase, @@ -27,6 +28,18 @@ impl ResourcesSqlxDatabase { } } + /// Create a repository + pub fn make_repository( + database: SqlxDatabase, + node_name: &str, + ) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database, node_name))) + } else { + Arc::new(Self::new(database, node_name)) + } + } + /// Create a new in-memory database for resources pub async fn create() -> Result { Ok(Self::new( diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository.rs index 55b332a0080..46611c77424 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository.rs @@ -4,6 +4,10 @@ use ockam_core::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::compat::vec::Vec; use ockam_core::Result; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; /// This repository stores policies for resources types. /// A policy is an expression which can be evaluated against an environment (a list of attribute @@ -37,3 +41,39 @@ pub trait ResourceTypePoliciesRepository: Send + Sync + 'static { /// Delete the policy associated to a given resource type and action async fn delete_policy(&self, resource_type: &ResourceType, action: &Action) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl ResourceTypePoliciesRepository for AutoRetry { + async fn store_policy( + &self, + resource_type: &ResourceType, + action: &Action, + expression: &Expr, + ) -> Result<()> { + retry!(self.wrapped.store_policy(resource_type, action, expression)) + } + + async fn get_policy( + &self, + resource_type: &ResourceType, + action: &Action, + ) -> Result> { + retry!(self.wrapped.get_policy(resource_type, action)) + } + + async fn get_policies(&self) -> Result> { + retry!(self.wrapped.get_policies()) + } + + async fn get_policies_by_resource_type( + &self, + resource_type: &ResourceType, + ) -> Result> { + retry!(self.wrapped.get_policies_by_resource_type(resource_type)) + } + + async fn delete_policy(&self, resource_type: &ResourceType, action: &Action) -> Result<()> { + retry!(self.wrapped.delete_policy(resource_type, action)) + } +} diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs index 1415a96fdb2..33169429433 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs @@ -3,16 +3,17 @@ use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::policy::ResourceTypePolicy; +use crate::{Action, Expr, ResourceType, ResourceTypePoliciesRepository}; use ockam_core::async_trait; use ockam_core::compat::vec::Vec; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::policy::ResourceTypePolicy; -use crate::{Action, Expr, ResourceType, ResourceTypePoliciesRepository}; - #[derive(Clone)] pub struct ResourceTypePolicySqlxDatabase { database: SqlxDatabase, @@ -29,6 +30,18 @@ impl ResourceTypePolicySqlxDatabase { } } + /// Create a repository + pub fn make_repository( + database: SqlxDatabase, + node_name: &str, + ) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database, node_name))) + } else { + Arc::new(Self::new(database, node_name)) + } + } + /// Create a new in-memory database for policies pub async fn create() -> Result { Ok(Self::new( diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer.rs b/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer.rs index 104ae7b19f0..d25046b5481 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer.rs @@ -98,7 +98,7 @@ impl EnrollmentTokenIssuer { let now = now()?; let expires_at = now + max_token_duration.as_secs(); let tkn = EnrollmentToken { - one_time_code: one_time_code.clone(), + one_time_code, reference: Some(reference.clone()), issued_by: enroller.clone(), created_at: now, diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository.rs b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository.rs index 5d56071aa3b..04e44f498b9 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository.rs @@ -4,6 +4,8 @@ use ockam::identity::TimestampInSeconds; use ockam_core::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This repository stores enrollment tokens on the Authority node #[async_trait] @@ -18,3 +20,18 @@ pub trait AuthorityEnrollmentTokenRepository: Send + Sync + 'static { /// Store a newly issued enrolment token async fn store_new_token(&self, token: EnrollmentToken) -> Result<()>; } + +#[async_trait] +impl AuthorityEnrollmentTokenRepository for AutoRetry { + async fn use_token( + &self, + one_time_code: OneTimeCode, + now: TimestampInSeconds, + ) -> Result> { + retry!(self.wrapped.use_token(one_time_code, now)) + } + + async fn store_new_token(&self, token: EnrollmentToken) -> Result<()> { + retry!(self.wrapped.store_new_token(token.clone())) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs index 307e9c9cb88..61d9d5bf78a 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs @@ -3,16 +3,17 @@ use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; - -use ockam_core::async_trait; -use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use tracing::debug; +use std::sync::Arc; use crate::authenticator::one_time_code::OneTimeCode; use crate::authenticator::{ AuthorityEnrollmentTokenRepository, EnrollmentToken, EnrollmentTokenRow, }; +use ockam_core::async_trait; +use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; +use tracing::debug; /// Implementation of [`AuthorityEnrollmentTokenRepository`] trait based on an underlying database /// using sqlx as its API @@ -28,6 +29,15 @@ impl AuthorityEnrollmentTokenSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new( @@ -54,7 +64,7 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase let mut transaction = self.database.pool.begin().await.into_core()?; let query2 = query_as("SELECT one_time_code, reference, issued_by, created_at, expires_at, ttl_count, attributes FROM authority_enrollment_token WHERE one_time_code = $1") - .bind(&one_time_code); + .bind(one_time_code); let row: Option = query2.fetch_optional(&mut *transaction).await.into_core()?; let token: Option = row.map(|r| r.try_into()).transpose()?; @@ -63,7 +73,7 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase if token.ttl_count <= 1 { let query3 = query("DElETE FROM authority_enrollment_token WHERE one_time_code = $1") - .bind(&one_time_code); + .bind(one_time_code); query3.execute(&mut *transaction).await.void()?; debug!( "Deleted enrollment token because it has been used. Reference: {}", @@ -75,7 +85,7 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase "UPDATE authority_enrollment_token SET ttl_count = $1 WHERE one_time_code = $2", ) .bind(new_ttl_count as i64) - .bind(&one_time_code); + .bind(one_time_code); query3.execute(&mut *transaction).await.void()?; debug!( "Decreasing enrollment token usage count to {}. Reference: {}", @@ -155,7 +165,7 @@ mod tests { attrs.insert("role".to_string(), "user".to_string()); let token = EnrollmentToken { - one_time_code: one_time_code.clone(), + one_time_code, reference: None, issued_by: issued_by.clone(), created_at, @@ -166,7 +176,7 @@ mod tests { repository.store_new_token(token).await?; - let token1 = repository.use_token(one_time_code.clone(), now()?).await?; + let token1 = repository.use_token(one_time_code, now()?).await?; assert!(token1.is_some()); let token1 = token1.unwrap(); assert_eq!(token1.one_time_code, one_time_code); @@ -206,7 +216,7 @@ mod tests { attrs.insert("role".to_string(), "user".to_string()); let token = EnrollmentToken { - one_time_code: one_time_code.clone(), + one_time_code, reference: reference.clone(), issued_by: issued_by.clone(), created_at, @@ -217,7 +227,7 @@ mod tests { repository.store_new_token(token).await?; - let token1 = repository.use_token(one_time_code.clone(), now()?).await?; + let token1 = repository.use_token(one_time_code, now()?).await?; assert!(token1.is_some()); let token1 = token1.unwrap(); assert_eq!(token1.one_time_code, one_time_code); @@ -253,7 +263,7 @@ mod tests { attrs.insert("role".to_string(), "user".to_string()); let token = EnrollmentToken { - one_time_code: one_time_code.clone(), + one_time_code, reference: None, issued_by: issued_by.clone(), created_at, @@ -264,9 +274,9 @@ mod tests { repository.store_new_token(token).await?; - let token1 = repository.use_token(one_time_code.clone(), now()?).await?; - let token2 = repository.use_token(one_time_code.clone(), now()?).await?; - let token3 = repository.use_token(one_time_code.clone(), now()?).await?; + let token1 = repository.use_token(one_time_code, now()?).await?; + let token2 = repository.use_token(one_time_code, now()?).await?; + let token3 = repository.use_token(one_time_code, now()?).await?; assert!(token1.is_some()); assert!(token2.is_some()); assert!(token3.is_none()); @@ -309,7 +319,7 @@ mod tests { attrs.insert("role".to_string(), "user".to_string()); let token = EnrollmentToken { - one_time_code: one_time_code.clone(), + one_time_code, reference: None, issued_by: issued_by.clone(), created_at, @@ -324,7 +334,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(2)).await; - let token1 = repository.use_token(one_time_code.clone(), now()?).await?; + let token1 = repository.use_token(one_time_code, now()?).await?; assert!(token1.is_none()); Ok(()) }) diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository.rs b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository.rs index 87c404f4ef1..6c1afe0ac77 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository.rs @@ -4,6 +4,8 @@ use ockam_core::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::compat::vec::Vec; use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This repository stores project members on the Authority node #[async_trait] @@ -26,3 +28,31 @@ pub trait AuthorityMembersRepository: Send + Sync + 'static { pre_trusted_identities: &PreTrustedIdentities, ) -> Result<()>; } + +#[async_trait] +impl AuthorityMembersRepository for AutoRetry { + async fn get_member(&self, identifier: &Identifier) -> Result> { + retry!(self.wrapped.get_member(identifier)) + } + + async fn get_members(&self) -> Result> { + retry!(self.wrapped.get_members()) + } + + async fn delete_member(&self, identifier: &Identifier) -> Result<()> { + retry!(self.wrapped.delete_member(identifier)) + } + + async fn add_member(&self, member: AuthorityMember) -> Result<()> { + retry!(self.wrapped.add_member(member.clone())) + } + + async fn bootstrap_pre_trusted_members( + &self, + pre_trusted_identities: &PreTrustedIdentities, + ) -> Result<()> { + retry!(self + .wrapped + .bootstrap_pre_trusted_members(pre_trusted_identities)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs index 558a8a3ad17..879a743d62a 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs @@ -1,16 +1,17 @@ use core::ops::Deref; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::authenticator::{ + AuthorityMember, AuthorityMemberRow, AuthorityMembersRepository, PreTrustedIdentities, +}; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::authenticator::{ - AuthorityMember, AuthorityMemberRow, AuthorityMembersRepository, PreTrustedIdentities, -}; - #[derive(Clone)] pub struct AuthorityMembersSqlxDatabase { database: SqlxDatabase, @@ -23,6 +24,15 @@ impl AuthorityMembersSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new( diff --git a/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs b/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs index 00411764f9f..c014eafc883 100644 --- a/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs +++ b/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs @@ -24,7 +24,6 @@ use ockam_node::database::SqlxDatabase; use ockam_node::Context; use crate::authority_node::Configuration; -use crate::cli_state::AutoRetry; use crate::echoer::Echoer; use crate::nodes::service::default_address::DefaultAddress; @@ -74,15 +73,10 @@ impl Authority { SqlxDatabase::create(&configuration.database_configuration).await? }; - let members = Arc::new(AutoRetry::new(AuthorityMembersSqlxDatabase::new( - database.clone(), - ))); - let tokens = Arc::new(AutoRetry::new(AuthorityEnrollmentTokenSqlxDatabase::new( - database.clone(), - ))); - let secure_channel_repository = Arc::new(AutoRetry::new(SecureChannelSqlxDatabase::new( - database.clone(), - ))); + let members = AuthorityMembersSqlxDatabase::make_repository(database.clone()); + let tokens = AuthorityEnrollmentTokenSqlxDatabase::make_repository(database.clone()); + let secure_channel_repository = + SecureChannelSqlxDatabase::make_repository(database.clone()); Self::bootstrap_repository(members.clone(), configuration).await?; diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs b/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs index 9d11423d361..7040fc9b635 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs @@ -702,7 +702,7 @@ mod tests { user_roles: vec![], project_change_history: Some(project_change_history.to_string()), }; - let legacy = LegacyEnrollmentTicket::new(otc.clone(), project.clone()); + let legacy = LegacyEnrollmentTicket::new(otc, project.clone()); let enrollment_ticket = EnrollmentTicket::new_from_legacy(legacy).await.unwrap(); assert_eq!(enrollment_ticket.one_time_code, otc); assert_eq!(enrollment_ticket.project_id, project_id); @@ -759,7 +759,7 @@ mod tests { .await .unwrap(); let exported = ExportedEnrollmentTicket::new( - otc.clone(), + otc, ProjectRoute::new_with_id(project_id).unwrap(), project_identity.identifier().to_string(), project_name, diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/identities_attributes.rs b/implementations/rust/ockam/ockam_api/src/cli_state/identities_attributes.rs index 51bc9183d75..259d897934a 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/identities_attributes.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/identities_attributes.rs @@ -1,4 +1,3 @@ -use crate::cli_state::AutoRetry; use crate::CliState; use ockam::identity::{ IdentitiesAttributes, IdentityAttributesRepository, IdentityAttributesSqlxDatabase, @@ -19,9 +18,6 @@ impl CliState { &self, node_name: &str, ) -> Arc { - Arc::new(AutoRetry::new(IdentityAttributesSqlxDatabase::new( - self.database(), - node_name, - ))) + IdentityAttributesSqlxDatabase::make_repository(self.database(), node_name) } } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/policies.rs b/implementations/rust/ockam/ockam_api/src/cli_state/policies.rs index 882a46af23c..24ea8f1ceb4 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/policies.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/policies.rs @@ -1,18 +1,11 @@ -use crate::cli_state::{AutoRetry, CliState}; +use crate::cli_state::CliState; use ockam_abac::{Policies, ResourcePolicySqlxDatabase, ResourceTypePolicySqlxDatabase}; -use std::sync::Arc; impl CliState { pub fn policies(&self, node_name: &str) -> Policies { Policies::new( - Arc::new(AutoRetry::new(ResourcePolicySqlxDatabase::new( - self.database(), - node_name, - ))), - Arc::new(AutoRetry::new(ResourceTypePolicySqlxDatabase::new( - self.database(), - node_name, - ))), + ResourcePolicySqlxDatabase::make_repository(self.database(), node_name), + ResourceTypePolicySqlxDatabase::make_repository(self.database(), node_name), ) } } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/repositories.rs b/implementations/rust/ockam/ockam_api/src/cli_state/repositories.rs index 083dcf77183..453acffd484 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/repositories.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/repositories.rs @@ -4,7 +4,6 @@ use ockam::identity::{ CredentialSqlxDatabase, }; use ockam_core::compat::sync::Arc; -use ockam_node::database::DatabaseConfiguration; use ockam_vault::storage::{SecretsRepository, SecretsSqlxDatabase}; use crate::cli_state::storage::*; @@ -18,171 +17,54 @@ use crate::cli_state::{UsersRepository, UsersSqlxDatabase}; /// stored in the database impl CliState { pub fn change_history_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(ChangeHistorySqlxDatabase::new(database))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(ChangeHistorySqlxDatabase::new(database)) - } - } + ChangeHistorySqlxDatabase::make_repository(self.database()) } pub(super) fn identities_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(IdentitiesSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(IdentitiesSqlxDatabase::new(self.database())) - } - } + IdentitiesSqlxDatabase::make_repository(self.database()) } pub(super) fn purpose_keys_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => Arc::new(AutoRetry::new( - PurposeKeysSqlxDatabase::new(self.database()), - )), - DatabaseConfiguration::Postgres { .. } => { - Arc::new(PurposeKeysSqlxDatabase::new(self.database())) - } - } + PurposeKeysSqlxDatabase::make_repository(self.database()) } pub(super) fn secrets_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(SecretsSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(SecretsSqlxDatabase::new(self.database())) - } - } + SecretsSqlxDatabase::make_repository(self.database()) } pub(super) fn vaults_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(VaultsSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(VaultsSqlxDatabase::new(self.database())) - } - } + VaultsSqlxDatabase::make_repository(self.database()) } pub(super) fn enrollment_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => Arc::new(AutoRetry::new( - EnrollmentsSqlxDatabase::new(self.database()), - )), - DatabaseConfiguration::Postgres { .. } => { - Arc::new(EnrollmentsSqlxDatabase::new(self.database())) - } - } + EnrollmentsSqlxDatabase::make_repository(self.database()) } pub(super) fn nodes_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(NodesSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(NodesSqlxDatabase::new(self.database())) - } - } + NodesSqlxDatabase::make_repository(self.database()) } pub(super) fn tcp_portals_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(TcpPortalsSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(TcpPortalsSqlxDatabase::new(self.database())) - } - } + TcpPortalsSqlxDatabase::make_repository(self.database()) } pub(super) fn projects_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(ProjectsSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(ProjectsSqlxDatabase::new(self.database())) - } - } + ProjectsSqlxDatabase::make_repository(self.database()) } pub(super) fn spaces_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(SpacesSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(SpacesSqlxDatabase::new(self.database())) - } - } + SpacesSqlxDatabase::make_repository(self.database()) } pub(super) fn users_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => { - Arc::new(AutoRetry::new(UsersSqlxDatabase::new(self.database()))) - } - DatabaseConfiguration::Postgres { .. } => { - Arc::new(UsersSqlxDatabase::new(self.database())) - } - } + UsersSqlxDatabase::make_repository(self.database()) } pub(super) fn user_journey_repository(&self) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => Arc::new(AutoRetry::new( - JourneysSqlxDatabase::new(self.application_database()), - )), - DatabaseConfiguration::Postgres { .. } => { - Arc::new(JourneysSqlxDatabase::new(self.application_database())) - } - } + JourneysSqlxDatabase::make_repository(self.application_database()) } pub fn cached_credentials_repository(&self, node_name: &str) -> Arc { - let database = self.database(); - match database.configuration { - DatabaseConfiguration::SqlitePersistent { .. } - | DatabaseConfiguration::SqliteInMemory { .. } => Arc::new(AutoRetry::new( - CredentialSqlxDatabase::new(self.database(), node_name), - )), - DatabaseConfiguration::Postgres { .. } => { - Arc::new(CredentialSqlxDatabase::new(self.database(), node_name)) - } - } + CredentialSqlxDatabase::make_repository(self.database(), node_name) } } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/resources.rs b/implementations/rust/ockam/ockam_api/src/cli_state/resources.rs index d61259599de..07d3809dfcb 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/resources.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/resources.rs @@ -1,13 +1,11 @@ -use crate::cli_state::AutoRetry; use crate::CliState; use ockam_abac::{Resources, ResourcesSqlxDatabase}; -use std::sync::Arc; impl CliState { pub fn resources(&self, node_name: &str) -> Resources { - Resources::new(Arc::new(AutoRetry::new(ResourcesSqlxDatabase::new( + Resources::new(ResourcesSqlxDatabase::make_repository( self.database(), node_name, - )))) + )) } } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/secure_channels.rs b/implementations/rust/ockam/ockam_api/src/cli_state/secure_channels.rs index 0a3e0e244db..ff4219011b8 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/secure_channels.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/secure_channels.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use ockam::identity::{Identities, SecureChannelSqlxDatabase, SecureChannels}; +use crate::cli_state::CliState; use crate::cli_state::Result; -use crate::cli_state::{AutoRetry, CliState}; impl CliState { pub async fn secure_channels(&self, node_name: &str) -> Result> { @@ -15,9 +15,7 @@ impl CliState { .build(); Ok(SecureChannels::from_identities( identities, - Arc::new(AutoRetry::new(SecureChannelSqlxDatabase::new( - self.database(), - ))), + SecureChannelSqlxDatabase::make_repository(self.database()), )) } } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/auto_retry.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/auto_retry.rs deleted file mode 100644 index 51aee0c14b9..00000000000 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/auto_retry.rs +++ /dev/null @@ -1,821 +0,0 @@ -use crate::authenticator::one_time_code::OneTimeCode; -use crate::authenticator::{ - AuthorityEnrollmentTokenRepository, AuthorityMember, AuthorityMembersRepository, - EnrollmentToken, PreTrustedIdentities, -}; -use crate::cli_state::journeys::{Journey, ProjectJourney}; -use crate::cli_state::{ - EnrollmentsRepository, IdentitiesRepository, IdentityEnrollment, JourneysRepository, - NamedIdentity, NamedVault, NodeInfo, NodesRepository, ProjectsRepository, SpacesRepository, - TcpInlet, TcpPortalsRepository, UsersRepository, VaultType, VaultsRepository, -}; -use crate::cloud::email_address::EmailAddress; -use crate::cloud::enroll::auth0::UserInfo; -use crate::cloud::project::models::ProjectModel; -use crate::cloud::space::Space; -use crate::config::lookup::InternetAddress; -use crate::nodes::models::portal::OutletStatus; -use chrono::{DateTime, Utc}; -use ockam::identity::models::{ChangeHistory, CredentialAndPurposeKey, PurposeKeyAttestation}; -use ockam::identity::storage::PurposeKeysRepository; -use ockam::identity::{ - AttributesEntry, ChangeHistoryRepository, CredentialRepository, Identifier, Identity, - IdentityAttributesRepository, PersistedSecureChannel, Purpose, SecureChannelRepository, - TimestampInSeconds, -}; -use ockam_abac::{ - Action, Expr, Resource, ResourceName, ResourcePoliciesRepository, ResourcePolicy, ResourceType, - ResourceTypePoliciesRepository, ResourceTypePolicy, ResourcesRepository, -}; -use ockam_core::{async_trait, Address}; -use ockam_vault::storage::SecretsRepository; -use ockam_vault::{ - AeadSecret, AeadSecretKeyHandle, SigningSecret, SigningSecretKeyHandle, X25519SecretKey, - X25519SecretKeyHandle, -}; - -macro_rules! retry { - ($async_function:expr) => {{ - let mut retries = 0; - loop { - match $async_function.await { - Ok(result) => break Ok(result), - Err(err) => { - if err.to_string().contains("database is locked") && retries < 100 { - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } else { - break Err(err); - } - retries += 1; - } - } - } - }}; -} - -#[derive(Clone)] -pub struct AutoRetry { - wrapped: T, -} - -impl AutoRetry { - pub(crate) fn new(wrapped_trait: T) -> AutoRetry { - Self { - wrapped: wrapped_trait, - } - } -} - -#[async_trait] -impl EnrollmentsRepository for AutoRetry { - async fn set_as_enrolled( - &self, - identifier: &Identifier, - email: &EmailAddress, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.set_as_enrolled(identifier, email)) - } - - async fn get_enrolled_identities(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_enrolled_identities()) - } - - async fn get_all_identities_enrollments(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_all_identities_enrollments()) - } - - async fn is_default_identity_enrolled(&self) -> ockam_core::Result { - retry!(self.wrapped.is_default_identity_enrolled()) - } - - async fn is_identity_enrolled(&self, name: &str) -> ockam_core::Result { - retry!(self.wrapped.is_identity_enrolled(name)) - } -} - -#[async_trait] -impl AuthorityEnrollmentTokenRepository for AutoRetry { - async fn use_token( - &self, - one_time_code: OneTimeCode, - now: TimestampInSeconds, - ) -> ockam_core::Result> { - retry!(self.wrapped.use_token(one_time_code, now)) - } - - async fn store_new_token(&self, token: EnrollmentToken) -> ockam_core::Result<()> { - retry!(self.wrapped.store_new_token(token.clone())) - } -} - -#[async_trait] -impl IdentitiesRepository for AutoRetry { - async fn store_named_identity( - &self, - identifier: &Identifier, - name: &str, - vault_name: &str, - ) -> ockam_core::Result { - retry!(self - .wrapped - .store_named_identity(identifier, name, vault_name)) - } - - async fn delete_identity(&self, name: &str) -> ockam_core::Result> { - retry!(self.wrapped.delete_identity(name)) - } - - async fn delete_identity_by_identifier( - &self, - identifier: &Identifier, - ) -> ockam_core::Result> { - retry!(self.wrapped.delete_identity_by_identifier(identifier)) - } - - async fn get_identifier(&self, name: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_identifier(name)) - } - - async fn get_identity_name_by_identifier( - &self, - identifier: &Identifier, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_identity_name_by_identifier(identifier)) - } - - async fn get_named_identity(&self, name: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_named_identity(name)) - } - - async fn get_named_identity_by_identifier( - &self, - identifier: &Identifier, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_named_identity_by_identifier(identifier)) - } - - async fn get_named_identities(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_named_identities()) - } - - async fn get_named_identities_by_vault_name( - &self, - vault_name: &str, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_named_identities_by_vault_name(vault_name)) - } - - async fn set_as_default(&self, name: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.set_as_default(name)) - } - - async fn set_as_default_by_identifier( - &self, - identifier: &Identifier, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.set_as_default_by_identifier(identifier)) - } - - async fn get_default_named_identity(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_default_named_identity()) - } -} - -#[async_trait] -impl JourneysRepository for AutoRetry { - async fn store_project_journey( - &self, - project_journey: ProjectJourney, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_project_journey(project_journey.clone())) - } - - async fn get_project_journey( - &self, - project_id: &str, - now: DateTime, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_project_journey(project_id, now)) - } - - async fn delete_project_journeys(&self, project_id: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_project_journeys(project_id)) - } - - async fn store_host_journey(&self, host_journey: Journey) -> ockam_core::Result<()> { - retry!(self.wrapped.store_host_journey(host_journey.clone())) - } - - async fn get_host_journey(&self, now: DateTime) -> ockam_core::Result> { - retry!(self.wrapped.get_host_journey(now)) - } -} - -#[async_trait] -impl NodesRepository for AutoRetry { - async fn store_node(&self, node_info: &NodeInfo) -> ockam_core::Result<()> { - retry!(self.wrapped.store_node(node_info)) - } - - async fn get_nodes(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_nodes()) - } - - async fn get_node(&self, node_name: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_node(node_name)) - } - - async fn get_nodes_by_identifier( - &self, - identifier: &Identifier, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_nodes_by_identifier(identifier)) - } - - async fn get_default_node(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_default_node()) - } - - async fn set_default_node(&self, node_name: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.set_default_node(node_name)) - } - - async fn is_default_node(&self, node_name: &str) -> ockam_core::Result { - retry!(self.wrapped.is_default_node(node_name)) - } - - async fn delete_node(&self, node_name: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_node(node_name)) - } - - async fn set_tcp_listener_address( - &self, - node_name: &str, - address: &InternetAddress, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.set_tcp_listener_address(node_name, address)) - } - - async fn set_status_endpoint_address( - &self, - node_name: &str, - address: &InternetAddress, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.set_status_endpoint_address(node_name, address)) - } - - async fn set_as_authority_node(&self, node_name: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.set_as_authority_node(node_name)) - } - - async fn get_tcp_listener_address( - &self, - node_name: &str, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_tcp_listener_address(node_name)) - } - - async fn get_status_endpoint_address( - &self, - node_name: &str, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_status_endpoint_address(node_name)) - } - - async fn set_node_pid(&self, node_name: &str, pid: u32) -> ockam_core::Result<()> { - retry!(self.wrapped.set_node_pid(node_name, pid)) - } - - async fn set_no_node_pid(&self, node_name: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.set_no_node_pid(node_name)) - } - - async fn set_node_project_name( - &self, - node_name: &str, - project_name: &str, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.set_node_project_name(node_name, project_name)) - } - - async fn get_node_project_name(&self, node_name: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_node_project_name(node_name)) - } -} - -#[async_trait] -impl ProjectsRepository for AutoRetry { - async fn store_project(&self, project: &ProjectModel) -> ockam_core::Result<()> { - retry!(self.wrapped.store_project(project)) - } - - async fn get_project(&self, project_id: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_project(project_id)) - } - - async fn get_project_by_name(&self, name: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_project_by_name(name)) - } - - async fn get_projects(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_projects()) - } - - async fn get_default_project(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_default_project()) - } - - async fn set_default_project(&self, project_id: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.set_default_project(project_id)) - } - - async fn delete_project(&self, project_id: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_project(project_id)) - } -} - -#[async_trait] -impl SpacesRepository for AutoRetry { - async fn store_space(&self, space: &Space) -> ockam_core::Result<()> { - retry!(self.wrapped.store_space(space)) - } - - async fn get_space(&self, space_id: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_space(space_id)) - } - - async fn get_space_by_name(&self, name: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_space_by_name(name)) - } - - async fn get_spaces(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_spaces()) - } - - async fn get_default_space(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_default_space()) - } - - async fn set_default_space(&self, space_id: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.set_default_space(space_id)) - } - - async fn delete_space(&self, space_id: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_space(space_id)) - } -} - -#[async_trait] -impl TcpPortalsRepository for AutoRetry { - async fn store_tcp_inlet( - &self, - node_name: &str, - tcp_inlet: &TcpInlet, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_tcp_inlet(node_name, tcp_inlet)) - } - - async fn get_tcp_inlet( - &self, - node_name: &str, - alias: &str, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_tcp_inlet(node_name, alias)) - } - - async fn delete_tcp_inlet(&self, node_name: &str, alias: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_tcp_inlet(node_name, alias)) - } - - async fn store_tcp_outlet( - &self, - node_name: &str, - tcp_outlet_status: &OutletStatus, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_tcp_outlet(node_name, tcp_outlet_status)) - } - - async fn get_tcp_outlet( - &self, - node_name: &str, - worker_addr: &Address, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_tcp_outlet(node_name, worker_addr)) - } - - async fn delete_tcp_outlet( - &self, - node_name: &str, - worker_addr: &Address, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_tcp_outlet(node_name, worker_addr)) - } -} - -#[async_trait] -impl UsersRepository for AutoRetry { - async fn store_user(&self, user: &UserInfo) -> ockam_core::Result<()> { - retry!(self.wrapped.store_user(user)) - } - - async fn get_default_user(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_default_user()) - } - - async fn set_default_user(&self, email: &EmailAddress) -> ockam_core::Result<()> { - retry!(self.wrapped.set_default_user(email)) - } - - async fn get_user(&self, email: &EmailAddress) -> ockam_core::Result> { - retry!(self.wrapped.get_user(email)) - } - - async fn get_users(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_users()) - } - - async fn delete_user(&self, email: &EmailAddress) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_user(email)) - } -} - -#[async_trait] -impl VaultsRepository for AutoRetry { - async fn store_vault( - &self, - name: &str, - vault_type: VaultType, - ) -> ockam_core::Result { - retry!(self.wrapped.store_vault(name, vault_type.clone())) - } - - async fn update_vault(&self, name: &str, vault_type: VaultType) -> ockam_core::Result<()> { - retry!(self.wrapped.update_vault(name, vault_type.clone())) - } - - async fn delete_named_vault(&self, name: &str) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_named_vault(name)) - } - - async fn get_database_vault(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_database_vault()) - } - - async fn get_named_vault(&self, name: &str) -> ockam_core::Result> { - retry!(self.wrapped.get_named_vault(name)) - } - - async fn get_named_vaults(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_named_vaults()) - } -} - -#[async_trait] -impl ChangeHistoryRepository for AutoRetry { - async fn update_identity( - &self, - identity: &Identity, - ignore_older: bool, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.update_identity(identity, ignore_older)) - } - - async fn store_change_history( - &self, - identifier: &Identifier, - change_history: ChangeHistory, - ) -> ockam_core::Result<()> { - retry!(self - .wrapped - .store_change_history(identifier, change_history.clone())) - } - - async fn delete_change_history(&self, identifier: &Identifier) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_change_history(identifier)) - } - - async fn get_change_history( - &self, - identifier: &Identifier, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_change_history(identifier)) - } - - async fn get_change_histories(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_change_histories()) - } -} - -#[async_trait] -impl PurposeKeysRepository for AutoRetry { - async fn set_purpose_key( - &self, - subject: &Identifier, - purpose: Purpose, - purpose_key_attestation: &PurposeKeyAttestation, - ) -> ockam_core::Result<()> { - retry!(self - .wrapped - .set_purpose_key(subject, purpose, purpose_key_attestation)) - } - - async fn delete_purpose_key( - &self, - subject: &Identifier, - purpose: Purpose, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_purpose_key(subject, purpose)) - } - - async fn get_purpose_key( - &self, - identifier: &Identifier, - purpose: Purpose, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_purpose_key(identifier, purpose)) - } - - async fn delete_all(&self) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_all()) - } -} - -#[async_trait] -impl CredentialRepository for AutoRetry { - async fn get( - &self, - subject: &Identifier, - issuer: &Identifier, - scope: &str, - ) -> ockam_core::Result> { - retry!(self.wrapped.get(subject, issuer, scope)) - } - - async fn put( - &self, - subject: &Identifier, - issuer: &Identifier, - scope: &str, - expires_at: TimestampInSeconds, - credential: CredentialAndPurposeKey, - ) -> ockam_core::Result<()> { - retry!(self - .wrapped - .put(subject, issuer, scope, expires_at, credential.clone())) - } - - async fn delete( - &self, - subject: &Identifier, - issuer: &Identifier, - scope: &str, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.delete(subject, issuer, scope)) - } -} - -#[async_trait] -impl SecretsRepository for AutoRetry { - async fn store_signing_secret( - &self, - handle: &SigningSecretKeyHandle, - secret: SigningSecret, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_signing_secret(handle, secret.clone())) - } - - async fn delete_signing_secret( - &self, - handle: &SigningSecretKeyHandle, - ) -> ockam_core::Result { - retry!(self.wrapped.delete_signing_secret(handle)) - } - - async fn get_signing_secret( - &self, - handle: &SigningSecretKeyHandle, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_signing_secret(handle)) - } - - async fn get_signing_secret_handles(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_signing_secret_handles()) - } - - async fn store_x25519_secret( - &self, - handle: &X25519SecretKeyHandle, - secret: X25519SecretKey, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_x25519_secret(handle, secret.clone())) - } - - async fn delete_x25519_secret( - &self, - handle: &X25519SecretKeyHandle, - ) -> ockam_core::Result { - retry!(self.wrapped.delete_x25519_secret(handle)) - } - - async fn get_x25519_secret( - &self, - handle: &X25519SecretKeyHandle, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_x25519_secret(handle)) - } - - async fn get_x25519_secret_handles(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_x25519_secret_handles()) - } - - async fn store_aead_secret( - &self, - handle: &AeadSecretKeyHandle, - secret: AeadSecret, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_aead_secret(handle, secret.clone())) - } - - async fn delete_aead_secret(&self, handle: &AeadSecretKeyHandle) -> ockam_core::Result { - retry!(self.wrapped.delete_aead_secret(handle)) - } - - async fn get_aead_secret( - &self, - handle: &AeadSecretKeyHandle, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_aead_secret(handle)) - } - - async fn delete_all(&self) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_all()) - } -} - -#[async_trait] -impl ResourcesRepository for AutoRetry { - async fn store_resource(&self, resource: &Resource) -> ockam_core::Result<()> { - retry!(self.wrapped.store_resource(resource)) - } - - async fn get_resource( - &self, - resource_name: &ResourceName, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_resource(resource_name)) - } - - async fn delete_resource(&self, resource_name: &ResourceName) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_resource(resource_name)) - } -} - -#[async_trait] -impl ResourcePoliciesRepository for AutoRetry { - async fn store_policy( - &self, - resource_name: &ResourceName, - action: &Action, - expression: &Expr, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_policy(resource_name, action, expression)) - } - - async fn get_policy( - &self, - resource_name: &ResourceName, - action: &Action, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_policy(resource_name, action)) - } - - async fn get_policies(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_policies()) - } - - async fn get_policies_by_resource_name( - &self, - resource_name: &ResourceName, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_policies_by_resource_name(resource_name)) - } - - async fn delete_policy( - &self, - resource_name: &ResourceName, - action: &Action, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_policy(resource_name, action)) - } -} - -#[async_trait] -impl ResourceTypePoliciesRepository for AutoRetry { - async fn store_policy( - &self, - resource_type: &ResourceType, - action: &Action, - expression: &Expr, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.store_policy(resource_type, action, expression)) - } - - async fn get_policy( - &self, - resource_type: &ResourceType, - action: &Action, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_policy(resource_type, action)) - } - - async fn get_policies(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_policies()) - } - - async fn get_policies_by_resource_type( - &self, - resource_type: &ResourceType, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_policies_by_resource_type(resource_type)) - } - - async fn delete_policy( - &self, - resource_type: &ResourceType, - action: &Action, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_policy(resource_type, action)) - } -} - -#[async_trait] -impl AuthorityMembersRepository for AutoRetry { - async fn get_member( - &self, - identifier: &Identifier, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_member(identifier)) - } - - async fn get_members(&self) -> ockam_core::Result> { - retry!(self.wrapped.get_members()) - } - - async fn delete_member(&self, identifier: &Identifier) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_member(identifier)) - } - - async fn add_member(&self, member: AuthorityMember) -> ockam_core::Result<()> { - retry!(self.wrapped.add_member(member.clone())) - } - - async fn bootstrap_pre_trusted_members( - &self, - pre_trusted_identities: &PreTrustedIdentities, - ) -> ockam_core::Result<()> { - retry!(self - .wrapped - .bootstrap_pre_trusted_members(pre_trusted_identities)) - } -} - -#[async_trait] -impl IdentityAttributesRepository for AutoRetry { - async fn get_attributes( - &self, - subject: &Identifier, - attested_by: &Identifier, - ) -> ockam_core::Result> { - retry!(self.wrapped.get_attributes(subject, attested_by)) - } - - async fn put_attributes( - &self, - subject: &Identifier, - entry: AttributesEntry, - ) -> ockam_core::Result<()> { - retry!(self.wrapped.put_attributes(subject, entry.clone())) - } - - async fn delete_expired_attributes(&self, now: TimestampInSeconds) -> ockam_core::Result<()> { - retry!(self.wrapped.delete_expired_attributes(now)) - } -} - -#[async_trait] -impl SecureChannelRepository for AutoRetry { - async fn get( - &self, - decryptor_remote_address: &Address, - ) -> ockam_core::Result> { - retry!(self.wrapped.get(decryptor_remote_address)) - } - - async fn put(&self, secure_channel: PersistedSecureChannel) -> ockam_core::Result<()> { - retry!(self.wrapped.put(secure_channel.clone())) - } - - async fn delete(&self, decryptor_remote_address: &Address) -> ockam_core::Result<()> { - retry!(self.wrapped.delete(decryptor_remote_address)) - } -} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository.rs index 7df1efab00d..0b7b4486174 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository.rs @@ -1,9 +1,10 @@ +use crate::cli_state::enrollments::IdentityEnrollment; +use crate::cloud::email_address::EmailAddress; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::Result; - -use crate::cli_state::enrollments::IdentityEnrollment; -use crate::cloud::email_address::EmailAddress; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This trait stores the enrollment status for local identities /// If an identity has been enrolled it is possible to retrieve: @@ -30,3 +31,26 @@ pub trait EnrollmentsRepository: Send + Sync + 'static { /// Return true if the identity with the given name is enrolled async fn is_identity_enrolled(&self, name: &str) -> Result; } + +#[async_trait] +impl EnrollmentsRepository for AutoRetry { + async fn set_as_enrolled(&self, identifier: &Identifier, email: &EmailAddress) -> Result<()> { + retry!(self.wrapped.set_as_enrolled(identifier, email)) + } + + async fn get_enrolled_identities(&self) -> Result> { + retry!(self.wrapped.get_enrolled_identities()) + } + + async fn get_all_identities_enrollments(&self) -> Result> { + retry!(self.wrapped.get_all_identities_enrollments()) + } + + async fn is_default_identity_enrolled(&self) -> Result { + retry!(self.wrapped.is_default_identity_enrolled()) + } + + async fn is_identity_enrolled(&self, name: &str) -> Result { + retry!(self.wrapped.is_identity_enrolled(name)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs index c5042e68644..0526e26d413 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs @@ -1,20 +1,20 @@ -use std::str::FromStr; - use sqlx::any::AnyRow; use sqlx::FromRow; use sqlx::*; +use std::str::FromStr; +use std::sync::Arc; use time::OffsetDateTime; +use crate::cli_state::enrollments::IdentityEnrollment; +use crate::cli_state::EnrollmentsRepository; +use crate::cloud::email_address::EmailAddress; use ockam::identity::Identifier; use ockam::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{Boolean, Nullable}; -use crate::cli_state::enrollments::IdentityEnrollment; -use crate::cli_state::EnrollmentsRepository; -use crate::cloud::email_address::EmailAddress; - #[derive(Clone)] pub struct EnrollmentsSqlxDatabase { database: SqlxDatabase, @@ -26,6 +26,14 @@ impl EnrollmentsSqlxDatabase { Self { database } } + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database #[allow(unused)] pub async fn create() -> Result { diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository.rs index 89fd20fe341..c1c53c4f670 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository.rs @@ -2,6 +2,8 @@ use crate::cli_state::NamedIdentity; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// The identities repository stores metadata about identities /// which change history have been stored in the ChangeHistoryRepository. @@ -71,3 +73,73 @@ pub trait IdentitiesRepository: Send + Sync + 'static { /// Return the default named identity async fn get_default_named_identity(&self) -> Result>; } + +#[async_trait] +impl IdentitiesRepository for AutoRetry { + async fn store_named_identity( + &self, + identifier: &Identifier, + name: &str, + vault_name: &str, + ) -> Result { + retry!(self + .wrapped + .store_named_identity(identifier, name, vault_name)) + } + + async fn delete_identity(&self, name: &str) -> Result> { + retry!(self.wrapped.delete_identity(name)) + } + + async fn delete_identity_by_identifier( + &self, + identifier: &Identifier, + ) -> Result> { + retry!(self.wrapped.delete_identity_by_identifier(identifier)) + } + + async fn get_identifier(&self, name: &str) -> Result> { + retry!(self.wrapped.get_identifier(name)) + } + + async fn get_identity_name_by_identifier( + &self, + identifier: &Identifier, + ) -> Result> { + retry!(self.wrapped.get_identity_name_by_identifier(identifier)) + } + + async fn get_named_identity(&self, name: &str) -> Result> { + retry!(self.wrapped.get_named_identity(name)) + } + + async fn get_named_identity_by_identifier( + &self, + identifier: &Identifier, + ) -> Result> { + retry!(self.wrapped.get_named_identity_by_identifier(identifier)) + } + + async fn get_named_identities(&self) -> Result> { + retry!(self.wrapped.get_named_identities()) + } + + async fn get_named_identities_by_vault_name( + &self, + vault_name: &str, + ) -> Result> { + retry!(self.wrapped.get_named_identities_by_vault_name(vault_name)) + } + + async fn set_as_default(&self, name: &str) -> Result<()> { + retry!(self.wrapped.set_as_default(name)) + } + + async fn set_as_default_by_identifier(&self, identifier: &Identifier) -> Result<()> { + retry!(self.wrapped.set_as_default_by_identifier(identifier)) + } + + async fn get_default_named_identity(&self) -> Result> { + retry!(self.wrapped.get_default_named_identity()) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs index d35ac1c2ac6..cc4b08a9b1c 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs @@ -1,14 +1,14 @@ use core::str::FromStr; - use sqlx::*; +use std::sync::Arc; +use crate::cli_state::{IdentitiesRepository, NamedIdentity}; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{Boolean, FromSqlxError, SqlxDatabase, ToVoid}; -use crate::cli_state::{IdentitiesRepository, NamedIdentity}; - /// Implementation of [`IdentitiesRepository`] trait based on an underlying database /// using sqlx as its API, and Sqlite as its driver #[derive(Clone)] @@ -23,6 +23,15 @@ impl IdentitiesSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("identities").await?)) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository.rs index d42f13274f8..280febfb9ed 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository.rs @@ -2,6 +2,8 @@ use crate::cli_state::journeys::{Journey, ProjectJourney}; use chrono::{DateTime, Utc}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::retry; #[async_trait] pub trait JourneysRepository: Send + Sync + 'static { @@ -24,3 +26,30 @@ pub trait JourneysRepository: Send + Sync + 'static { /// Return the most recent host journey started after now async fn get_host_journey(&self, now: DateTime) -> Result>; } + +#[async_trait] +impl JourneysRepository for AutoRetry { + async fn store_project_journey(&self, project_journey: ProjectJourney) -> Result<()> { + retry!(self.wrapped.store_project_journey(project_journey.clone())) + } + + async fn get_project_journey( + &self, + project_id: &str, + now: DateTime, + ) -> Result> { + retry!(self.wrapped.get_project_journey(project_id, now)) + } + + async fn delete_project_journeys(&self, project_id: &str) -> Result<()> { + retry!(self.wrapped.delete_project_journeys(project_id)) + } + + async fn store_host_journey(&self, host_journey: Journey) -> Result<()> { + retry!(self.wrapped.store_host_journey(host_journey.clone())) + } + + async fn get_host_journey(&self, now: DateTime) -> Result> { + retry!(self.wrapped.get_host_journey(now)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs index 4d160009b82..fe7b2682a14 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs @@ -1,11 +1,13 @@ use chrono::{DateTime, Utc}; use sqlx::*; +use std::sync::Arc; use crate::cli_state::journeys::{Journey, ProjectJourney}; use crate::cli_state::JourneysRepository; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Result; use ockam_core::{async_trait, OpenTelemetryContext}; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, Nullable, SqlxDatabase, ToVoid}; #[derive(Clone)] @@ -20,6 +22,15 @@ impl JourneysSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new( diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/mod.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/mod.rs index 1977bb995a1..ccd17ddcae3 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/mod.rs @@ -1,4 +1,3 @@ -pub(crate) use auto_retry::*; pub use enrollments_repository::*; pub use enrollments_repository_sql::*; pub use identities_repository::*; @@ -18,7 +17,6 @@ pub use users_repository_sql::*; pub use vaults_repository::*; pub use vaults_repository_sql::*; -pub mod auto_retry; mod enrollments_repository; mod enrollments_repository_sql; mod identities_repository; diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository.rs index f6c0dd986bc..a1baa338361 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository.rs @@ -1,9 +1,10 @@ +use crate::cli_state::NodeInfo; +use crate::config::lookup::InternetAddress; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::Result; - -use crate::cli_state::NodeInfo; -use crate::config::lookup::InternetAddress; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This trait supports the storage of node data: /// @@ -77,3 +78,85 @@ pub trait NodesRepository: Send + Sync + 'static { /// Return the name of the project associated to a node async fn get_node_project_name(&self, node_name: &str) -> Result>; } + +#[async_trait] +impl NodesRepository for AutoRetry { + async fn store_node(&self, node_info: &NodeInfo) -> Result<()> { + retry!(self.wrapped.store_node(node_info)) + } + + async fn get_nodes(&self) -> Result> { + retry!(self.wrapped.get_nodes()) + } + + async fn get_node(&self, node_name: &str) -> Result> { + retry!(self.wrapped.get_node(node_name)) + } + + async fn get_nodes_by_identifier(&self, identifier: &Identifier) -> Result> { + retry!(self.wrapped.get_nodes_by_identifier(identifier)) + } + + async fn get_default_node(&self) -> Result> { + retry!(self.wrapped.get_default_node()) + } + + async fn set_default_node(&self, node_name: &str) -> Result<()> { + retry!(self.wrapped.set_default_node(node_name)) + } + + async fn is_default_node(&self, node_name: &str) -> Result { + retry!(self.wrapped.is_default_node(node_name)) + } + + async fn delete_node(&self, node_name: &str) -> Result<()> { + retry!(self.wrapped.delete_node(node_name)) + } + + async fn set_tcp_listener_address( + &self, + node_name: &str, + address: &InternetAddress, + ) -> Result<()> { + retry!(self.wrapped.set_tcp_listener_address(node_name, address)) + } + + async fn set_status_endpoint_address( + &self, + node_name: &str, + address: &InternetAddress, + ) -> Result<()> { + retry!(self.wrapped.set_status_endpoint_address(node_name, address)) + } + + async fn set_as_authority_node(&self, node_name: &str) -> Result<()> { + retry!(self.wrapped.set_as_authority_node(node_name)) + } + + async fn get_tcp_listener_address(&self, node_name: &str) -> Result> { + retry!(self.wrapped.get_tcp_listener_address(node_name)) + } + + async fn get_status_endpoint_address( + &self, + node_name: &str, + ) -> Result> { + retry!(self.wrapped.get_status_endpoint_address(node_name)) + } + + async fn set_node_pid(&self, node_name: &str, pid: u32) -> Result<()> { + retry!(self.wrapped.set_node_pid(node_name, pid)) + } + + async fn set_no_node_pid(&self, node_name: &str) -> Result<()> { + retry!(self.wrapped.set_no_node_pid(node_name)) + } + + async fn set_node_project_name(&self, node_name: &str, project_name: &str) -> Result<()> { + retry!(self.wrapped.set_node_project_name(node_name, project_name)) + } + + async fn get_node_project_name(&self, node_name: &str) -> Result> { + retry!(self.wrapped.get_node_project_name(node_name)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs index 60ebaaff0df..2cd89e753c3 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs @@ -1,5 +1,3 @@ -use std::str::FromStr; - use ockam::identity::Identifier; use ockam::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_core::async_trait; @@ -8,12 +6,14 @@ use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; - -use ockam_core::Result; -use ockam_node::database::{Boolean, Nullable}; +use std::str::FromStr; +use std::sync::Arc; use crate::cli_state::{NodeInfo, NodesRepository}; use crate::config::lookup::InternetAddress; +use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::database::{Boolean, Nullable}; #[derive(Clone)] pub struct NodesSqlxDatabase { @@ -26,6 +26,15 @@ impl NodesSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("nodes").await?)) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository.rs index 524c13daaa7..d531f2aabae 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository.rs @@ -1,7 +1,8 @@ +use crate::cloud::project::models::ProjectModel; use ockam_core::async_trait; use ockam_core::Result; - -use crate::cloud::project::models::ProjectModel; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This trait supports the storage of projects as retrieved from the Controller /// @@ -34,3 +35,34 @@ pub trait ProjectsRepository: Send + Sync + 'static { /// Return true if the project could be deleted async fn delete_project(&self, project_id: &str) -> Result<()>; } + +#[async_trait] +impl ProjectsRepository for AutoRetry { + async fn store_project(&self, project: &ProjectModel) -> Result<()> { + retry!(self.wrapped.store_project(project)) + } + + async fn get_project(&self, project_id: &str) -> Result> { + retry!(self.wrapped.get_project(project_id)) + } + + async fn get_project_by_name(&self, name: &str) -> Result> { + retry!(self.wrapped.get_project_by_name(name)) + } + + async fn get_projects(&self) -> Result> { + retry!(self.wrapped.get_projects()) + } + + async fn get_default_project(&self) -> Result> { + retry!(self.wrapped.get_default_project()) + } + + async fn set_default_project(&self, project_id: &str) -> Result<()> { + retry!(self.wrapped.set_default_project(project_id)) + } + + async fn delete_project(&self, project_id: &str) -> Result<()> { + retry!(self.wrapped.delete_project(project_id)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs index 42cd46e76c1..ada58ee85a8 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs @@ -1,9 +1,15 @@ +use crate::cloud::addon::KafkaConfig; +use crate::cloud::email_address::EmailAddress; +use crate::cloud::project::models::{OktaConfig, ProjectModel, ProjectUserRole}; +use crate::cloud::share::{RoleInShare, ShareScope}; +use crate::minicbor_url::Url; use itertools::Itertools; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::env::FromString; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Error, Result}; +use ockam_node::database::AutoRetry; use ockam_node::database::{Boolean, FromSqlxError, Nullable, SqlxDatabase, ToVoid}; use sqlx::any::AnyRow; use sqlx::encode::IsNull; @@ -11,12 +17,7 @@ use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; use std::str::FromStr; - -use crate::cloud::addon::KafkaConfig; -use crate::cloud::email_address::EmailAddress; -use crate::cloud::project::models::{OktaConfig, ProjectModel, ProjectUserRole}; -use crate::cloud::share::{RoleInShare, ShareScope}; -use crate::minicbor_url::Url; +use std::sync::Arc; use super::ProjectsRepository; @@ -40,6 +41,15 @@ impl ProjectsSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("projects").await?)) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository.rs index a5d7fcc0f29..58dcb5174b4 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository.rs @@ -1,6 +1,8 @@ use crate::cloud::space::Space; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This trait supports the storage of spaces as retrieved from the Controller /// @@ -30,3 +32,34 @@ pub trait SpacesRepository: Send + Sync + 'static { /// Delete a space async fn delete_space(&self, space_id: &str) -> Result<()>; } + +#[async_trait] +impl SpacesRepository for AutoRetry { + async fn store_space(&self, space: &Space) -> Result<()> { + retry!(self.wrapped.store_space(space)) + } + + async fn get_space(&self, space_id: &str) -> Result> { + retry!(self.wrapped.get_space(space_id)) + } + + async fn get_space_by_name(&self, name: &str) -> Result> { + retry!(self.wrapped.get_space_by_name(name)) + } + + async fn get_spaces(&self) -> Result> { + retry!(self.wrapped.get_spaces()) + } + + async fn get_default_space(&self) -> Result> { + retry!(self.wrapped.get_default_space()) + } + + async fn set_default_space(&self, space_id: &str) -> Result<()> { + retry!(self.wrapped.set_default_space(space_id)) + } + + async fn delete_space(&self, space_id: &str) -> Result<()> { + retry!(self.wrapped.delete_space(space_id)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository_sql.rs index 35daafae99d..3a08b821c8a 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/spaces_repository_sql.rs @@ -3,10 +3,12 @@ use crate::cloud::space::Space; use crate::cloud::subscription::{Subscription, SubscriptionName}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{Boolean, FromSqlxError, Nullable, SqlxDatabase, ToVoid}; use sqlx::any::AnyRow; use sqlx::*; use std::str::FromStr; +use std::sync::Arc; use time::OffsetDateTime; #[derive(Clone)] @@ -21,6 +23,15 @@ impl SpacesSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("spaces").await?)) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs index bae06610ad0..9e8b9efc3c9 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs @@ -2,6 +2,8 @@ use crate::nodes::models::portal::OutletStatus; use ockam_core::Result; use ockam_core::{async_trait, Address}; use ockam_multiaddr::MultiAddr; +use ockam_node::database::AutoRetry; +use ockam_node::retry; use std::net::SocketAddr; /// The TcpPortalsRepository is responsible for accessing the configured tcp inlets and tcp outlets @@ -32,6 +34,41 @@ pub trait TcpPortalsRepository: Send + Sync + 'static { async fn delete_tcp_outlet(&self, node_name: &str, worker_addr: &Address) -> Result<()>; } +#[async_trait] +impl TcpPortalsRepository for AutoRetry { + async fn store_tcp_inlet(&self, node_name: &str, tcp_inlet: &TcpInlet) -> Result<()> { + retry!(self.wrapped.store_tcp_inlet(node_name, tcp_inlet)) + } + + async fn get_tcp_inlet(&self, node_name: &str, alias: &str) -> Result> { + retry!(self.wrapped.get_tcp_inlet(node_name, alias)) + } + + async fn delete_tcp_inlet(&self, node_name: &str, alias: &str) -> Result<()> { + retry!(self.wrapped.delete_tcp_inlet(node_name, alias)) + } + + async fn store_tcp_outlet( + &self, + node_name: &str, + tcp_outlet_status: &OutletStatus, + ) -> Result<()> { + retry!(self.wrapped.store_tcp_outlet(node_name, tcp_outlet_status)) + } + + async fn get_tcp_outlet( + &self, + node_name: &str, + worker_addr: &Address, + ) -> Result> { + retry!(self.wrapped.get_tcp_outlet(node_name, worker_addr)) + } + + async fn delete_tcp_outlet(&self, node_name: &str, worker_addr: &Address) -> Result<()> { + retry!(self.wrapped.delete_tcp_outlet(node_name, worker_addr)) + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct TcpInlet { bind_addr: SocketAddr, diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs index 4db61a8cff7..3f532ac3072 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs @@ -14,6 +14,7 @@ use ockam_core::Error; use ockam_core::Result; use ockam_core::{async_trait, Address}; use ockam_multiaddr::MultiAddr; +use ockam_node::database::AutoRetry; use ockam_node::database::Boolean; use ockam_transport_core::HostnamePort; @@ -29,6 +30,15 @@ impl TcpPortalsSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database #[allow(unused)] pub async fn create() -> Result> { diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository.rs index 1b96330fee3..3aa43e158e4 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository.rs @@ -2,6 +2,8 @@ use crate::cloud::email_address::EmailAddress; use crate::cloud::enroll::auth0::UserInfo; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This traits allows user information to be stored locally. /// User information is retrieved when a user has been authenticated. @@ -38,3 +40,30 @@ pub trait UsersRepository: Send + Sync + 'static { /// Delete a user given their email async fn delete_user(&self, email: &EmailAddress) -> Result<()>; } + +#[async_trait] +impl UsersRepository for AutoRetry { + async fn store_user(&self, user: &UserInfo) -> ockam_core::Result<()> { + retry!(self.wrapped.store_user(user)) + } + + async fn get_default_user(&self) -> ockam_core::Result> { + retry!(self.wrapped.get_default_user()) + } + + async fn set_default_user(&self, email: &EmailAddress) -> ockam_core::Result<()> { + retry!(self.wrapped.set_default_user(email)) + } + + async fn get_user(&self, email: &EmailAddress) -> ockam_core::Result> { + retry!(self.wrapped.get_user(email)) + } + + async fn get_users(&self) -> ockam_core::Result> { + retry!(self.wrapped.get_users()) + } + + async fn delete_user(&self, email: &EmailAddress) -> ockam_core::Result<()> { + retry!(self.wrapped.delete_user(email)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs index c2f00904e0f..89214e1dde2 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs @@ -1,13 +1,14 @@ use itertools::Itertools; use sqlx::*; +use std::sync::Arc; use crate::cloud::email_address::EmailAddress; +use crate::cloud::enroll::auth0::UserInfo; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{Boolean, FromSqlxError, SqlxDatabase, ToVoid}; -use crate::cloud::enroll::auth0::UserInfo; - use super::UsersRepository; #[derive(Clone)] @@ -22,6 +23,15 @@ impl UsersSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("users").await?)) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository.rs index 66cb236a225..3c82df06b73 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository.rs @@ -1,6 +1,8 @@ use crate::cli_state::{NamedVault, VaultType}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; +use ockam_node::retry; /// This trait allows vaults to be defined with a name and a path /// in order to make it possible to store identity keys in different databases on disk (or in a KMS) @@ -24,3 +26,30 @@ pub trait VaultsRepository: Send + Sync + 'static { /// Return all vaults async fn get_named_vaults(&self) -> Result>; } + +#[async_trait] +impl VaultsRepository for AutoRetry { + async fn store_vault(&self, name: &str, vault_type: VaultType) -> Result { + retry!(self.wrapped.store_vault(name, vault_type.clone())) + } + + async fn update_vault(&self, name: &str, vault_type: VaultType) -> Result<()> { + retry!(self.wrapped.update_vault(name, vault_type.clone())) + } + + async fn delete_named_vault(&self, name: &str) -> Result<()> { + retry!(self.wrapped.delete_named_vault(name)) + } + + async fn get_database_vault(&self) -> Result> { + retry!(self.wrapped.get_database_vault()) + } + + async fn get_named_vault(&self, name: &str) -> Result> { + retry!(self.wrapped.get_named_vault(name)) + } + + async fn get_named_vaults(&self) -> Result> { + retry!(self.wrapped.get_named_vaults()) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs index 8b085c70f5e..8042dd1af10 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs @@ -1,14 +1,14 @@ -use std::path::PathBuf; - use sqlx::*; +use std::path::PathBuf; +use std::sync::Arc; +use crate::cli_state::{NamedVault, UseAwsKms, VaultType, VaultsRepository}; use ockam::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{Boolean, Nullable}; -use crate::cli_state::{NamedVault, UseAwsKms, VaultType, VaultsRepository}; - #[derive(Clone)] pub struct VaultsSqlxDatabase { database: SqlxDatabase, @@ -20,6 +20,15 @@ impl VaultsSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("vaults").await?)) diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index f67d3e9ae04..bf14ddc3b62 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use crate::cli_state::AutoRetry; use crate::nodes::models::secure_channel::CreateSecureChannelListenerRequest; use crate::nodes::models::secure_channel::CreateSecureChannelRequest; use crate::nodes::models::secure_channel::DeleteSecureChannelListenerRequest; @@ -471,9 +470,7 @@ impl NodeManager { Ok(Arc::new(SecureChannels::new( identities, self.secure_channels.secure_channel_registry(), - Arc::new(AutoRetry::new(SecureChannelSqlxDatabase::new( - self.cli_state.database(), - ))), + SecureChannelSqlxDatabase::make_repository(self.cli_state.database()), ))) } } diff --git a/implementations/rust/ockam/ockam_api/tests/token_enrollment.rs b/implementations/rust/ockam/ockam_api/tests/token_enrollment.rs index 23901d380fc..c5d89e9ecf3 100644 --- a/implementations/rust/ockam/ockam_api/tests/token_enrollment.rs +++ b/implementations/rust/ockam/ockam_api/tests/token_enrollment.rs @@ -336,10 +336,7 @@ async fn usage_count_default(ctx: &mut Context) -> Result<()> { .await?; let member_client2 = change_client_identifier(&admin.client, &member2, None); - member_client1 - .present_token(ctx, otc.clone()) - .await - .unwrap(); + member_client1.present_token(ctx, otc).await.unwrap(); let res = member_client2.present_token(ctx, otc).await; assert!(res.is_err()); @@ -385,14 +382,8 @@ async fn usage_count2(ctx: &mut Context) -> Result<()> { .await?; let member_client3 = change_client_identifier(&admin.client, &member3, None); - member_client1 - .present_token(ctx, otc.clone()) - .await - .unwrap(); - member_client2 - .present_token(ctx, otc.clone()) - .await - .unwrap(); + member_client1.present_token(ctx, otc).await.unwrap(); + member_client2.present_token(ctx, otc).await.unwrap(); let res = member_client3.present_token(ctx, otc).await; assert!(res.is_err()); diff --git a/implementations/rust/ockam/ockam_app_lib/Cargo.toml b/implementations/rust/ockam/ockam_app_lib/Cargo.toml index 5e8a1b16625..45f7b2bc5cb 100644 --- a/implementations/rust/ockam/ockam_app_lib/Cargo.toml +++ b/implementations/rust/ockam/ockam_app_lib/Cargo.toml @@ -41,6 +41,7 @@ ockam = { path = "../ockam", version = "^0.144.0", features = ["software_vault"] ockam_api = { path = "../ockam_api", version = "0.87.0", default-features = false, features = ["std"] } ockam_core = { path = "../ockam_core", version = "^0.122.0" } ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.66.0", features = ["cbor", "serde"] } +ockam_node = { path = "../ockam_node", version = "^0.134.0" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sqlx = { version = "0.8.2", default-features = false } diff --git a/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository.rs b/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository.rs index d4b1f05997b..556e88bb5ad 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository.rs @@ -1,4 +1,6 @@ use ockam_core::async_trait; +use ockam_node::database::AutoRetry; +use ockam_node::retry; use crate::state::model::ModelState; use crate::Result; @@ -13,3 +15,14 @@ pub trait ModelStateRepository: Send + Sync + 'static { /// Load the model state from the database async fn load(&self, node_name: &str) -> Result; } + +#[async_trait] +impl ModelStateRepository for AutoRetry { + async fn store(&self, node_name: &str, model_state: &ModelState) -> Result<()> { + retry!(self.wrapped.store(node_name, model_state)) + } + + async fn load(&self, node_name: &str) -> Result { + retry!(self.wrapped.load(node_name)) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository.rs index 7b18cee79c4..9750f8dfb6a 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository.rs @@ -1,10 +1,13 @@ +use crate::models::{ChangeHistory, Identifier}; use crate::Identity; use ockam_core::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::compat::vec::Vec; use ockam_core::Result; - -use crate::models::{ChangeHistory, Identifier}; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; /// This repository stores identity change histories #[async_trait] @@ -30,3 +33,33 @@ pub trait ChangeHistoryRepository: Send + Sync + 'static { /// Return all the change histories async fn get_change_histories(&self) -> Result>; } + +#[cfg(feature = "std")] +#[async_trait] +impl ChangeHistoryRepository for AutoRetry { + async fn update_identity(&self, identity: &Identity, ignore_older: bool) -> Result<()> { + retry!(self.wrapped.update_identity(identity, ignore_older)) + } + + async fn store_change_history( + &self, + identifier: &Identifier, + change_history: ChangeHistory, + ) -> Result<()> { + retry!(self + .wrapped + .store_change_history(identifier, change_history.clone())) + } + + async fn delete_change_history(&self, identifier: &Identifier) -> Result<()> { + retry!(self.wrapped.delete_change_history(identifier)) + } + + async fn get_change_history(&self, identifier: &Identifier) -> Result> { + retry!(self.wrapped.get_change_history(identifier)) + } + + async fn get_change_histories(&self) -> Result> { + retry!(self.wrapped.get_change_histories()) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs index d0787e1b8ea..283249f9489 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs @@ -1,20 +1,20 @@ use core::str::FromStr; - use sqlx::any::AnyArguments; use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::query::Query; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::models::{ChangeHistory, Identifier}; +use crate::{ChangeHistoryRepository, Identity, IdentityError, IdentityHistoryComparison, Vault}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::models::{ChangeHistory, Identifier}; -use crate::{ChangeHistoryRepository, Identity, IdentityError, IdentityHistoryComparison, Vault}; - /// Implementation of [`ChangeHistoryRepository`] trait based on an underlying database /// using sqlx as its API, and Sqlite as its driver #[derive(Clone)] @@ -29,6 +29,15 @@ impl ChangeHistorySqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("change history").await?)) diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs index 1ce97a94b89..0a8ca581d8f 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs @@ -3,6 +3,10 @@ use crate::{Identifier, TimestampInSeconds}; use async_trait::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::Result; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; /// This trait supports the persistence of cached credentials #[async_trait] @@ -28,3 +32,33 @@ pub trait CredentialRepository: Send + Sync + 'static { /// Delete credential async fn delete(&self, subject: &Identifier, issuer: &Identifier, scope: &str) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl CredentialRepository for AutoRetry { + async fn get( + &self, + subject: &Identifier, + issuer: &Identifier, + scope: &str, + ) -> Result> { + retry!(self.wrapped.get(subject, issuer, scope)) + } + + async fn put( + &self, + subject: &Identifier, + issuer: &Identifier, + scope: &str, + expires_at: TimestampInSeconds, + credential: CredentialAndPurposeKey, + ) -> Result<()> { + retry!(self + .wrapped + .put(subject, issuer, scope, expires_at, credential.clone())) + } + + async fn delete(&self, subject: &Identifier, issuer: &Identifier, scope: &str) -> Result<()> { + retry!(self.wrapped.delete(subject, issuer, scope)) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs index 62175b9cb55..f4bd614fe7c 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs @@ -2,15 +2,16 @@ use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::models::{CredentialAndPurposeKey, Identifier}; +use crate::{CredentialRepository, TimestampInSeconds}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::models::{CredentialAndPurposeKey, Identifier}; -use crate::{CredentialRepository, TimestampInSeconds}; - /// Implementation of `CredentialRepository` trait based on an underlying database /// using sqlx as its API, and Sqlite as its driver #[derive(Clone)] @@ -29,6 +30,18 @@ impl CredentialSqlxDatabase { } } + /// Create a repository + pub fn make_repository( + database: SqlxDatabase, + node_name: &str, + ) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database, node_name))) + } else { + Arc::new(Self::new(database, node_name)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new( diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository.rs index e51a0fd69d9..614c87b851c 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository.rs @@ -2,6 +2,10 @@ use crate::{AttributesEntry, Identifier, TimestampInSeconds}; use async_trait::async_trait; use ockam_core::compat::boxed::Box; use ockam_core::Result; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; /// This trait supports the persistence of attributes associated to identities #[async_trait] @@ -20,3 +24,23 @@ pub trait IdentityAttributesRepository: Send + Sync + 'static { /// Remove all expired attributes async fn delete_expired_attributes(&self, now: TimestampInSeconds) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl IdentityAttributesRepository for AutoRetry { + async fn get_attributes( + &self, + subject: &Identifier, + attested_by: &Identifier, + ) -> Result> { + retry!(self.wrapped.get_attributes(subject, attested_by)) + } + + async fn put_attributes(&self, subject: &Identifier, entry: AttributesEntry) -> Result<()> { + retry!(self.wrapped.put_attributes(subject, entry.clone())) + } + + async fn delete_expired_attributes(&self, now: TimestampInSeconds) -> Result<()> { + retry!(self.wrapped.delete_expired_attributes(now)) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs index 983b17f911b..79561bff4fd 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs @@ -1,18 +1,18 @@ use core::str::FromStr; - use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::models::Identifier; +use crate::{AttributesEntry, IdentityAttributesRepository, TimestampInSeconds}; use ockam_core::async_trait; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, Nullable, SqlxDatabase, ToVoid}; -use crate::models::Identifier; -use crate::{AttributesEntry, IdentityAttributesRepository, TimestampInSeconds}; - /// Implementation of [`IdentityAttributesRepository`] trait based on an underlying database /// using sqlx as its API, and Sqlite as its driver #[derive(Clone)] @@ -31,6 +31,18 @@ impl IdentityAttributesSqlxDatabase { } } + /// Create a repository + pub fn make_repository( + database: SqlxDatabase, + node_name: &str, + ) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database, node_name))) + } else { + Arc::new(Self::new(database, node_name)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new( diff --git a/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository.rs b/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository.rs index 75fb02f588a..109b993fc43 100644 --- a/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository.rs +++ b/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository.rs @@ -1,9 +1,12 @@ -use ockam_core::async_trait; -use ockam_core::compat::boxed::Box; -use ockam_core::Result; - use crate::models::{Identifier, PurposeKeyAttestation}; use crate::Purpose; +use async_trait::async_trait; +use ockam_core::compat::boxed::Box; +use ockam_core::Result; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; // TODO: Only one PurposeKey per Purpose per Identity is supported for now @@ -34,3 +37,34 @@ pub trait PurposeKeysRepository: Send + Sync + 'static { /// Delete all keys async fn delete_all(&self) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl PurposeKeysRepository for AutoRetry { + async fn set_purpose_key( + &self, + subject: &Identifier, + purpose: Purpose, + purpose_key_attestation: &PurposeKeyAttestation, + ) -> Result<()> { + retry!(self + .wrapped + .set_purpose_key(subject, purpose, purpose_key_attestation)) + } + + async fn delete_purpose_key(&self, subject: &Identifier, purpose: Purpose) -> Result<()> { + retry!(self.wrapped.delete_purpose_key(subject, purpose)) + } + + async fn get_purpose_key( + &self, + identifier: &Identifier, + purpose: Purpose, + ) -> Result> { + retry!(self.wrapped.get_purpose_key(identifier, purpose)) + } + + async fn delete_all(&self) -> Result<()> { + retry!(self.wrapped.delete_all()) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs index 56e3b0b28cf..042aa299692 100644 --- a/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs @@ -1,23 +1,23 @@ use core::str::FromStr; - use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; +use std::sync::Arc; use tracing::debug; +use crate::identity::IdentityConstants; +use crate::models::{Identifier, PurposeKeyAttestation}; +use crate::purpose_keys::storage::PurposeKeysRepository; +use crate::Purpose; use ockam_core::async_trait; use ockam_core::compat::string::{String, ToString}; use ockam_core::compat::vec::Vec; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::identity::IdentityConstants; -use crate::models::{Identifier, PurposeKeyAttestation}; -use crate::purpose_keys::storage::PurposeKeysRepository; -use crate::Purpose; - /// Storage for own [`super::super::super::purpose_key::PurposeKey`]s #[derive(Clone)] pub struct PurposeKeysSqlxDatabase { @@ -31,6 +31,15 @@ impl PurposeKeysSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database for purpose keys pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("purpose keys").await?)) diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository.rs index ff8e6fbfa60..aca9e3a7ec8 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository.rs @@ -4,6 +4,10 @@ use async_trait::async_trait; use core::fmt::Debug; use ockam_core::compat::boxed::Box; use ockam_core::{Address, Result}; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; +#[cfg(feature = "std")] +use ockam_node::retry; use ockam_vault::AeadSecretKeyHandle; /// Secure Channel that was saved to a storage @@ -82,3 +86,22 @@ pub trait SecureChannelRepository: Send + Sync + 'static { /// Delete a secure channel async fn delete(&self, decryptor_remote_address: &Address) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl SecureChannelRepository for AutoRetry { + async fn get( + &self, + decryptor_remote_address: &Address, + ) -> Result> { + retry!(self.wrapped.get(decryptor_remote_address)) + } + + async fn put(&self, secure_channel: PersistedSecureChannel) -> Result<()> { + retry!(self.wrapped.put(secure_channel.clone())) + } + + async fn delete(&self, decryptor_remote_address: &Address) -> Result<()> { + retry!(self.wrapped.delete(decryptor_remote_address)) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs index bd9651a0c39..32e84e7b266 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs @@ -1,10 +1,12 @@ use sqlx::*; +use std::sync::Arc; use tracing::debug; use crate::secure_channel::Role; use crate::Identifier; use ockam_core::{async_trait, Address}; use ockam_core::{Error, Result}; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_vault::{AeadSecretKeyHandle, HandleToSecret}; @@ -26,6 +28,15 @@ impl SecureChannelSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("secure_channel").await?)) diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/auto_retry.rs b/implementations/rust/ockam/ockam_node/src/storage/database/auto_retry.rs new file mode 100644 index 00000000000..8a7278fd1ff --- /dev/null +++ b/implementations/rust/ockam/ockam_node/src/storage/database/auto_retry.rs @@ -0,0 +1,40 @@ +/// This macro wraps a function with retry calls to keep calling the function until +/// the error "database is locked" is not raised anymore. +#[macro_export] +macro_rules! retry { + ($async_function:expr) => {{ + let mut retries = 0; + loop { + match $async_function.await { + Ok(result) => break Ok(result), + Err(err) => { + if err.to_string().contains("database is locked") && retries < 100 { + ockam_node::tokio::time::sleep( + ockam_node::tokio::time::Duration::from_millis(10), + ) + .await; + } else { + break Err(err); + } + retries += 1; + } + } + } + }}; +} + +/// Wrapper for an auto-retried struct +#[derive(Clone)] +pub struct AutoRetry { + /// Internal implementation of the trait + pub wrapped: T, +} + +impl AutoRetry { + /// Wrap a trait so that it is auto-retried + pub fn new(wrapped_trait: T) -> AutoRetry { + Self { + wrapped: wrapped_trait, + } + } +} diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs b/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs index aa19faf6447..91fdd382ab2 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs @@ -1,8 +1,10 @@ +mod auto_retry; mod database_configuration; mod migrations; mod sqlx_database; mod sqlx_from_row_types; +pub use auto_retry::*; pub use database_configuration::*; pub use migrations::*; pub use sqlx_database::*; diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs b/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs index 72ca49f0f30..68e32e983a9 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs @@ -239,6 +239,15 @@ impl SqlxDatabase { Ok(db) } + /// Return true if the database implementation might lock (which is the case for Sqlite on disk) + /// and the database user needs to retry several times. + pub fn needs_retry(&self) -> bool { + matches!( + self.configuration, + DatabaseConfiguration::SqlitePersistent { .. } + ) + } + async fn create_at(configuration: &DatabaseConfiguration) -> Result { // Creates database file if it doesn't exist let pool = Self::create_connection_pool(configuration).await?; diff --git a/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository.rs b/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository.rs index 0958a70c712..536bb837dbd 100644 --- a/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository.rs +++ b/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository.rs @@ -7,6 +7,12 @@ use ockam_core::compat::boxed::Box; use ockam_core::compat::vec::Vec; use ockam_core::Result; +#[cfg(feature = "std")] +use ockam_node::database::AutoRetry; + +#[cfg(feature = "std")] +use ockam_node::retry; + /// A secrets repository supports the persistence of signing and X25519 secrets #[async_trait] pub trait SecretsRepository: Send + Sync + 'static { @@ -64,3 +70,73 @@ pub trait SecretsRepository: Send + Sync + 'static { /// Delete all secrets async fn delete_all(&self) -> Result<()>; } + +#[cfg(feature = "std")] +#[async_trait] +impl SecretsRepository for AutoRetry { + async fn store_signing_secret( + &self, + handle: &SigningSecretKeyHandle, + secret: SigningSecret, + ) -> Result<()> { + retry!(self.wrapped.store_signing_secret(handle, secret.clone())) + } + + async fn delete_signing_secret(&self, handle: &SigningSecretKeyHandle) -> Result { + retry!(self.wrapped.delete_signing_secret(handle)) + } + + async fn get_signing_secret( + &self, + handle: &SigningSecretKeyHandle, + ) -> Result> { + retry!(self.wrapped.get_signing_secret(handle)) + } + + async fn get_signing_secret_handles(&self) -> Result> { + retry!(self.wrapped.get_signing_secret_handles()) + } + + async fn store_x25519_secret( + &self, + handle: &X25519SecretKeyHandle, + secret: X25519SecretKey, + ) -> Result<()> { + retry!(self.wrapped.store_x25519_secret(handle, secret.clone())) + } + + async fn delete_x25519_secret(&self, handle: &X25519SecretKeyHandle) -> Result { + retry!(self.wrapped.delete_x25519_secret(handle)) + } + + async fn get_x25519_secret( + &self, + handle: &X25519SecretKeyHandle, + ) -> Result> { + retry!(self.wrapped.get_x25519_secret(handle)) + } + + async fn get_x25519_secret_handles(&self) -> Result> { + retry!(self.wrapped.get_x25519_secret_handles()) + } + + async fn store_aead_secret( + &self, + handle: &AeadSecretKeyHandle, + secret: AeadSecret, + ) -> Result<()> { + retry!(self.wrapped.store_aead_secret(handle, secret.clone())) + } + + async fn delete_aead_secret(&self, handle: &AeadSecretKeyHandle) -> Result { + retry!(self.wrapped.delete_aead_secret(handle)) + } + + async fn get_aead_secret(&self, handle: &AeadSecretKeyHandle) -> Result> { + retry!(self.wrapped.get_aead_secret(handle)) + } + + async fn delete_all(&self) -> Result<()> { + retry!(self.wrapped.delete_all()) + } +} diff --git a/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs b/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs index baaa5e1a0d4..0f4cb2e1bfc 100644 --- a/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs +++ b/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs @@ -2,17 +2,18 @@ use sqlx::encode::IsNull; use sqlx::error::BoxDynError; use sqlx::postgres::any::AnyArgumentBuffer; use sqlx::*; +use std::sync::Arc; use tracing::debug; use zeroize::{Zeroize, ZeroizeOnDrop}; +use crate::storage::secrets_repository::SecretsRepository; use ockam_core::async_trait; use ockam_core::compat::vec::Vec; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Result; +use ockam_node::database::AutoRetry; use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; -use crate::storage::secrets_repository::SecretsRepository; - use crate::{ AeadSecret, AeadSecretKeyHandle, ECDSASHA256CurveP256SecretKey, EdDSACurve25519SecretKey, HandleToSecret, SigningSecret, SigningSecretKeyHandle, X25519SecretKey, X25519SecretKeyHandle, @@ -32,6 +33,15 @@ impl SecretsSqlxDatabase { Self { database } } + /// Create a repository + pub fn make_repository(database: SqlxDatabase) -> Arc { + if database.needs_retry() { + Arc::new(AutoRetry::new(Self::new(database))) + } else { + Arc::new(Self::new(database)) + } + } + /// Create a new in-memory database for secrets pub async fn create() -> Result { Ok(Self::new(SqlxDatabase::in_memory("secrets").await?))