diff --git a/worker/src/broker/adapter.rs b/worker/src/broker/adapter.rs new file mode 100644 index 0000000..8ead693 --- /dev/null +++ b/worker/src/broker/adapter.rs @@ -0,0 +1,57 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use tokio_core::reactor::Core; +// use futures::{future, Stream}; +use futures::Stream; + +use super::*; +use operations::OperationHandler; + +use cerberus_proto::datatypes as dpb; +use cerberus_proto::worker as wpb; + +/// BrokerAdapter is used to make the new broker conform with the requirements of the ol worker +/// code. +pub struct BrokerAdapter; + +impl BrokerAdapter { + pub fn run(broker_addr: SocketAddr, operation_handler: Arc) { + let broker = Broker::connect(broker_addr, Core::new().unwrap().handle()).unwrap(); + + broker.handle.map_err(|_| unimplemented!("receiver should not error")) + .for_each(move |task| { + match convert_to_old_task(task) { + OldTask::Map(old_task) => { + operation_handler.perform_map(old_task) + } + OldTask::Reduce(old_task) => { + operation_handler.perform_reduce(old_task) + } + } + }); + } +} + +enum OldTask { + Map(wpb::PerformMapRequest), + Reduce(wpb::PerformReduceRequest), +} + +fn convert_to_old_task(new_task: dpb::Task) -> OldTask { + match new_task.field_type { + dpb::TaskType::MAP => { + let mut task = wpb::PerformMapRequest::new(); + task.mapper_file_path = new_task.payload_path; + if !new_task.input_files.is_empty(){ + task.input_file_path = new_task.input_files.first().unwrap().path; + } + OldTask::Map(task) + } + dpb::TaskType::REDUCE => { + let mut task = wpb::PerformReduceRequest::new(); + task.reducer_file_path = new_task.payload_path; + OldTask::Reduce(task) + } + } +} diff --git a/worker/src/broker/mod.rs b/worker/src/broker/mod.rs index b9d1e76..33a83bc 100644 --- a/worker/src/broker/mod.rs +++ b/worker/src/broker/mod.rs @@ -1,6 +1,8 @@ +pub mod adapter; pub mod amqp; pub use self::amqp::Amqp; +pub use self::adapter::BrokerAdapter; use std::fmt; use std::fmt::Display; diff --git a/worker/src/main.rs b/worker/src/main.rs index aca6522..49dda1f 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -54,6 +54,7 @@ mod parser; mod worker_interface; use errors::*; +use broker::BrokerAdapter; use master_interface::MasterInterface; use operations::OperationHandler; use server::{IntermediateDataService, ScheduleOperationService, Server}; @@ -66,6 +67,7 @@ const WORKER_REGISTRATION_RETRY_WAIT_DURATION_MS: u64 = 1000; // Setting the port to 0 means a random available port will be selected const DEFAULT_PORT: &str = "0"; const DEFAULT_MASTER_ADDR: &str = "[::]:8081"; +const DEFAULT_BROKER_ADDR: &str = "rabbitmq:5672"; // Assumes Docker location via link fn register_worker(master_interface: &MasterInterface, address: &SocketAddr) -> Result<()> { let mut retries = WORKER_REGISTRATION_RETRIES; @@ -99,10 +101,20 @@ fn run() -> Result<()> { ).chain_err(|| "Error parsing master address")?; let port = u16::from_str(matches.value_of("port").unwrap_or(DEFAULT_PORT)) .chain_err(|| "Error parsing port")?; + let broker_addr = SocketAddr::from_str( + matches.value_of("broker").unwrap_or(DEFAULT_BROKER_ADDR), + ).chain_err(|| "Error parsing broker address")?; let master_interface = Arc::new(MasterInterface::new(master_addr).chain_err(|| "Error creating master interface.")?); let operation_handler = Arc::new(OperationHandler::new(Arc::clone(&master_interface))); + // TODO: Consider removing this code once the entire worker is modified to better fit the new + // architecture. + thread::spawn(move || { + BrokerAdapter::run(broker_addr, Arc::clone(&operation_handler)); + }); + + let scheduler_service = ScheduleOperationService::new(Arc::clone(&operation_handler)); let interm_data_service = IntermediateDataService; let srv = Server::new(port, scheduler_service, interm_data_service) diff --git a/worker/src/parser.rs b/worker/src/parser.rs index ad03ed0..44ce664 100644 --- a/worker/src/parser.rs +++ b/worker/src/parser.rs @@ -21,5 +21,13 @@ pub fn parse_command_line<'a>() -> ArgMatches<'a> { .takes_value(true) .required(false), ) + .arg( + Arg::with_name("broker") + .long("broker") + .short("b") + .help("Address of the broker") + .takes_value(true) + .required(false) + ) .get_matches() }