Skip to content

Commit

Permalink
feat: sender add compress flag
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Dec 10, 2024
1 parent 76ee884 commit 7652a93
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 3 deletions.
15 changes: 15 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2411,13 +2411,28 @@ impl Default for Npb {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct OutputCompression {
pub application_log: bool,
}

impl Default for OutputCompression {
fn default() -> Self {
Self {
application_log: true,
}
}
}

#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct Outputs {
pub socket: Socket,
pub flow_log: OutputsFlowLog,
pub flow_metrics: FlowMetrics,
pub npb: Npb,
pub compression: OutputCompression,
}

#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)]
Expand Down
10 changes: 10 additions & 0 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,7 @@ pub struct MetricServerConfig {
pub port: u16,
pub compressed: bool,
pub profile_compressed: bool,
pub application_log_compressed: bool,
}

#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -2054,6 +2055,7 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
port: conf.inputs.integration.listen_port,
compressed: conf.inputs.integration.compression.trace,
profile_compressed: conf.inputs.integration.compression.profile,
application_log_compressed: conf.outputs.compression.application_log,
},
agent_type: conf.global.common.agent_type,
port_config: PortConfig {
Expand Down Expand Up @@ -4125,6 +4127,14 @@ impl ConfigHandler {
npb.target_port = new_npb.target_port;
restart_agent = !first_run;
}
if outputs.compression != new_outputs.compression {
info!(
"Update outputs.compression from {:?} to {:?}.",
outputs.compression, new_outputs.compression
);
outputs.compression = new_outputs.compression.clone();
restart_agent = !first_run;
}

// plugins
let plugins = &mut config.plugins;
Expand Down
35 changes: 32 additions & 3 deletions agent/src/sender/uniform_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use super::{get_sender_id, QUEUE_BATCH_SIZE};

use crate::config::handler::SenderAccess;
use crate::exception::ExceptionHandler;
use crate::trident::SenderEncoder;
use crate::utils::stats::{
self, Collector, Countable, Counter, CounterType, CounterValue, RefCountable,
};
Expand Down Expand Up @@ -106,6 +107,7 @@ struct Header {
}

impl Header {
const HEADER_LEN: usize = 19;
fn encode(&self, buffer: &mut Vec<u8>) {
buffer.extend_from_slice(self.frame_size.to_be_bytes().as_slice());
buffer.push(self.msg_type.into());
Expand All @@ -129,7 +131,7 @@ struct Encoder<T> {

impl<T: Sendable> Encoder<T> {
const BUFFER_LEN: usize = 256 << 10;
pub fn new(id: usize, msg_type: SendMessageType, agent_id: u16) -> Self {
pub fn new(id: usize, msg_type: SendMessageType, agent_id: u16, encoder: u8) -> Self {
Self {
id,
buffer: Vec::with_capacity(Self::BUFFER_LEN),
Expand All @@ -142,7 +144,7 @@ impl<T: Sendable> Encoder<T> {
agent_id: agent_id,
reserved_1: 0,
reserved_2: 0,
encoder: 0,
encoder: encoder,
},
_marker: PhantomData,
}
Expand Down Expand Up @@ -196,6 +198,22 @@ impl<T: Sendable> Encoder<T> {
}
}

pub fn compress_buffer(&mut self) {
let buffer_len = self.buffer_len();
match SenderEncoder::from(self.header.encoder).encode(&self.buffer[Header::HEADER_LEN..]) {
Ok(result) => {
if let Some(data) = result {
self.buffer.truncate(Header::HEADER_LEN);
self.buffer.extend_from_slice(&data);
debug!("compressed from {} to {}", buffer_len, data.len());
}
}
Err(e) => {
error!("compression failed {}", e);
}
};
}

pub fn buffer_len(&self) -> usize {
self.buffer.len()
}
Expand All @@ -222,6 +240,7 @@ pub struct UniformSenderThread<T> {
exception_handler: ExceptionHandler,

private_shared_conn: Option<Arc<Mutex<Connection>>>,
sender_encoder: SenderEncoder,
}

impl<T: Sendable> UniformSenderThread<T> {
Expand All @@ -232,6 +251,7 @@ impl<T: Sendable> UniformSenderThread<T> {
stats: Arc<Collector>,
exception_handler: ExceptionHandler,
private_shared_conn: Option<Arc<Mutex<Connection>>>,
sender_encoder: SenderEncoder,
) -> Self {
let running = Arc::new(AtomicBool::new(false));
Self {
Expand All @@ -244,6 +264,7 @@ impl<T: Sendable> UniformSenderThread<T> {
stats,
exception_handler,
private_shared_conn,
sender_encoder,
}
}

Expand All @@ -265,6 +286,7 @@ impl<T: Sendable> UniformSenderThread<T> {
self.stats.clone(),
self.exception_handler.clone(),
self.private_shared_conn.clone(),
self.sender_encoder,
);
self.thread_handle = Some(
thread::Builder::new()
Expand Down Expand Up @@ -381,14 +403,20 @@ impl<T: Sendable> UniformSender<T> {
stats: Arc<Collector>,
exception_handler: ExceptionHandler,
private_shared_conn: Option<Arc<Mutex<Connection>>>,
sender_encoder: SenderEncoder,
) -> Self {
let cfg = config.load();
Self {
id,
name,
input,
counter: Arc::new(SenderCounter::default()),
encoder: Encoder::new(0, SendMessageType::TaggedFlow, cfg.agent_id),
encoder: Encoder::new(
0,
SendMessageType::TaggedFlow,
cfg.agent_id,
u8::from(sender_encoder),
),
config,
private_conn: Mutex::new(Connection::new()),
private_shared_conn,
Expand Down Expand Up @@ -468,6 +496,7 @@ impl<T: Sendable> UniformSender<T> {
fn flush_encoder(&mut self) {
self.cached = true;
if self.encoder.buffer_len() > 0 {
self.encoder.compress_buffer();
self.encoder.set_header_frame_size();
self.send_buffer();
self.encoder.reset_buffer();
Expand Down
81 changes: 81 additions & 0 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::env;
use std::fmt;
use std::fs;
use std::io::Write;
use std::mem;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
Expand All @@ -32,12 +33,18 @@ use std::time::Duration;
use anyhow::{anyhow, Result};
use arc_swap::access::Access;
use dns_lookup::lookup_host;
use flate2::{
write::{GzEncoder, ZlibEncoder},
Compression,
};
use flexi_logger::{
colored_opt_format, writers::LogWriter, Age, Cleanup, Criterion, FileSpec, Logger, Naming,
};
use log::{debug, info, warn};
use num_enum::{FromPrimitive, IntoPrimitive};
use tokio::runtime::{Builder, Runtime};
use tokio::sync::broadcast;
use zstd::Encoder as ZstdEncoder;

use crate::{
collector::{
Expand Down Expand Up @@ -247,6 +254,47 @@ impl From<&AgentId> for trident::AgentId {
}
}

#[derive(Clone, Copy, PartialEq, Eq, Debug, FromPrimitive, IntoPrimitive, num_enum::Default)]
#[repr(u8)]
pub enum SenderEncoder {
#[num_enum(default)]
Raw = 0,

Zlib = 1,
Gzip = 2,
Zstd = 3,
}

impl SenderEncoder {
pub fn encode(&self, encode_buffer: &[u8]) -> std::io::Result<Option<Vec<u8>>> {
let result = match self {
Self::Raw => None,
Self::Zlib => {
let mut encoder = ZlibEncoder::new(
Vec::with_capacity(encode_buffer.len()),
Compression::default(),
);
encoder.write_all(&encode_buffer)?;
Some(encoder.finish()?)
}
Self::Gzip => {
let mut encoder = GzEncoder::new(
Vec::with_capacity(encode_buffer.len()),
Compression::default(),
);
encoder.write_all(&encode_buffer)?;
Some(encoder.finish()?)
}
Self::Zstd => {
let mut encoder = ZstdEncoder::new(Vec::with_capacity(encode_buffer.len()), 0)?;
encoder.write_all(&encode_buffer)?;
Some(encoder.finish()?)
}
};
Ok(result)
}
}

pub struct Trident {
state: AgentState,
handle: Option<JoinHandle<()>>,
Expand Down Expand Up @@ -326,6 +374,7 @@ impl Trident {
stats_collector.clone(),
exception_handler.clone(),
Some(log_stats_shared_connection.clone()),
SenderEncoder::Raw,
);
stats_sender.start();

Expand Down Expand Up @@ -2178,6 +2227,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
SenderEncoder::Raw,
);

let metrics_queue_name = "3-doc-to-collector-sender";
Expand All @@ -2200,6 +2250,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
SenderEncoder::Raw,
);

let proto_log_queue_name = "2-protolog-to-collector-sender";
Expand All @@ -2222,6 +2273,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
SenderEncoder::Raw,
);

let analyzer_ip = if candidate_config
Expand Down Expand Up @@ -2289,6 +2341,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
Some(pcap_packet_shared_connection.clone()),
SenderEncoder::Raw,
);
// Enterprise Edition Feature: packet-sequence
let packet_sequence_queue_name = "2-packet-sequence-block-to-sender";
Expand All @@ -2314,6 +2367,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
Some(pcap_packet_shared_connection),
SenderEncoder::Raw,
);

let bpf_builder = bpf::Builder {
Expand Down Expand Up @@ -2424,6 +2478,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
SenderEncoder::Raw,
);

let profile_queue_name = "1-profile-to-sender";
Expand All @@ -2446,6 +2501,9 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
// profiler compress is a special one, it requires compressed and directly write into db
// so we compress profile data inside and not compress secondly
SenderEncoder::Raw,
);
let application_log_queue_name = "1-application-log-to-sender";
let (application_log_sender, application_log_receiver, counter) = queue::bounded_with_debug(
Expand All @@ -2471,6 +2529,11 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
if candidate_config.metric_server.application_log_compressed {
SenderEncoder::Zlib
} else {
SenderEncoder::Raw
},
);

let skywalking_queue_name = "1-skywalking-to-sender";
Expand All @@ -2497,6 +2560,11 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
if candidate_config.metric_server.compressed {
SenderEncoder::Zlib
} else {
SenderEncoder::Raw
},
);

let datadog_queue_name = "1-datadog-to-sender";
Expand All @@ -2523,6 +2591,11 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
if candidate_config.metric_server.compressed {
SenderEncoder::Zlib
} else {
SenderEncoder::Raw
},
);

let ebpf_dispatcher_id = dispatcher_components.len();
Expand Down Expand Up @@ -2647,6 +2720,11 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
if candidate_config.metric_server.compressed {
SenderEncoder::Zlib
} else {
SenderEncoder::Raw
},
);

let otel_dispatcher_id = ebpf_dispatcher_id + 1;
Expand Down Expand Up @@ -2705,6 +2783,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
Some(prometheus_telegraf_shared_connection.clone()),
SenderEncoder::Raw,
);

let telegraf_queue_name = "1-telegraf-to-sender";
Expand All @@ -2731,6 +2810,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
Some(prometheus_telegraf_shared_connection),
SenderEncoder::Raw,
);

let compressed_otel_queue_name = "1-compressed-otel-to-sender";
Expand All @@ -2757,6 +2837,7 @@ impl AgentComponents {
stats_collector.clone(),
exception_handler.clone(),
None,
SenderEncoder::Raw,
);

let (external_metrics_server, external_metrics_counter) = MetricServer::new(
Expand Down
Loading

0 comments on commit 7652a93

Please sign in to comment.