Skip to content

Commit

Permalink
Merge pull request #2459 from subspace/use-l3-cache-topology
Browse files Browse the repository at this point in the history
Use L3 cache topology instead of NUMA topology for default plotting threads
  • Loading branch information
nazar-pc authored Jan 29, 2024
2 parents afd74b3 + 20ae3f5 commit b5a7e3c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
22 changes: 10 additions & 12 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::ss58::parse_ss58_reward_address;
use subspace_farmer::utils::{
all_cpu_cores, create_plotting_thread_pool_manager, parse_cpu_cores_sets,
run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop,
recommended_number_of_farming_threads, run_future_in_dedicated_thread,
thread_pool_core_indices, AsyncJoinOnDrop,
};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
Expand Down Expand Up @@ -116,7 +117,8 @@ pub(crate) struct FarmingArgs {
#[arg(long)]
sector_downloading_concurrency: Option<NonZeroUsize>,
/// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and
/// number of NUMA nodes on NUMA system. It is further restricted by
/// number of NUMA nodes on NUMA system or L3 cache groups on large CPUs. It is further
/// restricted by
/// `--sector-downloading-concurrency` and setting this option higher than
/// `--sector-downloading-concurrency` will have no effect.
#[arg(long)]
Expand All @@ -133,7 +135,8 @@ pub(crate) struct FarmingArgs {
#[arg(long)]
farming_thread_pool_size: Option<NonZeroUsize>,
/// Size of one thread pool used for plotting, defaults to number of logical CPUs available
/// on UMA system and number of logical CPUs available in NUMA node on NUMA system.
/// on UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache
/// groups on large CPUs.
///
/// Number of thread pools is defined by `--sector-encoding-concurrency` option, different
/// thread pools might have different number of threads if NUMA nodes do not have the same size.
Expand All @@ -154,7 +157,8 @@ pub(crate) struct FarmingArgs {
plotting_cpu_cores: Option<String>,
/// Size of one thread pool used for replotting, typically smaller pool than for plotting
/// to not affect farming as much, defaults to half of the number of logical CPUs available on
/// UMA system and number of logical CPUs available in NUMA node on NUMA system.
/// UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache
/// groups on large CPUs.
///
/// Number of thread pools is defined by `--sector-encoding-concurrency` option, different
/// thread pools might have different number of threads if NUMA nodes do not have the same size.
Expand Down Expand Up @@ -498,22 +502,16 @@ where
.unwrap_or(plotting_thread_pool_core_indices.len() + 1),
));

let all_cpu_cores = all_cpu_cores();
let plotting_thread_pool_manager = create_plotting_thread_pool_manager(
plotting_thread_pool_core_indices
.into_iter()
.zip(replotting_thread_pool_core_indices),
)?;
let farming_thread_pool_size = farming_thread_pool_size
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
.unwrap_or_else(|| {
all_cpu_cores
.first()
.expect("Not empty according to function description; qed")
.cpu_cores()
.len()
});
.unwrap_or_else(recommended_number_of_farming_threads);

let all_cpu_cores = all_cpu_cores();
if all_cpu_cores.len() > 1 {
info!(numa_nodes = %all_cpu_cores.len(), "NUMA system detected");

Expand Down
37 changes: 32 additions & 5 deletions crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::thread_pool_manager::{PlottingThreadPoolManager, PlottingThreadPoolPa
use futures::channel::oneshot;
use futures::channel::oneshot::Canceled;
use futures::future::Either;
use hwlocality::object::types::ObjectType;
use rayon::{ThreadBuilder, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder};
use std::future::Future;
use std::num::{NonZeroUsize, ParseIntError};
Expand Down Expand Up @@ -187,7 +188,30 @@ impl CpuCoreSet {
}
}

/// Get all cpu cores, grouped into sets according to NUMA nodes.
/// Recommended number of thread pool size for farming, equal to number of CPU cores in the first
/// NUMA node
pub fn recommended_number_of_farming_threads() -> usize {
#[cfg(feature = "numa")]
match hwlocality::Topology::new().map(std::sync::Arc::new) {
Ok(topology) => {
return topology
// Iterate over NUMA nodes
.objects_at_depth(hwlocality::object::depth::Depth::NUMANode)
// For each NUMA nodes get CPU set
.filter_map(|node| node.cpuset())
// Get number of CPU cores
.map(|cpuset| cpuset.iter_set().count())
.find(|&count| count > 0)
.unwrap_or_else(num_cpus::get);
}
Err(error) => {
warn!(%error, "Failed to get NUMA topology");
}
}
num_cpus::get()
}

/// Get all cpu cores, grouped into sets according to NUMA nodes or L3 cache groups on large CPUs.
///
/// Returned vector is guaranteed to have at least one element and have non-zero number of CPU cores
/// in each set.
Expand All @@ -196,8 +220,8 @@ pub fn all_cpu_cores() -> Vec<CpuCoreSet> {
match hwlocality::Topology::new().map(std::sync::Arc::new) {
Ok(topology) => {
let cpu_cores = topology
// Iterate over NUMA nodes
.objects_at_depth(hwlocality::object::depth::Depth::NUMANode)
// Iterate over groups of L3 caches
.objects_with_type(ObjectType::L3Cache)
// For each NUMA nodes get CPU set
.filter_map(|node| node.cpuset())
// For each CPU set extract individual cores
Expand All @@ -214,7 +238,7 @@ pub fn all_cpu_cores() -> Vec<CpuCoreSet> {
}
}
Err(error) => {
warn!(%error, "Failed to get CPU topology");
warn!(%error, "Failed to get L3 cache topology");
}
}
vec![CpuCoreSet {
Expand All @@ -227,6 +251,9 @@ pub fn all_cpu_cores() -> Vec<CpuCoreSet> {
/// Parse space-separated set of groups of CPU cores (individual cores are coma-separated) into
/// vector of CPU core sets that can be used for creation of plotting/replotting thread pools.
pub fn parse_cpu_cores_sets(s: &str) -> Result<Vec<CpuCoreSet>, ParseIntError> {
#[cfg(feature = "numa")]
let topology = hwlocality::Topology::new().map(std::sync::Arc::new).ok();

s.split(' ')
.map(|s| {
let cores = s
Expand All @@ -237,7 +264,7 @@ pub fn parse_cpu_cores_sets(s: &str) -> Result<Vec<CpuCoreSet>, ParseIntError> {
Ok(CpuCoreSet {
cores,
#[cfg(feature = "numa")]
topology: hwlocality::Topology::new().map(std::sync::Arc::new).ok(),
topology: topology.clone(),
})
})
.collect()
Expand Down

0 comments on commit b5a7e3c

Please sign in to comment.