diff --git a/Cargo.lock b/Cargo.lock index 801ccad448..9e6b2b4bd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1504,7 +1504,6 @@ dependencies = [ "postgres-types", "pretty_assertions_sorted", "rand 0.8.5", - "rebaser-client", "rebaser-core", "rebaser-server", "refinery", @@ -1554,7 +1553,6 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "pinga-server", - "rebaser-client", "rebaser-server", "remain", "serde", @@ -3967,7 +3965,6 @@ dependencies = [ "derive_builder", "futures", "nats-subscriber", - "rebaser-client", "remain", "serde", "serde_json", @@ -4449,22 +4446,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "rebaser-client" -version = "0.1.0" -dependencies = [ - "futures", - "rebaser-core", - "remain", - "serde", - "serde_json", - "si-data-nats", - "si-events", - "telemetry", - "thiserror", - "ulid", -] - [[package]] name = "rebaser-core" version = "0.1.0" @@ -4985,7 +4966,6 @@ dependencies = [ "clap 4.5.3", "color-eyre", "nats-multiplexer", - "rebaser-client", "sdf-server", "si-std", "telemetry-application", @@ -5592,6 +5572,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tokio-util", "ulid", ] diff --git a/Cargo.toml b/Cargo.toml index b380d1eb38..b8cf951be4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,6 @@ members = [ "lib/naxum", "lib/object-tree", "lib/pinga-server", - "lib/rebaser-client", "lib/rebaser-core", "lib/rebaser-server", "lib/sdf-server", diff --git a/bin/sdf/BUCK b/bin/sdf/BUCK index ea14648bd2..c1e732a7de 100644 --- a/bin/sdf/BUCK +++ b/bin/sdf/BUCK @@ -9,7 +9,6 @@ rust_binary( name = "sdf", deps = [ "//lib/nats-multiplexer:nats-multiplexer", - "//lib/rebaser-client:rebaser-client", "//lib/sdf-server:sdf-server", "//lib/si-std:si-std", "//lib/telemetry-application-rs:telemetry-application", diff --git a/bin/sdf/Cargo.toml b/bin/sdf/Cargo.toml index f4d06d7791..cf81a03ee9 100644 --- a/bin/sdf/Cargo.toml +++ b/bin/sdf/Cargo.toml @@ -11,7 +11,6 @@ path = "src/main.rs" [dependencies] 
nats-multiplexer = { path = "../../lib/nats-multiplexer" } -rebaser-client = { path = "../../lib/rebaser-client" } sdf-server = { path = "../../lib/sdf-server" } si-std = { path = "../../lib/si-std" } telemetry-application = { path = "../../lib/telemetry-application-rs" } diff --git a/bin/sdf/src/main.rs b/bin/sdf/src/main.rs index a2e6927ed3..a45a0fb8a5 100644 --- a/bin/sdf/src/main.rs +++ b/bin/sdf/src/main.rs @@ -13,8 +13,6 @@ use sdf_server::{ use telemetry_application::prelude::*; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use rebaser_client::Config as RebaserClientConfig; - mod args; type JobProcessor = sdf_server::NatsProcessor; @@ -111,10 +109,6 @@ async fn async_main() -> Result<()> { let module_index_url = config.module_index_url().to_string(); - // TODO: accept command line arguments and or environment variables to configure the rebaser - // client - let rebaser_config = RebaserClientConfig::default(); - let (ws_multiplexer, ws_multiplexer_client) = Multiplexer::new(&nats_conn, WS_MULTIPLEXER_SUBJECT).await?; let (crdt_multiplexer, crdt_multiplexer_client) = @@ -138,7 +132,6 @@ async fn async_main() -> Result<()> { Some(pkgs_path), Some(module_index_url), symmetric_crypto_service, - rebaser_config, layer_db, ); diff --git a/lib/dal-test/BUCK b/lib/dal-test/BUCK index 23b4942694..3040ab423e 100644 --- a/lib/dal-test/BUCK +++ b/lib/dal-test/BUCK @@ -8,7 +8,6 @@ rust_library( "//lib/dal:dal", "//lib/module-index-client:module-index-client", "//lib/pinga-server:pinga-server", - "//lib/rebaser-client:rebaser-client", "//lib/rebaser-server:rebaser-server", "//lib/si-crypto:si-crypto", "//lib/si-data-nats:si-data-nats", diff --git a/lib/dal-test/Cargo.toml b/lib/dal-test/Cargo.toml index 3c0e3a2c6f..ce1811b039 100644 --- a/lib/dal-test/Cargo.toml +++ b/lib/dal-test/Cargo.toml @@ -18,7 +18,6 @@ names = { workspace = true } opentelemetry-otlp = { workspace = true } opentelemetry_sdk = { workspace = true } pinga-server = { path = 
"../../lib/pinga-server" } -rebaser-client = { path = "../../lib/rebaser-client"} rebaser-server = { path = "../../lib/rebaser-server" } remain = { workspace = true } serde = { workspace = true } diff --git a/lib/dal-test/src/lib.rs b/lib/dal-test/src/lib.rs index 762b4fa933..56da008af3 100644 --- a/lib/dal-test/src/lib.rs +++ b/lib/dal-test/src/lib.rs @@ -16,7 +16,6 @@ use dal::{ use derive_builder::Builder; use jwt_simple::prelude::RS256KeyPair; use lazy_static::lazy_static; -use rebaser_client::Config as RebaserClientConfig; use si_crypto::{ SymmetricCryptoService, SymmetricCryptoServiceConfig, SymmetricCryptoServiceConfigFile, }; @@ -109,8 +108,6 @@ pub struct Config { symmetric_crypto_service_config: SymmetricCryptoServiceConfig, // TODO(nick): determine why this is unused. #[allow(dead_code)] - #[builder(default)] - rebaser_config: RebaserClientConfig, #[builder(default = "si_layer_cache::default_pg_pool_config()")] layer_cache_pg_pool: PgPoolConfig, } @@ -219,8 +216,6 @@ pub struct TestContext { layer_db_pg_pool: PgPool, /// The sled path for the layer db layer_db_sled_path: String, - /// The configuration for the rebaser client used in tests - rebaser_config: RebaserClientConfig, } impl TestContext { @@ -313,7 +308,6 @@ impl TestContext { self.config.pkgs_path.to_owned(), None, self.symmetric_crypto_service.clone(), - self.rebaser_config.clone(), layer_db, ) } @@ -396,9 +390,6 @@ impl TestContextBuilder { SymmetricCryptoService::from_config(&self.config.symmetric_crypto_service_config) .await?; - let mut rebaser_config = RebaserClientConfig::default(); - rebaser_config.set_subject_prefix(universal_prefix); - Ok(TestContext { config, pg_pool, @@ -406,7 +397,6 @@ impl TestContextBuilder { job_processor, encryption_key: self.encryption_key.clone(), symmetric_crypto_service, - rebaser_config, layer_db_pg_pool, layer_db_sled_path: si_layer_cache::disk_cache::default_sled_path()?.to_string(), }) @@ -540,15 +530,6 @@ pub fn pinga_server(services_context: 
&ServicesContext) -> Result Result { - let _config: rebaser_server::Config = { - let mut config_file = rebaser_server::ConfigFile::default(); - rebaser_server::detect_and_configure_development(&mut config_file) - .wrap_err("failed to detect and configure Rebaser ConfigFile")?; - config_file - .try_into() - .wrap_err("failed to build Rebaser server config")? - }; - let server = rebaser_server::Server::from_services( services_context.encryption_key(), services_context.nats_conn().clone(), @@ -556,7 +537,6 @@ pub fn rebaser_server(services_context: &ServicesContext) -> Result Result<()> { .expect("no pkgs path configured"), test_context.config.module_index_url.clone(), services_ctx.symmetric_crypto_service(), - services_ctx.rebaser_config().clone(), services_ctx.layer_db().clone(), ) .await @@ -745,7 +724,6 @@ async fn migrate_local_builtins( pkgs_path: PathBuf, module_index_url: String, symmetric_crypto_service: &SymmetricCryptoService, - rebaser_config: RebaserClientConfig, layer_db: DalLayerDb, ) -> ModelResult<()> { let services_context = ServicesContext::new( @@ -757,7 +735,6 @@ async fn migrate_local_builtins( Some(pkgs_path), Some(module_index_url), symmetric_crypto_service.clone(), - rebaser_config, layer_db.clone(), ); let dal_context = services_context.into_builder(true); diff --git a/lib/dal/BUCK b/lib/dal/BUCK index 36041bef69..46c937b43f 100644 --- a/lib/dal/BUCK +++ b/lib/dal/BUCK @@ -11,7 +11,6 @@ rust_library( "//lib/council-server:council-server", "//lib/nats-subscriber:nats-subscriber", "//lib/object-tree:object-tree", - "//lib/rebaser-client:rebaser-client", "//lib/si-crypto:si-crypto", "//lib/si-data-nats:si-data-nats", "//lib/si-data-pg:si-data-pg", @@ -83,7 +82,6 @@ rust_test( name = "test-integration", deps = [ "//lib/dal-test:dal-test", - "//lib/rebaser-client:rebaser-client", "//lib/rebaser-core:rebaser-core", "//lib/rebaser-server:rebaser-server", "//lib/si-pkg:si-pkg", diff --git a/lib/dal/Cargo.toml b/lib/dal/Cargo.toml index 
6dcd973f59..318bbe4aec 100644 --- a/lib/dal/Cargo.toml +++ b/lib/dal/Cargo.toml @@ -33,7 +33,6 @@ petgraph = { workspace = true } postcard = { version = "1.0.8", features = ["alloc"] } postgres-types = { workspace = true } rand = { workspace = true } -rebaser-client = { path = "../../lib/rebaser-client" } refinery = { workspace = true } regex = { workspace = true } remain = { workspace = true } diff --git a/lib/dal/src/context.rs b/lib/dal/src/context.rs index a16e0562d2..34bbc1e88e 100644 --- a/lib/dal/src/context.rs +++ b/lib/dal/src/context.rs @@ -1,15 +1,17 @@ use std::{collections::HashMap, collections::HashSet, mem, path::PathBuf, sync::Arc}; use futures::Future; -use rebaser_client::ClientError as RebaserClientError; -use rebaser_client::Config as RebaserClientConfig; -use rebaser_client::ReplyRebaseMessage; use serde::{Deserialize, Serialize}; use si_crypto::SymmetricCryptoService; use si_data_nats::{NatsClient, NatsError, NatsTxn}; use si_data_pg::{InstrumentedClient, PgError, PgPool, PgPoolError, PgPoolResult, PgTxn}; use si_events::WorkspaceSnapshotAddress; +use si_layer_cache::activities::rebase::RebaseStatus; +use si_layer_cache::activities::ActivityPayload; +use si_layer_cache::activities::ActivityPayloadDiscriminants; use si_layer_cache::db::LayerDb; +use si_layer_cache::event::LayeredEventMetadata; +use si_layer_cache::LayerDbError; use telemetry::prelude::*; use thiserror::Error; use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard}; @@ -58,8 +60,6 @@ pub struct ServicesContext { module_index_url: Option, /// A service that can encrypt and decrypt values with a set of symmetric keys symmetric_crypto_service: SymmetricCryptoService, - /// Config for the the rebaser service - rebaser_config: RebaserClientConfig, /// The layer db (moka-rs, sled and postgres) layer_db: DalLayerDb, } @@ -76,7 +76,6 @@ impl ServicesContext { pkgs_path: Option, module_index_url: Option, symmetric_crypto_service: SymmetricCryptoService, - rebaser_config: 
RebaserClientConfig, layer_db: DalLayerDb, ) -> Self { Self { @@ -88,7 +87,6 @@ impl ServicesContext { pkgs_path, module_index_url, symmetric_crypto_service, - rebaser_config, layer_db, } } @@ -136,11 +134,6 @@ impl ServicesContext { &self.symmetric_crypto_service } - /// Gets a reference to the rebaser client configuration - pub fn rebaser_config(&self) -> &RebaserClientConfig { - &self.rebaser_config - } - /// Gets a reference to the Layer Db pub fn layer_db(&self) -> &DalLayerDb { &self.layer_db @@ -151,14 +144,8 @@ impl ServicesContext { let pg_conn = self.pg_pool.get().await?; let nats_conn = self.nats_conn.clone(); let job_processor = self.job_processor.clone(); - let rebaser_config = self.rebaser_config.clone(); - Ok(Connections::new( - pg_conn, - nats_conn, - job_processor, - rebaser_config, - )) + Ok(Connections::new(pg_conn, nats_conn, job_processor)) } } @@ -206,16 +193,15 @@ impl ConnectionState { async fn commit( self, tenancy: &Tenancy, + layer_db: &DalLayerDb, rebase_request: Option, ) -> Result<(Self, Option), TransactionsError> { let (conns, conflicts) = match self { Self::Connections(conns) => { - let (nats_conn, rebaser_config) = - (conns.nats_conn().clone(), conns.rebaser_config.clone()); // We need to rebase and wait for the rebaser to update the change set // pointer, even if we are not in a "transactions" state let conflicts = if let Some(rebase_request) = rebase_request { - rebase(tenancy, nats_conn, rebaser_config, rebase_request).await? + rebase(tenancy, layer_db, rebase_request).await? 
} else { None }; @@ -224,7 +210,9 @@ impl ConnectionState { Ok((Self::Connections(conns), conflicts)) } Self::Transactions(txns) => { - let (conns, conflicts) = txns.commit_into_conns(tenancy, rebase_request).await?; + let (conns, conflicts) = txns + .commit_into_conns(tenancy, layer_db, rebase_request) + .await?; Ok((Self::Connections(conns), conflicts)) } Self::Invalid => Err(TransactionsError::TxnCommit), @@ -236,6 +224,7 @@ impl ConnectionState { async fn blocking_commit( self, tenancy: &Tenancy, + layer_db: &DalLayerDb, rebase_request: Option, ) -> Result<(Self, Option), TransactionsError> { match self { @@ -245,13 +234,7 @@ impl ConnectionState { // Even if there are no open dal transactions, we may have written to the layer db // and we need to perform a rebase if one is requested let conflicts = if let Some(rebase_request) = rebase_request { - rebase( - tenancy, - conns.nats_conn.clone(), - conns.rebaser_config.clone(), - rebase_request, - ) - .await? + rebase(tenancy, layer_db, rebase_request).await? } else { None }; @@ -260,7 +243,7 @@ impl ConnectionState { } Self::Transactions(txns) => { let (conns, conflicts) = txns - .blocking_commit_into_conns(tenancy, rebase_request) + .blocking_commit_into_conns(tenancy, layer_db, rebase_request) .await?; Ok((Self::Connections(conns), conflicts)) } @@ -405,13 +388,7 @@ impl DalContext { &self, rebase_request: RebaseRequest, ) -> Result, TransactionsError> { - rebase( - &self.tenancy, - self.services_context.nats_conn.clone(), - self.services_context().rebaser_config.clone(), - rebase_request, - ) - .await + rebase(&self.tenancy, &self.layer_db(), rebase_request).await } async fn commit_internal( @@ -422,7 +399,10 @@ impl DalContext { self.blocking_commit_internal(rebase_request).await? 
} else { let mut guard = self.conns_state.lock().await; - let (new_guard, conflicts) = guard.take().commit(&self.tenancy, rebase_request).await?; + let (new_guard, conflicts) = guard + .take() + .commit(&self.tenancy, &self.layer_db(), rebase_request) + .await?; *guard = new_guard; conflicts @@ -439,7 +419,7 @@ impl DalContext { let (new_guard, conflicts) = guard .take() - .blocking_commit(&self.tenancy, rebase_request) + .blocking_commit(&self.tenancy, &self.layer_db(), rebase_request) .await?; *guard = new_guard; @@ -1018,6 +998,8 @@ impl DalContextBuilder { #[remain::sorted] #[derive(Debug, Error)] pub enum TransactionsError { + #[error("expected a {0:?} activity, but received a {1:?}")] + BadActivity(ActivityPayloadDiscriminants, ActivityPayloadDiscriminants), #[error("change set error: {0}")] ChangeSet(String), #[error("change set pointer not found for change set id: {0}")] @@ -1027,6 +1009,8 @@ pub enum TransactionsError { #[error(transparent)] JobQueueProcessor(#[from] JobQueueProcessorError), #[error(transparent)] + LayerDb(#[from] LayerDbError), + #[error(transparent)] Nats(#[from] NatsError), #[error("no base change set for change set: {0}")] NoBaseChangeSet(ChangeSetId), @@ -1037,8 +1021,6 @@ pub enum TransactionsError { #[error("rebase of snapshot {0} change set id {1} failed {2}")] RebaseFailed(WorkspaceSnapshotAddress, ChangeSetId, String), #[error(transparent)] - RebaserClient(#[from] RebaserClientError), - #[error(transparent)] SerdeJson(#[from] serde_json::Error), #[error(transparent)] Tenancy(#[from] TenancyError), @@ -1063,7 +1045,6 @@ pub enum TransactionsError { pub struct Connections { pg_conn: InstrumentedClient, nats_conn: NatsClient, - rebaser_config: RebaserClientConfig, job_processor: Box, } @@ -1074,12 +1055,10 @@ impl Connections { pg_conn: InstrumentedClient, nats_conn: NatsClient, job_processor: Box, - rebaser_config: RebaserClientConfig, ) -> Self { Self { pg_conn, nats_conn, - rebaser_config, job_processor, } } @@ -1089,14 +1068,8 
@@ impl Connections { let pg_txn = PgTxn::create(self.pg_conn).await?; let nats_txn = self.nats_conn.transaction(); let job_processor = self.job_processor; - let rebaser_config = self.rebaser_config; - Ok(Transactions::new( - pg_txn, - nats_txn, - job_processor, - rebaser_config, - )) + Ok(Transactions::new(pg_txn, nats_txn, job_processor)) } /// Gets a reference to a PostgreSQL connection. @@ -1120,8 +1093,6 @@ pub struct Transactions { pg_txn: PgTxn, /// A NATS transaction. nats_txn: NatsTxn, - /// Rebaser client - rebaser_config: RebaserClientConfig, job_processor: Box, job_queue: JobQueue, #[allow(clippy::type_complexity)] @@ -1147,44 +1118,56 @@ pub struct Conflicts { // not want it long term. async fn rebase( tenancy: &Tenancy, - nats: NatsClient, - rebaser_config: RebaserClientConfig, + layer_db: &DalLayerDb, rebase_request: RebaseRequest, ) -> Result, TransactionsError> { let start = Instant::now(); - // TODO(nick): make this cleaner. - let workspace_id = tenancy.workspace_pk().unwrap_or(WorkspacePk::NONE).into(); - let rebaser_client = rebaser_client::Client::new(nats, rebaser_config, workspace_id); - - info!("got client and requesting rebase: {:?}", start.elapsed()); - let response = rebaser_client - .request_rebase( + let metadata = LayeredEventMetadata::new( + si_events::Tenancy::new( + tenancy.workspace_pk().unwrap_or(WorkspacePk::NONE).into(), + rebase_request.to_rebase_change_set_id.into(), + ), + si_events::Actor::System, + ); + + info!("requesting rebase: {:?}", start.elapsed()); + let rebase_finished_activity = layer_db + .activity() + .rebase() + .rebase_and_wait( rebase_request.to_rebase_change_set_id.into(), rebase_request.onto_workspace_snapshot_address, rebase_request.onto_vector_clock_id.into(), + metadata, ) .await?; info!("got response from rebaser: {:?}", start.elapsed()); - match response { - ReplyRebaseMessage::Success { .. 
} => Ok(None), - ReplyRebaseMessage::Error { message } => Err(TransactionsError::RebaseFailed( - rebase_request.onto_workspace_snapshot_address, - rebase_request.to_rebase_change_set_id, - message, + match rebase_finished_activity.payload { + ActivityPayload::RebaseFinished(rebase_finished) => match rebase_finished.status() { + RebaseStatus::Success { .. } => Ok(None), + RebaseStatus::Error { message } => Err(TransactionsError::RebaseFailed( + rebase_request.onto_workspace_snapshot_address, + rebase_request.to_rebase_change_set_id, + message.to_string(), + )), + RebaseStatus::ConflictsFound { + conflicts_found, + updates_found_and_skipped, + } => { + let conflicts = Conflicts { + conflicts_found: serde_json::from_str(conflicts_found)?, + updates_found_and_skipped: serde_json::from_str(updates_found_and_skipped)?, + }; + + Ok(Some(conflicts)) + } + }, + p => Err(TransactionsError::BadActivity( + ActivityPayloadDiscriminants::RebaseFinished, + p.into(), )), - ReplyRebaseMessage::ConflictsFound { - conflicts_found, - updates_found_and_skipped, - } => { - let conflicts = Conflicts { - conflicts_found: serde_json::from_value(conflicts_found)?, - updates_found_and_skipped: serde_json::from_value(updates_found_and_skipped)?, - }; - - Ok(Some(conflicts)) - } } } @@ -1194,12 +1177,10 @@ impl Transactions { pg_txn: PgTxn, nats_txn: NatsTxn, job_processor: Box, - rebaser_config: RebaserClientConfig, ) -> Self { Self { pg_txn, nats_txn, - rebaser_config, job_processor, job_queue: JobQueue::new(), dependencies_update_component: Default::default(), @@ -1227,6 +1208,7 @@ impl Transactions { pub async fn commit_into_conns( self, tenancy: &Tenancy, + layer_db: &DalLayerDb, rebase_request: Option, ) -> Result<(Connections, Option), TransactionsError> { let pg_conn = self.pg_txn.commit_into_conn().await?; @@ -1238,13 +1220,7 @@ impl Transactions { // and that will be the snapshot we expect the dependent values update // (for example) to access. 
let conflicts = if let Some(rebase_request) = rebase_request { - rebase( - tenancy, - nats_conn.clone(), - self.rebaser_config.clone(), - rebase_request, - ) - .await? + rebase(tenancy, layer_db, rebase_request).await? } else { None }; @@ -1252,7 +1228,7 @@ impl Transactions { self.job_processor.process_queue(self.job_queue).await?; Ok(( - Connections::new(pg_conn, nats_conn, self.job_processor, self.rebaser_config), + Connections::new(pg_conn, nats_conn, self.job_processor), conflicts, )) } @@ -1268,19 +1244,14 @@ impl Transactions { pub async fn blocking_commit_into_conns( self, tenancy: &Tenancy, + layer_db: &DalLayerDb, rebase_request: Option, ) -> Result<(Connections, Option), TransactionsError> { let pg_conn = self.pg_txn.commit_into_conn().await?; let nats_conn = self.nats_txn.commit_into_conn().await?; let conflicts = if let Some(rebase_request) = rebase_request { - rebase( - tenancy, - nats_conn.clone(), - self.rebaser_config.clone(), - rebase_request, - ) - .await? + rebase(tenancy, layer_db, rebase_request).await? 
} else { None }; @@ -1288,7 +1259,7 @@ impl Transactions { self.job_processor .blocking_process_queue(self.job_queue) .await?; - let conns = Connections::new(pg_conn, nats_conn, self.job_processor, self.rebaser_config); + let conns = Connections::new(pg_conn, nats_conn, self.job_processor); Ok((conns, conflicts)) } @@ -1301,7 +1272,7 @@ impl Transactions { pub async fn rollback_into_conns(self) -> Result { let pg_conn = self.pg_txn.rollback_into_conn().await?; let nats_conn = self.nats_txn.rollback_into_conn().await?; - let conns = Connections::new(pg_conn, nats_conn, self.job_processor, self.rebaser_config); + let conns = Connections::new(pg_conn, nats_conn, self.job_processor); Ok(conns) } diff --git a/lib/dal/src/schema.rs b/lib/dal/src/schema.rs index 76bcf4838f..a1bc7d938a 100644 --- a/lib/dal/src/schema.rs +++ b/lib/dal/src/schema.rs @@ -5,7 +5,7 @@ use si_layer_cache::LayerDbError; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; -use telemetry::tracing::log::info; +use telemetry::prelude::*; use thiserror::Error; use tokio::sync::TryLockError; diff --git a/lib/pinga-server/BUCK b/lib/pinga-server/BUCK index 5caeec1c81..934474e9ec 100644 --- a/lib/pinga-server/BUCK +++ b/lib/pinga-server/BUCK @@ -6,7 +6,6 @@ rust_library( "//lib/buck2-resources:buck2-resources", "//lib/dal:dal", "//lib/nats-subscriber:nats-subscriber", - "//lib/rebaser-client:rebaser-client", "//lib/si-crypto:si-crypto", "//lib/si-data-nats:si-data-nats", "//lib/si-data-pg:si-data-pg", diff --git a/lib/pinga-server/Cargo.toml b/lib/pinga-server/Cargo.toml index 7672daff05..1dec4766e1 100644 --- a/lib/pinga-server/Cargo.toml +++ b/lib/pinga-server/Cargo.toml @@ -14,7 +14,6 @@ nats-subscriber = { path = "../../lib/nats-subscriber" } remain = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -rebaser-client = { path = "../../lib/rebaser-client" } si-crypto = { path = "../../lib/si-crypto" } si-data-nats = { path = "../../lib/si-data-nats" 
} si-data-pg = { path = "../../lib/si-data-pg" } diff --git a/lib/pinga-server/src/server.rs b/lib/pinga-server/src/server.rs index f2efd166f5..47488f2a1b 100644 --- a/lib/pinga-server/src/server.rs +++ b/lib/pinga-server/src/server.rs @@ -11,7 +11,6 @@ use dal::{ }; use futures::{FutureExt, Stream, StreamExt}; use nats_subscriber::{Request, SubscriberError}; -use rebaser_client::Config as RebaserClientConfig; use si_crypto::{ CryptoConfig, SymmetricCryptoError, SymmetricCryptoService, SymmetricCryptoServiceConfig, }; @@ -119,7 +118,6 @@ impl Server { let job_processor = Self::create_job_processor(nats.clone()); let symmetric_crypto_service = Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; - let rebaser_config = RebaserClientConfig::default(); let (layer_db, layer_db_graceful_shutdown) = LayerDb::initialize( config.layer_cache_sled_path(), @@ -139,7 +137,6 @@ impl Server { None, None, symmetric_crypto_service, - rebaser_config, layer_db, ); diff --git a/lib/rebaser-client/BUCK b/lib/rebaser-client/BUCK deleted file mode 100644 index d0ad52d8ac..0000000000 --- a/lib/rebaser-client/BUCK +++ /dev/null @@ -1,20 +0,0 @@ -load("@prelude-si//:macros.bzl", "rust_library") - -rust_library( - name = "rebaser-client", - deps = [ - "//lib/rebaser-core:rebaser-core", - "//lib/si-data-nats:si-data-nats", - "//lib/si-events-rs:si-events", - "//lib/telemetry-rs:telemetry", - "//third-party/rust:futures", - "//third-party/rust:remain", - "//third-party/rust:serde", - "//third-party/rust:serde_json", - "//third-party/rust:thiserror", - "//third-party/rust:ulid", - ], - srcs = glob([ - "src/**/*.rs", - ]), -) diff --git a/lib/rebaser-client/Cargo.toml b/lib/rebaser-client/Cargo.toml deleted file mode 100644 index daf56d1122..0000000000 --- a/lib/rebaser-client/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "rebaser-client" -version = "0.1.0" -edition = "2021" -publish = false - -[dependencies] -rebaser-core = { path = 
"../../lib/rebaser-core" } -si-data-nats = { path = "../../lib/si-data-nats" } -si-events = { path = "../../lib/si-events-rs" } -telemetry = { path = "../../lib/telemetry-rs" } - -futures = { workspace = true } -remain = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -thiserror = { workspace = true } -ulid = { workspace = true } diff --git a/lib/rebaser-client/src/lib.rs b/lib/rebaser-client/src/lib.rs deleted file mode 100644 index ae901dcc4d..0000000000 --- a/lib/rebaser-client/src/lib.rs +++ /dev/null @@ -1,142 +0,0 @@ -//! This crate provides the rebaser [`Client`], which is used for communicating with a running -//! rebaser [`Server`](rebaser_server::Server). - -#![warn( - bad_style, - clippy::missing_panics_doc, - clippy::panic, - clippy::panic_in_result_fn, - clippy::unwrap_in_result, - clippy::unwrap_used, - dead_code, - improper_ctypes, - missing_debug_implementations, - missing_docs, - no_mangle_generic_items, - non_shorthand_field_patterns, - overflowing_literals, - path_statements, - patterns_in_fns_without_body, - rust_2018_idioms, - unconditional_recursion, - unreachable_pub, - unused, - unused_allocation, - unused_comparisons, - unused_parens, - while_true -)] - -use futures::StreamExt; -use rebaser_core::{RebaserMessagingConfig, RequestRebaseMessage, SubjectGenerator}; -use si_data_nats::jetstream::{Context, JetstreamError}; -use si_data_nats::subject::ToSubject; -use si_data_nats::NatsClient; -use si_events::WorkspaceSnapshotAddress; -use telemetry::prelude::error; -use thiserror::Error; -use ulid::Ulid; - -// The client does yet need to have its own config, so it uses the messaging config. 
-pub use rebaser_core::RebaserMessagingConfig as Config; -pub use rebaser_core::ReplyRebaseMessage; - -#[allow(missing_docs)] -#[remain::sorted] -#[derive(Debug, Error)] -pub enum ClientError { - #[error("jetstream error: {0}")] - Jetstream(#[from] JetstreamError), - #[error("nats error: {0}")] - Nats(#[from] si_data_nats::Error), - #[error("serde json error: {0}")] - SerdeJson(#[from] serde_json::Error), - #[error("unexpected empty stream when subscribing to subject: {0}")] - UnexpectedEmptyStream(String), -} - -#[allow(missing_docs)] -pub type ClientResult = Result; - -/// A tenant-scoped client used for communicating with 1:N rebaser servers. -#[derive(Debug)] -pub struct Client { - jetstream_ctx: Context, - nats: NatsClient, - subject_prefix: Option, - workspace_id: Ulid, -} - -impl Client { - /// Creates a new [`Client`]. - pub fn new( - nats: NatsClient, - messaging_config: RebaserMessagingConfig, - workspace_id: Ulid, - ) -> Self { - Self { - jetstream_ctx: nats.clone().to_jetstream_ctx(), - nats, - subject_prefix: messaging_config.subject_prefix().map(ToOwned::to_owned), - workspace_id, - } - } - - /// Publishes a rebase requester to the rebaser stream. - pub async fn request_rebase( - &self, - to_rebase_change_set_id: Ulid, - onto_workspace_snapshot_address: WorkspaceSnapshotAddress, - onto_vector_clock_id: Ulid, - ) -> ClientResult { - let subject = SubjectGenerator::request( - self.workspace_id, - to_rebase_change_set_id, - self.subject_prefix.as_ref(), - ); - - let serialized_messaged = serde_json::to_vec(&RequestRebaseMessage { - to_rebase_change_set_id, - onto_workspace_snapshot_address, - onto_vector_clock_id, - })?; - - let reply_subject = self - .jetstream_ctx - .publish_with_reply_mailbox_and_immediately_ack( - &self.nats, - subject, - serialized_messaged.into(), - ) - .await?; - - // NOTE(nick): we may want to add a timeout in the future when waiting for a reply. 
- self.wait_for_reply(reply_subject).await - } - - async fn wait_for_reply( - &self, - reply_subject: impl ToSubject, - ) -> ClientResult { - let reply_subject = reply_subject.to_subject(); - - let mut subscriber = self.nats.subscribe(reply_subject.clone()).await?; - - // Get the first immediate message (there should only ever be one) and deserialize it. - let message: ReplyRebaseMessage = if let Some(serialized_message) = subscriber.next().await - { - serde_json::from_slice(serialized_message.payload().to_vec().as_slice())? - } else { - return Err(ClientError::UnexpectedEmptyStream( - reply_subject.to_string(), - )); - }; - - // Attempt to unsubscribe. - if let Err(err) = subscriber.unsubscribe().await { - error!(error = ?err, %reply_subject, "error when unsubscribing"); - } - - Ok(message) - } -} diff --git a/lib/rebaser-core/src/lib.rs b/lib/rebaser-core/src/lib.rs index 36f23424f1..c481e42a18 100644 --- a/lib/rebaser-core/src/lib.rs +++ b/lib/rebaser-core/src/lib.rs @@ -1,7 +1,3 @@ -//! This library exists to ensure that crate "rebaser-client" crate does not depend on the "rebaser-server" crate and -//! vice versa. Keeping the dependency chain intact is important because "rebaser-server" depends on the -//! dal. The dal, and any crate other than "rebaser-server" and this crate, must be able to use the "rebaser-client". 
- #![warn( bad_style, clippy::missing_panics_doc, @@ -12,7 +8,6 @@ dead_code, improper_ctypes, missing_debug_implementations, - missing_docs, no_mangle_generic_items, non_shorthand_field_patterns, overflowing_literals, diff --git a/lib/rebaser-server/src/server.rs b/lib/rebaser-server/src/server.rs index 6aea53437a..a2efe74c77 100644 --- a/lib/rebaser-server/src/server.rs +++ b/lib/rebaser-server/src/server.rs @@ -1,7 +1,6 @@ use std::{future::IntoFuture, io, path::Path, sync::Arc}; use dal::{DalLayerDb, InitializationError, JobQueueProcessor, NatsProcessor}; -use rebaser_core::RebaserMessagingConfig; use si_crypto::SymmetricCryptoServiceConfig; use si_crypto::{SymmetricCryptoError, SymmetricCryptoService}; use si_data_nats::{NatsClient, NatsConfig, NatsError}; @@ -74,8 +73,6 @@ pub struct Server { /// An internal graceful shutdown receiver handle which the server's main thread uses to stop /// accepting work when a shutdown event is in progress. graceful_shutdown_rx: oneshot::Receiver<()>, - /// The messaging configuration - messaging_config: RebaserMessagingConfig, /// The layer db layer_db: DalLayerDb, } @@ -98,7 +95,6 @@ impl Server { let job_processor = Self::create_job_processor(nats.clone()); let symmetric_crypto_service = Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; - let messaging_config = config.messaging_config(); let (layer_db, layer_db_graceful_shutdown) = DalLayerDb::initialize( config.layer_cache_sled_path(), @@ -116,7 +112,6 @@ impl Server { veritech, job_processor, symmetric_crypto_service, - messaging_config.to_owned(), layer_db, ) } @@ -131,7 +126,6 @@ impl Server { veritech: VeritechClient, job_processor: Box, symmetric_crypto_service: SymmetricCryptoService, - messaging_config: RebaserMessagingConfig, layer_db: DalLayerDb, ) -> ServerResult { // An mpsc channel which can be used to externally shut down the server. 
@@ -156,7 +150,6 @@ impl Server { shutdown_watch_rx, external_shutdown_tx, graceful_shutdown_rx, - messaging_config, layer_db, }) } @@ -172,7 +165,6 @@ impl Server { self.symmetric_crypto_service, self.encryption_key, self.shutdown_watch_rx, - self.messaging_config, self.layer_db, ) .await?; diff --git a/lib/rebaser-server/src/server/core_loop.rs b/lib/rebaser-server/src/server/core_loop.rs index ab2644b102..29b61b84d1 100644 --- a/lib/rebaser-server/src/server/core_loop.rs +++ b/lib/rebaser-server/src/server/core_loop.rs @@ -4,14 +4,14 @@ use dal::{ }; use futures::FutureExt; use futures::StreamExt; -use rebaser_core::{ - RebaserMessagingConfig, ReplyRebaseMessage, RequestRebaseMessage, SubjectGenerator, -}; use si_crypto::SymmetricCryptoService; -use si_data_nats::jetstream::{AckKind, JetstreamError, Stream, REPLY_SUBJECT_HEADER_NAME}; -use si_data_nats::subject::ToSubject; +use si_data_nats::jetstream::{AckKind, JetstreamError}; use si_data_nats::NatsClient; use si_data_pg::PgPool; +use si_layer_cache::activities::rebase::RebaseStatus; +use si_layer_cache::activities::AckRebaseRequest; +use si_layer_cache::activities::RebaserRequestsWorkQueueStream; +use si_layer_cache::LayerDbError; use std::sync::Arc; use std::time::Instant; use stream_cancel::StreamExt as CancelStreamExt; @@ -26,6 +26,8 @@ use crate::server::rebase::perform_rebase; pub enum CoreLoopSetupError { #[error("jetstream error: {0}")] Jetstream(#[from] JetstreamError), + #[error("layerdb error: {0}")] + LayerDb(#[from] LayerDbError), #[error("serde json erorr: {0}")] SerdeJson(#[from] serde_json::Error), #[error("transactions error: {0}")] @@ -49,7 +51,6 @@ pub(crate) async fn setup_and_run_core_loop( symmetric_crypto_service: SymmetricCryptoService, encryption_key: Arc, shutdown_watch_rx: watch::Receiver<()>, - messaging_config: RebaserMessagingConfig, layer_db: DalLayerDb, ) -> CoreLoopSetupResult<()> { let services_context = ServicesContext::new( @@ -61,33 +62,15 @@ pub(crate) async fn 
setup_and_run_core_loop( None, None, symmetric_crypto_service, - messaging_config.clone(), - layer_db, + layer_db.clone(), ); - // Setup the subjects. - let subject_all = SubjectGenerator::all(messaging_config.subject_prefix()); - let subject_root = SubjectGenerator::root(messaging_config.subject_prefix()); - info!(%subject_all, %subject_root, "created services context and prepared subjects"); - - // Setup the stream and the consumer. - let jetstream_ctx = nats.clone().to_jetstream_ctx(); - info!(%subject_all, %subject_root, "finding or creating stream"); - let rebaser_jetstream_stream = jetstream_ctx - .get_or_create_work_queue_stream(&subject_root, vec![subject_all.clone()]) - .await?; - - info!(%subject_all, %subject_root, "finding or creating durable consumer"); - let consumer = jetstream_ctx - .get_or_create_durable_consumer(&rebaser_jetstream_stream, &subject_root) - .await?; - - info!(%subject_all, %subject_root, "getting stream from consumer"); - let stream = consumer.stream().await?; - info!("getting dal context builder"); let ctx_builder = DalContext::builder(services_context.clone(), false); + info!("subscribing to work queue"); + let stream = layer_db.activity().rebase().subscribe_work_queue().await?; + info!("setup complete, entering core loop"); core_loop_infallible(ctx_builder, stream, shutdown_watch_rx).await; info!("exited core loop"); @@ -97,7 +80,7 @@ pub(crate) async fn setup_and_run_core_loop( async fn core_loop_infallible( ctx_builder: DalContextBuilder, - stream: Stream, + stream: RebaserRequestsWorkQueueStream, mut shutdown_watch_rx: watch::Receiver<()>, ) { let mut stream = stream.take_until_if(Box::pin(shutdown_watch_rx.changed().map(|_| true))); @@ -115,42 +98,9 @@ async fn core_loop_infallible( error!(error = ?err, "could not ack with progress, going to continue anyway"); } - // Deserialize the message payload so that we can process it. 
- let request_message: RequestRebaseMessage = - match serde_json::from_slice(message.message.payload.to_vec().as_slice()) { - Ok(deserialized) => deserialized, - Err(err) => { - error!(error = ?err, ?message, "failed to deserialize message payload"); - continue; - } - }; - - // Pull the reply subject off of the message. - let reply_subject = if let Some(headers) = &message.headers { - if let Some(value) = headers.get(REPLY_SUBJECT_HEADER_NAME.clone()) { - value.to_string() - } else { - // NOTE(nick): we may actually want to process the message anyway, but things would be super messed up - // at that point... because no one should be sending messages exterior to rebaser clients. - error!( - ?message, - "no reply subject found in headers, skipping messages because we cannot reply" - ); - continue; - } - } else { - // NOTE(nick): we may actually want to process the message anyway, but things would be super messed up - // at that point... because no one should be sending messages exterior to rebaser clients. 
- error!( - ?message, - "no headers found, skipping message because we cannot reply" - ); - continue; - }; - let ctx_builder = ctx_builder.clone(); tokio::spawn(async move { - perform_rebase_and_reply_infallible(ctx_builder, request_message, reply_subject).await; + perform_rebase_and_reply_infallible(ctx_builder, &message).await; if let Err(err) = message.ack_with(AckKind::Ack).await { error!(?message, ?err, "failing acking message"); } @@ -160,8 +110,7 @@ async fn core_loop_infallible( async fn perform_rebase_and_reply_infallible( ctx_builder: DalContextBuilder, - message: RequestRebaseMessage, - reply_subject: impl ToSubject, + message: &AckRebaseRequest, ) { let start = Instant::now(); @@ -175,28 +124,38 @@ async fn perform_rebase_and_reply_infallible( ctx.update_visibility_deprecated(Visibility::new_head()); ctx.update_tenancy(Tenancy::new(WorkspacePk::NONE)); - let reply_subject = reply_subject.to_subject(); - - let reply_message = perform_rebase(&mut ctx, message).await.unwrap_or_else(|err| { - error!(error = ?err, ?message, ?reply_subject, "performing rebase failed, attempting to reply"); - ReplyRebaseMessage::Error { - message: err.to_string(), - } - }); - - match serde_json::to_vec(&reply_message) { - Ok(serialized_payload) => { - if let Err(publish_err) = ctx - .nats_conn() - .publish(reply_subject.clone(), serialized_payload.into()) - .await - { - error!(error = ?publish_err, %reply_subject, "replying to requester failed"); + let rebase_status = perform_rebase(&mut ctx, message) + .await + .unwrap_or_else(|err| { + error!(error = ?err, ?message, "performing rebase failed, attempting to reply"); + RebaseStatus::Error { + message: err.to_string(), } - } - Err(serialization_err) => { - error!(error = ?serialization_err, %reply_subject, "failed to serialize reply message"); - } + }); + + if let Err(e) = message.ack().await { + error!(error = ?e, ?message, "failed to acknolwedge the nats message after rebase; likely a timeout"); + } + + if let Err(e) = ctx + 
.layer_db() + .activity() + .rebase() + .finished( + rebase_status, + message.payload.to_rebase_change_set_id, + message.payload.onto_workspace_snapshot_address, + message.metadata.clone(), + message.id, + ) + .await + { + error!(error = ?e, ?message, "failed to send rebase finished activity"); } - info!("perform rebase and reply total: {:?}", start.elapsed()); + + info!( + ?message, + "perform rebase and reply total: {:?}", + start.elapsed() + ); } diff --git a/lib/rebaser-server/src/server/rebase.rs b/lib/rebaser-server/src/server/rebase.rs index 6806d0b31d..1d904dca42 100644 --- a/lib/rebaser-server/src/server/rebase.rs +++ b/lib/rebaser-server/src/server/rebase.rs @@ -5,7 +5,8 @@ use dal::{ DalContext, Tenancy, TransactionsError, Visibility, WorkspacePk, WorkspaceSnapshot, WsEvent, WsEventError, }; -use rebaser_core::{ReplyRebaseMessage, RequestRebaseMessage}; +use si_layer_cache::activities::rebase::RebaseStatus; +use si_layer_cache::activities::AckRebaseRequest; use telemetry::prelude::*; use thiserror::Error; use tokio::time::Instant; @@ -34,15 +35,15 @@ type RebaseResult = Result; pub(crate) async fn perform_rebase( ctx: &mut DalContext, - message: RequestRebaseMessage, -) -> RebaseResult { + message: &AckRebaseRequest, +) -> RebaseResult { let start = Instant::now(); // Gather everything we need to detect conflicts and updates from the inbound message. let mut to_rebase_change_set = - ChangeSetPointer::find(ctx, message.to_rebase_change_set_id.into()) + ChangeSetPointer::find(ctx, message.payload.to_rebase_change_set_id.into()) .await? 
.ok_or(RebaseError::MissingChangeSetPointer( - message.to_rebase_change_set_id.into(), + message.payload.to_rebase_change_set_id.into(), ))?; let to_rebase_workspace_snapshot_address = to_rebase_change_set.workspace_snapshot_address.ok_or( @@ -52,7 +53,7 @@ pub(crate) async fn perform_rebase( let to_rebase_workspace_snapshot = WorkspaceSnapshot::find(ctx, to_rebase_workspace_snapshot_address).await?; let onto_workspace_snapshot: WorkspaceSnapshot = - WorkspaceSnapshot::find(ctx, message.onto_workspace_snapshot_address).await?; + WorkspaceSnapshot::find(ctx, message.payload.onto_workspace_snapshot_address).await?; info!( "to_rebase_id: {}, onto_id: {}", to_rebase_workspace_snapshot_address, @@ -60,8 +61,13 @@ pub(crate) async fn perform_rebase( ); info!("after snapshot fetch and parse: {:?}", start.elapsed()); + // Let NATS know we are still working + let _ = message + .ack_with(si_layer_cache::activities::AckKind::Progress) + .await; + // Perform the conflicts and updates detection. - let onto_vector_clock_id: VectorClockId = message.onto_vector_clock_id.into(); + let onto_vector_clock_id: VectorClockId = message.payload.onto_vector_clock_id.into(); let (conflicts, updates) = to_rebase_workspace_snapshot .detect_conflicts_and_updates( to_rebase_change_set.vector_clock_id(), @@ -78,7 +84,7 @@ pub(crate) async fn perform_rebase( // If there are conflicts, immediately assemble a reply message that conflicts were found. // Otherwise, we can perform updates and assemble a "success" reply message. - let message: ReplyRebaseMessage = if conflicts.is_empty() { + let message: RebaseStatus = if conflicts.is_empty() { // TODO(nick): store the offset with the change set. 
to_rebase_workspace_snapshot .perform_updates( @@ -102,13 +108,13 @@ pub(crate) async fn perform_rebase( info!("pointer updated: {:?}", start.elapsed()); } - ReplyRebaseMessage::Success { - updates_performed: serde_json::to_value(updates)?, + RebaseStatus::Success { + updates_performed: serde_json::to_value(updates)?.to_string(), } } else { - ReplyRebaseMessage::ConflictsFound { - conflicts_found: serde_json::to_value(conflicts)?, - updates_found_and_skipped: serde_json::to_value(updates)?, + RebaseStatus::ConflictsFound { + conflicts_found: serde_json::to_value(conflicts)?.to_string(), + updates_found_and_skipped: serde_json::to_value(updates)?.to_string(), } }; diff --git a/lib/si-layer-cache/BUCK b/lib/si-layer-cache/BUCK index 8b38dd247d..0c610cc404 100644 --- a/lib/si-layer-cache/BUCK +++ b/lib/si-layer-cache/BUCK @@ -29,6 +29,7 @@ rust_library( "//third-party/rust:tempfile", "//third-party/rust:thiserror", "//third-party/rust:tokio", + "//third-party/rust:tokio-stream", "//third-party/rust:tokio-util", "//third-party/rust:ulid", ], @@ -61,6 +62,7 @@ rust_test( "//third-party/rust:sled", "//third-party/rust:tempfile", "//third-party/rust:tokio", + "//third-party/rust:tokio-stream", "//third-party/rust:tokio-util", "//third-party/rust:ulid", ":si-layer-cache", diff --git a/lib/si-layer-cache/Cargo.toml b/lib/si-layer-cache/Cargo.toml index eed1df0ad1..c29657a2be 100644 --- a/lib/si-layer-cache/Cargo.toml +++ b/lib/si-layer-cache/Cargo.toml @@ -27,6 +27,7 @@ telemetry = { path = "../../lib/telemetry-rs" } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } tokio-util = { workspace = true } ulid = { workspace = true } serde_json = { workspace = true } diff --git a/lib/si-layer-cache/src/activities.rs b/lib/si-layer-cache/src/activities.rs index 7b7f7fbc61..969f97f4b9 100644 --- a/lib/si-layer-cache/src/activities.rs +++ b/lib/si-layer-cache/src/activities.rs @@ -1,5 +1,7 @@ use std::{ 
- fmt, ops, + collections::HashMap, + fmt::{self, Debug}, + ops, pin::Pin, str::FromStr, sync::Arc, @@ -13,6 +15,8 @@ use si_data_nats::{ NatsClient, }; use strum::EnumDiscriminants; +use tokio::sync::{broadcast, RwLock}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; use ulid::{Ulid, ULID_LEN}; use crate::{ @@ -22,11 +26,17 @@ use crate::{ LayerDbError, }; -use self::rebase::{RebaseFinished, RebaseRequest}; +use self::{ + rebase::{RebaseFinished, RebaseRequest}, + test::{IntegrationTest, IntegrationTestAlt}, +}; + +use telemetry::prelude::*; pub use si_data_nats::async_nats::jetstream::AckKind; pub mod rebase; +pub mod test; #[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] pub struct ActivityId(Ulid); @@ -67,7 +77,7 @@ impl From for ActivityId { impl fmt::Display for ActivityId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) + std::fmt::Display::fmt(&self.0, f) } } @@ -76,23 +86,37 @@ pub struct Activity { pub id: ActivityId, pub payload: ActivityPayload, pub metadata: LayeredEventMetadata, + pub parent_activity_id: Option, } impl Activity { - pub fn new(payload: ActivityPayload, metadata: LayeredEventMetadata) -> Activity { + pub fn new( + payload: ActivityPayload, + metadata: LayeredEventMetadata, + parent_activity_id: Option, + ) -> Activity { Activity { id: ActivityId::new(), payload, metadata, + parent_activity_id, } } pub fn rebase(request: RebaseRequest, metadata: LayeredEventMetadata) -> Activity { - Activity::new(ActivityPayload::RebaseRequest(request), metadata) + Activity::new(ActivityPayload::RebaseRequest(request), metadata, None) } - pub fn rebase_finished(request: RebaseFinished, metadata: LayeredEventMetadata) -> Activity { - Activity::new(ActivityPayload::RebaseFinished(request), metadata) + pub fn rebase_finished( + request: RebaseFinished, + metadata: LayeredEventMetadata, + from_rebase_activity_id: ActivityId, + ) -> Activity { + Activity::new( + 
ActivityPayload::RebaseFinished(request), + metadata, + Some(from_rebase_activity_id), + ) } } @@ -100,6 +124,8 @@ impl Activity { pub enum ActivityPayload { RebaseRequest(RebaseRequest), RebaseFinished(RebaseFinished), + IntegrationTest(IntegrationTest), + IntegrationTestAlt(IntegrationTestAlt), } impl ActivityPayload { @@ -114,6 +140,8 @@ impl ActivityPayloadDiscriminants { match self { ActivityPayloadDiscriminants::RebaseRequest => "rebase.request".to_string(), ActivityPayloadDiscriminants::RebaseFinished => "rebase.finished".to_string(), + ActivityPayloadDiscriminants::IntegrationTest => "integration_test.test".to_string(), + ActivityPayloadDiscriminants::IntegrationTestAlt => "integration_test.alt".to_string(), } } } @@ -200,6 +228,118 @@ impl ActivityPublisher { } } +#[derive(Debug, Clone)] +pub struct ActivityMultiplexer { + instance_id: Ulid, + nats_client: NatsClient, + tracker: TaskTracker, + shutdown_token: CancellationToken, + channels: Arc>>>, +} + +impl ActivityMultiplexer { + pub fn new( + instance_id: Ulid, + nats_client: NatsClient, + + shutdown_token: CancellationToken, + ) -> Self { + let tracker = TaskTracker::new(); + Self { + tracker, + instance_id, + nats_client, + shutdown_token, + channels: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn subscribe( + &self, + filters: Option>, + ) -> LayerDbResult> { + let (multiplex_key, has_filter_array) = if let Some(filters) = filters { + let filter_array: Vec = filters.into_iter().collect(); + ( + filter_array + .iter() + .map(|d| d.to_subject()) + .collect::>() + .join("."), + Some(filter_array), + ) + } else { + ("everything".to_string(), None) + }; + { + let reader = self.channels.read().await; + if let Some(sender) = reader.get(&multiplex_key) { + return Ok(sender.subscribe()); + } + } + let activity_stream = + ActivityStream::create(self.instance_id, &self.nats_client, has_filter_array).await?; + let (tx, rx) = broadcast::channel(1000); // the 1000 here is the depth the channel 
will + // keep if a reader is slow + let mut amx_task = ActivityMultiplexerTask::new(activity_stream, tx.clone()); + let amx_shutdown_token = self.shutdown_token.clone(); + self.tracker + .spawn(async move { amx_task.run(amx_shutdown_token).await }); + { + let mut writer = self.channels.write().await; + writer.insert(multiplex_key, tx); + } + Ok(rx) + } +} + +pub struct ActivityMultiplexerTask { + activity_stream: ActivityStream, + tx: broadcast::Sender, +} + +impl ActivityMultiplexerTask { + pub fn new(activity_stream: ActivityStream, tx: broadcast::Sender) -> Self { + Self { + activity_stream, + tx, + } + } + + pub async fn run(&mut self, token: CancellationToken) -> LayerDbResult<()> { + tokio::select! { + () = self.process() => { + debug!("activity multiplexer task has ended; likely a bug"); + }, + () = token.cancelled() => { + debug!("activity multiplexer has been cancelled; shutting down"); + }, + } + Ok(()) + } + + pub async fn process(&mut self) { + while let Some(ack_activity_result) = self.activity_stream.next().await { + match ack_activity_result { + Ok(ack_activity) => match ack_activity.ack().await { + Ok(_) => { + if let Err(e) = self.tx.send(ack_activity.inner) { + trace!( + ?e, + "activity multiplexer skipping message; no receivers listening. this can be totally normal!" 
+ ); + } + } + Err(e) => warn!(?e, "Failed to ack an activity stream message; bug!"), + }, + Err(e) => { + warn!(?e, "Activity stream message had an error; bug!"); + } + } + } + } +} + pub struct ActivityStream { inner: jetstream::consumer::pull::Stream, } @@ -271,7 +411,10 @@ impl Stream for ActivityStream { acker: Arc::new(acker), }))), // Error deserializing message - Err(err) => Poll::Ready(Some(Err(err.into()))), + Err(err) => { + error!(?msg, "failure to deserialize message in activity stream"); + Poll::Ready(Some(Err(err.into()))) + } } } // Upstream errors are propagated downstream @@ -292,6 +435,16 @@ pub struct AckRebaseRequest { acker: Arc, } +impl Debug for AckRebaseRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "AckRebaseRequest {{ id: {:?}, payload: {:?}, metadata: {:?} }}", + self.id, self.payload, self.metadata + ) + } +} + impl AckRebaseRequest { pub async fn ack(&self) -> LayerDbResult<()> { self.acker.ack().await.map_err(LayerDbError::NatsAck) @@ -361,6 +514,7 @@ impl Stream for RebaserRequestsWorkQueueStream { Ok(activity) => match activity.payload { // Correct variant, convert to work-specific type ActivityPayload::RebaseRequest(req) => { + warn!(?req, "received rebase request over pull channel"); Poll::Ready(Some(Ok(AckRebaseRequest { id: activity.id, payload: req, @@ -375,7 +529,10 @@ impl Stream for RebaserRequestsWorkQueueStream { )))), }, // Error deserializing message - Err(err) => Poll::Ready(Some(Err(err.into()))), + Err(err) => { + warn!(?msg, "failed to deserialize message"); + Poll::Ready(Some(Err(err.into()))) + } } } // Upstream errors are propagated downstream diff --git a/lib/si-layer-cache/src/activities/rebase.rs b/lib/si-layer-cache/src/activities/rebase.rs index be472123e4..f66296f72c 100644 --- a/lib/si-layer-cache/src/activities/rebase.rs +++ b/lib/si-layer-cache/src/activities/rebase.rs @@ -1,7 +1,16 @@ use serde::{Deserialize, Serialize}; -use serde_json::Value; + +use 
si_events::WorkspaceSnapshotAddress; +use telemetry::prelude::*; +use telemetry::tracing::instrument; +use tokio::time::Instant; +use tokio_stream::wrappers::BroadcastStream; use ulid::Ulid; +use super::{Activity, ActivityId, ActivityPayloadDiscriminants, RebaserRequestsWorkQueueStream}; +use crate::activity_client::ActivityClient; +use crate::{error::LayerDbResult, event::LayeredEventMetadata}; + /// The message that the server receives to perform a rebase. #[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)] pub struct RebaseRequest { @@ -9,7 +18,7 @@ pub struct RebaseRequest { pub to_rebase_change_set_id: Ulid, /// Corresponds to the workspace snapshot that will be the "onto" workspace snapshot when /// rebasing the "to rebase" workspace snapshot. - pub onto_workspace_snapshot_id: Ulid, + pub onto_workspace_snapshot_address: WorkspaceSnapshotAddress, /// Derived from the ephemeral or persisted change set that's either the base change set, the /// last change set before edits were made, or the change set that you are trying to rebase /// onto base. 
@@ -19,12 +28,12 @@ pub struct RebaseRequest { impl RebaseRequest { pub fn new( to_rebase_change_set_id: Ulid, - onto_workspace_snapshot_id: Ulid, + onto_workspace_snapshot_address: WorkspaceSnapshotAddress, onto_vector_clock_id: Ulid, ) -> RebaseRequest { RebaseRequest { to_rebase_change_set_id, - onto_workspace_snapshot_id, + onto_workspace_snapshot_address, onto_vector_clock_id, } } @@ -34,37 +43,51 @@ impl RebaseRequest { pub struct RebaseFinished { status: RebaseStatus, to_rebase_change_set_id: Ulid, - onto_workspace_snapshot_id: Ulid, + onto_workspace_snapshot_address: WorkspaceSnapshotAddress, } impl RebaseFinished { pub fn new( status: RebaseStatus, to_rebase_change_set_id: Ulid, - onto_workspace_snapshot_id: Ulid, + onto_workspace_snapshot_address: WorkspaceSnapshotAddress, ) -> RebaseFinished { RebaseFinished { status, to_rebase_change_set_id, - onto_workspace_snapshot_id, + onto_workspace_snapshot_address, } } + + pub fn status(&self) -> &RebaseStatus { + &self.status + } + + pub fn to_rebase_change_set_id(&self) -> &Ulid { + &self.to_rebase_change_set_id + } + + pub fn onto_workspace_snapshot_address(&self) -> &WorkspaceSnapshotAddress { + &self.onto_workspace_snapshot_address + } } +// NOTE: We're basically smashing the data in here, and we really do have to figure out what we +// actually want when things work / or don't work. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub enum RebaseStatus { /// Processing the request and performing updates were both successful. Additionally, no conflicts were found. Success { /// The serialized updates performed when rebasing. - updates_performed: Value, + updates_performed: String, }, /// Conflicts found when processing the request. ConflictsFound { /// A serialized list of the conflicts found during detection. - conflicts_found: Value, + conflicts_found: String, /// A serialized list of the updates found during detection and skipped because at least /// once conflict was found. 
- updates_found_and_skipped: Value, + updates_found_and_skipped: String, }, /// Error encountered when processing the request. Error { @@ -72,3 +95,91 @@ pub enum RebaseStatus { message: String, }, } + +#[derive(Debug)] +pub struct ActivityRebase<'a> { + activity_base: &'a ActivityClient, +} + +impl<'a> ActivityRebase<'a> { + pub fn new(activity_base: &'a ActivityClient) -> Self { + Self { activity_base } + } + + #[instrument(name = "activity::rebase::rebase", level = "info")] + pub async fn rebase( + &self, + to_rebase_change_set_id: Ulid, + onto_workspace_snapshot_address: WorkspaceSnapshotAddress, + onto_vector_clock_id: Ulid, + metadata: LayeredEventMetadata, + ) -> LayerDbResult { + let payload = RebaseRequest::new( + to_rebase_change_set_id, + onto_workspace_snapshot_address, + onto_vector_clock_id, + ); + let activity = Activity::rebase(payload, metadata); + self.activity_base.publish(&activity).await?; + Ok(activity) + } + + #[instrument(name = "activity::rebase::rebase_and_wait", level = "info")] + pub async fn rebase_and_wait( + &self, + to_rebase_change_set_id: Ulid, + onto_workspace_snapshot_address: WorkspaceSnapshotAddress, + onto_vector_clock_id: Ulid, + metadata: LayeredEventMetadata, + ) -> LayerDbResult { + let payload = RebaseRequest::new( + to_rebase_change_set_id, + onto_workspace_snapshot_address, + onto_vector_clock_id, + ); + let activity = Activity::rebase(payload, metadata); + debug!(?activity, "sending rebase and waiting for response"); + + // Why is this in two? We want to start listening before the publish call, to ensure we + // aren't racing with any listening service. 
+ let start = Instant::now(); + let rx = self.rebase_finished_activity_stream().await?; + let join_handle = + tokio::spawn(ActivityClient::wait_for_parent_activity_id(rx, activity.id)); + self.activity_base.publish(&activity).await?; + let rebase_finished_activity = join_handle.await??; + debug!(?rebase_finished_activity, elapsed = ?start.elapsed(), "received rebase finished"); + Ok(rebase_finished_activity) + } + + pub async fn rebase_finished_activity_stream( + &self, + ) -> LayerDbResult> { + self.activity_base + .subscribe(Some(ActivityPayloadDiscriminants::RebaseFinished)) + .await + } + + #[instrument(name = "activity::rebase::rebase_finished", level = "info")] + pub async fn finished( + &self, + status: RebaseStatus, + to_rebase_change_set_id: Ulid, + onto_workspace_snapshot_address: WorkspaceSnapshotAddress, + metadata: LayeredEventMetadata, + parent_activity_id: ActivityId, + ) -> LayerDbResult { + let payload = RebaseFinished::new( + status, + to_rebase_change_set_id, + onto_workspace_snapshot_address, + ); + let activity = Activity::rebase_finished(payload, metadata, parent_activity_id); + self.activity_base.publish(&activity).await?; + Ok(activity) + } + + pub async fn subscribe_work_queue(&self) -> LayerDbResult { + RebaserRequestsWorkQueueStream::create(self.activity_base.nats_client()).await + } +} diff --git a/lib/si-layer-cache/src/activities/test.rs b/lib/si-layer-cache/src/activities/test.rs new file mode 100644 index 0000000000..49c7fee82d --- /dev/null +++ b/lib/si-layer-cache/src/activities/test.rs @@ -0,0 +1,91 @@ +use serde::{Deserialize, Serialize}; + +use crate::{activity_client::ActivityClient, error::LayerDbResult, event::LayeredEventMetadata}; + +use super::{Activity, ActivityId, ActivityPayload}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct IntegrationTest { + pub name: String, +} + +impl IntegrationTest { + pub fn new(name: impl Into) -> Self { + Self { name: name.into() } + } +} + +#[derive(Debug, 
Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct IntegrationTestAlt { + pub name: String, +} + +impl IntegrationTestAlt { + pub fn new(name: impl Into) -> Self { + Self { name: name.into() } + } +} + +#[derive(Debug)] +pub struct ActivityIntegrationTest<'a> { + activity_client: &'a ActivityClient, +} + +impl<'a> ActivityIntegrationTest<'a> { + pub fn new(activity_client: &'a ActivityClient) -> Self { + Self { activity_client } + } + + pub async fn integration_test( + &self, + name: impl Into, + metadata: LayeredEventMetadata, + parent_activity_id: Option, + ) -> LayerDbResult { + let activity = + Activity::integration_test(IntegrationTest::new(name), metadata, parent_activity_id); + self.activity_client.publish(&activity).await?; + Ok(activity) + } + + pub async fn integration_test_alt( + &self, + name: impl Into, + metadata: LayeredEventMetadata, + parent_activity_id: Option, + ) -> LayerDbResult { + let activity = Activity::integration_test_alt( + IntegrationTestAlt::new(name), + metadata, + parent_activity_id, + ); + self.activity_client.publish(&activity).await?; + Ok(activity) + } +} + +impl Activity { + pub fn integration_test( + payload: IntegrationTest, + metadata: LayeredEventMetadata, + parent_activity_id: Option, + ) -> Activity { + Activity::new( + ActivityPayload::IntegrationTest(payload), + metadata, + parent_activity_id, + ) + } + + pub fn integration_test_alt( + payload: IntegrationTestAlt, + metadata: LayeredEventMetadata, + parent_activity_id: Option, + ) -> Activity { + Activity::new( + ActivityPayload::IntegrationTestAlt(payload), + metadata, + parent_activity_id, + ) + } +} diff --git a/lib/si-layer-cache/src/activity_client.rs b/lib/si-layer-cache/src/activity_client.rs new file mode 100644 index 0000000000..dbbd52a901 --- /dev/null +++ b/lib/si-layer-cache/src/activity_client.rs @@ -0,0 +1,130 @@ +use std::time::Duration; + +use si_data_nats::NatsClient; +use telemetry::prelude::*; +use tokio::pin; +use 
tokio_stream::{wrappers::BroadcastStream, StreamExt}; +use tokio_util::sync::CancellationToken; +use ulid::Ulid; + +use crate::activities::rebase::ActivityRebase; +use crate::activities::test::ActivityIntegrationTest; +use crate::activities::{ + Activity, ActivityId, ActivityMultiplexer, ActivityPayloadDiscriminants, ActivityPublisher, +}; +use crate::error::{LayerDbError, LayerDbResult}; + +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct ActivityClient { + instance_id: Ulid, + nats_client: NatsClient, + activity_publisher: ActivityPublisher, + activity_multiplexer: ActivityMultiplexer, +} + +impl ActivityClient { + pub fn new( + instance_id: Ulid, + nats_client: NatsClient, + shutdown_token: CancellationToken, + ) -> ActivityClient { + let activity_publisher = ActivityPublisher::new(&nats_client); + let activity_multiplexer = + ActivityMultiplexer::new(instance_id, nats_client.clone(), shutdown_token); + + ActivityClient { + activity_publisher, + activity_multiplexer, + instance_id, + nats_client, + } + } + + pub fn nats_client(&self) -> &NatsClient { + &self.nats_client + } + + pub fn activity_publisher(&self) -> &ActivityPublisher { + &self.activity_publisher + } + + pub fn activity_multiplexer(&self) -> &ActivityMultiplexer { + &self.activity_multiplexer + } + + // Publish an activity + #[instrument(name = "activity_base::publish", level = "trace")] + pub async fn publish(&self, activity: &Activity) -> LayerDbResult<()> { + self.activity_publisher.publish(activity).await + } + + // Subscribe to all activities, or provide an optional array of activity kinds + // to subscribe to. 
+ pub async fn subscribe( + &self, + to_receive: impl IntoIterator, + ) -> LayerDbResult> { + Ok(BroadcastStream::new( + self.activity_multiplexer + .subscribe(Some(to_receive)) + .await?, + )) + } + + pub async fn subscribe_all(&self) -> LayerDbResult> { + Ok(BroadcastStream::new( + self.activity_multiplexer + .subscribe(None::>) + .await?, + )) + } + + pub async fn wait_for_parent_activity_id( + stream: BroadcastStream, + wait_for_parent_activity_id: ActivityId, + ) -> LayerDbResult { + let filter_stream = stream.filter(move |activity_result| { + if let Ok(activity) = activity_result { + if let Some(parent_activity_id) = activity.parent_activity_id { + parent_activity_id == wait_for_parent_activity_id + } else { + false + } + } else { + false + } + }); + let timeout_stream = filter_stream.timeout(Duration::from_secs(30)); + pin!(timeout_stream); + if let Some(activity_result_or_timeout) = timeout_stream.next().await { + match activity_result_or_timeout { + Ok(activity_result) => match activity_result { + Ok(activity) => return Ok(activity), + Err(_) => { + return Err(LayerDbError::ActivityWaitLagged( + wait_for_parent_activity_id, + )) + } + }, + Err(elapsed) => { + return Err(LayerDbError::ActivityWaitTimeout( + wait_for_parent_activity_id, + elapsed, + )); + } + } + } + Err(LayerDbError::ActivityWaitClosed( + wait_for_parent_activity_id, + )) + } + + pub fn rebase(&self) -> ActivityRebase { + ActivityRebase::new(self) + } + + pub fn test(&self) -> ActivityIntegrationTest { + ActivityIntegrationTest::new(self) + } +} diff --git a/lib/si-layer-cache/src/db.rs b/lib/si-layer-cache/src/db.rs index 1a2d20aa40..caa7494489 100644 --- a/lib/si-layer-cache/src/db.rs +++ b/lib/si-layer-cache/src/db.rs @@ -9,10 +9,7 @@ use tokio_util::{sync::CancellationToken, task::TaskTracker}; use ulid::Ulid; use crate::{ - activities::{ - Activity, ActivityPayloadDiscriminants, ActivityPublisher, ActivityStream, - RebaserRequestsWorkQueueStream, - }, + activity_client::ActivityClient, 
error::LayerDbResult, layer_cache::LayerCache, persister::{PersisterClient, PersisterTask}, @@ -36,7 +33,7 @@ where pg_pool: PgPool, nats_client: NatsClient, persister_client: PersisterClient, - activity_publisher: ActivityPublisher, + activity: ActivityClient, instance_id: Ulid, } @@ -94,12 +91,12 @@ where let cas = CasDb::new(cas_cache, persister_client.clone()); let workspace_snapshot = WorkspaceSnapshotDb::new(snapshot_cache, persister_client.clone()); - let activity_publisher = ActivityPublisher::new(&nats_client); + let activity = ActivityClient::new(instance_id, nats_client.clone(), token.clone()); let graceful_shutdown = LayerDbGracefulShutdown { tracker, token }; let layerdb = LayerDb { - activity_publisher, + activity, cas, workspace_snapshot, sled, @@ -140,6 +137,10 @@ where self.instance_id } + pub fn activity(&self) -> &ActivityClient { + &self.activity + } + /// Run all migrations pub async fn pg_migrate(&self) -> LayerDbResult<()> { // This will do all migrations, not just "cas" migrations. We might want @@ -148,35 +149,6 @@ where Ok(()) } - - // Publish an activity - pub async fn publish_activity(&self, activity: &Activity) -> LayerDbResult<()> { - self.activity_publisher.publish(activity).await - } - - // Subscribe to all activities, or provide an optional array of activity kinds - // to subscribe to. 
- pub async fn subscribe_activities( - &self, - to_receive: impl IntoIterator, - ) -> LayerDbResult { - ActivityStream::create(self.instance_id, &self.nats_client, Some(to_receive)).await - } - - pub async fn subscribe_all_activities(&self) -> LayerDbResult { - ActivityStream::create( - self.instance_id, - &self.nats_client, - None::>, - ) - .await - } - - pub async fn subscribe_rebaser_requests_work_queue( - &self, - ) -> LayerDbResult { - RebaserRequestsWorkQueueStream::create(&self.nats_client).await - } } #[must_use = "graceful shutdown must be spawned on runtime"] diff --git a/lib/si-layer-cache/src/db/cache_updates.rs b/lib/si-layer-cache/src/db/cache_updates.rs index 2276f826ee..3136569c1c 100644 --- a/lib/si-layer-cache/src/db/cache_updates.rs +++ b/lib/si-layer-cache/src/db/cache_updates.rs @@ -2,7 +2,10 @@ use std::{str::FromStr, sync::Arc}; use futures::StreamExt; use serde::{de::DeserializeOwned, Serialize}; -use si_data_nats::{async_nats::jetstream, NatsClient}; +use si_data_nats::{ + async_nats::jetstream::{self, consumer::DeliverPolicy}, + NatsClient, +}; use strum::{AsRefStr, EnumString}; use telemetry::prelude::*; use tokio_util::sync::CancellationToken; @@ -72,6 +75,9 @@ where while let Some(result) = self.messages.next().await { match result { Ok(msg) => { + if let Err(e) = msg.ack().await { + warn!(error = ?e, "error acknowledging message from stream"); + } let cache_update_task = CacheUpdateTask::new( self.instance_id, self.cas_cache.clone(), @@ -104,6 +110,7 @@ where jetstream::consumer::pull::Config { name: Some(name), description: Some(description), + deliver_policy: DeliverPolicy::New, ..Default::default() } } @@ -220,6 +227,10 @@ where } async fn run(&self, msg: chunking_nats::Message) { + error!( + subject = msg.subject.as_str(), + "processing message for layerdb event" + ); match self.process_message(msg).await { Ok(()) => {} Err(e) => { diff --git a/lib/si-layer-cache/src/error.rs b/lib/si-layer-cache/src/error.rs index 
a745f68367..9b7248fce8 100644 --- a/lib/si-layer-cache/src/error.rs +++ b/lib/si-layer-cache/src/error.rs @@ -5,12 +5,22 @@ use si_data_pg::{PgError, PgPoolError}; use si_events::content_hash::ContentHashParseError; use si_std::CanonicalFileError; use thiserror::Error; +use tokio_stream::Elapsed; -use crate::persister::{PersistMessage, PersisterTaskError}; +use crate::{ + activities::ActivityId, + persister::{PersistMessage, PersisterTaskError}, +}; #[remain::sorted] #[derive(Error, Debug)] pub enum LayerDbError { + #[error("While waiting for an activity id {0}, all senders have closed. The activity will never arrive.")] + ActivityWaitClosed(ActivityId), + #[error("While waiting for an activity id {0}, the receiving stream has lagged. Cancelling.")] + ActivityWaitLagged(ActivityId), + #[error("Timed out waiting for activity id {0} after {1}")] + ActivityWaitTimeout(ActivityId, Elapsed), #[error("cache update message with bad headers: {0}")] CacheUpdateBadHeaders(String), #[error("cache update message had no headers")] diff --git a/lib/si-layer-cache/src/lib.rs b/lib/si-layer-cache/src/lib.rs index 58bc41f89e..4ead92deaa 100644 --- a/lib/si-layer-cache/src/lib.rs +++ b/lib/si-layer-cache/src/lib.rs @@ -24,6 +24,7 @@ //! 
pub mod activities; +mod activity_client; pub mod chunking_nats; pub mod db; pub mod disk_cache; diff --git a/lib/si-layer-cache/tests/integration_test/activities.rs b/lib/si-layer-cache/tests/integration_test/activities.rs new file mode 100644 index 0000000000..83f4a9d6ba --- /dev/null +++ b/lib/si-layer-cache/tests/integration_test/activities.rs @@ -0,0 +1,140 @@ +mod rebase; + +use std::sync::Arc; + +use futures::StreamExt; +use si_events::{Actor, ChangeSetId, Tenancy, WorkspacePk}; +use si_layer_cache::{ + activities::ActivityPayloadDiscriminants, event::LayeredEventMetadata, LayerDb, +}; +use tokio_util::sync::CancellationToken; + +use crate::integration_test::{setup_nats_client, setup_pg_db}; + +type TestLayerDb = LayerDb, String>; + +#[tokio::test] +async fn activities() { + let token = CancellationToken::new(); + + let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let db = setup_pg_db("activities").await; + + // First, we need a layerdb for slash + let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_slash, + db.clone(), + setup_nats_client(Some("activities".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_slash.pg_migrate().await.expect("migrate layerdb"); + + // Then, we need a layerdb for axl + let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_axl, + db, + setup_nats_client(Some("activities".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_axl.pg_migrate().await.expect("migrate layerdb"); + + // Create our metadata + let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); + let actor = Actor::System; + let metadata = LayeredEventMetadata::new(tenancy, actor); + + // Subscribe to all activities + let mut activities = ldb_axl + .activity() + .subscribe_all() + .await + .expect("cannot subscribe to 
all activities"); + + // Publish an activity + let activity = ldb_slash + .activity() + .test() + .integration_test("drop me the bomb", metadata, None) + .await + .expect("cannot publish activity"); + + let restored_activity = activities + .next() + .await + .expect("no message waiting when one was expected") + .expect("error receiving message"); + assert_eq!(activity, restored_activity); +} + +#[tokio::test] +async fn activities_subscribe_partial() { + let token = CancellationToken::new(); + + let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let db = setup_pg_db("activities_subscribe_partial").await; + + // First, we need a layerdb for slash + let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_slash, + db.clone(), + setup_nats_client(Some("activities_subscribe_partial".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_slash.pg_migrate().await.expect("migrate layerdb"); + + // Then, we need a layerdb for axl + let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_axl, + db, + setup_nats_client(Some("activities_subscribe_partial".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_axl.pg_migrate().await.expect("migrate layerdb"); + + // Subscribe to only integration-test-alt activities + let mut activities = ldb_axl + .activity() + .subscribe(vec![ActivityPayloadDiscriminants::IntegrationTestAlt]) + .await + .expect("cannot subscribe to all activities"); + + let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); + let actor = Actor::System; + let metadata = LayeredEventMetadata::new(tenancy, actor); + + // Publish an activity + ldb_slash + .activity() + .test() + .integration_test("skid row", metadata.clone(), None) + .await + .expect("cannot publish activity"); + + // Publish an activity + let activity = ldb_slash + 
 .activity() + .test() + .integration_test_alt("kix", metadata, None) + .await + .expect("cannot publish activity"); + + // The nats publishing rules would require that the first activity (the plain integration test) be + // received before the second (the integration test alt). So we can confirm we have subject + // filtering working. + let restored_activity = activities + .next() + .await + .expect("no message waiting") + .expect("error receiving message"); + assert_eq!(activity, restored_activity); +} diff --git a/lib/si-layer-cache/tests/integration_test/activities/rebase.rs b/lib/si-layer-cache/tests/integration_test/activities/rebase.rs new file mode 100644 index 0000000000..61558fcf65 --- /dev/null +++ b/lib/si-layer-cache/tests/integration_test/activities/rebase.rs @@ -0,0 +1,226 @@ +use std::sync::Arc; + +use futures::StreamExt; +use si_events::{Actor, ChangeSetId, Tenancy, WorkspacePk, WorkspaceSnapshotAddress}; +use si_layer_cache::{activities::ActivityId, event::LayeredEventMetadata, LayerDb}; +use tokio_util::sync::CancellationToken; +use ulid::Ulid; + +use crate::integration_test::{setup_nats_client, setup_pg_db}; + +type TestLayerDb = LayerDb, String>; + +#[tokio::test] +async fn subscribe_rebaser_requests_work_queue() { + let token = CancellationToken::new(); + + let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let tempdir_duff = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let db = setup_pg_db("subscribe_rebaser_requests_work_queue").await; + + // we need a layerdb for slash, which will be a consumer of our work queue + let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_slash, + db.clone(), + setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_slash.pg_migrate().await.expect("migrate 
layerdb"); + + // we need a layerdb for axl, who will also be a consumer for our work queue + let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_axl, + db.clone(), + setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_axl.pg_migrate().await.expect("migrate layerdb"); + + // we need a layerdb for duff, who will also be a consumer for our work queue + let (ldb_duff, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_duff, + db, + setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_duff.pg_migrate().await.expect("migrate layerdb"); + + // Subscribe to a work queue of rebase activities on axl and slash + let mut axl_work_queue = ldb_axl + .activity() + .rebase() + .subscribe_work_queue() + .await + .expect("cannot retrieve a work queue"); + let mut slash_work_queue = ldb_slash + .activity() + .rebase() + .subscribe_work_queue() + .await + .expect("cannot retrieve a work queue"); + + let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); + let actor = Actor::System; + let metadata = LayeredEventMetadata::new(tenancy, actor); + + let rebase_request_activity = ldb_duff + .activity() + .rebase() + .rebase( + Ulid::new(), + WorkspaceSnapshotAddress::new(b"poop"), + Ulid::new(), + metadata.clone(), + ) + .await + .expect("cannot publish rebase request"); + + // Send a rebase finished activity + let _rebase_finished_activity = ldb_duff + .activity() + .rebase() + .finished( + si_layer_cache::activities::rebase::RebaseStatus::Error { + message: "poop".to_string(), + }, + Ulid::new(), + WorkspaceSnapshotAddress::new(b"skid row"), + metadata, + ActivityId::new(), + ) + .await + .expect("cannot send rebase finished"); + + let which = tokio::select! 
{ + maybe_result = slash_work_queue.next() => { + let request = maybe_result.expect("had no messages").expect("cannot retrieve the ack rebase request"); + assert_eq!(request.id, rebase_request_activity.id); + request.ack().await.expect("cannot ack message"); + "slash".to_string() + }, + maybe_result = axl_work_queue.next() => { + let request = maybe_result.expect("had no messages").expect("cannot retrieve the ack rebase request"); + assert_eq!(request.id, rebase_request_activity.id); + request.ack().await.expect("cannot ack message"); + "axl".to_string() + }, + }; + + // This is long enough to confirm that we get once-and-only-once delivery. + // It isn't long enough to confirm that we didn't ack the payload, but that + // is totally fine - we don't need to test that NATS works as directed. + let sleep = tokio::time::sleep(tokio::time::Duration::from_millis(100)); + tokio::pin!(sleep); + + if which == "slash" { + tokio::select! { + maybe_result = axl_work_queue.next() => { + assert!(maybe_result.is_none(), "expected no work, but there is some work to do"); + }, + _ = &mut sleep => { + } + } + } else { + tokio::select! 
{ + maybe_result = slash_work_queue.next() => { + assert!(maybe_result.is_none(), "expected no work, but there is some work to do"); + }, + _ = &mut sleep => { + } + } + } +} + +#[tokio::test] +async fn rebase_and_wait() { + let token = CancellationToken::new(); + + let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); + let db = setup_pg_db("rebase_and_wait").await; + + // we need a layerdb for slash, who will send the rebase request + let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_slash, + db.clone(), + setup_nats_client(Some("rebase_and_wait".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_slash.pg_migrate().await.expect("migrate layerdb"); + + // we need a layerdb for axl, who will send the reply + let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize( + tempdir_axl, + db.clone(), + setup_nats_client(Some("rebase_and_wait".to_string())).await, + token.clone(), + ) + .await + .expect("cannot create layerdb"); + ldb_axl.pg_migrate().await.expect("migrate layerdb"); + + // Subscribe to a work queue of rebase activities on axl + let mut axl_work_queue = ldb_axl + .activity() + .rebase() + .subscribe_work_queue() + .await + .expect("cannot retrieve a work queue"); + + let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); + let actor = Actor::System; + let metadata = LayeredEventMetadata::new(tenancy, actor); + let metadata_for_task = metadata.clone(); + + let rebase_request_task = tokio::spawn(async move { + ldb_slash + .activity() + .rebase() + .rebase_and_wait( + Ulid::new(), + WorkspaceSnapshotAddress::new(b"poop"), + Ulid::new(), + metadata_for_task, + ) + .await + }); + + let rebase_request = axl_work_queue + .next() + .await + .expect("should have an message, but the channel is closed") + .expect("should have a rebase request, but we have an error"); + 
rebase_request.ack().await.expect("cannot ack the message"); + + // Send a rebase finished activity + let rebase_finished_activity = ldb_axl + .activity() + .rebase() + .finished( + si_layer_cache::activities::rebase::RebaseStatus::Error { + message: "poop".to_string(), + }, + Ulid::new(), + WorkspaceSnapshotAddress::new(b"skid row"), + metadata, + rebase_request.id, + ) + .await + .expect("cannot send rebase finished"); + + let received_finish_activity = rebase_request_task + .await + .expect("rebase request task failed") + .expect("expected rebase finished activity, but got an error"); + + assert_eq!(received_finish_activity, rebase_finished_activity); +} diff --git a/lib/si-layer-cache/tests/integration_test/db/mod.rs b/lib/si-layer-cache/tests/integration_test/db/mod.rs index e6b22f29ba..027a01a0f6 100644 --- a/lib/si-layer-cache/tests/integration_test/db/mod.rs +++ b/lib/si-layer-cache/tests/integration_test/db/mod.rs @@ -1,275 +1 @@ -use std::sync::Arc; - -use futures::StreamExt; -use si_events::{Actor, ChangeSetId, Tenancy, WorkspacePk}; -use si_layer_cache::{ - activities::{ - rebase::{RebaseFinished, RebaseRequest}, - Activity, ActivityPayloadDiscriminants, - }, - event::LayeredEventMetadata, - LayerDb, -}; -use tokio_util::sync::CancellationToken; -use ulid::Ulid; - -use crate::integration_test::{setup_nats_client, setup_pg_db}; - mod cas; - -type TestLayerDb = LayerDb, String>; - -#[tokio::test] -async fn activities() { - let token = CancellationToken::new(); - - let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); - let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); - let db = setup_pg_db("activities").await; - - // First, we need a layerdb for slash - let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize( - tempdir_slash, - db.clone(), - setup_nats_client(Some("activities".to_string())).await, - token.clone(), - ) - .await - .expect("cannot create layerdb"); - 
ldb_slash.pg_migrate().await.expect("migrate layerdb"); - - // Then, we need a layerdb for axl - let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize( - tempdir_axl, - db, - setup_nats_client(Some("activities".to_string())).await, - token.clone(), - ) - .await - .expect("cannot create layerdb"); - ldb_axl.pg_migrate().await.expect("migrate layerdb"); - - // Subscribe to all activities - let mut activities = ldb_axl - .subscribe_all_activities() - .await - .expect("cannot subscribe to all activities"); - - let rebase_request = RebaseRequest::new(Ulid::new(), Ulid::new(), Ulid::new()); - let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); - let actor = Actor::System; - let metadata = LayeredEventMetadata::new(tenancy, actor); - let activity = Activity::rebase(rebase_request, metadata); - // Publish an activity - ldb_slash - .publish_activity(&activity) - .await - .expect("cannot publish activity"); - - let (restored_activity, _acker) = activities - .next() - .await - .expect("no message waiting") - .expect("error receiving message") - .into_parts(); - assert_eq!(activity, restored_activity); -} - -#[tokio::test] -async fn activities_subscribe_partial() { - let token = CancellationToken::new(); - - let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); - let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); - let db = setup_pg_db("activities_subscribe_partial").await; - - // First, we need a layerdb for slash - let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize( - tempdir_slash, - db.clone(), - setup_nats_client(Some("activities_subscribe_partial".to_string())).await, - token.clone(), - ) - .await - .expect("cannot create layerdb"); - ldb_slash.pg_migrate().await.expect("migrate layerdb"); - - // Then, we need a layerdb for axl - let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize( - tempdir_axl, - db, - 
setup_nats_client(Some("activities_subscribe_partial".to_string())).await, - token.clone(), - ) - .await - .expect("cannot create layerdb"); - ldb_axl.pg_migrate().await.expect("migrate layerdb"); - - // Subscribe to only rebase finished activities - let mut activities = ldb_axl - .subscribe_activities(vec![ActivityPayloadDiscriminants::RebaseFinished]) - .await - .expect("cannot subscribe to all activities"); - - // Send a rebase request activity - let rebase_request = RebaseRequest::new(Ulid::new(), Ulid::new(), Ulid::new()); - let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); - let actor = Actor::System; - let metadata = LayeredEventMetadata::new(tenancy, actor); - let rebase_request_activity = Activity::rebase(rebase_request, metadata); - // Publish an activity - ldb_slash - .publish_activity(&rebase_request_activity) - .await - .expect("cannot publish activity"); - - // Send a rebase finished activity - let rebase_finished = RebaseFinished::new( - si_layer_cache::activities::rebase::RebaseStatus::Error { - message: "poop".to_string(), - }, - Ulid::new(), - Ulid::new(), - ); - let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); - let actor = Actor::System; - let metadata = LayeredEventMetadata::new(tenancy, actor); - let rebase_finished_activity = Activity::rebase_finished(rebase_finished, metadata); - // Publish an activity - ldb_slash - .publish_activity(&rebase_finished_activity) - .await - .expect("cannot publish activity"); - - // The nats publishing rules would require that the first activity (the rebase request) be - // recieved before the second (the rebase finished event). So we can confirm we have subject - // filtering working. 
- let (restored_activity, _acker) = activities - .next() - .await - .expect("no message waiting") - .expect("error receiving message") - .into_parts(); - assert_eq!(rebase_finished_activity, restored_activity); -} - -#[tokio::test] -async fn subscribe_rebaser_requests_work_queue() { - let token = CancellationToken::new(); - - let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); - let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); - let tempdir_duff = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir"); - let db = setup_pg_db("subscribe_rebaser_requests_work_queue").await; - - // we need a layerdb for slash, which will be a consumer of our work queue - let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize( - tempdir_slash, - db.clone(), - setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await, - token.clone(), - ) - .await - .expect("cannot create layerdb"); - ldb_slash.pg_migrate().await.expect("migrate layerdb"); - - // we need a layerdb for axl, who will also be a consumer for our work queue - let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize( - tempdir_axl, - db.clone(), - setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await, - token.clone(), - ) - .await - .expect("cannot create layerdb"); - ldb_axl.pg_migrate().await.expect("migrate layerdb"); - - // we need a layerdb for duff, who will also be a consumer for our work queue - let (ldb_duff, _): (TestLayerDb, _) = LayerDb::initialize( - tempdir_duff, - db, - setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await, - token.clone(), - ) - .await - .expect("cannot create layerdb"); - ldb_duff.pg_migrate().await.expect("migrate layerdb"); - - // Subscribe to a work queue of rebase activities on axl and slash - let mut axl_work_queue = ldb_axl - .subscribe_rebaser_requests_work_queue() - .await - .expect("cannot retrieve a work queue"); - 
let mut slash_work_queue = ldb_slash - .subscribe_rebaser_requests_work_queue() - .await - .expect("cannot retrieve a work queue"); - - // Send a rebase request activity from duff - let rebase_request = RebaseRequest::new(Ulid::new(), Ulid::new(), Ulid::new()); - let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); - let actor = Actor::System; - let metadata = LayeredEventMetadata::new(tenancy, actor); - let rebase_request_activity = Activity::rebase(rebase_request, metadata); - // Publish an activity - ldb_duff - .publish_activity(&rebase_request_activity) - .await - .expect("cannot publish activity"); - - // Send a rebase finished activity - let rebase_finished = RebaseFinished::new( - si_layer_cache::activities::rebase::RebaseStatus::Error { - message: "poop".to_string(), - }, - Ulid::new(), - Ulid::new(), - ); - let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); - let actor = Actor::System; - let metadata = LayeredEventMetadata::new(tenancy, actor); - let rebase_finished_activity = Activity::rebase_finished(rebase_finished, metadata); - ldb_duff - .publish_activity(&rebase_finished_activity) - .await - .expect("cannot publish activity"); - - let which = tokio::select! { - maybe_result = slash_work_queue.next() => { - let request = maybe_result.expect("had no messages").expect("cannot retrieve the ack rebase request"); - assert_eq!(request.payload, rebase_request); - request.ack().await.expect("cannot ack message"); - "slash".to_string() - }, - maybe_result = axl_work_queue.next() => { - let request = maybe_result.expect("had no messages").expect("cannot retrieve the ack rebase request"); - assert_eq!(request.payload, rebase_request); - request.ack().await.expect("cannot ack message"); - "axl".to_string() - }, - }; - - // This is long enough to confirm that we get once-and-only-once delivery. 
- // It isn't long enough to confirm that we didn't ack the payload, but that - // is totally fine - we don't need to test that NATS works as directed. - let sleep = tokio::time::sleep(tokio::time::Duration::from_millis(100)); - tokio::pin!(sleep); - - if which == "slash" { - tokio::select! { - maybe_result = axl_work_queue.next() => { - assert!(maybe_result.is_none(), "expected no work, but there is some work to do"); - }, - _ = &mut sleep => { - } - } - } else { - tokio::select! { - maybe_result = slash_work_queue.next() => { - assert!(maybe_result.is_none(), "expected no work, but there is some work to do"); - }, - _ = &mut sleep => { - } - } - } -} diff --git a/lib/si-layer-cache/tests/integration_test/mod.rs b/lib/si-layer-cache/tests/integration_test/mod.rs index 98e009f148..993200d91e 100644 --- a/lib/si-layer-cache/tests/integration_test/mod.rs +++ b/lib/si-layer-cache/tests/integration_test/mod.rs @@ -6,6 +6,7 @@ use std::path::Path; use crate::TEST_PG_DBNAME; +mod activities; mod chunking_nats; mod db; mod disk_cache;