Skip to content

Commit

Permalink
feat: support skywalking integration
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Oct 12, 2024
1 parent 575d89d commit b73a2a8
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 9 deletions.
10 changes: 10 additions & 0 deletions agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ tonic = "0.8.1"
wasmtime = "12.0.1"
wasmtime-wasi = "12.0.1"
zstd = "0.13.2"
integration_skywalking = { path = "plugins/integration_skywalking" }


[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions agent/crates/public/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum SendMessageType {
// K8sEvent = 16,
ApplicationLog = 17,
SyslogDetail = 18,
SkyWalking = 19,
}

impl fmt::Display for SendMessageType {
Expand All @@ -78,6 +79,7 @@ impl fmt::Display for SendMessageType {
Self::AlarmEvent => write!(f, "alarm_event"),
Self::ApplicationLog => write!(f, "application_log"),
Self::SyslogDetail => write!(f, "syslog_detail"),
Self::SkyWalking => write!(f, "skywalking"),
}
}
}
9 changes: 9 additions & 0 deletions agent/plugins/integration_skywalking/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "integration_skywalking"
version = "0.1.0"
edition = "2021"

[dependencies]
hyper = { version = "0.14", features = ["full"] }
public = { path = "../../crates/public" }
prost = "0.11"
33 changes: 33 additions & 0 deletions agent/plugins/integration_skywalking/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use hyper::{Body, Response, StatusCode};
use prost::EncodeError;
use public::{
proto::flow_log,
queue::DebugSender,
sender::{SendMessageType, Sendable},
};
use std::net::SocketAddr;

#[derive(Debug, PartialEq)]
pub struct SkyWalkingExtra(pub flow_log::SkyWalkingExtra);

impl Sendable for SkyWalkingExtra {
fn encode(self, _: &mut Vec<u8>) -> Result<usize, EncodeError> {
return Ok(0);
}

fn message_type(&self) -> SendMessageType {
SendMessageType::SkyWalking
}
}

pub async fn handle_skywalking_request(
_: SocketAddr,
_: Vec<u8>,
_: &str,
_: DebugSender<SkyWalkingExtra>,
) -> Response<Body> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()
}
31 changes: 31 additions & 0 deletions agent/src/integration_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::{
policy::PolicyGetter,
};

use integration_skywalking::{handle_skywalking_request, SkyWalkingExtra};
use public::{
counter::{Counter, CounterType, CounterValue, OwnedCountable},
enums::{CaptureNetworkType, EthernetType, L4Protocol},
Expand Down Expand Up @@ -595,6 +596,7 @@ async fn handler(
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
exception_handler: ExceptionHandler,
compressed: bool,
profile_compressed: bool,
Expand Down Expand Up @@ -825,6 +827,28 @@ async fn handler(

Ok(Response::builder().body(Body::empty()).unwrap())
}
(
&Method::POST,
"/v3/segments"
| "/skywalking.v3.TraceSegmentReportService/collect"
| "/skywalking.v3.TraceSegmentReportService/collectInSync",
) => {
if external_trace_integration_disabled {
return Ok(Response::builder().body(Body::empty()).unwrap());
}
let (part, body) = req.into_parts();
let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await {
Ok(b) => b,
Err(e) => {
return Ok(e);
}
};
let data = decode_metric(whole_body, &part.headers)?;
Ok(
handle_skywalking_request(peer_addr, data, part.uri.path(), skywalking_sender)
.await,
)
}
// Return the 404 Not Found for other routes.
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -925,6 +949,7 @@ pub struct MetricServer {
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
port: Arc<AtomicU16>,
exception_handler: ExceptionHandler,
server_shutdown_tx: Mutex<Option<mpsc::Sender<()>>>,
Expand Down Expand Up @@ -952,6 +977,7 @@ impl MetricServer {
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
port: u16,
exception_handler: ExceptionHandler,
compressed: bool,
Expand Down Expand Up @@ -980,6 +1006,7 @@ impl MetricServer {
telegraf_sender,
profile_sender,
application_log_sender,
skywalking_sender,
port: Arc::new(AtomicU16::new(port)),
exception_handler,
server_shutdown_tx: Default::default(),
Expand Down Expand Up @@ -1029,6 +1056,7 @@ impl MetricServer {
let telegraf_sender = self.telegraf_sender.clone();
let profile_sender = self.profile_sender.clone();
let application_log_sender = self.application_log_sender.clone();
let skywalking_sender = self.skywalking_sender.clone();
let port = self.port.clone();
let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire)));
let (mon_tx, mon_rx) = oneshot::channel();
Expand Down Expand Up @@ -1101,6 +1129,7 @@ impl MetricServer {
let telegraf_sender = telegraf_sender.clone();
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let exception_handler_inner = exception_handler.clone();
let counter = counter.clone();
let compressed = compressed.clone();
Expand All @@ -1118,6 +1147,7 @@ impl MetricServer {
let telegraf_sender = telegraf_sender.clone();
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let exception_handler = exception_handler_inner.clone();
let peer_addr = conn.remote_addr();
let counter = counter.clone();
Expand All @@ -1141,6 +1171,7 @@ impl MetricServer {
telegraf_sender.clone(),
profile_sender.clone(),
application_log_sender.clone(),
skywalking_sender.clone(),
exception_handler.clone(),
compressed.load(Ordering::Relaxed),
profile_compressed.load(Ordering::Relaxed),
Expand Down
34 changes: 34 additions & 0 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ use crate::{
utils::environment::{IN_CONTAINER, K8S_WATCH_POLICY},
};

use integration_skywalking::SkyWalkingExtra;
use packet_sequence_block::BoxedPacketSequenceBlock;
use pcap_assembler::{BoxedPcapBatch, PcapAssembler};

Expand Down Expand Up @@ -1557,6 +1558,7 @@ pub struct AgentComponents {
pub packet_sequence_uniform_sender: UniformSenderThread<BoxedPacketSequenceBlock>, // Enterprise Edition Feature: packet-sequence
pub proc_event_uniform_sender: UniformSenderThread<BoxedProcEvents>,
pub application_log_uniform_sender: UniformSenderThread<ApplicationLog>,
pub skywalking_uniform_sender: UniformSenderThread<SkyWalkingExtra>,
pub exception_handler: ExceptionHandler,
pub proto_log_sender: DebugSender<BoxAppProtoLogsData>,
pub pcap_batch_sender: DebugSender<BoxedPcapBatch>,
Expand Down Expand Up @@ -2430,6 +2432,32 @@ impl AgentComponents {
None,
);

let skywalking_queue_name = "1-skywalking-to-sender";
let (skywalking_sender, skywalking_receiver, counter) = queue::bounded_with_debug(
user_config
.processors
.flow_log
.tunning
.flow_aggregator_queue_size,
skywalking_queue_name,
&queue_debugger,
);
stats_collector.register_countable(
&QueueStats {
module: skywalking_queue_name,
..Default::default()
},
Countable::Owned(Box::new(counter)),
);
let skywalking_uniform_sender = UniformSenderThread::new(
skywalking_queue_name,
Arc::new(skywalking_receiver),
config_handler.sender(),
stats_collector.clone(),
exception_handler.clone(),
None,
);

let ebpf_dispatcher_id = dispatcher_components.len();
#[cfg(any(target_os = "linux", target_os = "android"))]
let mut ebpf_dispatcher_component = None;
Expand Down Expand Up @@ -2672,6 +2700,7 @@ impl AgentComponents {
telegraf_sender,
profile_sender,
application_log_sender,
skywalking_sender,
candidate_config.metric_server.port,
exception_handler.clone(),
candidate_config.metric_server.compressed,
Expand Down Expand Up @@ -2756,6 +2785,7 @@ impl AgentComponents {
profile_uniform_sender,
proc_event_uniform_sender,
application_log_uniform_sender,
skywalking_uniform_sender,
capture_mode: candidate_config.capture_mode,
packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence
packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence
Expand Down Expand Up @@ -2844,6 +2874,7 @@ impl AgentComponents {
self.profile_uniform_sender.start();
self.proc_event_uniform_sender.start();
self.application_log_uniform_sender.start();
self.skywalking_uniform_sender.start();
if self.config.metric_server.enabled {
self.metrics_server_component.start();
}
Expand Down Expand Up @@ -2916,6 +2947,9 @@ impl AgentComponents {
if let Some(h) = self.application_log_uniform_sender.notify_stop() {
join_handles.push(h);
}
if let Some(h) = self.skywalking_uniform_sender.notify_stop() {
join_handles.push(h);
}
// Enterprise Edition Feature: packet-sequence
if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() {
join_handles.push(h);
Expand Down
4 changes: 4 additions & 0 deletions message/flow_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,7 @@ message MqttTopic {
int32 qos = 2; // -1 mean not exist qos
}

message SkyWalkingExtra {
bytes data = 1;
bytes peer_ip = 2;
}
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ replace (
github.com/deepflowio/deepflow/server/querier/app/prometheus/service/packet_wrapper => ./querier/app/prometheus/service/packet_wrapper
github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/service/packet_service => ./querier/app/tracing-adapter/service/packet_service
github.com/deepflowio/deepflow/server/querier/engine/clickhouse/packet_batch => ./querier/engine/clickhouse/packet_batch
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import
github.com/ionos-cloud/sdk-go/v6 => github.com/ionos-cloud/sdk-go/v6 v6.1.0
)

Expand Down
8 changes: 6 additions & 2 deletions server/ingester/flow_log/log_data/l7_flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func base64ToHexString(str string) string {
// when the traceId-index data is stored in CK, the generated minmax index will have min non-zero, which improves the filtering performance of the minmax index
var lastTraceIdIndex uint64

func parseTraceIdIndex(traceId string, traceIdIndexCfg *config.TraceIdWithIndex) uint64 {
func ParseTraceIdIndex(traceId string, traceIdIndexCfg *config.TraceIdWithIndex) uint64 {
if traceIdIndexCfg.Disabled {
return 0
}
Expand Down Expand Up @@ -474,7 +474,7 @@ func (h *L7FlowLog) fillL7FlowLog(l *pb.AppProtoLogsData, cfg *flowlogCfg.Config
h.TraceId = l.TraceInfo.TraceId
h.ParentSpanId = l.TraceInfo.ParentSpanId
}
h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)
h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)

// 处理内置协议特殊情况
switch datatype.L7Protocol(h.L7Protocol) {
Expand Down Expand Up @@ -548,6 +548,10 @@ func (h *L7FlowLog) ID() uint64 {
return h._id
}

func (h *L7FlowLog) SetID(id uint64) {
h._id = id
}

func (b *L7Base) Fill(log *pb.AppProtoLogsData, platformData *grpc.PlatformInfoTable) {
l := log.Base
// 网络层
Expand Down
10 changes: 5 additions & 5 deletions server/ingester/flow_log/log_data/otel_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func spanStatusToResponseStatus(status *v1.Status) datatype.LogMessageStatus {
return datatype.STATUS_NOT_EXIST
}

func httpCodeToResponseStatus(code int32) datatype.LogMessageStatus {
func HttpCodeToResponseStatus(code int32) datatype.LogMessageStatus {
if code >= 400 && code <= 499 {
return datatype.STATUS_CLIENT_ERROR
} else if code >= 500 && code <= 600 {
Expand Down Expand Up @@ -197,7 +197,7 @@ func (h *L7FlowLog) fillAttributes(spanAttributes, resAttributes []*v11.KeyValue
}
case "sw8.trace_id":
h.TraceId = getValueString(value)
h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)
h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)
}

} else {
Expand Down Expand Up @@ -297,7 +297,7 @@ func (h *L7FlowLog) fillAttributes(spanAttributes, resAttributes []*v11.KeyValue
// If http.target exists, read it for RequestResource. If not exist, read the part after the domain name from http.url.
// eg. http.url = http://nacos:8848/nacos/v1/ns/instance/list, mapped to request_resource is /nacos/v1/ns/instance/list
if h.RequestResource == "" && httpURL != "" {
parsedURLPath, err := parseUrlPath(httpURL)
parsedURLPath, err := ParseUrlPath(httpURL)
if err != nil {
log.Debugf("http.url (%s) parse failed: %s", httpURL, err)
} else {
Expand Down Expand Up @@ -340,7 +340,7 @@ func (h *L7FlowLog) FillOTel(l *v1.Span, resAttributes []*v11.KeyValue, platform
h.TapPortType = datatype.TAPPORT_FROM_OTEL
h.SignalSource = uint16(datatype.SIGNAL_SOURCE_OTEL)
h.TraceId = hex.EncodeToString(l.TraceId)
h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)
h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)
h.SpanId = hex.EncodeToString(l.SpanId)
h.ParentSpanId = hex.EncodeToString(l.ParentSpanId)
h.TapSideEnum = uint8(spanKindToTapSide(l.Kind))
Expand All @@ -362,7 +362,7 @@ func (h *L7FlowLog) FillOTel(l *v1.Span, resAttributes []*v11.KeyValue, platform
h.fillAttributes(l.GetAttributes(), resAttributes, l.GetLinks(), cfg)
// 优先匹配http的响应码
if h.responseCode != 0 {
h.ResponseStatus = uint8(httpCodeToResponseStatus(h.responseCode))
h.ResponseStatus = uint8(HttpCodeToResponseStatus(h.responseCode))
if h.ResponseStatus == uint8(datatype.STATUS_CLIENT_ERROR) ||
h.ResponseStatus == uint8(datatype.STATUS_SERVER_ERROR) {
h.ResponseException = GetHTTPExceptionDesc(uint16(h.responseCode))
Expand Down
3 changes: 3 additions & 0 deletions server/ingester/flow_log/log_data/sw_import/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import

go 1.18
11 changes: 11 additions & 0 deletions server/ingester/flow_log/log_data/sw_import/sw_import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package sw_import

import (
flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config"
"github.com/deepflowio/deepflow/server/ingester/flow_log/log_data"
"github.com/deepflowio/deepflow/server/libs/grpc"
)

func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segmentData, peerIP []byte, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog {
return []*log_data.L7FlowLog{}
}
2 changes: 1 addition & 1 deletion server/ingester/flow_log/log_data/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func IPIntToString(ipInt uint32) string {
}

// eg. url=http://nacos:8848/nacos/v1/ns/instance/list, parse return `/nacos/v1/ns/instance/list`
func parseUrlPath(rawURL string) (string, error) {
func ParseUrlPath(rawURL string) (string, error) {
parts := strings.SplitN(rawURL, "://", 2)
if len(parts) != 2 || parts[1] == "" {
return "", fmt.Errorf("invalid URL format")
Expand Down
Loading

0 comments on commit b73a2a8

Please sign in to comment.