Make grpc bit configurable
afsalthaj committed Dec 14, 2024
1 parent e2bd089 commit b080aa4
Showing 1 changed file with 70 additions and 77 deletions.
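
The visible hunks are largely a formatting pass over the new `create_worker_executor_grpc_api` helper; the behavioural piece of making the gRPC bit configurable is that the worker executor's gRPC server is only started when the configuration asks for it, and otherwise the executor just logs "gRPC server will not be started" and carries on with the health/metrics HTTP server. Below is a minimal sketch of that pattern, with a hypothetical `ExecutorConfig::grpc_port` option standing in for whatever field the real `golem_config` uses:

```rust
// Hypothetical sketch of the "configurable gRPC" pattern this commit follows:
// the gRPC endpoint is only spawned when configuration enables it.
// `ExecutorConfig`, `grpc_port` and `serve_grpc` are illustrative stand-ins,
// not the actual golem-worker-executor-base types.
use anyhow::Result;
use tokio::net::TcpListener;
use tracing::info;

struct ExecutorConfig {
    /// `Some(port)` starts the gRPC endpoint on that port; `None` disables it.
    grpc_port: Option<u16>,
}

async fn serve_grpc(listener: TcpListener) -> Result<()> {
    // In the real code this is where WorkerExecutorServer would be served.
    info!("gRPC server listening on {}", listener.local_addr()?);
    Ok(())
}

async fn run(config: &ExecutorConfig) -> Result<()> {
    if let Some(port) = config.grpc_port {
        let listener = TcpListener::bind(("0.0.0.0", port)).await?;
        tokio::spawn(serve_grpc(listener));
    } else {
        info!("gRPC server will not be started");
    }
    // The health and metrics HTTP server is started either way.
    Ok(())
}
```

Keeping the gRPC startup behind an optional setting like this lets the same binary run in environments where only the HTTP health and metrics endpoint is wanted.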
147 changes: 70 additions & 77 deletions golem-worker-executor-base/src/lib.rs
@@ -210,14 +210,14 @@ pub trait Bootstrap<Ctx: WorkerCtx> {
let listener = TcpListener::bind(addr).await?;
let grpc_port = listener.local_addr()?.port();

- let worker_executor =
- create_worker_executor_grpc_api::<Ctx, Self>(
- golem_config.clone(),
- self,
- runtime.clone(),
- join_set,
- grpc_port,
- ).await?;
+ let worker_executor = create_worker_executor_grpc_api::<Ctx, Self>(
+ golem_config.clone(),
+ self,
+ runtime.clone(),
+ join_set,
+ grpc_port,
+ )
+ .await?;

let service = WorkerExecutorServer::new(worker_executor)
.accept_compressed(CompressionEncoding::Gzip)
@@ -236,13 +236,12 @@ pub trait Bootstrap<Ctx: WorkerCtx> {
.await
.map_err(|err| anyhow!(err))
}
- .in_current_span(),
+ .in_current_span(),
);
} else {
info!("gRPC server will not be started");
}


let http_port = golem_service_base::observability::start_health_and_metrics_server(
golem_config.http_addr()?,
prometheus_registry,
@@ -297,57 +296,56 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
}
};

- let indexed_storage: Arc<dyn IndexedStorage + Send + Sync> =
- match &golem_config.indexed_storage {
- IndexedStorageConfig::KVStoreRedis => {
- info!("Using the same Redis for indexed-storage");
- let redis = redis.expect(
- "Redis must be configured as key-value storage when using KVStoreRedis",
- );
- Arc::new(RedisIndexedStorage::new(redis.clone()))
- }
- IndexedStorageConfig::Redis(redis) => {
- info!("Using Redis for indexed-storage at {}", redis.url());
- let pool = RedisPool::configured(redis).await?;
- Arc::new(RedisIndexedStorage::new(pool.clone()))
- }
- IndexedStorageConfig::KVStoreSqlite => {
- info!("Using the same Sqlite for indexed-storage");
- let sqlite = sqlite.clone().expect(
- "Sqlite must be configured as key-value storage when using KVStoreSqlite",
- );
- Arc::new(
- SqliteIndexedStorage::new(sqlite.clone())
- .await
- .map_err(|err| anyhow!(err))?,
- )
- }
- IndexedStorageConfig::Sqlite(sqlite) => {
- info!("Using Sqlite for indexed storage at {}", sqlite.database);
- let pool = SqlitePool::configured(sqlite)
- .await
- .map_err(|err| anyhow!(err))?;
- Arc::new(
- SqliteIndexedStorage::new(pool.clone())
- .await
- .map_err(|err| anyhow!(err))?,
- )
- }
- IndexedStorageConfig::InMemory => {
- info!("Using in-memory indexed storage");
- Arc::new(storage::indexed::memory::InMemoryIndexedStorage::new())
- }
- };
+ let indexed_storage: Arc<dyn IndexedStorage + Send + Sync> = match &golem_config.indexed_storage
+ {
+ IndexedStorageConfig::KVStoreRedis => {
+ info!("Using the same Redis for indexed-storage");
+ let redis = redis
+ .expect("Redis must be configured as key-value storage when using KVStoreRedis");
+ Arc::new(RedisIndexedStorage::new(redis.clone()))
+ }
+ IndexedStorageConfig::Redis(redis) => {
+ info!("Using Redis for indexed-storage at {}", redis.url());
+ let pool = RedisPool::configured(redis).await?;
+ Arc::new(RedisIndexedStorage::new(pool.clone()))
+ }
+ IndexedStorageConfig::KVStoreSqlite => {
+ info!("Using the same Sqlite for indexed-storage");
+ let sqlite = sqlite
+ .clone()
+ .expect("Sqlite must be configured as key-value storage when using KVStoreSqlite");
+ Arc::new(
+ SqliteIndexedStorage::new(sqlite.clone())
+ .await
+ .map_err(|err| anyhow!(err))?,
+ )
+ }
+ IndexedStorageConfig::Sqlite(sqlite) => {
+ info!("Using Sqlite for indexed storage at {}", sqlite.database);
+ let pool = SqlitePool::configured(sqlite)
+ .await
+ .map_err(|err| anyhow!(err))?;
+ Arc::new(
+ SqliteIndexedStorage::new(pool.clone())
+ .await
+ .map_err(|err| anyhow!(err))?,
+ )
+ }
+ IndexedStorageConfig::InMemory => {
+ info!("Using in-memory indexed storage");
+ Arc::new(storage::indexed::memory::InMemoryIndexedStorage::new())
+ }
+ };
let blob_storage: Arc<dyn BlobStorage + Send + Sync> = match &golem_config.blob_storage {
BlobStorageConfig::S3(config) => {
info!("Using S3 for blob storage");
Arc::new(S3BlobStorage::new(config.clone()).await)
}
BlobStorageConfig::LocalFileSystem(config) => {
info!(
- "Using local file system for blob storage at {:?}",
- config.root
- );
+ "Using local file system for blob storage at {:?}",
+ config.root
+ );
Arc::new(
golem_service_base::storage::blob::fs::FileSystemBlobStorage::new(&config.root)
.await
@@ -356,9 +354,8 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
}
BlobStorageConfig::KVStoreSqlite => {
info!("Using the same Sqlite for blob-storage");
- let sqlite = sqlite.expect(
- "Sqlite must be configured as key-value storage when using KVStoreSqlite",
- );
+ let sqlite = sqlite
+ .expect("Sqlite must be configured as key-value storage when using KVStoreSqlite");
Arc::new(
SqliteBlobStorage::new(sqlite.clone())
.await
@@ -382,12 +379,10 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
}
};

- let initial_files_service =
- Arc::new(InitialComponentFilesService::new(blob_storage.clone()));
+ let initial_files_service = Arc::new(InitialComponentFilesService::new(blob_storage.clone()));

let file_loader = Arc::new(FileLoader::new(initial_files_service.clone())?);
- let (plugins, plugins_observations) =
- bootstrap.create_plugins(&golem_config);
+ let (plugins, plugins_observations) = bootstrap.create_plugins(&golem_config);

let component_service = component::configured(
&golem_config.component_service,
@@ -396,7 +391,7 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
blob_storage.clone(),
plugins_observations,
)
- .await;
+ .await;

let golem_config = Arc::new(golem_config.clone());
let promise_service: Arc<dyn PromiseService + Send + Sync> =
@@ -426,7 +421,7 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
golem_config.oplog.max_operations_before_commit,
golem_config.oplog.max_payload_size,
)
- .await,
+ .await,
),
Some(oplog_archives) => {
let primary = Arc::new(
@@ -436,7 +431,7 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
golem_config.oplog.max_operations_before_commit,
golem_config.oplog.max_payload_size,
)
- .await,
+ .await,
);

Arc::new(MultiLayerOplogService::new(
@@ -450,9 +445,9 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si

let active_workers = bootstrap.create_active_workers(&golem_config);

- let running_worker_enumeration_service = Arc::new(
- RunningWorkerEnumerationServiceDefault::new(active_workers.clone()),
- );
+ let running_worker_enumeration_service = Arc::new(RunningWorkerEnumerationServiceDefault::new(
+ active_workers.clone(),
+ ));

let shard_manager_service = shard_manager::configured(&golem_config.shard_manager_service);

@@ -469,7 +464,7 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
engine_ref.increment_epoch();
}
}
- .in_current_span(),
+ .in_current_span(),
);

let linker = Arc::new(linker);
@@ -498,13 +493,12 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
plugins.clone(),
));

- let oplog_service: Arc<dyn OplogService + Send + Sync> =
- Arc::new(ForwardingOplogService::new(
- base_oplog_service,
- oplog_processor_plugin.clone(),
- component_service.clone(),
- plugins.clone(),
- ));
+ let oplog_service: Arc<dyn OplogService + Send + Sync> = Arc::new(ForwardingOplogService::new(
+ base_oplog_service,
+ oplog_processor_plugin.clone(),
+ component_service.clone(),
+ plugins.clone(),
+ ));

let worker_service = Arc::new(DefaultWorkerService::new(
key_value_storage.clone(),
@@ -554,6 +548,5 @@ async fn create_worker_executor_grpc_api<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
)
.await?;

- WorkerExecutorImpl::<Ctx, All<Ctx>>::new(services, lazy_worker_activator, grpc_port)
- .await
- }
+ WorkerExecutorImpl::<Ctx, All<Ctx>>::new(services, lazy_worker_activator, grpc_port).await
+ }
