Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fm backport/v1 #12254

Open
wants to merge 2 commits into
base: main-7.0.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2023 Open Information Security Foundation
/* Copyright (C) 2007-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand Down Expand Up @@ -259,10 +259,23 @@ static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters

typedef struct FlowManagerTimeoutThread {
/* used to temporarily store flows that have timed out and are
* removed from the hash */
* removed from the hash to reduce locking contention */
FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;

/**
* \internal
*
* \brief Process the temporary Aside Queue
* This means that as long as a flow f is not waiting on detection
* engine to finish dealing with it, f will be put in the recycle
* queue for further processing later on.
*
* \param td FM Timeout Thread instance
* \param counters Flow Timeout counters to be updated
*
* \retval Number of flows that were recycled
*/
static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
FlowQueuePrivate recycle = { NULL, NULL, 0 };
Expand All @@ -279,7 +292,7 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount
/* Send the flow to its thread */
FlowForceReassemblyForFlow(f);
FLOWLOCK_UNLOCK(f);
/* flow ownership is passed to the worker thread */
/* flow ownership is already passed to the worker thread */

counters->flows_aside_needs_work++;
continue;
Expand Down Expand Up @@ -364,6 +377,16 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
counters->rows_maxlen = checked;
}

/**
* \internal
*
* \brief Clear evicted list from Flow Manager.
* All the evicted flows are removed from the Flow bucket and added
* to the temporary Aside Queue.
*
* \param td FM timeout thread instance
* \param f head of the evicted list
*/
static void FlowManagerHashRowClearEvictedList(
FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
{
Expand Down Expand Up @@ -440,6 +463,7 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
SC_ATOMIC_SET(fb->next_ts, next_ts);
}
if (fb->evicted == NULL && fb->head == NULL) {
/* row is empty */
SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
}
} else {
Expand Down Expand Up @@ -473,8 +497,19 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
}

/** \internal
*
* \brief handle timeout for a slice of hash rows
* If we wrap around we call FlowTimeoutHash twice */
* If we wrap around we call FlowTimeoutHash twice
* \param td FM timeout thread
* \param ts timeout in seconds
* \param hash_min lower bound of the row slice
* \param hash_max upper bound of the row slice
* \param counters Flow timeout counters to be passed
* \param rows number of rows for this worker unit
* \param pos absolute position of the beginning of row slice in the hash table
*
* \retval number of successfully timed out flows
*/
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
const uint32_t rows, uint32_t *pos)
Expand All @@ -485,7 +520,7 @@ static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t t
uint32_t rows_left = rows;

again:
start = hash_min + (*pos);
start = (*pos);
if (start >= hash_max) {
start = hash_min;
}
Expand All @@ -509,8 +544,10 @@ static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t t
* \brief move all flows out of a hash row
*
* \param f last flow in the hash row
* \param recycle_q Flow recycle queue
* \param mode emergency or not
*
* \retval cnt removed out flows
* \retval cnt number of flows removed from the hash and added to the recycle queue
*/
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
Expand Down Expand Up @@ -716,6 +753,13 @@ static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
* a rapid increase of the busy score, which could lead to the flow manager
* suddenly scanning a much larger slice of the hash leading to a burst
* in scan/eviction work.
*
* \param rows number of rows for the work unit
* \param mp current memcap pressure value
* \param emergency emergency mode is set or not
* \param wu_sleep holds value of sleep time per worker unit
* \param wu_rows holds value of calculated rows to be processed per second
* \param rows_sec same as wu_rows, only used for counter updates
*/
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
Expand Down Expand Up @@ -756,7 +800,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)

uint32_t emerg_over_cnt = 0;
uint64_t next_run_ms = 0;
uint32_t pos = 0;
uint32_t pos = ftd->min;
uint32_t rows_sec = 0;
uint32_t rows_per_wu = 0;
uint64_t sleep_per_wu = 0;
Expand Down
Loading