From 6dbe996dce7d7b44f7cbd90b9ca03aaca838c377 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Sat, 2 Nov 2024 13:48:42 +0800 Subject: [PATCH] perf: improve get pool flow_log item batch --- server/ingester/flow_log/decoder/decoder.go | 130 +++++++++++------- .../ingester/flow_log/log_data/l4_flow_log.go | 77 +++++++---- .../ingester/flow_log/log_data/l7_flow_log.go | 62 ++++++++- .../ingester/flow_log/log_data/otel_import.go | 20 +-- 4 files changed, 197 insertions(+), 92 deletions(-) diff --git a/server/ingester/flow_log/decoder/decoder.go b/server/ingester/flow_log/decoder/decoder.go index 00d790542c85..90bd03346e5f 100644 --- a/server/ingester/flow_log/decoder/decoder.go +++ b/server/ingester/flow_log/decoder/decoder.go @@ -78,22 +78,25 @@ type Counter struct { } type Decoder struct { - index int - msgType datatype.MessageType - dataSourceID uint32 - platformData *grpc.PlatformInfoTable - inQueue queue.QueueReader - throttler *throttler.ThrottlingQueue - flowTagWriter *flow_tag.FlowTagWriter - appServiceTagWriter *flow_tag.AppServiceTagWriter - spanWriter *dbwriter.SpanWriter - spanBuf []interface{} - exporters *exporters.Exporters - cfg *config.Config - debugEnabled bool + index int + msgType datatype.MessageType + dataSourceID uint32 + platformData *grpc.PlatformInfoTable + inQueue queue.QueueReader + throttler *throttler.ThrottlingQueue + flowTagWriter *flow_tag.FlowTagWriter + appServiceTagWriter *flow_tag.AppServiceTagWriter + spanWriter *dbwriter.SpanWriter + spanBuf []interface{} + exporters *exporters.Exporters + cfg *config.Config + debugEnabled bool + maxL4Count, maxL7Count int agentId, orgId, teamId uint16 + protoLogBuf []pb.AppProtoLogsData + taggedFlowBuf []pb.TaggedFlow fieldsBuf []interface{} fieldValuesBuf []interface{} counter *Counter @@ -126,6 +129,8 @@ func NewDecoder( exporters: exporters, cfg: cfg, debugEnabled: log.IsEnabledFor(logging.DEBUG), + protoLogBuf: make([]pb.AppProtoLogsData, 0, 16), + taggedFlowBuf: make([]pb.TaggedFlow, 0, 16), fieldsBuf: make([]interface{}, 0, 64), fieldValuesBuf: make([]interface{}, 0, 64), counter: &Counter{}, @@ -152,7 +157,6 @@ func (d *Decoder) Run() { "msg_type": d.msgType.String()}) buffer := make([]interface{}, BUFFER_SIZE) decoder := &codec.SimpleDecoder{} - pbTaggedFlow := pb.NewTaggedFlow() pbTracesData := &v1.TracesData{} pbSkywalkingData := &pb.SkyWalkingExtra{} for { @@ -176,7 +180,7 @@ func (d *Decoder) Run() { case datatype.MESSAGE_TYPE_PROTOCOLLOG: d.handleProtoLog(decoder) case datatype.MESSAGE_TYPE_TAGGEDFLOW: - d.handleTaggedFlow(decoder, pbTaggedFlow) + d.handleTaggedFlow(decoder) case datatype.MESSAGE_TYPE_OPENTELEMETRY: d.handleOpenTelemetry(decoder, pbTracesData, false) case datatype.MESSAGE_TYPE_OPENTELEMETRY_COMPRESSED: @@ -195,10 +199,11 @@ func (d *Decoder) Run() { } } -func (d *Decoder) handleTaggedFlow(decoder *codec.SimpleDecoder, pbTaggedFlow *pb.TaggedFlow) { +func (d *Decoder) handleTaggedFlow(decoder *codec.SimpleDecoder) { + d.taggedFlowBuf = d.taggedFlowBuf[:0] for !decoder.IsEnd() { - pbTaggedFlow.ResetAll() - decoder.ReadPB(pbTaggedFlow) + pbTaggedFlow := pb.TaggedFlow{} + decoder.ReadPB(&pbTaggedFlow) if decoder.Failed() { d.counter.ErrorCount++ log.Errorf("flow decode failed, offset=%d len=%d", decoder.Offset(), len(decoder.Bytes())) @@ -209,23 +214,38 @@ func (d *Decoder) handleTaggedFlow(decoder *codec.SimpleDecoder, pbTaggedFlow *p log.Warningf("invalid flow %s", pbTaggedFlow.Flow) continue } - d.sendFlow(pbTaggedFlow) + d.taggedFlowBuf = append(d.taggedFlowBuf, pbTaggedFlow) } + if d.maxL4Count < len(d.taggedFlowBuf) { + d.maxL4Count = len(d.taggedFlowBuf) + } + if d.counter.RawCount == 1 && time.Now().Unix()%60 <= 10 { + log.Infof("max flow log l4 count=%d", d.maxL4Count) + d.maxL4Count = 0 + } + d.sendFlow() } func (d *Decoder) handleProtoLog(decoder *codec.SimpleDecoder) { + d.protoLogBuf = d.protoLogBuf[:0] for !decoder.IsEnd() { - protoLog := pb.AcquirePbAppProtoLogsData() - - decoder.ReadPB(protoLog) + protoLog := pb.AppProtoLogsData{} + decoder.ReadPB(&protoLog) if decoder.Failed() || !protoLog.IsValid() { d.counter.ErrorCount++ - pb.ReleasePbAppProtoLogsData(protoLog) log.Errorf("proto log decode failed, offset=%d len=%d", decoder.Offset(), len(decoder.Bytes())) return } - d.sendProto(protoLog) + d.protoLogBuf = append(d.protoLogBuf, protoLog) } + if d.maxL7Count < len(d.protoLogBuf) { + d.maxL7Count = len(d.protoLogBuf) + } + if d.counter.RawCount == 1 && time.Now().Unix()%60 <= 10 { + log.Infof("max flow log l7 count=%d", d.maxL7Count) + d.maxL7Count = 0 + } + d.sendProto() } func decompressOpenTelemetry(compressed []byte) ([]byte, error) { @@ -349,24 +369,27 @@ func (d *Decoder) handleL4Packet(decoder *codec.SimpleDecoder) { } } -func (d *Decoder) sendFlow(flow *pb.TaggedFlow) { +func (d *Decoder) sendFlow() { if d.debugEnabled { - log.Debugf("decoder %d recv flow: %s", d.index, flow) + log.Debugf("decoder %d recv flow: %s", d.index, d.taggedFlowBuf[0]) } d.counter.Count++ - l := log_data.TaggedFlowToL4FlowLog(d.orgId, d.teamId, flow, d.platformData) + ls := log_data.TaggedFlowsToL4FlowLogs(d.orgId, d.teamId, d.taggedFlowBuf, d.platformData) - if l.HitPcapPolicy() { - d.export(l) - d.throttler.SendWithoutThrottling(l) - } else { - l.AddReferenceCount() - if !d.throttler.SendWithThrottling(l) { - d.counter.DropCount++ - } else { + for i := range ls { + l := &ls[i] + if l.HitPcapPolicy() { d.export(l) + d.throttler.SendWithoutThrottling(l) + } else { + l.AddReferenceCount() + if !d.throttler.SendWithThrottling(l) { + d.counter.DropCount++ + } else { + d.export(l) + } + l.Release() } - l.Release() } } @@ -411,27 +434,30 @@ func (d *Decoder) appServiceTagWrite(l *log_data.L7FlowLog) { d.appServiceTagWriter.Write(l.Time, flowlogcommon.L7_FLOW_ID.String(), l.AppService, l.AppInstance, l.OrgId, l.TeamID) } -func (d *Decoder) sendProto(proto *pb.AppProtoLogsData) { +func (d *Decoder) sendProto() { if d.debugEnabled { - log.Debugf("decoder %d recv proto: %s", d.index, proto) + log.Debugf("decoder %d recv proto: %s", d.index, d.protoLogBuf[0]) } - l := log_data.ProtoLogToL7FlowLog(d.orgId, d.teamId, proto, d.platformData, d.cfg) - l.AddReferenceCount() - sent := d.throttler.SendWithThrottling(l) - if sent { - if d.flowTagWriter != nil { - d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0] - l.GenerateNewFlowTags(d.flowTagWriter.Cache) - d.flowTagWriter.WriteFieldsAndFieldValuesInCache() + ls := log_data.ProtoLogsToL7FlowLogs(d.orgId, d.teamId, d.protoLogBuf, d.platformData, d.cfg) + for i := range ls { + l := &ls[i] + l.AddReferenceCount() + sent := d.throttler.SendWithThrottling(l) + if sent { + if d.flowTagWriter != nil { + d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0] + l.GenerateNewFlowTags(d.flowTagWriter.Cache) + d.flowTagWriter.WriteFieldsAndFieldValuesInCache() + } + d.appServiceTagWrite(l) + d.export(l) + d.spanWrite(l) } - d.appServiceTagWrite(l) - d.export(l) - d.spanWrite(l) + d.updateCounter(datatype.L7Protocol(l.L7Protocol), !sent) + l.Release() } - d.updateCounter(datatype.L7Protocol(proto.Base.Head.Proto), !sent) - l.Release() - proto.Release() + d.protoLogBuf = d.protoLogBuf[:0] } diff --git a/server/ingester/flow_log/log_data/l4_flow_log.go b/server/ingester/flow_log/log_data/l4_flow_log.go index b823998c70e0..9a31da6a18ff 100644 --- a/server/ingester/flow_log/log_data/l4_flow_log.go +++ b/server/ingester/flow_log/log_data/l4_flow_log.go @@ -41,6 +41,7 @@ const ( ) type L4FlowLog struct { + BelongingBlock []L4FlowLog pool.ReferenceCount _id uint64 `json:"_id" category:"$tag" sub:"flow_info"` // 用来标记全局(多节点)唯一的记录 @@ -1065,15 +1066,31 @@ func (f *L4FlowLog) HitPcapPolicy() bool { return len(f.AclGids) > 0 } -var poolL4FlowLog = pool.NewLockFreePool(func() *L4FlowLog { - l := new(L4FlowLog) - return l -}) - -func AcquireL4FlowLog() *L4FlowLog { - l := poolL4FlowLog.Get() - l.ReferenceCount.Reset() - return l +var poolL4FlowLogBatch = pool.NewLockFreePool(func() []L4FlowLog { + ls := make([]L4FlowLog, 8) + for i := range ls { + ls[i].BelongingBlock = ls + } + return ls +}, + pool.OptionPoolSizePerCPU(32), + pool.OptionInitFullPoolSize(32)) + +func AcquireL4FlowLogBatch(size int) []L4FlowLog { + ls := poolL4FlowLogBatch.Get() + if cap(ls) >= size { + ls = ls[:size] + } else { + poolL4FlowLogBatch.Put(ls) + s := make([]L4FlowLog, size) + ls = s + } + for i := range ls { + l := ls[i] + l.BelongingBlock = ls + l.ReferenceCount.Reset() + } + return ls } func ReleaseL4FlowLog(l *L4FlowLog) { @@ -1084,7 +1101,12 @@ func ReleaseL4FlowLog(l *L4FlowLog) { return } *l = L4FlowLog{} - poolL4FlowLog.Put(l) + for i := range l.BelongingBlock { + if (l.BelongingBlock)[i].GetReferenceCount() > 0 { + return + } + } + poolL4FlowLogBatch.Put(l.BelongingBlock) } var L4FlowCounter uint32 @@ -1095,20 +1117,23 @@ func genID(time uint32, counter *uint32, analyzerID uint32) uint64 { return uint64(time)<<32 | uint64(analyzerID&0x3ff)<<22 | (uint64(count) & 0x3fffff) } -func TaggedFlowToL4FlowLog(orgId, teamId uint16, f *pb.TaggedFlow, platformData *grpc.PlatformInfoTable) *L4FlowLog { - isIPV6 := f.Flow.EthType == uint32(layers.EthernetTypeIPv6) - - s := AcquireL4FlowLog() - s.OrgId, s.TeamID = orgId, teamId - s._id = genID(uint32(f.Flow.EndTime/uint64(time.Second)), &L4FlowCounter, platformData.QueryAnalyzerID()) - s.DataLinkLayer.Fill(f.Flow) - s.NetworkLayer.Fill(f.Flow, isIPV6) - s.TransportLayer.Fill(f.Flow) - s.ApplicationLayer.Fill(f.Flow) - s.Internet.Fill(f.Flow) - s.KnowledgeGraph.FillL4(f.Flow, isIPV6, platformData) - s.FlowInfo.Fill(f.Flow) - s.Metrics.Fill(f.Flow) - - return s +func TaggedFlowsToL4FlowLogs(orgId, teamId uint16, fs []pb.TaggedFlow, platformData *grpc.PlatformInfoTable) []L4FlowLog { + ls := AcquireL4FlowLogBatch(len(fs)) + for i := range fs { + f := &fs[i] + s := &ls[i] + isIPV6 := f.Flow.EthType == uint32(layers.EthernetTypeIPv6) + s.OrgId, s.TeamID = orgId, teamId + s._id = genID(uint32(f.Flow.EndTime/uint64(time.Second)), &L4FlowCounter, platformData.QueryAnalyzerID()) + s.DataLinkLayer.Fill(f.Flow) + s.NetworkLayer.Fill(f.Flow, isIPV6) + s.TransportLayer.Fill(f.Flow) + s.ApplicationLayer.Fill(f.Flow) + s.Internet.Fill(f.Flow) + s.KnowledgeGraph.FillL4(f.Flow, isIPV6, platformData) + s.FlowInfo.Fill(f.Flow) + s.Metrics.Fill(f.Flow) + } + + return ls } diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index 37891c241991..a33fd12c1e1a 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -192,6 +192,7 @@ func (f *L7Base) WriteBlock(block *ckdb.Block) { } type L7FlowLog struct { + BelongingBlock []L7FlowLog pool.ReferenceCount _id uint64 `json:"_id" category:"$tag" sub:"flow_info"` @@ -638,6 +639,51 @@ func (k *KnowledgeGraph) FillL7(l *pb.AppProtoLogsBaseInfo, platformData *grpc.P ) } +var poolL7FlowLogBatch = pool.NewLockFreePool(func() []L7FlowLog { + ls := make([]L7FlowLog, 8) + for i := range ls { + ls[i].BelongingBlock = ls + } + return ls +}, + pool.OptionPoolSizePerCPU(32), + pool.OptionInitFullPoolSize(32)) + +func AcquireL7FlowLogBatch(size int) []L7FlowLog { + ls := poolL7FlowLogBatch.Get() + if cap(ls) >= size { + ls = ls[:size] + } else { + poolL7FlowLogBatch.Put(ls) + s := make([]L7FlowLog, size) + ls = s + } + for i := range ls { + l := ls[i] + l.BelongingBlock = ls + l.ReferenceCount.Reset() + } + return ls +} + +func ReleaseL7FlowLog(l *L7FlowLog) { + if l == nil { + return + } + if l.SubReferenceCount() { + return + } + *l = L7FlowLog{} + + for i := len(l.BelongingBlock) - 1; i >= 0; i-- { + if l.BelongingBlock[i].GetReferenceCount() > 0 { + return + } + } + poolL7FlowLogBatch.Put(l.BelongingBlock) +} + +/* var poolL7FlowLog = pool.NewLockFreePool(func() *L7FlowLog { return new(L7FlowLog) }) @@ -658,15 +704,19 @@ func ReleaseL7FlowLog(l *L7FlowLog) { *l = L7FlowLog{} poolL7FlowLog.Put(l) } +*/ var L7FlowLogCounter uint32 -func ProtoLogToL7FlowLog(orgId, teamId uint16, l *pb.AppProtoLogsData, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog { - h := AcquireL7FlowLog() - h.OrgId, h.TeamID = orgId, teamId - h._id = genID(uint32(l.Base.EndTime/uint64(time.Second)), &L7FlowLogCounter, platformData.QueryAnalyzerID()) - h.Fill(l, platformData, cfg) - return h +func ProtoLogsToL7FlowLogs(orgId, teamId uint16, ls []pb.AppProtoLogsData, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []L7FlowLog { + hs := AcquireL7FlowLogBatch(len(ls)) + for i := range ls { + h := &hs[i] + h.OrgId, h.TeamID = orgId, teamId + h._id = genID(uint32(ls[i].Base.EndTime/uint64(time.Second)), &L7FlowLogCounter, platformData.QueryAnalyzerID()) + h.Fill(&ls[i], platformData, cfg) + } + return hs } var extraFieldNamesNeedWriteFlowTag = [3]string{"app_service", "endpoint", "app_instance"} diff --git a/server/ingester/flow_log/log_data/otel_import.go b/server/ingester/flow_log/log_data/otel_import.go index 53ea1740f283..1408581fea21 100644 --- a/server/ingester/flow_log/log_data/otel_import.go +++ b/server/ingester/flow_log/log_data/otel_import.go @@ -45,20 +45,24 @@ func OTelTracesDataToL7FlowLogs(vtapID, orgId, teamId uint16, l *v1.TracesData, resAttributes = resource.Attributes } for _, scopeSpan := range resourceSpan.GetScopeSpans() { - for _, span := range scopeSpan.GetSpans() { - ret = append(ret, spanToL7FlowLog(vtapID, orgId, teamId, span, resAttributes, platformData, cfg)) + logs := spansToL7FlowLog(vtapID, orgId, teamId, scopeSpan.GetSpans(), resAttributes, platformData, cfg) + for i := range logs { + ret = append(ret, &logs[i]) } } } return ret } -func spanToL7FlowLog(vtapID, orgId, teamId uint16, span *v1.Span, resAttributes []*v11.KeyValue, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog { - h := AcquireL7FlowLog() - h._id = genID(uint32(span.EndTimeUnixNano/uint64(time.Second)), &L7FlowLogCounter, platformData.QueryAnalyzerID()) - h.VtapID, h.OrgId, h.TeamID = vtapID, orgId, teamId - h.FillOTel(span, resAttributes, platformData, cfg) - return h +func spansToL7FlowLog(vtapID, orgId, teamId uint16, spans []*v1.Span, resAttributes []*v11.KeyValue, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []L7FlowLog { + hs := AcquireL7FlowLogBatch(len(spans)) + for i := range hs { + h := &hs[i] + h._id = genID(uint32(spans[i].EndTimeUnixNano/uint64(time.Second)), &L7FlowLogCounter, platformData.QueryAnalyzerID()) + h.VtapID, h.OrgId, h.TeamID = vtapID, orgId, teamId + h.FillOTel(spans[i], resAttributes, platformData, cfg) + } + return hs } func spanKindToTapSide(spanKind v1.Span_SpanKind) flow_metrics.TAPSideEnum {