diff --git a/agent/Cargo.lock b/agent/Cargo.lock index c1e3e854332d..37e4fab808d7 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -1009,6 +1009,7 @@ dependencies = [ "http2", "humantime-serde", "hyper 0.14.26", + "integration_skywalking", "ipnet", "ipnetwork", "k8s-openapi", @@ -2008,6 +2009,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integration_skywalking" +version = "0.1.0" +dependencies = [ + "bytes 1.4.0", + "hyper 0.14.26", + "log 0.4.22", + "prost", + "public", + "tokio", +] + [[package]] name = "io-extras" version = "0.18.0" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 5ba3ef60143f..5b001da7a5f4 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -97,6 +97,7 @@ tonic = "0.8.1" wasmtime = "12.0.1" wasmtime-wasi = "12.0.1" zstd = "0.13.2" +integration_skywalking = { path = "plugins/integration_skywalking" } [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] diff --git a/agent/crates/public/src/sender.rs b/agent/crates/public/src/sender.rs index 41b279c78f77..7c0ff7af6f56 100644 --- a/agent/crates/public/src/sender.rs +++ b/agent/crates/public/src/sender.rs @@ -55,6 +55,7 @@ pub enum SendMessageType { // K8sEvent = 16, ApplicationLog = 17, SyslogDetail = 18, + SkyWalking = 19, } impl fmt::Display for SendMessageType { @@ -78,6 +79,7 @@ impl fmt::Display for SendMessageType { Self::AlarmEvent => write!(f, "alarm_event"), Self::ApplicationLog => write!(f, "application_log"), Self::SyslogDetail => write!(f, "syslog_detail"), + Self::SkyWalking => write!(f, "skywalking"), } } } diff --git a/agent/plugins/integration_skywalking/Cargo.toml b/agent/plugins/integration_skywalking/Cargo.toml new file mode 100644 index 000000000000..5585f8c18a47 --- /dev/null +++ b/agent/plugins/integration_skywalking/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "integration_skywalking" +version = "0.1.0" +edition = "2021" + +[dependencies] +hyper = { version = "0.14", features = ["full"] } +bytes = "1.0" +tokio = { version = "1.20.1", features = ["full"] } +public = { path = "../../crates/public" } +prost = "0.11" +log = "0.4" \ No newline at end of file diff --git a/agent/plugins/integration_skywalking/src/lib.rs b/agent/plugins/integration_skywalking/src/lib.rs new file mode 100644 index 000000000000..856d18b22dce --- /dev/null +++ b/agent/plugins/integration_skywalking/src/lib.rs @@ -0,0 +1,83 @@ +use bytes::{BufMut, BytesMut}; +use hyper::{ + header::{HeaderValue, CONTENT_TYPE}, + Body, HeaderMap, Response, StatusCode, Version, +}; +use log::warn; +use prost::{EncodeError, Message}; +use public::{ + proto::metric, + queue::DebugSender, + sender::{SendMessageType, Sendable}, +}; +use std::net::{IpAddr, SocketAddr}; + +const CONTENT_TYPE_GRPC: &str = "application/grpc"; + +#[derive(Debug, PartialEq)] +pub struct SkyWalkingExtra(pub metric::SkyWalkingExtra); + +impl Sendable for SkyWalkingExtra { + fn encode(self, buf: &mut Vec) -> Result { + self.0.encode(buf).map(|_| self.0.encoded_len()) + } + + fn message_type(&self) -> SendMessageType { + SendMessageType::SkyWalking + } +} + +pub async fn handle_skywalking_request( + peer_addr: SocketAddr, + data: Vec, + path: &str, + skywalking_sender: DebugSender, +) -> Response { + // Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()) + + let mut skywalking_extra = metric::SkyWalkingExtra::default(); + skywalking_extra.data = data; + skywalking_extra.peer_ip = match peer_addr.ip() { + IpAddr::V4(ip4) => ip4.octets().to_vec(), + IpAddr::V6(ip6) => ip6.octets().to_vec(), + }; + + if let Err(e) = skywalking_sender.send(SkyWalkingExtra(skywalking_extra)) { + warn!("skywalking_sender failed to send data, because {:?}", e); + } + + let is_grpc_request = match path { + "/v3/segments" => false, + "/skywalking.v3.TraceSegmentReportService/collect" + | "/skywalking.v3.TraceSegmentReportService/collectInSync" => true, + _ => false, + }; + + if is_grpc_request { + // for grpc_request, return empty response to avoid client error + let mut response_buf = BytesMut::new(); + response_buf.put_u8(0); // grpc not compression + response_buf.put_u32(0); // grpc data, return length = 0 + + let mut trailers = HeaderMap::new(); + trailers.insert("grpc-status", HeaderValue::from_static("0")); + + let (mut sender, body) = Body::channel(); + tokio::spawn(async move { + sender.send_data(response_buf.freeze()).await.unwrap(); + sender.send_trailers(trailers).await.unwrap(); + }); + + Response::builder() + .version(Version::HTTP_2) + .status(StatusCode::OK) + .header(CONTENT_TYPE, HeaderValue::from_static(&CONTENT_TYPE_GRPC)) + .body(body) + .unwrap() + } else { + Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .unwrap() + } +} diff --git a/agent/src/integration_collector.rs b/agent/src/integration_collector.rs index 62e67c719ae3..d447b8514182 100644 --- a/agent/src/integration_collector.rs +++ b/agent/src/integration_collector.rs @@ -60,7 +60,7 @@ use crate::{ metric::document::{Direction, TapSide}, policy::PolicyGetter, }; - +use integration_skywalking::{handle_skywalking_request, SkyWalkingExtra}; use public::{ counter::{Counter, CounterType, CounterValue, OwnedCountable}, enums::{CaptureNetworkType, EthernetType, L4Protocol}, @@ -595,6 +595,7 @@ async fn handler( telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, exception_handler: ExceptionHandler, compressed: bool, profile_compressed: bool, @@ -825,6 +826,28 @@ async fn handler( Ok(Response::builder().body(Body::empty()).unwrap()) } + ( + &Method::POST, + "/v3/segments" + | "/skywalking.v3.TraceSegmentReportService/collect" + | "/skywalking.v3.TraceSegmentReportService/collectInSync", + ) => { + if external_trace_integration_disabled { + return Ok(Response::builder().body(Body::empty()).unwrap()); + } + let (part, body) = req.into_parts(); + let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await { + Ok(b) => b, + Err(e) => { + return Ok(e); + } + }; + let data = decode_metric(whole_body, &part.headers)?; + Ok( + handle_skywalking_request(peer_addr, data, part.uri.path(), skywalking_sender) + .await, + ) + } // Return the 404 Not Found for other routes. _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) @@ -925,6 +948,7 @@ pub struct MetricServer { telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, port: Arc, exception_handler: ExceptionHandler, server_shutdown_tx: Mutex>>, @@ -952,6 +976,7 @@ impl MetricServer { telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, port: u16, exception_handler: ExceptionHandler, compressed: bool, @@ -980,6 +1005,7 @@ impl MetricServer { telegraf_sender, profile_sender, application_log_sender, + skywalking_sender, port: Arc::new(AtomicU16::new(port)), exception_handler, server_shutdown_tx: Default::default(), @@ -1029,6 +1055,7 @@ impl MetricServer { let telegraf_sender = self.telegraf_sender.clone(); let profile_sender = self.profile_sender.clone(); let application_log_sender = self.application_log_sender.clone(); + let skywalking_sender = self.skywalking_sender.clone(); let port = self.port.clone(); let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire))); let (mon_tx, mon_rx) = oneshot::channel(); @@ -1101,6 +1128,7 @@ impl MetricServer { let telegraf_sender = telegraf_sender.clone(); let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); + let skywalking_sender = skywalking_sender.clone(); let exception_handler_inner = exception_handler.clone(); let counter = counter.clone(); let compressed = compressed.clone(); @@ -1118,6 +1146,7 @@ impl MetricServer { let telegraf_sender = telegraf_sender.clone(); let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); + let skywalking_sender = skywalking_sender.clone(); let exception_handler = exception_handler_inner.clone(); let peer_addr = conn.remote_addr(); let counter = counter.clone(); @@ -1141,6 +1170,7 @@ impl MetricServer { telegraf_sender.clone(), profile_sender.clone(), application_log_sender.clone(), + skywalking_sender.clone(), exception_handler.clone(), compressed.load(Ordering::Relaxed), profile_compressed.load(Ordering::Relaxed), diff --git a/agent/src/trident.rs b/agent/src/trident.rs index c1346695091b..be598be02922 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -35,6 +35,7 @@ use dns_lookup::lookup_host; use flexi_logger::{ colored_opt_format, writers::LogWriter, Age, Cleanup, Criterion, FileSpec, Logger, Naming, }; +use integration_skywalking::SkyWalkingExtra; use log::{debug, info, warn}; use tokio::runtime::{Builder, Runtime}; use tokio::sync::broadcast; @@ -1557,6 +1558,7 @@ pub struct AgentComponents { pub packet_sequence_uniform_sender: UniformSenderThread, // Enterprise Edition Feature: packet-sequence pub proc_event_uniform_sender: UniformSenderThread, pub application_log_uniform_sender: UniformSenderThread, + pub skywalking_uniform_sender: UniformSenderThread, pub exception_handler: ExceptionHandler, pub proto_log_sender: DebugSender, pub pcap_batch_sender: DebugSender, @@ -2429,6 +2431,31 @@ impl AgentComponents { exception_handler.clone(), None, ); + let skywalking_queue_name = "1-skywalking-to-sender"; + let (skywalking_sender, skywalking_receiver, counter) = queue::bounded_with_debug( + user_config + .processors + .flow_log + .tunning + .flow_aggregator_queue_size, + skywalking_queue_name, + &queue_debugger, + ); + stats_collector.register_countable( + &QueueStats { + module: skywalking_queue_name, + ..Default::default() + }, + Countable::Owned(Box::new(counter)), + ); + let skywalking_uniform_sender = UniformSenderThread::new( + skywalking_queue_name, + Arc::new(skywalking_receiver), + config_handler.sender(), + stats_collector.clone(), + exception_handler.clone(), + None, + ); let ebpf_dispatcher_id = dispatcher_components.len(); #[cfg(any(target_os = "linux", target_os = "android"))] @@ -2672,6 +2699,7 @@ impl AgentComponents { telegraf_sender, profile_sender, application_log_sender, + skywalking_sender, candidate_config.metric_server.port, exception_handler.clone(), candidate_config.metric_server.compressed, @@ -2756,6 +2784,7 @@ impl AgentComponents { profile_uniform_sender, proc_event_uniform_sender, application_log_uniform_sender, + skywalking_uniform_sender, capture_mode: candidate_config.capture_mode, packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence @@ -2844,6 +2873,7 @@ impl AgentComponents { self.profile_uniform_sender.start(); self.proc_event_uniform_sender.start(); self.application_log_uniform_sender.start(); + self.skywalking_uniform_sender.start(); if self.config.metric_server.enabled { self.metrics_server_component.start(); } @@ -2916,6 +2946,9 @@ impl AgentComponents { if let Some(h) = self.application_log_uniform_sender.notify_stop() { join_handles.push(h); } + if let Some(h) = self.skywalking_uniform_sender.notify_stop() { + join_handles.push(h); + } // Enterprise Edition Feature: packet-sequence if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() { join_handles.push(h); diff --git a/message/metric.proto b/message/metric.proto index 5e13e8146e00..6f28608c034d 100644 --- a/message/metric.proto +++ b/message/metric.proto @@ -265,3 +265,8 @@ message PrometheusMetric { repeated string extra_label_names = 2; repeated string extra_label_values = 3; } + +message SkyWalkingExtra { + bytes data = 1; + bytes peer_ip = 2; +}