NOTE(review): this patch was reconstructed from a copy whose line breaks had been
collapsed and whose angle-bracketed generic arguments had been stripped.
Assumptions to confirm against the repository before applying:
  * indentation and blank-line placement were re-derived from the hunk line counts;
  * `Result<String>`, `Vec<usize>`, `FnvHashMap<ID, VecDeque<usize>>` and
    `IndexMap<ID, usize>` follow from how the values are used in the code;
  * `collect::<Result<VecDeque<_>>>()`, `parse::<u64>()` and `data: Vec<String>` are
    best guesses -- the original type arguments are not recoverable from the text.

diff --git a/lib/distributing_iterator/version.rb b/lib/distributing_iterator/version.rb
index 8cac532..ee42ca6 100644
--- a/lib/distributing_iterator/version.rb
+++ b/lib/distributing_iterator/version.rb
@@ -1,3 +1,3 @@
 module DistributingIterator
-  VERSION = "0.0.1"
+  VERSION = "0.0.2"
 end
diff --git a/src/distribute_csv.rs b/src/distribute_csv.rs
index 9b6bdb6..52b9176 100644
--- a/src/distribute_csv.rs
+++ b/src/distribute_csv.rs
@@ -1,5 +1,5 @@
+use anyhow::{Context, Result};
 use std::collections::VecDeque;
-use anyhow::{Result, Context};
 
 use crate::distributing_iterator;
 use csv::ByteRecord;
@@ -11,14 +11,14 @@ pub fn distribute(data: &str, field: &str, spread: u64) -> Result<String> {
     let headers = csv.headers()?.clone();
     let field_index = headers
         .iter()
-        .position(|header| header == field).context(format!("field `{field}` not found in CSV headers"))?;
+        .position(|header| header == field)
+        .context(format!("field `{field}` not found in CSV headers"))?;
     let data = csv
         .into_byte_records()
         .map(|record| record.map_err(anyhow::Error::from))
         .collect::<Result<VecDeque<_>>>()?;
     let id_func = move |item: &ByteRecord| item[field_index].to_vec();
-    let iterator =
-        distributing_iterator::DistributingIterator::new(data, spread as usize, id_func);
+    let iterator = distributing_iterator::DistributingIterator::new(data, spread as usize, id_func);
     let data: Vec<_> = iterator.collect();
     let mut wtr = csv::Writer::from_writer(vec![]);
     wtr.write_record(&headers).context("writing headers")?;
diff --git a/src/distribute_indexes.rs b/src/distribute_indexes.rs
new file mode 100644
index 0000000..51b0535
--- /dev/null
+++ b/src/distribute_indexes.rs
@@ -0,0 +1,133 @@
+use fnv::FnvHashMap;
+use indexmap::IndexMap;
+use std::collections::VecDeque;
+
+pub fn distribute<'a, T: 'a, ID>(
+    data: &'a [T],
+    mut spread: usize,
+    id_func: impl Fn(&'a T) -> ID + Send + 'static,
+) -> Vec<usize>
+where
+    ID: Eq + std::hash::Hash,
+{
+    let mut result = Vec::with_capacity(data.len());
+    let mut queue_per_id: FnvHashMap<ID, VecDeque<usize>> = Default::default();
+    let mut last_pos: IndexMap<ID, usize> = Default::default();
+    let mut output_pos = 0;
+    let mut data_pos = 0;
+    let mut iterator_reached_end = false;
+
+    loop {
+        let item = loop {
+            let mut result = None;
+            let mut adjust_spread = false;
+            let sorted_spreadable_ids = last_pos
+                .iter()
+                .filter(|(_id, &last_pos)| output_pos - last_pos >= spread)
+                .map(|(id, _last_pos)| id);
+            for id in sorted_spreadable_ids {
+                match queue_per_id.get_mut(id) {
+                    Some(queue) => {
+                        if let Some(item) = queue.pop_front() {
+                            if iterator_reached_end && queue.is_empty() {
+                                queue_per_id.remove(id);
+                                adjust_spread = true
+                            }
+                            result = Some(item);
+                            break;
+                        }
+                    }
+                    None => continue,
+                }
+            }
+            if result.is_some() {
+                if adjust_spread {
+                    spread = calculate_spread(&queue_per_id);
+                }
+                break result;
+            }
+
+            if iterator_reached_end {
+                if queue_per_id.values().flatten().any(|_| true) {
+                    panic!(
+                        "Nothing can be returned even though the queue is not empty. This is a bug"
+                    );
+                } else {
+                    break None;
+                }
+            }
+
+            let current_data_pos = data_pos;
+            data_pos += 1;
+
+            match data.get(current_data_pos) {
+                Some(item) => {
+                    let id = (id_func)(item);
+                    if !last_pos.contains_key(&id) {
+                        break Some(current_data_pos);
+                    } else {
+                        queue_per_id
+                            .entry(id)
+                            .or_insert_with(|| VecDeque::with_capacity(100))
+                            .push_back(current_data_pos);
+                    }
+                }
+                None => {
+                    spread = calculate_spread(&queue_per_id);
+                    iterator_reached_end = true;
+                }
+            }
+        };
+        if let Some(output_idx) = item {
+            let id = (id_func)(&data[output_idx]);
+            last_pos.shift_remove(&id);
+            last_pos.insert(id, output_pos);
+            result.push(output_idx);
+            output_pos += 1;
+        } else {
+            break;
+        }
+    }
+    result
+}
+
+/// Distribute items that are themselves IDs
+pub fn distribute_ids<T>(data: &[T], spread: usize) -> Vec<usize> where T: Eq + std::hash::Hash, {
+    distribute(data, spread, |item| item)
+}
+
+fn calculate_spread<ID>(queue_per_id: &FnvHashMap<ID, VecDeque<usize>>) -> usize {
+    queue_per_id
+        .iter()
+        .filter(|(_id, queue)| !queue.is_empty())
+        .count()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_distribute() {
+        let data = vec![
+            "1", "1", "1", "2", "3", "3", "2", "2", "3", "3", "2", "3", "2", "3", "3",
+        ];
+        let result = distribute(&data, 3, |item| item.parse::<u64>().unwrap());
+        assert_eq!(
+            result,
+            vec![0, 3, 4, 1, 6, 5, 2, 7, 8, 10, 9, 12, 11, 13, 14]
+        );
+    }
+
+    #[test]
+    fn test_distribute2() {
+        let data = vec!["Picture", "Post", "Video", "Video", "Picture", "Post", "Picture", "Picture", "Video"];
+        let result = distribute_ids(&data, 3);
+        let result_with_labels = result
+            .iter()
+            .map(|idx| data[*idx])
+            .collect::<Vec<_>>();
+        assert_eq!(result_with_labels, vec!["Picture", "Post", "Video", "Picture", "Post", "Video", "Picture", "Video", "Picture"]);
+        assert_eq!(result, vec![0, 1, 2, 4, 5, 3, 6, 8, 7]);
+    }
+}
diff --git a/src/distributing_iterator.rs b/src/distributing_iterator.rs
index d78a55f..85261d0 100644
--- a/src/distributing_iterator.rs
+++ b/src/distributing_iterator.rs
@@ -101,6 +101,9 @@ where
                 }
             }
             if result.is_some() {
+                if adjust_spread {
+                    self.spread = Self::calculate_spread(&queue_per_id);
+                }
                 break result;
             }
 
@@ -133,9 +136,6 @@ where
                 }
             }
         };
-        if adjust_spread {
-            self.spread = Self::calculate_spread(&queue_per_id);
-        }
         self.queue_per_id = Some(queue_per_id);
         result
     }
diff --git a/src/lib.rs b/src/lib.rs
index a65ce4a..2e617b3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,11 @@
 mod distribute_csv;
+mod distribute_indexes;
 mod distributing_iterator;
+
 #[cfg(feature = "magnus")]
 mod ruby_ext;
 
 pub use distribute_csv::distribute as distribute_csv;
+pub use distribute_indexes::distribute;
+pub use distribute_indexes::distribute_ids;
 pub use distributing_iterator::DistributingIterator;
diff --git a/src/ruby_ext.rs b/src/ruby_ext.rs
index ecd892a..1b1c09a 100644
--- a/src/ruby_ext.rs
+++ b/src/ruby_ext.rs
@@ -1,19 +1,22 @@
-use magnus::{error::Result, function, exception, Error};
+use magnus::{error::Result, exception, function, Error, Value};
 
-use crate::distribute_csv;
+use crate::{distribute_ids, distribute_csv};
 
 fn distribute_csv_ruby(data: String, field: String, spread: u64) -> Result<String> {
     match distribute_csv(&data, &field, spread) {
         Ok(result) => Ok(result),
-        Err(e) => {
-            Err(Error::new(exception::standard_error(), format!("{:?}", e)))
-        }
+        Err(e) => Err(Error::new(exception::standard_error(), format!("{:?}", e))),
     }
 }
 
+fn distribute_indexes_ruby(data: Vec<String>, spread: usize) -> Result<Vec<usize>> {
+    Ok(distribute_ids(&data, spread))
+}
+
 #[magnus::init]
 fn init(ruby: &magnus::Ruby) -> Result<()> {
     let module = ruby.define_module("DistributingIterator")?;
-    module.define_module_function("distribute", function!(distribute_csv_ruby, 3))?;
+    module.define_module_function("distribute_csv", function!(distribute_csv_ruby, 3))?;
+    module.define_module_function("distribute_indexes", function!(distribute_indexes_ruby, 2))?;
     Ok(())
 }