From a5170c6ffd2fd46cfa721105c82489d45dbb1ecf Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sat, 2 May 2020 18:55:14 +0800
Subject: [PATCH 01/20] top() and take_ordered()

---
 src/lib.rs                          |   3 +-
 src/rdd/rdd.rs                      |  64 +++++++++++++++
 src/utils/bounded_priority_queue.rs | 119 ++++++++++++++++++++++++++++
 src/utils/mod.rs                    |   1 +
 tests/test_rdd.rs                   |  21 +++++
 5 files changed, 207 insertions(+), 1 deletion(-)
 create mode 100644 src/utils/bounded_priority_queue.rs

diff --git a/src/lib.rs b/src/lib.rs
index c33576bb..a2645567 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,7 +6,8 @@
     never_type,
     specialization,
     unboxed_closures,
-    unsize
+    unsize,
+    binary_heap_into_iter_sorted
 )]
 #![allow(dead_code, where_clauses_object_safety, deprecated)]
 #![allow(clippy::single_component_path_imports)]
diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 3970c94c..6a7d056b 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -15,6 +15,8 @@ use crate::partitioner::{HashPartitioner, Partitioner};
 use crate::scheduler::TaskContext;
 use crate::serializable_traits::{AnyData, Data, Func, SerFunc};
 use crate::split::Split;
+use crate::task::TaskContext;
+use crate::utils::bounded_priority_queue::{BoundedMaxPriorityQueue, BoundedMinPriorityQueue};
 use crate::utils::random::{BernoulliCellSampler, BernoulliSampler, PoissonSampler, RandomSampler};
 use crate::{utils, Fn, SerArc, SerBox};
 use fasthash::MetroHasher;
@@ -992,6 +994,68 @@ pub trait Rdd: RddBase + 'static {
         let min_fn = Fn!(|x: Self::Item, y: Self::Item| x.min(y));
         self.reduce(min_fn)
     }
+
+    fn top(&self, num: usize) -> Result<Vec<Self::Item>>
+    where
+        Self: Sized,
+        Self::Item: Data + Ord,
+    {
+        if num == 0 {
+            Ok(vec![])
+        } else {
+            let first_k_func = Fn!(move |partition: Box<dyn Iterator<Item = Self::Item>>|
+                -> Box<dyn Iterator<Item = BoundedMinPriorityQueue<Self::Item>>> {
+                let mut queue = BoundedMinPriorityQueue::new(num);
+                partition.for_each(|item: Self::Item| queue.append(item));
+                Box::new(std::iter::once(queue))
+            });
+
+            let queue = self
+                .map_partitions(first_k_func)
+                .reduce(Fn!(
+                    move |mut queue1: BoundedMinPriorityQueue<Self::Item>,
+                          queue2: BoundedMinPriorityQueue<Self::Item>|
+                          -> BoundedMinPriorityQueue<Self::Item> {
+                        queue1.merge(queue2);
+                        queue1
+                    }
+                ))?
+                .unwrap() as BoundedMinPriorityQueue<Self::Item>;
+
+            Ok(queue.into_vec_sorted())
+        }
+    }
+
+    fn take_ordered(&self, num: usize) -> Result<Vec<Self::Item>>
+    where
+        Self: Sized,
+        Self::Item: Data + Ord,
+    {
+        if num == 0 {
+            Ok(vec![])
+        } else {
+            let first_k_func = Fn!(move |partition: Box<dyn Iterator<Item = Self::Item>>|
+                -> Box<dyn Iterator<Item = BoundedMaxPriorityQueue<Self::Item>>> {
+                let mut queue = BoundedMaxPriorityQueue::new(num);
+                partition.for_each(|item: Self::Item| queue.append(item));
+                Box::new(std::iter::once(queue))
+            });
+
+            let queue = self
+                .map_partitions(first_k_func)
+                .reduce(Fn!(
+                    move |mut queue1: BoundedMaxPriorityQueue<Self::Item>,
+                          queue2: BoundedMaxPriorityQueue<Self::Item>|
+                          -> BoundedMaxPriorityQueue<Self::Item> {
+                        queue1.merge(queue2);
+                        queue1
+                    }
+                ))?
+                .unwrap() as BoundedMaxPriorityQueue<Self::Item>;
+
+            Ok(queue.into_vec_sorted())
+        }
+    }
 }
 
 pub trait Reduce {
diff --git a/src/utils/bounded_priority_queue.rs b/src/utils/bounded_priority_queue.rs
new file mode 100644
index 00000000..222eb607
--- /dev/null
+++ b/src/utils/bounded_priority_queue.rs
@@ -0,0 +1,119 @@
+use crate::serializable_traits::Data;
+use serde_derive::{Deserialize, Serialize};
+use std::cmp::Reverse;
+use std::collections::BinaryHeap;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub(crate) struct BoundedMinPriorityQueue<T: Data + Ord> {
+    max_size: usize,
+    underlying: BinaryHeap<Reverse<T>>,
+}
+
+impl<T: Data + Ord> BoundedMinPriorityQueue<T> {
+    pub fn new(max_size: usize) -> BoundedMinPriorityQueue<T> {
+        BoundedMinPriorityQueue {
+            max_size: max_size,
+            underlying: BinaryHeap::with_capacity(max_size),
+        }
+    }
+
+    pub fn get_size(&self) -> usize {
+        self.underlying.len()
+    }
+
+    pub fn into_vec_sorted(&self) -> Vec<T> {
+        let mut res = self
+            .underlying
+            .clone()
+            .into_iter_sorted()
+            .map(|Reverse(i)| i)
+            .collect::<Vec<_>>();
+        res.reverse();
+        res
+    }
+
+    pub fn merge(&mut self, other: BoundedMinPriorityQueue<T>) -> &Self {
+        other
+            .underlying
+            .into_iter()
+            .for_each(|Reverse(elem)| self.append(elem));
+        self
+    }
+
+    pub fn append(&mut self, elem: T) {
+        if self.underlying.len() < self.max_size {
+            self.underlying.push(Reverse(elem));
+        } else {
+            self.maybe_replace_lowest(elem);
+        }
+    }
+
+    pub(self) fn maybe_replace_lowest(&mut self, elem: T)
+    where
+        T: Data + Ord,
+    {
+        if let Some(Reverse(head)) = self.underlying.peek() {
+            if elem.gt(head) {
+                self.underlying.pop();
+                self.underlying.push(Reverse(elem));
+            }
+        }
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub(crate) struct BoundedMaxPriorityQueue<T: Data + Ord> {
+    max_size: usize,
+    underlying: BinaryHeap<T>,
+}
+
+impl<T: Data + Ord> BoundedMaxPriorityQueue<T> {
+    pub fn new(max_size: usize) -> BoundedMaxPriorityQueue<T> {
+        BoundedMaxPriorityQueue {
+            max_size: max_size,
+            underlying: BinaryHeap::with_capacity(max_size),
+        }
+    }
+
+    pub fn get_size(&self) -> usize {
+        self.underlying.len()
+    }
+
+    pub fn into_vec_sorted(&self) -> Vec<T> {
+        let mut res = self
+            .underlying
+            .clone()
+            .into_iter_sorted()
+            .collect::<Vec<_>>();
+        res.reverse();
+        res
+    }
+
+    pub fn merge(&mut self, other: BoundedMaxPriorityQueue<T>) -> &Self {
+        other
+            .underlying
+            .into_iter()
+            .for_each(|elem| self.append(elem));
+        self
+    }
+
+    pub fn append(&mut self, elem: T) {
+        if self.underlying.len() < self.max_size {
+            self.underlying.push(elem);
+        } else {
+            self.maybe_replace_lowest(elem);
+        }
+    }
+
+    pub(self) fn maybe_replace_lowest(&mut self, elem: T)
+    where
+        T: Data + Ord,
+    {
+        if let Some(head) = self.underlying.peek() {
+            if elem.lt(head) {
+                self.underlying.pop();
+                self.underlying.push(elem);
+            }
+        }
+    }
+}
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 093d4b1a..f2e37e3b 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -6,6 +6,7 @@ use crate::env;
 use crate::error;
 use rand::Rng;
 
+pub(crate) mod bounded_priority_queue;
 pub(crate) mod random;
 #[cfg(test)]
 pub(crate) mod test_utils;
diff --git a/tests/test_rdd.rs b/tests/test_rdd.rs
index 2e2d6adf..305fd629 100644
--- a/tests/test_rdd.rs
+++ b/tests/test_rdd.rs
@@ -629,3 +629,24 @@ fn test_random_split() {
     assert!(rdds[0].iter().all(|i| !rdds[2].contains(i)));
     assert!(rdds[1].iter().all(|i| !rdds[2].contains(i)));
 }
+
+#[test]
+fn test_top() {
+    let sc = CONTEXT.clone();
+
+    let col1 = vec![13, 28, 3, 4, 51, 108, 12, 113, 19];
+    let rdd = sc.parallelize(col1, 4);
+    let res: Vec<i32> = rdd.top(3).unwrap();
+    assert_eq!(res, vec![113, 108, 51]);
+}
+
+#[test]
+fn test_take_ordered() {
+    let sc = CONTEXT.clone();
+
+    let col1 = vec![13, 28, 3, 4, 51, 108, 12, 113, 19];
+    let rdd = sc.parallelize(col1, 4);
+
+    let res: Vec<i32> = rdd.take_ordered(3).unwrap();
+    assert_eq!(res, vec![3, 4, 12]);
+}
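The patch above keeps only the k most relevant elements per partition, then merges the per-partition queues in `reduce`. A minimal standalone sketch of the underlying idea — the `BinaryHeap<Reverse<T>>` min-heap that `BoundedMinPriorityQueue` wraps — using only the standard library (the `top_k` helper below is illustrative only, not part of the patch):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep the `k` largest items seen so far. `BinaryHeap` is a max-heap,
/// so wrapping items in `Reverse` turns it into a min-heap whose `peek()`
/// is the smallest retained item -- the one to evict first.
fn top_k(items: impl IntoIterator<Item = i32>, k: usize) -> Vec<i32> {
    let mut heap: BinaryHeap<Reverse<i32>> = BinaryHeap::with_capacity(k);
    for item in items {
        if heap.len() < k {
            heap.push(Reverse(item));
        } else if heap.peek().map_or(false, |Reverse(min)| item > *min) {
            heap.pop();
            heap.push(Reverse(item));
        }
    }
    let mut res: Vec<i32> = heap.into_iter().map(|Reverse(i)| i).collect();
    res.sort_unstable_by(|a, b| b.cmp(a)); // largest first, like `top()`
    res
}

fn main() {
    let data = vec![13, 28, 3, 4, 51, 108, 12, 113, 19];
    assert_eq!(top_k(data, 3), vec![113, 108, 51]);
}
```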
From 99fc11654ebefb87e7bd5d76c78d19fb0e2d599c Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sat, 2 May 2020 19:03:44 +0800
Subject: [PATCH 02/20] cargo formatting

---
 src/scheduler/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs
index 19253a11..64b81b4e 100644
--- a/src/scheduler/mod.rs
+++ b/src/scheduler/mod.rs
@@ -17,13 +17,13 @@ mod task;
 pub(self) use self::base_scheduler::EventQueue;
 pub(self) use self::dag_scheduler::{CompletionEvent, FetchFailedVals, TastEndReason};
 pub(self) use self::job::{Job, JobTracker};
+pub(self) use self::job_listener::NoOpListener;
 pub(self) use self::live_listener_bus::LiveListenerBus;
 pub(self) use self::stage::Stage;
-pub(self) use self::job_listener::NoOpListener;
 
 pub(crate) use self::base_scheduler::NativeScheduler;
 pub(crate) use self::distributed_scheduler::DistributedScheduler;
-pub(crate) use self::job_listener::{JobListener};
+pub(crate) use self::job_listener::JobListener;
 pub(crate) use self::local_scheduler::LocalScheduler;
 pub(crate) use self::result_task::ResultTask;
 pub(crate) use self::task::TaskContext;

From 0654774c9bdce2129a891572d2cf7b65f0db66dc Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sat, 2 May 2020 21:40:12 +0800
Subject: [PATCH 03/20] fix compile error

---
 src/rdd/rdd.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 6a7d056b..93721660 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -15,7 +15,6 @@ use crate::partitioner::{HashPartitioner, Partitioner};
 use crate::scheduler::TaskContext;
 use crate::serializable_traits::{AnyData, Data, Func, SerFunc};
 use crate::split::Split;
-use crate::task::TaskContext;
 use crate::utils::bounded_priority_queue::{BoundedMaxPriorityQueue, BoundedMinPriorityQueue};
 use crate::utils::random::{BernoulliCellSampler, BernoulliSampler, PoissonSampler, RandomSampler};
 use crate::{utils, Fn, SerArc, SerBox};

From 90a7d499cb8659846ff9a6dda32ba5ee7c31718b Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sun, 3 May 2020 01:23:18 +0800
Subject: [PATCH 04/20] remove unused functions

---
 src/utils/bounded_priority_queue.rs | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/src/utils/bounded_priority_queue.rs b/src/utils/bounded_priority_queue.rs
index 222eb607..60d694d5 100644
--- a/src/utils/bounded_priority_queue.rs
+++ b/src/utils/bounded_priority_queue.rs
@@ -17,10 +17,6 @@ impl<T: Data + Ord> BoundedMinPriorityQueue<T> {
         }
     }
 
-    pub fn get_size(&self) -> usize {
-        self.underlying.len()
-    }
-
     pub fn into_vec_sorted(&self) -> Vec<T> {
         let mut res = self
             .underlying
@@ -75,10 +71,6 @@ impl<T: Data + Ord> BoundedMaxPriorityQueue<T> {
         }
     }
 
-    pub fn get_size(&self) -> usize {
-        self.underlying.len()
-    }
-
     pub fn into_vec_sorted(&self) -> Vec<T> {
         let mut res = self
             .underlying

From 18c9c84d5acdfefb643d260a260ddf4b7bf4a766 Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sun, 3 May 2020 12:23:52 +0800
Subject: [PATCH 05/20] call take_ordered in top

---
 src/rdd/rdd.rs                      | 45 +++++++-------------
 src/utils/bounded_priority_queue.rs | 65 +++--------------------------
 2 files changed, 19 insertions(+), 91 deletions(-)

diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 93721660..c2e64525 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -1,4 +1,5 @@
 use std::cmp::Ordering;
+use std::cmp::Reverse;
 use std::fs;
 use std::hash::Hash;
 use std::io::{BufWriter, Write};
@@ -15,7 +16,7 @@ use crate::partitioner::{HashPartitioner, Partitioner};
 use crate::scheduler::TaskContext;
 use crate::serializable_traits::{AnyData, Data, Func, SerFunc};
 use crate::split::Split;
-use crate::utils::bounded_priority_queue::{BoundedMaxPriorityQueue, BoundedMinPriorityQueue};
+use crate::utils::bounded_priority_queue::BoundedPriorityQueue;
 use crate::utils::random::{BernoulliCellSampler, BernoulliSampler, PoissonSampler, RandomSampler};
 use crate::{utils, Fn, SerArc, SerBox};
 use fasthash::MetroHasher;
@@ -999,30 +1000,12 @@ pub trait Rdd: RddBase + 'static {
         Self: Sized,
         Self::Item: Data + Ord,
     {
-        if num == 0 {
-            Ok(vec![])
-        } else {
-            let first_k_func = Fn!(move |partition: Box<dyn Iterator<Item = Self::Item>>|
-                -> Box<dyn Iterator<Item = BoundedMinPriorityQueue<Self::Item>>> {
-                let mut queue = BoundedMinPriorityQueue::new(num);
-                partition.for_each(|item: Self::Item| queue.append(item));
-                Box::new(std::iter::once(queue))
-            });
-
-            let queue = self
-                .map_partitions(first_k_func)
-                .reduce(Fn!(
-                    move |mut queue1: BoundedMinPriorityQueue<Self::Item>,
-                          queue2: BoundedMinPriorityQueue<Self::Item>|
-                          -> BoundedMinPriorityQueue<Self::Item> {
-                        queue1.merge(queue2);
-                        queue1
-                    }
-                ))?
-                .unwrap() as BoundedMinPriorityQueue<Self::Item>;
-
-            Ok(queue.into_vec_sorted())
-        }
+        Ok(self
+            .map(Fn!(|x| Reverse(x)))
+            .take_ordered(num)?
+            .into_iter()
+            .map(|x| x.0)
+            .collect())
     }
 
     fn take_ordered(&self, num: usize) -> Result<Vec<Self::Item>>
@@ -1034,8 +1017,8 @@ pub trait Rdd: RddBase + 'static {
             Ok(vec![])
         } else {
             let first_k_func = Fn!(move |partition: Box<dyn Iterator<Item = Self::Item>>|
-                -> Box<dyn Iterator<Item = BoundedMaxPriorityQueue<Self::Item>>> {
-                let mut queue = BoundedMaxPriorityQueue::new(num);
+                -> Box<dyn Iterator<Item = BoundedPriorityQueue<Self::Item>>> {
+                let mut queue = BoundedPriorityQueue::new(num);
                 partition.for_each(|item: Self::Item| queue.append(item));
                 Box::new(std::iter::once(queue))
             });
@@ -1043,14 +1026,14 @@ pub trait Rdd: RddBase + 'static {
             let queue = self
                 .map_partitions(first_k_func)
                 .reduce(Fn!(
-                    move |mut queue1: BoundedMaxPriorityQueue<Self::Item>,
-                          queue2: BoundedMaxPriorityQueue<Self::Item>|
-                          -> BoundedMaxPriorityQueue<Self::Item> {
+                    move |mut queue1: BoundedPriorityQueue<Self::Item>,
+                          queue2: BoundedPriorityQueue<Self::Item>|
+                          -> BoundedPriorityQueue<Self::Item> {
                         queue1.merge(queue2);
                         queue1
                     }
                 ))?
-                .unwrap() as BoundedMaxPriorityQueue<Self::Item>;
+                .unwrap() as BoundedPriorityQueue<Self::Item>;
 
             Ok(queue.into_vec_sorted())
         }
diff --git a/src/utils/bounded_priority_queue.rs b/src/utils/bounded_priority_queue.rs
index 60d694d5..c444da0a 100644
--- a/src/utils/bounded_priority_queue.rs
+++ b/src/utils/bounded_priority_queue.rs
@@ -1,71 +1,16 @@
 use crate::serializable_traits::Data;
 use serde_derive::{Deserialize, Serialize};
-use std::cmp::Reverse;
 use std::collections::BinaryHeap;
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
-pub(crate) struct BoundedMinPriorityQueue<T: Data + Ord> {
-    max_size: usize,
-    underlying: BinaryHeap<Reverse<T>>,
-}
-
-impl<T: Data + Ord> BoundedMinPriorityQueue<T> {
-    pub fn new(max_size: usize) -> BoundedMinPriorityQueue<T> {
-        BoundedMinPriorityQueue {
-            max_size: max_size,
-            underlying: BinaryHeap::with_capacity(max_size),
-        }
-    }
-
-    pub fn into_vec_sorted(&self) -> Vec<T> {
-        let mut res = self
-            .underlying
-            .clone()
-            .into_iter_sorted()
-            .map(|Reverse(i)| i)
-            .collect::<Vec<_>>();
-        res.reverse();
-        res
-    }
-
-    pub fn merge(&mut self, other: BoundedMinPriorityQueue<T>) -> &Self {
-        other
-            .underlying
-            .into_iter()
-            .for_each(|Reverse(elem)| self.append(elem));
-        self
-    }
-
-    pub fn append(&mut self, elem: T) {
-        if self.underlying.len() < self.max_size {
-            self.underlying.push(Reverse(elem));
-        } else {
-            self.maybe_replace_lowest(elem);
-        }
-    }
-
-    pub(self) fn maybe_replace_lowest(&mut self, elem: T)
-    where
-        T: Data + Ord,
-    {
-        if let Some(Reverse(head)) = self.underlying.peek() {
-            if elem.gt(head) {
-                self.underlying.pop();
-                self.underlying.push(Reverse(elem));
-            }
-        }
-    }
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub(crate) struct BoundedMaxPriorityQueue<T: Data + Ord> {
+pub(crate) struct BoundedPriorityQueue<T: Data + Ord> {
     max_size: usize,
     underlying: BinaryHeap<T>,
 }
 
-impl<T: Data + Ord> BoundedMaxPriorityQueue<T> {
-    pub fn new(max_size: usize) -> BoundedMaxPriorityQueue<T> {
-        BoundedMaxPriorityQueue {
+impl<T: Data + Ord> BoundedPriorityQueue<T> {
+    pub fn new(max_size: usize) -> BoundedPriorityQueue<T> {
+        BoundedPriorityQueue {
             max_size: max_size,
             underlying: BinaryHeap::with_capacity(max_size),
         }
     }
@@ -81,7 +26,7 @@ impl<T: Data + Ord> BoundedMaxPriorityQueue<T> {
         res
     }
 
-    pub fn merge(&mut self, other: BoundedMaxPriorityQueue<T>) -> &Self {
+    pub fn merge(&mut self, other: BoundedPriorityQueue<T>) -> &Self {
         other
             .underlying
             .into_iter()
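Patch 05 above rewrites `top()` as `take_ordered()` over `Reverse`-wrapped items, so only one bounded queue type is needed. This works because `Reverse` inverts an ordering: the k smallest `Reverse(x)` are exactly the k largest `x`. A std-only illustration:

```rust
use std::cmp::Reverse;

fn main() {
    // Ordering is inverted under Reverse.
    assert!(Reverse(3) < Reverse(1));

    // Sorting ascending by Reverse(x) sorts descending by x.
    let mut xs = vec![3, 1, 2];
    xs.sort_by_key(|&x| Reverse(x));
    assert_eq!(xs, vec![3, 2, 1]);
}
```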
From 9e33ab83683526d9e88330df9430d89062cf972f Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sun, 3 May 2020 14:45:30 +0800
Subject: [PATCH 06/20] impl Into trait for BoundedPriorityQueue

---
 src/rdd/rdd.rs                      |  2 +-
 src/utils/bounded_priority_queue.rs | 21 +++++++++++----------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index c2e64525..2b1c5ae1 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -1035,7 +1035,7 @@ pub trait Rdd: RddBase + 'static {
                 ))?
                 .unwrap() as BoundedPriorityQueue<Self::Item>;
 
-            Ok(queue.into_vec_sorted())
+            Ok(queue.into())
         }
     }
 }
diff --git a/src/utils/bounded_priority_queue.rs b/src/utils/bounded_priority_queue.rs
index c444da0a..82d58c22 100644
--- a/src/utils/bounded_priority_queue.rs
+++ b/src/utils/bounded_priority_queue.rs
@@ -8,6 +8,17 @@ pub(crate) struct BoundedPriorityQueue<T: Data + Ord> {
     underlying: BinaryHeap<T>,
 }
 
+impl<T: Data + Ord> Into<Vec<T>> for BoundedPriorityQueue<T> {
+    fn into(self) -> Vec<T> {
+        let mut col: Vec<_> = self
+            .underlying
+            .into_iter_sorted()
+            .collect();
+        col.reverse();
+        col
+    }
+}
+
 impl<T: Data + Ord> BoundedPriorityQueue<T> {
     pub fn new(max_size: usize) -> BoundedPriorityQueue<T> {
         BoundedPriorityQueue {
@@ -16,16 +27,6 @@ impl<T: Data + Ord> BoundedPriorityQueue<T> {
         }
     }
 
-    pub fn into_vec_sorted(&self) -> Vec<T> {
-        let mut res = self
-            .underlying
-            .clone()
-            .into_iter_sorted()
-            .collect::<Vec<_>>();
-        res.reverse();
-        res
-    }
-
     pub fn merge(&mut self, other: BoundedPriorityQueue<T>) -> &Self {

From f9334f8fe2126b9df2027d6ff4166bdd88d4cbc9 Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sun, 3 May 2020 15:07:01 +0800
Subject: [PATCH 07/20] minor refactoring

---
 src/rdd/rdd.rs                      | 3 +--
 src/utils/bounded_priority_queue.rs | 4 ++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 2b1c5ae1..b8c2a049 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -1029,8 +1029,7 @@ pub trait Rdd: RddBase + 'static {
                     move |mut queue1: BoundedPriorityQueue<Self::Item>,
                           queue2: BoundedPriorityQueue<Self::Item>|
                           -> BoundedPriorityQueue<Self::Item> {
-                        queue1.merge(queue2);
-                        queue1
+                        queue1.merge(queue2)
                     }
                 ))?
                 .unwrap() as BoundedPriorityQueue<Self::Item>;
diff --git a/src/utils/bounded_priority_queue.rs b/src/utils/bounded_priority_queue.rs
index 82d58c22..4f4daab9 100644
--- a/src/utils/bounded_priority_queue.rs
+++ b/src/utils/bounded_priority_queue.rs
@@ -27,7 +27,7 @@ impl<T: Data + Ord> BoundedPriorityQueue<T> {
         }
     }
 
-    pub fn merge(&mut self, other: BoundedPriorityQueue<T>) -> &Self {
+    pub fn merge(mut self, other: BoundedPriorityQueue<T>) -> Self {
         other
             .underlying
             .into_iter()
@@ -43,7 +43,7 @@ impl<T: Data + Ord> BoundedPriorityQueue<T> {
         }
     }
 
-    pub(self) fn maybe_replace_lowest(&mut self, elem: T)
+    fn maybe_replace_lowest(&mut self, elem: T)
     where
         T: Data + Ord,
     {

From 0a09c1754feb0137ff5bb235ace6b3ad5b23aa06 Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sun, 3 May 2020 15:14:27 +0800
Subject: [PATCH 08/20] no need to mut in Fn!

---
 src/rdd/rdd.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index b8c2a049..03841ec3 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -1026,7 +1026,7 @@ pub trait Rdd: RddBase + 'static {
             let queue = self
                 .map_partitions(first_k_func)
                 .reduce(Fn!(
-                    move |mut queue1: BoundedPriorityQueue<Self::Item>,
+                    move |queue1: BoundedPriorityQueue<Self::Item>,
                           queue2: BoundedPriorityQueue<Self::Item>|
                           -> BoundedPriorityQueue<Self::Item> {
                         queue1.merge(queue2)
From 8782c176fdc6e7724e26e3135fcb784bc1e2996d Mon Sep 17 00:00:00 2001
From: Ignacio Duart
Date: Sun, 3 May 2020 09:20:56 +0200
Subject: [PATCH 09/20] Change a few prefixes from ns to vega

---
 .travis.yml                 |  2 +-
 README.md                   |  2 +-
 docker/Dockerfile           |  6 +++---
 docker/docker-compose.yml   |  8 ++++----
 docker/id_rsa.pub           |  2 +-
 docker/testing_cluster.sh   | 20 ++++++++++----------
 src/env.rs                  |  4 ++--
 src/io/local_file_reader.rs |  2 +-
 tests/test_rdd.rs           |  4 ++--
 user_guide/src/chapter_1.md | 10 +++++-----
 10 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 722ceac7..f360afc9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,7 +7,7 @@ env:
   global:
     - export PATH="$PATH:$HOME/bin"
     - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$HOME/lib"
-    - export NS_LOCAL_IP=0.0.0.0
+    - export VEGA_LOCAL_IP=0.0.0.0
 addons:
   apt:
     packages:
diff --git a/README.md b/README.md
index e18f0861..f032a162 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # vega
 
-Previously known as native_spark
+Previously known as `native_spark`.
 
 [![Join the chat at https://gitter.im/fast_spark/community](https://badges.gitter.im/fast_spark/community.svg)](https://gitter.im/fast_spark/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 [![Build Status](https://travis-ci.org/rajasekarv/native_spark.svg?branch=master)](https://travis-ci.org/rajasekarv/native_spark)
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 38e0388a..e64e68fc 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -46,16 +46,16 @@ RUN set -e; \
     # Locales
     locale-gen en_US.UTF-8; \
     # Set SSH user
-    groupadd ns && useradd -ms /bin/bash -g ns ns_user; \
+    groupadd ns && useradd -ms /bin/bash -g ns vega_user; \
     # Cleanup
     #apt-get purge -y --auto-remove $tempPkgs; \
     apt-get autoremove -q -y; \
     apt-get clean -yq; \
     rm -rf /var/lib/apt/lists/*
 COPY --from=building /home/release .
-COPY --chown=ns_user:ns ./docker/id_rsa.pub /home/ns_user/.ssh/authorized_keys
+COPY --chown=vega_user:ns ./docker/id_rsa.pub /home/vega_user/.ssh/authorized_keys
 COPY ./docker/id_rsa /root/.ssh/
-RUN chmod 600 /root/.ssh/id_rsa /home/ns_user/.ssh/authorized_keys
+RUN chmod 600 /root/.ssh/id_rsa /home/vega_user/.ssh/authorized_keys
 ENV LANG=en_US.UTF-8 \
     LANGUAGE=en_US:en \
     LC_ALL=en_US.UTF-8
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 0ad062be..08b96da1 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -4,7 +4,7 @@ networks:
   native-spark:
 
 services:
-  ns_master:
+  vega_master:
     image: vega:latest
     ports:
       - "3000"
@@ -16,11 +16,11 @@ services:
         source: "../target"
         target: "/home/dev"
     environment:
-      NS_DEPLOYMENT_MODE: distributed
+      VEGA_DEPLOYMENT_MODE: distributed
     depends_on:
-      - ns_worker
+      - vega_worker
 
-  ns_worker:
+  vega_worker:
     image: vega:latest
     ports:
       - "10500"
diff --git a/docker/id_rsa.pub b/docker/id_rsa.pub
index 5ffd2c2c..c73d4c1b 100644
--- a/docker/id_rsa.pub
+++ b/docker/id_rsa.pub
@@ -1 +1 @@
-ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDCcjmvyQT2FsdM795b3dGv3GGDGIsz7v6YsogfjQmEbGdpaXNtqyYIpp/VGyc5/ia1e2n85MAUO2mjWxrAHU6+4owl8q6enARG+/xC9a6vuWdsBNhWV6V/pYVJnPAi/v4t8+uPnAY+6KaMuUKSOYNsfu0r88tzIW5nXTS/yG24BryyMzDuLbWzLGpR6PCjxhsMKqn41WoomJ+GEfH2acz/0Le82fGxUSuukkQJoSXGA5aMFaOgTmhXlVKyN2b/+pbxThB9YAB7mTsevQrit+J+Xvz0SOD332CV79DIysWwx19kKjcVYsIt5wNrknCehxV+I4IZ3+1LT4arWe+j0STJ ns_user@a0e0c0941a35
\ No newline at end of file
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDCcjmvyQT2FsdM795b3dGv3GGDGIsz7v6YsogfjQmEbGdpaXNtqyYIpp/VGyc5/ia1e2n85MAUO2mjWxrAHU6+4owl8q6enARG+/xC9a6vuWdsBNhWV6V/pYVJnPAi/v4t8+uPnAY+6KaMuUKSOYNsfu0r88tzIW5nXTS/yG24BryyMzDuLbWzLGpR6PCjxhsMKqn41WoomJ+GEfH2acz/0Le82fGxUSuukkQJoSXGA5aMFaOgTmhXlVKyN2b/+pbxThB9YAB7mTsevQrit+J+Xvz0SOD332CV79DIysWwx19kKjcVYsIt5wNrknCehxV+I4IZ3+1LT4arWe+j0STJ vega_user@a0e0c0941a35
\ No newline at end of file
diff --git a/docker/testing_cluster.sh b/docker/testing_cluster.sh
index d1b1a02a..da8b93f2 100644
--- a/docker/testing_cluster.sh
+++ b/docker/testing_cluster.sh
@@ -3,16 +3,16 @@ SCRIPT_PATH=`dirname $(readlink -f $0)`
 cd $SCRIPT_PATH
 
 # Deploy a testing cluster with one master and 2 workers
-docker-compose up --scale ns_worker=2 -d
+docker-compose up --scale vega_worker=2 -d
 
 # Since we can't resolve domains yet, we have to get each container IP to create the config file
-WORKER_IPS=$(docker-compose ps | grep -oE "docker_ns_worker_[0-9]+" \
+WORKER_IPS=$(docker-compose ps | grep -oE "docker_vega_worker_[0-9]+" \
     | xargs -I{} docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' {})
 read -a WORKER_IPS <<< $(echo $WORKER_IPS)
-slaves=$(printf ",\"ns_user@%s\"" "${WORKER_IPS[@]}")
+slaves=$(printf ",\"vega_user@%s\"" "${WORKER_IPS[@]}")
 slaves="slaves = [${slaves:1}]"
 
-MASTER_IP=$(docker-compose ps | grep -oE "docker_ns_master_[0-9]+" \
+MASTER_IP=$(docker-compose ps | grep -oE "docker_vega_master_[0-9]+" \
     | xargs -I{} docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' {})
 
 CONF_FILE=`cat <<EOF
[heredoc body, a further hunk header, and the worker `docker exec` loop head were lost in this dump]
        bash -c 'echo "$CONF_FILE" >> hosts.conf && \
-        echo "NS_LOCAL_IP=$NS_LOCAL_IP" >> .ssh/environment && \
+        echo "VEGA_LOCAL_IP=$VEGA_LOCAL_IP" >> .ssh/environment && \
         echo "PermitUserEnvironment yes" >> /etc/ssh/sshd_config && \
         echo "AcceptEnv RUST_BACKTRACE" >> /etc/ssh/sshd_config && \
         service ssh start';
     (( count++ ));
 done
 
-docker exec -e CONF_FILE="$CONF_FILE" -e NS_LOCAL_IP="${MASTER_IP}" -w /root/ docker_ns_master_1 \
+docker exec -e CONF_FILE="$CONF_FILE" -e VEGA_LOCAL_IP="${MASTER_IP}" -w /root/ docker_vega_master_1 \
     bash -c 'echo "$CONF_FILE" >> hosts.conf && \
-    echo "export NS_LOCAL_IP=$NS_LOCAL_IP" >> .bashrc &&
+    echo "export VEGA_LOCAL_IP=$VEGA_LOCAL_IP" >> .bashrc &&
     echo "SendEnv RUST_BACKTRACE" >> ~/.ssh/config ';
 
 for WORKER_IP in ${WORKER_IPS[@]}
 do
-    docker exec docker_ns_master_1 bash -c "ssh-keyscan ${WORKER_IP} >> ~/.ssh/known_hosts"
+    docker exec docker_vega_master_1 bash -c "ssh-keyscan ${WORKER_IP} >> ~/.ssh/known_hosts"
 done
 
 # When done it is possible to open a shell into the master and run any of the examples in distributed mode
diff --git a/src/env.rs b/src/env.rs
index d593e3c8..ffa7f316 100644
--- a/src/env.rs
+++ b/src/env.rs
@@ -18,8 +18,8 @@ use tokio::runtime::{Handle, Runtime};
 /// The key is: {shuffle_id}/{input_id}/{reduce_id}
 type ShuffleCache = Arc<DashMap<(usize, usize, usize), Vec<u8>>>;
 
-const ENV_VAR_PREFIX: &str = "NS_";
-pub(crate) const THREAD_PREFIX: &str = "_NS";
+const ENV_VAR_PREFIX: &str = "VEGA_";
+pub(crate) const THREAD_PREFIX: &str = "_VEGA";
 static CONF: OnceCell<Configuration> = OnceCell::new();
 static ENV: OnceCell<Env> = OnceCell::new();
 static ASYNC_RT: Lazy<Option<Runtime>> = Lazy::new(Env::build_async_executor);
diff --git a/src/io/local_file_reader.rs b/src/io/local_file_reader.rs
index 94a5e249..d3a32a23 100644
--- a/src/io/local_file_reader.rs
+++ b/src/io/local_file_reader.rs
@@ -49,7 +49,7 @@ impl LocalFsReaderConfig {
 
     /// Number of partitions to use per executor to perform the load tasks.
     /// One executor must be used per host with as many partitions as CPUs available (ideally).
-    pub fn num_partitions_per_executor(mut self, num: u64) -> Self {
+    pub fn num_partitiovega_per_executor(mut self, num: u64) -> Self {
         self.executor_partitions = Some(num);
         self
     }
diff --git a/tests/test_rdd.rs b/tests/test_rdd.rs
index e33d5f95..e2aa54af 100644
--- a/tests/test_rdd.rs
+++ b/tests/test_rdd.rs
@@ -2,16 +2,16 @@ use std::fs::{create_dir_all, File};
 use std::io::prelude::*;
 use std::sync::Arc;
 
+use once_cell::sync::Lazy;
 use vega::io::*;
 use vega::partitioner::HashPartitioner;
 use vega::rdd::CoGroupedRdd;
 use vega::*;
-use once_cell::sync::Lazy;
 
 static CONTEXT: Lazy<Arc<Context>> = Lazy::new(|| Context::new().unwrap());
 //static AGGREGATOR: Lazy<Arc<Aggregator>> = Lazy::new(|| Aggregator::new().unwrap());
 static WORK_DIR: Lazy<std::path::PathBuf> = Lazy::new(std::env::temp_dir);
-const TEST_DIR: &str = "ns_test_dir";
+const TEST_DIR: &str = "vega_test_dir";
 
 #[allow(unused_must_use)]
 fn set_up(file_name: &str) {
diff --git a/user_guide/src/chapter_1.md b/user_guide/src/chapter_1.md
index d51d0261..0192c9c0 100644
--- a/user_guide/src/chapter_1.md
+++ b/user_guide/src/chapter_1.md
@@ -36,13 +36,13 @@ In order to execute application code some preliminary setup is required. (So far
     # create the same hosts.conf file in every machine:
     $ cd ~ && vim hosts.conf ...
     ```
-* The environment variable `NS_LOCAL_IP` must be set for the user executing application code.
+* The environment variable `VEGA_LOCAL_IP` must be set for the user executing application code.
     * In `local` it suffices to set up for the current user:
-    > $ export NS_LOCAL_IP=0.0.0.0
+    > $ export VEGA_LOCAL_IP=0.0.0.0
     * In `distributed` the variable is required, additionally, to be set up for the users remotely connecting. Depending on the O.S. and ssh defaults this may require some additional configuration. E.g.:
     ```doc
     $ ssh remote_user@172.0.0.10
-    $ sudo echo "NS_LOCAL_IP=172.0.0.10" >> .ssh/environment
+    $ sudo echo "VEGA_LOCAL_IP=172.0.0.10" >> .ssh/environment
     $ sudo echo "PermitUserEnvironment yes" >> /etc/ssh/sshd_config
     $ service ssh restart
     ```
@@ -52,7 +52,7 @@ examples just run them. In `local`:
 > cargo run --example make_rdd
 
 In `distributed`:
-> export NS_DEPLOYMENT_MODE=distributed
+> export VEGA_DEPLOYMENT_MODE=distributed
 >
 > cargo run --example make_rdd
@@ -69,7 +69,7 @@ and deploying distributed mode on your local host. In order to use them:
 This will execute all the necessary steps to deploy a working network of containers
 where you can execute the tests. When finished you can attach a shell to the master and run the examples:
 ```doc
-$ docker exec -it docker_ns_master_1 bash
+$ docker exec -it docker_vega_master_1 bash
 $ ./make_rdd
 ```

From 80bbbbea1a36b30ce4b95619544f22ad77d2e0d1 Mon Sep 17 00:00:00 2001
From: Ignacio Duart
Date: Sun, 3 May 2020 10:20:28 +0200
Subject: [PATCH 10/20] Fix docker image build script

---
 docker/build_image.sh       |  2 +-
 docker/id_rsa.pub           |  2 +-
 src/io/local_file_reader.rs |  2 +-
 src/rdd/rdd.rs              | 15 +++++----------
 4 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/docker/build_image.sh b/docker/build_image.sh
index afe4fd66..963e979c 100644
--- a/docker/build_image.sh
+++ b/docker/build_image.sh
@@ -12,7 +12,7 @@ PACKAGE="vega:${VERSION}"
 cd $SCRIPT_PATH && cd ..
 echo "work dir: $(pwd)"
 
-RUST_VERSION="$(cat ./rust-toolchain | tr -d '[:space:]')"
+RUST_VERSION="nightly"
 echo "rust version: $RUST_VERSION"
 
 echo "building $PACKAGE..."
diff --git a/docker/id_rsa.pub b/docker/id_rsa.pub
index c73d4c1b..5ffd2c2c 100644
--- a/docker/id_rsa.pub
+++ b/docker/id_rsa.pub
@@ -1 +1 @@
-ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDCcjmvyQT2FsdM795b3dGv3GGDGIsz7v6YsogfjQmEbGdpaXNtqyYIpp/VGyc5/ia1e2n85MAUO2mjWxrAHU6+4owl8q6enARG+/xC9a6vuWdsBNhWV6V/pYVJnPAi/v4t8+uPnAY+6KaMuUKSOYNsfu0r88tzIW5nXTS/yG24BryyMzDuLbWzLGpR6PCjxhsMKqn41WoomJ+GEfH2acz/0Le82fGxUSuukkQJoSXGA5aMFaOgTmhXlVKyN2b/+pbxThB9YAB7mTsevQrit+J+Xvz0SOD332CV79DIysWwx19kKjcVYsIt5wNrknCehxV+I4IZ3+1LT4arWe+j0STJ vega_user@a0e0c0941a35
\ No newline at end of file
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDCcjmvyQT2FsdM795b3dGv3GGDGIsz7v6YsogfjQmEbGdpaXNtqyYIpp/VGyc5/ia1e2n85MAUO2mjWxrAHU6+4owl8q6enARG+/xC9a6vuWdsBNhWV6V/pYVJnPAi/v4t8+uPnAY+6KaMuUKSOYNsfu0r88tzIW5nXTS/yG24BryyMzDuLbWzLGpR6PCjxhsMKqn41WoomJ+GEfH2acz/0Le82fGxUSuukkQJoSXGA5aMFaOgTmhXlVKyN2b/+pbxThB9YAB7mTsevQrit+J+Xvz0SOD332CV79DIysWwx19kKjcVYsIt5wNrknCehxV+I4IZ3+1LT4arWe+j0STJ ns_user@a0e0c0941a35
\ No newline at end of file
diff --git a/src/io/local_file_reader.rs b/src/io/local_file_reader.rs
index d3a32a23..94a5e249 100644
--- a/src/io/local_file_reader.rs
+++ b/src/io/local_file_reader.rs
@@ -49,7 +49,7 @@ impl LocalFsReaderConfig {
 
     /// Number of partitions to use per executor to perform the load tasks.
     /// One executor must be used per host with as many partitions as CPUs available (ideally).
-    pub fn num_partitiovega_per_executor(mut self, num: u64) -> Self {
+    pub fn num_partitions_per_executor(mut self, num: u64) -> Self {
         self.executor_partitions = Some(num);
         self
     }
diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 03841ec3..4035626f 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -1023,16 +1023,11 @@ pub trait Rdd: RddBase + 'static {
                 Box::new(std::iter::once(queue))
             });
 
-            let queue = self
-                .map_partitions(first_k_func)
-                .reduce(Fn!(
-                    move |queue1: BoundedPriorityQueue<Self::Item>,
-                          queue2: BoundedPriorityQueue<Self::Item>|
-                          -> BoundedPriorityQueue<Self::Item> {
-                        queue1.merge(queue2)
-                    }
-                ))?
-                .unwrap() as BoundedPriorityQueue<Self::Item>;
+            let queue = self.map_partitions(first_k_func).reduce(Fn!(
+                move |queue1: BoundedPriorityQueue<Self::Item>,
+                      queue2: BoundedPriorityQueue<Self::Item>|
+                      -> BoundedPriorityQueue<Self::Item> { queue1.merge(queue2) }
+            ))?.ok_or_else(|| Error::Other)? as BoundedPriorityQueue<Self::Item>;
 
             Ok(queue.into())
         }
From 58974531eeec554ffd07653e2339d6e8878c7f2a Mon Sep 17 00:00:00 2001
From: Ignacio Duart
Date: Sun, 3 May 2020 11:19:52 +0200
Subject: [PATCH 11/20] Add grouped count evaluator

---
 src/partial/count_evaluator.rs         |  3 +-
 src/partial/grouped_count_evaluator.rs | 61 ++++++++++++++++++++++++++
 src/partial/mod.rs                     |  2 +
 src/rdd/rdd.rs                         | 61 +++++++++++++++++++++++---
 src/shuffle/shuffle_fetcher.rs         |  6 +--
 5 files changed, 122 insertions(+), 11 deletions(-)
 create mode 100644 src/partial/grouped_count_evaluator.rs

diff --git a/src/partial/count_evaluator.rs b/src/partial/count_evaluator.rs
index dd7fa611..ffc5d7ef 100644
--- a/src/partial/count_evaluator.rs
+++ b/src/partial/count_evaluator.rs
@@ -1,6 +1,5 @@
-use statrs::{distribution::Poisson, statistics::Mean};
-
 use crate::partial::{approximate_evaluator::ApproximateEvaluator, bounded_double::BoundedDouble};
+use statrs::{distribution::Poisson, statistics::Mean};
 
 /// An ApproximateEvaluator for counts.
 pub(crate) struct CountEvaluator {
diff --git a/src/partial/grouped_count_evaluator.rs b/src/partial/grouped_count_evaluator.rs
new file mode 100644
index 00000000..6de3997d
--- /dev/null
+++ b/src/partial/grouped_count_evaluator.rs
@@ -0,0 +1,61 @@
+use std::collections::HashMap;
+use std::hash::Hash;
+
+use crate::partial::{
+    approximate_evaluator::ApproximateEvaluator, bounded_double::BoundedDouble,
+    count_evaluator::bound,
+};
+use crate::serializable_traits::Data;
+
+/// An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
+pub(crate) struct GroupedCountEvaluator<T>
+where
+    T: Eq + Hash,
+{
+    total_outputs: usize,
+    confidence: f64,
+    outputs_merged: usize,
+    sums: HashMap<T, usize>,
+}
+
+impl<T: Eq + Hash> GroupedCountEvaluator<T> {
+    pub fn new(total_outputs: usize, confidence: f64) -> Self {
+        GroupedCountEvaluator {
+            total_outputs,
+            confidence,
+            outputs_merged: 0,
+            sums: HashMap::new(),
+        }
+    }
+}
+
+impl<T: Data + Eq + Hash> ApproximateEvaluator<HashMap<T, usize>, HashMap<T, BoundedDouble>>
+    for GroupedCountEvaluator<T>
+{
+    fn merge(&mut self, _output_id: usize, task_result: &HashMap<T, usize>) {
+        self.outputs_merged += 1;
+        task_result.iter().for_each(|(k, v)| {
+            *self.sums.entry(k.clone()).or_insert(0) += v;
+        });
+    }
+
+    fn current_result(&self) -> HashMap<T, BoundedDouble> {
+        if self.outputs_merged == 0 {
+            HashMap::new()
+        } else if self.outputs_merged == self.total_outputs {
+            self.sums
+                .iter()
+                .map(|(k, sum)| {
+                    let sum = *sum as f64;
+                    (k.clone(), BoundedDouble::from((sum, 1.0, sum, sum)))
+                })
+                .collect()
+        } else {
+            let p = self.outputs_merged as f64 / self.total_outputs as f64;
+            self.sums
+                .iter()
+                .map(|(k, sum)| (k.clone(), bound(self.confidence, *sum as f64, p)))
+                .collect()
+        }
+    }
+}
diff --git a/src/partial/mod.rs b/src/partial/mod.rs
index 963963bd..4e6edaa1 100644
--- a/src/partial/mod.rs
+++ b/src/partial/mod.rs
@@ -6,12 +6,14 @@ mod approximate_action_listener;
 pub(self) mod approximate_evaluator;
 mod bounded_double;
 mod count_evaluator;
+mod grouped_count_evaluator;
 mod partial_result;
 
 pub(crate) use approximate_action_listener::ApproximateActionListener;
 pub(crate) use approximate_evaluator::ApproximateEvaluator;
 pub use bounded_double::BoundedDouble;
 pub(crate) use count_evaluator::CountEvaluator;
+pub(crate) use grouped_count_evaluator::GroupedCountEvaluator;
 pub use partial_result::PartialResult;
 
 #[derive(Debug, Error)]
diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 4035626f..4551248b 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -1,5 +1,6 @@
 use std::cmp::Ordering;
 use std::cmp::Reverse;
+use std::collections::HashMap;
 use std::fs;
 use std::hash::Hash;
 use std::io::{BufWriter, Write};
@@ -11,7 +12,7 @@ use std::time::Duration;
 use crate::context::Context;
 use crate::dependency::Dependency;
 use crate::error::{Error, Result};
-use crate::partial::{BoundedDouble, CountEvaluator, PartialResult};
+use crate::partial::{BoundedDouble, CountEvaluator, GroupedCountEvaluator, PartialResult};
 use crate::partitioner::{HashPartitioner, Partitioner};
 use crate::scheduler::TaskContext;
 use crate::serializable_traits::{AnyData, Data, Func, SerFunc};
@@ -458,6 +459,46 @@ pub trait Rdd: RddBase + 'static {
         )
     }
 
+    /// Approximate version of `count_by_value`.
+    ///
+    /// # Arguments
+    /// * `timeout` - maximum time to wait for the job, in milliseconds
+    /// * `confidence` - the desired statistical confidence in the result
+    fn count_by_value_aprox(
+        &self,
+        timeout: Duration,
+        confidence: Option<f64>,
+    ) -> Result<PartialResult<HashMap<Self::Item, BoundedDouble>>>
+    where
+        Self: Sized,
+        Self::Item: Data + Eq + Hash,
+    {
+        let confidence = if let Some(confidence) = confidence {
+            confidence
+        } else {
+            0.95
+        };
+        assert!(0.0 <= confidence && confidence <= 1.0);
+
+        let count_partition = Fn!(|(_ctx, iter): (
+            TaskContext,
+            Box<dyn Iterator<Item = Self::Item>>,
+        )|
+         -> HashMap<Self::Item, usize> {
+            let mut map = HashMap::new();
+            iter.for_each(|e| {
+                *map.entry(e).or_insert(0) += 1;
+            });
+            map
+        });
+
+        let evaluator = GroupedCountEvaluator::new(self.number_of_splits(), confidence);
+        let rdd = self.get_rdd();
+        rdd.register_op_name("count_by_value_approx");
+        self.get_context()
+            .run_approximate_job(count_partition, rdd, evaluator, timeout)
+    }
+
     /// Return a new RDD containing the distinct elements in this RDD.
     fn distinct_with_num_partitions(
         &self,
@@ -939,6 +980,8 @@ pub trait Rdd: RddBase + 'static {
         } else {
             0.95
         };
+        assert!(0.0 <= confidence && confidence <= 1.0);
+
         let count_elements = Fn!(|(_ctx, iter): (
             TaskContext,
             Box<dyn Iterator<Item = Self::Item>>
@@ -1018,11 +1061,16 @@ pub trait Rdd: RddBase + 'static {
                 Box::new(std::iter::once(queue))
             });
 
-            let queue = self.map_partitions(first_k_func).reduce(Fn!(
-                move |queue1: BoundedPriorityQueue<Self::Item>,
-                      queue2: BoundedPriorityQueue<Self::Item>|
-                      -> BoundedPriorityQueue<Self::Item> { queue1.merge(queue2) }
-            ))?.ok_or_else(|| Error::Other)? as BoundedPriorityQueue<Self::Item>;
+            let queue = self
+                .map_partitions(first_k_func)
+                .reduce(Fn!(
+                    move |queue1: BoundedPriorityQueue<Self::Item>,
+                          queue2: BoundedPriorityQueue<Self::Item>|
+                          -> BoundedPriorityQueue<Self::Item> {
+                        queue1.merge(queue2)
+                    }
+                ))?
+                .ok_or_else(|| Error::Other)?
+                as BoundedPriorityQueue<Self::Item>;
 
             Ok(queue.into())
         }
diff --git a/src/shuffle/shuffle_fetcher.rs b/src/shuffle/shuffle_fetcher.rs
index 50ef644f..dd32374e 100644
--- a/src/shuffle/shuffle_fetcher.rs
+++ b/src/shuffle/shuffle_fetcher.rs
@@ -27,7 +27,7 @@ impl ShuffleFetcher {
             source: Box::new(err),
         })?;
         log::debug!(
-            "server uris for shuffle id {:?} - {:?}",
+            "server uris for shuffle id #{}: {:?}",
             shuffle_id,
             server_uris
         );
@@ -44,7 +44,7 @@ impl ShuffleFetcher {
             server_queue.push((key, value));
         }
         log::debug!(
-            "servers for shuffle id {:?}, reduce id {:?} - {:?}",
+            "servers for shuffle id #{:?} & reduce id #{}: {:?}",
             shuffle_id,
             reduce_id,
             server_queue
@@ -98,7 +98,7 @@ impl ShuffleFetcher {
             };
             tasks.push(tokio::spawn(task));
         }
-        log::debug!("total_results {}", total_results);
+        log::debug!("total_results fetch results: {}", total_results);
         let task_results = future::join_all(tasks.into_iter()).await;
         let results = task_results.into_iter().fold(
             Ok(Vec::<(K, V)>::with_capacity(total_results)),

From 7f06f7286d319a66c9b760f5a72b9cadc38256a0 Mon Sep 17 00:00:00 2001
From: Ignacio Duart
Date: Sun, 3 May 2020 11:40:06 +0200
Subject: [PATCH 12/20] Added count_by_value test

---
 tests/test_rdd.rs | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/tests/test_rdd.rs b/tests/test_rdd.rs
index 262c72e5..c3c1a5dd 100644
--- a/tests/test_rdd.rs
+++ b/tests/test_rdd.rs
@@ -565,6 +565,26 @@ fn count_aprox() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn count_by_value_aprox() -> Result<()> {
+    let sc = CONTEXT.clone();
+
+    // this should complete and return the final value, so confidence should be 100%
+    let time_out = std::time::Duration::from_nanos(100);
+    let mut res: Vec<_> = sc
+        .make_rdd(vec![1i32, 2, 2, 3, 3, 3], 6)
+        .count_by_value_aprox(time_out, Some(0.9))?
+        .get_final_value()?
+        .into_iter()
+        .map(|(k, v)| (k, v.mean))
+        .collect();
+    res.sort_by(|e1, e2| e1.0.cmp(&e2.0));
+
+    let expected = vec![(1i32, 1.0f64), (2, 2.0), (3, 3.0)];
+    assert_eq!(res, expected);
+    Ok(())
+}
+
 #[test]
 fn test_is_empty() {
     let sc = CONTEXT.clone();
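For reference, a usage sketch of the approximate API exercised by the test above. This is a sketch only: it assumes the same local `Context` setup as the examples, and it reads only the `mean` point estimate of each `BoundedDouble`, as the test does.

```rust
use std::time::Duration;
use vega::*;

fn main() -> Result<()> {
    let sc = Context::new()?;
    let rdd = sc.make_rdd(vec![1i32, 2, 2, 3, 3, 3], 6);

    // Wait at most one second for the job; ask for 90% confidence in any
    // partial result returned before completion.
    let partial = rdd.count_by_value_aprox(Duration::from_secs(1), Some(0.9))?;

    // Block until the job finishes (or the timeout fires), then print the
    // per-value point estimates.
    for (value, bounds) in partial.get_final_value()? {
        println!("{} occurs ~{} times", value, bounds.mean);
    }
    Ok(())
}
```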
From 7c3228deb4c69c5b18fe3c51be4ddc679666648b Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sun, 3 May 2020 22:41:03 +0800
Subject: [PATCH 13/20] documentation and minor formatting

---
 examples/parquet_column_read.rs     |  2 +-
 src/rdd/rdd.rs                      | 10 ++++++++++
 src/utils/bounded_priority_queue.rs | 11 ++++++-----
 tests/test_async.rs                 |  2 +-
 tests/test_pair_rdd.rs              |  2 +-
 5 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/examples/parquet_column_read.rs b/examples/parquet_column_read.rs
index e1eb667b..b10890e4 100644
--- a/examples/parquet_column_read.rs
+++ b/examples/parquet_column_read.rs
@@ -1,10 +1,10 @@
 #![allow(where_clauses_object_safety, clippy::single_component_path_imports)]
 use chrono::prelude::*;
 use itertools::izip;
-use vega::*;
 use parquet::column::reader::get_typed_column_reader;
 use parquet::data_type::{ByteArrayType, Int32Type, Int64Type};
 use parquet::file::reader::{FileReader, SerializedFileReader};
+use vega::*;
 
 use std::fs::File;
 use std::path::PathBuf;
diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 4551248b..5d8fc267 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -1038,6 +1038,11 @@ pub trait Rdd: RddBase + 'static {
         self.reduce(min_fn)
     }
 
+    /// Returns the first k (largest) elements from this RDD as defined by the specified
+    /// Ord and maintains ordering. This does the opposite of [take_ordered](#take_ordered).
+    /// # Notes
+    /// This method should only be used if the resulting array is expected to be small, as
+    /// all the data is loaded into the driver's memory.
     fn top(&self, num: usize) -> Result<Vec<Self::Item>>
     where
         Self: Sized,
@@ -1051,6 +1056,11 @@ pub trait Rdd: RddBase + 'static {
             .collect())
     }
 
+    /// Returns the first k (smallest) elements from this RDD as defined by the specified
+    /// Ord and maintains ordering. This does the opposite of [top()](#top).
+    /// # Notes
+    /// This method should only be used if the resulting array is expected to be small, as
+    /// all the data is loaded into the driver's memory.
     fn take_ordered(&self, num: usize) -> Result<Vec<Self::Item>>
     where
         Self: Sized,
diff --git a/src/utils/bounded_priority_queue.rs b/src/utils/bounded_priority_queue.rs
index 4f4daab9..cebc76f7 100644
--- a/src/utils/bounded_priority_queue.rs
+++ b/src/utils/bounded_priority_queue.rs
@@ -1,7 +1,9 @@
 use crate::serializable_traits::Data;
 use serde_derive::{Deserialize, Serialize};
 use std::collections::BinaryHeap;
-
+/// Bounded priority queue. This class wraps the original BinaryHeap
+/// struct and wraps it such that only the top K elements are retained.
+/// The top K elements are defined by `T: Ord`
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(crate) struct BoundedPriorityQueue<T: Data + Ord> {
     max_size: usize,
@@ -26,6 +28,7 @@ impl<T: Data + Ord> BoundedPriorityQueue<T> {
         }
     }
 
+    /// The equivalent of `++=` method in Scala Spark.
     pub fn merge(mut self, other: BoundedPriorityQueue<T>) -> Self {
         other
             .underlying
@@ -35,6 +38,7 @@ impl<T: Data + Ord> BoundedPriorityQueue<T> {
         self
     }
 
+    /// The equivalent of `+=` method in Scala Spark.
    pub fn append(&mut self, elem: T) {
         if self.underlying.len() < self.max_size {
             self.underlying.push(elem);
diff --git a/tests/test_async.rs b/tests/test_async.rs
index 835daf6f..f33ceb7c 100644
--- a/tests/test_async.rs
+++ b/tests/test_async.rs
@@ -1,8 +1,8 @@
 //! Test whether the library can be used with different running async executors.
 use std::sync::Arc;
 
-use vega::*;
 use once_cell::sync::Lazy;
+use vega::*;
 
 static CONTEXT: Lazy<Arc<Context>> = Lazy::new(|| Context::new().unwrap());
diff --git a/tests/test_pair_rdd.rs b/tests/test_pair_rdd.rs
index e42715ab..fa518157 100644
--- a/tests/test_pair_rdd.rs
+++ b/tests/test_pair_rdd.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
-use vega::*;
 use once_cell::sync::Lazy;
+use vega::*;
 
 static CONTEXT: Lazy<Arc<Context>> = Lazy::new(|| Context::new().unwrap());

From b0899bd9c7b1b34ed8144e094a8c98c12941e5ba Mon Sep 17 00:00:00 2001
From: "Dawei.H"
Date: Sun, 3 May 2020 22:46:07 +0800
Subject: [PATCH 14/20] minor wording

---
 src/utils/bounded_priority_queue.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/utils/bounded_priority_queue.rs b/src/utils/bounded_priority_queue.rs
index cebc76f7..aa971649 100644
--- a/src/utils/bounded_priority_queue.rs
+++ b/src/utils/bounded_priority_queue.rs
@@ -1,8 +1,8 @@
 use crate::serializable_traits::Data;
 use serde_derive::{Deserialize, Serialize};
 use std::collections::BinaryHeap;
-/// Bounded priority queue. This class wraps the original BinaryHeap
-/// struct and wraps it such that only the top K elements are retained.
+/// Bounded priority queue. This struct wraps the original BinaryHeap
+/// struct and modifies it such that only the top K elements are retained.
 /// The top K elements are defined by `T: Ord`
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(crate) struct BoundedPriorityQueue<T: Data + Ord> {
     max_size: usize,
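With the documentation in place, the two methods read as follows in use. This sketch simply mirrors the tests added in patch 01 of this series; note that, per the new docs, both results are materialized in the driver, so `num` should stay small.

```rust
use vega::*;

fn main() -> Result<()> {
    let sc = Context::new()?;
    let rdd = sc.parallelize(vec![13, 28, 3, 4, 51, 108, 12, 113, 19], 4);

    // k largest elements, in descending order.
    assert_eq!(rdd.top(3)?, vec![113, 108, 51]);
    // k smallest elements, in ascending order.
    assert_eq!(rdd.take_ordered(3)?, vec![3, 4, 12]);

    Ok(())
}
```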
From 0b69aab27c12bbd2bb81cc93a82540e27ca77ca2 Mon Sep 17 00:00:00 2001
From: Ajinkya Prabhu
Date: Sat, 4 Jul 2020 17:14:26 +0530
Subject: [PATCH 15/20] add subtract op

---
 examples/subtract.rs     | 21 ++++++++++
 src/rdd/rdd.rs           | 80 +++++++++++++++++++++++++++++++++++
 tests/.test_async.rs.swp | Bin 0 -> 12288 bytes
 tests/test_rdd.rs        | 15 ++++++++
 4 files changed, 116 insertions(+)
 create mode 100644 examples/subtract.rs
 create mode 100644 tests/.test_async.rs.swp

diff --git a/examples/subtract.rs b/examples/subtract.rs
new file mode 100644
index 00000000..5a472d9f
--- /dev/null
+++ b/examples/subtract.rs
@@ -0,0 +1,21 @@
+use vega::*;
+use std::sync::Arc;
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0];
+
+    let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13];
+
+    let first = sc.parallelize(col1, 4);
+    let second = sc.parallelize(col2, 4);
+    let ans = first.subtract(Arc::new(second));
+
+    for elem in ans.collect().iter() {
+        println!("{:?}",elem);
+    }
+
+    Ok(())
+
+
+}
diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 5d8fc267..58267238 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -26,28 +26,49 @@ use serde_derive::{Deserialize, Serialize};
 use serde_traitobject::{Deserialize, Serialize};
 
 mod parallel_collection_rdd;
+
 pub use parallel_collection_rdd::*;
+
 mod cartesian_rdd;
+
 pub use cartesian_rdd::*;
+
 mod co_grouped_rdd;
+
 pub use co_grouped_rdd::*;
+
 mod coalesced_rdd;
+
 pub use coalesced_rdd::*;
+
 mod flatmapper_rdd;
 mod mapper_rdd;
+
 pub use flatmapper_rdd::*;
 pub use mapper_rdd::*;
+
 mod pair_rdd;
+
 pub use pair_rdd::*;
+
 mod partitionwise_sampled_rdd;
+
 pub use partitionwise_sampled_rdd::*;
+
 mod shuffled_rdd;
+
 pub use shuffled_rdd::*;
+
 mod map_partitions_rdd;
+
 pub use map_partitions_rdd::*;
+
 mod zip_rdd;
+
 pub use zip_rdd::*;
+
 mod union_rdd;
+
 pub use union_rdd::*;
 
 // Values which are needed for all RDDs
@@ -838,6 +859,65 @@ pub trait Rdd: RddBase + 'static {
         self.intersection_with_num_partitions(other, self.number_of_splits())
     }
 
+    fn subtract<T>(&self, other: Arc<T>) -> SerArc<dyn Rdd<Item = Self::Item>>
+    where
+        Self: Clone,
+        Self::Item: Data + Eq + Hash,
+        T: Rdd<Item = Self::Item> + Sized,
+    {
+        self.subtract_with_num_partition(other, self.number_of_splits())
+    }
+
+    fn subtract_with_num_partition<T>(
+        &self,
+        other: Arc<T>,
+        num_splits: usize,
+    ) -> SerArc<dyn Rdd<Item = Self::Item>>
+    where
+        Self: Clone,
+        Self::Item: Data + Eq + Hash,
+        T: Rdd<Item = Self::Item> + Sized,
+    {
+        let other = other
+            .map(Box::new(Fn!(
+                |x: Self::Item| -> (Self::Item, Option<Self::Item>) { (x, None) }
+            )))
+            .clone();
+        let rdd = self
+            .map(Box::new(Fn!(|x| -> (Self::Item, Option<Self::Item>) {
+                (x, None)
+            })))
+            .cogroup(
+                other,
+                Box::new(HashPartitioner::<Self::Item>::new(num_splits))
+                    as Box<dyn Partitioner>,
+            )
+            .map(Box::new(Fn!(|(x, (v1, v2)): (
+                Self::Item,
+                (Vec::<Option<Self::Item>>, Vec::<Option<Self::Item>>)
+            )|
+             -> Option<Self::Item> {
+                if (v1.len() >= 1) ^ (v2.len() >= 1) {
+                    Some(x)
+                } else {
+                    None
+                }
+            })))
+            .map_partitions(Box::new(Fn!(|iter: Box<
+                dyn Iterator<Item = Option<Self::Item>>,
+            >|
+             -> Box<dyn Iterator<Item = Self::Item>> {
+                Box::new(iter.filter(|x| x.is_some()).map(|x| x.unwrap()))
+                    as Box<dyn Iterator<Item = Self::Item>>
+            })));
+
+        let subtraction = self.intersection(Arc::new(rdd));
+        (&*subtraction).register_op_name("subtraction");
+        subtraction
+    }
+
     fn intersection_with_num_partitions(
         &self,
         other: Arc<T>,
diff --git a/tests/.test_async.rs.swp b/tests/.test_async.rs.swp
new file mode 100644
index 0000000000000000000000000000000000000000..502efc791f03343b0a1782e1482793663d09015a
GIT binary patch
literal 12288
[base85 data for the accidentally committed vim swap file; truncated in this dump]
diff --git a/tests/test_rdd.rs b/tests/test_rdd.rs
[index line and hunk header lost in this dump]
     let res: Vec<i32> = rdd.take_ordered(3).unwrap();
     assert_eq!(res, vec![3, 4, 12]);
 }
+
+#[test]
+fn test_subtract(){
+    let sc = CONTEXT.clone();
+    let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0];
+
+    let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13];
+
+    let first = sc.parallelize(col1, 4);
+    let second = sc.parallelize(col2, 4);
+    let ans = first.subtract(Arc::new(second));
+    assert_eq!(ans.collect().unwrap(),vec![19, 12, 10, 1, 0, 2])
+
+
+}
\ No newline at end of file

From ee4393998ab7cd0172ebdbdd7d8b030cfc62fdb9 Mon Sep 17 00:00:00 2001
From: Ajinkya Prabhu
Date: Sun, 5 Jul 2020 12:23:39 +0530
Subject: [PATCH 16/20] formatted code

---
 examples/subtract.rs     |  6 ++----
 src/rdd/rdd.rs           | 25 +------------------------
 tests/.test_async.rs.swp | Bin 12288 -> 0 bytes
 tests/test_rdd.rs        |  8 +++-----
 4 files changed, 6 insertions(+), 33 deletions(-)
 delete mode 100644 tests/.test_async.rs.swp

diff --git a/examples/subtract.rs b/examples/subtract.rs
index 5a472d9f..bd509b50 100644
--- a/examples/subtract.rs
+++ b/examples/subtract.rs
@@ -1,5 +1,5 @@
-use vega::*;
 use std::sync::Arc;
+use vega::*;
 
 fn main() -> Result<()> {
     let sc = Context::new()?;
@@ -12,10 +12,8 @@ fn main() -> Result<()> {
     let ans = first.subtract(Arc::new(second));
 
     for elem in ans.collect().iter() {
-        println!("{:?}",elem);
+        println!("{:?}", elem);
     }
 
     Ok(())
-
-
 }
diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 58267238..1ff4fd13 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -26,51 +26,29 @@ use serde_derive::{Deserialize, Serialize};
 use serde_traitobject::{Deserialize, Serialize};
 
 mod parallel_collection_rdd;
-
 pub use parallel_collection_rdd::*;
-
 mod cartesian_rdd;
-
 pub use cartesian_rdd::*;
-
 mod co_grouped_rdd;
-
 pub use co_grouped_rdd::*;
-
 mod coalesced_rdd;
-
 pub use coalesced_rdd::*;
-
 mod flatmapper_rdd;
 mod mapper_rdd;
-
 pub use flatmapper_rdd::*;
 pub use mapper_rdd::*;
-
 mod pair_rdd;
-
 pub use pair_rdd::*;
-
 mod partitionwise_sampled_rdd;
-
 pub use partitionwise_sampled_rdd::*;
-
 mod shuffled_rdd;
-
 pub use shuffled_rdd::*;
-
 mod map_partitions_rdd;
-
 pub use map_partitions_rdd::*;
-
 mod zip_rdd;
-
 pub use zip_rdd::*;
-
 mod union_rdd;
-
 pub use union_rdd::*;
+
 // Values which are needed for all RDDs
 #[derive(Serialize, Deserialize)]
 pub(crate) struct RddVals {
@@ -889,8 +867,7 @@ pub trait Rdd: RddBase + 'static {
             })))
             .cogroup(
                 other,
-                Box::new(HashPartitioner::<Self::Item>::new(num_splits))
-                    as Box<dyn Partitioner>,
+                Box::new(HashPartitioner::<Self::Item>::new(num_splits)) as Box<dyn Partitioner>,
             )
             .map(Box::new(Fn!(|(x, (v1, v2)): (
diff --git a/tests/.test_async.rs.swp b/tests/.test_async.rs.swp
deleted file mode 100644
index 502efc791f03343b0a1782e1482793663d09015a..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 12288
[base85 data truncated in this dump; the tests/test_rdd.rs hunk of this patch was also lost]
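The cogroup-based implementation in patch 15 keeps an element when exactly one of the two cogrouped sides saw it, then intersects the result with `self`. At the level of sets this is plain difference, and the output is de-duplicated because cogroup keys are unique. A std-only sketch of that contract (illustrative only, not the RDD implementation):

```rust
use std::collections::HashSet;

/// Distinct elements of `a` that do not appear in `b` -- the set-level
/// behaviour of `subtract` (duplicates collapse, order is unspecified).
fn subtract(a: &[i32], b: &[i32]) -> HashSet<i32> {
    let a_set: HashSet<i32> = a.iter().copied().collect();
    let b_set: HashSet<i32> = b.iter().copied().collect();
    a_set.difference(&b_set).copied().collect()
}

fn main() {
    // Same inputs as the subtract example and test in patch 15.
    let res = subtract(&[1, 2, 3, 4, 5, 10, 12, 13, 19, 0], &[3, 4, 5, 6, 7, 8, 11, 13]);
    let expected: HashSet<i32> = [19, 12, 10, 1, 0, 2].iter().copied().collect();
    assert_eq!(res, expected);
}
```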
From [commit id lost in this dump] Mon Sep 17 00:00:00 2001
From: Ajinkya Prabhu
Date: Sun, 5 Jul 2020 12:30:54 +0530
Subject: [PATCH 17/20] added docs for subtract

---
 src/rdd/rdd.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index 1ff4fd13..c7fbee92 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -837,6 +837,9 @@ pub trait Rdd: RddBase + 'static {
         self.intersection_with_num_partitions(other, self.number_of_splits())
     }
 
+    /// subtract function, same as the one found in apache spark
+    /// example of subtract can be found in subtract.rs
+    /// performs a full outer join followed by an intersection with self to get subtraction.
     fn subtract<T>(&self, other: Arc<T>) -> SerArc<dyn Rdd<Item = Self::Item>>
     where
         Self: Clone,

From 408520c0976af4bc90fb3ab72c40b082e59a12eb Mon Sep 17 00:00:00 2001
From: Ajinkya Prabhu
Date: Mon, 6 Jul 2020 19:03:36 +0530
Subject: [PATCH 18/20] fixed test case for subtract

---
 src/rdd/rdd.rs    |  2 +-
 tests/test_rdd.rs | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index c7fbee92..73761179 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -848,7 +848,7 @@ pub trait Rdd: RddBase + 'static {
     {
         self.subtract_with_num_partition(other, self.number_of_splits())
     }
-    
+
     fn subtract_with_num_partition<T>(
         &self,
         other: Arc<T>,
diff --git a/tests/test_rdd.rs b/tests/test_rdd.rs
index c5f9e510..023415d1 100644
--- a/tests/test_rdd.rs
+++ b/tests/test_rdd.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::fs::{create_dir_all, File};
 use std::io::prelude::*;
 use std::sync::Arc;
@@ -681,5 +682,18 @@ fn test_subtract() {
     let first = sc.parallelize(col1, 4);
     let second = sc.parallelize(col2, 4);
     let ans = first.subtract(Arc::new(second));
-    assert_eq!(ans.collect().unwrap(), vec![19, 12, 10, 1, 0, 2])
+    // assert_eq!(HashSet::from_iter(ans.collect().unwrap().iter().cloned()), HashSet::from_iter(vec![19, 12, 10, 1, 0, 2].iter().cloned()));
+
+    let mut expected_vec = vec![19, 12, 10, 1, 0, 2];
+    expected_vec.sort();
+    let mut actual = ans.collect().unwrap();
+    actual.sort();
+
+
+
+    println!("{:?}",expected_vec);
+    println!("{:?}",actual);
+
+    assert_eq!(actual,expected_vec)
+
 }

From d3f5f78ff13beb0361501cb92e97b14c52b86048 Mon Sep 17 00:00:00 2001
From: Colin Dean
Date: Thu, 13 Aug 2020 12:01:01 -0400
Subject: [PATCH 19/20] Mentions need for Rust Nightly

Direct dependency `serde_traitobject` 0.2.4 depends on `metatype` 0.2.0,
which requires nightly.

---
 user_guide/src/chapter_1.md | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/user_guide/src/chapter_1.md b/user_guide/src/chapter_1.md
index 0192c9c0..58751eff 100644
--- a/user_guide/src/chapter_1.md
+++ b/user_guide/src/chapter_1.md
@@ -4,11 +4,23 @@
 
 ## Getting started
 
-### Installation
+### Setting up Rust
 
-Right now the framework lacks any sort of cluster manager of submit program/script.
+Vega requires Rust Nightly channel because it depends on libraries that require Nightly (`serde_traitobject` -> `metatype`).
+Ensure that you have and are using a Nightly toolchain when
+building examples.
 
-In order to use the framework you have to clone the repository and add the local dependency or add the upstream GitHub repository to your Rust project (the crate is not yet published on [crates.io](https://crates.io/)). E.g. add to your application Cargo.toml or:
+    rustup toolchain install nightly
+
+Then set the default, or pass the toolchain in when invoking Cargo:
+
+    rustup default nightly
+
+### Installing Vega
+
+Right now, the framework lacks any sort of cluster manager or submit program/script.
+
+In order to use the framework, you have to clone the repository and add the local dependency or add the upstream GitHub repository to your Rust project (the crate is not yet published on [crates.io](https://crates.io/)). E.g. add to your application Cargo.toml or:
 
 ```doc
 [dependencies]
@@ -17,7 +29,7 @@ vega = { path = "/path/to/local/git/repo" }
 vega = { git = "https://github.com/rajasekarv/vega", branch = "master" }
 ```
 
-Is not recommended to use the application for any sort of production code yet as it's under heavy development.
+It is _not recommended_ to use the application for any sort of production code yet as it's under heavy development.
 
 Check [examples](https://github.com/rajasekarv/vega/tree/master/examples) and [tests](https://github.com/rajasekarv/vega/tree/master/tests) in the source code to get a basic idea of how the framework works.

From 2b2ae610fb1cac2adfb40f0978e8bba938e98cc2 Mon Sep 17 00:00:00 2001
From: Colin Dean
Date: Thu, 13 Aug 2020 12:31:37 -0400
Subject: [PATCH 20/20] Fixes code block style so it's not executed

---
 user_guide/src/chapter_1.md | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/user_guide/src/chapter_1.md b/user_guide/src/chapter_1.md
index 58751eff..f673d82d 100644
--- a/user_guide/src/chapter_1.md
+++ b/user_guide/src/chapter_1.md
@@ -10,11 +10,15 @@ Vega requires Rust Nightly channel because it depends on libraries that require
 Ensure that you have and are using a Nightly toolchain when
 building examples.
 
-    rustup toolchain install nightly
+```doc
+$ rustup toolchain install nightly
+```
 
 Then set the default, or pass the toolchain in when invoking Cargo:
 
-    rustup default nightly
+```doc
+$ rustup default nightly
+```
 
 ### Installing Vega
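A note on the nightly requirement above: patch 10 shows that the docker build script originally read a `rust-toolchain` file from the repository root. Committing such a file containing the single line `nightly` pins the channel per project, so `rustup default nightly` is only needed when building outside a checkout that carries it.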