Skip to content

Commit

Permalink
add argument/env config setup
Browse files — browse the repository at this point in the history
  • Branch information:
iduartgomez committed Nov 27, 2019
1 parent 3656a31 commit 5ea1f8f
Show file tree
Hide file tree
Showing 21 changed files with 266 additions and 197 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ env:
global:
- export PATH="$PATH:$HOME/bin"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$HOME/lib"
- export SPARK_LOCAL_IP=0.0.0.0
- export NS_LOCAL_IP=0.0.0.0
addons:
apt:
packages:
Expand Down
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ downcast-rs = "1.1.1"
objekt = "0.1.2"
serde_traitobject = "0.2"

[dependencies.clap]
version = "2.33"
default-features = false

[build-dependencies]
capnpc = "0.10.1"

Expand Down
8 changes: 4 additions & 4 deletions docker/testing_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ count=0
for WORKER in $(docker-compose ps | grep -oE "docker_ns_worker_[0-9]+")
do
echo "Setting $WORKER";
docker exec -e CONF_FILE="$CONF_FILE" -e SPARK_LOCAL_IP="${WORKER_IPS[count]}" -w /home/ns_user/ $WORKER \
docker exec -e CONF_FILE="$CONF_FILE" -e NS_LOCAL_IP="${WORKER_IPS[count]}" -w /home/ns_user/ $WORKER \
bash -c 'echo "$CONF_FILE" >> hosts.conf && \
echo "SPARK_LOCAL_IP=$SPARK_LOCAL_IP" >> .ssh/environment && \
echo "NS_LOCAL_IP=$NS_LOCAL_IP" >> .ssh/environment && \
echo "PermitUserEnvironment yes" >> /etc/ssh/sshd_config && \
service ssh start';
(( count++ ));
done

docker exec -e CONF_FILE="$CONF_FILE" -e SPARK_LOCAL_IP="${MASTER_IP}" -w /root/ docker_ns_master_1 \
bash -c 'echo "$CONF_FILE" >> hosts.conf && echo "export SPARK_LOCAL_IP=$SPARK_LOCAL_IP" >> .bashrc'
docker exec -e CONF_FILE="$CONF_FILE" -e NS_LOCAL_IP="${MASTER_IP}" -w /root/ docker_ns_master_1 \
bash -c 'echo "$CONF_FILE" >> hosts.conf && echo "export NS_LOCAL_IP=$NS_LOCAL_IP" >> .bashrc'
for WORKER_IP in ${WORKER_IPS[@]}
do
docker exec docker_ns_master_1 bash -c "ssh-keyscan ${WORKER_IP} >> ~/.ssh/known_hosts"
Expand Down
11 changes: 2 additions & 9 deletions examples/file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,12 @@ use native_spark::*;
#[macro_use]
extern crate serde_closure;
use chrono::prelude::*;

use std::fs;
use std::io::{BufRead, BufReader};

fn get_mode() -> String {
let args = std::env::args().skip(1).collect::<Vec<_>>();
match args.get(0) {
Some(val) if val == "distributed" => val.to_owned(),
_ => "local".to_owned(),
}
}

fn main() -> Result<()> {
let sc = Context::new(&get_mode())?;
let sc = Context::new()?;
let files = fs::read_dir("csv_folder")
.unwrap()
.map(|x| x.unwrap().path().to_str().unwrap().to_owned())
Expand Down
10 changes: 1 addition & 9 deletions examples/group_by.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
#![allow(where_clauses_object_safety)]
use native_spark::*;

fn get_mode() -> String {
let args = std::env::args().skip(1).collect::<Vec<_>>();
match args.get(0) {
Some(val) if val == "distributed" => val.to_owned(),
_ => "local".to_owned(),
}
}

fn main() -> Result<()> {
let sc = Context::new(&get_mode())?;
let sc = Context::new()?;
let vec = vec![
("x".to_string(), 1),
("x".to_string(), 2),
Expand Down
10 changes: 1 addition & 9 deletions examples/join.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
#![allow(where_clauses_object_safety)]
use native_spark::*;

fn get_mode() -> String {
let args = std::env::args().skip(1).collect::<Vec<_>>();
match args.get(0) {
Some(val) if val == "distributed" => val.to_owned(),
_ => "local".to_owned(),
}
}

fn main() -> Result<()> {
let sc = Context::new(&get_mode())?;
let sc = Context::new()?;
let col1 = vec![
(1, ("A".to_string(), "B".to_string())),
(2, ("C".to_string(), "D".to_string())),
Expand Down
10 changes: 1 addition & 9 deletions examples/make_rdd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,8 @@ use native_spark::*;
#[macro_use]
extern crate serde_closure;

fn get_mode() -> String {
let args = std::env::args().skip(1).collect::<Vec<_>>();
match args.get(0) {
Some(val) if val == "distributed" => val.to_owned(),
_ => "local".to_owned(),
}
}

fn main() -> Result<()> {
let sc = Context::new(&get_mode())?;
let sc = Context::new()?;
let col = sc.make_rdd((0..10).collect::<Vec<_>>(), 32);
//Fn! will make the closures serializable. It is necessary. use serde_closure version 0.1.3.
let vec_iter = col.map(Fn!(|i| (0..i).collect::<Vec<_>>()));
Expand Down
11 changes: 2 additions & 9 deletions examples/parquet_column_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,13 @@ use native_spark::*;
use parquet::column::reader::get_typed_column_reader;
use parquet::data_type::{ByteArrayType, Int32Type, Int64Type};
use parquet::file::reader::{FileReader, SerializedFileReader};

use std::fs;
use std::fs::File;
use std::path::Path;

fn get_mode() -> String {
let args = std::env::args().skip(1).collect::<Vec<_>>();
match args.get(0) {
Some(val) if val == "distributed" => val.to_owned(),
_ => "local".to_owned(),
}
}

fn main() -> Result<()> {
let sc = Context::new(&get_mode())?;
let sc = Context::new()?;
let files = fs::read_dir("parquet_file_dir")
.unwrap()
.map(|x| x.unwrap().path().to_str().unwrap().to_owned())
Expand Down
1 change: 1 addition & 0 deletions src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl BoundedMemoryCache {
map: Arc::new(Mutex::new(HashMap::new())),
}
}

fn new_key_space_id(&self) -> usize {
self.next_key_space_id.fetch_add(1, Ordering::SeqCst)
}
Expand Down
4 changes: 2 additions & 2 deletions src/cache_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl CacheTracker {
};
m.server();
m.client(CacheTrackerMessage::SlaveCacheStarted {
host: *env::local_ip,
host: env::config.local_ip,
size: m.cache.get_capacity(),
});
m
Expand Down Expand Up @@ -354,7 +354,7 @@ impl CacheTracker {
self.client(CacheTrackerMessage::AddedToCache {
rdd_id: rdd.get_rdd_id(),
partition: split.get_index(),
host: *env::local_ip,
host: env::config.local_ip,
size,
});
}
Expand Down
Loading

0 comments on commit 5ea1f8f

Please sign in to comment.