Skip to content

Commit

Permalink
fix: use sw uri to identify json/grpc request
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Oct 24, 2024
1 parent 24f651b commit 9bcb5a6
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 8 deletions.
12 changes: 12 additions & 0 deletions agent/plugins/integration_skywalking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ pub async fn handle_skywalking_request(
.body(Body::empty())
.unwrap()
}

pub async fn handle_skywalking_streaming_request(
_: SocketAddr,
_: Body,
_: &str,
_: DebugSender<SkyWalkingExtra>,
) -> Response<Body> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()
}
21 changes: 17 additions & 4 deletions agent/src/integration_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ use crate::{
policy::PolicyGetter,
};

use integration_skywalking::{handle_skywalking_request, SkyWalkingExtra};
use integration_skywalking::{
handle_skywalking_request, handle_skywalking_streaming_request, SkyWalkingExtra,
};
use public::{
counter::{Counter, CounterType, CounterValue, OwnedCountable},
enums::{CaptureNetworkType, EthernetType, L4Protocol},
Expand Down Expand Up @@ -829,9 +831,7 @@ async fn handler(
}
(
&Method::POST,
"/v3/segments"
| "/skywalking.v3.TraceSegmentReportService/collect"
| "/skywalking.v3.TraceSegmentReportService/collectInSync",
"/v3/segments" | "/skywalking.v3.TraceSegmentReportService/collectInSync",
) => {
if external_trace_integration_disabled {
return Ok(Response::builder().body(Body::empty()).unwrap());
Expand All @@ -849,6 +849,19 @@ async fn handler(
.await,
)
}
(&Method::POST, "/skywalking.v3.TraceSegmentReportService/collect") => {
if external_trace_integration_disabled {
return Ok(Response::builder().body(Body::empty()).unwrap());
}
let (part, body) = req.into_parts();
Ok(handle_skywalking_streaming_request(
peer_addr,
body,
part.uri.path(),
skywalking_sender,
)
.await)
}
// Return the 404 Not Found for other routes.
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down
1 change: 1 addition & 0 deletions message/flow_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,5 @@ message MqttTopic {
message SkyWalkingExtra {
bytes data = 1;
bytes peer_ip = 2;
string uri = 3;
}
7 changes: 4 additions & 3 deletions server/ingester/flow_log/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,16 @@ func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingDat
d.counter.ErrorCount++
continue
}
d.sendSkyWalking(pbSkyWalkingData.Data, pbSkyWalkingData.PeerIp)
d.sendSkyWalking(pbSkyWalkingData.Data, pbSkyWalkingData.PeerIp, pbSkyWalkingData.Uri)
}
}
func (d *Decoder) sendSkyWalking(segmentData, peerIP []byte) {

func (d *Decoder) sendSkyWalking(segmentData, peerIP []byte, uri string) {
if d.debugEnabled {
log.Debugf("decoder %d vtap %d recv skywalking data length: %d", d.index, d.agentId, len(segmentData))
}
d.counter.Count++
ls := sw_import.SkyWalkingDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, segmentData, peerIP, d.platformData, d.cfg)
ls := sw_import.SkyWalkingDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, segmentData, peerIP, uri, d.platformData, d.cfg)
for _, l := range ls {
l.AddReferenceCount()
if !d.throttler.SendWithThrottling(l) {
Expand Down
2 changes: 1 addition & 1 deletion server/ingester/flow_log/log_data/sw_import/sw_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ import (
"github.com/deepflowio/deepflow/server/libs/grpc"
)

func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segmentData, peerIP []byte, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog {
func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segmentData, peerIP []byte, uri string, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog {
return []*log_data.L7FlowLog{}
}

0 comments on commit 9bcb5a6

Please sign in to comment.