Skip to content

Commit

Permalink
feat: support skywalking integration
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Oct 9, 2024
1 parent 481d7e1 commit 5b15798
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 1 deletion.
13 changes: 13 additions & 0 deletions agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ tonic = "0.8.1"
wasmtime = "12.0.1"
wasmtime-wasi = "12.0.1"
zstd = "0.13.2"
integration_skywalking = { path = "plugins/integration_skywalking" }


[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions agent/crates/public/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum SendMessageType {
// K8sEvent = 16,
ApplicationLog = 17,
SyslogDetail = 18,
SkyWalking = 19,
}

impl fmt::Display for SendMessageType {
Expand All @@ -78,6 +79,7 @@ impl fmt::Display for SendMessageType {
Self::AlarmEvent => write!(f, "alarm_event"),
Self::ApplicationLog => write!(f, "application_log"),
Self::SyslogDetail => write!(f, "syslog_detail"),
Self::SkyWalking => write!(f, "skywalking"),
}
}
}
12 changes: 12 additions & 0 deletions agent/plugins/integration_skywalking/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "integration_skywalking"
version = "0.1.0"
edition = "2021"

[dependencies]
hyper = { version = "0.14", features = ["full"] }
bytes = "1.0"
tokio = { version = "1.20.1", features = ["full"] }
public = { path = "../../crates/public" }
prost = "0.11"
log = "0.4"
93 changes: 93 additions & 0 deletions agent/plugins/integration_skywalking/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use bytes::{BufMut, BytesMut};
use hyper::{
header::{HeaderValue, CONTENT_TYPE},
Body, HeaderMap, Request, Response, StatusCode, Version,
};
use log::warn;
use prost::{EncodeError, Message};
use public::{
proto::metric,
queue::DebugSender,
sender::{SendMessageType, Sendable},
};
use std::{
future::Future,
net::{IpAddr, SocketAddr},
};

const CONTENT_TYPE_GRPC: &str = "application/grpc";

#[derive(Debug, PartialEq)]
pub struct SkyWalkingExtra(pub metric::SkyWalkingExtra);

impl Sendable for SkyWalkingExtra {
fn encode(self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
self.0.encode(buf).map(|_| self.0.encoded_len())
}

fn message_type(&self) -> SendMessageType {
SendMessageType::SkyWalking
}
}

pub async fn handle_skywalking_request<F, Fut>(
peer_addr: SocketAddr,
req: Request<Body>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
decode_data: F,
) -> Response<Body>
where
F: FnOnce(Body, HeaderMap) -> Fut,
Fut: Future<Output = Result<Vec<u8>, Response<Body>>>,
{
let is_http2_request = req
.headers()
.get(CONTENT_TYPE.as_str())
.map(|v| v.as_bytes().starts_with(CONTENT_TYPE_GRPC.as_bytes()))
.unwrap_or(false);

// uniform sender send to remote server
let (parts, body) = req.into_parts();
let skywalking_data = decode_data(body, parts.headers.clone()).await.unwrap();

let mut skywalking_extra = metric::SkyWalkingExtra::default();
skywalking_extra.data = skywalking_data;
skywalking_extra.peer_ip = match peer_addr.ip() {
IpAddr::V4(ip4) => ip4.octets().to_vec(),
IpAddr::V6(ip6) => ip6.octets().to_vec(),
};

if let Err(e) = skywalking_sender.send(SkyWalkingExtra(skywalking_extra)) {
warn!("skywalking_sender failed to send data, because {:?}", e);
}
if is_http2_request {
empty_grpc_response()
} else {
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
}

pub fn empty_grpc_response() -> Response<Body> {
let mut response_buf = BytesMut::new();
response_buf.put_u8(0); // grpc not compression
response_buf.put_u32(0); // grpc data, return length = 0

let mut trailers = HeaderMap::new();
trailers.insert("grpc-status", HeaderValue::from_static("0"));

let (mut sender, body) = Body::channel();
tokio::spawn(async move {
sender.send_data(response_buf.freeze()).await.unwrap();
sender.send_trailers(trailers).await.unwrap();
});

Response::builder()
.version(Version::HTTP_2)
.status(StatusCode::OK)
.header(CONTENT_TYPE, HeaderValue::from_static(&CONTENT_TYPE_GRPC))
.body(body)
.unwrap()
}
42 changes: 41 additions & 1 deletion agent/src/integration_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use crate::{
metric::document::{Direction, TapSide},
policy::PolicyGetter,
};

use integration_skywalking::{handle_skywalking_request, SkyWalkingExtra};
use public::{
counter::{Counter, CounterType, CounterValue, OwnedCountable},
enums::{CaptureNetworkType, EthernetType, L4Protocol},
Expand Down Expand Up @@ -595,6 +595,7 @@ async fn handler(
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
exception_handler: ExceptionHandler,
compressed: bool,
profile_compressed: bool,
Expand Down Expand Up @@ -825,6 +826,38 @@ async fn handler(

Ok(Response::builder().body(Body::empty()).unwrap())
}
(
&Method::POST,
"/v3/segments"
| "/skywalking.v3.TraceSegmentReportService/collect"
| "/skywalking.v3.TraceSegmentReportService/collectInSync",
) => {
if external_trace_integration_disabled {
return Ok(Response::builder().body(Body::empty()).unwrap());
}
Ok(handle_skywalking_request(
peer_addr,
req,
skywalking_sender,
|body, headers| async move {
let whole_body =
match aggregate_with_catch_exception(body, &exception_handler).await {
Ok(b) => b,
Err(e) => {
return Err(e);
}
};
let data = match decode_metric(whole_body, &headers) {
Ok(d) => Ok(d),
Err(_) => {
return Err(Response::builder().body(Body::empty()).unwrap());
}
};
data
},
)
.await)
}
// Return the 404 Not Found for other routes.
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -925,6 +958,7 @@ pub struct MetricServer {
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
port: Arc<AtomicU16>,
exception_handler: ExceptionHandler,
server_shutdown_tx: Mutex<Option<mpsc::Sender<()>>>,
Expand Down Expand Up @@ -952,6 +986,7 @@ impl MetricServer {
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
port: u16,
exception_handler: ExceptionHandler,
compressed: bool,
Expand Down Expand Up @@ -980,6 +1015,7 @@ impl MetricServer {
telegraf_sender,
profile_sender,
application_log_sender,
skywalking_sender,
port: Arc::new(AtomicU16::new(port)),
exception_handler,
server_shutdown_tx: Default::default(),
Expand Down Expand Up @@ -1029,6 +1065,7 @@ impl MetricServer {
let telegraf_sender = self.telegraf_sender.clone();
let profile_sender = self.profile_sender.clone();
let application_log_sender = self.application_log_sender.clone();
let skywalking_sender = self.skywalking_sender.clone();
let port = self.port.clone();
let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire)));
let (mon_tx, mon_rx) = oneshot::channel();
Expand Down Expand Up @@ -1101,6 +1138,7 @@ impl MetricServer {
let telegraf_sender = telegraf_sender.clone();
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let exception_handler_inner = exception_handler.clone();
let counter = counter.clone();
let compressed = compressed.clone();
Expand All @@ -1118,6 +1156,7 @@ impl MetricServer {
let telegraf_sender = telegraf_sender.clone();
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let exception_handler = exception_handler_inner.clone();
let peer_addr = conn.remote_addr();
let counter = counter.clone();
Expand All @@ -1141,6 +1180,7 @@ impl MetricServer {
telegraf_sender.clone(),
profile_sender.clone(),
application_log_sender.clone(),
skywalking_sender.clone(),
exception_handler.clone(),
compressed.load(Ordering::Relaxed),
profile_compressed.load(Ordering::Relaxed),
Expand Down
33 changes: 33 additions & 0 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use dns_lookup::lookup_host;
use flexi_logger::{
colored_opt_format, writers::LogWriter, Age, Cleanup, Criterion, FileSpec, Logger, Naming,
};
use integration_http2::SkyWalkingExtra;
use log::{debug, info, warn};
use tokio::runtime::{Builder, Runtime};
use tokio::sync::broadcast;
Expand Down Expand Up @@ -1557,6 +1558,7 @@ pub struct AgentComponents {
pub packet_sequence_uniform_sender: UniformSenderThread<BoxedPacketSequenceBlock>, // Enterprise Edition Feature: packet-sequence
pub proc_event_uniform_sender: UniformSenderThread<BoxedProcEvents>,
pub application_log_uniform_sender: UniformSenderThread<ApplicationLog>,
pub skywalking_uniform_sender: UniformSenderThread<SkyWalkingExtra>,
pub exception_handler: ExceptionHandler,
pub proto_log_sender: DebugSender<BoxAppProtoLogsData>,
pub pcap_batch_sender: DebugSender<BoxedPcapBatch>,
Expand Down Expand Up @@ -2429,6 +2431,31 @@ impl AgentComponents {
exception_handler.clone(),
None,
);
let skywalking_queue_name = "1-skywalking-to-sender";
let (skywalking_sender, skywalking_receiver, counter) = queue::bounded_with_debug(
user_config
.processors
.flow_log
.tunning
.flow_aggregator_queue_size,
skywalking_queue_name,
&queue_debugger,
);
stats_collector.register_countable(
&QueueStats {
module: skywalking_queue_name,
..Default::default()
},
Countable::Owned(Box::new(counter)),
);
let skywalking_uniform_sender = UniformSenderThread::new(
skywalking_queue_name,
Arc::new(skywalking_receiver),
config_handler.sender(),
stats_collector.clone(),
exception_handler.clone(),
None,
);

let ebpf_dispatcher_id = dispatcher_components.len();
#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -2672,6 +2699,7 @@ impl AgentComponents {
telegraf_sender,
profile_sender,
application_log_sender,
skywalking_sender,
candidate_config.metric_server.port,
exception_handler.clone(),
candidate_config.metric_server.compressed,
Expand Down Expand Up @@ -2756,6 +2784,7 @@ impl AgentComponents {
profile_uniform_sender,
proc_event_uniform_sender,
application_log_uniform_sender,
skywalking_uniform_sender,
capture_mode: candidate_config.capture_mode,
packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence
packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence
Expand Down Expand Up @@ -2844,6 +2873,7 @@ impl AgentComponents {
self.profile_uniform_sender.start();
self.proc_event_uniform_sender.start();
self.application_log_uniform_sender.start();
self.skywalking_uniform_sender.start();
if self.config.metric_server.enabled {
self.metrics_server_component.start();
}
Expand Down Expand Up @@ -2916,6 +2946,9 @@ impl AgentComponents {
if let Some(h) = self.application_log_uniform_sender.notify_stop() {
join_handles.push(h);
}
if let Some(h) = self.skywalking_uniform_sender.notify_stop() {
join_handles.push(h);
}
// Enterprise Edition Feature: packet-sequence
if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() {
join_handles.push(h);
Expand Down

0 comments on commit 5b15798

Please sign in to comment.