diff --git a/examples/subtract.rs b/examples/subtract.rs new file mode 100644 index 00000000..5a472d9f --- /dev/null +++ b/examples/subtract.rs @@ -0,0 +1,21 @@ +use vega::*; +use std::sync::Arc; + +fn main() -> Result<()> { + let sc = Context::new()?; + let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0]; + + let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13]; + + let first = sc.parallelize(col1, 4); + let second = sc.parallelize(col2, 4); + let ans = first.subtract(Arc::new(second)); + + for elem in ans.collect().iter() { + println!("{:?}",elem); + } + + Ok(()) + + +} diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs index 5d8fc267..58267238 100644 --- a/src/rdd/rdd.rs +++ b/src/rdd/rdd.rs @@ -26,28 +26,49 @@ use serde_derive::{Deserialize, Serialize}; use serde_traitobject::{Deserialize, Serialize}; mod parallel_collection_rdd; + pub use parallel_collection_rdd::*; + mod cartesian_rdd; + pub use cartesian_rdd::*; + mod co_grouped_rdd; + pub use co_grouped_rdd::*; + mod coalesced_rdd; + pub use coalesced_rdd::*; + mod flatmapper_rdd; mod mapper_rdd; + pub use flatmapper_rdd::*; pub use mapper_rdd::*; + mod pair_rdd; + pub use pair_rdd::*; + mod partitionwise_sampled_rdd; + pub use partitionwise_sampled_rdd::*; + mod shuffled_rdd; + pub use shuffled_rdd::*; + mod map_partitions_rdd; + pub use map_partitions_rdd::*; + mod zip_rdd; + pub use zip_rdd::*; + mod union_rdd; + pub use union_rdd::*; // Values which are needed for all RDDs @@ -838,6 +859,65 @@ pub trait Rdd: RddBase + 'static { self.intersection_with_num_partitions(other, self.number_of_splits()) } + fn subtract(&self, other: Arc) -> SerArc> + where + Self: Clone, + Self::Item: Data + Eq + Hash, + T: Rdd + Sized, + { + self.subtract_with_num_partition(other, self.number_of_splits()) + } + + fn subtract_with_num_partition( + &self, + other: Arc, + num_splits: usize, + ) -> SerArc> + where + Self: Clone, + Self::Item: Data + Eq + Hash, + T: Rdd + Sized, + { + let other = other + .map(Box::new(Fn!( + |x: Self::Item| -> (Self::Item, Option) { (x, None) } + ))) + .clone(); + let rdd = self + .map(Box::new(Fn!(|x| -> (Self::Item, Option) { + (x, None) + }))) + .cogroup( + other, + Box::new(HashPartitioner::::new(num_splits)) + as Box, + ) + .map(Box::new(Fn!(|(x, (v1, v2)): ( + Self::Item, + (Vec::>, Vec::>) + )| + -> Option { + if (v1.len() >= 1) ^ (v2.len() >= 1) { + Some(x) + } else { + None + } + }))) + .map_partitions(Box::new(Fn!(|iter: Box< + dyn Iterator>, + >| + -> Box< + dyn Iterator, + > { + Box::new(iter.filter(|x| x.is_some()).map(|x| x.unwrap())) + as Box> + }))); + + let subtraction = self.intersection(Arc::new(rdd)); + (&*subtraction).register_op_name("subtraction"); + subtraction + } + fn intersection_with_num_partitions( &self, other: Arc, diff --git a/tests/.test_async.rs.swp b/tests/.test_async.rs.swp new file mode 100644 index 00000000..502efc79 Binary files /dev/null and b/tests/.test_async.rs.swp differ diff --git a/tests/test_rdd.rs b/tests/test_rdd.rs index c3c1a5dd..909883f4 100644 --- a/tests/test_rdd.rs +++ b/tests/test_rdd.rs @@ -670,3 +670,18 @@ fn test_take_ordered() { let res: Vec = rdd.take_ordered(3).unwrap(); assert_eq!(res, vec![3, 4, 12]); } + +#[test] +fn test_subtract(){ + let sc = CONTEXT.clone(); + let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0]; + + let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13]; + + let first = sc.parallelize(col1, 4); + let second = sc.parallelize(col2, 4); + let ans = first.subtract(Arc::new(second)); + assert_eq!(ans.collect().unwrap(),vec![19, 12, 10, 1, 0, 2]) + + +} \ No newline at end of file