From 153eee4046becadc1c5841a0a2f5f89db5495b5c Mon Sep 17 00:00:00 2001 From: taloric Date: Thu, 14 Nov 2024 17:46:10 +0800 Subject: [PATCH] feat: support dd integration --- agent/crates/public/src/sender.rs | 2 + .../plugins/integration_skywalking/src/lib.rs | 2 +- agent/src/integration_collector.rs | 51 ++++++++++++++++++ agent/src/trident.rs | 33 +++++++++++- message/flow_log.proto | 2 +- server/go.mod | 2 + server/ingester/flow_log/decoder/decoder.go | 54 ++++++++++++++++++- .../flow_log/log_data/dd_import/dd_import.go | 11 ++++ .../flow_log/log_data/dd_import/go.mod | 3 ++ server/libs/datatype/droplet-message.go | 5 +- 10 files changed, 158 insertions(+), 7 deletions(-) create mode 100644 server/ingester/flow_log/log_data/dd_import/dd_import.go create mode 100644 server/ingester/flow_log/log_data/dd_import/go.mod diff --git a/agent/crates/public/src/sender.rs b/agent/crates/public/src/sender.rs index 7c0ff7af6f5..a306d7b604d 100644 --- a/agent/crates/public/src/sender.rs +++ b/agent/crates/public/src/sender.rs @@ -56,6 +56,7 @@ pub enum SendMessageType { ApplicationLog = 17, SyslogDetail = 18, SkyWalking = 19, + Datadog = 20, } impl fmt::Display for SendMessageType { @@ -80,6 +81,7 @@ impl fmt::Display for SendMessageType { Self::ApplicationLog => write!(f, "application_log"), Self::SyslogDetail => write!(f, "syslog_detail"), Self::SkyWalking => write!(f, "skywalking"), + Self::Datadog => write!(f, "datadog"), } } } diff --git a/agent/plugins/integration_skywalking/src/lib.rs b/agent/plugins/integration_skywalking/src/lib.rs index 7812401abbf..12ead96185c 100644 --- a/agent/plugins/integration_skywalking/src/lib.rs +++ b/agent/plugins/integration_skywalking/src/lib.rs @@ -24,7 +24,7 @@ use public::{ use std::net::SocketAddr; #[derive(Debug, PartialEq)] -pub struct SkyWalkingExtra(pub flow_log::SkyWalkingExtra); +pub struct SkyWalkingExtra(pub flow_log::ThirdPartyTrace); impl Sendable for SkyWalkingExtra { fn encode(self, _: &mut Vec) -> Result { diff --git a/agent/src/integration_collector.rs b/agent/src/integration_collector.rs index dd64bc783c7..3eb5e87be12 100644 --- a/agent/src/integration_collector.rs +++ b/agent/src/integration_collector.rs @@ -70,6 +70,7 @@ use public::{ l7_protocol::L7Protocol, proto::{ agent::Exception, + flow_log, integration::opentelemetry::proto::{ common::v1::{ any_value::Value::{IntValue, StringValue}, @@ -226,6 +227,19 @@ async fn aggregate_with_catch_exception( }) } +#[derive(Debug, PartialEq)] +pub struct Datadog(flow_log::ThirdPartyTrace); + +impl Sendable for Datadog { + fn encode(mut self, buf: &mut Vec) -> Result { + self.0.encode(buf).map(|_| self.0.encoded_len()) + } + + fn message_type(&self) -> SendMessageType { + SendMessageType::Datadog + } +} + // for log capture from vector #[derive(Debug, PartialEq)] pub struct ApplicationLog(Vec); @@ -599,6 +613,7 @@ async fn handler( profile_sender: DebugSender, application_log_sender: DebugSender, skywalking_sender: DebugSender, + datadog_sender: DebugSender, exception_handler: ExceptionHandler, compressed: bool, profile_compressed: bool, @@ -862,6 +877,35 @@ async fn handler( ) .await) } + // TODO: confirm regex/fuzzy path + // path: /api/v0.2,v0.3,v0.4,v0.5,v0.7, + // TODO: repeated code, consider make it common + (&Method::POST, "/v0.4/traces") => { + if external_trace_integration_disabled { + return Ok(Response::builder().body(Body::empty()).unwrap()); + } + let (part, body) = req.into_parts(); + let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await { + Ok(b) => b, + Err(e) => { + return Ok(e); + } + }; + let datadog = decode_metric(whole_body, &part.headers)?; + let mut third_party_data = flow_log::ThirdPartyTrace::default(); + third_party_data.data = datadog; + third_party_data.uri = part.uri.path().to_string(); + third_party_data.peer_ip = match peer_addr.ip() { + IpAddr::V4(ip4) => ip4.octets().to_vec(), + IpAddr::V6(ip6) => ip6.octets().to_vec(), + }; + + if let Err(e) = datadog_sender.send(Datadog(third_party_data)) { + warn!("datadog_sender failed to send data, because {:?}", e); + } + + Ok(Response::builder().body(Body::empty()).unwrap()) + } // Return the 404 Not Found for other routes. _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) @@ -963,6 +1007,7 @@ pub struct MetricServer { profile_sender: DebugSender, application_log_sender: DebugSender, skywalking_sender: DebugSender, + datadog_sender: DebugSender, port: Arc, exception_handler: ExceptionHandler, server_shutdown_tx: Mutex>>, @@ -991,6 +1036,7 @@ impl MetricServer { profile_sender: DebugSender, application_log_sender: DebugSender, skywalking_sender: DebugSender, + datadog_sender: DebugSender, port: u16, exception_handler: ExceptionHandler, compressed: bool, @@ -1020,6 +1066,7 @@ impl MetricServer { profile_sender, application_log_sender, skywalking_sender, + datadog_sender, port: Arc::new(AtomicU16::new(port)), exception_handler, server_shutdown_tx: Default::default(), @@ -1070,6 +1117,7 @@ impl MetricServer { let profile_sender = self.profile_sender.clone(); let application_log_sender = self.application_log_sender.clone(); let skywalking_sender = self.skywalking_sender.clone(); + let datadog_sender = self.datadog_sender.clone(); let port = self.port.clone(); let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire))); let (mon_tx, mon_rx) = oneshot::channel(); @@ -1143,6 +1191,7 @@ impl MetricServer { let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); let skywalking_sender = skywalking_sender.clone(); + let datadog_sender = datadog_sender.clone(); let exception_handler_inner = exception_handler.clone(); let counter = counter.clone(); let compressed = compressed.clone(); @@ -1161,6 +1210,7 @@ impl MetricServer { let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); let skywalking_sender = skywalking_sender.clone(); + let datadog_sender = datadog_sender.clone(); let exception_handler = exception_handler_inner.clone(); let peer_addr = conn.remote_addr(); let counter = counter.clone(); @@ -1185,6 +1235,7 @@ impl MetricServer { profile_sender.clone(), application_log_sender.clone(), skywalking_sender.clone(), + datadog_sender.clone(), exception_handler.clone(), compressed.load(Ordering::Relaxed), profile_compressed.load(Ordering::Relaxed), diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 0cbd6256355..e865dfa81f0 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -72,8 +72,8 @@ use crate::{ }, handler::{NpbBuilder, PacketHandlerBuilder}, integration_collector::{ - ApplicationLog, BoxedPrometheusExtra, MetricServer, OpenTelemetry, OpenTelemetryCompressed, - Profile, TelegrafMetric, + ApplicationLog, BoxedPrometheusExtra, Datadog, MetricServer, OpenTelemetry, + OpenTelemetryCompressed, Profile, TelegrafMetric, }, metric::document::BoxedDocument, monitor::Monitor, @@ -1568,6 +1568,7 @@ pub struct AgentComponents { pub proc_event_uniform_sender: UniformSenderThread, pub application_log_uniform_sender: UniformSenderThread, pub skywalking_uniform_sender: UniformSenderThread, + pub datadog_uniform_sender: UniformSenderThread, pub exception_handler: ExceptionHandler, pub proto_log_sender: DebugSender, pub pcap_batch_sender: DebugSender, @@ -2467,6 +2468,32 @@ impl AgentComponents { None, ); + let datadog_queue_name = "1-datadog-to-sender"; + let (datadog_sender, datadog_receiver, counter) = queue::bounded_with_debug( + user_config + .processors + .flow_log + .tunning + .flow_aggregator_queue_size, + datadog_queue_name, + &queue_debugger, + ); + stats_collector.register_countable( + &QueueStats { + module: datadog_queue_name, + ..Default::default() + }, + Countable::Owned(Box::new(counter)), + ); + let datadog_uniform_sender = UniformSenderThread::new( + datadog_queue_name, + Arc::new(datadog_receiver), + config_handler.sender(), + stats_collector.clone(), + exception_handler.clone(), + None, + ); + let ebpf_dispatcher_id = dispatcher_components.len(); #[cfg(any(target_os = "linux", target_os = "android"))] let mut ebpf_dispatcher_component = None; @@ -2710,6 +2737,7 @@ impl AgentComponents { profile_sender, application_log_sender, skywalking_sender, + datadog_sender, candidate_config.metric_server.port, exception_handler.clone(), candidate_config.metric_server.compressed, @@ -2795,6 +2823,7 @@ impl AgentComponents { proc_event_uniform_sender, application_log_uniform_sender, skywalking_uniform_sender, + datadog_uniform_sender, capture_mode: candidate_config.capture_mode, packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence diff --git a/message/flow_log.proto b/message/flow_log.proto index ee06ea9c778..684754b9032 100644 --- a/message/flow_log.proto +++ b/message/flow_log.proto @@ -296,7 +296,7 @@ message MqttTopic { int32 qos = 2; // -1 mean not exist qos } -message SkyWalkingExtra { +message ThirdPartyTrace { bytes data = 1; bytes peer_ip = 2; string uri = 3; diff --git a/server/go.mod b/server/go.mod index f7c3509f7a3..d46528a329f 100644 --- a/server/go.mod +++ b/server/go.mod @@ -14,6 +14,7 @@ replace ( github.com/deepflowio/deepflow/server/controller/http/service/configuration => ./controller/http/service/configuration github.com/deepflowio/deepflow/server/controller/monitor/license => ./controller/monitor/license github.com/deepflowio/deepflow/server/ingester/config/configdefaults => ./ingester/config/configdefaults + github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import => ./ingester/flow_log/log_data/dd_import github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import github.com/deepflowio/deepflow/server/libs/logger/blocker => ./libs/logger/blocker github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap => ./querier/app/distributed_tracing/service/tracemap @@ -106,6 +107,7 @@ require ( github.com/bytedance/sonic v1.11.8 github.com/deepflowio/deepflow/server/controller/http/appender v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/http/service/agentlicense v0.0.0-00010101000000-000000000000 + github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/libs/logger/blocker v0.0.0-20240822020041-cdaf0f82ce6f github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap v0.0.0-00010101000000-000000000000 diff --git a/server/ingester/flow_log/decoder/decoder.go b/server/ingester/flow_log/decoder/decoder.go index 00d790542c8..a1a83b977ac 100644 --- a/server/ingester/flow_log/decoder/decoder.go +++ b/server/ingester/flow_log/decoder/decoder.go @@ -35,6 +35,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/flow_log/config" "github.com/deepflowio/deepflow/server/ingester/flow_log/dbwriter" "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data" + "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import" "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import" "github.com/deepflowio/deepflow/server/ingester/flow_log/throttler" "github.com/deepflowio/deepflow/server/ingester/flow_tag" @@ -154,7 +155,8 @@ func (d *Decoder) Run() { decoder := &codec.SimpleDecoder{} pbTaggedFlow := pb.NewTaggedFlow() pbTracesData := &v1.TracesData{} - pbSkywalkingData := &pb.SkyWalkingExtra{} + pbSkywalkingData := &pb.ThirdPartyTrace{} + pbDdTrace := &pb.ThirdPartyTrace{} for { n := d.inQueue.Gets(buffer) start := time.Now() @@ -185,6 +187,8 @@ func (d *Decoder) Run() { d.handleL4Packet(decoder) case datatype.MESSAGE_TYPE_SKYWALKING: d.handleSkyWalking(decoder, pbSkywalkingData, false) + case datatype.MESSAGE_TYPE_DATADOG: + d.handleDatadog(decoder, pbDdTrace, false) default: log.Warningf("unknown msg type: %d", d.msgType) @@ -283,7 +287,53 @@ func (d *Decoder) sendOpenMetetry(tracesData *v1.TracesData) { } } -func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.SkyWalkingExtra, compressed bool) { +func (d *Decoder) handleDatadog(decoder *codec.SimpleDecoder, pbThirdPartyData *pb.ThirdPartyTrace, compressed bool) { + var err error + for !decoder.IsEnd() { + bytes := decoder.ReadBytes() + pbThirdPartyData.Reset() + if len(bytes) > 0 { + // universal compression + if compressed { + bytes, err = decompressOpenTelemetry(bytes) + } + if err == nil { + err = proto.Unmarshal(bytes, pbThirdPartyData) + } + } + if decoder.Failed() || err != nil { + if d.counter.ErrorCount == 0 { + log.Errorf("skywalking data decode failed, offset=%d len=%d err: %s", decoder.Offset(), len(decoder.Bytes()), err) + } + d.counter.ErrorCount++ + continue + } + d.sendDatadog(pbThirdPartyData.Data, pbThirdPartyData.PeerIp, pbThirdPartyData.Uri) + } +} + +func (d *Decoder) sendDatadog(ddogdata, peerIP []byte, uri string) { + if d.debugEnabled { + log.Debugf("decoder %d vtap %d recv skywalking data length: %d", d.index, d.agentId, len(ddogdata)) + } + d.counter.Count++ + ls := dd_import.DDogDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, ddogdata, peerIP, uri, d.platformData, d.cfg) + for _, l := range ls { + l.AddReferenceCount() + if !d.throttler.SendWithThrottling(l) { + d.counter.DropCount++ + } else { + d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0] + l.GenerateNewFlowTags(d.flowTagWriter.Cache) + d.flowTagWriter.WriteFieldsAndFieldValuesInCache() + d.appServiceTagWrite(l) + d.spanWrite(l) + } + l.Release() + } +} + +func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.ThirdPartyTrace, compressed bool) { var err error for !decoder.IsEnd() { pbSkyWalkingData.Reset() diff --git a/server/ingester/flow_log/log_data/dd_import/dd_import.go b/server/ingester/flow_log/log_data/dd_import/dd_import.go new file mode 100644 index 00000000000..6ecb1f13040 --- /dev/null +++ b/server/ingester/flow_log/log_data/dd_import/dd_import.go @@ -0,0 +1,11 @@ +package dd_import + +import ( + flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config" + "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data" + "github.com/deepflowio/deepflow/server/libs/grpc" +) + +func DDogDataToL7FlowLogs(vtapID, orgId, teamId uint16, ddog, peerIP []byte, uri string, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog { + return []*log_data.L7FlowLog{} +} diff --git a/server/ingester/flow_log/log_data/dd_import/go.mod b/server/ingester/flow_log/log_data/dd_import/go.mod new file mode 100644 index 00000000000..0b22d323077 --- /dev/null +++ b/server/ingester/flow_log/log_data/dd_import/go.mod @@ -0,0 +1,3 @@ +module github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import + +go 1.18 \ No newline at end of file diff --git a/server/libs/datatype/droplet-message.go b/server/libs/datatype/droplet-message.go index b529fdf7f1e..c3a09e8c864 100644 --- a/server/libs/datatype/droplet-message.go +++ b/server/libs/datatype/droplet-message.go @@ -55,7 +55,8 @@ const ( MESSAGE_TYPE_K8S_EVENT MESSAGE_TYPE_APPLICATION_LOG MESSAGE_TYPE_AGENT_LOG - MESSAGE_TYPE_SKYWALKING // 19 + MESSAGE_TYPE_SKYWALKING + MESSAGE_TYPE_DATADOG // 20 MESSAGE_TYPE_MAX ) @@ -82,6 +83,7 @@ var MessageTypeString = [MESSAGE_TYPE_MAX]string{ MESSAGE_TYPE_APPLICATION_LOG: "application_log", MESSAGE_TYPE_AGENT_LOG: "agent_log", MESSAGE_TYPE_SKYWALKING: "skywalking", + MESSAGE_TYPE_DATADOG: "datadog", } func (m MessageType) String() string { @@ -126,6 +128,7 @@ var MessageHeaderTypes = [MESSAGE_TYPE_MAX]MessageHeaderType{ MESSAGE_TYPE_APPLICATION_LOG: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_AGENT_LOG: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_SKYWALKING: HEADER_TYPE_LT_VTAP, + MESSAGE_TYPE_DATADOG: HEADER_TYPE_LT_VTAP, } func (m MessageType) HeaderType() MessageHeaderType {