From 97e49be402e851331349bda60aaeed7015d5f162 Mon Sep 17 00:00:00 2001 From: ritchie46 Date: Wed, 17 Jun 2020 11:16:59 +0200 Subject: [PATCH] left joins working --- src/fmt.rs | 2 +- src/frame/hash_join.rs | 162 ++++++++++++++++++++++++++++++++++------- 2 files changed, 136 insertions(+), 28 deletions(-) diff --git a/src/fmt.rs b/src/fmt.rs index 888aebfe5e16..dbcc03286a46 100644 --- a/src/fmt.rs +++ b/src/fmt.rs @@ -92,7 +92,7 @@ impl Display for AnyType<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let width = 15; match self { - AnyType::Null => write!(f, "{:width$}", "null", width = width), + AnyType::Null => write!(f, "{:>width$}", "null", width = width), AnyType::U32(v) => write!(f, "{:width$}", v, width = width), AnyType::I32(v) => write!(f, "{:width$}", v, width = width), AnyType::I64(v) => write!(f, "{:width$}", v, width = width), diff --git a/src/frame/hash_join.rs b/src/frame/hash_join.rs index 18fcfdfb9bd4..cbfa07569666 100644 --- a/src/frame/hash_join.rs +++ b/src/frame/hash_join.rs @@ -5,15 +5,12 @@ use crate::{datatypes::UInt32Chunked, prelude::*, series::chunked_array::Chunked use arrow::compute::TakeOptions; use arrow::datatypes::{ArrowPrimitiveType, Field, Schema}; use fnv::{FnvBuildHasher, FnvHashMap}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::hash::Hash; -/// Hash join a and b. -/// b should be the shorter relation. -fn hash_join( - a: impl Iterator>, +fn prepare_hashed_relation( b: impl Iterator>, -) -> Vec<(usize, usize)> +) -> HashMap, FnvBuildHasher> where T: Hash + Eq + Copy, { @@ -24,6 +21,19 @@ where hashmap.entry(key).or_insert_with(Vec::new).push(idx) } }); + hashmap +} + +/// Hash join a and b. +/// b should be the shorter relation. +fn hash_join( + a: impl Iterator>, + b: impl Iterator>, +) -> Vec<(usize, usize)> +where + T: Hash + Eq + Copy, +{ + let hashmap = prepare_hashed_relation(b); let mut results = Vec::new(); a.enumerate().for_each(|(idx_a, o)| { @@ -37,8 +47,38 @@ where results } +fn hash_join_left( + a: impl Iterator>, + b: impl Iterator>, +) -> Vec<(usize, Option)> +where + T: Hash + Eq + Copy, +{ + let hashmap = prepare_hashed_relation(b); + let mut results = Vec::new(); + + a.enumerate().for_each(|(idx_a, o)| { + match o { + // left value is null, so right is automatically null + None => results.push((idx_a, None)), + Some(key) => { + match hashmap.get(&key) { + // left and right matches + Some(indexes_b) => { + results.extend(indexes_b.iter().map(|&idx_b| (idx_a, Some(idx_b)))) + } + // only left values, right = null + None => results.push((idx_a, None)), + } + } + } + }); + results +} + pub trait HashJoin { fn hash_join(&self, other: &ChunkedArray) -> (UInt32Chunked, UInt32Chunked); + fn hash_join_left(&self, other: &ChunkedArray) -> (UInt32Chunked, UInt32Chunked); } impl HashJoin for ChunkedArray @@ -82,28 +122,36 @@ where }); (left.finish(), right.finish()) } -} -impl DataFrame { - pub fn join(&self, other: &DataFrame, left_on: &str, right_on: &str) -> Result { - let s_left = self.select(left_on).ok_or(PolarsError::NotFound)?; - let s_right = other.select(right_on).ok_or(PolarsError::NotFound)?; - - macro_rules! hash_join { - ($s_right:ident, $ca_left:ident, $type_:ident) => {{ - let ca_right = $s_right.$type_()?; - $ca_left.hash_join(ca_right) - }}; - } + fn hash_join_left(&self, other: &ChunkedArray) -> (UInt32Chunked, UInt32Chunked) { + let join_tuples = hash_join_left(self.iter(), other.iter()); + // Create the UInt32Chunked arrays. These can be used to take values from both the dataframes. + let mut left = + PrimitiveChunkedBuilder::::new("left_take_idx", join_tuples.len()); + let mut right = + PrimitiveChunkedBuilder::::new("right_take_idx", join_tuples.len()); + join_tuples + .into_iter() + .for_each(|(idx_left, opt_idx_right)| { + left.append_value(idx_left as u32); - let (take_left, take_right) = match s_left { - Series::UInt32(ca_left) => hash_join!(s_right, ca_left, u32), - Series::Int32(ca_left) => hash_join!(s_right, ca_left, i32), - Series::Int64(ca_left) => hash_join!(s_right, ca_left, i64), - Series::Bool(ca_left) => hash_join!(s_right, ca_left, bool), - _ => unimplemented!(), - }; + match opt_idx_right { + Some(idx) => right.append_value(idx as u32), + None => right.append_null(), + } + }); + (left.finish(), right.finish()) + } +} +impl DataFrame { + fn finish_join( + &self, + other: &DataFrame, + take_left: &UInt32Chunked, + take_right: &UInt32Chunked, + right_on: &str, + ) -> Result { let mut df_left = self.take(&take_left, Some(TakeOptions::default()))?; let mut df_right = other.take(&take_right, Some(TakeOptions::default()))?; df_right.drop(right_on); @@ -129,6 +177,53 @@ impl DataFrame { df_left.hstack(&df_right.columns); Ok(df_left) } + + pub fn inner_join( + &self, + other: &DataFrame, + left_on: &str, + right_on: &str, + ) -> Result { + let s_left = self.select(left_on).ok_or(PolarsError::NotFound)?; + let s_right = other.select(right_on).ok_or(PolarsError::NotFound)?; + + macro_rules! hash_join { + ($s_right:ident, $ca_left:ident, $type_:ident) => {{ + let ca_right = $s_right.$type_()?; + $ca_left.hash_join(ca_right) + }}; + } + + let (take_left, take_right) = match s_left { + Series::UInt32(ca_left) => hash_join!(s_right, ca_left, u32), + Series::Int32(ca_left) => hash_join!(s_right, ca_left, i32), + Series::Int64(ca_left) => hash_join!(s_right, ca_left, i64), + Series::Bool(ca_left) => hash_join!(s_right, ca_left, bool), + _ => unimplemented!(), + }; + self.finish_join(other, &take_left, &take_right, right_on) + } + + pub fn left_join(&self, other: &DataFrame, left_on: &str, right_on: &str) -> Result { + let s_left = self.select(left_on).ok_or(PolarsError::NotFound)?; + let s_right = other.select(right_on).ok_or(PolarsError::NotFound)?; + + macro_rules! hash_join { + ($s_right:ident, $ca_left:ident, $type_:ident) => {{ + let ca_right = $s_right.$type_()?; + $ca_left.hash_join_left(ca_right) + }}; + } + + let (take_left, take_right) = match s_left { + Series::UInt32(ca_left) => hash_join!(s_right, ca_left, u32), + Series::Int32(ca_left) => hash_join!(s_right, ca_left, i32), + Series::Int64(ca_left) => hash_join!(s_right, ca_left, i64), + Series::Bool(ca_left) => hash_join!(s_right, ca_left, bool), + _ => unimplemented!(), + }; + self.finish_join(other, &take_left, &take_right, right_on) + } } #[cfg(test)] @@ -136,7 +231,7 @@ mod test { use super::*; #[test] - fn test_hash_join() { + fn test_inner_join() { let s0 = Series::init("days", [0, 1, 2].as_ref()); let s1 = Series::init("temp", [22.1, 19.9, 7.].as_ref()); let s2 = Series::init("rain", [0.2, 0.1, 0.3].as_ref()); @@ -146,7 +241,20 @@ mod test { let s1 = Series::init("rain", [0.1, 0.2, 0.3, 0.4].as_ref()); let rain = DataFrame::new_from_columns(vec![s0, s1]).unwrap(); - let joined = temp.join(&rain, "days", "days"); + let joined = temp.inner_join(&rain, "days", "days"); + println!("{}", joined.unwrap()) + } + + #[test] + fn test_left_join() { + let s0 = Series::init("days", [0, 1, 2, 3, 4].as_ref()); + let s1 = Series::init("temp", [22.1, 19.9, 7., 2., 3.].as_ref()); + let temp = DataFrame::new_from_columns(vec![s0, s1]).unwrap(); + + let s0 = Series::init("days", [1, 2].as_ref()); + let s1 = Series::init("rain", [0.1, 0.2].as_ref()); + let rain = DataFrame::new_from_columns(vec![s0, s1]).unwrap(); + let joined = temp.left_join(&rain, "days", "days"); println!("{}", joined.unwrap()) } }