perf: improve get pool flow_log item batch
lzf575 committed Nov 4, 2024
1 parent 48db418 commit 6dbe996
Showing 4 changed files with 197 additions and 92 deletions.
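In short, this commit batches the flow-log decode path in the two files rendered below: decoder.go now collects decoded pb.TaggedFlow / pb.AppProtoLogsData messages into reusable slices (taggedFlowBuf, protoLogBuf) instead of converting and sending them one at a time, and l4_flow_log.go replaces the per-item object pool with a block pool (AcquireL4FlowLogBatch) whose backing array is recycled only after every element's reference count drops to zero. A condensed, illustrative sketch of the new L4 path follows; validation, counters, pcap-policy handling and the periodic max-batch logging are omitted, so it is not a verbatim excerpt of the diff:

	// Condensed sketch (not verbatim): decode a whole queue message into the
	// reusable buffer, convert it in one call, then throttle each item.
	func (d *Decoder) handleTaggedFlow(decoder *codec.SimpleDecoder) {
		d.taggedFlowBuf = d.taggedFlowBuf[:0]
		for !decoder.IsEnd() {
			pbTaggedFlow := pb.TaggedFlow{}
			decoder.ReadPB(&pbTaggedFlow) // error/validity checks omitted here
			d.taggedFlowBuf = append(d.taggedFlowBuf, pbTaggedFlow)
		}
		// One call allocates all L4FlowLogs from a single pooled block.
		ls := log_data.TaggedFlowsToL4FlowLogs(d.orgId, d.teamId, d.taggedFlowBuf, d.platformData)
		for i := range ls {
			l := &ls[i]
			l.AddReferenceCount()
			if !d.throttler.SendWithThrottling(l) {
				d.counter.DropCount++
			}
			l.Release() // the shared block returns to the pool after the last Release
		}
	}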
130 changes: 78 additions & 52 deletions server/ingester/flow_log/decoder/decoder.go
@@ -78,22 +78,25 @@ type Counter struct {
}

type Decoder struct {
index int
msgType datatype.MessageType
dataSourceID uint32
platformData *grpc.PlatformInfoTable
inQueue queue.QueueReader
throttler *throttler.ThrottlingQueue
flowTagWriter *flow_tag.FlowTagWriter
appServiceTagWriter *flow_tag.AppServiceTagWriter
spanWriter *dbwriter.SpanWriter
spanBuf []interface{}
exporters *exporters.Exporters
cfg *config.Config
debugEnabled bool
index int
msgType datatype.MessageType
dataSourceID uint32
platformData *grpc.PlatformInfoTable
inQueue queue.QueueReader
throttler *throttler.ThrottlingQueue
flowTagWriter *flow_tag.FlowTagWriter
appServiceTagWriter *flow_tag.AppServiceTagWriter
spanWriter *dbwriter.SpanWriter
spanBuf []interface{}
exporters *exporters.Exporters
cfg *config.Config
debugEnabled bool
maxL4Count, maxL7Count int

agentId, orgId, teamId uint16

protoLogBuf []pb.AppProtoLogsData
taggedFlowBuf []pb.TaggedFlow
fieldsBuf []interface{}
fieldValuesBuf []interface{}
counter *Counter
Expand Down Expand Up @@ -126,6 +129,8 @@ func NewDecoder(
exporters: exporters,
cfg: cfg,
debugEnabled: log.IsEnabledFor(logging.DEBUG),
protoLogBuf: make([]pb.AppProtoLogsData, 0, 16),
taggedFlowBuf: make([]pb.TaggedFlow, 0, 16),
fieldsBuf: make([]interface{}, 0, 64),
fieldValuesBuf: make([]interface{}, 0, 64),
counter: &Counter{},
@@ -152,7 +157,6 @@ func (d *Decoder) Run() {
"msg_type": d.msgType.String()})
buffer := make([]interface{}, BUFFER_SIZE)
decoder := &codec.SimpleDecoder{}
pbTaggedFlow := pb.NewTaggedFlow()
pbTracesData := &v1.TracesData{}
pbSkywalkingData := &pb.SkyWalkingExtra{}
for {
@@ -176,7 +180,7 @@
case datatype.MESSAGE_TYPE_PROTOCOLLOG:
d.handleProtoLog(decoder)
case datatype.MESSAGE_TYPE_TAGGEDFLOW:
d.handleTaggedFlow(decoder, pbTaggedFlow)
d.handleTaggedFlow(decoder)
case datatype.MESSAGE_TYPE_OPENTELEMETRY:
d.handleOpenTelemetry(decoder, pbTracesData, false)
case datatype.MESSAGE_TYPE_OPENTELEMETRY_COMPRESSED:
@@ -195,10 +199,11 @@ }
}
}

func (d *Decoder) handleTaggedFlow(decoder *codec.SimpleDecoder, pbTaggedFlow *pb.TaggedFlow) {
func (d *Decoder) handleTaggedFlow(decoder *codec.SimpleDecoder) {
d.taggedFlowBuf = d.taggedFlowBuf[:0]
for !decoder.IsEnd() {
pbTaggedFlow.ResetAll()
decoder.ReadPB(pbTaggedFlow)
pbTaggedFlow := pb.TaggedFlow{}
decoder.ReadPB(&pbTaggedFlow)
if decoder.Failed() {
d.counter.ErrorCount++
log.Errorf("flow decode failed, offset=%d len=%d", decoder.Offset(), len(decoder.Bytes()))
@@ -209,23 +214,38 @@ func (d *Decoder) handleTaggedFlow(decoder *codec.SimpleDecoder, pbTaggedFlow *p
log.Warningf("invalid flow %s", pbTaggedFlow.Flow)
continue
}
d.sendFlow(pbTaggedFlow)
d.taggedFlowBuf = append(d.taggedFlowBuf, pbTaggedFlow)
}
if d.maxL4Count < len(d.taggedFlowBuf) {
d.maxL4Count = len(d.taggedFlowBuf)
}
if d.counter.RawCount == 1 && time.Now().Unix()%60 <= 10 {
log.Infof("max flow log l4 count=%d", d.maxL4Count)
d.maxL4Count = 0
}
d.sendFlow()
}

func (d *Decoder) handleProtoLog(decoder *codec.SimpleDecoder) {
d.protoLogBuf = d.protoLogBuf[:0]
for !decoder.IsEnd() {
protoLog := pb.AcquirePbAppProtoLogsData()

decoder.ReadPB(protoLog)
protoLog := pb.AppProtoLogsData{}
decoder.ReadPB(&protoLog)
if decoder.Failed() || !protoLog.IsValid() {
d.counter.ErrorCount++
pb.ReleasePbAppProtoLogsData(protoLog)
log.Errorf("proto log decode failed, offset=%d len=%d", decoder.Offset(), len(decoder.Bytes()))
return
}
d.sendProto(protoLog)
d.protoLogBuf = append(d.protoLogBuf, protoLog)
}
if d.maxL7Count < len(d.protoLogBuf) {
d.maxL7Count = len(d.protoLogBuf)
}
if d.counter.RawCount == 1 && time.Now().Unix()%60 <= 10 {
log.Infof("max flow log l7 count=%d", d.maxL7Count)
d.maxL7Count = 0
}
d.sendProto()
}

func decompressOpenTelemetry(compressed []byte) ([]byte, error) {
@@ -349,24 +369,27 @@ func (d *Decoder) handleL4Packet(decoder *codec.SimpleDecoder) {
}
}

func (d *Decoder) sendFlow(flow *pb.TaggedFlow) {
func (d *Decoder) sendFlow() {
if d.debugEnabled {
log.Debugf("decoder %d recv flow: %s", d.index, flow)
log.Debugf("decoder %d recv flow: %s", d.index, d.taggedFlowBuf[0])
}
d.counter.Count++
l := log_data.TaggedFlowToL4FlowLog(d.orgId, d.teamId, flow, d.platformData)
ls := log_data.TaggedFlowsToL4FlowLogs(d.orgId, d.teamId, d.taggedFlowBuf, d.platformData)

if l.HitPcapPolicy() {
d.export(l)
d.throttler.SendWithoutThrottling(l)
} else {
l.AddReferenceCount()
if !d.throttler.SendWithThrottling(l) {
d.counter.DropCount++
} else {
for i := range ls {
l := &ls[i]
if l.HitPcapPolicy() {
d.export(l)
d.throttler.SendWithoutThrottling(l)
} else {
l.AddReferenceCount()
if !d.throttler.SendWithThrottling(l) {
d.counter.DropCount++
} else {
d.export(l)
}
l.Release()
}
l.Release()
}
}

Expand Down Expand Up @@ -411,27 +434,30 @@ func (d *Decoder) appServiceTagWrite(l *log_data.L7FlowLog) {
d.appServiceTagWriter.Write(l.Time, flowlogcommon.L7_FLOW_ID.String(), l.AppService, l.AppInstance, l.OrgId, l.TeamID)
}

func (d *Decoder) sendProto(proto *pb.AppProtoLogsData) {
func (d *Decoder) sendProto() {
if d.debugEnabled {
log.Debugf("decoder %d recv proto: %s", d.index, proto)
log.Debugf("decoder %d recv proto: %s", d.index, d.protoLogBuf[0])
}

l := log_data.ProtoLogToL7FlowLog(d.orgId, d.teamId, proto, d.platformData, d.cfg)
l.AddReferenceCount()
sent := d.throttler.SendWithThrottling(l)
if sent {
if d.flowTagWriter != nil {
d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0]
l.GenerateNewFlowTags(d.flowTagWriter.Cache)
d.flowTagWriter.WriteFieldsAndFieldValuesInCache()
ls := log_data.ProtoLogsToL7FlowLogs(d.orgId, d.teamId, d.protoLogBuf, d.platformData, d.cfg)
for i := range ls {
l := &ls[i]
l.AddReferenceCount()
sent := d.throttler.SendWithThrottling(l)
if sent {
if d.flowTagWriter != nil {
d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0]
l.GenerateNewFlowTags(d.flowTagWriter.Cache)
d.flowTagWriter.WriteFieldsAndFieldValuesInCache()
}
d.appServiceTagWrite(l)
d.export(l)
d.spanWrite(l)
}
d.appServiceTagWrite(l)
d.export(l)
d.spanWrite(l)
d.updateCounter(datatype.L7Protocol(l.L7Protocol), !sent)
l.Release()
}
d.updateCounter(datatype.L7Protocol(proto.Base.Head.Proto), !sent)
l.Release()
proto.Release()
d.protoLogBuf = d.protoLogBuf[:0]

}

77 changes: 51 additions & 26 deletions server/ingester/flow_log/log_data/l4_flow_log.go
@@ -41,6 +41,7 @@ const (
)

type L4FlowLog struct {
BelongingBlock []L4FlowLog
pool.ReferenceCount
_id uint64 `json:"_id" category:"$tag" sub:"flow_info"` // 用来标记全局(多节点)唯一的记录

@@ -1065,15 +1066,31 @@ func (f *L4FlowLog) HitPcapPolicy() bool {
return len(f.AclGids) > 0
}

var poolL4FlowLog = pool.NewLockFreePool(func() *L4FlowLog {
l := new(L4FlowLog)
return l
})

func AcquireL4FlowLog() *L4FlowLog {
l := poolL4FlowLog.Get()
l.ReferenceCount.Reset()
return l
var poolL4FlowLogBatch = pool.NewLockFreePool(func() []L4FlowLog {
ls := make([]L4FlowLog, 8)
for i := range ls {
ls[i].BelongingBlock = ls
}
return ls
},
pool.OptionPoolSizePerCPU(32),
pool.OptionInitFullPoolSize(32))

func AcquireL4FlowLogBatch(size int) []L4FlowLog {
ls := poolL4FlowLogBatch.Get()
if cap(ls) >= size {
ls = ls[:size]
} else {
poolL4FlowLogBatch.Put(ls)
s := make([]L4FlowLog, size)
ls = s
}
for i := range ls {
l := &ls[i]
l.BelongingBlock = ls
l.ReferenceCount.Reset()
}
return ls
}

func ReleaseL4FlowLog(l *L4FlowLog) {
Expand All @@ -1084,7 +1101,12 @@ func ReleaseL4FlowLog(l *L4FlowLog) {
return
}
*l = L4FlowLog{BelongingBlock: l.BelongingBlock}
poolL4FlowLog.Put(l)
for i := range l.BelongingBlock {
if (l.BelongingBlock)[i].GetReferenceCount() > 0 {
return
}
}
poolL4FlowLogBatch.Put(l.BelongingBlock)
}

var L4FlowCounter uint32
@@ -1095,20 +1117,23 @@ func genID(time uint32, counter *uint32, analyzerID uint32) uint64 {
return uint64(time)<<32 | uint64(analyzerID&0x3ff)<<22 | (uint64(count) & 0x3fffff)
}

func TaggedFlowToL4FlowLog(orgId, teamId uint16, f *pb.TaggedFlow, platformData *grpc.PlatformInfoTable) *L4FlowLog {
isIPV6 := f.Flow.EthType == uint32(layers.EthernetTypeIPv6)

s := AcquireL4FlowLog()
s.OrgId, s.TeamID = orgId, teamId
s._id = genID(uint32(f.Flow.EndTime/uint64(time.Second)), &L4FlowCounter, platformData.QueryAnalyzerID())
s.DataLinkLayer.Fill(f.Flow)
s.NetworkLayer.Fill(f.Flow, isIPV6)
s.TransportLayer.Fill(f.Flow)
s.ApplicationLayer.Fill(f.Flow)
s.Internet.Fill(f.Flow)
s.KnowledgeGraph.FillL4(f.Flow, isIPV6, platformData)
s.FlowInfo.Fill(f.Flow)
s.Metrics.Fill(f.Flow)

return s
func TaggedFlowsToL4FlowLogs(orgId, teamId uint16, fs []pb.TaggedFlow, platformData *grpc.PlatformInfoTable) []L4FlowLog {
ls := AcquireL4FlowLogBatch(len(fs))
for i := range fs {
f := &fs[i]
s := &ls[i]
isIPV6 := f.Flow.EthType == uint32(layers.EthernetTypeIPv6)
s.OrgId, s.TeamID = orgId, teamId
s._id = genID(uint32(f.Flow.EndTime/uint64(time.Second)), &L4FlowCounter, platformData.QueryAnalyzerID())
s.DataLinkLayer.Fill(f.Flow)
s.NetworkLayer.Fill(f.Flow, isIPV6)
s.TransportLayer.Fill(f.Flow)
s.ApplicationLayer.Fill(f.Flow)
s.Internet.Fill(f.Flow)
s.KnowledgeGraph.FillL4(f.Flow, isIPV6, platformData)
s.FlowInfo.Fill(f.Flow)
s.Metrics.Fill(f.Flow)
}

return ls
}
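The block-pool lifetime contract above can be summarized with a small, self-contained sketch: every item records the block it was allocated from (BelongingBlock in the diff), and the block is returned to the pool only once the last item in it has been released. The names below and the use of sync.Pool in place of the repository's pool.NewLockFreePool are illustrative assumptions, not code from the repo:

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	// Item stands in for L4FlowLog: it remembers its backing block and
	// carries its own reference count.
	type Item struct {
		block []Item
		refs  int32
		Value int
	}

	// sync.Pool stands in for the repository's lock-free pool of blocks.
	var blockPool = sync.Pool{New: func() any { return make([]Item, 8) }}

	// AcquireBatch returns `size` Items backed by one pooled block, falling
	// back to a fresh allocation when the pooled block is too small.
	func AcquireBatch(size int) []Item {
		block := blockPool.Get().([]Item)
		if cap(block) < size {
			blockPool.Put(block)
			block = make([]Item, size)
		}
		block = block[:size]
		for i := range block {
			block[i].block = block
			atomic.StoreInt32(&block[i].refs, 1)
		}
		return block
	}

	// Release drops one reference; the whole block goes back to the pool
	// only after every element in it has been released.
	func Release(it *Item) {
		if atomic.AddInt32(&it.refs, -1) > 0 {
			return
		}
		for i := range it.block {
			if atomic.LoadInt32(&it.block[i].refs) > 0 {
				return
			}
		}
		blockPool.Put(it.block[:cap(it.block)])
	}

	func main() {
		batch := AcquireBatch(3)
		for i := range batch {
			batch[i].Value = i * 10
		}
		for i := range batch {
			Release(&batch[i]) // the block is recycled after the last of the three
		}
		fmt.Println("released", len(batch), "items")
	}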
