diff --git a/golem-worker-executor-base/src/lib.rs b/golem-worker-executor-base/src/lib.rs index ca93f75ad..630cbf9c7 100644 --- a/golem-worker-executor-base/src/lib.rs +++ b/golem-worker-executor-base/src/lib.rs @@ -55,7 +55,7 @@ use crate::services::worker_enumeration::{ RunningWorkerEnumerationServiceDefault, WorkerEnumerationService, }; use crate::services::worker_proxy::{RemoteWorkerProxy, WorkerProxy}; -use crate::services::{component, shard_manager, All}; +use crate::services::{component, shard_manager, All, HasConfig}; use crate::storage::indexed::redis::RedisIndexedStorage; use crate::storage::indexed::sqlite::SqliteIndexedStorage; use crate::storage::indexed::IndexedStorage; @@ -108,6 +108,57 @@ pub trait Bootstrap { /// Allows customizing the `ActiveWorkers` service. fn create_active_workers(&self, golem_config: &GolemConfig) -> Arc>; + async fn run_server( + &self, + service_dependencies: All, + lazy_worker_activator: Arc>, + join_set: &mut JoinSet>, + ) -> anyhow::Result<()> { + let golem_config = service_dependencies.config(); + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + health_reporter + .set_serving::>>>() + .await; + + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) + .build_v1()?; + + let addr = golem_config.grpc_addr()?; + + let listener = TcpListener::bind(addr).await?; + let grpc_port = listener.local_addr()?.port(); + + let worker_impl = WorkerExecutorImpl::>::new( + service_dependencies, + lazy_worker_activator, + grpc_port, + ) + .await?; + + let service = WorkerExecutorServer::new(worker_impl) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + info!("Starting gRPC server on port {grpc_port}"); + + join_set.spawn( + async move { + Server::builder() + .max_concurrent_streams(Some(golem_config.limits.max_concurrent_streams)) + .add_service(reflection_service) + .add_service(service) + .add_service(health_service) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .map_err(|err| anyhow!(err)) + } + .in_current_span(), + ); + + Ok(()) + } + #[allow(clippy::type_complexity)] fn create_plugins( &self, @@ -193,14 +244,21 @@ pub trait Bootstrap { ISizeFormatter::new(worker_memory, BINARY) ); - let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); - health_reporter - .set_serving::>>>() - .await; + let addr = golem_config.grpc_addr()?; - let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) - .build_v1()?; + let lazy_worker_activator = Arc::new(LazyWorkerActivator::new()); + + let worker_executor_impl = create_worker_executor_impl::( + golem_config.clone(), + self, + runtime.clone(), + &lazy_worker_activator, + join_set, + ) + .await?; + + self.run_server(worker_executor_impl, lazy_worker_activator, join_set) + .await?; let http_port = golem_service_base::observability::start_health_and_metrics_server( golem_config.http_addr()?, @@ -210,160 +268,180 @@ pub trait Bootstrap { ) .await?; - let (redis, sqlite, key_value_storage): ( - Option, - Option, - Arc, - ) = match &golem_config.key_value_storage { - KeyValueStorageConfig::Redis(redis) => { - info!("Using Redis for key-value storage at {}", redis.url()); - let pool = RedisPool::configured(redis) + Ok(RunDetails { + http_port, + grpc_port: addr.port(), + }) + } +} + +async fn create_worker_executor_impl + ?Sized>( + golem_config: GolemConfig, + bootstrap: &A, + runtime: Handle, + lazy_worker_activator: &Arc>, + join_set: &mut JoinSet>, +) -> Result, anyhow::Error> { + let (redis, sqlite, key_value_storage): ( + Option, + Option, + Arc, + ) = match &golem_config.key_value_storage { + KeyValueStorageConfig::Redis(redis) => { + info!("Using Redis for key-value storage at {}", redis.url()); + let pool = RedisPool::configured(redis) + .await + .map_err(|err| anyhow!(err))?; + let key_value_storage: Arc = + Arc::new(RedisKeyValueStorage::new(pool.clone())); + (Some(pool), None, key_value_storage) + } + KeyValueStorageConfig::InMemory => { + info!("Using in-memory key-value storage"); + (None, None, Arc::new(InMemoryKeyValueStorage::new())) + } + KeyValueStorageConfig::Sqlite(sqlite) => { + info!("Using Sqlite for key-value storage at {}", sqlite.database); + let pool = SqlitePool::configured(sqlite) + .await + .map_err(|err| anyhow!(err))?; + let key_value_storage: Arc = Arc::new( + SqliteKeyValueStorage::new(pool.clone()) .await - .map_err(|err| anyhow!(err))?; - let key_value_storage: Arc = - Arc::new(RedisKeyValueStorage::new(pool.clone())); - (Some(pool), None, key_value_storage) - } - KeyValueStorageConfig::InMemory => { - info!("Using in-memory key-value storage"); - (None, None, Arc::new(InMemoryKeyValueStorage::new())) - } - KeyValueStorageConfig::Sqlite(sqlite) => { - info!("Using Sqlite for key-value storage at {}", sqlite.database); - let pool = SqlitePool::configured(sqlite) + .map_err(|err| anyhow!(err))?, + ); + (None, Some(pool), key_value_storage) + } + }; + + let indexed_storage: Arc = match &golem_config.indexed_storage + { + IndexedStorageConfig::KVStoreRedis => { + info!("Using the same Redis for indexed-storage"); + let redis = redis + .expect("Redis must be configured as key-value storage when using KVStoreRedis"); + Arc::new(RedisIndexedStorage::new(redis.clone())) + } + IndexedStorageConfig::Redis(redis) => { + info!("Using Redis for indexed-storage at {}", redis.url()); + let pool = RedisPool::configured(redis).await?; + Arc::new(RedisIndexedStorage::new(pool.clone())) + } + IndexedStorageConfig::KVStoreSqlite => { + info!("Using the same Sqlite for indexed-storage"); + let sqlite = sqlite + .clone() + .expect("Sqlite must be configured as key-value storage when using KVStoreSqlite"); + Arc::new( + SqliteIndexedStorage::new(sqlite.clone()) .await - .map_err(|err| anyhow!(err))?; - let key_value_storage: Arc = Arc::new( - SqliteKeyValueStorage::new(pool.clone()) - .await - .map_err(|err| anyhow!(err))?, - ); - (None, Some(pool), key_value_storage) - } - }; - - let indexed_storage: Arc = - match &golem_config.indexed_storage { - IndexedStorageConfig::KVStoreRedis => { - info!("Using the same Redis for indexed-storage"); - let redis = redis.expect( - "Redis must be configured as key-value storage when using KVStoreRedis", - ); - Arc::new(RedisIndexedStorage::new(redis.clone())) - } - IndexedStorageConfig::Redis(redis) => { - info!("Using Redis for indexed-storage at {}", redis.url()); - let pool = RedisPool::configured(redis).await?; - Arc::new(RedisIndexedStorage::new(pool.clone())) - } - IndexedStorageConfig::KVStoreSqlite => { - info!("Using the same Sqlite for indexed-storage"); - let sqlite = sqlite.clone().expect( - "Sqlite must be configured as key-value storage when using KVStoreSqlite", - ); - Arc::new( - SqliteIndexedStorage::new(sqlite.clone()) - .await - .map_err(|err| anyhow!(err))?, - ) - } - IndexedStorageConfig::Sqlite(sqlite) => { - info!("Using Sqlite for indexed storage at {}", sqlite.database); - let pool = SqlitePool::configured(sqlite) - .await - .map_err(|err| anyhow!(err))?; - Arc::new( - SqliteIndexedStorage::new(pool.clone()) - .await - .map_err(|err| anyhow!(err))?, - ) - } - IndexedStorageConfig::InMemory => { - info!("Using in-memory indexed storage"); - Arc::new(storage::indexed::memory::InMemoryIndexedStorage::new()) - } - }; - let blob_storage: Arc = match &golem_config.blob_storage { - BlobStorageConfig::S3(config) => { - info!("Using S3 for blob storage"); - Arc::new(S3BlobStorage::new(config.clone()).await) - } - BlobStorageConfig::LocalFileSystem(config) => { - info!( - "Using local file system for blob storage at {:?}", - config.root - ); - Arc::new( - golem_service_base::storage::blob::fs::FileSystemBlobStorage::new(&config.root) - .await - .map_err(|err| anyhow!(err))?, - ) - } - BlobStorageConfig::KVStoreSqlite => { - info!("Using the same Sqlite for blob-storage"); - let sqlite = sqlite.expect( - "Sqlite must be configured as key-value storage when using KVStoreSqlite", - ); - Arc::new( - SqliteBlobStorage::new(sqlite.clone()) - .await - .map_err(|err| anyhow!(err))?, - ) - } - BlobStorageConfig::Sqlite(sqlite) => { - info!("Using Sqlite for blob storage at {}", sqlite.database); - let pool = SqlitePool::configured(sqlite) + .map_err(|err| anyhow!(err))?, + ) + } + IndexedStorageConfig::Sqlite(sqlite) => { + info!("Using Sqlite for indexed storage at {}", sqlite.database); + let pool = SqlitePool::configured(sqlite) + .await + .map_err(|err| anyhow!(err))?; + Arc::new( + SqliteIndexedStorage::new(pool.clone()) .await - .map_err(|err| anyhow!(err))?; - Arc::new( - SqliteBlobStorage::new(pool.clone()) - .await - .map_err(|err| anyhow!(err))?, - ) - } - BlobStorageConfig::InMemory => { - info!("Using in-memory blob storage"); - Arc::new(golem_service_base::storage::blob::memory::InMemoryBlobStorage::new()) - } - }; - - let initial_files_service = - Arc::new(InitialComponentFilesService::new(blob_storage.clone())); - - let file_loader = Arc::new(FileLoader::new(initial_files_service.clone())?); - let (plugins, plugins_observations) = self.create_plugins(&golem_config); - - let component_service = component::configured( - &golem_config.component_service, - &golem_config.component_cache, - &golem_config.compiled_component_service, - blob_storage.clone(), - plugins_observations, - ) - .await; - - let golem_config = Arc::new(golem_config.clone()); - let promise_service: Arc = - Arc::new(DefaultPromiseService::new(key_value_storage.clone())); - let shard_service = Arc::new(ShardServiceDefault::new()); - let lazy_worker_activator = Arc::new(LazyWorkerActivator::new()); - - let mut oplog_archives: Vec> = Vec::new(); - for idx in 1..golem_config.oplog.indexed_storage_layers { - let svc: Arc = Arc::new( - CompressedOplogArchiveService::new(indexed_storage.clone(), idx), + .map_err(|err| anyhow!(err))?, + ) + } + IndexedStorageConfig::InMemory => { + info!("Using in-memory indexed storage"); + Arc::new(storage::indexed::memory::InMemoryIndexedStorage::new()) + } + }; + let blob_storage: Arc = match &golem_config.blob_storage { + BlobStorageConfig::S3(config) => { + info!("Using S3 for blob storage"); + Arc::new(S3BlobStorage::new(config.clone()).await) + } + BlobStorageConfig::LocalFileSystem(config) => { + info!( + "Using local file system for blob storage at {:?}", + config.root ); - oplog_archives.push(svc); + Arc::new( + golem_service_base::storage::blob::fs::FileSystemBlobStorage::new(&config.root) + .await + .map_err(|err| anyhow!(err))?, + ) } - for idx in 0..golem_config.oplog.blob_storage_layers { - let svc: Arc = - Arc::new(BlobOplogArchiveService::new(blob_storage.clone(), idx)); - oplog_archives.push(svc); + BlobStorageConfig::KVStoreSqlite => { + info!("Using the same Sqlite for blob-storage"); + let sqlite = sqlite + .expect("Sqlite must be configured as key-value storage when using KVStoreSqlite"); + Arc::new( + SqliteBlobStorage::new(sqlite.clone()) + .await + .map_err(|err| anyhow!(err))?, + ) } - let oplog_archives = NEVec::from_vec(oplog_archives); - - let base_oplog_service: Arc = match oplog_archives { - None => Arc::new( + BlobStorageConfig::Sqlite(sqlite) => { + info!("Using Sqlite for blob storage at {}", sqlite.database); + let pool = SqlitePool::configured(sqlite) + .await + .map_err(|err| anyhow!(err))?; + Arc::new( + SqliteBlobStorage::new(pool.clone()) + .await + .map_err(|err| anyhow!(err))?, + ) + } + BlobStorageConfig::InMemory => { + info!("Using in-memory blob storage"); + Arc::new(golem_service_base::storage::blob::memory::InMemoryBlobStorage::new()) + } + }; + + let initial_files_service = Arc::new(InitialComponentFilesService::new(blob_storage.clone())); + + let file_loader = Arc::new(FileLoader::new(initial_files_service.clone())?); + let (plugins, plugins_observations) = bootstrap.create_plugins(&golem_config); + + let component_service = component::configured( + &golem_config.component_service, + &golem_config.component_cache, + &golem_config.compiled_component_service, + blob_storage.clone(), + plugins_observations, + ) + .await; + + let golem_config = Arc::new(golem_config.clone()); + let promise_service: Arc = + Arc::new(DefaultPromiseService::new(key_value_storage.clone())); + let shard_service = Arc::new(ShardServiceDefault::new()); + + let mut oplog_archives: Vec> = Vec::new(); + for idx in 1..golem_config.oplog.indexed_storage_layers { + let svc: Arc = Arc::new( + CompressedOplogArchiveService::new(indexed_storage.clone(), idx), + ); + oplog_archives.push(svc); + } + for idx in 0..golem_config.oplog.blob_storage_layers { + let svc: Arc = + Arc::new(BlobOplogArchiveService::new(blob_storage.clone(), idx)); + oplog_archives.push(svc); + } + let oplog_archives = NEVec::from_vec(oplog_archives); + + let base_oplog_service: Arc = match oplog_archives { + None => Arc::new( + PrimaryOplogService::new( + indexed_storage.clone(), + blob_storage.clone(), + golem_config.oplog.max_operations_before_commit, + golem_config.oplog.max_payload_size, + ) + .await, + ), + Some(oplog_archives) => { + let primary = Arc::new( PrimaryOplogService::new( indexed_storage.clone(), blob_storage.clone(), @@ -371,165 +449,119 @@ pub trait Bootstrap { golem_config.oplog.max_payload_size, ) .await, - ), - Some(oplog_archives) => { - let primary = Arc::new( - PrimaryOplogService::new( - indexed_storage.clone(), - blob_storage.clone(), - golem_config.oplog.max_operations_before_commit, - golem_config.oplog.max_payload_size, - ) - .await, - ); - - Arc::new(MultiLayerOplogService::new( - primary, - oplog_archives, - golem_config.oplog.entry_count_limit, - golem_config.oplog.max_operations_before_commit_ephemeral, - )) - } - }; - - let active_workers = self.create_active_workers(&golem_config); - - let running_worker_enumeration_service = Arc::new( - RunningWorkerEnumerationServiceDefault::new(active_workers.clone()), - ); - - let shard_manager_service = shard_manager::configured(&golem_config.shard_manager_service); + ); - let config = self.create_wasmtime_config(); - let engine = Arc::new(Engine::new(&config)?); - let linker = self.create_wasmtime_linker(&engine)?; + Arc::new(MultiLayerOplogService::new( + primary, + oplog_archives, + golem_config.oplog.entry_count_limit, + golem_config.oplog.max_operations_before_commit_ephemeral, + )) + } + }; - let mut epoch_interval = tokio::time::interval(golem_config.limits.epoch_interval); - let engine_ref: Arc = engine.clone(); - join_set.spawn( - async move { - loop { - epoch_interval.tick().await; - engine_ref.increment_epoch(); - } - } - .in_current_span(), - ); + let active_workers = bootstrap.create_active_workers(&golem_config); - let linker = Arc::new(linker); + let running_worker_enumeration_service = Arc::new(RunningWorkerEnumerationServiceDefault::new( + active_workers.clone(), + )); - let key_value_service = Arc::new(DefaultKeyValueService::new(key_value_storage.clone())); + let shard_manager_service = shard_manager::configured(&golem_config.shard_manager_service); - let blob_store_service = Arc::new(DefaultBlobStoreService::new(blob_storage.clone())); + let config = bootstrap.create_wasmtime_config(); + let engine = Arc::new(Engine::new(&config)?); + let linker = bootstrap.create_wasmtime_linker(&engine)?; - let worker_proxy: Arc = Arc::new(RemoteWorkerProxy::new( - golem_config.public_worker_api.uri(), - golem_config - .public_worker_api - .access_token - .parse::() - .expect("Access token must be an UUID"), - )); + let mut epoch_interval = tokio::time::interval(golem_config.limits.epoch_interval); + let engine_ref: Arc = engine.clone(); + join_set.spawn( + async move { + loop { + epoch_interval.tick().await; + engine_ref.increment_epoch(); + } + } + .in_current_span(), + ); - let events = Arc::new(Events::new( - golem_config.limits.invocation_result_broadcast_capacity, - )); + let linker = Arc::new(linker); + + let key_value_service = Arc::new(DefaultKeyValueService::new(key_value_storage.clone())); + + let blob_store_service = Arc::new(DefaultBlobStoreService::new(blob_storage.clone())); + + let worker_proxy: Arc = Arc::new(RemoteWorkerProxy::new( + golem_config.public_worker_api.uri(), + golem_config + .public_worker_api + .access_token + .parse::() + .expect("Access token must be an UUID"), + )); + + let events = Arc::new(Events::new( + golem_config.limits.invocation_result_broadcast_capacity, + )); + + let oplog_processor_plugin = Arc::new(PerExecutorOplogProcessorPlugin::new( + component_service.clone(), + shard_service.clone(), + lazy_worker_activator.clone(), + plugins.clone(), + )); + + let oplog_service: Arc = Arc::new(ForwardingOplogService::new( + base_oplog_service, + oplog_processor_plugin.clone(), + component_service.clone(), + plugins.clone(), + )); + + let worker_service = Arc::new(DefaultWorkerService::new( + key_value_storage.clone(), + shard_service.clone(), + oplog_service.clone(), + )); + let worker_enumeration_service = Arc::new(DefaultWorkerEnumerationService::new( + worker_service.clone(), + oplog_service.clone(), + golem_config.clone(), + )); + + let scheduler_service = SchedulerServiceDefault::new( + key_value_storage.clone(), + shard_service.clone(), + promise_service.clone(), + Arc::new(lazy_worker_activator.clone() as Arc + Send + Sync>), + oplog_service.clone(), + worker_service.clone(), + golem_config.scheduler.refresh_interval, + ); - let oplog_processor_plugin = Arc::new(PerExecutorOplogProcessorPlugin::new( - component_service.clone(), - shard_service.clone(), - lazy_worker_activator.clone(), - plugins.clone(), - )); - - let oplog_service: Arc = - Arc::new(ForwardingOplogService::new( - base_oplog_service, - oplog_processor_plugin.clone(), - component_service.clone(), - plugins.clone(), - )); - - let worker_service = Arc::new(DefaultWorkerService::new( - key_value_storage.clone(), - shard_service.clone(), - oplog_service.clone(), - )); - let worker_enumeration_service = Arc::new(DefaultWorkerEnumerationService::new( - worker_service.clone(), - oplog_service.clone(), + bootstrap + .create_services( + active_workers, + engine, + linker, + runtime.clone(), + component_service, + shard_manager_service, + worker_service, + worker_enumeration_service, + running_worker_enumeration_service, + promise_service, golem_config.clone(), - )); - - let scheduler_service = SchedulerServiceDefault::new( - key_value_storage.clone(), - shard_service.clone(), - promise_service.clone(), - Arc::new(lazy_worker_activator.clone() as Arc + Send + Sync>), - oplog_service.clone(), - worker_service.clone(), - golem_config.scheduler.refresh_interval, - ); - - let services = self - .create_services( - active_workers, - engine, - linker, - runtime.clone(), - component_service, - shard_manager_service, - worker_service, - worker_enumeration_service, - running_worker_enumeration_service, - promise_service, - golem_config.clone(), - shard_service, - key_value_service, - blob_store_service, - lazy_worker_activator.clone(), - oplog_service, - scheduler_service, - worker_proxy, - events, - file_loader, - plugins, - oplog_processor_plugin, - ) - .await?; - - let addr = golem_config.grpc_addr()?; - - let listener = TcpListener::bind(addr).await?; - let grpc_port = listener.local_addr()?.port(); - - let worker_executor = - WorkerExecutorImpl::>::new(services, lazy_worker_activator, grpc_port) - .await?; - - let service = WorkerExecutorServer::new(worker_executor) - .accept_compressed(CompressionEncoding::Gzip) - .send_compressed(CompressionEncoding::Gzip); - - info!("Starting gRPC server on port {grpc_port}"); - - join_set.spawn( - async move { - Server::builder() - .max_concurrent_streams(Some(golem_config.limits.max_concurrent_streams)) - .add_service(reflection_service) - .add_service(service) - .add_service(health_service) - .serve_with_incoming(TcpListenerStream::new(listener)) - .await - .map_err(|err| anyhow!(err)) - } - .in_current_span(), - ); - - Ok(RunDetails { - http_port, - grpc_port, - }) - } + shard_service, + key_value_service, + blob_store_service, + lazy_worker_activator.clone(), + oplog_service, + scheduler_service, + worker_proxy, + events, + file_loader, + plugins, + oplog_processor_plugin, + ) + .await } diff --git a/golem-worker-executor-base/src/services/golem_config.rs b/golem-worker-executor-base/src/services/golem_config.rs index 44899e75c..66787adab 100644 --- a/golem-worker-executor-base/src/services/golem_config.rs +++ b/golem-worker-executor-base/src/services/golem_config.rs @@ -138,6 +138,7 @@ pub struct CompiledComponentServiceDisabledConfig {} pub enum ShardManagerServiceConfig { Grpc(ShardManagerServiceGrpcConfig), SingleShard, + Disabled, } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/golem-worker-executor-base/src/services/shard_manager.rs b/golem-worker-executor-base/src/services/shard_manager.rs index b8a38f75e..ce06256a2 100644 --- a/golem-worker-executor-base/src/services/shard_manager.rs +++ b/golem-worker-executor-base/src/services/shard_manager.rs @@ -41,6 +41,7 @@ pub fn configured( Arc::new(ShardManagerServiceGrpc::new(config.clone())) } ShardManagerServiceConfig::SingleShard => Arc::new(ShardManagerServiceSingleShard::new()), + ShardManagerServiceConfig::Disabled => Arc::new(ShardManagerServiceDisabled {}), } } @@ -149,3 +150,12 @@ impl ShardManagerService for ShardManagerServiceSingleShard { )) } } + +pub struct ShardManagerServiceDisabled {} + +#[async_trait] +impl ShardManagerService for ShardManagerServiceDisabled { + async fn register(&self, _host: String, _port: u16) -> Result { + Ok(ShardAssignment::new(0, HashSet::new())) + } +}