From f06427f17cd3810b6c7b7d541aee7a5ee6e084e5 Mon Sep 17 00:00:00 2001 From: taloric Date: Sat, 12 Oct 2024 13:47:11 +0800 Subject: [PATCH] feat: support skywalking integration --- agent/Cargo.lock | 10 ++++ agent/Cargo.toml | 1 + agent/crates/public/src/sender.rs | 2 + .../plugins/integration_skywalking/Cargo.toml | 9 ++++ .../plugins/integration_skywalking/src/lib.rs | 33 +++++++++++++ agent/src/integration_collector.rs | 31 ++++++++++++ agent/src/trident.rs | 34 +++++++++++++ message/flow_log.proto | 4 ++ server/go.mod | 2 + server/ingester/flow_log/decoder/decoder.go | 49 +++++++++++++++++++ server/ingester/flow_log/flow_log/flow_log.go | 12 +++++ .../ingester/flow_log/log_data/l7_flow_log.go | 8 ++- .../ingester/flow_log/log_data/otel_import.go | 10 ++-- .../flow_log/log_data/sw_import/go.mod | 3 ++ .../flow_log/log_data/sw_import/sw_import.go | 11 +++++ server/ingester/flow_log/log_data/utils.go | 2 +- .../ingester/flow_log/log_data/utils_test.go | 2 +- server/libs/datatype/droplet-message.go | 5 +- 18 files changed, 218 insertions(+), 10 deletions(-) create mode 100644 agent/plugins/integration_skywalking/Cargo.toml create mode 100644 agent/plugins/integration_skywalking/src/lib.rs create mode 100644 server/ingester/flow_log/log_data/sw_import/go.mod create mode 100644 server/ingester/flow_log/log_data/sw_import/sw_import.go diff --git a/agent/Cargo.lock b/agent/Cargo.lock index c1e3e854332d..32cd36bfd297 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -1009,6 +1009,7 @@ dependencies = [ "http2", "humantime-serde", "hyper 0.14.26", + "integration_skywalking", "ipnet", "ipnetwork", "k8s-openapi", @@ -2008,6 +2009,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integration_skywalking" +version = "0.1.0" +dependencies = [ + "hyper 0.14.26", + "prost", + "public", +] + [[package]] name = "io-extras" version = "0.18.0" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 5ba3ef60143f..5b001da7a5f4 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -97,6 +97,7 @@ tonic = "0.8.1" wasmtime = "12.0.1" wasmtime-wasi = "12.0.1" zstd = "0.13.2" +integration_skywalking = { path = "plugins/integration_skywalking" } [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] diff --git a/agent/crates/public/src/sender.rs b/agent/crates/public/src/sender.rs index 41b279c78f77..7c0ff7af6f56 100644 --- a/agent/crates/public/src/sender.rs +++ b/agent/crates/public/src/sender.rs @@ -55,6 +55,7 @@ pub enum SendMessageType { // K8sEvent = 16, ApplicationLog = 17, SyslogDetail = 18, + SkyWalking = 19, } impl fmt::Display for SendMessageType { @@ -78,6 +79,7 @@ impl fmt::Display for SendMessageType { Self::AlarmEvent => write!(f, "alarm_event"), Self::ApplicationLog => write!(f, "application_log"), Self::SyslogDetail => write!(f, "syslog_detail"), + Self::SkyWalking => write!(f, "skywalking"), } } } diff --git a/agent/plugins/integration_skywalking/Cargo.toml b/agent/plugins/integration_skywalking/Cargo.toml new file mode 100644 index 000000000000..ddea66ef4b39 --- /dev/null +++ b/agent/plugins/integration_skywalking/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "integration_skywalking" +version = "0.1.0" +edition = "2021" + +[dependencies] +hyper = { version = "0.14", features = ["full"] } +public = { path = "../../crates/public" } +prost = "0.11" diff --git a/agent/plugins/integration_skywalking/src/lib.rs b/agent/plugins/integration_skywalking/src/lib.rs new file mode 100644 index 000000000000..325828e498fb --- /dev/null +++ b/agent/plugins/integration_skywalking/src/lib.rs @@ -0,0 +1,33 @@ +use hyper::{Body, Response, StatusCode}; +use prost::EncodeError; +use public::{ + proto::flow_log, + queue::DebugSender, + sender::{SendMessageType, Sendable}, +}; +use std::net::SocketAddr; + +#[derive(Debug, PartialEq)] +pub struct SkyWalkingExtra(pub flow_log::SkyWalkingExtra); + +impl Sendable for SkyWalkingExtra { + fn encode(self, _: &mut Vec) -> Result { + return Ok(0); + } + + fn message_type(&self) -> SendMessageType { + SendMessageType::SkyWalking + } +} + +pub async fn handle_skywalking_request( + _: SocketAddr, + _: Vec, + _: &str, + _: DebugSender, +) -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap() +} diff --git a/agent/src/integration_collector.rs b/agent/src/integration_collector.rs index 62e67c719ae3..f9ac3ed3d6d8 100644 --- a/agent/src/integration_collector.rs +++ b/agent/src/integration_collector.rs @@ -61,6 +61,7 @@ use crate::{ policy::PolicyGetter, }; +use integration_skywalking::{handle_skywalking_request, SkyWalkingExtra}; use public::{ counter::{Counter, CounterType, CounterValue, OwnedCountable}, enums::{CaptureNetworkType, EthernetType, L4Protocol}, @@ -595,6 +596,7 @@ async fn handler( telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, exception_handler: ExceptionHandler, compressed: bool, profile_compressed: bool, @@ -825,6 +827,28 @@ async fn handler( Ok(Response::builder().body(Body::empty()).unwrap()) } + ( + &Method::POST, + "/v3/segments" + | "/skywalking.v3.TraceSegmentReportService/collect" + | "/skywalking.v3.TraceSegmentReportService/collectInSync", + ) => { + if external_trace_integration_disabled { + return Ok(Response::builder().body(Body::empty()).unwrap()); + } + let (part, body) = req.into_parts(); + let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await { + Ok(b) => b, + Err(e) => { + return Ok(e); + } + }; + let data = decode_metric(whole_body, &part.headers)?; + Ok( + handle_skywalking_request(peer_addr, data, part.uri.path(), skywalking_sender) + .await, + ) + } // Return the 404 Not Found for other routes. _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) @@ -925,6 +949,7 @@ pub struct MetricServer { telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, port: Arc, exception_handler: ExceptionHandler, server_shutdown_tx: Mutex>>, @@ -952,6 +977,7 @@ impl MetricServer { telegraf_sender: DebugSender, profile_sender: DebugSender, application_log_sender: DebugSender, + skywalking_sender: DebugSender, port: u16, exception_handler: ExceptionHandler, compressed: bool, @@ -980,6 +1006,7 @@ impl MetricServer { telegraf_sender, profile_sender, application_log_sender, + skywalking_sender, port: Arc::new(AtomicU16::new(port)), exception_handler, server_shutdown_tx: Default::default(), @@ -1029,6 +1056,7 @@ impl MetricServer { let telegraf_sender = self.telegraf_sender.clone(); let profile_sender = self.profile_sender.clone(); let application_log_sender = self.application_log_sender.clone(); + let skywalking_sender = self.skywalking_sender.clone(); let port = self.port.clone(); let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire))); let (mon_tx, mon_rx) = oneshot::channel(); @@ -1101,6 +1129,7 @@ impl MetricServer { let telegraf_sender = telegraf_sender.clone(); let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); + let skywalking_sender = skywalking_sender.clone(); let exception_handler_inner = exception_handler.clone(); let counter = counter.clone(); let compressed = compressed.clone(); @@ -1118,6 +1147,7 @@ impl MetricServer { let telegraf_sender = telegraf_sender.clone(); let profile_sender = profile_sender.clone(); let application_log_sender = application_log_sender.clone(); + let skywalking_sender = skywalking_sender.clone(); let exception_handler = exception_handler_inner.clone(); let peer_addr = conn.remote_addr(); let counter = counter.clone(); @@ -1141,6 +1171,7 @@ impl MetricServer { telegraf_sender.clone(), profile_sender.clone(), application_log_sender.clone(), + skywalking_sender.clone(), exception_handler.clone(), compressed.load(Ordering::Relaxed), profile_compressed.load(Ordering::Relaxed), diff --git a/agent/src/trident.rs b/agent/src/trident.rs index c1346695091b..20fc3a4d70ce 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -113,6 +113,7 @@ use crate::{ utils::environment::{IN_CONTAINER, K8S_WATCH_POLICY}, }; +use integration_skywalking::SkyWalkingExtra; use packet_sequence_block::BoxedPacketSequenceBlock; use pcap_assembler::{BoxedPcapBatch, PcapAssembler}; @@ -1557,6 +1558,7 @@ pub struct AgentComponents { pub packet_sequence_uniform_sender: UniformSenderThread, // Enterprise Edition Feature: packet-sequence pub proc_event_uniform_sender: UniformSenderThread, pub application_log_uniform_sender: UniformSenderThread, + pub skywalking_uniform_sender: UniformSenderThread, pub exception_handler: ExceptionHandler, pub proto_log_sender: DebugSender, pub pcap_batch_sender: DebugSender, @@ -2430,6 +2432,32 @@ impl AgentComponents { None, ); + let skywalking_queue_name = "1-skywalking-to-sender"; + let (skywalking_sender, skywalking_receiver, counter) = queue::bounded_with_debug( + user_config + .processors + .flow_log + .tunning + .flow_aggregator_queue_size, + skywalking_queue_name, + &queue_debugger, + ); + stats_collector.register_countable( + &QueueStats { + module: skywalking_queue_name, + ..Default::default() + }, + Countable::Owned(Box::new(counter)), + ); + let skywalking_uniform_sender = UniformSenderThread::new( + skywalking_queue_name, + Arc::new(skywalking_receiver), + config_handler.sender(), + stats_collector.clone(), + exception_handler.clone(), + None, + ); + let ebpf_dispatcher_id = dispatcher_components.len(); #[cfg(any(target_os = "linux", target_os = "android"))] let mut ebpf_dispatcher_component = None; @@ -2672,6 +2700,7 @@ impl AgentComponents { telegraf_sender, profile_sender, application_log_sender, + skywalking_sender, candidate_config.metric_server.port, exception_handler.clone(), candidate_config.metric_server.compressed, @@ -2756,6 +2785,7 @@ impl AgentComponents { profile_uniform_sender, proc_event_uniform_sender, application_log_uniform_sender, + skywalking_uniform_sender, capture_mode: candidate_config.capture_mode, packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence @@ -2844,6 +2874,7 @@ impl AgentComponents { self.profile_uniform_sender.start(); self.proc_event_uniform_sender.start(); self.application_log_uniform_sender.start(); + self.skywalking_uniform_sender.start(); if self.config.metric_server.enabled { self.metrics_server_component.start(); } @@ -2916,6 +2947,9 @@ impl AgentComponents { if let Some(h) = self.application_log_uniform_sender.notify_stop() { join_handles.push(h); } + if let Some(h) = self.skywalking_uniform_sender.notify_stop() { + join_handles.push(h); + } // Enterprise Edition Feature: packet-sequence if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() { join_handles.push(h); diff --git a/message/flow_log.proto b/message/flow_log.proto index a298fcf2da10..77737f71d3e8 100644 --- a/message/flow_log.proto +++ b/message/flow_log.proto @@ -296,3 +296,7 @@ message MqttTopic { int32 qos = 2; // -1 mean not exist qos } +message SkyWalkingExtra { + bytes data = 1; + bytes peer_ip = 2; +} diff --git a/server/go.mod b/server/go.mod index 2ee92a89def4..3363920d7442 100644 --- a/server/go.mod +++ b/server/go.mod @@ -14,6 +14,7 @@ replace ( github.com/deepflowio/deepflow/server/controller/http/service/configuration => ./controller/http/service/configuration github.com/deepflowio/deepflow/server/controller/monitor/license => ./controller/monitor/license github.com/deepflowio/deepflow/server/ingester/config/configdefaults => ./ingester/config/configdefaults + github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import github.com/deepflowio/deepflow/server/libs/logger/blocker => ./libs/logger/blocker github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap => ./querier/app/distributed_tracing/service/tracemap github.com/deepflowio/deepflow/server/querier/app/prometheus/router/packet_adapter => ./querier/app/prometheus/router/packet_adapter @@ -105,6 +106,7 @@ require ( github.com/bytedance/sonic v1.11.8 github.com/deepflowio/deepflow/server/controller/http/appender v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/http/service/agentlicense v0.0.0-00010101000000-000000000000 + github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/libs/logger/blocker v0.0.0-20240822020041-cdaf0f82ce6f github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/querier/app/prometheus/router/packet_adapter v0.0.0-00010101000000-000000000000 diff --git a/server/ingester/flow_log/decoder/decoder.go b/server/ingester/flow_log/decoder/decoder.go index 7a61e1d319dc..e5e06d4ec2be 100644 --- a/server/ingester/flow_log/decoder/decoder.go +++ b/server/ingester/flow_log/decoder/decoder.go @@ -35,6 +35,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/flow_log/config" "github.com/deepflowio/deepflow/server/ingester/flow_log/dbwriter" "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data" + "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import" "github.com/deepflowio/deepflow/server/ingester/flow_log/throttler" "github.com/deepflowio/deepflow/server/ingester/flow_tag" "github.com/deepflowio/deepflow/server/libs/codec" @@ -153,6 +154,7 @@ func (d *Decoder) Run() { decoder := &codec.SimpleDecoder{} pbTaggedFlow := pb.NewTaggedFlow() pbTracesData := &v1.TracesData{} + pbSkywalkingData := &pb.SkyWalkingExtra{} for { n := d.inQueue.Gets(buffer) start := time.Now() @@ -181,6 +183,8 @@ func (d *Decoder) Run() { d.handleOpenTelemetry(decoder, pbTracesData, true) case datatype.MESSAGE_TYPE_PACKETSEQUENCE: d.handleL4Packet(decoder) + case datatype.MESSAGE_TYPE_SKYWALKING: + d.handleSkyWalking(decoder, pbSkywalkingData, false) default: log.Warningf("unknown msg type: %d", d.msgType) @@ -279,6 +283,51 @@ func (d *Decoder) sendOpenMetetry(tracesData *v1.TracesData) { } } +func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.SkyWalkingExtra, compressed bool) { + var err error + for !decoder.IsEnd() { + pbSkyWalkingData.Reset() + bytes := decoder.ReadBytes() + if len(bytes) > 0 { + // universal compression + if compressed { + bytes, err = decompressOpenTelemetry(bytes) + } + if err == nil { + err = proto.Unmarshal(bytes, pbSkyWalkingData) + } + } + if decoder.Failed() || err != nil { + if d.counter.ErrorCount == 0 { + log.Errorf("skywalking data decode failed, offset=%d len=%d err: %s", decoder.Offset(), len(decoder.Bytes()), err) + } + d.counter.ErrorCount++ + continue + } + d.sendSkyWalking(pbSkyWalkingData.Data, pbSkyWalkingData.PeerIp) + } +} +func (d *Decoder) sendSkyWalking(segmentData, peerIP []byte) { + if d.debugEnabled { + log.Debugf("decoder %d vtap %d recv skywalking data length: %d", d.index, d.agentId, len(segmentData)) + } + d.counter.Count++ + ls := sw_import.SkyWalkingDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, segmentData, peerIP, d.platformData, d.cfg) + for _, l := range ls { + l.AddReferenceCount() + if !d.throttler.SendWithThrottling(l) { + d.counter.DropCount++ + } else { + d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0] + l.GenerateNewFlowTags(d.flowTagWriter.Cache) + d.flowTagWriter.WriteFieldsAndFieldValuesInCache() + d.appServiceTagWrite(l) + d.spanWrite(l) + } + l.Release() + } +} + func (d *Decoder) handleL4Packet(decoder *codec.SimpleDecoder) { for !decoder.IsEnd() { l4Packet, err := log_data.DecodePacketSequence(d.agentId, d.orgId, d.teamId, decoder) diff --git a/server/ingester/flow_log/flow_log/flow_log.go b/server/ingester/flow_log/flow_log/flow_log.go index 839d78c3af55..902341c43210 100644 --- a/server/ingester/flow_log/flow_log/flow_log.go +++ b/server/ingester/flow_log/flow_log/flow_log.go @@ -54,6 +54,7 @@ type FlowLog struct { OtelLogger *Logger OtelCompressedLogger *Logger L4PacketLogger *Logger + SkyWalkingLogger *Logger Exporters *exporters.Exporters SpanWriter *dbwriter.SpanWriter TraceTreeWriter *dbwriter.TraceTreeWriter @@ -118,6 +119,10 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec if err != nil { return nil, err } + skywalkingLogger, err := NewLogger(datatype.MESSAGE_TYPE_SKYWALKING, config, platformDataManager, manager, recv, flowLogWriter, common.L7_FLOW_ID, nil, spanWriter) + if err != nil { + return nil, err + } return &FlowLog{ FlowLogConfig: config, L4FlowLogger: l4FlowLogger, @@ -125,6 +130,7 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec OtelLogger: otelLogger, OtelCompressedLogger: otelCompressedLogger, L4PacketLogger: l4PacketLogger, + SkyWalkingLogger: skywalkingLogger, Exporters: exporters, SpanWriter: spanWriter, TraceTreeWriter: traceTreeWriter, @@ -356,6 +362,9 @@ func (s *FlowLog) Start() { if s.OtelCompressedLogger != nil { s.OtelCompressedLogger.Start() } + if s.SkyWalkingLogger != nil { + s.SkyWalkingLogger.Start() + } if s.SpanWriter != nil { s.SpanWriter.Start() } @@ -380,6 +389,9 @@ func (s *FlowLog) Close() error { if s.OtelCompressedLogger != nil { s.OtelCompressedLogger.Close() } + if s.SkyWalkingLogger != nil { + s.SkyWalkingLogger.Close() + } if s.SpanWriter != nil { s.SpanWriter.Close() } diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index aeeb6061f3bb..ab0a7d967127 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -369,7 +369,7 @@ func base64ToHexString(str string) string { // when the traceId-index data is stored in CK, the generated minmax index will have min non-zero, which improves the filtering performance of the minmax index var lastTraceIdIndex uint64 -func parseTraceIdIndex(traceId string, traceIdIndexCfg *config.TraceIdWithIndex) uint64 { +func ParseTraceIdIndex(traceId string, traceIdIndexCfg *config.TraceIdWithIndex) uint64 { if traceIdIndexCfg.Disabled { return 0 } @@ -474,7 +474,7 @@ func (h *L7FlowLog) fillL7FlowLog(l *pb.AppProtoLogsData, cfg *flowlogCfg.Config h.TraceId = l.TraceInfo.TraceId h.ParentSpanId = l.TraceInfo.ParentSpanId } - h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) + h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) // 处理内置协议特殊情况 switch datatype.L7Protocol(h.L7Protocol) { @@ -548,6 +548,10 @@ func (h *L7FlowLog) ID() uint64 { return h._id } +func (h *L7FlowLog) SetID(id uint64) { + h._id = id +} + func (b *L7Base) Fill(log *pb.AppProtoLogsData, platformData *grpc.PlatformInfoTable) { l := log.Base // 网络层 diff --git a/server/ingester/flow_log/log_data/otel_import.go b/server/ingester/flow_log/log_data/otel_import.go index 71b0eda7eaf9..53ea1740f283 100644 --- a/server/ingester/flow_log/log_data/otel_import.go +++ b/server/ingester/flow_log/log_data/otel_import.go @@ -87,7 +87,7 @@ func spanStatusToResponseStatus(status *v1.Status) datatype.LogMessageStatus { return datatype.STATUS_NOT_EXIST } -func httpCodeToResponseStatus(code int32) datatype.LogMessageStatus { +func HttpCodeToResponseStatus(code int32) datatype.LogMessageStatus { if code >= 400 && code <= 499 { return datatype.STATUS_CLIENT_ERROR } else if code >= 500 && code <= 600 { @@ -197,7 +197,7 @@ func (h *L7FlowLog) fillAttributes(spanAttributes, resAttributes []*v11.KeyValue } case "sw8.trace_id": h.TraceId = getValueString(value) - h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) + h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) } } else { @@ -297,7 +297,7 @@ func (h *L7FlowLog) fillAttributes(spanAttributes, resAttributes []*v11.KeyValue // If http.target exists, read it for RequestResource. If not exist, read the part after the domain name from http.url. // eg. http.url = http://nacos:8848/nacos/v1/ns/instance/list, mapped to request_resource is /nacos/v1/ns/instance/list if h.RequestResource == "" && httpURL != "" { - parsedURLPath, err := parseUrlPath(httpURL) + parsedURLPath, err := ParseUrlPath(httpURL) if err != nil { log.Debugf("http.url (%s) parse failed: %s", httpURL, err) } else { @@ -340,7 +340,7 @@ func (h *L7FlowLog) FillOTel(l *v1.Span, resAttributes []*v11.KeyValue, platform h.TapPortType = datatype.TAPPORT_FROM_OTEL h.SignalSource = uint16(datatype.SIGNAL_SOURCE_OTEL) h.TraceId = hex.EncodeToString(l.TraceId) - h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) + h.TraceIdIndex = ParseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex) h.SpanId = hex.EncodeToString(l.SpanId) h.ParentSpanId = hex.EncodeToString(l.ParentSpanId) h.TapSideEnum = uint8(spanKindToTapSide(l.Kind)) @@ -362,7 +362,7 @@ func (h *L7FlowLog) FillOTel(l *v1.Span, resAttributes []*v11.KeyValue, platform h.fillAttributes(l.GetAttributes(), resAttributes, l.GetLinks(), cfg) // 优先匹配http的响应码 if h.responseCode != 0 { - h.ResponseStatus = uint8(httpCodeToResponseStatus(h.responseCode)) + h.ResponseStatus = uint8(HttpCodeToResponseStatus(h.responseCode)) if h.ResponseStatus == uint8(datatype.STATUS_CLIENT_ERROR) || h.ResponseStatus == uint8(datatype.STATUS_SERVER_ERROR) { h.ResponseException = GetHTTPExceptionDesc(uint16(h.responseCode)) diff --git a/server/ingester/flow_log/log_data/sw_import/go.mod b/server/ingester/flow_log/log_data/sw_import/go.mod new file mode 100644 index 000000000000..7cd4d709a995 --- /dev/null +++ b/server/ingester/flow_log/log_data/sw_import/go.mod @@ -0,0 +1,3 @@ +module github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import + +go 1.18 \ No newline at end of file diff --git a/server/ingester/flow_log/log_data/sw_import/sw_import.go b/server/ingester/flow_log/log_data/sw_import/sw_import.go new file mode 100644 index 000000000000..8a113dd3c29a --- /dev/null +++ b/server/ingester/flow_log/log_data/sw_import/sw_import.go @@ -0,0 +1,11 @@ +package sw_import + +import ( + flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config" + "github.com/deepflowio/deepflow/server/ingester/flow_log/log_data" + "github.com/deepflowio/deepflow/server/libs/grpc" +) + +func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segmentData, peerIP []byte, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*log_data.L7FlowLog { + return []*log_data.L7FlowLog{} +} diff --git a/server/ingester/flow_log/log_data/utils.go b/server/ingester/flow_log/log_data/utils.go index 39899ec4edc1..2c5acf6f5c9a 100644 --- a/server/ingester/flow_log/log_data/utils.go +++ b/server/ingester/flow_log/log_data/utils.go @@ -27,7 +27,7 @@ func IPIntToString(ipInt uint32) string { } // eg. url=http://nacos:8848/nacos/v1/ns/instance/list, parse return `/nacos/v1/ns/instance/list` -func parseUrlPath(rawURL string) (string, error) { +func ParseUrlPath(rawURL string) (string, error) { parts := strings.SplitN(rawURL, "://", 2) if len(parts) != 2 || parts[1] == "" { return "", fmt.Errorf("invalid URL format") diff --git a/server/ingester/flow_log/log_data/utils_test.go b/server/ingester/flow_log/log_data/utils_test.go index b9f456cd2518..31fae1e33b42 100644 --- a/server/ingester/flow_log/log_data/utils_test.go +++ b/server/ingester/flow_log/log_data/utils_test.go @@ -37,7 +37,7 @@ func TestParseUrlPath(t *testing.T) { } for _, testCase := range testCases { - result, err := parseUrlPath(testCase.url) + result, err := ParseUrlPath(testCase.url) if (err != nil) != testCase.err { t.Errorf("URL: %s\nExpected error: %v\nGot error: %v\n", testCase.url, testCase.err, err != nil) } diff --git a/server/libs/datatype/droplet-message.go b/server/libs/datatype/droplet-message.go index 70cd12b9e0a3..b529fdf7f1e0 100644 --- a/server/libs/datatype/droplet-message.go +++ b/server/libs/datatype/droplet-message.go @@ -54,7 +54,8 @@ const ( MESSAGE_TYPE_ALERT_EVENT MESSAGE_TYPE_K8S_EVENT MESSAGE_TYPE_APPLICATION_LOG - MESSAGE_TYPE_AGENT_LOG // 18 + MESSAGE_TYPE_AGENT_LOG + MESSAGE_TYPE_SKYWALKING // 19 MESSAGE_TYPE_MAX ) @@ -80,6 +81,7 @@ var MessageTypeString = [MESSAGE_TYPE_MAX]string{ MESSAGE_TYPE_K8S_EVENT: "k8s_event", MESSAGE_TYPE_APPLICATION_LOG: "application_log", MESSAGE_TYPE_AGENT_LOG: "agent_log", + MESSAGE_TYPE_SKYWALKING: "skywalking", } func (m MessageType) String() string { @@ -123,6 +125,7 @@ var MessageHeaderTypes = [MESSAGE_TYPE_MAX]MessageHeaderType{ MESSAGE_TYPE_K8S_EVENT: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_APPLICATION_LOG: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_AGENT_LOG: HEADER_TYPE_LT_VTAP, + MESSAGE_TYPE_SKYWALKING: HEADER_TYPE_LT_VTAP, } func (m MessageType) HeaderType() MessageHeaderType {