Skip to content

Commit

Permalink
source: add THV_RUNNING flag to notify of running state
Browse files Browse the repository at this point in the history
Each module (thread) updates its status to indicate running.
Main thread awaits for all threads to be in a running state
before continuing the initialisation process

Implements feature 5384
(https://redmine.openinfosecfoundation.org/issues/5384)
  • Loading branch information
rmcconnell-r7 authored and victorjulien committed Oct 27, 2022
1 parent 9fb0137 commit 13beba1
Show file tree
Hide file tree
Showing 19 changed files with 200 additions and 57 deletions.
5 changes: 3 additions & 2 deletions src/counters.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ static void *StatsMgmtThread(void *arg)
}
SCLogDebug("stats_thread_data %p", &stats_thread_data);

TmThreadsSetFlag(tv_local, THV_INIT_DONE);
TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
while (1) {
if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);
Expand Down Expand Up @@ -480,7 +480,8 @@ static void *StatsWakeupThread(void *arg)
return NULL;
}

TmThreadsSetFlag(tv_local, THV_INIT_DONE);
TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);

while (1) {
if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);
Expand Down
4 changes: 4 additions & 0 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,8 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);

TmThreadsSetFlag(th_v, THV_RUNNING);

while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
Expand Down Expand Up @@ -1063,6 +1065,8 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
struct timeval ts;
memset(&ts, 0, sizeof(ts));

TmThreadsSetFlag(th_v, THV_RUNNING);

while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
Expand Down
5 changes: 4 additions & 1 deletion src/source-af-packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,6 @@ static void AFPPeersListReachedInc(void)
return;

if ((SC_ATOMIC_ADD(peerslist.reached, 1) + 1) == peerslist.turn) {
SCLogInfo("All AFP capture threads are running.");
(void)SC_ATOMIC_SET(peerslist.reached, 0);
/* Set turn to 0 to skip syncrhonization when ReceiveAFPLoop is
* restarted.
Expand Down Expand Up @@ -1339,6 +1338,10 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
fds.fd = ptv->socket;
fds.events = POLLIN;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (1) {
/* Start by checking the state of our interface */
if (unlikely(ptv->afp_state == AFP_STATE_DOWN)) {
Expand Down
4 changes: 4 additions & 0 deletions src/source-erf-dag.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)

dtv->slot = s->slot_next;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);
Expand Down
4 changes: 4 additions & 0 deletions src/source-erf-file.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)

etv->slot = ((TmSlot *)slot)->slot_next;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);
Expand Down
5 changes: 5 additions & 0 deletions src/source-ipfw.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)

SCLogInfo("Thread '%s' will run on port %d (item %d)",
tv->name, nq->port_num, ptv->ipfw_index);

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (1) {
if (unlikely(suricata_ctl_flags != 0)) {
SCReturnInt(TM_ECODE_OK);
Expand Down
4 changes: 4 additions & 0 deletions src/source-napatech.c
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,10 @@ TmEcode NapatechPacketLoop(ThreadVars *tv, void *data, void *slot)
TmSlot *s = (TmSlot *) slot;
ntv->slot = s->slot_next;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (!(suricata_ctl_flags & SURICATA_STOP)) {
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
Expand Down
5 changes: 5 additions & 0 deletions src/source-netmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,11 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot)
fds.events = POLLIN;

SCLogDebug("thread %s polling on %d", tv->name, fds.fd);

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

for(;;) {
if (unlikely(suricata_ctl_flags != 0)) {
break;
Expand Down
4 changes: 4 additions & 0 deletions src/source-nflog.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ TmEcode ReceiveNFLOGLoop(ThreadVars *tv, void *data, void *slot)
SCReturnInt(TM_ECODE_FAILED);
}

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (1) {
if (suricata_ctl_flags != 0)
break;
Expand Down
4 changes: 4 additions & 0 deletions src/source-nfq.c
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,10 @@ TmEcode ReceiveNFQLoop(ThreadVars *tv, void *data, void *slot)

ntv->slot = ((TmSlot *) slot)->slot_next;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while(1) {
if (unlikely(suricata_ctl_flags != 0)) {
NFQDestroyQueue(nq);
Expand Down
4 changes: 4 additions & 0 deletions src/source-pcap-file.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot)
ptv->shared.slot = s->slot_next;
ptv->shared.cb_result = TM_ECODE_OK;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

if(ptv->is_directory == 0) {
SCLogInfo("Starting file run for %s", ptv->behavior.file->filename);
status = PcapFileDispatch(ptv->behavior.file);
Expand Down
4 changes: 4 additions & 0 deletions src/source-pcap.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ static TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
ptv->slot = s->slot_next;
ptv->cb_result = TM_ECODE_OK;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);
Expand Down
4 changes: 4 additions & 0 deletions src/source-pfring.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
SCReturnInt(TM_ECODE_FAILED);
}

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while(1) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);
Expand Down
4 changes: 4 additions & 0 deletions src/source-windivert.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ TmEcode ReceiveWinDivertLoop(ThreadVars *tv, void *data, void *slot)
WinDivertThreadVars *wd_tv = (WinDivertThreadVars *)data;
wd_tv->slot = ((TmSlot *)slot)->slot_next;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);

while (true) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);
Expand Down
29 changes: 29 additions & 0 deletions src/suricata.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#include <signal.h>
#endif

#if HAVE_LIBSYSTEMD
#include <systemd/sd-daemon.h>
#endif

#include "suricata.h"

#include "conf.h"
Expand Down Expand Up @@ -393,6 +397,23 @@ static void GlobalsDestroy(SCInstance *suri)
suri->pid_filename = NULL;
}

/**
* \brief Used to send OS specific notification of running threads
*
* \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
*/
static void OnNotifyRunning(void)
{
#if HAVE_LIBSYSTEMD
if (sd_notify(0, "READY=1") < 0) {
SCLogWarning(SC_ERR_SYSCALL, "failed to notify systemd");
/* Please refer to:
* https://www.freedesktop.org/software/systemd/man/sd_notify.html#Return%20Value
* for discussion on why failure should not be considered an error */
}
#endif
}

/** \brief make sure threads can stop the engine by calling this
* function. Purpose: pcap file mode needs to be able to tell the
* engine the file eof is reached. */
Expand Down Expand Up @@ -2888,6 +2909,14 @@ int SuricataMain(int argc, char **argv)
/* Un-pause all the paused threads */
TmThreadContinueThreads();

/* Must ensure all threads are fully operational before continuing with init process */
if (TmThreadWaitOnThreadRunning() != TM_ECODE_OK) {
exit(EXIT_FAILURE);
}

/* Print notice and send OS specific notification of threads in running state */
OnNotifyRunning();

PostRunStartedDetectSetup(&suricata);

SCPledge();
Expand Down
1 change: 1 addition & 0 deletions src/threadvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct TmSlot_;
* rule reloads even if no packets are read by the capture method. */
#define THV_CAPTURE_INJECT_PKT BIT_U32(11)
#define THV_DEAD BIT_U32(12) /**< thread has been joined with pthread_join() */
#define THV_RUNNING BIT_U32(13) /**< thread is running */

/** \brief Per thread variable structure */
typedef struct ThreadVars_ {
Expand Down
Loading

0 comments on commit 13beba1

Please sign in to comment.