Skip to content

Commit

Permalink
refactor(rust): propagate the async constraint when using an in-memor…
Browse files Browse the repository at this point in the history
…y repository
  • Loading branch information
etorreborre committed Nov 22, 2023
1 parent 13cf605 commit 69ab0e6
Show file tree
Hide file tree
Showing 61 changed files with 337 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ockam_vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigni

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let identity_vault = SoftwareVaultForSigning::create();
let identity_vault = SoftwareVaultForSigning::create().await?;
// Import the signing secret key to the Vault
let secret = identity_vault
.import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new(
Expand All @@ -21,10 +21,10 @@ async fn main(ctx: Context) -> Result<()> {
.await?;

// Create a default Vault but use the signing vault with our secret in it
let mut vault = Vault::create();
let mut vault = Vault::create().await?;
vault.identity_vault = identity_vault;

let mut node = Node::builder().with_vault(vault).build(&ctx).await?;
let mut node = Node::builder().await?.with_vault(vault).build(&ctx).await?;
// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ockam_vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigni

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let identity_vault = SoftwareVaultForSigning::create();
let identity_vault = SoftwareVaultForSigning::create().await?;
// Import the signing secret key to the Vault
let secret = identity_vault
.import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new(
Expand All @@ -21,7 +21,7 @@ async fn main(ctx: Context) -> Result<()> {
.await?;

// Create a default Vault but use the signing vault with our secret in it
let mut vault = Vault::create();
let mut vault = Vault::create().await?;
vault.identity_vault = identity_vault;

let node = Node::builder().with_vault(vault).build(&ctx).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ockam_vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigni

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let identity_vault = SoftwareVaultForSigning::create();
let identity_vault = SoftwareVaultForSigning::create().await?;
// Import the signing secret key to the Vault
let secret = identity_vault
.import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new(
Expand All @@ -26,10 +26,10 @@ async fn main(ctx: Context) -> Result<()> {
.await?;

// Create a default Vault but use the signing vault with our secret in it
let mut vault = Vault::create();
let mut vault = Vault::create().await?;
vault.identity_vault = identity_vault;

let node = Node::builder().with_vault(vault).build(&ctx).await?;
let node = Node::builder().await?.with_vault(vault).build(&ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,11 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
.await?;

// 3. create an access control policy checking the value of the "component" attribute of the caller
let access_control =
AbacAccessControl::create(identities().identity_attributes_repository(), "component", "control");
let access_control = AbacAccessControl::create(
identities().await?.identity_attributes_repository(),
"component",
"control",
);

// 4. create a tcp inlet with the above policy

Expand Down
3 changes: 1 addition & 2 deletions implementations/elixir/ockam/ockly/native/ockly/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 20 additions & 14 deletions implementations/elixir/ockam/ockly/native/ockly/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,19 @@ fn identities_ref() -> NifResult<Arc<Identities>> {

fn load_memory_vault() -> bool {
block_future(async move {
let identity_vault = SoftwareVaultForSigning::create();
let secure_channel_vault = SoftwareVaultForSecureChannels::create();
let identity_vault = SoftwareVaultForSigning::create().await.unwrap();
let secure_channel_vault = SoftwareVaultForSecureChannels::create().await.unwrap();
*IDENTITY_MEMORY_VAULT.write().unwrap() = Some(identity_vault.clone());
*SECURE_CHANNEL_MEMORY_VAULT.write().unwrap() = Some(secure_channel_vault.clone());
let builder = ockam_identity::Identities::builder().with_vault(Vault::new(
identity_vault,
secure_channel_vault,
Vault::create_credential_vault(),
Vault::create_verifying_vault(),
));
let builder = ockam_identity::Identities::builder()
.await
.unwrap()
.with_vault(Vault::new(
identity_vault,
secure_channel_vault,
Vault::create_credential_vault().await.unwrap(),
Vault::create_verifying_vault(),
));
*IDENTITIES.write().unwrap() = Some(builder.build());
});
true
Expand All @@ -125,12 +128,15 @@ fn setup_aws_kms(key_ids: Vec<String>) -> NifResult<bool> {
match AwsSigningVault::create_with_config(config).await {
Ok(vault) => {
let aws_vault = Arc::new(vault);
let builder = ockam_identity::Identities::builder().with_vault(Vault::new(
aws_vault.clone(),
secure_channel_vault,
aws_vault,
Vault::create_verifying_vault(),
));
let builder = ockam_identity::Identities::builder()
.await
.map_err(|e| Error::Term(Box::new(e.to_string())))?
.with_vault(Vault::new(
aws_vault.clone(),
secure_channel_vault,
aws_vault,
Vault::create_verifying_vault(),
));
*IDENTITIES.write().unwrap() = Some(builder.build());
Ok(true)
}
Expand Down
72 changes: 36 additions & 36 deletions implementations/rust/ockam/ockam/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
use ockam_core::compat::string::String;
use ockam_core::compat::sync::Arc;
use ockam_core::flow_control::FlowControls;
use ockam_core::{
Address, AsyncTryClone, IncomingAccessControl, Message, OutgoingAccessControl, Processor,
Result, Route, Routed, Worker,
};
use ockam_core::compat::string::String;
use ockam_core::compat::sync::Arc;
use ockam_core::flow_control::FlowControls;
use ockam_identity::{IdentityAttributesRepository, PurposeKeys, Vault};
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions};
use ockam_vault::storage::SecretsRepository;
use ockam_vault::SigningSecretKeyHandle;
use ockam_vault::storage::SecretsRepository;

use crate::identity::models::Identifier;
#[cfg(feature = "storage")]
use crate::identity::secure_channels;
use crate::identity::{
ChangeHistoryRepository, Credentials, CredentialsServer, Identities, IdentitiesCreation,
IdentitiesKeys, SecureChannel, SecureChannelListener, SecureChannelRegistry, SecureChannels,
SecureChannelsBuilder,
};
use crate::identity::{SecureChannelListenerOptions, SecureChannelOptions};
use crate::identity::models::Identifier;
#[cfg(feature = "storage")]
use crate::identity::secure_channels;
use crate::OckamError;
use crate::remote::{RemoteRelay, RemoteRelayInfo, RemoteRelayOptions};
use crate::stream::Stream;
use crate::OckamError;

/// This struct supports all the ockam services for managing identities
/// and creating secure channels
Expand All @@ -41,18 +41,18 @@ pub struct Node {
/// use ockam_vault::storage::SecretsSqlxDatabase;
///
/// async fn make_node(ctx: Context) -> Result<Node> {
/// let node = Node::builder().with_secrets_repository(SecretsSqlxDatabase::create()).build(&ctx).await?;
/// let node = Node::builder().await?.with_secrets_repository(SecretsSqlxDatabase::create().await?).build(&ctx).await?;
/// Ok(node)
/// }
///
///
/// ```
#[cfg(feature = "storage")]
pub fn node(ctx: Context) -> Node {
Node {
pub async fn node(ctx: Context) -> Result<Node> {
Ok(Node {
context: ctx,
secure_channels: secure_channels(),
}
secure_channels: secure_channels().await?,
})
}

impl Node {
Expand Down Expand Up @@ -159,8 +159,8 @@ impl Node {

/// Start a new worker instance at the given address. Default Access Control is AllowAll
pub async fn start_worker<W>(&self, address: impl Into<Address>, worker: W) -> Result<()>
where
W: Worker<Context = Context>,
where
W: Worker<Context=Context>,
{
self.context.start_worker(address, worker).await
}
Expand All @@ -173,8 +173,8 @@ impl Node {
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl,
) -> Result<()>
where
W: Worker<Context = Context>,
where
W: Worker<Context=Context>,
{
self.context
.start_worker_with_access_control(address, worker, incoming, outgoing)
Expand All @@ -183,8 +183,8 @@ impl Node {

/// Start a new processor instance at the given address. Default Access Control is DenyAll
pub async fn start_processor<P>(&self, address: impl Into<Address>, processor: P) -> Result<()>
where
P: Processor<Context = Context>,
where
P: Processor<Context=Context>,
{
self.context.start_processor(address, processor).await
}
Expand All @@ -197,8 +197,8 @@ impl Node {
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl,
) -> Result<()>
where
P: Processor<Context = Context>,
where
P: Processor<Context=Context>,
{
self.context
.start_processor_with_access_control(address, processor, incoming, outgoing)
Expand All @@ -212,17 +212,17 @@ impl Node {

/// Send a message to an address or via a fully-qualified route
pub async fn send<R, M>(&self, route: R, msg: M) -> Result<()>
where
R: Into<Route>,
M: Message + Send + 'static,
where
R: Into<Route>,
M: Message + Send + 'static,
{
self.context.send(route, msg).await
}

/// Send a message to an address or via a fully-qualified route and receive a response
pub async fn send_and_receive<M>(&self, route: impl Into<Route>, msg: impl Message) -> Result<M>
where
M: Message,
where
M: Message,
{
self.context.send_and_receive(route, msg).await
}
Expand All @@ -234,8 +234,8 @@ impl Node {
msg: impl Message,
options: MessageSendReceiveOptions,
) -> Result<Routed<M>>
where
M: Message,
where
M: Message,
{
self.context
.send_and_receive_extended(route, msg, options)
Expand All @@ -252,8 +252,8 @@ impl Node {
&mut self,
options: MessageReceiveOptions,
) -> Result<Routed<M>>
where
M: Message,
where
M: Message,
{
self.context.receive_extended(options).await
}
Expand Down Expand Up @@ -314,8 +314,8 @@ impl Node {

/// Return a new builder for top-level services
#[cfg(feature = "storage")]
pub fn builder() -> NodeBuilder {
NodeBuilder::new()
pub async fn builder() -> Result<NodeBuilder> {
NodeBuilder::new().await
}
}

Expand All @@ -336,10 +336,10 @@ pub struct NodeBuilder {

impl NodeBuilder {
#[cfg(feature = "storage")]
fn new() -> Self {
Self {
builder: SecureChannels::builder(),
}
async fn new() -> Result<Self> {
Ok(Self {
builder: SecureChannels::builder().await?,
})
}

/// Set [`Vault`]
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam/tests/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn test4(ctx: &mut Context) -> Result<()> {
.relay_as_consumer(&cloud_secure_channel_listener_options.spawner_flow_control_id());
RelayService::create(ctx, "forwarding_service", options).await?;

let secure_channels = secure_channels();
let secure_channels = secure_channels().await?;
let identities_creation = secure_channels.identities().identities_creation();
let cloud = identities_creation.create_identity().await?;
secure_channels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ impl PolicySqlxDatabase {
}

/// Create a new in-memory database for policies
pub fn create() -> Arc<Self> {
Arc::new(Self::new(Arc::new(SqlxDatabase::in_memory("policies"))))
pub async fn create() -> Result<Arc<Self>> {
Ok(Arc::new(Self::new(
SqlxDatabase::in_memory("policies").await?,
)))
}
}

Expand Down Expand Up @@ -114,7 +116,7 @@ mod test {

#[tokio::test]
async fn test_repository() -> Result<()> {
let repository = create_repository();
let repository = create_repository().await?;

// a policy can be associated to a resource and an action
let r = Resource::from("outlet");
Expand Down Expand Up @@ -143,7 +145,7 @@ mod test {
}

/// HELPERS
fn create_repository() -> Arc<dyn PoliciesRepository> {
PolicySqlxDatabase::create()
async fn create_repository() -> Result<Arc<dyn PoliciesRepository>> {
Ok(PolicySqlxDatabase::create().await?)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl Authority {
let purpose_keys_repository = Arc::new(PurposeKeysSqlxDatabase::new(database));

let secure_channels = SecureChannels::builder()
.await?
.with_vault(vault)
.with_identity_attributes_repository(identity_attributes_repository)
.with_change_history_repository(change_history_repository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ mod test {
#[tokio::test]
async fn test_cli_spaces() -> Result<()> {
let cli = CliState::test().await?;
let identities = identities();
let identities = identities().await?;
let issuer_identifier = identities.identities_creation().create_identity().await?;
let issuer = identities.get_identity(&issuer_identifier).await?;
let credential = create_credential(identities, &issuer_identifier).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ impl EnrollmentsSqlxDatabase {
}

/// Create a new in-memory database
pub fn create() -> Arc<Self> {
Arc::new(Self::new(Arc::new(SqlxDatabase::in_memory("enrollments"))))
pub async fn create() -> Result<Arc<EnrollmentsSqlxDatabase>> {
Ok(Arc::new(Self::new(
SqlxDatabase::in_memory("enrollments").await?,
)))
}
}

Expand Down
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_api/src/cli_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl CliState {

pub async fn get_identities_for_vault(&self, vault: NamedVault) -> Result<Arc<Identities>> {
Ok(Identities::builder()
.await?
.with_vault(vault.vault().await?)
.with_change_history_repository(self.change_history_repository().await?)
.with_identity_attributes_repository(self.identity_attributes_repository().await?)
Expand Down
Loading

0 comments on commit 69ab0e6

Please sign in to comment.