Skip to content

Commit

Permalink
feat(ct-metrics): add initial conntrack metrics for Prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
SRodi committed Nov 22, 2024
1 parent b53a09a commit 35fee01
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 8 deletions.
20 changes: 20 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@ func InitializeMetrics() {
dnsResponseCounterDescription,
)

// Conntrack Metrics
ConntrackPacketsCounter = exporter.CreatePrometheusGaugeVecForMetric(
exporter.DefaultRegistry,
utils.ConntrackPacketsCounterName,
conntrackPacketsCountDescription,
utils.ConntrackLabels...,
)
ConntrackPacketsBytesCounter = exporter.CreatePrometheusGaugeVecForMetric(
exporter.DefaultRegistry,
utils.ConntrackBytesCounterName,
conntrackPacketsBytesCountDescription,
utils.ConntrackLabels...,
)
ConntrackConnectionsCounter = exporter.CreatePrometheusCounterVecForMetric(
exporter.DefaultRegistry,
utils.ConntrackConnectionsCounterName,
conntrackConnectionsCountDescription,
utils.ConntrackLabels...,
)

// InfiniBand Metrics
InfinibandStatsGauge = exporter.CreatePrometheusGaugeVecForMetric(
exporter.DefaultRegistry,
Expand Down
8 changes: 8 additions & 0 deletions pkg/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (
dnsResponseCounterDescription = "DNS responses by statistics"
infinibandStatsGaugeDescription = "InfiniBand statistics gauge"
infinibandStatusParamsGaugeDescription = "InfiniBand Status Parameters gauge"
conntrackPacketsCountDescription = "Number of packets per connection"
conntrackPacketsBytesCountDescription = "Number of bytes per connection"
conntrackConnectionsCountDescription = "Number of connections"

// Control plane metrics
pluginManagerFailedToReconcileCounterDescription = "Number of times the plugin manager failed to reconcile the plugins"
Expand Down Expand Up @@ -89,6 +92,11 @@ var (

InfinibandStatsGauge GaugeVec
InfinibandStatusParamsGauge GaugeVec

// Conntrack metrics
ConntrackPacketsCounter GaugeVec
ConntrackPacketsBytesCounter GaugeVec
ConntrackConnectionsCounter CounterVec
)

func ToPrometheusType(metric interface{}) prometheus.Collector {
Expand Down
50 changes: 49 additions & 1 deletion pkg/plugin/conntrack/_cprog/conntrack.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,25 @@ struct {
__uint(pinning, LIBBPF_PIN_BY_NAME); // needs pinning so this can be access from other processes .i.e debug cli
} retina_conntrack SEC(".maps");

/*
* Metric to store the number of packets and bytes for a connection.
*/
struct conntrack_metric_entry {
__u64 packet_count;
__u64 byte_count;
__u8 observation_point;
__u8 traffic_direction;
};

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 512);
__type(key, struct ct_v4_key);
__type(value, struct conntrack_metric_entry);
__uint(pinning, LIBBPF_PIN_BY_NAME); // needs pinning so this can be access from other processes .i.e debug cli
} retina_conntrack_metrics SEC(".maps");


/**
* Helper function to reverse a key.
* @arg reverse_key The key to store the reversed key.
Expand All @@ -94,6 +113,30 @@ static inline void _ct_reverse_key(struct ct_v4_key *reverse_key, const struct c
reverse_key->proto = key->proto;
}

static __always_inline void _ct_update_counter_metrics(struct ct_v4_key *key, struct packet *p) {
// lookup if the key exists in packet metrics map
struct conntrack_metric_entry *ct_metric_entry = bpf_map_lookup_elem(&retina_conntrack_metrics, key);

if (!ct_metric_entry) {
// create a new entry
struct conntrack_metric_entry new_ct_metric_entry;
__builtin_memset(&new_ct_metric_entry, 0, sizeof(struct conntrack_metric_entry));
// initialize the new entry
new_ct_metric_entry.packet_count = 1;
new_ct_metric_entry.byte_count = p->bytes;
new_ct_metric_entry.observation_point = p->observation_point;
new_ct_metric_entry.traffic_direction = p->traffic_direction;
// update bpf map for packet metrics
bpf_map_update_elem(&retina_conntrack_metrics, key, &new_ct_metric_entry, BPF_ANY);
} else {
// Update packet and byte counters
ct_metric_entry->packet_count += 1;
ct_metric_entry->byte_count += p->bytes;
ct_metric_entry->observation_point = p->observation_point;
ct_metric_entry->traffic_direction = p->traffic_direction;
}
}

/**
* Returns the traffic direction based on the observation point.
* @arg observation_point The point in the network stack where the packet is observed.
Expand Down Expand Up @@ -168,6 +211,8 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
// Update packet accordingly.
p->is_reply = false;
p->traffic_direction = _ct_get_traffic_direction(observation_point);
// Update metrics this initializes the key if it does not exist
_ct_update_counter_metrics(&key, p);
// Create a new connection with a timeout of CT_SYN_TIMEOUT.
return _ct_create_new_tcp_connection(key, p->flags, observation_point);
}
Expand Down Expand Up @@ -298,7 +343,6 @@ static __always_inline bool _ct_should_report_packet(struct ct_entry *entry, __u
* Returns true if the packet should be report to userspace. False otherwise.
*/
static __always_inline __attribute__((unused)) bool ct_process_packet(struct packet *p, __u8 observation_point) {

if (!p) {
return false;
}
Expand All @@ -310,6 +354,10 @@ static __always_inline __attribute__((unused)) bool ct_process_packet(struct pac
key.src_port = p->src_port;
key.dst_port = p->dst_port;
key.proto = p->proto;

// Update metrics this initializes the key if it does not exist
_ct_update_counter_metrics(&key, p);

// Lookup the connection in the map.
struct ct_entry *entry = bpf_map_lookup_elem(&retina_conntrack, &key);

Expand Down
15 changes: 13 additions & 2 deletions pkg/plugin/conntrack/conntrack_bpfel_x86.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/plugin/conntrack/conntrack_bpfel_x86.o
Binary file not shown.
40 changes: 40 additions & 0 deletions pkg/plugin/conntrack/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/microsoft/retina/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/microsoft/retina/pkg/metrics"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go@master -cflags "-g -O2 -Wall -D__TARGET_ARCH_${GOARCH} -Wall" -target ${GOARCH} -type ct_v4_key conntrack ./_cprog/conntrack.c -- -I../lib/_${GOARCH} -I../lib/common/libbpf/_src -I../lib/common/libbpf/_include/linux -I../lib/common/libbpf/_include/uapi/linux -I../lib/common/libbpf/_include/asm
Expand Down Expand Up @@ -63,9 +65,25 @@ func New() (*Conntrack, error) {
ct.objs = objs
// Get the conntrack map from the objects
ct.ctMap = objs.RetinaConntrack
ct.ctMetricsMap = objs.RetinaConntrackMetrics
return ct, nil
}

func (ct *Conntrack) updateConntrackMetrics() {
var key conntrackCtV4Key
var value conntrackConntrackMetricEntry
iter := ct.ctMetricsMap.Iterate()
info, err := ct.ctMetricsMap.Info()
if err != nil {
ct.l.Error("Failed to get conntrack metrics map info", zap.Error(err))
return
}
ct.l.Debug("Iterating over conntrack metrics" + info.Name)
for iter.Next(&key, &value) {
ct.conntrackMetricAdd(key, float64(value.PacketCount), float64(value.ByteCount), uint8(value.ObservationPoint), uint8(value.TrafficDirection))
}
}

// Run starts the Conntrack garbage collection loop.
func (ct *Conntrack) Run(ctx context.Context) error {
ticker := time.NewTicker(ct.gcFrequency)
Expand Down Expand Up @@ -93,8 +111,13 @@ func (ct *Conntrack) Run(ctx context.Context) error {
// List of keys to be deleted
var keysToDelete []conntrackCtV4Key

ct.updateConntrackMetrics()

iter := ct.ctMap.Iterate()
for iter.Next(&key, &value) {

// TODO: remove this once the metrics are updated
// ct.conntrackMetricAdd(key, 2, 4)
noOfCtEntries++
// Check if the connection is closing or has expired
if ktime.MonotonicOffset.Seconds()+float64(value.EvictionTime) < float64((time.Now().Unix())) {
Expand Down Expand Up @@ -139,3 +162,20 @@ func (ct *Conntrack) Run(ctx context.Context) error {
}
}
}

func (ct *Conntrack) conntrackMetricAdd(key conntrackCtV4Key, count float64, bytes float64, observationPoint uint8, direction uint8) {

srcIP := utils.Int2ip(key.SrcIp).To4()
dstIP := utils.Int2ip(key.DstIp).To4()

labels := []string{
srcIP.String(),
dstIP.String(),
decodeProto(key.Proto),
decodeObservationPoint(observationPoint),
decodeDirection(direction),
}
metrics.ConntrackPacketsCounter.WithLabelValues(labels...).Set(float64(count))
metrics.ConntrackPacketsBytesCounter.WithLabelValues(labels...).Set(float64(bytes))
// TODO metrics.ConntrackConnectionsCounter.WithLabelValues(labels...).Inc()
}
37 changes: 33 additions & 4 deletions pkg/plugin/conntrack/types_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ const (
)

type Conntrack struct {
l *log.ZapLogger
objs *conntrackObjects
ctMap *ebpf.Map
gcFrequency time.Duration
l *log.ZapLogger
objs *conntrackObjects
ctMap *ebpf.Map
ctMetricsMap *ebpf.Map
gcFrequency time.Duration
}

// Define TCP flag constants
Expand Down Expand Up @@ -74,3 +75,31 @@ func decodeProto(proto uint8) string {
return "Not supported"
}
}

func decodeDirection(trafficDirection uint8) string {
switch trafficDirection {
case 0: // nolint:gomnd // TRAFFIC_DIRECTION_UNKNOWN
return "TRAFFIC_DIRECTION_UNKNOWN"
case 1: // nolint:gomnd // TRAFFIC_DIRECTION_INGRESS
return "TRAFFIC_DIRECTION_INGRESS"
case 2: // nolint:gomnd // TRAFFIC_DIRECTION_EGRESS
return "TRAFFIC_DIRECTION_EGRESS"
default:
return "Not supported"
}
}

func decodeObservationPoint(trafficDirection uint8) string {
switch trafficDirection {
case 0: // nolint:gomnd // OBSERVATION_POINT_FROM_ENDPOINT
return "OBSERVATION_POINT_FROM_ENDPOINT"
case 1: // nolint:gomnd // OBSERVATION_POINT_TO_ENDPOINT
return "OBSERVATION_POINT_TO_ENDPOINT"
case 2: // nolint:gomnd // OBSERVATION_POINT_FROM_NETWORK
return "OBSERVATION_POINT_FROM_NETWORK"
case 3: // nolint:gomnd // OBSERVATION_POINT_TO_NETWORK
return "OBSERVATION_POINT_TO_NETWORK"
default:
return "Not supported"
}
}
11 changes: 11 additions & 0 deletions pkg/plugin/packetparser/packetparser_bpfel_x86.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/plugin/packetparser/packetparser_bpfel_x86.o
Binary file not shown.
3 changes: 3 additions & 0 deletions pkg/utils/attr_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ var (
// DNS labels.
DNSRequestLabels = []string{"query_type", "query"}
DNSResponseLabels = []string{"return_code", "query_type", "query", "response", "num_response"}

// Conntrack labels.
ConntrackLabels = []string{"src_ip", "dst_ip", "proto", "observation_point", "direction"}
)

func GetPluginEventAttributes(attrs []attribute.KeyValue, pluginName, eventName, timestamp string) []attribute.KeyValue {
Expand Down
8 changes: 7 additions & 1 deletion pkg/utils/metric_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
NoResponseFromAPIServerName = "node_apiserver_no_response"
InfinibandCounterStatsName = "infiniband_counter_stats"
InfinibandStatusParamsName = "infiniband_status_params"
ConntrackPacketsCounterName = "packets_count_per_connection"
ConntrackBytesCounterName = "bytes_count_per_connection"
ConntrackConnectionsCounterName = "connections_count"

// Common Gauges across os distributions
NodeConnectivityStatusName = "node_connectivity_status"
Expand Down Expand Up @@ -57,7 +60,10 @@ func IsAdvancedMetric(name string) bool {
DNSResponseCounterName,
NodeAPIServerLatencyName,
NodeAPIServerTCPHandshakeLatencyName,
NoResponseFromAPIServerName:
NoResponseFromAPIServerName,
ConntrackPacketsCounterName,
ConntrackBytesCounterName,
ConntrackConnectionsCounterName:
return true
default:
return false
Expand Down

0 comments on commit 35fee01

Please sign in to comment.