Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] worker: Added broker adapter to work with old code. #387

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions worker/src/broker/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::net::SocketAddr;
use std::sync::Arc;

use tokio_core::reactor::Core;
// use futures::{future, Stream};
use futures::Stream;

use super::*;
use operations::OperationHandler;

use cerberus_proto::datatypes as dpb;
use cerberus_proto::worker as wpb;

/// BrokerAdapter is used to make the new broker conform with the requirements of the ol worker
/// code.
pub struct BrokerAdapter;

impl BrokerAdapter {
pub fn run(broker_addr: SocketAddr, operation_handler: Arc<OperationHandler>) {
let broker = Broker::connect(broker_addr, Core::new().unwrap().handle()).unwrap();

broker.handle.map_err(|_| unimplemented!("receiver should not error"))
.for_each(move |task| {
match convert_to_old_task(task) {
OldTask::Map(old_task) => {
operation_handler.perform_map(old_task)
}
OldTask::Reduce(old_task) => {
operation_handler.perform_reduce(old_task)
}
}
});
}
}

enum OldTask {
Map(wpb::PerformMapRequest),
Reduce(wpb::PerformReduceRequest),
}

fn convert_to_old_task(new_task: dpb::Task) -> OldTask {
match new_task.field_type {
dpb::TaskType::MAP => {
let mut task = wpb::PerformMapRequest::new();
task.mapper_file_path = new_task.payload_path;
if !new_task.input_files.is_empty(){
task.input_file_path = new_task.input_files.first().unwrap().path;
}
OldTask::Map(task)
}
dpb::TaskType::REDUCE => {
let mut task = wpb::PerformReduceRequest::new();
task.reducer_file_path = new_task.payload_path;
OldTask::Reduce(task)
}
}
}
2 changes: 2 additions & 0 deletions worker/src/broker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod adapter;
pub mod amqp;

pub use self::amqp::Amqp;
pub use self::adapter::BrokerAdapter;

use std::fmt;
use std::fmt::Display;
Expand Down
12 changes: 12 additions & 0 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod parser;
mod worker_interface;

use errors::*;
use broker::BrokerAdapter;
use master_interface::MasterInterface;
use operations::OperationHandler;
use server::{IntermediateDataService, ScheduleOperationService, Server};
Expand All @@ -66,6 +67,7 @@ const WORKER_REGISTRATION_RETRY_WAIT_DURATION_MS: u64 = 1000;
// Setting the port to 0 means a random available port will be selected
const DEFAULT_PORT: &str = "0";
const DEFAULT_MASTER_ADDR: &str = "[::]:8081";
const DEFAULT_BROKER_ADDR: &str = "rabbitmq:5672"; // Assumes Docker location via link

fn register_worker(master_interface: &MasterInterface, address: &SocketAddr) -> Result<()> {
let mut retries = WORKER_REGISTRATION_RETRIES;
Expand Down Expand Up @@ -99,10 +101,20 @@ fn run() -> Result<()> {
).chain_err(|| "Error parsing master address")?;
let port = u16::from_str(matches.value_of("port").unwrap_or(DEFAULT_PORT))
.chain_err(|| "Error parsing port")?;
let broker_addr = SocketAddr::from_str(
matches.value_of("broker").unwrap_or(DEFAULT_BROKER_ADDR),
).chain_err(|| "Error parsing broker address")?;

let master_interface = Arc::new(MasterInterface::new(master_addr).chain_err(|| "Error creating master interface.")?);
let operation_handler = Arc::new(OperationHandler::new(Arc::clone(&master_interface)));

// TODO: Consider removing this code once the entire worker is modified to better fit the new
// architecture.
thread::spawn(move || {
BrokerAdapter::run(broker_addr, Arc::clone(&operation_handler));
});


let scheduler_service = ScheduleOperationService::new(Arc::clone(&operation_handler));
let interm_data_service = IntermediateDataService;
let srv = Server::new(port, scheduler_service, interm_data_service)
Expand Down
8 changes: 8 additions & 0 deletions worker/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,13 @@ pub fn parse_command_line<'a>() -> ArgMatches<'a> {
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("broker")
.long("broker")
.short("b")
.help("Address of the broker")
.takes_value(true)
.required(false)
)
.get_matches()
}