diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs index 4a47017b79ab..3557bda8f4c9 100644 --- a/arrow-select/src/interleave.rs +++ b/arrow-select/src/interleave.rs @@ -24,7 +24,9 @@ use arrow_array::types::*; use arrow_array::*; use arrow_buffer::{ArrowNativeType, MutableBuffer, NullBuffer, NullBufferBuilder, OffsetBuffer}; use arrow_data::transform::MutableArrayData; +use arrow_data::ByteView; use arrow_schema::{ArrowError, DataType}; +use std::collections::HashMap; use std::sync::Arc; macro_rules! primitive_helper { @@ -97,6 +99,8 @@ pub fn interleave( DataType::LargeUtf8 => interleave_bytes::(values, indices), DataType::Binary => interleave_bytes::(values, indices), DataType::LargeBinary => interleave_bytes::(values, indices), + DataType::BinaryView => interleave_views::(values, indices), + DataType::Utf8View => interleave_views::(values, indices), DataType::Dictionary(k, _) => downcast_integer! { k.as_ref() => (dict_helper, values, indices), _ => unreachable!("illegal dictionary key type {k}") @@ -231,6 +235,41 @@ fn interleave_dictionaries( Ok(Arc::new(array)) } +fn interleave_views( + values: &[&dyn Array], + indices: &[(usize, usize)], +) -> Result { + let interleaved = Interleave::<'_, GenericByteViewArray>::new(values, indices); + let mut views_builder = BufferBuilder::new(indices.len()); + let mut buffers = Vec::new(); + + // (input array_index, input buffer_index) -> output buffer_index + let mut buffer_lookup: HashMap<(usize, u32), u32> = HashMap::new(); + for (array_idx, value_idx) in indices { + let array = interleaved.arrays[*array_idx]; + let raw_view = array.views().get(*value_idx).unwrap(); + let view_len = *raw_view as u32; + if view_len <= 12 { + views_builder.append(*raw_view); + continue; + } + // value is big enough to be in a variadic buffer + let view = ByteView::from(*raw_view); + let new_buffer_idx: &mut u32 = buffer_lookup + .entry((*array_idx, view.buffer_index)) + .or_insert_with(|| { + buffers.push(array.data_buffers()[view.buffer_index as usize].clone()); + (buffers.len() - 1) as u32 + }); + views_builder.append(view.with_buffer_index(*new_buffer_idx).into()); + } + + let array = unsafe { + GenericByteViewArray::::new_unchecked(views_builder.into(), buffers, interleaved.nulls) + }; + Ok(Arc::new(array)) +} + /// Fallback implementation of interleave using [`MutableArrayData`] fn interleave_fallback( values: &[&dyn Array], @@ -461,4 +500,209 @@ mod tests { DictionaryArray::::from_iter(vec![Some("0"), Some("1"), Some("2"), None]); assert_eq!(array.as_ref(), &expected) } + + #[test] + fn test_interleave_views() { + let values = StringArray::from_iter_values([ + "hello", + "world_long_string_not_inlined", + "foo", + "bar", + "baz", + ]); + let view_a = StringViewArray::from(&values); + + let values = StringArray::from_iter_values([ + "test", + "data", + "more_long_string_not_inlined", + "views", + "here", + ]); + let view_b = StringViewArray::from(&values); + + let indices = &[ + (0, 2), // "foo" + (1, 0), // "test" + (0, 4), // "baz" + (1, 3), // "views" + (0, 1), // "world_long_string_not_inlined" + ]; + + // Test specialized implementation + let values = interleave(&[&view_a, &view_b], indices).unwrap(); + let result = values.as_string_view(); + assert_eq!(result.data_buffers().len(), 1); + + let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap(); + let fallback_result = fallback.as_string_view(); + // note that fallback_result has 2 buffers, but only one long enough string to warrant a buffer + assert_eq!(fallback_result.data_buffers().len(), 2); + + // Convert to strings for easier assertion + let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect(); + + let fallback_collected: Vec<_> = fallback_result + .iter() + .map(|x| x.map(|s| s.to_string())) + .collect(); + + assert_eq!(&collected, &fallback_collected); + + assert_eq!( + &collected, + &[ + Some("foo".to_string()), + Some("test".to_string()), + Some("baz".to_string()), + Some("views".to_string()), + Some("world_long_string_not_inlined".to_string()), + ] + ); + } + + #[test] + fn test_interleave_views_with_nulls() { + let values = StringArray::from_iter([ + Some("hello"), + None, + Some("foo_long_string_not_inlined"), + Some("bar"), + None, + ]); + let view_a = StringViewArray::from(&values); + + let values = StringArray::from_iter([ + Some("test"), + Some("data_long_string_not_inlined"), + None, + None, + Some("here"), + ]); + let view_b = StringViewArray::from(&values); + + let indices = &[ + (0, 1), // null + (1, 2), // null + (0, 2), // "foo_long_string_not_inlined" + (1, 3), // null + (0, 4), // null + ]; + + // Test specialized implementation + let values = interleave(&[&view_a, &view_b], indices).unwrap(); + let result = values.as_string_view(); + assert_eq!(result.data_buffers().len(), 1); + + let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap(); + let fallback_result = fallback.as_string_view(); + + // Convert to strings for easier assertion + let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect(); + + let fallback_collected: Vec<_> = fallback_result + .iter() + .map(|x| x.map(|s| s.to_string())) + .collect(); + + assert_eq!(&collected, &fallback_collected); + + assert_eq!( + &collected, + &[ + None, + None, + Some("foo_long_string_not_inlined".to_string()), + None, + None, + ] + ); + } + + #[test] + fn test_interleave_views_multiple_buffers() { + let str1 = "very_long_string_from_first_buffer".as_bytes(); + let str2 = "very_long_string_from_second_buffer".as_bytes(); + let buffer1 = str1.to_vec().into(); + let buffer2 = str2.to_vec().into(); + + let view1 = ByteView::new(str1.len() as u32, &str1[..4]) + .with_buffer_index(0) + .with_offset(0) + .as_u128(); + let view2 = ByteView::new(str2.len() as u32, &str2[..4]) + .with_buffer_index(1) + .with_offset(0) + .as_u128(); + let view_a = + StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None) + .unwrap(); + + let str3 = "another_very_long_string_buffer_three".as_bytes(); + let str4 = "different_long_string_in_buffer_four".as_bytes(); + let buffer3 = str3.to_vec().into(); + let buffer4 = str4.to_vec().into(); + + let view3 = ByteView::new(str3.len() as u32, &str3[..4]) + .with_buffer_index(0) + .with_offset(0) + .as_u128(); + let view4 = ByteView::new(str4.len() as u32, &str4[..4]) + .with_buffer_index(1) + .with_offset(0) + .as_u128(); + let view_b = + StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None) + .unwrap(); + + let indices = &[ + (0, 0), // String from first buffer of array A + (1, 0), // String from first buffer of array B + (0, 1), // String from second buffer of array A + (1, 1), // String from second buffer of array B + (0, 0), // String from first buffer of array A again + (1, 1), // String from second buffer of array B again + ]; + + // Test interleave + let values = interleave(&[&view_a, &view_b], indices).unwrap(); + let result = values.as_string_view(); + + assert_eq!( + result.data_buffers().len(), + 4, + "Expected four buffers (two from each input array)" + ); + + let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect(); + assert_eq!( + result_strings, + vec![ + Some("very_long_string_from_first_buffer".to_string()), + Some("another_very_long_string_buffer_three".to_string()), + Some("very_long_string_from_second_buffer".to_string()), + Some("different_long_string_in_buffer_four".to_string()), + Some("very_long_string_from_first_buffer".to_string()), + Some("different_long_string_in_buffer_four".to_string()), + ] + ); + + let views = result.views(); + let buffer_indices: Vec<_> = views + .iter() + .map(|raw_view| ByteView::from(*raw_view).buffer_index) + .collect(); + + assert_eq!( + buffer_indices, + vec![ + 0, // First buffer from array A + 1, // First buffer from array B + 2, // Second buffer from array A + 3, // Second buffer from array B + 0, // First buffer from array A (reused) + 3, // Second buffer from array B (reused) + ] + ); + } }