Skip to content

Commit

Permalink
Sync of sFlow plugin with Advanced edition
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-odintsov committed Jul 17, 2024
1 parent 4a9fc72 commit 7be16b0
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 74 deletions.
1 change: 1 addition & 0 deletions src/fastnetmon_configuration_scheme.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class fastnetmon_configuration_t {
std::vector<unsigned int> sflow_ports{};
std::string sflow_host{ "0.0.0.0" };
bool sflow_read_packet_length_from_ip_header{ false };
bool sflow_extract_tunnel_traffic{ false };

// Netflow / IPFIX
bool netflow{ false };
Expand Down
2 changes: 1 addition & 1 deletion src/fixed_size_packet_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class fixed_size_packet_storage_t {
public:
fixed_size_packet_storage_t() = default;
fixed_size_packet_storage_t(void* payload_pointer, unsigned int captured_length, unsigned int real_packet_length) {
fixed_size_packet_storage_t(const void* payload_pointer, unsigned int captured_length, unsigned int real_packet_length) {
// TODO: performance killer! Check it!
bool we_do_timestamps = true;

Expand Down
135 changes: 63 additions & 72 deletions src/sflow_plugin/sflow_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void start_sflow_collection(process_packet_pointer func_ptr) {
boost::thread_group sflow_collector_threads;

logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We will listen on "
<< fastnetmon_global_configuration.sflow_ports.size() << " ports";
<< fastnetmon_global_configuration.sflow_ports.size() << " ports";

for (auto sflow_port : fastnetmon_global_configuration.sflow_ports) {
sflow_collector_threads.add_thread(
Expand Down Expand Up @@ -231,7 +231,7 @@ void start_sflow_collector(const std::string& sflow_host, unsigned int sflow_por

if (bind_result != 0) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "cannot bind on " << sflow_port << ":" << sflow_host
<< " with errno: " << errno << " error: " << strerror(errno);
<< " with errno: " << errno << " error: " << strerror(errno);
return;
}

Expand Down Expand Up @@ -380,13 +380,11 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,

if (sflow_raw_protocol_header.header_protocol == SFLOW_HEADER_PROTOCOL_ETHERNET) {

bool unpack_gre = false;

// We could enable this new parser for testing purpose
auto result = parse_raw_packet_to_simple_packet_full_ng(header_payload_pointer,
sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
unpack_gre, fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);
auto result =
parse_raw_packet_to_simple_packet_full_ng(header_payload_pointer, sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
fastnetmon_global_configuration.sflow_extract_tunnel_traffic,
fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);

if (result != network_data_stuctures::parser_code_t::success) {
sflow_parse_error_nested_header++;
Expand All @@ -401,10 +399,11 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
sflow_ipv4_header_protocol++;

// We parse this packet using special version of our parser which looks only on IPv4 packet
auto result = parse_raw_ipv4_packet_to_simple_packet_full_ng(header_payload_pointer,
sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);
auto result =
parse_raw_ipv4_packet_to_simple_packet_full_ng(header_payload_pointer,
sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);

if (result != network_data_stuctures::parser_code_t::success) {
sflow_parse_error_nested_header++;
Expand All @@ -427,10 +426,13 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
return false;
}

// That's actually WEIRD as we touch these fields in packet parser logic too.
// So basically we do not use logic from parsers above and override values after parser finishes work

// Pass pointer to raw header to FastNetMon processing functions
packet.payload_pointer = header_payload_pointer;
packet.payload_full_length = sflow_raw_protocol_header.frame_length_before_sampling;
packet.captured_payload_length = sflow_raw_protocol_header.header_size;
packet.payload_pointer = header_payload_pointer;
packet.payload_full_length = sflow_raw_protocol_header.frame_length_before_sampling;
packet.captured_payload_length = sflow_raw_protocol_header.header_size;

packet.sample_ratio = sflow_sample_header_unified_accessor.sampling_rate;

Expand Down Expand Up @@ -470,7 +472,7 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
}

// We're ready to parse it
const sflow_extended_gateway_information_t* gateway_details = (sflow_extended_gateway_information_t*)payload_ptr;
const sflow_extended_gateway_information_t* gateway_details = (const sflow_extended_gateway_information_t*)payload_ptr;

packet.src_asn = fast_ntoh(gateway_details->router_asn);
packet.dst_asn = fast_ntoh(gateway_details->source_asn);
Expand Down Expand Up @@ -501,14 +503,14 @@ void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_leng
bool read_sflow_header_result = read_sflow_header(payload_ptr, payload_length, sflow_header_accessor);

if (!read_sflow_header_result) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sflow packet header correctly";
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sFlow packet header correctly";
sflow_bad_packets++;
return;
}

if (sflow_header_accessor.get_datagram_samples_count() <= 0) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix
<< "Strange number of sFLOW samples: " << sflow_header_accessor.get_datagram_samples_count();
<< "Strange number of sFlow samples: " << sflow_header_accessor.get_datagram_samples_count();
sflow_bad_packets++;
return;
}
Expand Down Expand Up @@ -539,10 +541,10 @@ void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_leng
// << " enterprise " << enterprise
// << " and length " << sample_length << std::endl;

int32_t enterprise = std::get<0>(sample);
int32_t integer_format = std::get<1>(sample);
uint8_t* data_pointer = std::get<2>(sample);
size_t data_length = std::get<3>(sample);
int32_t enterprise = std::get<0>(sample);
int32_t integer_format = std::get<1>(sample);
const uint8_t* data_pointer = std::get<2>(sample);
size_t data_length = std::get<3>(sample);

if (enterprise != 0) {
// We do not support vendor specific additions
Expand All @@ -552,36 +554,33 @@ void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_leng
sflow_sample_type_t sample_format = sflow_sample_type_from_integer(integer_format);

if (sample_format == sflow_sample_type_t::BROKEN_TYPE) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type number: " << integer_format;
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type number: " << integer_format;
continue;
}

// Move this code to separate function!!!
if (sample_format == sflow_sample_type_t::FLOW_SAMPLE) {
// std::cout << "We got flow sample" << std::endl;
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got flow sample";
process_sflow_flow_sample(data_pointer, data_length, false, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::COUNTER_SAMPLE) {
// std::cout << "We got counter sample" << std::endl;
// TODO: add support for sflow counetrs
// process_sflow_counter_sample(data_pointer, data_length, false, sflow_header_accessor);
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got counter sample";
process_sflow_counter_sample(data_pointer, data_length, false, sflow_header_accessor);
sflow_counter_sample++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_FLOW_SAMPLE) {
// std::cout << "We got expanded flow sample" << std::endl;
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got expanded flow sample";
process_sflow_flow_sample(data_pointer, data_length, true, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_COUNTER_SAMPLE) {
// TODO:add support for sflow counetrs
// std::cout << "We got expanded counter sample" << std::endl;
////process_sflow_counter_sample(data_pointer, data_length, true, sflow_header_accessor);
sflow_counter_sample++;
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got expanded counter sample";
// process_sflow_counter_sample(data_pointer, data_length, true, sflow_header_accessor);
sflow_expanded_counter_sample++;
} else {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type: " << integer_format;
}
}
}

bool process_sflow_counter_sample(uint8_t* data_pointer,
bool process_sflow_counter_sample(const uint8_t* data_pointer,
size_t data_length,
bool expanded,
const sflow_packet_header_unified_accessor& sflow_header_accessor) {
Expand All @@ -591,7 +590,7 @@ bool process_sflow_counter_sample(uint8_t* data_pointer,
read_sflow_counter_header(data_pointer, data_length, expanded, sflow_counter_header_unified_accessor);

if (!read_sflow_counter_header_result) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sflow counter header";
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sFlow counter header";
return false;
}

Expand All @@ -614,53 +613,45 @@ bool process_sflow_counter_sample(uint8_t* data_pointer,
return false;
}

for (auto counter_record : counter_record_sample_vector) {
uint32_t enterprise = 0;
uint32_t format = 0;
ssize_t length = 0;
uint8_t* data_pointer = nullptr;

std::tie(enterprise, format, length, data_pointer) = counter_record;
for (const auto& counter_record : counter_record_sample_vector) {
if (counter_record.enterprise != 0) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we do not support vendor specific enterprise numbers";
continue;
}

if (enterprise == 0) {
sample_counter_types_t sample_type = sample_counter_types_t::BROKEN_COUNTER;
;
sample_counter_types_t sample_type = sample_counter_types_t::BROKEN_COUNTER;

if (format == 1) {
sample_type = sample_counter_types_t::GENERIC_INTERFACE_COUNTERS;
} else if (format == 2) {
sample_type = sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS;
}
if (counter_record.format == 1) {
sample_type = sample_counter_types_t::GENERIC_INTERFACE_COUNTERS;
} else if (counter_record.format == 2) {
sample_type = sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS;
}

if (sample_type == sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS) {
// std::cout << "ETHERNET_INTERFACE_COUNTERS" << std::endl;
if (sample_type == sample_counter_types_t::GENERIC_INTERFACE_COUNTERS) {
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "GENERIC_INTERFACE_COUNTERS";

if (sizeof(ethernet_sflow_interface_counters_t) != length) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we haven't enough data for ethernet counter packet";
return false;
}

ethernet_sflow_interface_counters_t ethernet_counters(data_pointer);
// std::cout << ethernet_counters.print() << std::endl;
if (sizeof(generic_sflow_interface_counters_t) != counter_record.length) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix
<< "length mismatch for generic interface counter packet: " << counter_record.length;
continue;
}

if (sample_type == sample_counter_types_t::GENERIC_INTERFACE_COUNTERS) {
// std::cout << "GENERIC_INTERFACE_COUNTERS" << std::endl;
sflow_generic_interface_counter_sample++;

if (sizeof(generic_sflow_interface_counters_t) != length) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we haven't enough data for generic packet";
return false;
}
const generic_sflow_interface_counters_t* generic_sflow_interface_counters =
(const generic_sflow_interface_counters_t*)(counter_record.pointer);

generic_sflow_interface_counters_t generic_sflow_interface_counters(data_pointer);
// std::cout << generic_sflow_interface_counters.print() << std::endl;
if (logger.getPriority() == log4cpp::Priority::DEBUG) {
logger << log4cpp::Priority::DEBUG << generic_sflow_interface_counters->print();
}
} else {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we do not support vendor specific enterprise numbers";
}

// std::cout << "Counter record" << std::endl;
// TODO: we need to get relevant fields in special intermediate structure and then pass them for processing
// somwhere else I think we need one dedicated thread to implement such metrics pusshes to Clickhouse
} else if (sample_type == sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS) {
// These counters provide too detailed information about interface and we do not need it on such level for DDoS detection purposes
}
}

return true;
}

2 changes: 1 addition & 1 deletion src/sflow_plugin/sflow_collector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ void init_sflow_module();
void deinit_sflow_module();

// New code for v5 only
void parse_sflow_v5_packet(uint8_t* payload_ptr, unsigned int payload_length, uint32_t client_ipv4_address);
void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_length, uint32_t client_ipv4_address);
std::vector<system_counter_t> get_sflow_stats();

0 comments on commit 7be16b0

Please sign in to comment.