From 9bcb5a622f2ab2c47e7db51b9c09dfa2a4e7f45b Mon Sep 17 00:00:00 2001 From: taloric Date: Mon, 21 Oct 2024 17:00:11 +0800 Subject: [PATCH] fix: use sw uri to identify json/grpc request --- .../plugins/integration_skywalking/src/lib.rs | 12 +++++++++++ agent/src/integration_collector.rs | 21 +++++++++++++++---- message/flow_log.proto | 1 + server/ingester/flow_log/decoder/decoder.go | 7 ++++--- .../flow_log/log_data/sw_import/sw_import.go | 2 +- 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/agent/plugins/integration_skywalking/src/lib.rs b/agent/plugins/integration_skywalking/src/lib.rs index f9d28197c1f..7812401abbf 100644 --- a/agent/plugins/integration_skywalking/src/lib.rs +++ b/agent/plugins/integration_skywalking/src/lib.rs @@ -47,3 +47,15 @@ pub async fn handle_skywalking_request( .body(Body::empty()) .unwrap() } + +pub async fn handle_skywalking_streaming_request( + _: SocketAddr, + _: Body, + _: &str, + _: DebugSender, +) -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap() +} diff --git a/agent/src/integration_collector.rs b/agent/src/integration_collector.rs index f9ac3ed3d6d..dd64bc783c7 100644 --- a/agent/src/integration_collector.rs +++ b/agent/src/integration_collector.rs @@ -61,7 +61,9 @@ use crate::{ policy::PolicyGetter, }; -use integration_skywalking::{handle_skywalking_request, SkyWalkingExtra}; +use integration_skywalking::{ + handle_skywalking_request, handle_skywalking_streaming_request, SkyWalkingExtra, +}; use public::{ counter::{Counter, CounterType, CounterValue, OwnedCountable}, enums::{CaptureNetworkType, EthernetType, L4Protocol}, @@ -829,9 +831,7 @@ async fn handler( } ( &Method::POST, - "/v3/segments" - | "/skywalking.v3.TraceSegmentReportService/collect" - | "/skywalking.v3.TraceSegmentReportService/collectInSync", + "/v3/segments" | "/skywalking.v3.TraceSegmentReportService/collectInSync", ) => { if external_trace_integration_disabled { return Ok(Response::builder().body(Body::empty()).unwrap()); @@ -849,6 +849,19 @@ async fn handler( .await, ) } + (&Method::POST, "/skywalking.v3.TraceSegmentReportService/collect") => { + if external_trace_integration_disabled { + return Ok(Response::builder().body(Body::empty()).unwrap()); + } + let (part, body) = req.into_parts(); + Ok(handle_skywalking_streaming_request( + peer_addr, + body, + part.uri.path(), + skywalking_sender, + ) + .await) + } // Return the 404 Not Found for other routes. _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) diff --git a/message/flow_log.proto b/message/flow_log.proto index 77737f71d3e..ee06ea9c778 100644 --- a/message/flow_log.proto +++ b/message/flow_log.proto @@ -299,4 +299,5 @@ message MqttTopic { message SkyWalkingExtra { bytes data = 1; bytes peer_ip = 2; + string uri = 3; } diff --git a/server/ingester/flow_log/decoder/decoder.go b/server/ingester/flow_log/decoder/decoder.go index e5e06d4ec2b..00d790542c8 100644 --- a/server/ingester/flow_log/decoder/decoder.go +++ b/server/ingester/flow_log/decoder/decoder.go @@ -304,15 +304,16 @@ func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingDat d.counter.ErrorCount++ continue } - d.sendSkyWalking(pbSkyWalkingData.Data, pbSkyWalkingData.PeerIp) + d.sendSkyWalking(pbSkyWalkingData.Data, pbSkyWalkingData.PeerIp, pbSkyWalkingData.Uri) } } -func (d *Decoder) sendSkyWalking(segmentData, peerIP []byte) { + +func (d *Decoder) sendSkyWalking(segmentData, peerIP []byte, uri string) { if d.debugEnabled { log.Debugf("decoder %d vtap %d recv skywalking data length: %d", d.index, d.agentId, len(segmentData)) } d.counter.Count++ - ls := sw_import.SkyWalkingDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, segmentData, peerIP, d.platformData, d.cfg) + ls := sw_import.SkyWalkingDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, segmentData, peerIP, uri, d.platformData, d.cfg) for _, l := range ls { l.AddReferenceCount() if !d.throttler.SendWithThrottling(l) { diff --git a/server/ingester/flow_log/log_data/sw_import/sw_import.go b/server/ingester/flow_log/log_data/sw_import/sw_import.go index 1a96f718f88..a416e0f2aad 100644 --- a/server/ingester/flow_log/log_data/sw_import/sw_import.go +++ b/server/ingester/flow_log/log_data/sw_import/sw_import.go @@ -22,6 +22,6 @@ import ( "github.com/deepflowio/deepflow/server/libs/grpc" ) -func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segmentData, peerIP []byte, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog { +func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segmentData, peerIP []byte, uri string, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog { return []*log_data.L7FlowLog{} }