diff --git a/agent/src/dispatcher/analyzer_mode_dispatcher.rs b/agent/src/dispatcher/analyzer_mode_dispatcher.rs index b3c606360b03..7e3c2c889c16 100644 --- a/agent/src/dispatcher/analyzer_mode_dispatcher.rs +++ b/agent/src/dispatcher/analyzer_mode_dispatcher.rs @@ -83,9 +83,9 @@ impl AnalyzerModeDispatcherListener { &self.base.netns } - pub fn on_tap_interface_change(&self, _: &[Link], _: IfMacSource) { + pub fn on_tap_interface_change(&self, links: &[Link], _: IfMacSource) { self.base - .on_tap_interface_change(vec![], IfMacSource::IfMac); + .on_tap_interface_change(links.to_vec(), IfMacSource::IfMac); } pub fn on_vm_change(&self, vm_mac_addrs: &[MacAddr], gateway_vmac_addrs: &[MacAddr]) { @@ -581,6 +581,7 @@ impl AnalyzerModeDispatcher { base.check_and_update_bpf(); continue; } + if base.pause.load(Ordering::Relaxed) { continue; } @@ -602,6 +603,10 @@ impl AnalyzerModeDispatcher { if_index: 0, }; batch.push(info); + + drop(packet); + + base.check_and_update_bpf(); } if let Some(handler) = self.flow_generator_thread_handler.take() { let _ = handler.join(); diff --git a/agent/src/dispatcher/base_dispatcher.rs b/agent/src/dispatcher/base_dispatcher.rs index 37617fa22d3c..b7b490c4ad33 100644 --- a/agent/src/dispatcher/base_dispatcher.rs +++ b/agent/src/dispatcher/base_dispatcher.rs @@ -33,6 +33,8 @@ use super::{ recv_engine::{self, bpf, RecvEngine}, BpfOptions, Options, PacketCounter, Pipeline, }; +#[cfg(any(target_os = "linux", target_os = "android"))] +pub use recv_engine::af_packet::{bpf::*, BpfSyntax}; use special_recv_engine::Libpcap; @@ -419,6 +421,27 @@ impl BaseDispatcher { false } + pub fn add_skip_outgoing(&self) { + let mut syntax = vec![ + BpfSyntax::LoadExtension(LoadExtension { + num: Extension::ExtType, + }), + BpfSyntax::JumpIf(JumpIf { + cond: JumpTest::JumpNotEqual, + val: public::enums::LinuxSllPacketType::Outgoing as u32, + skip_true: 1, + ..Default::default() + }), + BpfSyntax::RetConstant(RetConstant { val: 0 }), + ]; + + self.bpf_options + .lock() + .unwrap() + .bpf_syntax + .append(&mut syntax); + } + pub(super) fn init(&mut self) -> Result<()> { match self.engine.init() { Ok(_) => { diff --git a/agent/src/dispatcher/mirror_mode_dispatcher.rs b/agent/src/dispatcher/mirror_mode_dispatcher.rs index 902961873134..ead1d404654d 100644 --- a/agent/src/dispatcher/mirror_mode_dispatcher.rs +++ b/agent/src/dispatcher/mirror_mode_dispatcher.rs @@ -81,11 +81,11 @@ impl MirrorModeDispatcherListener { &self.base.netns } - pub fn on_tap_interface_change(&self, _: &[Link], _: IfMacSource, agent_type: AgentType) { + pub fn on_tap_interface_change(&self, links: &[Link], _: IfMacSource, agent_type: AgentType) { let mut old_agent_type = self.agent_type.write().unwrap(); *old_agent_type = agent_type; self.base - .on_tap_interface_change(vec![], IfMacSource::IfMac); + .on_tap_interface_change(links.to_vec(), IfMacSource::IfMac); } pub fn on_vm_change_with_bridge_macs( @@ -729,6 +729,10 @@ impl MirrorModeDispatcher { self.base.npb_dedup_enabled.load(Ordering::Relaxed), ); } + + drop(packet); + + self.base.check_and_update_bpf(); } self.pipelines.clear(); diff --git a/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs b/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs index 0ee881b384d4..1702b93f548e 100644 --- a/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs +++ b/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs @@ -78,11 +78,11 @@ impl MirrorPlusModeDispatcherListener { &self.base.netns } - pub fn on_tap_interface_change(&self, _: &[Link], _: IfMacSource, agent_type: AgentType) { + pub fn on_tap_interface_change(&self, links: &[Link], _: IfMacSource, agent_type: AgentType) { let mut old_agent_type = self.agent_type.write().unwrap(); *old_agent_type = agent_type; self.base - .on_tap_interface_change(vec![], IfMacSource::IfMac); + .on_tap_interface_change(links.to_vec(), IfMacSource::IfMac); } pub fn on_vm_change_with_bridge_macs( diff --git a/agent/src/dispatcher/mod.rs b/agent/src/dispatcher/mod.rs index 38947bb295c0..5a8ed2e785b9 100644 --- a/agent/src/dispatcher/mod.rs +++ b/agent/src/dispatcher/mod.rs @@ -370,6 +370,11 @@ impl BpfOptions { ) -> Vec { let mut bpf_syntax = self.bpf_syntax.clone(); + if tap_interfaces.is_empty() { + bpf_syntax.push(BpfSyntax::RetConstant(RetConstant { val: 0 })); + return bpf_syntax; + } + bpf_syntax.push(BpfSyntax::LoadExtension(LoadExtension { num: Extension::ExtInterfaceIndex, })); @@ -564,6 +569,8 @@ pub struct Options { #[cfg(any(target_os = "linux", target_os = "android"))] pub cpu_set: CpuSet, pub dpdk_ebpf_receiver: Option>>>, + #[cfg(any(target_os = "linux", target_os = "android"))] + pub fanout_enabled: bool, } impl Options { @@ -1141,28 +1148,8 @@ impl DispatcherBuilder { PacketCaptureType::Analyzer => { #[cfg(target_os = "linux")] { - base.bpf_options - .lock() - .unwrap() - .bpf_syntax - .push(BpfSyntax::LoadExtension(LoadExtension { - num: Extension::ExtType, - })); - base.bpf_options - .lock() - .unwrap() - .bpf_syntax - .push(BpfSyntax::JumpIf(JumpIf { - cond: JumpTest::JumpNotEqual, - val: public::enums::LinuxSllPacketType::Outgoing as u32, - skip_true: 1, - ..Default::default() - })); - base.bpf_options - .lock() - .unwrap() - .bpf_syntax - .push(BpfSyntax::RetConstant(RetConstant { val: 0 })); // Do not capture tx direction traffic + // Do not capture tx direction traffic + base.add_skip_outgoing(); } #[cfg(target_os = "windows")] { @@ -1307,7 +1294,7 @@ impl DispatcherBuilder { poll_timeout: POLL_TIMEOUT.as_nanos() as isize, version: options.af_packet_version, iface: src_interface.as_ref().unwrap_or(&"".to_string()).clone(), - packet_fanout_mode: if options.capture_mode == PacketCaptureType::Local { + packet_fanout_mode: if options.fanout_enabled { Some(options.packet_fanout_mode) } else { None diff --git a/agent/src/dispatcher/recv_engine/af_packet/tpacket.rs b/agent/src/dispatcher/recv_engine/af_packet/tpacket.rs index 75cc3684a65e..ea8572ac96e6 100644 --- a/agent/src/dispatcher/recv_engine/af_packet/tpacket.rs +++ b/agent/src/dispatcher/recv_engine/af_packet/tpacket.rs @@ -221,7 +221,7 @@ impl Tpacket { return Ok(()); } let Some(packet_fanout_mode) = self.opts.packet_fanout_mode else { - info!("Packet fanout can only be set in PacketCaptureType::Local mode"); + info!("Packet fanout disabled."); return Ok(()); }; // The first 16 bits encode the fanout group ID, and the second set of 16 bits encode the fanout mode and options. diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 0aea547561c5..0ee890d0a482 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -1087,6 +1087,8 @@ fn component_on_config_change( libvirt_xml_extractor.clone(), #[cfg(target_os = "linux")] None, + #[cfg(target_os = "linux")] + false, ) { Ok(mut d) => { d.start(); @@ -1102,8 +1104,13 @@ fn component_on_config_change( } PacketCaptureType::Mirror | PacketCaptureType::Analyzer => { for d in components.dispatcher_components.iter() { + let links = get_listener_links( + conf, + #[cfg(target_os = "linux")] + &netns::NsFile::Root, + ); d.dispatcher_listener.on_tap_interface_change( - &vec![], + &links, conf.if_mac_source, conf.agent_type, &blacklist, @@ -1197,6 +1204,8 @@ fn component_on_config_change( libvirt_xml_extractor.clone(), #[cfg(target_os = "linux")] None, + #[cfg(target_os = "linux")] + false, ) { Ok(mut d) => { d.start(); @@ -1920,9 +1929,7 @@ impl AgentComponents { } #[cfg(target_os = "linux")] - let local_dispatcher_count = if candidate_config.capture_mode == PacketCaptureType::Local - && candidate_config.dispatcher.extra_netns_regex == "" - { + let mut packet_fanout_count = if candidate_config.dispatcher.extra_netns_regex == "" { user_config .inputs .cbpf @@ -1933,7 +1940,7 @@ impl AgentComponents { 1 }; #[cfg(any(target_os = "windows", target_os = "android"))] - let local_dispatcher_count = 1; + let packet_fanout_count = 1; let links = get_listener_links( &candidate_config.dispatcher, @@ -1941,24 +1948,25 @@ impl AgentComponents { &netns::NsFile::Root, ); if interfaces_and_ns.is_empty() && !links.is_empty() { - if candidate_config.capture_mode != PacketCaptureType::Local { - for l in links { + if packet_fanout_count > 1 || candidate_config.capture_mode == PacketCaptureType::Local + { + for _ in 0..packet_fanout_count { #[cfg(target_os = "linux")] - interfaces_and_ns.push((vec![l], netns::NsFile::Root)); + interfaces_and_ns.push((links.clone(), netns::NsFile::Root)); #[cfg(any(target_os = "windows", target_os = "android"))] - interfaces_and_ns.push(vec![l]); + interfaces_and_ns.push(links.clone()); } } else { - for _ in 0..local_dispatcher_count { + for l in links { #[cfg(target_os = "linux")] - interfaces_and_ns.push((links.clone(), netns::NsFile::Root)); + interfaces_and_ns.push((vec![l], netns::NsFile::Root)); #[cfg(any(target_os = "windows", target_os = "android"))] - interfaces_and_ns.push(links.clone()); + interfaces_and_ns.push(vec![l]); } } } #[cfg(target_os = "linux")] - if candidate_config.capture_mode == PacketCaptureType::Mirror + if candidate_config.capture_mode != PacketCaptureType::Local && (!user_config .inputs .cbpf @@ -1968,6 +1976,7 @@ impl AgentComponents { .is_empty() || candidate_config.dispatcher.dpdk_source != DpdkSource::None) { + packet_fanout_count = 1; interfaces_and_ns = vec![(vec![], netns::NsFile::Root)]; } @@ -2370,6 +2379,10 @@ impl AgentComponents { libvirt_xml_extractor.clone(), #[cfg(target_os = "linux")] dpdk_ebpf_receiver.take(), + #[cfg(target_os = "linux")] + { + packet_fanout_count > 1 + }, )?; dispatcher_components.push(dispatcher_component); } @@ -3158,6 +3171,7 @@ fn build_dispatchers( #[cfg(target_os = "linux")] kubernetes_poller: Arc, #[cfg(target_os = "linux")] libvirt_xml_extractor: Arc, #[cfg(target_os = "linux")] dpdk_ebpf_receiver: Option>>>, + #[cfg(target_os = "linux")] fanout_enabled: bool, ) -> Result { let candidate_config = &config_handler.candidate_config; let user_config = &candidate_config.user_config; @@ -3274,7 +3288,7 @@ fn build_dispatchers( )), ])); - let pcap_interfaces = if candidate_config.capture_mode == PacketCaptureType::Mirror + let pcap_interfaces = if candidate_config.capture_mode != PacketCaptureType::Local && candidate_config .user_config .inputs @@ -3327,6 +3341,8 @@ fn build_dispatchers( cpu_set: dispatcher_config.cpu_set, #[cfg(target_os = "linux")] dpdk_ebpf_receiver, + #[cfg(target_os = "linux")] + fanout_enabled, ..Default::default() }))) .bpf_options(bpf_options) @@ -3359,13 +3375,15 @@ fn build_dispatchers( .policy_getter(policy_getter) .exception_handler(exception_handler.clone()) .ntp_diff(synchronizer.ntp_diff()) - .src_interface( - if candidate_config.capture_mode != PacketCaptureType::Local { + .src_interface(if !fanout_enabled { + if cfg!(any(target_os = "linux", target_os = "android")) { src_link.name.clone() } else { "".into() - }, - ) + } + } else { + "".into() + }) .agent_type(dispatcher_config.agent_type) .queue_debugger(queue_debugger.clone()) .analyzer_queue_size(user_config.inputs.cbpf.tunning.raw_packet_queue_size)