Skip to content

Commit

Permalink
Merge branch 'master' of github.com:rajasekarv/native_spark
Browse files Browse the repository at this point in the history
  • Loading branch information
rajasekarv committed Nov 18, 2020
2 parents 080849e + 8533d4b commit 0471eda
Show file tree
Hide file tree
Showing 22 changed files with 430 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ env:
global:
- export PATH="$PATH:$HOME/bin"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$HOME/lib"
- export NS_LOCAL_IP=0.0.0.0
- export VEGA_LOCAL_IP=0.0.0.0
addons:
apt:
packages:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# vega

Previously known as native_spark
Previously known as `native_spark`.

[![Join the chat at https://gitter.im/fast_spark/community](https://badges.gitter.im/fast_spark/community.svg)](https://gitter.im/fast_spark/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Build Status](https://travis-ci.org/rajasekarv/native_spark.svg?branch=master)](https://travis-ci.org/rajasekarv/native_spark)
Expand Down
6 changes: 3 additions & 3 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ RUN set -e; \
# Locales
locale-gen en_US.UTF-8; \
# Set SSH user
groupadd ns && useradd -ms /bin/bash -g ns ns_user; \
groupadd ns && useradd -ms /bin/bash -g ns vega_user; \
# Cleanup
#apt-get purge -y --auto-remove $tempPkgs; \
apt-get autoremove -q -y; \
apt-get clean -yq; \
rm -rf /var/lib/apt/lists/*
COPY --from=building /home/release .
COPY --chown=ns_user:ns ./docker/id_rsa.pub /home/ns_user/.ssh/authorized_keys
COPY --chown=vega_user:ns ./docker/id_rsa.pub /home/vega_user/.ssh/authorized_keys
COPY ./docker/id_rsa /root/.ssh/
RUN chmod 600 /root/.ssh/id_rsa /home/ns_user/.ssh/authorized_keys
RUN chmod 600 /root/.ssh/id_rsa /home/vega_user/.ssh/authorized_keys
ENV LANG=en_US.UTF-8 \
LANGUAGE=en_US:en \
LC_ALL=en_US.UTF-8
2 changes: 1 addition & 1 deletion docker/build_image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ PACKAGE="vega:${VERSION}"
cd $SCRIPT_PATH && cd ..
echo "work dir: $(pwd)"

RUST_VERSION="$(cat ./rust-toolchain | tr -d '[:space:]')"
RUST_VERSION="nightly"
echo "rust version: $RUST_VERSION"

echo "building $PACKAGE..."
Expand Down
8 changes: 4 additions & 4 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ networks:
native-spark:

services:
ns_master:
vega_master:
image: vega:latest
ports:
- "3000"
Expand All @@ -16,11 +16,11 @@ services:
source: "../target"
target: "/home/dev"
environment:
NS_DEPLOYMENT_MODE: distributed
VEGA_DEPLOYMENT_MODE: distributed
depends_on:
- ns_worker
- vega_worker

ns_worker:
vega_worker:
image: vega:latest
ports:
- "10500"
Expand Down
20 changes: 10 additions & 10 deletions docker/testing_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ SCRIPT_PATH=`dirname $(readlink -f $0)`
cd $SCRIPT_PATH

# Deploy a testing cluster with one master and 2 workers
docker-compose up --scale ns_worker=2 -d
docker-compose up --scale vega_worker=2 -d

# Since we can't resolved domains yet, we have to get each container IP to create the config file
WORKER_IPS=$(docker-compose ps | grep -oE "docker_ns_worker_[0-9]+" \
WORKER_IPS=$(docker-compose ps | grep -oE "docker_vega_worker_[0-9]+" \
| xargs -I{} docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' {})
read -a WORKER_IPS <<< $(echo $WORKER_IPS)
slaves=$(printf ",\"ns_user@%s\"" "${WORKER_IPS[@]}")
slaves=$(printf ",\"vega_user@%s\"" "${WORKER_IPS[@]}")
slaves="slaves = [${slaves:1}]"

MASTER_IP=$(docker-compose ps | grep -oE "docker_ns_master_[0-9]+" \
MASTER_IP=$(docker-compose ps | grep -oE "docker_vega_master_[0-9]+" \
| xargs -I{} docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' {})

CONF_FILE=`cat <<EOF
Expand All @@ -22,26 +22,26 @@ EOF
`

count=0
for WORKER in $(docker-compose ps | grep -oE "docker_ns_worker_[0-9]+")
for WORKER in $(docker-compose ps | grep -oE "docker_vega_worker_[0-9]+")
do
echo "Setting $WORKER";
docker exec -e CONF_FILE="$CONF_FILE" -e NS_LOCAL_IP="${WORKER_IPS[count]}" -w /home/ns_user/ $WORKER \
docker exec -e CONF_FILE="$CONF_FILE" -e VEGA_LOCAL_IP="${WORKER_IPS[count]}" -w /home/vega_user/ $WORKER \
bash -c 'echo "$CONF_FILE" >> hosts.conf && \
echo "NS_LOCAL_IP=$NS_LOCAL_IP" >> .ssh/environment && \
echo "VEGA_LOCAL_IP=$VEGA_LOCAL_IP" >> .ssh/environment && \
echo "PermitUserEnvironment yes" >> /etc/ssh/sshd_config && \
echo "AcceptEnv RUST_BACKTRACE" >> /etc/ssh/sshd_config && \
service ssh start';
(( count++ ));
done

docker exec -e CONF_FILE="$CONF_FILE" -e NS_LOCAL_IP="${MASTER_IP}" -w /root/ docker_ns_master_1 \
docker exec -e CONF_FILE="$CONF_FILE" -e VEGA_LOCAL_IP="${MASTER_IP}" -w /root/ docker_vega_master_1 \
bash -c 'echo "$CONF_FILE" >> hosts.conf && \
echo "export NS_LOCAL_IP=$NS_LOCAL_IP" >> .bashrc &&
echo "export VEGA_LOCAL_IP=$VEGA_LOCAL_IP" >> .bashrc &&
echo "SendEnv RUST_BACKTRACE" >> ~/.ssh/config
';
for WORKER_IP in ${WORKER_IPS[@]}
do
docker exec docker_ns_master_1 bash -c "ssh-keyscan ${WORKER_IP} >> ~/.ssh/known_hosts"
docker exec docker_vega_master_1 bash -c "ssh-keyscan ${WORKER_IP} >> ~/.ssh/known_hosts"
done

# When done is posible to open a shell into the master and run any of the examples in distributed mode
2 changes: 1 addition & 1 deletion examples/parquet_column_read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#![allow(where_clauses_object_safety, clippy::single_component_path_imports)]
use chrono::prelude::*;
use itertools::izip;
use vega::*;
use parquet::column::reader::get_typed_column_reader;
use parquet::data_type::{ByteArrayType, Int32Type, Int64Type};
use parquet::file::reader::{FileReader, SerializedFileReader};
use vega::*;

use std::fs::File;
use std::path::PathBuf;
Expand Down
19 changes: 19 additions & 0 deletions examples/subtract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::sync::Arc;
use vega::*;

fn main() -> Result<()> {
let sc = Context::new()?;
let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0];

let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13];

let first = sc.parallelize(col1, 4);
let second = sc.parallelize(col2, 4);
let ans = first.subtract(Arc::new(second));

for elem in ans.collect().iter() {
println!("{:?}", elem);
}

Ok(())
}
4 changes: 2 additions & 2 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use tokio::runtime::{Handle, Runtime};
/// The key is: {shuffle_id}/{input_id}/{reduce_id}
type ShuffleCache = Arc<DashMap<(usize, usize, usize), Vec<u8>>>;

const ENV_VAR_PREFIX: &str = "NS_";
pub(crate) const THREAD_PREFIX: &str = "_NS";
const ENV_VAR_PREFIX: &str = "VEGA_";
pub(crate) const THREAD_PREFIX: &str = "_VEGA";
static CONF: OnceCell<Configuration> = OnceCell::new();
static ENV: OnceCell<Env> = OnceCell::new();
static ASYNC_RT: Lazy<Option<Runtime>> = Lazy::new(Env::build_async_executor);
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
never_type,
specialization,
unboxed_closures,
unsize
unsize,
binary_heap_into_iter_sorted
)]
#![allow(dead_code, where_clauses_object_safety, deprecated)]
#![allow(clippy::single_component_path_imports)]
Expand Down
3 changes: 1 addition & 2 deletions src/partial/count_evaluator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use statrs::{distribution::Poisson, statistics::Mean};

use crate::partial::{approximate_evaluator::ApproximateEvaluator, bounded_double::BoundedDouble};
use statrs::{distribution::Poisson, statistics::Mean};

/// An ApproximateEvaluator for counts.
pub(crate) struct CountEvaluator {
Expand Down
61 changes: 61 additions & 0 deletions src/partial/grouped_count_evaluator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::HashMap;
use std::hash::Hash;

use crate::partial::{
approximate_evaluator::ApproximateEvaluator, bounded_double::BoundedDouble,
count_evaluator::bound,
};
use crate::serializable_traits::Data;

/// An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
pub(crate) struct GroupedCountEvaluator<T>
where
T: Eq + Hash,
{
total_outputs: usize,
confidence: f64,
outputs_merged: usize,
sums: HashMap<T, usize>,
}

impl<T: Eq + Hash> GroupedCountEvaluator<T> {
pub fn new(total_outputs: usize, confidence: f64) -> Self {
GroupedCountEvaluator {
total_outputs,
confidence,
outputs_merged: 0,
sums: HashMap::new(),
}
}
}

impl<T: Data + Eq + Hash> ApproximateEvaluator<HashMap<T, usize>, HashMap<T, BoundedDouble>>
for GroupedCountEvaluator<T>
{
fn merge(&mut self, _output_id: usize, task_result: &HashMap<T, usize>) {
self.outputs_merged += 1;
task_result.iter().for_each(|(k, v)| {
*self.sums.entry(k.clone()).or_insert(0) += v;
});
}

fn current_result(&self) -> HashMap<T, BoundedDouble> {
if self.outputs_merged == 0 {
HashMap::new()
} else if self.outputs_merged == self.total_outputs {
self.sums
.iter()
.map(|(k, sum)| {
let sum = *sum as f64;
(k.clone(), BoundedDouble::from((sum, 1.0, sum, sum)))
})
.collect()
} else {
let p = self.outputs_merged as f64 / self.total_outputs as f64;
self.sums
.iter()
.map(|(k, sum)| (k.clone(), bound(self.confidence, *sum as f64, p)))
.collect()
}
}
}
2 changes: 2 additions & 0 deletions src/partial/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ mod approximate_action_listener;
pub(self) mod approximate_evaluator;
mod bounded_double;
mod count_evaluator;
mod grouped_count_evaluator;
mod partial_result;

pub(crate) use approximate_action_listener::ApproximateActionListener;
pub(crate) use approximate_evaluator::ApproximateEvaluator;
pub use bounded_double::BoundedDouble;
pub(crate) use count_evaluator::CountEvaluator;
pub(crate) use grouped_count_evaluator::GroupedCountEvaluator;
pub use partial_result::PartialResult;

#[derive(Debug, Error)]
Expand Down
Loading

0 comments on commit 0471eda

Please sign in to comment.