Skip to content

Commit

Permalink
Smart partition groups.
Browse files Browse the repository at this point in the history
  • Loading branch information
milesj committed Nov 23, 2024
1 parent cd0c9f7 commit 54b16c2
Showing 1 changed file with 76 additions and 55 deletions.
131 changes: 76 additions & 55 deletions crates/remote/src/remote_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use moon_action::Operation;
use moon_common::{color, is_ci};
use moon_config::RemoteConfig;
use rustc_hash::FxHashMap;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use std::time::SystemTime;
Expand Down Expand Up @@ -388,55 +389,34 @@ async fn batch_upload_blobs(
blobs: Vec<Blob>,
max_size: usize,
) -> miette::Result<bool> {
let mut blob_groups = FxHashMap::default();
let mut group_sizes = FxHashMap::default();
let mut group_index = 0;
let mut current_size = 0;
let blob_groups = partition_into_groups(blobs, max_size, |blob| blob.bytes.len());

for blob in blobs {
let blob_size = blob.bytes.len();

if blob_size >= max_size {
warn!(
size = blob_size,
max_size,
"Encountered a blob larger than the max upload size; this is currently not supported until we support the ByteStream API; aborting upload"
);

return Ok(false);
}

if current_size >= max_size || (current_size + blob_size) >= max_size {
group_sizes.insert(group_index, current_size);
group_index += 1;
current_size = 0;
}

current_size += blob_size;
blob_groups.entry(group_index).or_insert(vec![]).push(blob);
if blob_groups.is_empty() {
return Ok(false);
}

let group_total = blob_groups.len();
let mut set = JoinSet::default();

for (group, blobs) in blob_groups.into_iter() {
for (group_index, group) in blob_groups.into_iter() {
let client = Arc::clone(&client);
let digest = digest.to_owned();

trace!(
hash = &digest.hash,
blobs = blobs.len(),
size = group_sizes.get(&group),
blobs = group.items.len(),
size = group.size,
max_size,
"Batching blobs upload (group {} of {})",
group + 1,
group_index + 1
group_index + 1,
group_total
);

set.spawn(async move {
if let Err(error) = client.batch_update_blobs(&digest, blobs).await {
if let Err(error) = client.batch_update_blobs(&digest, group.items).await {
warn!(
hash = &digest.hash,
group = group + 1,
group = group_index + 1,
"Failed to upload blobs: {}",
color::muted_light(error.to_string()),
);
Expand All @@ -461,49 +441,40 @@ async fn batch_download_blobs(
max_size: usize,
) -> miette::Result<()> {
let mut file_map = FxHashMap::default();
let mut digest_groups = FxHashMap::default();
let mut group_sizes = FxHashMap::default();
let mut group_index = 0;
let mut current_size = 0;
let mut digests = vec![];

// TODO support directories
for file in &result.output_files {
if let Some(digest) = &file.digest {
file_map.insert(&digest.hash, file);
digests.push(digest.to_owned());
}
}

let blob_size = digest.size_bytes as usize;

if current_size >= max_size || (current_size + blob_size) >= max_size {
group_sizes.insert(group_index, current_size);
group_index += 1;
current_size = 0;
}
let digest_groups = partition_into_groups(digests, max_size, |dig| dig.size_bytes as usize);

current_size += blob_size;
digest_groups
.entry(group_index)
.or_insert(vec![])
.push(digest.to_owned());
}
if digest_groups.is_empty() {
return Ok(());
}

let group_total = digest_groups.len();
let mut set = JoinSet::<miette::Result<Vec<Blob>>>::default();

for (group, blob_digests) in digest_groups.into_iter() {
for (group_index, group) in digest_groups.into_iter() {
let client = Arc::clone(&client);
let digest = digest.to_owned();

trace!(
hash = &digest.hash,
blobs = blob_digests.len(),
size = group_sizes.get(&group),
blobs = group.items.len(),
size = group.size,
max_size,
"Batching blobs download (group {} of {})",
group + 1,
group_index + 1
group_index + 1,
group_total
);

set.spawn(async move { client.batch_read_blobs(&digest, blob_digests).await });
set.spawn(async move { client.batch_read_blobs(&digest, group.items).await });
}

while let Some(res) = set.join_next().await {
Expand All @@ -526,3 +497,53 @@ async fn batch_download_blobs(

Ok(())
}

struct Partition<T> {
pub items: Vec<T>,
pub size: usize,
}

fn partition_into_groups<T>(
items: Vec<T>,
max_size: usize,
get_size: impl Fn(&T) -> usize,
) -> BTreeMap<i32, Partition<T>> {
let mut groups = BTreeMap::<i32, Partition<T>>::default();

for item in items {
let item_size = get_size(&item);
let mut index_to_use = -1;

if item_size >= max_size {
warn!(
size = item_size,
max_size,
"Encountered a blob larger than the max size; this is currently not supported until we support the ByteStream API; aborting"
);

return BTreeMap::default();
}

// Try and find a partition that this item can go into
for (index, group) in &groups {
if group.size + item_size < max_size {
index_to_use = *index;
break;
}
}

// If no partition available, create a new one
if index_to_use == -1 {
index_to_use = groups.len() as i32;
}

let entry = groups.entry(index_to_use).or_insert_with(|| Partition {
items: vec![],
size: 0,
});
entry.size += item_size;
entry.items.push(item);
}

groups
}

0 comments on commit 54b16c2

Please sign in to comment.