Skip to content

Commit

Permalink
feat: support fanout in mirror mode and analyzer mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed Dec 6, 2024
1 parent 0476e66 commit 09080f0
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 48 deletions.
9 changes: 7 additions & 2 deletions agent/src/dispatcher/analyzer_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ impl AnalyzerModeDispatcherListener {
&self.base.netns
}

pub fn on_tap_interface_change(&self, _: &[Link], _: IfMacSource) {
pub fn on_tap_interface_change(&self, links: &[Link], _: IfMacSource) {
self.base
.on_tap_interface_change(vec![], IfMacSource::IfMac);
.on_tap_interface_change(links.to_vec(), IfMacSource::IfMac);
}

pub fn on_vm_change(&self, vm_mac_addrs: &[MacAddr], gateway_vmac_addrs: &[MacAddr]) {
Expand Down Expand Up @@ -581,6 +581,7 @@ impl AnalyzerModeDispatcher {
base.check_and_update_bpf();
continue;
}

if base.pause.load(Ordering::Relaxed) {
continue;
}
Expand All @@ -602,6 +603,10 @@ impl AnalyzerModeDispatcher {
if_index: 0,
};
batch.push(info);

drop(packet);

base.check_and_update_bpf();
}
if let Some(handler) = self.flow_generator_thread_handler.take() {
let _ = handler.join();
Expand Down
23 changes: 23 additions & 0 deletions agent/src/dispatcher/base_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use super::{
recv_engine::{self, bpf, RecvEngine},
BpfOptions, Options, PacketCounter, Pipeline,
};
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use recv_engine::af_packet::{bpf::*, BpfSyntax};

use special_recv_engine::Libpcap;

Expand Down Expand Up @@ -419,6 +421,27 @@ impl BaseDispatcher {
false
}

pub fn add_skip_outgoing(&self) {
let mut syntax = vec![
BpfSyntax::LoadExtension(LoadExtension {
num: Extension::ExtType,
}),
BpfSyntax::JumpIf(JumpIf {
cond: JumpTest::JumpNotEqual,
val: public::enums::LinuxSllPacketType::Outgoing as u32,
skip_true: 1,
..Default::default()
}),
BpfSyntax::RetConstant(RetConstant { val: 0 }),
];

self.bpf_options
.lock()
.unwrap()
.bpf_syntax
.append(&mut syntax);
}

pub(super) fn init(&mut self) -> Result<()> {
match self.engine.init() {
Ok(_) => {
Expand Down
8 changes: 6 additions & 2 deletions agent/src/dispatcher/mirror_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ impl MirrorModeDispatcherListener {
&self.base.netns
}

pub fn on_tap_interface_change(&self, _: &[Link], _: IfMacSource, agent_type: AgentType) {
pub fn on_tap_interface_change(&self, links: &[Link], _: IfMacSource, agent_type: AgentType) {
let mut old_agent_type = self.agent_type.write().unwrap();
*old_agent_type = agent_type;
self.base
.on_tap_interface_change(vec![], IfMacSource::IfMac);
.on_tap_interface_change(links.to_vec(), IfMacSource::IfMac);
}

pub fn on_vm_change_with_bridge_macs(
Expand Down Expand Up @@ -729,6 +729,10 @@ impl MirrorModeDispatcher {
self.base.npb_dedup_enabled.load(Ordering::Relaxed),
);
}

drop(packet);

self.base.check_and_update_bpf();
}

self.pipelines.clear();
Expand Down
4 changes: 2 additions & 2 deletions agent/src/dispatcher/mirror_plus_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ impl MirrorPlusModeDispatcherListener {
&self.base.netns
}

pub fn on_tap_interface_change(&self, _: &[Link], _: IfMacSource, agent_type: AgentType) {
pub fn on_tap_interface_change(&self, links: &[Link], _: IfMacSource, agent_type: AgentType) {
let mut old_agent_type = self.agent_type.write().unwrap();
*old_agent_type = agent_type;
self.base
.on_tap_interface_change(vec![], IfMacSource::IfMac);
.on_tap_interface_change(links.to_vec(), IfMacSource::IfMac);
}

pub fn on_vm_change_with_bridge_macs(
Expand Down
33 changes: 10 additions & 23 deletions agent/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ impl BpfOptions {
) -> Vec<BpfSyntax> {
let mut bpf_syntax = self.bpf_syntax.clone();

if tap_interfaces.is_empty() {
bpf_syntax.push(BpfSyntax::RetConstant(RetConstant { val: 0 }));
return bpf_syntax;
}

bpf_syntax.push(BpfSyntax::LoadExtension(LoadExtension {
num: Extension::ExtInterfaceIndex,
}));
Expand Down Expand Up @@ -564,6 +569,8 @@ pub struct Options {
#[cfg(any(target_os = "linux", target_os = "android"))]
pub cpu_set: CpuSet,
pub dpdk_ebpf_receiver: Option<Receiver<Box<packet::Packet<'static>>>>,
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fanout_enabled: bool,
}

impl Options {
Expand Down Expand Up @@ -1141,28 +1148,8 @@ impl DispatcherBuilder {
PacketCaptureType::Analyzer => {
#[cfg(target_os = "linux")]
{
base.bpf_options
.lock()
.unwrap()
.bpf_syntax
.push(BpfSyntax::LoadExtension(LoadExtension {
num: Extension::ExtType,
}));
base.bpf_options
.lock()
.unwrap()
.bpf_syntax
.push(BpfSyntax::JumpIf(JumpIf {
cond: JumpTest::JumpNotEqual,
val: public::enums::LinuxSllPacketType::Outgoing as u32,
skip_true: 1,
..Default::default()
}));
base.bpf_options
.lock()
.unwrap()
.bpf_syntax
.push(BpfSyntax::RetConstant(RetConstant { val: 0 })); // Do not capture tx direction traffic
// Do not capture tx direction traffic
base.add_skip_outgoing();
}
#[cfg(target_os = "windows")]
{
Expand Down Expand Up @@ -1307,7 +1294,7 @@ impl DispatcherBuilder {
poll_timeout: POLL_TIMEOUT.as_nanos() as isize,
version: options.af_packet_version,
iface: src_interface.as_ref().unwrap_or(&"".to_string()).clone(),
packet_fanout_mode: if options.capture_mode == PacketCaptureType::Local {
packet_fanout_mode: if options.fanout_enabled {
Some(options.packet_fanout_mode)
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/recv_engine/af_packet/tpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl Tpacket {
return Ok(());
}
let Some(packet_fanout_mode) = self.opts.packet_fanout_mode else {
info!("Packet fanout can only be set in PacketCaptureType::Local mode");
info!("Packet fanout disabled.");
return Ok(());
};
// The first 16 bits encode the fanout group ID, and the second set of 16 bits encode the fanout mode and options.
Expand Down
54 changes: 36 additions & 18 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,8 @@ fn component_on_config_change(
libvirt_xml_extractor.clone(),
#[cfg(target_os = "linux")]
None,
#[cfg(target_os = "linux")]
false,
) {
Ok(mut d) => {
d.start();
Expand All @@ -1102,8 +1104,13 @@ fn component_on_config_change(
}
PacketCaptureType::Mirror | PacketCaptureType::Analyzer => {
for d in components.dispatcher_components.iter() {
let links = get_listener_links(
conf,
#[cfg(target_os = "linux")]
&netns::NsFile::Root,
);
d.dispatcher_listener.on_tap_interface_change(
&vec![],
&links,
conf.if_mac_source,
conf.agent_type,
&blacklist,
Expand Down Expand Up @@ -1197,6 +1204,8 @@ fn component_on_config_change(
libvirt_xml_extractor.clone(),
#[cfg(target_os = "linux")]
None,
#[cfg(target_os = "linux")]
false,
) {
Ok(mut d) => {
d.start();
Expand Down Expand Up @@ -1920,9 +1929,7 @@ impl AgentComponents {
}

#[cfg(target_os = "linux")]
let local_dispatcher_count = if candidate_config.capture_mode == PacketCaptureType::Local
&& candidate_config.dispatcher.extra_netns_regex == ""
{
let mut packet_fanout_count = if candidate_config.dispatcher.extra_netns_regex == "" {
user_config
.inputs
.cbpf
Expand All @@ -1933,32 +1940,33 @@ impl AgentComponents {
1
};
#[cfg(any(target_os = "windows", target_os = "android"))]
let local_dispatcher_count = 1;
let packet_fanout_count = 1;

let links = get_listener_links(
&candidate_config.dispatcher,
#[cfg(target_os = "linux")]
&netns::NsFile::Root,
);
if interfaces_and_ns.is_empty() && !links.is_empty() {
if candidate_config.capture_mode != PacketCaptureType::Local {
for l in links {
if packet_fanout_count > 1 || candidate_config.capture_mode == PacketCaptureType::Local
{
for _ in 0..packet_fanout_count {
#[cfg(target_os = "linux")]
interfaces_and_ns.push((vec![l], netns::NsFile::Root));
interfaces_and_ns.push((links.clone(), netns::NsFile::Root));
#[cfg(any(target_os = "windows", target_os = "android"))]
interfaces_and_ns.push(vec![l]);
interfaces_and_ns.push(links.clone());
}
} else {
for _ in 0..local_dispatcher_count {
for l in links {
#[cfg(target_os = "linux")]
interfaces_and_ns.push((links.clone(), netns::NsFile::Root));
interfaces_and_ns.push((vec![l], netns::NsFile::Root));
#[cfg(any(target_os = "windows", target_os = "android"))]
interfaces_and_ns.push(links.clone());
interfaces_and_ns.push(vec![l]);
}
}
}
#[cfg(target_os = "linux")]
if candidate_config.capture_mode == PacketCaptureType::Mirror
if candidate_config.capture_mode != PacketCaptureType::Local
&& (!user_config
.inputs
.cbpf
Expand All @@ -1968,6 +1976,7 @@ impl AgentComponents {
.is_empty()
|| candidate_config.dispatcher.dpdk_source != DpdkSource::None)
{
packet_fanout_count = 1;
interfaces_and_ns = vec![(vec![], netns::NsFile::Root)];
}

Expand Down Expand Up @@ -2370,6 +2379,10 @@ impl AgentComponents {
libvirt_xml_extractor.clone(),
#[cfg(target_os = "linux")]
dpdk_ebpf_receiver.take(),
#[cfg(target_os = "linux")]
{
packet_fanout_count > 1
},
)?;
dispatcher_components.push(dispatcher_component);
}
Expand Down Expand Up @@ -3158,6 +3171,7 @@ fn build_dispatchers(
#[cfg(target_os = "linux")] kubernetes_poller: Arc<GenericPoller>,
#[cfg(target_os = "linux")] libvirt_xml_extractor: Arc<LibvirtXmlExtractor>,
#[cfg(target_os = "linux")] dpdk_ebpf_receiver: Option<Receiver<Box<packet::Packet<'static>>>>,
#[cfg(target_os = "linux")] fanout_enabled: bool,
) -> Result<DispatcherComponent> {
let candidate_config = &config_handler.candidate_config;
let user_config = &candidate_config.user_config;
Expand Down Expand Up @@ -3274,7 +3288,7 @@ fn build_dispatchers(
)),
]));

let pcap_interfaces = if candidate_config.capture_mode == PacketCaptureType::Mirror
let pcap_interfaces = if candidate_config.capture_mode != PacketCaptureType::Local
&& candidate_config
.user_config
.inputs
Expand Down Expand Up @@ -3327,6 +3341,8 @@ fn build_dispatchers(
cpu_set: dispatcher_config.cpu_set,
#[cfg(target_os = "linux")]
dpdk_ebpf_receiver,
#[cfg(target_os = "linux")]
fanout_enabled,
..Default::default()
})))
.bpf_options(bpf_options)
Expand Down Expand Up @@ -3359,13 +3375,15 @@ fn build_dispatchers(
.policy_getter(policy_getter)
.exception_handler(exception_handler.clone())
.ntp_diff(synchronizer.ntp_diff())
.src_interface(
if candidate_config.capture_mode != PacketCaptureType::Local {
.src_interface(if !fanout_enabled {
if cfg!(any(target_os = "linux", target_os = "android")) {
src_link.name.clone()
} else {
"".into()
},
)
}
} else {
"".into()
})
.agent_type(dispatcher_config.agent_type)
.queue_debugger(queue_debugger.clone())
.analyzer_queue_size(user_config.inputs.cbpf.tunning.raw_packet_queue_size)
Expand Down

0 comments on commit 09080f0

Please sign in to comment.