Skip to content

Commit

Permalink
feat: support fanout in mirror mode and analyzer mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed Dec 2, 2024
1 parent ff2d68f commit fe272c2
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 24 deletions.
23 changes: 23 additions & 0 deletions agent/src/dispatcher/base_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use super::{
recv_engine::{self, bpf, RecvEngine},
BpfOptions, Options, PacketCounter, Pipeline,
};
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use recv_engine::af_packet::{bpf::*, BpfSyntax};

use special_recv_engine::Libpcap;

Expand Down Expand Up @@ -419,6 +421,27 @@ impl BaseDispatcher {
false
}

pub fn add_skip_outgoing(&self) {
let mut syntax = vec![
BpfSyntax::LoadExtension(LoadExtension {
num: Extension::ExtType,
}),
BpfSyntax::JumpIf(JumpIf {
cond: JumpTest::JumpNotEqual,
val: public::enums::LinuxSllPacketType::Outgoing as u32,
skip_true: 1,
..Default::default()
}),
BpfSyntax::RetConstant(RetConstant { val: 0 }),
];

self.bpf_options
.lock()
.unwrap()
.bpf_syntax
.append(&mut syntax);
}

pub(super) fn init(&mut self) -> Result<()> {
match self.engine.init() {
Ok(_) => {
Expand Down
30 changes: 7 additions & 23 deletions agent/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ pub struct Options {
#[cfg(any(target_os = "linux", target_os = "android"))]
pub cpu_set: CpuSet,
pub dpdk_ebpf_receiver: Option<Receiver<Box<packet::Packet<'static>>>>,
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fanout_enabled: bool,
}

impl Options {
Expand Down Expand Up @@ -1141,28 +1143,8 @@ impl DispatcherBuilder {
PacketCaptureType::Analyzer => {
#[cfg(target_os = "linux")]
{
base.bpf_options
.lock()
.unwrap()
.bpf_syntax
.push(BpfSyntax::LoadExtension(LoadExtension {
num: Extension::ExtType,
}));
base.bpf_options
.lock()
.unwrap()
.bpf_syntax
.push(BpfSyntax::JumpIf(JumpIf {
cond: JumpTest::JumpNotEqual,
val: public::enums::LinuxSllPacketType::Outgoing as u32,
skip_true: 1,
..Default::default()
}));
base.bpf_options
.lock()
.unwrap()
.bpf_syntax
.push(BpfSyntax::RetConstant(RetConstant { val: 0 })); // Do not capture tx direction traffic
// Do not capture tx direction traffic
base.add_skip_outgoing();
}
#[cfg(target_os = "windows")]
{
Expand Down Expand Up @@ -1307,7 +1289,9 @@ impl DispatcherBuilder {
poll_timeout: POLL_TIMEOUT.as_nanos() as isize,
version: options.af_packet_version,
iface: src_interface.as_ref().unwrap_or(&"".to_string()).clone(),
packet_fanout_mode: if options.capture_mode == PacketCaptureType::Local {
packet_fanout_mode: if src_interface.as_ref().is_some()
&& !src_interface.as_ref().unwrap().is_empty()
{
Some(options.packet_fanout_mode)
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/recv_engine/af_packet/tpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl Tpacket {
return Ok(());
}
let Some(packet_fanout_mode) = self.opts.packet_fanout_mode else {
info!("Packet fanout can only be set in PacketCaptureType::Local mode");
info!("Packet fanout disabled.");
return Ok(());
};
// The first 16 bits encode the fanout group ID, and the second set of 16 bits encode the fanout mode and options.
Expand Down

0 comments on commit fe272c2

Please sign in to comment.