diff --git a/agent/crates/public/src/sender.rs b/agent/crates/public/src/sender.rs index 7c0ff7af6f56..a306d7b604db 100644 --- a/agent/crates/public/src/sender.rs +++ b/agent/crates/public/src/sender.rs @@ -56,6 +56,7 @@ pub enum SendMessageType { ApplicationLog = 17, SyslogDetail = 18, SkyWalking = 19, + Datadog = 20, } impl fmt::Display for SendMessageType { @@ -80,6 +81,7 @@ impl fmt::Display for SendMessageType { Self::ApplicationLog => write!(f, "application_log"), Self::SyslogDetail => write!(f, "syslog_detail"), Self::SkyWalking => write!(f, "skywalking"), + Self::Datadog => write!(f, "datadog"), } } } diff --git a/agent/plugins/integration_skywalking/src/lib.rs b/agent/plugins/integration_skywalking/src/lib.rs index 7812401abbf2..12ead96185cd 100644 --- a/agent/plugins/integration_skywalking/src/lib.rs +++ b/agent/plugins/integration_skywalking/src/lib.rs @@ -24,7 +24,7 @@ use public::{ use std::net::SocketAddr; #[derive(Debug, PartialEq)] -pub struct SkyWalkingExtra(pub flow_log::SkyWalkingExtra); +pub struct SkyWalkingExtra(pub flow_log::ThirdPartyTrace); impl Sendable for SkyWalkingExtra { fn encode(self, _: &mut Vec) -> Result { diff --git a/agent/src/integration_collector.rs b/agent/src/integration_collector.rs index dd64bc783c74..dd34a547044b 100644 --- a/agent/src/integration_collector.rs +++ b/agent/src/integration_collector.rs @@ -70,6 +70,7 @@ use public::{ l7_protocol::L7Protocol, proto::{ agent::Exception, + flow_log, integration::opentelemetry::proto::{ common::v1::{ any_value::Value::{IntValue, StringValue}, @@ -226,6 +227,19 @@ async fn aggregate_with_catch_exception( }) } +#[derive(Debug, PartialEq)] +pub struct Datadog(flow_log::ThirdPartyTrace); + +impl Sendable for Datadog { + fn encode(self, buf: &mut Vec) -> Result { + self.0.encode(buf).map(|_| self.0.encoded_len()) + } + + fn message_type(&self) -> SendMessageType { + SendMessageType::Datadog + } +} + // for log capture from vector #[derive(Debug, PartialEq)] pub struct ApplicationLog(Vec); @@ -599,6 +613,7 @@ async fn handler( profile_sender: DebugSender, application_log_sender: DebugSender, skywalking_sender: DebugSender, + datadog_sender: DebugSender, exception_handler: ExceptionHandler, compressed: bool, profile_compressed: bool, @@ -862,6 +877,37 @@ async fn handler( ) .await) } + // TODO: confirm regex/fuzzy path + // path: /api/v0.2,v0.3,v0.4,v0.5,v0.7, + // TODO: repeated code, consider make it common + (&Method::POST, "/v0.4/traces") => { + if external_trace_integration_disabled { + return Ok(Response::builder().body(Body::empty()).unwrap()); + } + let (part, body) = req.into_parts(); + let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await { + Ok(b) => b, + Err(e) => { + return Ok(e); + } + }; + + let mut third_party_data = flow_log::ThirdPartyTrace::default(); + parse_dd_headers(&part.headers, &mut third_party_data); + let datadog = decode_metric(whole_body, &part.headers)?; + third_party_data.data = datadog; + third_party_data.uri = part.uri.path().to_string(); + third_party_data.peer_ip = match peer_addr.ip() { + IpAddr::V4(ip4) => ip4.octets().to_vec(), + IpAddr::V6(ip6) => ip6.octets().to_vec(), + }; + + if let Err(e) = datadog_sender.send(Datadog(third_party_data)) { + warn!("datadog_sender failed to send data, because {:?}", e); + } + + Ok(Response::builder().body(Body::empty()).unwrap()) + } // Return the 404 Not Found for other routes. _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) @@ -904,6 +950,23 @@ fn parse_profile_query(query: &str, profile: &mut metric::Profile) { }; } +fn parse_dd_headers(headers: &HeaderMap, third_party_data: &mut flow_log::ThirdPartyTrace) { + for key in vec![ + "Datadog-Meta-Lang", // headers.lang + "Datadog-Meta-Lang-Version", // headers.lang_version + "Datadog-Meta-Tracer-Version", // headers.tracer_version + "Datadog-Container-Id", // headers.container_id + "Content-Type", // for decode format validate + ] { + if let Some(value) = headers.get(key) { + third_party_data.extend_keys.push(key.to_string()); + third_party_data + .extend_values + .push(value.to_str().unwrap().to_string()); + } + } +} + #[derive(Default)] struct CompressedMetric { compressed: AtomicU64, // unit (bytes) @@ -963,6 +1026,7 @@ pub struct MetricServer { profile_sender: DebugSender, application_log_sender: DebugSender, skywalking_sender: DebugSender, + datadog_sender: DebugSender, port: Arc, exception_handler: ExceptionHandler, server_shutdown_tx: Mutex>>, @@ -991,6 +1055,7 @@ impl MetricServer { profile_sender: DebugSender, application_log_sender: DebugSender, skywalking_sender: DebugSender, + datadog_sender: DebugSender, port: u16, exception_handler: ExceptionHandler, compressed: bool, @@ -1020,6 +1085,7 @@ impl MetricServer { profile_sender, application_log_sender, skywalking_sender, + datadog_sender, port: Arc::new(AtomicU16::new(port)), exception_handler, server_shutdown_tx: Default::default(), @@ -1070,6 +1136,7 @@ impl MetricServer { let profile_sender = self.profile_sender.clone(); let application_log_sender = self.application_log_sender.clone(); let skywalking_sender = self.skywalking_sender.clone(); + let datadog_sender = self.datadog_sender.clone(); let port = self.port.clone(); let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire))); let (mon_tx, mon_rx) = oneshot::channel(); @@ -1143,6 +1210,7 @@ impl MetricServer { let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); let skywalking_sender = skywalking_sender.clone(); + let datadog_sender = datadog_sender.clone(); let exception_handler_inner = exception_handler.clone(); let counter = counter.clone(); let compressed = compressed.clone(); @@ -1161,6 +1229,7 @@ impl MetricServer { let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); let skywalking_sender = skywalking_sender.clone(); + let datadog_sender = datadog_sender.clone(); let exception_handler = exception_handler_inner.clone(); let peer_addr = conn.remote_addr(); let counter = counter.clone(); @@ -1185,6 +1254,7 @@ impl MetricServer { profile_sender.clone(), application_log_sender.clone(), skywalking_sender.clone(), + datadog_sender.clone(), exception_handler.clone(), compressed.load(Ordering::Relaxed), profile_compressed.load(Ordering::Relaxed), diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 0cbd62563550..1d1862d08fb4 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -72,8 +72,8 @@ use crate::{ }, handler::{NpbBuilder, PacketHandlerBuilder}, integration_collector::{ - ApplicationLog, BoxedPrometheusExtra, MetricServer, OpenTelemetry, OpenTelemetryCompressed, - Profile, TelegrafMetric, + ApplicationLog, BoxedPrometheusExtra, Datadog, MetricServer, OpenTelemetry, + OpenTelemetryCompressed, Profile, TelegrafMetric, }, metric::document::BoxedDocument, monitor::Monitor, @@ -1568,6 +1568,7 @@ pub struct AgentComponents { pub proc_event_uniform_sender: UniformSenderThread, pub application_log_uniform_sender: UniformSenderThread, pub skywalking_uniform_sender: UniformSenderThread, + pub datadog_uniform_sender: UniformSenderThread, pub exception_handler: ExceptionHandler, pub proto_log_sender: DebugSender, pub pcap_batch_sender: DebugSender, @@ -2467,6 +2468,32 @@ impl AgentComponents { None, ); + let datadog_queue_name = "1-datadog-to-sender"; + let (datadog_sender, datadog_receiver, counter) = queue::bounded_with_debug( + user_config + .processors + .flow_log + .tunning + .flow_aggregator_queue_size, + datadog_queue_name, + &queue_debugger, + ); + stats_collector.register_countable( + &QueueStats { + module: datadog_queue_name, + ..Default::default() + }, + Countable::Owned(Box::new(counter)), + ); + let datadog_uniform_sender = UniformSenderThread::new( + datadog_queue_name, + Arc::new(datadog_receiver), + config_handler.sender(), + stats_collector.clone(), + exception_handler.clone(), + None, + ); + let ebpf_dispatcher_id = dispatcher_components.len(); #[cfg(any(target_os = "linux", target_os = "android"))] let mut ebpf_dispatcher_component = None; @@ -2710,6 +2737,7 @@ impl AgentComponents { profile_sender, application_log_sender, skywalking_sender, + datadog_sender, candidate_config.metric_server.port, exception_handler.clone(), candidate_config.metric_server.compressed, @@ -2795,6 +2823,7 @@ impl AgentComponents { proc_event_uniform_sender, application_log_uniform_sender, skywalking_uniform_sender, + datadog_uniform_sender, capture_mode: candidate_config.capture_mode, packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence @@ -2884,6 +2913,7 @@ impl AgentComponents { self.proc_event_uniform_sender.start(); self.application_log_uniform_sender.start(); self.skywalking_uniform_sender.start(); + self.datadog_uniform_sender.start(); if self.config.metric_server.enabled { self.metrics_server_component.start(); } @@ -2959,6 +2989,9 @@ impl AgentComponents { if let Some(h) = self.skywalking_uniform_sender.notify_stop() { join_handles.push(h); } + if let Some(h) = self.datadog_uniform_sender.notify_stop() { + join_handles.push(h); + } // Enterprise Edition Feature: packet-sequence if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() { join_handles.push(h); diff --git a/message/flow_log.proto b/message/flow_log.proto index ee06ea9c7780..52e0d7858190 100644 --- a/message/flow_log.proto +++ b/message/flow_log.proto @@ -296,8 +296,10 @@ message MqttTopic { int32 qos = 2; // -1 mean not exist qos } -message SkyWalkingExtra { +message ThirdPartyTrace { bytes data = 1; bytes peer_ip = 2; string uri = 3; + repeated string extend_keys = 4; + repeated string extend_values = 5; } diff --git a/server/go.mod b/server/go.mod index f7c3509f7a35..d46528a329f9 100644 --- a/server/go.mod +++ b/server/go.mod @@ -14,6 +14,7 @@ replace ( github.com/deepflowio/deepflow/server/controller/http/service/configuration => ./controller/http/service/configuration github.com/deepflowio/deepflow/server/controller/monitor/license => ./controller/monitor/license github.com/deepflowio/deepflow/server/ingester/config/configdefaults => ./ingester/config/configdefaults + github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import => ./ingester/flow_log/log_data/dd_import github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import github.com/deepflowio/deepflow/server/libs/logger/blocker => ./libs/logger/blocker github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap => ./querier/app/distributed_tracing/service/tracemap @@ -106,6 +107,7 @@ require ( github.com/bytedance/sonic v1.11.8 github.com/deepflowio/deepflow/server/controller/http/appender v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/http/service/agentlicense v0.0.0-00010101000000-000000000000 + github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/libs/logger/blocker v0.0.0-20240822020041-cdaf0f82ce6f github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap v0.0.0-00010101000000-000000000000 diff --git a/server/ingester/flow_log/decoder/decoder.go b/server/ingester/flow_log/decoder/decoder.go index 00d790542c85..7c69e1f94971 100644 --- a/server/ingester/flow_log/decoder/decoder.go +++ b/server/ingester/flow_log/decoder/decoder.go @@ -35,6 +35,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/flow_log/config" "github.com/deepflowio/deepflow/server/ingester/flow_log/dbwriter" "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data" + "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import" "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import" "github.com/deepflowio/deepflow/server/ingester/flow_log/throttler" "github.com/deepflowio/deepflow/server/ingester/flow_tag" @@ -154,7 +155,8 @@ func (d *Decoder) Run() { decoder := &codec.SimpleDecoder{} pbTaggedFlow := pb.NewTaggedFlow() pbTracesData := &v1.TracesData{} - pbSkywalkingData := &pb.SkyWalkingExtra{} + pbSkywalkingData := &pb.ThirdPartyTrace{} + pbDdTrace := &pb.ThirdPartyTrace{} for { n := d.inQueue.Gets(buffer) start := time.Now() @@ -185,6 +187,8 @@ func (d *Decoder) Run() { d.handleL4Packet(decoder) case datatype.MESSAGE_TYPE_SKYWALKING: d.handleSkyWalking(decoder, pbSkywalkingData, false) + case datatype.MESSAGE_TYPE_DATADOG: + d.handleDatadog(decoder, pbDdTrace, false) default: log.Warningf("unknown msg type: %d", d.msgType) @@ -283,10 +287,12 @@ func (d *Decoder) sendOpenMetetry(tracesData *v1.TracesData) { } } -func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.SkyWalkingExtra, compressed bool) { +func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.ThirdPartyTrace, compressed bool) { var err error + buffer := log_data.GetBuffer() for !decoder.IsEnd() { pbSkyWalkingData.Reset() + pbSkyWalkingData.Data = buffer.Bytes() bytes := decoder.ReadBytes() if len(bytes) > 0 { // universal compression @@ -305,6 +311,7 @@ func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingDat continue } d.sendSkyWalking(pbSkyWalkingData.Data, pbSkyWalkingData.PeerIp, pbSkyWalkingData.Uri) + log_data.PutBuffer(buffer) } } @@ -329,6 +336,55 @@ func (d *Decoder) sendSkyWalking(segmentData, peerIP []byte, uri string) { } } +func (d *Decoder) handleDatadog(decoder *codec.SimpleDecoder, pbThirdPartyData *pb.ThirdPartyTrace, compressed bool) { + var err error + buffer := log_data.GetBuffer() + for !decoder.IsEnd() { + pbThirdPartyData.Reset() + pbThirdPartyData.Data = buffer.Bytes() + bytes := decoder.ReadBytes() + if len(bytes) > 0 { + // universal compression + if compressed { + bytes, err = decompressOpenTelemetry(bytes) + } + if err == nil { + err = proto.Unmarshal(bytes, pbThirdPartyData) + } + } + if decoder.Failed() || err != nil { + if d.counter.ErrorCount == 0 { + log.Errorf("datadog data decode failed, offset=%d len=%d err: %s", decoder.Offset(), len(decoder.Bytes()), err) + } + d.counter.ErrorCount++ + continue + } + d.sendDatadog(pbThirdPartyData) + log_data.PutBuffer(buffer) + } +} + +func (d *Decoder) sendDatadog(ddogData *pb.ThirdPartyTrace) { + if d.debugEnabled { + log.Debugf("decoder %d vtap %d recv datadog data length: %d", d.index, d.agentId, len(ddogData.Data)) + } + d.counter.Count++ + ls := dd_import.DDogDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, ddogData, d.platformData, d.cfg) + for _, l := range ls { + l.AddReferenceCount() + if !d.throttler.SendWithThrottling(l) { + d.counter.DropCount++ + } else { + d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0] + l.GenerateNewFlowTags(d.flowTagWriter.Cache) + d.flowTagWriter.WriteFieldsAndFieldValuesInCache() + d.appServiceTagWrite(l) + d.spanWrite(l) + } + l.Release() + } +} + func (d *Decoder) handleL4Packet(decoder *codec.SimpleDecoder) { for !decoder.IsEnd() { l4Packet, err := log_data.DecodePacketSequence(d.agentId, d.orgId, d.teamId, decoder) diff --git a/server/ingester/flow_log/flow_log/flow_log.go b/server/ingester/flow_log/flow_log/flow_log.go index 902341c43210..c2964121a0a9 100644 --- a/server/ingester/flow_log/flow_log/flow_log.go +++ b/server/ingester/flow_log/flow_log/flow_log.go @@ -55,6 +55,7 @@ type FlowLog struct { OtelCompressedLogger *Logger L4PacketLogger *Logger SkyWalkingLogger *Logger + DdogLogger *Logger Exporters *exporters.Exporters SpanWriter *dbwriter.SpanWriter TraceTreeWriter *dbwriter.TraceTreeWriter @@ -123,6 +124,10 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec if err != nil { return nil, err } + ddogLogger, err := NewLogger(datatype.MESSAGE_TYPE_DATADOG, config, platformDataManager, manager, recv, flowLogWriter, common.L7_FLOW_ID, nil, spanWriter) + if err != nil { + return nil, err + } return &FlowLog{ FlowLogConfig: config, L4FlowLogger: l4FlowLogger, @@ -131,6 +136,7 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec OtelCompressedLogger: otelCompressedLogger, L4PacketLogger: l4PacketLogger, SkyWalkingLogger: skywalkingLogger, + DdogLogger: ddogLogger, Exporters: exporters, SpanWriter: spanWriter, TraceTreeWriter: traceTreeWriter, @@ -365,6 +371,9 @@ func (s *FlowLog) Start() { if s.SkyWalkingLogger != nil { s.SkyWalkingLogger.Start() } + if s.DdogLogger != nil { + s.DdogLogger.Start() + } if s.SpanWriter != nil { s.SpanWriter.Start() } @@ -392,6 +401,9 @@ func (s *FlowLog) Close() error { if s.SkyWalkingLogger != nil { s.SkyWalkingLogger.Close() } + if s.DdogLogger != nil { + s.DdogLogger.Close() + } if s.SpanWriter != nil { s.SpanWriter.Close() } diff --git a/server/ingester/flow_log/log_data/dd_import/dd_import.go b/server/ingester/flow_log/log_data/dd_import/dd_import.go new file mode 100644 index 000000000000..b2d077974bea --- /dev/null +++ b/server/ingester/flow_log/log_data/dd_import/dd_import.go @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dd_import + +import ( + flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config" + "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data" + "github.com/deepflowio/deepflow/server/libs/datatype/pb" + "github.com/deepflowio/deepflow/server/libs/grpc" +) + +func DDogDataToL7FlowLogs(vtapID, orgId, teamId uint16, pbThirdPartyData *pb.ThirdPartyTrace, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog { + return []*log_data.L7FlowLog{} +} diff --git a/server/ingester/flow_log/log_data/dd_import/go.mod b/server/ingester/flow_log/log_data/dd_import/go.mod new file mode 100644 index 000000000000..0b22d3230774 --- /dev/null +++ b/server/ingester/flow_log/log_data/dd_import/go.mod @@ -0,0 +1,3 @@ +module github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import + +go 1.18 \ No newline at end of file diff --git a/server/ingester/flow_log/log_data/utils.go b/server/ingester/flow_log/log_data/utils.go index 2c5acf6f5c9a..63d79ebc00d1 100644 --- a/server/ingester/flow_log/log_data/utils.go +++ b/server/ingester/flow_log/log_data/utils.go @@ -17,9 +17,11 @@ package log_data import ( + "bytes" "fmt" "net" "strings" + "sync" ) func IPIntToString(ipInt uint32) string { @@ -39,3 +41,19 @@ func ParseUrlPath(rawURL string) (string, error) { return parts[1][pathStart:], nil } + +var bufferPool = sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, +} + +func GetBuffer() *bytes.Buffer { + buffer := bufferPool.Get().(*bytes.Buffer) + buffer.Reset() + return buffer +} + +func PutBuffer(buffer *bytes.Buffer) { + bufferPool.Put(buffer) +} diff --git a/server/libs/datatype/droplet-message.go b/server/libs/datatype/droplet-message.go index b529fdf7f1e0..c3a09e8c864c 100644 --- a/server/libs/datatype/droplet-message.go +++ b/server/libs/datatype/droplet-message.go @@ -55,7 +55,8 @@ const ( MESSAGE_TYPE_K8S_EVENT MESSAGE_TYPE_APPLICATION_LOG MESSAGE_TYPE_AGENT_LOG - MESSAGE_TYPE_SKYWALKING // 19 + MESSAGE_TYPE_SKYWALKING + MESSAGE_TYPE_DATADOG // 20 MESSAGE_TYPE_MAX ) @@ -82,6 +83,7 @@ var MessageTypeString = [MESSAGE_TYPE_MAX]string{ MESSAGE_TYPE_APPLICATION_LOG: "application_log", MESSAGE_TYPE_AGENT_LOG: "agent_log", MESSAGE_TYPE_SKYWALKING: "skywalking", + MESSAGE_TYPE_DATADOG: "datadog", } func (m MessageType) String() string { @@ -126,6 +128,7 @@ var MessageHeaderTypes = [MESSAGE_TYPE_MAX]MessageHeaderType{ MESSAGE_TYPE_APPLICATION_LOG: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_AGENT_LOG: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_SKYWALKING: HEADER_TYPE_LT_VTAP, + MESSAGE_TYPE_DATADOG: HEADER_TYPE_LT_VTAP, } func (m MessageType) HeaderType() MessageHeaderType {