From 56835459be3bdadf23da9cd9c0e2be1ead720fc9 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Thu, 24 Oct 2024 14:59:17 +0800 Subject: [PATCH] perf: use template instead of Interface{} in LockFreePool --- .../trisolaris/vtap/process_info.go | 4 ++-- server/ingester/app_log/dbwriter/log.go | 4 ++-- .../event/dbwriter/alert_event_writer.go | 4 ++-- server/ingester/event/dbwriter/event.go | 4 ++-- .../exporters/prometheus_exporter/exporter.go | 4 ++-- .../ext_metrics/dbwriter/ext_metrics.go | 4 ++-- .../ingester/flow_log/log_data/l4_flow_log.go | 4 ++-- .../ingester/flow_log/log_data/l4_packet.go | 4 ++-- .../ingester/flow_log/log_data/l7_flow_log.go | 4 ++-- server/ingester/flow_tag/app_service_tag.go | 4 ++-- server/ingester/flow_tag/flow_tag.go | 4 ++-- server/ingester/pcap/dbwriter/pcap.go | 4 ++-- server/ingester/profile/dbwriter/profile.go | 4 ++-- .../prometheus/dbwriter/prometheus_sample.go | 8 +++---- .../prometheus/decoder/slow_decoder.go | 4 ++-- server/libs/app/document.go | 12 +++++----- server/libs/codec/simple_codec.go | 4 ++-- server/libs/datastructure/linked_list.go | 4 ++-- server/libs/datatype/endpoint.go | 8 +++---- server/libs/datatype/flow.go | 4 ++-- server/libs/datatype/meta_packet.go | 4 ++-- server/libs/datatype/mq.go | 8 +++---- server/libs/datatype/pb/flow_log.go | 4 ++-- server/libs/datatype/protocol_logs.go | 12 +++++----- server/libs/datatype/rpc.go | 4 ++-- server/libs/datatype/sql.go | 8 +++---- server/libs/datatype/tagged_flow.go | 4 ++-- server/libs/eventapi/resource_event.go | 4 ++-- server/libs/flow-metrics/custom_tag_meter.go | 4 ++-- .../libs/flow-metrics/pooled_meters.go.tmpl | 4 ++-- server/libs/flow-metrics/tag.go | 8 +++---- server/libs/flow-metrics/tag_mini.go | 8 +++---- server/libs/hmap/timedtagmap/timed_tag_map.go | 4 ++-- server/libs/pool/pool.go | 22 +++++++++---------- server/libs/pool/pool_test.go | 14 ++++++------ server/libs/receiver/receiver.go | 10 ++++----- .../segmenttree/immutable_segment_tree.go | 4 ++-- server/libs/stats/message.go | 4 ++-- server/libs/tracetree/spantrace.go | 4 ++-- server/libs/tracetree/tracetree.go | 4 ++-- server/libs/utils/byte_buffer.go | 4 ++-- 41 files changed, 119 insertions(+), 119 deletions(-) diff --git a/server/controller/trisolaris/vtap/process_info.go b/server/controller/trisolaris/vtap/process_info.go index e44a6fcbb33..d3e8f7d357a 100644 --- a/server/controller/trisolaris/vtap/process_info.go +++ b/server/controller/trisolaris/vtap/process_info.go @@ -56,12 +56,12 @@ type PidPair struct { pid1 uint32 } -var pidPairPool = pool.NewLockFreePool(func() interface{} { +var pidPairPool = pool.NewLockFreePool(func() *PidPair { return &PidPair{} }) func newPidPair() *PidPair { - return pidPairPool.Get().(*PidPair) + return pidPairPool.Get() } func releasePidPair(pidPair *PidPair) { diff --git a/server/ingester/app_log/dbwriter/log.go b/server/ingester/app_log/dbwriter/log.go index 08709cb42db..13124f54588 100644 --- a/server/ingester/app_log/dbwriter/log.go +++ b/server/ingester/app_log/dbwriter/log.go @@ -327,7 +327,7 @@ func (l *ApplicationLogStore) GenerateNewFlowTags(cache *flow_tag.FlowTagCache) } } -var logPool = pool.NewLockFreePool(func() interface{} { +var logPool = pool.NewLockFreePool(func() *ApplicationLogStore { return &ApplicationLogStore{ IsIPv4: true, AttributeNames: []string{}, @@ -338,7 +338,7 @@ var logPool = pool.NewLockFreePool(func() interface{} { }) func AcquireApplicationLogStore() *ApplicationLogStore { - e := logPool.Get().(*ApplicationLogStore) + e := logPool.Get() e.Reset() return e } diff --git a/server/ingester/event/dbwriter/alert_event_writer.go b/server/ingester/event/dbwriter/alert_event_writer.go index c2f3a8c830c..9135829b41c 100644 --- a/server/ingester/event/dbwriter/alert_event_writer.go +++ b/server/ingester/event/dbwriter/alert_event_writer.go @@ -29,12 +29,12 @@ import ( "github.com/deepflowio/deepflow/server/libs/pool" ) -var alertEventPool = pool.NewLockFreePool(func() interface{} { +var alertEventPool = pool.NewLockFreePool(func() *AlertEventStore { return &AlertEventStore{} }) func AcquireAlertEventStore() *AlertEventStore { - return alertEventPool.Get().(*AlertEventStore) + return alertEventPool.Get() } func ReleaseAlertEventStore(e *AlertEventStore) { diff --git a/server/ingester/event/dbwriter/event.go b/server/ingester/event/dbwriter/event.go index 07839dba324..0700285092a 100644 --- a/server/ingester/event/dbwriter/event.go +++ b/server/ingester/event/dbwriter/event.go @@ -353,7 +353,7 @@ func (e *EventStore) GenerateNewFlowTags(cache *flow_tag.FlowTagCache) { } } -var eventPool = pool.NewLockFreePool(func() interface{} { +var eventPool = pool.NewLockFreePool(func() *EventStore { return &EventStore{ AttributeNames: []string{}, AttributeValues: []string{}, @@ -361,7 +361,7 @@ var eventPool = pool.NewLockFreePool(func() interface{} { }) func AcquireEventStore() *EventStore { - e := eventPool.Get().(*EventStore) + e := eventPool.Get() e.Reset() return e } diff --git a/server/ingester/exporters/prometheus_exporter/exporter.go b/server/ingester/exporters/prometheus_exporter/exporter.go index 227ebe70bc5..df94cd06097 100644 --- a/server/ingester/exporters/prometheus_exporter/exporter.go +++ b/server/ingester/exporters/prometheus_exporter/exporter.go @@ -245,14 +245,14 @@ func (e *PrometheusExporter) sendRequest(queueID int, batchs []prompb.TimeSeries return nil } -var prompbTimeSeriesPool = pool.NewLockFreePool(func() interface{} { +var prompbTimeSeriesPool = pool.NewLockFreePool(func() *prompb.TimeSeries { return &prompb.TimeSeries{ Samples: make([]prompb.Sample, 1), } }) func AcquirePrompbTimeSeries() *prompb.TimeSeries { - return prompbTimeSeriesPool.Get().(*prompb.TimeSeries) + return prompbTimeSeriesPool.Get() } func ReleasePrompbTimeSeries(t *prompb.TimeSeries) { diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics.go b/server/ingester/ext_metrics/dbwriter/ext_metrics.go index f1c2f9a65b7..9db8b032027 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics.go @@ -244,12 +244,12 @@ func (m *ExtMetrics) GenerateNewFlowTags(cache *flow_tag.FlowTagCache) { } } -var extMetricsPool = pool.NewLockFreePool(func() interface{} { +var extMetricsPool = pool.NewLockFreePool(func() *ExtMetrics { return &ExtMetrics{} }) func AcquireExtMetrics() *ExtMetrics { - return extMetricsPool.Get().(*ExtMetrics) + return extMetricsPool.Get() } var emptyUniversalTag = flow_metrics.UniversalTag{} diff --git a/server/ingester/flow_log/log_data/l4_flow_log.go b/server/ingester/flow_log/log_data/l4_flow_log.go index 40f2cc6337d..b823998c70e 100644 --- a/server/ingester/flow_log/log_data/l4_flow_log.go +++ b/server/ingester/flow_log/log_data/l4_flow_log.go @@ -1065,13 +1065,13 @@ func (f *L4FlowLog) HitPcapPolicy() bool { return len(f.AclGids) > 0 } -var poolL4FlowLog = pool.NewLockFreePool(func() interface{} { +var poolL4FlowLog = pool.NewLockFreePool(func() *L4FlowLog { l := new(L4FlowLog) return l }) func AcquireL4FlowLog() *L4FlowLog { - l := poolL4FlowLog.Get().(*L4FlowLog) + l := poolL4FlowLog.Get() l.ReferenceCount.Reset() return l } diff --git a/server/ingester/flow_log/log_data/l4_packet.go b/server/ingester/flow_log/log_data/l4_packet.go index ef59673caf6..027b76e7617 100644 --- a/server/ingester/flow_log/log_data/l4_packet.go +++ b/server/ingester/flow_log/log_data/l4_packet.go @@ -77,12 +77,12 @@ func (p *L4Packet) String() string { return fmt.Sprintf("L4Packet: %+v\n", *p) } -var poolL4Packet = pool.NewLockFreePool(func() interface{} { +var poolL4Packet = pool.NewLockFreePool(func() *L4Packet { return new(L4Packet) }) func AcquireL4Packet() *L4Packet { - l := poolL4Packet.Get().(*L4Packet) + l := poolL4Packet.Get() return l } diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index ab0a7d96712..37891c24199 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -638,12 +638,12 @@ func (k *KnowledgeGraph) FillL7(l *pb.AppProtoLogsBaseInfo, platformData *grpc.P ) } -var poolL7FlowLog = pool.NewLockFreePool(func() interface{} { +var poolL7FlowLog = pool.NewLockFreePool(func() *L7FlowLog { return new(L7FlowLog) }) func AcquireL7FlowLog() *L7FlowLog { - l := poolL7FlowLog.Get().(*L7FlowLog) + l := poolL7FlowLog.Get() l.ReferenceCount.Reset() return l } diff --git a/server/ingester/flow_tag/app_service_tag.go b/server/ingester/flow_tag/app_service_tag.go index 73ae40cdf67..cb2e6a61b21 100644 --- a/server/ingester/flow_tag/app_service_tag.go +++ b/server/ingester/flow_tag/app_service_tag.go @@ -84,12 +84,12 @@ func (t *AppServiceTag) Release() { ReleaseAppServiceTag(t) } -var appServiceTagPool = pool.NewLockFreePool(func() interface{} { +var appServiceTagPool = pool.NewLockFreePool(func() *AppServiceTag { return &AppServiceTag{} }) func AcquireAppServiceTag() *AppServiceTag { - f := appServiceTagPool.Get().(*AppServiceTag) + f := appServiceTagPool.Get() return f } diff --git a/server/ingester/flow_tag/flow_tag.go b/server/ingester/flow_tag/flow_tag.go index 5b048ea67ab..cadc46ccaa1 100644 --- a/server/ingester/flow_tag/flow_tag.go +++ b/server/ingester/flow_tag/flow_tag.go @@ -199,12 +199,12 @@ func (t *FlowTag) Release() { ReleaseFlowTag(t) } -var flowTagPool = pool.NewLockFreePool(func() interface{} { +var flowTagPool = pool.NewLockFreePool(func() *FlowTag { return &FlowTag{} }) func AcquireFlowTag(tagType TagType) *FlowTag { - f := flowTagPool.Get().(*FlowTag) + f := flowTagPool.Get() f.ReferenceCount.Reset() f.TagType = tagType return f diff --git a/server/ingester/pcap/dbwriter/pcap.go b/server/ingester/pcap/dbwriter/pcap.go index 5ca967fd509..6f0c4edb309 100644 --- a/server/ingester/pcap/dbwriter/pcap.go +++ b/server/ingester/pcap/dbwriter/pcap.go @@ -85,12 +85,12 @@ func (p *PcapStore) String() string { return fmt.Sprintf("PcapStore: %+v\n", *p) } -var poolPcapStore = pool.NewLockFreePool(func() interface{} { +var poolPcapStore = pool.NewLockFreePool(func() *PcapStore { return new(PcapStore) }) func AcquirePcapStore() *PcapStore { - l := poolPcapStore.Get().(*PcapStore) + l := poolPcapStore.Get() return l } diff --git a/server/ingester/profile/dbwriter/profile.go b/server/ingester/profile/dbwriter/profile.go index c3e05a14a26..50fac8f08ad 100644 --- a/server/ingester/profile/dbwriter/profile.go +++ b/server/ingester/profile/dbwriter/profile.go @@ -260,7 +260,7 @@ func (p *InProcessProfile) OrgID() uint16 { return p.OrgId } -var poolInProcess = pool.NewLockFreePool(func() interface{} { +var poolInProcess = pool.NewLockFreePool(func() *InProcessProfile { return new(InProcessProfile) }) @@ -273,7 +273,7 @@ func (p *InProcessProfile) String() string { } func AcquireInProcess() *InProcessProfile { - l := poolInProcess.Get().(*InProcessProfile) + l := poolInProcess.Get() return l } diff --git a/server/ingester/prometheus/dbwriter/prometheus_sample.go b/server/ingester/prometheus/dbwriter/prometheus_sample.go index 70e2be8d335..558fc40c270 100644 --- a/server/ingester/prometheus/dbwriter/prometheus_sample.go +++ b/server/ingester/prometheus/dbwriter/prometheus_sample.go @@ -284,12 +284,12 @@ func (m *PrometheusSample) GenerateNewFlowTags(cache *flow_tag.FlowTagCache, met m.PrometheusSampleMini.GenerateNewFlowTags(cache, metricName, timeSeries, extraLabels, tsLabelNameIDs, tsLabelValueIDs) } -var prometheusSampleMiniPool = pool.NewLockFreePool(func() interface{} { +var prometheusSampleMiniPool = pool.NewLockFreePool(func() *PrometheusSampleMini { return &PrometheusSampleMini{} }) func AcquirePrometheusSampleMini() *PrometheusSampleMini { - return prometheusSampleMiniPool.Get().(*PrometheusSampleMini) + return prometheusSampleMiniPool.Get() } func ReleasePrometheusSampleMini(p *PrometheusSampleMini) { @@ -297,12 +297,12 @@ func ReleasePrometheusSampleMini(p *PrometheusSampleMini) { prometheusSampleMiniPool.Put(p) } -var prometheusSamplePool = pool.NewLockFreePool(func() interface{} { +var prometheusSamplePool = pool.NewLockFreePool(func() *PrometheusSample { return &PrometheusSample{} }) func AcquirePrometheusSample() *PrometheusSample { - return prometheusSamplePool.Get().(*PrometheusSample) + return prometheusSamplePool.Get() } var emptyUniversalTag = flow_metrics.UniversalTag{} diff --git a/server/ingester/prometheus/decoder/slow_decoder.go b/server/ingester/prometheus/decoder/slow_decoder.go index d11c8273817..ec3b5ac3e8b 100644 --- a/server/ingester/prometheus/decoder/slow_decoder.go +++ b/server/ingester/prometheus/decoder/slow_decoder.go @@ -56,12 +56,12 @@ type SlowItem struct { ts prompb.TimeSeries } -var slowItemPool = pool.NewLockFreePool(func() interface{} { +var slowItemPool = pool.NewLockFreePool(func() *SlowItem { return &SlowItem{} }) func AcquireSlowItem(vtapId, epcId, podClusterId, orgId, teamId uint16, ts *prompb.TimeSeries, extraLabels []prompb.Label) *SlowItem { - s := slowItemPool.Get().(*SlowItem) + s := slowItemPool.Get() s.vtapId = vtapId s.epcId = epcId s.podClusterId = podClusterId diff --git a/server/libs/app/document.go b/server/libs/app/document.go index a7464ed036c..aba6bb14109 100644 --- a/server/libs/app/document.go +++ b/server/libs/app/document.go @@ -112,12 +112,12 @@ func (d *DocumentFlow) String() string { d.Timestamp, d.Flags, d.Tag, d.FlowMeter) } -var poolDocumentFlow = pool.NewLockFreePool(func() interface{} { +var poolDocumentFlow = pool.NewLockFreePool(func() *DocumentFlow { return &DocumentFlow{} }) func AcquireDocumentFlow() *DocumentFlow { - d := poolDocumentFlow.Get().(*DocumentFlow) + d := poolDocumentFlow.Get() d.ReferenceCount.Reset() return d } @@ -157,12 +157,12 @@ func (d *DocumentApp) String() string { d.Timestamp, d.Flags, d.Tag, d.AppMeter) } -var poolDocumentApp = pool.NewLockFreePool(func() interface{} { +var poolDocumentApp = pool.NewLockFreePool(func() *DocumentApp { return &DocumentApp{} }) func AcquireDocumentApp() *DocumentApp { - d := poolDocumentApp.Get().(*DocumentApp) + d := poolDocumentApp.Get() d.ReferenceCount.Reset() return d } @@ -198,12 +198,12 @@ func (d *DocumentUsage) String() string { d.Timestamp, d.Flags, d.Tag, d.UsageMeter) } -var poolDocumentUsage = pool.NewLockFreePool(func() interface{} { +var poolDocumentUsage = pool.NewLockFreePool(func() *DocumentUsage { return &DocumentUsage{} }) func AcquireDocumentUsage() *DocumentUsage { - d := poolDocumentUsage.Get().(*DocumentUsage) + d := poolDocumentUsage.Get() d.ReferenceCount.Reset() return d } diff --git a/server/libs/codec/simple_codec.go b/server/libs/codec/simple_codec.go index c51549383c3..df6ec40a07b 100644 --- a/server/libs/codec/simple_codec.go +++ b/server/libs/codec/simple_codec.go @@ -191,12 +191,12 @@ func (e *SimpleEncoder) String() string { } // pool of encoder -var simpleEncoderPool = pool.NewLockFreePool(func() interface{} { +var simpleEncoderPool = pool.NewLockFreePool(func() *SimpleEncoder { return new(SimpleEncoder) }) func AcquireSimpleEncoder() *SimpleEncoder { - e := simpleEncoderPool.Get().(*SimpleEncoder) + e := simpleEncoderPool.Get() e.ReferenceCount.Reset() return e } diff --git a/server/libs/datastructure/linked_list.go b/server/libs/datastructure/linked_list.go index 1ec62c53653..95fe564bacd 100644 --- a/server/libs/datastructure/linked_list.go +++ b/server/libs/datastructure/linked_list.go @@ -21,7 +21,7 @@ import ( "github.com/deepflowio/deepflow/server/libs/pool" ) -var elementPool = pool.NewLockFreePool(func() interface{} { +var elementPool = pool.NewLockFreePool(func() *Element { return new(Element) }) @@ -37,7 +37,7 @@ type LinkedList struct { } func element(v interface{}) *Element { - e := elementPool.Get().(*Element) + e := elementPool.Get() e.value = v return e } diff --git a/server/libs/datatype/endpoint.go b/server/libs/datatype/endpoint.go index 9ab79fb4674..ed2fca24f14 100644 --- a/server/libs/datatype/endpoint.go +++ b/server/libs/datatype/endpoint.go @@ -224,12 +224,12 @@ func FormatGroupId(id uint32) uint32 { } } -var endpointInfoPool = pool.NewLockFreePool(func() interface{} { +var endpointInfoPool = pool.NewLockFreePool(func() *EndpointInfo { return new(EndpointInfo) }) func AcquireEndpointInfo() *EndpointInfo { - return endpointInfoPool.Get().(*EndpointInfo) + return endpointInfoPool.Get() } func ReleaseEndpointInfo(i *EndpointInfo) { @@ -243,12 +243,12 @@ func CloneEndpointInfo(i *EndpointInfo) *EndpointInfo { return dup } -var endpointDataPool = pool.NewLockFreePool(func() interface{} { +var endpointDataPool = pool.NewLockFreePool(func() *EndpointData { return new(EndpointData) }) func AcquireEndpointData(infos ...*EndpointInfo) *EndpointData { - d := endpointDataPool.Get().(*EndpointData) + d := endpointDataPool.Get() len := len(infos) if len == 0 { d.SrcInfo = AcquireEndpointInfo() diff --git a/server/libs/datatype/flow.go b/server/libs/datatype/flow.go index 9c507779f38..d92b094ee10 100644 --- a/server/libs/datatype/flow.go +++ b/server/libs/datatype/flow.go @@ -921,12 +921,12 @@ func (f *Flow) String() string { } var ZeroFlowPerfStats FlowPerfStats = FlowPerfStats{} -var flowPerfStatsPool = pool.NewLockFreePool(func() interface{} { +var flowPerfStatsPool = pool.NewLockFreePool(func() *FlowPerfStats { return new(FlowPerfStats) }) func AcquireFlowPerfStats() *FlowPerfStats { - return flowPerfStatsPool.Get().(*FlowPerfStats) + return flowPerfStatsPool.Get() } func ReleaseFlowPerfStats(s *FlowPerfStats) { diff --git a/server/libs/datatype/meta_packet.go b/server/libs/datatype/meta_packet.go index f700112eee7..fb16af4654f 100644 --- a/server/libs/datatype/meta_packet.go +++ b/server/libs/datatype/meta_packet.go @@ -177,12 +177,12 @@ func (b *MetaPacketBlock) String() string { return result } -var metaPacketBlockPool = pool.NewLockFreePool(func() interface{} { +var metaPacketBlockPool = pool.NewLockFreePool(func() *MetaPacketBlock { return new(MetaPacketBlock) }, pool.OptionPoolSizePerCPU(16), pool.OptionInitFullPoolSize(16)) func AcquireMetaPacketBlock() *MetaPacketBlock { - b := metaPacketBlockPool.Get().(*MetaPacketBlock) + b := metaPacketBlockPool.Get() b.ReferenceCount.Reset() return b } diff --git a/server/libs/datatype/mq.go b/server/libs/datatype/mq.go index 0a20c6c95a8..c097d587c23 100644 --- a/server/libs/datatype/mq.go +++ b/server/libs/datatype/mq.go @@ -24,12 +24,12 @@ import ( "github.com/deepflowio/deepflow/server/libs/pool" ) -var kafkaInfoPool = pool.NewLockFreePool(func() interface{} { +var kafkaInfoPool = pool.NewLockFreePool(func() *KafkaInfo { return new(KafkaInfo) }) func AcquireKafkaInfo() *KafkaInfo { - return kafkaInfoPool.Get().(*KafkaInfo) + return kafkaInfoPool.Get() } func ReleaseKafkaInfo(d *KafkaInfo) { @@ -80,12 +80,12 @@ func (i *KafkaInfo) Merge(r interface{}) { } } -var mqttInfoPool = pool.NewLockFreePool(func() interface{} { +var mqttInfoPool = pool.NewLockFreePool(func() *MqttInfo { return new(MqttInfo) }) func AcquireMqttInfo() *MqttInfo { - return mqttInfoPool.Get().(*MqttInfo) + return mqttInfoPool.Get() } func ReleaseMqttInfo(d *MqttInfo) { diff --git a/server/libs/datatype/pb/flow_log.go b/server/libs/datatype/pb/flow_log.go index 7136df71806..0679f537867 100644 --- a/server/libs/datatype/pb/flow_log.go +++ b/server/libs/datatype/pb/flow_log.go @@ -20,7 +20,7 @@ import ( "github.com/deepflowio/deepflow/server/libs/pool" ) -var pbAppProtoLogsDataPool = pool.NewLockFreePool(func() interface{} { +var pbAppProtoLogsDataPool = pool.NewLockFreePool(func() *AppProtoLogsData { return &AppProtoLogsData{ Base: &AppProtoLogsBaseInfo{ Head: &AppProtoHead{}, @@ -29,7 +29,7 @@ var pbAppProtoLogsDataPool = pool.NewLockFreePool(func() interface{} { }) func AcquirePbAppProtoLogsData() *AppProtoLogsData { - d := pbAppProtoLogsDataPool.Get().(*AppProtoLogsData) + d := pbAppProtoLogsDataPool.Get() return d } diff --git a/server/libs/datatype/protocol_logs.go b/server/libs/datatype/protocol_logs.go index f86e15295d1..0fd413edab1 100644 --- a/server/libs/datatype/protocol_logs.go +++ b/server/libs/datatype/protocol_logs.go @@ -175,12 +175,12 @@ type AppProtoLogsData struct { pool.ReferenceCount } -var httpInfoPool = pool.NewLockFreePool(func() interface{} { +var httpInfoPool = pool.NewLockFreePool(func() *HTTPInfo { return new(HTTPInfo) }) func AcquireHTTPInfo() *HTTPInfo { - return httpInfoPool.Get().(*HTTPInfo) + return httpInfoPool.Get() } func ReleaseHTTPInfo(h *HTTPInfo) { @@ -188,12 +188,12 @@ func ReleaseHTTPInfo(h *HTTPInfo) { httpInfoPool.Put(h) } -var dnsInfoPool = pool.NewLockFreePool(func() interface{} { +var dnsInfoPool = pool.NewLockFreePool(func() *DNSInfo { return new(DNSInfo) }) func AcquireDNSInfo() *DNSInfo { - return dnsInfoPool.Get().(*DNSInfo) + return dnsInfoPool.Get() } func ReleaseDNSInfo(d *DNSInfo) { @@ -201,13 +201,13 @@ func ReleaseDNSInfo(d *DNSInfo) { dnsInfoPool.Put(d) } -var appProtoLogsDataPool = pool.NewLockFreePool(func() interface{} { +var appProtoLogsDataPool = pool.NewLockFreePool(func() *AppProtoLogsData { return new(AppProtoLogsData) }) var zeroAppProtoLogsData = AppProtoLogsData{} func AcquireAppProtoLogsData() *AppProtoLogsData { - d := appProtoLogsDataPool.Get().(*AppProtoLogsData) + d := appProtoLogsDataPool.Get() d.Reset() return d } diff --git a/server/libs/datatype/rpc.go b/server/libs/datatype/rpc.go index 38d7c13425d..b5f7d08dadb 100644 --- a/server/libs/datatype/rpc.go +++ b/server/libs/datatype/rpc.go @@ -23,12 +23,12 @@ import ( "github.com/deepflowio/deepflow/server/libs/pool" ) -var dubboInfoPool = pool.NewLockFreePool(func() interface{} { +var dubboInfoPool = pool.NewLockFreePool(func() *DubboInfo { return new(DubboInfo) }) func AcquireDubboInfo() *DubboInfo { - return dubboInfoPool.Get().(*DubboInfo) + return dubboInfoPool.Get() } func ReleaseDubboInfo(d *DubboInfo) { diff --git a/server/libs/datatype/sql.go b/server/libs/datatype/sql.go index 14c842c25bd..fe5a8d66875 100644 --- a/server/libs/datatype/sql.go +++ b/server/libs/datatype/sql.go @@ -25,12 +25,12 @@ import ( "github.com/deepflowio/deepflow/server/libs/pool" ) -var mysqlInfoPool = pool.NewLockFreePool(func() interface{} { +var mysqlInfoPool = pool.NewLockFreePool(func() *MysqlInfo { return new(MysqlInfo) }) func AcquireMYSQLInfo() *MysqlInfo { - return mysqlInfoPool.Get().(*MysqlInfo) + return mysqlInfoPool.Get() } func ReleaseMYSQLInfo(d *MysqlInfo) { @@ -138,12 +138,12 @@ func (i *MysqlInfo) Merge(r interface{}) { } } -var redisInfoPool = pool.NewLockFreePool(func() interface{} { +var redisInfoPool = pool.NewLockFreePool(func() *RedisInfo { return new(RedisInfo) }) func AcquireREDISInfo() *RedisInfo { - return redisInfoPool.Get().(*RedisInfo) + return redisInfoPool.Get() } func ReleaseREDISInfo(d *RedisInfo) { diff --git a/server/libs/datatype/tagged_flow.go b/server/libs/datatype/tagged_flow.go index b9adb2fd078..cfdf5966f12 100644 --- a/server/libs/datatype/tagged_flow.go +++ b/server/libs/datatype/tagged_flow.go @@ -77,12 +77,12 @@ func (f *TaggedFlow) Reverse() { f.Tag.Reverse() } -var taggedFlowPool = pool.NewLockFreePool(func() interface{} { +var taggedFlowPool = pool.NewLockFreePool(func() *TaggedFlow { return new(TaggedFlow) }) func AcquireTaggedFlow() *TaggedFlow { - f := taggedFlowPool.Get().(*TaggedFlow) + f := taggedFlowPool.Get() f.ReferenceCount.Reset() return f } diff --git a/server/libs/eventapi/resource_event.go b/server/libs/eventapi/resource_event.go index 2a5bdadb110..35a4144b2f0 100644 --- a/server/libs/eventapi/resource_event.go +++ b/server/libs/eventapi/resource_event.go @@ -176,12 +176,12 @@ func (r *ResourceEvent) Release() { ReleaseResourceEvent(r) } -var poolResourceEvent = pool.NewLockFreePool(func() interface{} { +var poolResourceEvent = pool.NewLockFreePool(func() *ResourceEvent { return new(ResourceEvent) }) func AcquireResourceEvent() *ResourceEvent { - return poolResourceEvent.Get().(*ResourceEvent) + return poolResourceEvent.Get() } func ReleaseResourceEvent(event *ResourceEvent) { diff --git a/server/libs/flow-metrics/custom_tag_meter.go b/server/libs/flow-metrics/custom_tag_meter.go index 8d819bd1727..3e46a241c43 100644 --- a/server/libs/flow-metrics/custom_tag_meter.go +++ b/server/libs/flow-metrics/custom_tag_meter.go @@ -134,12 +134,12 @@ type CustomTag struct { pool.ReferenceCount } -var customTagPool = pool.NewLockFreePool(func() interface{} { +var customTagPool = pool.NewLockFreePool(func() *CustomTag { return &CustomTag{} }) func AcquireCustomTag() *CustomTag { - t := customTagPool.Get().(*CustomTag) + t := customTagPool.Get() t.Reset() return t } diff --git a/server/libs/flow-metrics/pooled_meters.go.tmpl b/server/libs/flow-metrics/pooled_meters.go.tmpl index 73e7db7693d..64ce2c0806b 100644 --- a/server/libs/flow-metrics/pooled_meters.go.tmpl +++ b/server/libs/flow-metrics/pooled_meters.go.tmpl @@ -8,12 +8,12 @@ import ( {{ $pool_name := print "pool" . }} - var {{$pool_name}} = pool.NewLockFreePool(func() interface{} { + var {{$pool_name}} = pool.NewLockFreePool(func() *{{.}} { return new({{.}}) }) func Acquire{{.}}() *{{.}} { - return {{$pool_name}}.Get().(*{{.}}) + return {{$pool_name}}.Get() } func Release{{.}}(meter *{{.}}) { diff --git a/server/libs/flow-metrics/tag.go b/server/libs/flow-metrics/tag.go index fd725d657c8..b99148a4203 100644 --- a/server/libs/flow-metrics/tag.go +++ b/server/libs/flow-metrics/tag.go @@ -1470,12 +1470,12 @@ func (t *Tag) DatabaseSuffix() string { return DatabaseSuffix[t.DatabaseSuffixID()] } -var fieldPool = pool.NewLockFreePool(func() interface{} { +var fieldPool = pool.NewLockFreePool(func() *Field { return &Field{} }) func AcquireField() *Field { - return fieldPool.Get().(*Field) + return fieldPool.Get() } func ReleaseField(field *Field) { @@ -1500,12 +1500,12 @@ func CloneField(field *Field) *Field { return newField } -var tagPool = pool.NewLockFreePool(func() interface{} { +var tagPool = pool.NewLockFreePool(func() *Tag { return &Tag{} }) func AcquireTag() *Tag { - return tagPool.Get().(*Tag) + return tagPool.Get() } // ReleaseTag 需要释放Tag拥有的Field diff --git a/server/libs/flow-metrics/tag_mini.go b/server/libs/flow-metrics/tag_mini.go index 0941d37c3d1..4b2282ee89b 100644 --- a/server/libs/flow-metrics/tag_mini.go +++ b/server/libs/flow-metrics/tag_mini.go @@ -264,12 +264,12 @@ func (t *MiniTag) GetTAPType() uint8 { return uint8(t.TAPType) } -var miniFieldPool = pool.NewLockFreePool(func() interface{} { +var miniFieldPool = pool.NewLockFreePool(func() *MiniField { return &MiniField{} }) func AcquireMiniField() *MiniField { - return miniFieldPool.Get().(*MiniField) + return miniFieldPool.Get() } func ReleaseMiniField(miniField *MiniField) { @@ -286,12 +286,12 @@ func CloneMiniField(miniField *MiniField) *MiniField { return newMiniField } -var miniTagPool = pool.NewLockFreePool(func() interface{} { +var miniTagPool = pool.NewLockFreePool(func() *MiniTag { return &MiniTag{} }) func AcquireMiniTag() *MiniTag { - return miniTagPool.Get().(*MiniTag) + return miniTagPool.Get() } // ReleaseMiniTag 需要释放Tag拥有的Field diff --git a/server/libs/hmap/timedtagmap/timed_tag_map.go b/server/libs/hmap/timedtagmap/timed_tag_map.go index 66eb583adc5..08647fc1a8b 100644 --- a/server/libs/hmap/timedtagmap/timed_tag_map.go +++ b/server/libs/hmap/timedtagmap/timed_tag_map.go @@ -33,12 +33,12 @@ const ( MAX_TTL = 32 // 需要用32bit表示是否invalid ) -var tagIDSlicePool = pool.NewLockFreePool(func() interface{} { +var tagIDSlicePool = pool.NewLockFreePool(func() []TagID { return make([]TagID, MAX_TTL+1) }) func AcquireTagIDSlice(ttl int) []TagID { - ti := tagIDSlicePool.Get().([]TagID) + ti := tagIDSlicePool.Get() for i := 0; i < ttl; i++ { ti[i] = INVALID_TAG_ID } diff --git a/server/libs/pool/pool.go b/server/libs/pool/pool.go index ce0c136a787..1b1414cb55a 100644 --- a/server/libs/pool/pool.go +++ b/server/libs/pool/pool.go @@ -78,18 +78,18 @@ func SetCounterRegisterCallback(callback CounterRegisterCallback) { // 我们需要利用好这一个元素的位置,所以在这个元素上放置slice指针 // 作为实际的pool使用,每次Get/Put时,先拿到slice,弹出/推入元素后再 // 将slice放回,以尽可能无锁分配释放资源 -type LockFreePool struct { +type LockFreePool[T any] struct { emptyPool *sync.Pool fullPool *sync.Pool counter *Counter } -func (p *LockFreePool) Get() interface{} { +func (p *LockFreePool[T]) Get() T { atomic.AddUint64(&p.counter.InUseObjects, 1) atomic.AddUint64(&p.counter.InUseBytes, p.counter.ObjectSize) - elemPool := p.fullPool.Get().(*[]interface{}) // avoid convT2Eslice + elemPool := p.fullPool.Get().(*[]T) // avoid convT2Eslice pool := *elemPool e := pool[len(pool)-1] *elemPool = pool[:len(pool)-1] @@ -101,11 +101,11 @@ func (p *LockFreePool) Get() interface{} { return e } -func (p *LockFreePool) Put(x interface{}) { +func (p *LockFreePool[T]) Put(x T) { atomic.AddUint64(&p.counter.InUseObjects, math.MaxUint64) atomic.AddUint64(&p.counter.InUseBytes, math.MaxUint64-p.counter.ObjectSize+1) - pool := p.emptyPool.Get().(*[]interface{}) // avoid convT2Eslice + pool := p.emptyPool.Get().(*[]T) // avoid convT2Eslice *pool = append(*pool, x) if len(*pool) < cap(*pool) { p.emptyPool.Put(pool) @@ -115,7 +115,7 @@ func (p *LockFreePool) Put(x interface{}) { } // 注意OptionInitFullPoolSize不能大于OptionPoolSizePerCPU,且不能小于等于0 -func NewLockFreePool(alloc func() interface{}, options ...Option) *LockFreePool { +func NewLockFreePool[T any](alloc func() T, options ...Option) *LockFreePool[T] { // options poolSizePerCPU := POOL_SIZE_PER_CPU initFullPoolSize := INIT_FULL_POOL_SIZE @@ -144,12 +144,12 @@ func NewLockFreePool(alloc func() interface{}, options ...Option) *LockFreePool } // functions - newEmptySlice := func() interface{} { - p := make([]interface{}, 0, poolSizePerCPU) + newEmptySlice := func() any { + p := make([]T, 0, poolSizePerCPU) return &p } - newFullSlice := func() interface{} { - p := make([]interface{}, initFullPoolSize, poolSizePerCPU) + newFullSlice := func() any { + p := make([]T, initFullPoolSize, poolSizePerCPU) for i := OptionInitFullPoolSize(0); i < initFullPoolSize; i++ { p[i] = alloc() } @@ -175,7 +175,7 @@ func NewLockFreePool(alloc func() interface{}, options ...Option) *LockFreePool allCounters = append(allCounters, counter) counterListLock.Unlock() - return &LockFreePool{ + return &LockFreePool[T]{ emptyPool: &sync.Pool{ New: newEmptySlice, }, diff --git a/server/libs/pool/pool_test.go b/server/libs/pool/pool_test.go index 838859a1bbe..c95eee7dc94 100644 --- a/server/libs/pool/pool_test.go +++ b/server/libs/pool/pool_test.go @@ -84,9 +84,9 @@ func BenchmarkNativePoolOverPut(b *testing.B) { } func BenchmarkLockFreePoolGetPut1Thread(b *testing.B) { - pools := make([]*LockFreePool, b.N/1024) + pools := make([]*LockFreePool[int], b.N/1024) for p, _ := range pools { - pool := NewLockFreePool(func() interface{} { return 0 }) + pool := NewLockFreePool(func() int { return 0 }) for i := 0; i < 1024; i++ { pool.Put(0) } @@ -103,13 +103,13 @@ func BenchmarkLockFreePoolGetPut1Thread(b *testing.B) { } func BenchmarkLockFreePoolGetPut2Thread(b *testing.B) { - pools := make([]*LockFreePool, 16) + pools := make([]*LockFreePool[int], 16) for i := range pools { - pool := NewLockFreePool(func() interface{} { return 0 }) + pool := NewLockFreePool(func() int { return 0 }) pools[i] = pool } - put := func(pool []*LockFreePool) { + put := func(pool []*LockFreePool[int]) { for i := 0; i < b.N; i++ { for _, p := range pools { p.Put(0) @@ -127,14 +127,14 @@ func BenchmarkLockFreePoolGetPut2Thread(b *testing.B) { } func BenchmarkLockFreePoolHungryGet(b *testing.B) { - pool := NewLockFreePool(func() interface{} { return 0 }) + pool := NewLockFreePool[int](func() int { return 0 }) for i := 0; i < b.N; i++ { pool.Get() } } func BenchmarkLockFreePoolOverPut(b *testing.B) { - pool := NewLockFreePool(func() interface{} { return 0 }) + pool := NewLockFreePool[int](func() int { return 0 }) for i := 0; i < 1024; i++ { pool.Put(0) } diff --git a/server/libs/receiver/receiver.go b/server/libs/receiver/receiver.go index 55e795119d1..94bf7505780 100644 --- a/server/libs/receiver/receiver.go +++ b/server/libs/receiver/receiver.go @@ -85,9 +85,9 @@ func (r *RecvBuffer) String() string { return fmt.Sprintf("IP:%s %s\n", r.IP, string(r.Buffer)) } -func newBufferPool(bufferSize, poolSizePerCPU int) *pool.LockFreePool { - return pool.NewLockFreePool( - func() interface{} { +func newBufferPool(bufferSize, poolSizePerCPU int) *pool.LockFreePool[*RecvBuffer] { + return pool.NewLockFreePool[*RecvBuffer]( + func() *RecvBuffer { return &RecvBuffer{ Buffer: make([]byte, bufferSize), } @@ -98,7 +98,7 @@ func newBufferPool(bufferSize, poolSizePerCPU int) *pool.LockFreePool { ) } -var recvBufferPools = []*pool.LockFreePool{ +var recvBufferPools = []*pool.LockFreePool[*RecvBuffer]{ newBufferPool(RECV_BUFSIZE_2K, 16), newBufferPool(RECV_BUFSIZE_8K, 32), newBufferPool(RECV_BUFSIZE_64K, 8), @@ -130,7 +130,7 @@ func minPowerOfTwo(v int) int { func AcquireRecvBuffer(length int, socketType ServerType) (*RecvBuffer, bool) { isNew := false index := getBufferPoolIndex(length) - buf := recvBufferPools[index].Get().(*RecvBuffer) + buf := recvBufferPools[index].Get() buf.SocketType = socketType if len(buf.Buffer) < length { length = minPowerOfTwo(length) diff --git a/server/libs/segmenttree/immutable_segment_tree.go b/server/libs/segmenttree/immutable_segment_tree.go index fe5997bc6c9..f33d9ef04e5 100644 --- a/server/libs/segmenttree/immutable_segment_tree.go +++ b/server/libs/segmenttree/immutable_segment_tree.go @@ -36,7 +36,7 @@ const ( RIGHT_CLOSED // ... endpoint] (... ) -var subTreePool = pool.NewLockFreePool(func() interface{} { +var subTreePool = pool.NewLockFreePool(func() *SubTree { return new(SubTree) }) @@ -50,7 +50,7 @@ type SortableEndpoints struct { } func subTree(tree SubTree) *SubTree { - t := subTreePool.Get().(*SubTree) + t := subTreePool.Get() *t = tree return t } diff --git a/server/libs/stats/message.go b/server/libs/stats/message.go index 6a690a7f7de..e150df92c99 100644 --- a/server/libs/stats/message.go +++ b/server/libs/stats/message.go @@ -163,7 +163,7 @@ func (s *DFStats) GenCKTable(ttl int) *ckdb.Table { } } -var poolDFStats = pool.NewLockFreePool(func() interface{} { +var poolDFStats = pool.NewLockFreePool(func() *DFStats { return &DFStats{ Tags: make([]Tag, 0, 4), Fields: make([]Field, 0, 4), @@ -171,7 +171,7 @@ var poolDFStats = pool.NewLockFreePool(func() interface{} { }) func AcquireDFStats() *DFStats { - return poolDFStats.Get().(*DFStats) + return poolDFStats.Get() } func ReleaseDFStats(s *DFStats) { diff --git a/server/libs/tracetree/spantrace.go b/server/libs/tracetree/spantrace.go index 8cedb751cf0..f5e6a650e47 100644 --- a/server/libs/tracetree/spantrace.go +++ b/server/libs/tracetree/spantrace.go @@ -108,12 +108,12 @@ func (t *SpanTrace) Decode(decoder *codec.SimpleDecoder) error { } -var poolSpanTrace = pool.NewLockFreePool(func() interface{} { +var poolSpanTrace = pool.NewLockFreePool(func() *SpanTrace { return new(SpanTrace) }) func AcquireSpanTrace() *SpanTrace { - return poolSpanTrace.Get().(*SpanTrace) + return poolSpanTrace.Get() } func ReleaseSpanTrace(t *SpanTrace) { diff --git a/server/libs/tracetree/tracetree.go b/server/libs/tracetree/tracetree.go index 02b94eb93c3..1deca42f77e 100644 --- a/server/libs/tracetree/tracetree.go +++ b/server/libs/tracetree/tracetree.go @@ -116,12 +116,12 @@ func TraceTreeColumns() []*ckdb.Column { } } -var poolTraceTree = pool.NewLockFreePool(func() interface{} { +var poolTraceTree = pool.NewLockFreePool(func() *TraceTree { return new(TraceTree) }) func AcquireTraceTree() *TraceTree { - return poolTraceTree.Get().(*TraceTree) + return poolTraceTree.Get() } func ReleaseTraceTree(t *TraceTree) { diff --git a/server/libs/utils/byte_buffer.go b/server/libs/utils/byte_buffer.go index cf04e0f5be9..ed8ac070a37 100644 --- a/server/libs/utils/byte_buffer.go +++ b/server/libs/utils/byte_buffer.go @@ -53,12 +53,12 @@ func (b *ByteBuffer) SetQuota(n int) { b.quota = n } -var byteBufferPool = pool.NewLockFreePool(func() interface{} { +var byteBufferPool = pool.NewLockFreePool(func() *ByteBuffer { return &ByteBuffer{quota: 1 << 16} }) func AcquireByteBuffer() *ByteBuffer { - b := byteBufferPool.Get().(*ByteBuffer) + b := byteBufferPool.Get() b.ReferenceCount.Reset() return b }