diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 05f41335c..390dd1416 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -53,6 +54,7 @@ enum nDPIsrvd_read_return { READ_OK = CONNECT_LAST_ENUM_VALUE, READ_PEER_DISCONNECT, + READ_TIMEOUT, READ_ERROR, /* check for errno */ READ_LAST_ENUM_VALUE @@ -216,6 +218,7 @@ struct nDPIsrvd_jsmn struct nDPIsrvd_socket { int fd; + struct timeval read_timeout; struct nDPIsrvd_address address; size_t instance_user_data_size; @@ -324,6 +327,7 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value) "READ_OK", "READ_PEER_DISCONNECT", + "READ_TIMEOUT", "READ_ERROR", "PARSE_OK", @@ -429,6 +433,9 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d if (sock != NULL) { sock->fd = -1; + sock->read_timeout.tv_sec = 0; + sock->read_timeout.tv_usec = 0; + if (nDPIsrvd_json_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0) { goto error; @@ -460,6 +467,45 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d return NULL; } +static inline int nDPIsrvd_set_read_timeout(struct nDPIsrvd_socket * const sock, + time_t seconds, + suseconds_t micro_seconds) +{ + struct timeval tv = {.tv_sec = seconds, .tv_usec = micro_seconds}; + + if (sock->fd < 0) + { + return 1; + } + + if (setsockopt(sock->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) + { + return 1; + } + + sock->read_timeout = tv; + + return 0; +} + +static inline int nDPIsrvd_set_nonblock(struct nDPIsrvd_socket * const sock) +{ + int flags; + + if (sock->fd < 0) + { + return 1; + } + + flags = fcntl(sock->fd, F_GETFL, 0); + if (flags == -1) + { + return 1; + } + + return (fcntl(sock->fd, F_SETFL, flags | O_NONBLOCK) != 0); +} + static inline void nDPIsrvd_cleanup_flow(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, struct nDPIsrvd_thread_data * const thread_data, @@ -675,6 +721,10 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c } if (bytes_read < 0) { + if (errno == EAGAIN) + { + return READ_TIMEOUT; + } return READ_ERROR; } diff --git a/dependencies/nDPIsrvd.py b/dependencies/nDPIsrvd.py index b587f6ddf..38e181bfc 100644 --- a/dependencies/nDPIsrvd.py +++ b/dependencies/nDPIsrvd.py @@ -339,6 +339,9 @@ def connect(self, addr): self.digitlen = 0 self.lines = [] + def timeout(self, timeout): + self.sock.settimeout(timeout) + def receive(self): if len(self.buffer) == NETWORK_BUFFER_MAX_SIZE: raise BufferCapacityReached(len(self.buffer), NETWORK_BUFFER_MAX_SIZE) @@ -349,6 +352,7 @@ def receive(self): except ConnectionResetError: connection_finished = True recvd = bytes() + if len(recvd) == 0: connection_finished = True diff --git a/examples/c-collectd/c-collectd.c b/examples/c-collectd/c-collectd.c index aef84239a..3abceec83 100644 --- a/examples/c-collectd/c-collectd.c +++ b/examples/c-collectd/c-collectd.c @@ -44,7 +44,6 @@ static struct uint64_t flow_detection_update_count; uint64_t flow_not_detected_count; - uint64_t flow_packet_count; uint64_t flow_total_bytes; uint64_t flow_risky_count; @@ -256,7 +255,7 @@ static void print_collectd_exec_output(void) printf(COLLECTD_PUTVAL_N_FORMAT(flow_new_count) COLLECTD_PUTVAL_N_FORMAT(flow_end_count) COLLECTD_PUTVAL_N_FORMAT(flow_idle_count) COLLECTD_PUTVAL_N_FORMAT(flow_guessed_count) COLLECTD_PUTVAL_N_FORMAT(flow_detected_count) COLLECTD_PUTVAL_N_FORMAT(flow_detection_update_count) - COLLECTD_PUTVAL_N_FORMAT(flow_not_detected_count) COLLECTD_PUTVAL_N_FORMAT(flow_packet_count) + COLLECTD_PUTVAL_N_FORMAT(flow_not_detected_count) COLLECTD_PUTVAL_N_FORMAT(flow_total_bytes) COLLECTD_PUTVAL_N_FORMAT(flow_risky_count), COLLECTD_PUTVAL_N(flow_new_count), @@ -266,7 +265,6 @@ static void print_collectd_exec_output(void) COLLECTD_PUTVAL_N(flow_detected_count), COLLECTD_PUTVAL_N(flow_detection_update_count), COLLECTD_PUTVAL_N(flow_not_detected_count), - COLLECTD_PUTVAL_N(flow_packet_count), COLLECTD_PUTVAL_N(flow_total_bytes), COLLECTD_PUTVAL_N(flow_risky_count)); @@ -428,7 +426,7 @@ static uint64_t get_total_flow_bytes(struct nDPIsrvd_socket * const sock) { nDPIsrvd_ull total_bytes_ull = 0; - if (TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_tot_l4_data_len"), &total_bytes_ull) == CONVERSION_OK) + if (TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_tot_l4_payload_len"), &total_bytes_ull) == CONVERSION_OK) { return total_bytes_ull; } @@ -670,11 +668,6 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock collectd_statistics.flow_not_detected_count++; } - if (TOKEN_GET_SZ(sock, "packet_event_name") != NULL) - { - collectd_statistics.flow_packet_count++; - } - return CALLBACK_OK; } @@ -716,6 +709,13 @@ int main(int argc, char ** argv) return 1; } + if (nDPIsrvd_set_nonblock(sock) != 0) + { + LOG(LOG_DAEMON | LOG_ERR, "nDPIsrvd set nonblock failed: %s", strerror(errno)); + nDPIsrvd_socket_free(&sock); + return 1; + } + signal(SIGINT, sighandler); signal(SIGTERM, sighandler); signal(SIGPIPE, SIG_IGN); diff --git a/examples/c-collectd/plugin_nDPIsrvd_types.db b/examples/c-collectd/plugin_nDPIsrvd_types.db index dc910a3bb..a537f0824 100644 --- a/examples/c-collectd/plugin_nDPIsrvd_types.db +++ b/examples/c-collectd/plugin_nDPIsrvd_types.db @@ -11,7 +11,6 @@ flow_detection_update_count value:GAUGE:0:U flow_not_detected_count value:GAUGE:0:U # flow additional counters -flow_packet_count value:GAUGE:0:U flow_total_bytes value:GAUGE:0:U flow_risky_count value:GAUGE:0:U diff --git a/examples/c-simple/c-simple.c b/examples/c-simple/c-simple.c index 6e7d0f66d..8d723fe20 100644 --- a/examples/c-simple/c-simple.c +++ b/examples/c-simple/c-simple.c @@ -159,8 +159,7 @@ static void simple_flow_cleanup_callback(struct nDPIsrvd_socket * const sock, if (reason == CLEANUP_REASON_FLOW_TIMEOUT) { - fprintf(stderr, "Flow timeout occurred, something really bad happened.\n"); - exit(1); + fprintf(stderr, "Flow %llu timeouted.\n", flow->id_as_ull); } } @@ -188,9 +187,27 @@ int main(int argc, char ** argv) return 1; } + if (nDPIsrvd_set_read_timeout(sock, 3, 0) != 0) + { + return 1; + } + enum nDPIsrvd_read_return read_ret; - while (main_thread_shutdown == 0 && (read_ret = nDPIsrvd_read(sock)) == READ_OK) + while (main_thread_shutdown == 0) { + read_ret = nDPIsrvd_read(sock); + if (read_ret == READ_TIMEOUT) + { + printf("No data received during the last %llu second(s).\n", + (long long unsigned int)sock->read_timeout.tv_sec); + continue; + } + if (read_ret != READ_OK) + { + main_thread_shutdown = 1; + continue; + } + enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock); if (parse_ret != PARSE_NEED_MORE_DATA) { diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 0dbc5e63c..f11750351 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -426,8 +426,16 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): nsock = nDPIsrvdSocket() nsock.connect(address) + nsock.timeout(1.0) stats = Stats(nsock) - try: - nsock.loop(onJsonLineRecvd, onFlowCleanup, stats) - except KeyboardInterrupt: - print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown()))) + + while True: + try: + nsock.loop(onJsonLineRecvd, onFlowCleanup, stats) + except KeyboardInterrupt: + print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown()))) + break + except TimeoutError: + stats.updateSpinner() + stats.resetStatus() + stats.printStatus() diff --git a/nDPId-test.c b/nDPId-test.c index 03d69c237..cd79345ea 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -626,6 +626,7 @@ static void * distributor_client_mainloop_thread(void * const arg) case READ_ERROR: logger(1, "Read and verify fd returned an error: %s", strerror(errno)); THREAD_ERROR_GOTO(trv); + case READ_TIMEOUT: case READ_PEER_DISCONNECT: del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]); pipe_read_finished = 1;