Skip to content

Commit

Permalink
add subtract op
Browse files Browse the repository at this point in the history
  • Loading branch information
ajprabhu09 committed Jul 4, 2020
1 parent 8bf245e commit 0b69aab
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 0 deletions.
21 changes: 21 additions & 0 deletions examples/subtract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use vega::*;
use std::sync::Arc;

/// Example: builds two parallelized collections and prints every element of
/// the first one that remains after `subtract`-ing the second.
///
/// Returns an error if the context cannot be created or the job fails to
/// collect.
fn main() -> Result<()> {
    let sc = Context::new()?;
    let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0];
    let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13];

    let first = sc.parallelize(col1, 4);
    let second = sc.parallelize(col2, 4);
    let ans = first.subtract(Arc::new(second));

    // `collect()` yields a `Result`; calling `.iter()` on it directly would
    // iterate over the (single) `Ok` payload and silently skip on `Err`.
    // Propagate the error and walk the collected elements instead.
    for elem in ans.collect()? {
        println!("{:?}", elem);
    }

    Ok(())
}
80 changes: 80 additions & 0 deletions src/rdd/rdd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,49 @@ use serde_derive::{Deserialize, Serialize};
use serde_traitobject::{Deserialize, Serialize};

mod parallel_collection_rdd;

pub use parallel_collection_rdd::*;

mod cartesian_rdd;

pub use cartesian_rdd::*;

mod co_grouped_rdd;

pub use co_grouped_rdd::*;

mod coalesced_rdd;

pub use coalesced_rdd::*;

mod flatmapper_rdd;
mod mapper_rdd;

pub use flatmapper_rdd::*;
pub use mapper_rdd::*;

mod pair_rdd;

pub use pair_rdd::*;

mod partitionwise_sampled_rdd;

pub use partitionwise_sampled_rdd::*;

mod shuffled_rdd;

pub use shuffled_rdd::*;

mod map_partitions_rdd;

pub use map_partitions_rdd::*;

mod zip_rdd;

pub use zip_rdd::*;

mod union_rdd;

pub use union_rdd::*;

// Values which are needed for all RDDs
Expand Down Expand Up @@ -838,6 +859,65 @@ pub trait Rdd: RddBase + 'static {
self.intersection_with_num_partitions(other, self.number_of_splits())
}

/// Subtracts `other` from this RDD.
///
/// Convenience wrapper around `subtract_with_num_partition` that reuses this
/// RDD's own `number_of_splits()` as the partition count.
fn subtract<T>(&self, other: Arc<T>) -> SerArc<dyn Rdd<Item = Self::Item>>
where
    T: Rdd<Item = Self::Item> + Sized,
    Self::Item: Data + Eq + Hash,
    Self: Clone,
{
    let num_splits = self.number_of_splits();
    self.subtract_with_num_partition(other, num_splits)
}

/// Subtracts `other` from this RDD, hash-partitioning the intermediate
/// cogroup into `num_splits` partitions.
///
/// Steps, as implemented:
/// 1. Both inputs are keyed by element (the `None` value is only a
///    placeholder payload) and cogrouped.
/// 2. A key is kept when exactly one of the two sides has entries for it.
/// 3. The kept keys are intersected with `self`, and the resulting RDD is
///    registered under the op name `"subtraction"`.
fn subtract_with_num_partition<T>(
    &self,
    other: Arc<T>,
    num_splits: usize,
) -> SerArc<dyn Rdd<Item = Self::Item>>
where
    Self: Clone,
    Self::Item: Data + Eq + Hash,
    T: Rdd<Item = Self::Item> + Sized,
{
    // `map` already hands back a fresh handle, so no extra clone is needed.
    let other = other.map(Box::new(Fn!(
        |x: Self::Item| -> (Self::Item, Option<Self::Item>) { (x, None) }
    )));
    let rdd = self
        .map(Box::new(Fn!(|x| -> (Self::Item, Option<Self::Item>) {
            (x, None)
        })))
        .cogroup(
            other,
            Box::new(HashPartitioner::<Self::Item>::new(num_splits))
                as Box<dyn Partitioner>,
        )
        .map(Box::new(Fn!(|(x, (v1, v2)): (
            Self::Item,
            (Vec::<Option<Self::Item>>, Vec::<Option<Self::Item>>)
        )|
         -> Option<Self::Item> {
            // Keep the key only when exactly one side contributed entries.
            if v1.is_empty() != v2.is_empty() {
                Some(x)
            } else {
                None
            }
        })))
        .map_partitions(Box::new(Fn!(|iter: Box<
            dyn Iterator<Item = Option<Self::Item>>,
        >|
         -> Box<
            dyn Iterator<Item = Self::Item>,
        > {
            // Drop the `None`s and unwrap the `Some`s in one pass.
            Box::new(iter.flatten()) as Box<dyn Iterator<Item = Self::Item>>
        })));

    // Restrict the one-sided keys to those that actually come from `self`.
    let subtraction = self.intersection(Arc::new(rdd));
    (&*subtraction).register_op_name("subtraction");
    subtraction
}

fn intersection_with_num_partitions<T>(
&self,
other: Arc<T>,
Expand Down
Binary file added tests/.test_async.rs.swp
Binary file not shown.
15 changes: 15 additions & 0 deletions tests/test_rdd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,18 @@ fn test_take_ordered() {
let res: Vec<usize> = rdd.take_ordered(3).unwrap();
assert_eq!(res, vec![3, 4, 12]);
}

/// Checks that `subtract` yields exactly the elements of the first
/// collection that are absent from the second.
#[test]
fn test_subtract() {
    let sc = CONTEXT.clone();
    let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0];
    let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13];

    let first = sc.parallelize(col1, 4);
    let second = sc.parallelize(col2, 4);
    let ans = first.subtract(Arc::new(second));

    // The output order depends on how elements land in hash partitions, so
    // compare sorted contents rather than pinning one particular order.
    let mut res = ans.collect().unwrap();
    res.sort();
    assert_eq!(res, vec![0, 1, 2, 10, 12, 19]);
}

0 comments on commit 0b69aab

Please sign in to comment.