diff --git a/agent/Cargo.lock b/agent/Cargo.lock index c1e3e854332d..32cd36bfd297 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -1009,6 +1009,7 @@ dependencies = [ "http2", "humantime-serde", "hyper 0.14.26", + "integration_skywalking", "ipnet", "ipnetwork", "k8s-openapi", @@ -2008,6 +2009,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integration_skywalking" +version = "0.1.0" +dependencies = [ + "hyper 0.14.26", + "prost", + "public", +] + [[package]] name = "io-extras" version = "0.18.0" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 5ba3ef60143f..5b001da7a5f4 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -97,6 +97,7 @@ tonic = "0.8.1" wasmtime = "12.0.1" wasmtime-wasi = "12.0.1" zstd = "0.13.2" +integration_skywalking = { path = "plugins/integration_skywalking" } [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] diff --git a/agent/crates/public/src/sender.rs b/agent/crates/public/src/sender.rs index 41b279c78f77..7c0ff7af6f56 100644 --- a/agent/crates/public/src/sender.rs +++ b/agent/crates/public/src/sender.rs @@ -55,6 +55,7 @@ pub enum SendMessageType { // K8sEvent = 16, ApplicationLog = 17, SyslogDetail = 18, + SkyWalking = 19, } impl fmt::Display for SendMessageType { @@ -78,6 +79,7 @@ impl fmt::Display for SendMessageType { Self::AlarmEvent => write!(f, "alarm_event"), Self::ApplicationLog => write!(f, "application_log"), Self::SyslogDetail => write!(f, "syslog_detail"), + Self::SkyWalking => write!(f, "skywalking"), } } } diff --git a/agent/plugins/integration_skywalking/Cargo.toml b/agent/plugins/integration_skywalking/Cargo.toml new file mode 100644 index 000000000000..ddea66ef4b39 --- /dev/null +++ b/agent/plugins/integration_skywalking/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "integration_skywalking" +version = "0.1.0" +edition = "2021" + +[dependencies] +hyper = { version = "0.14", features = ["full"] } +public = { path = "../../crates/public" } +prost = "0.11" diff --git a/agent/plugins/integration_skywalking/src/lib.rs b/agent/plugins/integration_skywalking/src/lib.rs new file mode 100644 index 000000000000..325828e498fb --- /dev/null +++ b/agent/plugins/integration_skywalking/src/lib.rs @@ -0,0 +1,33 @@ +use hyper::{Body, Response, StatusCode}; +use prost::EncodeError; +use public::{ + proto::flow_log, + queue::DebugSender, + sender::{SendMessageType, Sendable}, +}; +use std::net::SocketAddr; + +#[derive(Debug, PartialEq)] +pub struct SkyWalkingExtra(pub flow_log::SkyWalkingExtra); + +impl Sendable for SkyWalkingExtra { + fn encode(self, _: &mut Vec) -> Result { + return Ok(0); + } + + fn message_type(&self) -> SendMessageType { + SendMessageType::SkyWalking + } +} + +pub async fn handle_skywalking_request( + _: SocketAddr, + _: Vec, + _: &str, + _: DebugSender, +) -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap() +} diff --git a/agent/src/integration_collector.rs b/agent/src/integration_collector.rs index 62e67c719ae3..f9ac3ed3d6d8 100644 --- a/agent/src/integration_collector.rs +++ b/agent/src/integration_collector.rs @@ -61,6 +61,7 @@ use crate::{ policy::PolicyGetter, }; +use integration_skywalking::{handle_skywalking_request, SkyWalkingExtra}; use public::{ counter::{Counter, CounterType, CounterValue, OwnedCountable}, enums::{CaptureNetworkType, EthernetType, L4Protocol}, @@ -595,6 +596,7 @@ async fn handler( telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, exception_handler: ExceptionHandler, compressed: bool, profile_compressed: bool, @@ -825,6 +827,28 @@ async fn handler( Ok(Response::builder().body(Body::empty()).unwrap()) } + ( + &Method::POST, + "/v3/segments" + | "/skywalking.v3.TraceSegmentReportService/collect" + | "/skywalking.v3.TraceSegmentReportService/collectInSync", + ) => { + if external_trace_integration_disabled { + return Ok(Response::builder().body(Body::empty()).unwrap()); + } + let (part, body) = req.into_parts(); + let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await { + Ok(b) => b, + Err(e) => { + return Ok(e); + } + }; + let data = decode_metric(whole_body, &part.headers)?; + Ok( + handle_skywalking_request(peer_addr, data, part.uri.path(), skywalking_sender) + .await, + ) + } // Return the 404 Not Found for other routes. _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) @@ -925,6 +949,7 @@ pub struct MetricServer { telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, port: Arc, exception_handler: ExceptionHandler, server_shutdown_tx: Mutex>>, @@ -952,6 +977,7 @@ impl MetricServer { telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, port: u16, exception_handler: ExceptionHandler, compressed: bool, @@ -980,6 +1006,7 @@ impl MetricServer { telegraf_sender, profile_sender, application_log_sender, + skywalking_sender, port: Arc::new(AtomicU16::new(port)), exception_handler, server_shutdown_tx: Default::default(), @@ -1029,6 +1056,7 @@ impl MetricServer { let telegraf_sender = self.telegraf_sender.clone(); let profile_sender = self.profile_sender.clone(); let application_log_sender = self.application_log_sender.clone(); + let skywalking_sender = self.skywalking_sender.clone(); let port = self.port.clone(); let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire))); let (mon_tx, mon_rx) = oneshot::channel(); @@ -1101,6 +1129,7 @@ impl MetricServer { let telegraf_sender = telegraf_sender.clone(); let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); + let skywalking_sender = skywalking_sender.clone(); let exception_handler_inner = exception_handler.clone(); let counter = counter.clone(); let compressed = compressed.clone(); @@ -1118,6 +1147,7 @@ impl MetricServer { let telegraf_sender = telegraf_sender.clone(); let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); + let skywalking_sender = skywalking_sender.clone(); let exception_handler = exception_handler_inner.clone(); let peer_addr = conn.remote_addr(); let counter = counter.clone(); @@ -1141,6 +1171,7 @@ impl MetricServer { telegraf_sender.clone(), profile_sender.clone(), application_log_sender.clone(), + skywalking_sender.clone(), exception_handler.clone(), compressed.load(Ordering::Relaxed), profile_compressed.load(Ordering::Relaxed), diff --git a/agent/src/trident.rs b/agent/src/trident.rs index c1346695091b..20fc3a4d70ce 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -113,6 +113,7 @@ use crate::{ utils::environment::{IN_CONTAINER, K8S_WATCH_POLICY}, }; +use integration_skywalking::SkyWalkingExtra; use packet_sequence_block::BoxedPacketSequenceBlock; use pcap_assembler::{BoxedPcapBatch, PcapAssembler}; @@ -1557,6 +1558,7 @@ pub struct AgentComponents { pub packet_sequence_uniform_sender: UniformSenderThread, // Enterprise Edition Feature: packet-sequence pub proc_event_uniform_sender: UniformSenderThread, pub application_log_uniform_sender: UniformSenderThread, + pub skywalking_uniform_sender: UniformSenderThread, pub exception_handler: ExceptionHandler, pub proto_log_sender: DebugSender, pub pcap_batch_sender: DebugSender, @@ -2430,6 +2432,32 @@ impl AgentComponents { None, ); + let skywalking_queue_name = "1-skywalking-to-sender"; + let (skywalking_sender, skywalking_receiver, counter) = queue::bounded_with_debug( + user_config + .processors + .flow_log + .tunning + .flow_aggregator_queue_size, + skywalking_queue_name, + &queue_debugger, + ); + stats_collector.register_countable( + &QueueStats { + module: skywalking_queue_name, + ..Default::default() + }, + Countable::Owned(Box::new(counter)), + ); + let skywalking_uniform_sender = UniformSenderThread::new( + skywalking_queue_name, + Arc::new(skywalking_receiver), + config_handler.sender(), + stats_collector.clone(), + exception_handler.clone(), + None, + ); + let ebpf_dispatcher_id = dispatcher_components.len(); #[cfg(any(target_os = "linux", target_os = "android"))] let mut ebpf_dispatcher_component = None; @@ -2672,6 +2700,7 @@ impl AgentComponents { telegraf_sender, profile_sender, application_log_sender, + skywalking_sender, candidate_config.metric_server.port, exception_handler.clone(), candidate_config.metric_server.compressed, @@ -2756,6 +2785,7 @@ impl AgentComponents { profile_uniform_sender, proc_event_uniform_sender, application_log_uniform_sender, + skywalking_uniform_sender, capture_mode: candidate_config.capture_mode, packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence @@ -2844,6 +2874,7 @@ impl AgentComponents { self.profile_uniform_sender.start(); self.proc_event_uniform_sender.start(); self.application_log_uniform_sender.start(); + self.skywalking_uniform_sender.start(); if self.config.metric_server.enabled { self.metrics_server_component.start(); } @@ -2916,6 +2947,9 @@ impl AgentComponents { if let Some(h) = self.application_log_uniform_sender.notify_stop() { join_handles.push(h); } + if let Some(h) = self.skywalking_uniform_sender.notify_stop() { + join_handles.push(h); + } // Enterprise Edition Feature: packet-sequence if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() { join_handles.push(h); diff --git a/message/flow_log.proto b/message/flow_log.proto index a298fcf2da10..77737f71d3e8 100644 --- a/message/flow_log.proto +++ b/message/flow_log.proto @@ -296,3 +296,7 @@ message MqttTopic { int32 qos = 2; // -1 mean not exist qos } +message SkyWalkingExtra { + bytes data = 1; + bytes peer_ip = 2; +} diff --git a/server/go.mod b/server/go.mod index 2ee92a89def4..72d24552f8da 100644 --- a/server/go.mod +++ b/server/go.mod @@ -20,6 +20,7 @@ replace ( github.com/deepflowio/deepflow/server/querier/app/prometheus/service/packet_wrapper => ./querier/app/prometheus/service/packet_wrapper github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/service/packet_service => ./querier/app/tracing-adapter/service/packet_service github.com/deepflowio/deepflow/server/querier/engine/clickhouse/packet_batch => ./querier/engine/clickhouse/packet_batch + github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import github.com/ionos-cloud/sdk-go/v6 => github.com/ionos-cloud/sdk-go/v6 v6.1.0 ) diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index aeeb6061f3bb..ab0a7d967127 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -369,7 +369,7 @@ func base64ToHexString(str string) string { // when the traceId-index data is stored in CK, the generated minmax index will have min non-zero, which improves the filtering performance of the minmax index var lastTraceIdIndex uint64 -func parseTraceIdIndex(traceId string, traceIdIndexCfg *config.TraceIdWithIndex) uint64 { +func ParseTraceIdIndex(traceId string, traceIdIndexCfg *config.TraceIdWithIndex) uint64 { if traceIdIndexCfg.Disabled { return 0 } @@ -474,7 +474,7 @@ func (h *L7FlowLog) fillL7FlowLog(l *pb.AppProtoLogsData, cfg *flowlogCfg.Config h.TraceId = l.TraceInfo.TraceId h.ParentSpanId = l.TraceInfo.ParentSpanId } - h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) + h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) // 处理内置协议特殊情况 switch datatype.L7Protocol(h.L7Protocol) { @@ -548,6 +548,10 @@ func (h *L7FlowLog) ID() uint64 { return h._id } +func (h *L7FlowLog) SetID(id uint64) { + h._id = id +} + func (b *L7Base) Fill(log *pb.AppProtoLogsData, platformData *grpc.PlatformInfoTable) { l := log.Base // 网络层 diff --git a/server/ingester/flow_log/log_data/otel_import.go b/server/ingester/flow_log/log_data/otel_import.go index 71b0eda7eaf9..53ea1740f283 100644 --- a/server/ingester/flow_log/log_data/otel_import.go +++ b/server/ingester/flow_log/log_data/otel_import.go @@ -87,7 +87,7 @@ func spanStatusToResponseStatus(status *v1.Status) datatype.LogMessageStatus { return datatype.STATUS_NOT_EXIST } -func httpCodeToResponseStatus(code int32) datatype.LogMessageStatus { +func HttpCodeToResponseStatus(code int32) datatype.LogMessageStatus { if code >= 400 && code <= 499 { return datatype.STATUS_CLIENT_ERROR } else if code >= 500 && code <= 600 { @@ -197,7 +197,7 @@ func (h *L7FlowLog) fillAttributes(spanAttributes, resAttributes []*v11.KeyValue } case "sw8.trace_id": h.TraceId = getValueString(value) - h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) + h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) } } else { @@ -297,7 +297,7 @@ func (h *L7FlowLog) fillAttributes(spanAttributes, resAttributes []*v11.KeyValue // If http.target exists, read it for RequestResource. If not exist, read the part after the domain name from http.url. // eg. http.url = http://nacos:8848/nacos/v1/ns/instance/list, mapped to request_resource is /nacos/v1/ns/instance/list if h.RequestResource == "" && httpURL != "" { - parsedURLPath, err := parseUrlPath(httpURL) + parsedURLPath, err := ParseUrlPath(httpURL) if err != nil { log.Debugf("http.url (%s) parse failed: %s", httpURL, err) } else { @@ -340,7 +340,7 @@ func (h *L7FlowLog) FillOTel(l *v1.Span, resAttributes []*v11.KeyValue, platform h.TapPortType = datatype.TAPPORT_FROM_OTEL h.SignalSource = uint16(datatype.SIGNAL_SOURCE_OTEL) h.TraceId = hex.EncodeToString(l.TraceId) - h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) + h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) h.SpanId = hex.EncodeToString(l.SpanId) h.ParentSpanId = hex.EncodeToString(l.ParentSpanId) h.TapSideEnum = uint8(spanKindToTapSide(l.Kind)) @@ -362,7 +362,7 @@ func (h *L7FlowLog) FillOTel(l *v1.Span, resAttributes []*v11.KeyValue, platform h.fillAttributes(l.GetAttributes(), resAttributes, l.GetLinks(), cfg) // 优先匹配http的响应码 if h.responseCode != 0 { - h.ResponseStatus = uint8(httpCodeToResponseStatus(h.responseCode)) + h.ResponseStatus = uint8(HttpCodeToResponseStatus(h.responseCode)) if h.ResponseStatus == uint8(datatype.STATUS_CLIENT_ERROR) || h.ResponseStatus == uint8(datatype.STATUS_SERVER_ERROR) { h.ResponseException = GetHTTPExceptionDesc(uint16(h.responseCode)) diff --git a/server/ingester/flow_log/log_data/sw_import/go.mod b/server/ingester/flow_log/log_data/sw_import/go.mod new file mode 100644 index 000000000000..7cd4d709a995 --- /dev/null +++ b/server/ingester/flow_log/log_data/sw_import/go.mod @@ -0,0 +1,3 @@ +module github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import + +go 1.18 \ No newline at end of file diff --git a/server/ingester/flow_log/log_data/sw_import/sw_import.go b/server/ingester/flow_log/log_data/sw_import/sw_import.go new file mode 100644 index 000000000000..8a113dd3c29a --- /dev/null +++ b/server/ingester/flow_log/log_data/sw_import/sw_import.go @@ -0,0 +1,11 @@ +package sw_import + +import ( + flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config" + "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data" + "github.com/deepflowio/deepflow/server/libs/grpc" +) + +func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segmentData, peerIP []byte, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog { + return []*log_data.L7FlowLog{} +} diff --git a/server/ingester/flow_log/log_data/utils.go b/server/ingester/flow_log/log_data/utils.go index 39899ec4edc1..2c5acf6f5c9a 100644 --- a/server/ingester/flow_log/log_data/utils.go +++ b/server/ingester/flow_log/log_data/utils.go @@ -27,7 +27,7 @@ func IPIntToString(ipInt uint32) string { } // eg. url=http://nacos:8848/nacos/v1/ns/instance/list, parse return `/nacos/v1/ns/instance/list` -func parseUrlPath(rawURL string) (string, error) { +func ParseUrlPath(rawURL string) (string, error) { parts := strings.SplitN(rawURL, "://", 2) if len(parts) != 2 || parts[1] == "" { return "", fmt.Errorf("invalid URL format") diff --git a/server/ingester/flow_log/log_data/utils_test.go b/server/ingester/flow_log/log_data/utils_test.go index b9f456cd2518..31fae1e33b42 100644 --- a/server/ingester/flow_log/log_data/utils_test.go +++ b/server/ingester/flow_log/log_data/utils_test.go @@ -37,7 +37,7 @@ func TestParseUrlPath(t *testing.T) { } for _, testCase := range testCases { - result, err := parseUrlPath(testCase.url) + result, err := ParseUrlPath(testCase.url) if (err != nil) != testCase.err { t.Errorf("URL: %s\nExpected error: %v\nGot error: %v\n", testCase.url, testCase.err, err != nil) }