Skip to content

Commit

Permalink
feat: support dd integration
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Nov 15, 2024
1 parent 898d085 commit 153eee4
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 7 deletions.
2 changes: 2 additions & 0 deletions agent/crates/public/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum SendMessageType {
ApplicationLog = 17,
SyslogDetail = 18,
SkyWalking = 19,
Datadog = 20,
}

impl fmt::Display for SendMessageType {
Expand All @@ -80,6 +81,7 @@ impl fmt::Display for SendMessageType {
Self::ApplicationLog => write!(f, "application_log"),
Self::SyslogDetail => write!(f, "syslog_detail"),
Self::SkyWalking => write!(f, "skywalking"),
Self::Datadog => write!(f, "datadog"),
}
}
}
2 changes: 1 addition & 1 deletion agent/plugins/integration_skywalking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use public::{
use std::net::SocketAddr;

#[derive(Debug, PartialEq)]
pub struct SkyWalkingExtra(pub flow_log::SkyWalkingExtra);
pub struct SkyWalkingExtra(pub flow_log::ThirdPartyTrace);

impl Sendable for SkyWalkingExtra {
fn encode(self, _: &mut Vec<u8>) -> Result<usize, EncodeError> {
Expand Down
51 changes: 51 additions & 0 deletions agent/src/integration_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use public::{
l7_protocol::L7Protocol,
proto::{
agent::Exception,
flow_log,
integration::opentelemetry::proto::{
common::v1::{
any_value::Value::{IntValue, StringValue},
Expand Down Expand Up @@ -226,6 +227,19 @@ async fn aggregate_with_catch_exception(
})
}

#[derive(Debug, PartialEq)]
pub struct Datadog(flow_log::ThirdPartyTrace);

impl Sendable for Datadog {
fn encode(mut self, buf: &mut Vec<u8>) -> Result<usize, prost::EncodeError> {
self.0.encode(buf).map(|_| self.0.encoded_len())
}

fn message_type(&self) -> SendMessageType {
SendMessageType::Datadog
}
}

// for log capture from vector
#[derive(Debug, PartialEq)]
pub struct ApplicationLog(Vec<u8>);
Expand Down Expand Up @@ -599,6 +613,7 @@ async fn handler(
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
datadog_sender: DebugSender<Datadog>,
exception_handler: ExceptionHandler,
compressed: bool,
profile_compressed: bool,
Expand Down Expand Up @@ -862,6 +877,35 @@ async fn handler(
)
.await)
}
// TODO: confirm regex/fuzzy path
// path: /api/v0.2,v0.3,v0.4,v0.5,v0.7,
// TODO: repeated code, consider make it common
(&Method::POST, "/v0.4/traces") => {
if external_trace_integration_disabled {
return Ok(Response::builder().body(Body::empty()).unwrap());
}
let (part, body) = req.into_parts();
let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await {
Ok(b) => b,
Err(e) => {
return Ok(e);
}
};
let datadog = decode_metric(whole_body, &part.headers)?;
let mut third_party_data = flow_log::ThirdPartyTrace::default();
third_party_data.data = datadog;
third_party_data.uri = part.uri.path().to_string();
third_party_data.peer_ip = match peer_addr.ip() {
IpAddr::V4(ip4) => ip4.octets().to_vec(),
IpAddr::V6(ip6) => ip6.octets().to_vec(),
};

if let Err(e) = datadog_sender.send(Datadog(third_party_data)) {
warn!("datadog_sender failed to send data, because {:?}", e);
}

Ok(Response::builder().body(Body::empty()).unwrap())
}
// Return the 404 Not Found for other routes.
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -963,6 +1007,7 @@ pub struct MetricServer {
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
datadog_sender: DebugSender<Datadog>,
port: Arc<AtomicU16>,
exception_handler: ExceptionHandler,
server_shutdown_tx: Mutex<Option<mpsc::Sender<()>>>,
Expand Down Expand Up @@ -991,6 +1036,7 @@ impl MetricServer {
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
datadog_sender: DebugSender<Datadog>,
port: u16,
exception_handler: ExceptionHandler,
compressed: bool,
Expand Down Expand Up @@ -1020,6 +1066,7 @@ impl MetricServer {
profile_sender,
application_log_sender,
skywalking_sender,
datadog_sender,
port: Arc::new(AtomicU16::new(port)),
exception_handler,
server_shutdown_tx: Default::default(),
Expand Down Expand Up @@ -1070,6 +1117,7 @@ impl MetricServer {
let profile_sender = self.profile_sender.clone();
let application_log_sender = self.application_log_sender.clone();
let skywalking_sender = self.skywalking_sender.clone();
let datadog_sender = self.datadog_sender.clone();
let port = self.port.clone();
let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire)));
let (mon_tx, mon_rx) = oneshot::channel();
Expand Down Expand Up @@ -1143,6 +1191,7 @@ impl MetricServer {
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let datadog_sender = datadog_sender.clone();
let exception_handler_inner = exception_handler.clone();
let counter = counter.clone();
let compressed = compressed.clone();
Expand All @@ -1161,6 +1210,7 @@ impl MetricServer {
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let datadog_sender = datadog_sender.clone();
let exception_handler = exception_handler_inner.clone();
let peer_addr = conn.remote_addr();
let counter = counter.clone();
Expand All @@ -1185,6 +1235,7 @@ impl MetricServer {
profile_sender.clone(),
application_log_sender.clone(),
skywalking_sender.clone(),
datadog_sender.clone(),
exception_handler.clone(),
compressed.load(Ordering::Relaxed),
profile_compressed.load(Ordering::Relaxed),
Expand Down
33 changes: 31 additions & 2 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ use crate::{
},
handler::{NpbBuilder, PacketHandlerBuilder},
integration_collector::{
ApplicationLog, BoxedPrometheusExtra, MetricServer, OpenTelemetry, OpenTelemetryCompressed,
Profile, TelegrafMetric,
ApplicationLog, BoxedPrometheusExtra, Datadog, MetricServer, OpenTelemetry,
OpenTelemetryCompressed, Profile, TelegrafMetric,
},
metric::document::BoxedDocument,
monitor::Monitor,
Expand Down Expand Up @@ -1568,6 +1568,7 @@ pub struct AgentComponents {
pub proc_event_uniform_sender: UniformSenderThread<BoxedProcEvents>,
pub application_log_uniform_sender: UniformSenderThread<ApplicationLog>,
pub skywalking_uniform_sender: UniformSenderThread<SkyWalkingExtra>,
pub datadog_uniform_sender: UniformSenderThread<Datadog>,
pub exception_handler: ExceptionHandler,
pub proto_log_sender: DebugSender<BoxAppProtoLogsData>,
pub pcap_batch_sender: DebugSender<BoxedPcapBatch>,
Expand Down Expand Up @@ -2467,6 +2468,32 @@ impl AgentComponents {
None,
);

let datadog_queue_name = "1-datadog-to-sender";
let (datadog_sender, datadog_receiver, counter) = queue::bounded_with_debug(
user_config
.processors
.flow_log
.tunning
.flow_aggregator_queue_size,
datadog_queue_name,
&queue_debugger,
);
stats_collector.register_countable(
&QueueStats {
module: datadog_queue_name,
..Default::default()
},
Countable::Owned(Box::new(counter)),
);
let datadog_uniform_sender = UniformSenderThread::new(
datadog_queue_name,
Arc::new(datadog_receiver),
config_handler.sender(),
stats_collector.clone(),
exception_handler.clone(),
None,
);

let ebpf_dispatcher_id = dispatcher_components.len();
#[cfg(any(target_os = "linux", target_os = "android"))]
let mut ebpf_dispatcher_component = None;
Expand Down Expand Up @@ -2710,6 +2737,7 @@ impl AgentComponents {
profile_sender,
application_log_sender,
skywalking_sender,
datadog_sender,
candidate_config.metric_server.port,
exception_handler.clone(),
candidate_config.metric_server.compressed,
Expand Down Expand Up @@ -2795,6 +2823,7 @@ impl AgentComponents {
proc_event_uniform_sender,
application_log_uniform_sender,
skywalking_uniform_sender,
datadog_uniform_sender,
capture_mode: candidate_config.capture_mode,
packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence
packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence
Expand Down
2 changes: 1 addition & 1 deletion message/flow_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ message MqttTopic {
int32 qos = 2; // -1 mean not exist qos
}

message SkyWalkingExtra {
message ThirdPartyTrace {
bytes data = 1;
bytes peer_ip = 2;
string uri = 3;
Expand Down
2 changes: 2 additions & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ replace (
github.com/deepflowio/deepflow/server/controller/http/service/configuration => ./controller/http/service/configuration
github.com/deepflowio/deepflow/server/controller/monitor/license => ./controller/monitor/license
github.com/deepflowio/deepflow/server/ingester/config/configdefaults => ./ingester/config/configdefaults
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import => ./ingester/flow_log/log_data/dd_import
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import
github.com/deepflowio/deepflow/server/libs/logger/blocker => ./libs/logger/blocker
github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap => ./querier/app/distributed_tracing/service/tracemap
Expand Down Expand Up @@ -106,6 +107,7 @@ require (
github.com/bytedance/sonic v1.11.8
github.com/deepflowio/deepflow/server/controller/http/appender v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/controller/http/service/agentlicense v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/libs/logger/blocker v0.0.0-20240822020041-cdaf0f82ce6f
github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap v0.0.0-00010101000000-000000000000
Expand Down
54 changes: 52 additions & 2 deletions server/ingester/flow_log/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/flow_log/config"
"github.com/deepflowio/deepflow/server/ingester/flow_log/dbwriter"
"github.com/deepflowio/deepflow/server/ingester/flow_log/log_data"
"github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import"
"github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import"
"github.com/deepflowio/deepflow/server/ingester/flow_log/throttler"
"github.com/deepflowio/deepflow/server/ingester/flow_tag"
Expand Down Expand Up @@ -154,7 +155,8 @@ func (d *Decoder) Run() {
decoder := &codec.SimpleDecoder{}
pbTaggedFlow := pb.NewTaggedFlow()
pbTracesData := &v1.TracesData{}
pbSkywalkingData := &pb.SkyWalkingExtra{}
pbSkywalkingData := &pb.ThirdPartyTrace{}
pbDdTrace := &pb.ThirdPartyTrace{}
for {
n := d.inQueue.Gets(buffer)
start := time.Now()
Expand Down Expand Up @@ -185,6 +187,8 @@ func (d *Decoder) Run() {
d.handleL4Packet(decoder)
case datatype.MESSAGE_TYPE_SKYWALKING:
d.handleSkyWalking(decoder, pbSkywalkingData, false)
case datatype.MESSAGE_TYPE_DATADOG:
d.handleDatadog(decoder, pbDdTrace, false)
default:
log.Warningf("unknown msg type: %d", d.msgType)

Expand Down Expand Up @@ -283,7 +287,53 @@ func (d *Decoder) sendOpenMetetry(tracesData *v1.TracesData) {
}
}

func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.SkyWalkingExtra, compressed bool) {
func (d *Decoder) handleDatadog(decoder *codec.SimpleDecoder, pbThirdPartyData *pb.ThirdPartyTrace, compressed bool) {
var err error
for !decoder.IsEnd() {
bytes := decoder.ReadBytes()
pbThirdPartyData.Reset()
if len(bytes) > 0 {
// universal compression
if compressed {
bytes, err = decompressOpenTelemetry(bytes)
}
if err == nil {
err = proto.Unmarshal(bytes, pbThirdPartyData)
}
}
if decoder.Failed() || err != nil {
if d.counter.ErrorCount == 0 {
log.Errorf("skywalking data decode failed, offset=%d len=%d err: %s", decoder.Offset(), len(decoder.Bytes()), err)
}
d.counter.ErrorCount++
continue
}
d.sendDatadog(pbThirdPartyData.Data, pbThirdPartyData.PeerIp, pbThirdPartyData.Uri)
}
}

func (d *Decoder) sendDatadog(ddogdata, peerIP []byte, uri string) {
if d.debugEnabled {
log.Debugf("decoder %d vtap %d recv skywalking data length: %d", d.index, d.agentId, len(ddogdata))
}
d.counter.Count++
ls := dd_import.DDogDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, ddogdata, peerIP, uri, d.platformData, d.cfg)
for _, l := range ls {
l.AddReferenceCount()
if !d.throttler.SendWithThrottling(l) {
d.counter.DropCount++
} else {
d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0]
l.GenerateNewFlowTags(d.flowTagWriter.Cache)
d.flowTagWriter.WriteFieldsAndFieldValuesInCache()
d.appServiceTagWrite(l)
d.spanWrite(l)
}
l.Release()
}
}

func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.ThirdPartyTrace, compressed bool) {
var err error
for !decoder.IsEnd() {
pbSkyWalkingData.Reset()
Expand Down
11 changes: 11 additions & 0 deletions server/ingester/flow_log/log_data/dd_import/dd_import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dd_import

import (
flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config"
"github.com/deepflowio/deepflow/server/ingester/flow_log/log_data"
"github.com/deepflowio/deepflow/server/libs/grpc"
)

func DDogDataToL7FlowLogs(vtapID, orgId, teamId uint16, ddog, peerIP []byte, uri string, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog {
return []*log_data.L7FlowLog{}
}
3 changes: 3 additions & 0 deletions server/ingester/flow_log/log_data/dd_import/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import

go 1.18
5 changes: 4 additions & 1 deletion server/libs/datatype/droplet-message.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ const (
MESSAGE_TYPE_K8S_EVENT
MESSAGE_TYPE_APPLICATION_LOG
MESSAGE_TYPE_AGENT_LOG
MESSAGE_TYPE_SKYWALKING // 19
MESSAGE_TYPE_SKYWALKING
MESSAGE_TYPE_DATADOG // 20
MESSAGE_TYPE_MAX
)

Expand All @@ -82,6 +83,7 @@ var MessageTypeString = [MESSAGE_TYPE_MAX]string{
MESSAGE_TYPE_APPLICATION_LOG: "application_log",
MESSAGE_TYPE_AGENT_LOG: "agent_log",
MESSAGE_TYPE_SKYWALKING: "skywalking",
MESSAGE_TYPE_DATADOG: "datadog",
}

func (m MessageType) String() string {
Expand Down Expand Up @@ -126,6 +128,7 @@ var MessageHeaderTypes = [MESSAGE_TYPE_MAX]MessageHeaderType{
MESSAGE_TYPE_APPLICATION_LOG: HEADER_TYPE_LT_VTAP,
MESSAGE_TYPE_AGENT_LOG: HEADER_TYPE_LT_VTAP,
MESSAGE_TYPE_SKYWALKING: HEADER_TYPE_LT_VTAP,
MESSAGE_TYPE_DATADOG: HEADER_TYPE_LT_VTAP,
}

func (m MessageType) HeaderType() MessageHeaderType {
Expand Down

0 comments on commit 153eee4

Please sign in to comment.