Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft: improve CPU cores affinity to NUMA nodes #11521

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@
exit 1
fi
CFLAGS="${CFLAGS} `pkg-config --cflags libdpdk`"
LIBS="${LIBS} -Wl,-R,`pkg-config --libs-only-L libdpdk | cut -c 3-` -lnuma `pkg-config --libs libdpdk`"
LIBS="${LIBS} -Wl,-R,`pkg-config --libs-only-L libdpdk | cut -c 3-` -lnuma `pkg-config --libs libdpdk` -lhwloc"

if test ! -z "$(ldconfig -p | grep librte_net_bond)"; then
AC_DEFINE([HAVE_DPDK_BOND],[1],(DPDK Bond PMD support enabled))
Expand Down
2 changes: 2 additions & 0 deletions src/threadvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ typedef struct ThreadVars_ {
struct FlowQueue_ *flow_queue;
bool break_loop;

char *iface_name; // set if the TV is TVT_PPT

} ThreadVars;

/** Thread setup flags: */
Expand Down
2 changes: 1 addition & 1 deletion src/tm-threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ TmEcode TmThreadSetupOptions(ThreadVars *tv)
if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) {
ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity];
if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
uint16_t cpu = AffinityGetNextCPU(taf);
uint16_t cpu = AffinityGetNextCPU(tv, taf);
SetCPUAffinity(cpu);
/* If CPU is in a set overwrite the default thread prio */
if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
Expand Down
302 changes: 301 additions & 1 deletion src/util-affinity.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,17 +276,268 @@ void AffinitySetupLoadFromConfig(void)
#endif /* OS_WIN32 and __OpenBSD__ */
}

/* File-wide hwloc topology handle. It starts out NULL; AffinityGetNextCPU()
 * initializes, configures and loads it the first time it needs NUMA
 * information and reuses it on later calls.
 * NOTE(review): no active code in view destroys this handle on shutdown -
 * confirm whether a teardown hook is needed. */
static hwloc_topology_t topology = NULL;

/**
 * \brief Find the NUMA node an hwloc (I/O) object is attached to
 *
 * Resolves the closest non-I/O ancestor of \p obj and returns the first NUMA
 * node whose OS index is set in that ancestor's nodeset.
 *
 * \param topology loaded hwloc topology
 * \param obj object to look up (typically a PCI device)
 * \retval logical index of the matching NUMA node, or -1 when there is no
 *         non-I/O ancestor or no NUMA node matches
 */
int HwLocDeviceNumaGet(hwloc_topology_t topology, hwloc_obj_t obj)
{
    hwloc_obj_t anchor = hwloc_get_non_io_ancestor_obj(topology, obj);
    if (!anchor) {
        fprintf(stderr, "Failed to find non-IO ancestor object.\n");
        return -1;
    }

    /* nodeset bits are keyed by OS index, the caller gets the logical index */
    for (hwloc_obj_t node = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_NUMANODE, NULL);
            node != NULL;
            node = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_NUMANODE, node)) {
        if (!hwloc_bitmap_isset(anchor->nodeset, node->os_index)) {
            continue;
        }
        return node->logical_index;
    }

    return -1;
}

/**
 * \brief Print the NUMA nodes that are local to a PCIe device
 *
 * Can only be used from hwloc version 2.5 and up, as it relies on
 * hwloc_get_local_numanode_objs().
 *
 * \param topology loaded hwloc topology
 * \param pcie_obj hwloc object of the PCIe device; NULL is reported as
 *        "no NUMA node found"
 */
void get_numa_nodes_from_pcie(hwloc_topology_t topology, hwloc_obj_t pcie_obj)
{
    enum { MAX_LOCAL_NUMA_NODES = 16 }; /* capacity of the result array */
    hwloc_obj_t nodes[MAX_LOCAL_NUMA_NODES];
    unsigned num_nodes = MAX_LOCAL_NUMA_NODES;
    struct hwloc_location location;

    if (pcie_obj == NULL) {
        printf("No NUMA node found for PCIe device.\n");
        return;
    }

    location.type = HWLOC_LOCATION_TYPE_OBJECT;
    location.location.object = pcie_obj;

    int result = hwloc_get_local_numanode_objs(topology, &location, &num_nodes, nodes, 0);
    if (result != 0 || num_nodes == 0) {
        printf("No NUMA node found for PCIe device.\n");
        return;
    }

    /* On return num_nodes holds the number of matching nodes, which per the
     * hwloc documentation may exceed what was actually stored in the array.
     * Never walk past the slots we own. */
    unsigned stored = num_nodes;
    if (stored > MAX_LOCAL_NUMA_NODES) {
        stored = MAX_LOCAL_NUMA_NODES;
    }

    printf("NUMA nodes for PCIe device:\n");
    for (unsigned i = 0; i < stored; i++) {
        printf("NUMA node %u\n", nodes[i]->logical_index);
    }
}

/**
 * \brief Find a NUMA node sharing CPUs with the package/NUMA ancestor of an object
 *
 * Climbs the parent chain of \p obj (printing each visited type) until a
 * PACKAGE or NUMANODE object is reached, then returns the first NUMA node
 * whose cpuset intersects that ancestor's cpuset.
 *
 * \param topology loaded hwloc topology
 * \param obj object to start from
 * \retval the matching NUMA node object, or NULL when \p obj is NULL, no
 *         suitable ancestor exists, or no NUMA node intersects it
 */
static hwloc_obj_t find_numa_node(hwloc_topology_t topology, hwloc_obj_t obj)
{
    if (obj == NULL) {
        fprintf(stderr, "Invalid hwloc object.\n");
        return NULL;
    }

    hwloc_obj_t ancestor;
    for (ancestor = obj->parent; ancestor != NULL; ancestor = ancestor->parent) {
        printf("Object type: %s\n", hwloc_obj_type_string(ancestor->type));
        if (ancestor->type == HWLOC_OBJ_PACKAGE || ancestor->type == HWLOC_OBJ_NUMANODE) {
            break;
        }
    }

    if (ancestor == NULL) {
        fprintf(stderr, "No parent found for the given object.\n");
        return NULL;
    }

    /* advance until a node overlaps the ancestor's CPUs or the list ends */
    hwloc_obj_t node = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_NUMANODE, NULL);
    while (node != NULL && !hwloc_bitmap_intersects(ancestor->cpuset, node->cpuset)) {
        node = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_NUMANODE, node);
    }

    return node;
}

/**
 * \brief Find the PCI device backing a network interface given by its kernel name
 *
 * Scans the OS devices of the topology for a network device named
 * \p interface_name and returns its nearest PCI device ancestor.
 *
 * \param topology loaded hwloc topology
 * \param interface_name kernel interface name to search for
 * \retval the PCI device object, or NULL when no matching network device
 *         with a PCI ancestor exists
 */
static hwloc_obj_t HwLocDeviceGetByKernelName(hwloc_topology_t topology, const char *interface_name)
{
    for (hwloc_obj_t osdev = hwloc_get_next_osdev(topology, NULL); osdev != NULL;
            osdev = hwloc_get_next_osdev(topology, osdev)) {
        if (osdev->attr->osdev.type != HWLOC_OBJ_OSDEV_NETWORK) {
            continue;
        }
        if (strcmp(osdev->name, interface_name) != 0) {
            continue;
        }

        /* name matched: walk up to the PCI device that owns this OS device */
        for (hwloc_obj_t up = osdev->parent; up != NULL; up = up->parent) {
            if (up->type == HWLOC_OBJ_PCI_DEVICE) {
                return up;
            }
        }
    }

    return NULL;
}

/**
 * \brief Split a PCIe address string into its numeric components
 *
 * Accepts the full form "dddd:bb:dd.f" and the short form "bb:dd.f"
 * (all fields hexadecimal). For the short form the domain is reported as 0.
 *
 * \param pcie_address address string to parse
 * \param domain output: PCI domain (0 when not present in the string)
 * \param bus output: PCI bus
 * \param device output: PCI device
 * \param function output: PCI function
 *
 * \note On an unparsable address this prints an error and terminates the
 *       process, as before. NOTE(review): callers that pass arbitrary
 *       interface names may prefer a recoverable error instead.
 */
static void deparse_pcie_address(const char *pcie_address, unsigned int *domain,
        unsigned int *bus, unsigned int *device, unsigned int *function)
{
    unsigned int dom = 0;
    unsigned int b = 0;
    unsigned int dev = 0;
    unsigned int fn = 0;

    if (sscanf(pcie_address, "%x:%x:%x.%x", &dom, &b, &dev, &fn) != 4) {
        /* A failed full-form parse has already stored the leading fields
         * (the bus of a short address lands in the domain slot), so the
         * default domain must be restored before trying the short form. */
        dom = 0;
        if (sscanf(pcie_address, "%x:%x.%x", &b, &dev, &fn) != 3) {
            fprintf(stderr, "Error parsing PCIe address: %s\n", pcie_address);
            exit(EXIT_FAILURE);
        }
    }

    *domain = dom;
    *bus = b;
    *device = dev;
    *function = fn;
}

/**
 * \brief Find the hwloc PCI device object for a PCIe address string
 *
 * \param topology loaded hwloc topology
 * \param pcie_address PCIe address, parsed by deparse_pcie_address()
 * \retval the PCI device whose domain, bus, device and function all match,
 *         or NULL when the topology holds no such device
 */
static hwloc_obj_t HwLocDeviceGetByPcie(hwloc_topology_t topology, const char *pcie_address)
{
    unsigned int want_domain, want_bus, want_dev, want_func;
    deparse_pcie_address(pcie_address, &want_domain, &want_bus, &want_dev, &want_func);

    hwloc_obj_t pci = hwloc_get_next_pcidev(topology, NULL);
    for (; pci != NULL; pci = hwloc_get_next_pcidev(topology, pci)) {
        if (pci->attr->pcidev.domain != want_domain) {
            continue;
        }
        if (pci->attr->pcidev.bus != want_bus) {
            continue;
        }
        if (pci->attr->pcidev.dev != want_dev) {
            continue;
        }
        if (pci->attr->pcidev.func != want_func) {
            continue;
        }
        return pci;
    }

    return NULL;
}

/**
 * \brief Dump the type, indexes and (for PCI devices) PCI attributes of an
 *        hwloc object to stdout
 *
 * \param obj object to print; NULL only prints a "not found" notice
 */
void print_hwloc_object(hwloc_obj_t obj)
{
    if (!obj) {
        printf("No object found for the given PCIe address.\n");
        return;
    }

    printf("Object type: %s\n", hwloc_obj_type_string(obj->type));
    printf("Logical index: %u\n", obj->logical_index);
    /* depth is a signed int in hwloc 2.x and is negative for objects outside
     * the main tree (I/O, NUMA, Misc), so it must not be printed as unsigned */
    printf("Depth: %d\n", obj->depth);
    printf("Attributes:\n");
    if (obj->type == HWLOC_OBJ_PCI_DEVICE) {
        printf("  Domain: %04x\n", obj->attr->pcidev.domain);
        printf("  Bus: %02x\n", obj->attr->pcidev.bus);
        printf("  Device: %02x\n", obj->attr->pcidev.dev);
        printf("  Function: %01x\n", obj->attr->pcidev.func);
        printf("  Class ID: %04x\n", obj->attr->pcidev.class_id);
        printf("  Vendor ID: %04x\n", obj->attr->pcidev.vendor_id);
        printf("  Device ID: %04x\n", obj->attr->pcidev.device_id);
        printf("  Subvendor ID: %04x\n", obj->attr->pcidev.subvendor_id);
        printf("  Subdevice ID: %04x\n", obj->attr->pcidev.subdevice_id);
        printf("  Revision: %02x\n", obj->attr->pcidev.revision);
        printf("  Link speed: %f GB/s\n", obj->attr->pcidev.linkspeed);
    } else {
        printf("  No PCI device attributes available.\n");
    }
}

/**
 * \brief Check whether a CPU belongs to a given NUMA node
 *
 * Uses the file-scope hwloc topology, which must already be loaded.
 *
 * \param ncpu CPU id, tested against the cpusets of the NUMA nodes
 * \param numa logical index of the NUMA node to test against
 * \retval true if the first NUMA node containing \p ncpu has logical index
 *         \p numa
 * \retval false otherwise, including when the topology is not available or
 *         no NUMA node contains \p ncpu
 */
static bool CPUIsFromNuma(uint16_t ncpu, uint16_t numa)
{
    if (topology == NULL) {
        return false;
    }

    int depth = hwloc_get_type_depth(topology, HWLOC_OBJ_NUMANODE);
    hwloc_obj_t numa_node = NULL;

    while ((numa_node = hwloc_get_next_obj_by_depth(topology, depth, numa_node)) != NULL) {
        /* the node's cpuset is only read, so no private copy is needed */
        if (hwloc_bitmap_isset(numa_node->cpuset, ncpu)) {
            printf("Core %u belongs to NUMA node %u\n", (unsigned)ncpu, numa_node->logical_index);
            return numa_node->logical_index == numa;
        }
    }

    /* The CPU is not part of any NUMA node: report "no match" instead of
     * dereferencing the exhausted (NULL) iterator. */
    return false;
}


/**
* \brief Return next cpu to use for a given thread family
* \retval the cpu to used given by its id
*/
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
uint16_t AffinityGetNextCPU(ThreadVars *tv, ThreadsAffinityType *taf)
{
// TODO: instead of adding the interface name to ThreadVars,
// add a preferred NUMA node - it can be filled in beforehand and is more universal
int iface_numa = -1;

// threading.cpu-assignment:
// - legacy - assign as usual
// - auto - use hwloc to determine the NUMA locality of the NIC and try to assign a core from this NUMA node.
// If that fails, use the other NUMA node.
// This approach will not work for e.g. bonded or aliased devices, or any other device without a single backing NIC.
// Warn/notify the user when the device's NUMA node cannot be determined.
// Mention in the docs that NUMA locality supports PCIe addresses and kernel interfaces
// - manual - in workers CPU set either:
// - Specify in one line ([ "eth0@1,2,3,4,7-9", "eth1@10,11" ])
// - Specify threading in a list:
// - worker-cpu-set:
// - interface: eth0
// cpu: [ 1,2,3,4 ]
// mode: "exclusive"
// prio:
// high: [ 3 ]
// default: "medium"

if (tv->type == TVT_PPT && tv->iface_name) {
if (topology == NULL) {
if (hwloc_topology_init(&topology) == -1) {
FatalError("Failed to initialize topology");
}
int ret = hwloc_topology_set_flags(topology, HWLOC_TOPOLOGY_FLAG_WHOLE_SYSTEM);
ret = hwloc_topology_set_io_types_filter(topology, HWLOC_TYPE_FILTER_KEEP_ALL);
if (ret == -1) {
FatalError("Failed to set topology flags");
hwloc_topology_destroy(topology);
}
if (hwloc_topology_load(topology) == -1) {
FatalError("Failed to load topology");
hwloc_topology_destroy(topology);
}
}

// try the kernel interface name first
hwloc_obj_t obj1 = HwLocDeviceGetByKernelName(topology, tv->iface_name);
if (obj1 == NULL) {
// if unsuccessful try PCIe search
obj1 = HwLocDeviceGetByPcie(topology, tv->iface_name);
}

if (obj1 != NULL) {
static char pcie_address[32];
snprintf(pcie_address, sizeof(pcie_address), "%04x:%02x:%02x.%x", obj1->attr->pcidev.domain, obj1->attr->pcidev.bus, obj1->attr->pcidev.dev, obj1->attr->pcidev.func);
SCLogNotice("PCIe addr of ens1f0 is %s with NUMA id %d or %p", pcie_address, HwLocDeviceNumaGet(topology, obj1), find_numa_node(topology, obj1));
}

iface_numa = HwLocDeviceNumaGet(topology, obj1);
// can be combined with newer api in get_numa_nodes_from_pcie(topology, obj1);

}

// if (topology != NULL) {
// int numa = get_numa_node_for_net_device(topology, "ens1f0");
// FatalError("NUMA node for ens1f0: %d\n", numa);
// }
// hwloc_topology_destroy(topology);

uint16_t ncpu = 0;
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
int iter = 0;
SCMutexLock(&taf->taf_mutex);
ncpu = taf->lcpu;

// not ideal, because if you have one interface and threads 1,2,3,4
// then 1,3 are double assigned

// probably divide the configured CPU sets by NUMA node and operate on each independently,
// e.g. for NICs on NUMA 1 primarily use cores from NUMA 1,
// when exhausted start using cores from NUMA 0,
// when exhausted use cores from other NUMA nodes(?),
// when exhausted reset the per-NUMA counters and use the cores again

if (iface_numa != -1) {
while ((!CPU_ISSET(ncpu, &taf->cpu_set) || !CPUIsFromNuma(ncpu, iface_numa))) {
ncpu++;
if (ncpu >= UtilCpuGetNumProcessorsOnline()) {
ncpu = 0;
break;
}
}
}

while (!CPU_ISSET(ncpu, &taf->cpu_set) && iter < 2) {
ncpu++;
if (ncpu >= UtilCpuGetNumProcessorsOnline()) {
Expand All @@ -307,6 +558,55 @@ uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
return ncpu;
}


// uint16_t AffinityGetNextCPUFromNUMANode(ThreadsAffinityType *taf, int numa_node) {
// uint16_t ncpu = 0;
// #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
// int iter = 0;
// SCMutexLock(&taf->taf_mutex);
// ncpu = taf->lcpu;

// // Check for CPUs within the preferred NUMA node first
// while (!CPU_ISSET(ncpu, &taf->cpu_set) || hwloc_get_obj_by_os_index(topology, HWLOC_OBJ_PU, ncpu)->nodeset->first != numa_node) {
// ncpu++;
// if (ncpu >= UtilCpuGetNumProcessorsOnline()) {
// ncpu = 0;
// iter++;
// }
// if (iter >= 2) {
// break;
// }
// }

// if (iter == 2) {
// // Fallback to any available CPU if no CPU found within the preferred NUMA node
// ncpu = taf->lcpu;
// while (!CPU_ISSET(ncpu, &taf->cpu_set) && iter < 2) {
// ncpu++;
// if (ncpu >= UtilCpuGetNumProcessorsOnline()) {
// ncpu = 0;
// iter++;
// }
// }
// if (iter == 2) {
// SCLogError("cpu_set does not contain "
// "available cpus, cpu affinity conf is invalid");
// }
// }

// taf->lcpu = ncpu + 1;
// if (taf->lcpu >= UtilCpuGetNumProcessorsOnline())
// taf->lcpu = 0;
// SCMutexUnlock(&taf->taf_mutex);
// SCLogDebug("Setting affinity on CPU %d", ncpu);
// #endif /* OS_WIN32 and __OpenBSD__ */
// return ncpu;
// }

/**
* \brief Return the total number of CPUs in a given affinity
* \retval the number of affined CPUs
*/
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf)
{
uint16_t ncpu = 0;
Expand Down
5 changes: 4 additions & 1 deletion src/util-affinity.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "suricata-common.h"
#include "conf.h"
#include "threads.h"
#include "threadvars.h"

#include <hwloc.h>

#if defined OS_FREEBSD
#include <sched.h>
Expand Down Expand Up @@ -86,7 +89,7 @@ extern ThreadsAffinityType thread_affinity[MAX_CPU_SET];
void AffinitySetupLoadFromConfig(void);
ThreadsAffinityType * GetAffinityTypeFromName(const char *name);

uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf);
uint16_t AffinityGetNextCPU(ThreadVars *tv, ThreadsAffinityType *taf);
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf);
#ifdef HAVE_DPDK
uint16_t UtilAffinityCpusOverlap(ThreadsAffinityType *taf1, ThreadsAffinityType *taf2);
Expand Down
7 changes: 7 additions & 0 deletions src/util-runmodes.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ static int RunModeSetLiveCaptureWorkersForDevice(ConfigIfaceThreadsCountFunc Mod

TmThreadSetCPU(tv, WORKER_CPU_SET);

if (tv->type == TVT_PPT) {
tv->iface_name = strdup(live_dev);
SCLogNotice("Duplicated livedev %s to %s", live_dev, tv->iface_name);
} else {
tv->iface_name = NULL;
}

if (TmThreadSpawn(tv) != TM_ECODE_OK) {
FatalError("TmThreadSpawn failed");
}
Expand Down