Skip to content

Commit

Permalink
fix: add docker networking support
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg committed Jul 9, 2024
1 parent 749b261 commit 4cff9f3
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 13 deletions.
16 changes: 16 additions & 0 deletions tests/perf-analysis/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
# Pre-steps

Note that if you have already run the tests and then changed some source code,
you'll have to rebuild the images without the cache. For example:

```bash
docker compose -f ./tests/perf-analysis/<...>/docker-compose.yml build --no-cache
```

Build the image

```bash
docker image build -t lffg/number-fact:latest ./tests/containers/number-fact
```

# Setup network

The compose files use an externally managed Docker network named
`tucano-cluster-net`. Make sure it is created before starting any of the environments.

```bash
docker network create tucano-cluster-net
```

# Single node

Start:
Expand Down
2 changes: 1 addition & 1 deletion tests/perf-analysis/single-node/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ services:

networks:
tucano-cluster-net:
attachable: true
external: true
3 changes: 2 additions & 1 deletion tests/perf-analysis/tuc-multi-node-4x/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ services:
entrypoint:
- /usr/local/bin/worker
- -c=ctl
- --use-docker-network=tucano-cluster-net
networks:
- tucano-cluster-net
volumes:
Expand All @@ -48,4 +49,4 @@ services:

networks:
tucano-cluster-net:
attachable: true
external: true
3 changes: 2 additions & 1 deletion tests/perf-analysis/tuc-multi-node/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ services:
entrypoint:
- /usr/local/bin/worker
- -c=ctl
- --use-docker-network=tucano-cluster-net
networks:
- tucano-cluster-net
volumes:
Expand All @@ -48,4 +49,4 @@ services:

networks:
tucano-cluster-net:
attachable: true
external: true
9 changes: 9 additions & 0 deletions worker/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ pub struct WorkerArgs {
#[arg(short, long)]
pub ctl_addr: String,

/// Whether the worker should run in a Docker-networking aware context.
///
/// If set, must specify the name of the Docker network.
///
/// In general, this option is desirable when executing all Tucano nodes
/// in a single host via Docker containers.
#[arg(long)]
pub use_docker_network: Option<String>,

/// Interval at which metrics are pushed to the controller.
///
/// Notice that this interval MUST be smaller than the value configured for
Expand Down
5 changes: 3 additions & 2 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ async fn main() -> Result<()> {

let mut bag = JoinSet::new();

let (proxy_state, proxy_handle) = ProxyState::new();
let (proxy_state, proxy_handle) = ProxyState::new(&args);
bag.spawn(async move {
let app = proxy::proxy.with_state(proxy_state);
info!("worker proxy listening at {ANY_IP}:{WORKER_PROXY_PORT}");
axum::serve(proxy_listener, app).await.unwrap();
});

let docker = Arc::new(Docker::connect_with_defaults().unwrap());
let (runner, runner_handle) = Runner::new(docker, ctl_client.clone(), proxy_handle);
let (runner, runner_handle) =
Runner::new(args.clone(), docker, ctl_client.clone(), proxy_handle);
bag.spawn(async move {
runner.run().await;
});
Expand Down
23 changes: 21 additions & 2 deletions worker/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use reqwest::StatusCode;
use tracing::{instrument, trace};
use utils::http::{self, OptionExt as _, ResultExt as _};

use crate::args::WorkerArgs;

#[instrument(skip_all)]
pub async fn proxy(
State(proxy): State<ProxyState>,
Expand All @@ -40,10 +42,15 @@ pub async fn proxy(
.ok_or_else(|| eyre::eyre!("requested instance doesn't exist at requested worker"))
.http_error(StatusCode::BAD_GATEWAY, "bad gateway")?;

let host_name = match proxy.mode {
ProxyMode::Normal => format!("127.0.0.1:{port}"),
ProxyMode::DockerNetwork => format!("instance-{instance_id}:{port}"),
};

*req.uri_mut() = {
let uri = req.uri();
let mut parts = uri.clone().into_parts();
parts.authority = Authority::from_str(&format!("127.0.0.1:{port}")).ok();
parts.authority = Authority::from_str(&host_name).ok();
parts.scheme = Some(Scheme::HTTP);
Uri::from_parts(parts).unwrap()
};
Expand All @@ -59,11 +66,22 @@ pub async fn proxy(
/// State handed to the worker proxy handler (`proxy`) through axum's `State` extractor.
pub struct ProxyState {
/// Port registered for each instance. The same map (behind the `Arc`) is
/// also held by the `ProxyHandle` returned from `ProxyState::new`.
pub ports: Arc<RwLock<HashMap<InstanceId, u16>>>,
/// HTTP client built in `ProxyState::new` with `TCP_NODELAY` enabled on its
/// connector.
///
/// NOTE(review): presumably used to forward the rewritten request to the
/// instance -- confirm against the rest of `proxy`.
pub client: Client<HttpConnector, Body>,
/// Selects the authority the request URI is rewritten to:
/// `127.0.0.1:{port}` for `ProxyMode::Normal`, or
/// `instance-{instance_id}:{port}` for `ProxyMode::DockerNetwork`.
pub mode: ProxyMode,
}

/// Strategy the worker proxy uses to address the instance it forwards to.
///
/// `ProxyState::new` picks `DockerNetwork` when `WorkerArgs::use_docker_network`
/// is set and `Normal` otherwise.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ProxyMode {
    /// Instances are addressed on the loopback interface: `127.0.0.1:{port}`.
    Normal,
    /// Instances are addressed by host name: `instance-{instance_id}:{port}`.
    DockerNetwork,
}

impl ProxyState {
#[must_use]
pub fn new() -> (Self, ProxyHandle) {
pub fn new(worker_args: &WorkerArgs) -> (Self, ProxyHandle) {
let mode = match &worker_args.use_docker_network {
Some(_) => ProxyMode::DockerNetwork,
None => ProxyMode::Normal,
};
let ports = Arc::new(RwLock::new(HashMap::default()));
let state = ProxyState {
ports: ports.clone(),
Expand All @@ -73,6 +91,7 @@ impl ProxyState {
connector.set_nodelay(true);
Client::builder(TokioExecutor::new()).build::<_, Body>(connector)
},
mode,
};
let handle = ProxyHandle { ports };
(state, handle)
Expand Down
15 changes: 11 additions & 4 deletions worker/src/runner/container_rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use proto::{
use tracing::{error, instrument, trace};

use super::RunnerHandle;
use crate::args::WorkerArgs;

#[derive(Clone)]
pub struct ContainerRuntime {
Expand All @@ -31,6 +32,7 @@ impl ContainerRuntime {
#[instrument(skip_all, fields(instance_id = ?spec.instance_id))]
pub async fn run_instance_lifecycle(
&self,
args: Arc<WorkerArgs>,
spec: InstanceSpec,
port: u16,
handle: RunnerHandle,
Expand All @@ -39,7 +41,7 @@ impl ContainerRuntime {
trace!(?spec, container_name, "running instance lifecycle");

if let Err(error) = self
.create_and_run(&spec, port, container_name.clone())
.create_and_run(&args, &spec, port, container_name.clone())
.await
{
error!(?error, "failed to create/run container");
Expand Down Expand Up @@ -97,11 +99,14 @@ impl ContainerRuntime {

async fn create_and_run(
&self,
args: &WorkerArgs,
spec: &InstanceSpec,
port: u16,
name: String,
) -> eyre::Result<()> {
let create_response = self.create_container(spec, port, name.clone()).await?;
let create_response = self
.create_container(args, spec, port, name.clone())
.await?;
trace!("successfully `create` operation");

self.run_container(create_response).await?;
Expand All @@ -120,11 +125,12 @@ impl ContainerRuntime {

async fn create_container(
&self,
args: &WorkerArgs,
spec: &InstanceSpec,
port: u16,
name: String,
) -> eyre::Result<ContainerCreateResponse> {
let config = Self::create_container_config(spec.clone(), port);
let config = Self::create_container_config(args, spec.clone(), port);

let options = Some(CreateContainerOptions {
name,
Expand Down Expand Up @@ -178,7 +184,7 @@ impl ContainerRuntime {
Ok(())
}

fn create_container_config(spec: InstanceSpec, port: u16) -> Config<String> {
fn create_container_config(args: &WorkerArgs, spec: InstanceSpec, port: u16) -> Config<String> {
const HOST: &str = "0.0.0.0";

Config {
Expand All @@ -191,6 +197,7 @@ impl ContainerRuntime {
env: Some(vec![format!("PORT={port}"), format!("HOST={HOST}")]),
host_config: Some(HostConfig {
auto_remove: Some(true),
network_mode: args.use_docker_network.clone(),
// FIXME: These aren't working right now.
//
// cpu_shares: Some(spec.resource_config.cpu_shares),
Expand Down
8 changes: 6 additions & 2 deletions worker/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@ use tokio::{
use tracing::{error, trace};

mod container_rt;
use crate::proxy::ProxyHandle;
use crate::{args::WorkerArgs, proxy::ProxyHandle};

/// Worker component that consumes `Msg`s and launches instance containers.
///
/// For each instance it records the assigned port and spawns a task that runs
/// the container lifecycle on the `ContainerRuntime`.
pub struct Runner {
/// Receiving end of the runner's message channel.
rx: mpsc::Receiver<Msg>,
/// Port recorded for each instance (filled through `add_instance`).
instances: HashMap<InstanceId, u16>,
/// NOTE(review): presumably the set of ports currently assigned to
/// instances -- confirm against the port-allocation code.
ports: HashSet<u16>,
/// Handle to this runner; a clone is moved into every spawned instance
/// lifecycle task.
handle: RunnerHandle,
/// Handle received from `ProxyState::new`; it shares the proxy's
/// instance-to-port map.
proxy_handle: ProxyHandle,
/// Worker arguments, shared with every spawned instance lifecycle task.
worker_args: Arc<WorkerArgs>,
/// Runtime wrapping the Docker client; used to run instance lifecycles.
container_runtime: Arc<ContainerRuntime>,
/// NOTE(review): presumably the client used to talk to the controller --
/// confirm against its call sites.
ctl_client: CtlClient,
}

impl Runner {
#[must_use]
pub fn new(
worker_args: Arc<WorkerArgs>,
docker: Arc<Docker>,
ctl_client: CtlClient,
proxy: ProxyHandle,
Expand All @@ -45,6 +47,7 @@ impl Runner {
ports: HashSet::default(),
handle: handle.clone(),
proxy_handle: proxy,
worker_args,
container_runtime: Arc::new(ContainerRuntime::new(docker)),
ctl_client,
};
Expand Down Expand Up @@ -78,9 +81,10 @@ impl Runner {
self.add_instance(spec.instance_id, port);

let rt = self.container_runtime.clone();
let args = self.worker_args.clone();
let handle = self.handle.clone();
tokio::spawn(async move {
rt.run_instance_lifecycle(spec, port, handle).await;
rt.run_instance_lifecycle(args, spec, port, handle).await;
});
Ok(())
}
Expand Down

0 comments on commit 4cff9f3

Please sign in to comment.