diff --git a/golem-worker-executor-base/src/lib.rs b/golem-worker-executor-base/src/lib.rs index 269d8d8ea..c3a808b5a 100644 --- a/golem-worker-executor-base/src/lib.rs +++ b/golem-worker-executor-base/src/lib.rs @@ -210,14 +210,14 @@ pub trait Bootstrap { let listener = TcpListener::bind(addr).await?; let grpc_port = listener.local_addr()?.port(); - let worker_executor = - create_worker_executor_grpc_api::( - golem_config.clone(), - self, - runtime.clone(), - join_set, - grpc_port, - ).await?; + let worker_executor = create_worker_executor_grpc_api::( + golem_config.clone(), + self, + runtime.clone(), + join_set, + grpc_port, + ) + .await?; let service = WorkerExecutorServer::new(worker_executor) .accept_compressed(CompressionEncoding::Gzip) @@ -236,13 +236,12 @@ pub trait Bootstrap { .await .map_err(|err| anyhow!(err)) } - .in_current_span(), + .in_current_span(), ); } else { info!("gRPC server will not be started"); } - let http_port = golem_service_base::observability::start_health_and_metrics_server( golem_config.http_addr()?, prometheus_registry, @@ -297,47 +296,46 @@ async fn create_worker_executor_grpc_api + ?Si } }; - let indexed_storage: Arc = - match &golem_config.indexed_storage { - IndexedStorageConfig::KVStoreRedis => { - info!("Using the same Redis for indexed-storage"); - let redis = redis.expect( - "Redis must be configured as key-value storage when using KVStoreRedis", - ); - Arc::new(RedisIndexedStorage::new(redis.clone())) - } - IndexedStorageConfig::Redis(redis) => { - info!("Using Redis for indexed-storage at {}", redis.url()); - let pool = RedisPool::configured(redis).await?; - Arc::new(RedisIndexedStorage::new(pool.clone())) - } - IndexedStorageConfig::KVStoreSqlite => { - info!("Using the same Sqlite for indexed-storage"); - let sqlite = sqlite.clone().expect( - "Sqlite must be configured as key-value storage when using KVStoreSqlite", - ); - Arc::new( - SqliteIndexedStorage::new(sqlite.clone()) - .await - .map_err(|err| anyhow!(err))?, - ) - } - IndexedStorageConfig::Sqlite(sqlite) => { - info!("Using Sqlite for indexed storage at {}", sqlite.database); - let pool = SqlitePool::configured(sqlite) + let indexed_storage: Arc = match &golem_config.indexed_storage + { + IndexedStorageConfig::KVStoreRedis => { + info!("Using the same Redis for indexed-storage"); + let redis = redis + .expect("Redis must be configured as key-value storage when using KVStoreRedis"); + Arc::new(RedisIndexedStorage::new(redis.clone())) + } + IndexedStorageConfig::Redis(redis) => { + info!("Using Redis for indexed-storage at {}", redis.url()); + let pool = RedisPool::configured(redis).await?; + Arc::new(RedisIndexedStorage::new(pool.clone())) + } + IndexedStorageConfig::KVStoreSqlite => { + info!("Using the same Sqlite for indexed-storage"); + let sqlite = sqlite + .clone() + .expect("Sqlite must be configured as key-value storage when using KVStoreSqlite"); + Arc::new( + SqliteIndexedStorage::new(sqlite.clone()) .await - .map_err(|err| anyhow!(err))?; - Arc::new( - SqliteIndexedStorage::new(pool.clone()) - .await - .map_err(|err| anyhow!(err))?, - ) - } - IndexedStorageConfig::InMemory => { - info!("Using in-memory indexed storage"); - Arc::new(storage::indexed::memory::InMemoryIndexedStorage::new()) - } - }; + .map_err(|err| anyhow!(err))?, + ) + } + IndexedStorageConfig::Sqlite(sqlite) => { + info!("Using Sqlite for indexed storage at {}", sqlite.database); + let pool = SqlitePool::configured(sqlite) + .await + .map_err(|err| anyhow!(err))?; + Arc::new( + SqliteIndexedStorage::new(pool.clone()) + .await + .map_err(|err| anyhow!(err))?, + ) + } + IndexedStorageConfig::InMemory => { + info!("Using in-memory indexed storage"); + Arc::new(storage::indexed::memory::InMemoryIndexedStorage::new()) + } + }; let blob_storage: Arc = match &golem_config.blob_storage { BlobStorageConfig::S3(config) => { info!("Using S3 for blob storage"); @@ -345,9 +343,9 @@ async fn create_worker_executor_grpc_api + ?Si } BlobStorageConfig::LocalFileSystem(config) => { info!( - "Using local file system for blob storage at {:?}", - config.root - ); + "Using local file system for blob storage at {:?}", + config.root + ); Arc::new( golem_service_base::storage::blob::fs::FileSystemBlobStorage::new(&config.root) .await @@ -356,9 +354,8 @@ async fn create_worker_executor_grpc_api + ?Si } BlobStorageConfig::KVStoreSqlite => { info!("Using the same Sqlite for blob-storage"); - let sqlite = sqlite.expect( - "Sqlite must be configured as key-value storage when using KVStoreSqlite", - ); + let sqlite = sqlite + .expect("Sqlite must be configured as key-value storage when using KVStoreSqlite"); Arc::new( SqliteBlobStorage::new(sqlite.clone()) .await @@ -382,12 +379,10 @@ async fn create_worker_executor_grpc_api + ?Si } }; - let initial_files_service = - Arc::new(InitialComponentFilesService::new(blob_storage.clone())); + let initial_files_service = Arc::new(InitialComponentFilesService::new(blob_storage.clone())); let file_loader = Arc::new(FileLoader::new(initial_files_service.clone())?); - let (plugins, plugins_observations) = - bootstrap.create_plugins(&golem_config); + let (plugins, plugins_observations) = bootstrap.create_plugins(&golem_config); let component_service = component::configured( &golem_config.component_service, @@ -396,7 +391,7 @@ async fn create_worker_executor_grpc_api + ?Si blob_storage.clone(), plugins_observations, ) - .await; + .await; let golem_config = Arc::new(golem_config.clone()); let promise_service: Arc = @@ -426,7 +421,7 @@ async fn create_worker_executor_grpc_api + ?Si golem_config.oplog.max_operations_before_commit, golem_config.oplog.max_payload_size, ) - .await, + .await, ), Some(oplog_archives) => { let primary = Arc::new( @@ -436,7 +431,7 @@ async fn create_worker_executor_grpc_api + ?Si golem_config.oplog.max_operations_before_commit, golem_config.oplog.max_payload_size, ) - .await, + .await, ); Arc::new(MultiLayerOplogService::new( @@ -450,9 +445,9 @@ async fn create_worker_executor_grpc_api + ?Si let active_workers = bootstrap.create_active_workers(&golem_config); - let running_worker_enumeration_service = Arc::new( - RunningWorkerEnumerationServiceDefault::new(active_workers.clone()), - ); + let running_worker_enumeration_service = Arc::new(RunningWorkerEnumerationServiceDefault::new( + active_workers.clone(), + )); let shard_manager_service = shard_manager::configured(&golem_config.shard_manager_service); @@ -469,7 +464,7 @@ async fn create_worker_executor_grpc_api + ?Si engine_ref.increment_epoch(); } } - .in_current_span(), + .in_current_span(), ); let linker = Arc::new(linker); @@ -498,13 +493,12 @@ async fn create_worker_executor_grpc_api + ?Si plugins.clone(), )); - let oplog_service: Arc = - Arc::new(ForwardingOplogService::new( - base_oplog_service, - oplog_processor_plugin.clone(), - component_service.clone(), - plugins.clone(), - )); + let oplog_service: Arc = Arc::new(ForwardingOplogService::new( + base_oplog_service, + oplog_processor_plugin.clone(), + component_service.clone(), + plugins.clone(), + )); let worker_service = Arc::new(DefaultWorkerService::new( key_value_storage.clone(), @@ -554,6 +548,5 @@ async fn create_worker_executor_grpc_api + ?Si ) .await?; - WorkerExecutorImpl::>::new(services, lazy_worker_activator, grpc_port) - .await -} \ No newline at end of file + WorkerExecutorImpl::>::new(services, lazy_worker_activator, grpc_port).await +}