
Commit

Breaks filtering logic into separate functions
djperrefort committed Dec 5, 2023
1 parent 8f6650c commit 5374192
Showing 1 changed file with 79 additions and 16 deletions.
95 changes: 79 additions & 16 deletions shinigami/utils.py
@@ -59,12 +59,80 @@ def get_nodes(cluster: str, ignore_nodes: Collection[str] = tuple()) -> set:
return set(node for node in all_nodes if node not in ignore_nodes)


def build_grep_command(vars: Collection[str], proc: Collection[int]) -> str:
async def get_remote_processes(conn) -> pd.DataFrame:
"""Fetch running process data from the remote machine
Args:
conn: Open SSH connection to the machine
Returns:
A pandas DataFrame
"""

# Add 1 to column widths when parsing ps output to account for space between columns
ps_return = await conn.run('ps -eo pid:10,ppid:10,pgid:10,uid:10,cmd:500', check=True)
return pd.read_fwf(StringIO(ps_return.stdout), widths=[11, 11, 11, 11, 500])
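# Illustrative sketch (not part of this commit): each of the first four ps columns is
# 10 characters wide plus one separating space, hence the widths of 11 used above. This
# hypothetical demo fabricates a ps-style header and row in that layout to show the parse
# result; `pd` and `StringIO` are the same imports already used at the top of utils.py.
def _demo_parse_ps_output() -> pd.DataFrame:
    header = f"{'PID':>10} {'PPID':>10} {'PGID':>10} {'UID':>10} CMD"
    row = f"{1234:>10} {1:>10} {1234:>10} {1001:>10} /usr/bin/python3 script.py"
    return pd.read_fwf(StringIO(header + '\n' + row), widths=[11, 11, 11, 11, 500])
    # resulting columns: PID, PPID, PGID, UID, CMD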


def filter_orphaned_processes(df, ppid_column: str = 'PPID'):
"""Filter a DataFrame to only include orphaned processes
Given a DataFrame with system process data, return a subset of the data
containing processes parented by `INIT_PROCESS_ID`.
Args:
df: DataFrame to filter
ppid_column: Column name containing parent process ID (PPID) values
Returns:
A filtered copy of the given DataFrame
"""

return df[df[ppid_column] == INIT_PROCESS_ID]
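# Illustrative example (not part of this commit), assuming INIT_PROCESS_ID == 1: only
# rows whose parent is the init process survive the filter. The sample values are hypothetical.
def _demo_filter_orphaned() -> pd.DataFrame:
    frame = pd.DataFrame({'PID': [10, 11], 'PPID': [1, 10], 'UID': [1001, 1001]})
    return filter_orphaned_processes(frame)  # keeps only the PID 10 row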


def filter_user_processes(df, uid_whitelist, uid_column: str = 'UID'):
"""Filter a DataFrame to only include whitelisted users
Given a DataFrame with system process data, return a subset of the data
containing processes owned by given user IDs.
Args:
df: DataFrame to filter
uid_whitelist: List of user IDs to whitelist
uid_column: Column name containing user ID (UID) values
Returns:
A filtered copy of the given DataFrame
"""

whitelist_index = df[uid_column].apply(id_in_whitelist, whitelist=uid_whitelist)
return df[whitelist_index]
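# Illustrative example (not part of this commit): rows survive only if id_in_whitelist
# (defined elsewhere in this module) accepts their UID. The whitelist value below is a
# hypothetical user ID; id_in_whitelist determines the accepted whitelist format.
def _demo_filter_users() -> pd.DataFrame:
    frame = pd.DataFrame({'PID': [10, 11], 'UID': [1001, 0]})
    return filter_user_processes(frame, uid_whitelist=[1001])  # drops the root-owned row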


def filter_env_defined(df, var_names, pid_column: str = 'PID') -> pd.DataFrame:
"""Filter a DataFrame to only include processes with variable definitions
Given a DataFrame with system process data, return a subset of the data
containing processes where one or more of the given environmental variables are defined.
Args:
df: DataFrame to filter
var_names: Variable names to check for
pid_column: Column name containing process ID (PID) values
Returns:
A filtered copy of the given DataFrame
"""

# grep -Eq '^DBUS_SESSION_BUS_ADDRESS=|^some_other_var=' /proc/44703/environ /proc/2235/environ;
variable_regex = '|'.join(f'^{variable}=' for variable in vars)
proc_files = ' '.join(f'/proc/{proc_id}/environ' for proc_id in proc)
return f"grep -Eq '{variable_regex}' {proc_files}"
variable_regex = '|'.join(f'^{variable}=' for variable in var_names)
proc_files = ' '.join(f'/proc/{proc_id}/environ' for proc_id in df[pid_column])
cmd = f"grep -Eq '{variable_regex}' {proc_files}"

# TODO: Filter dataframe

return df
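# Illustrative sketch only (not part of this commit): one way the TODO above could be
# completed when the check runs directly on the target node. It tests each process's
# /proc/<pid>/environ with grep via subprocess; a remote variant would instead need the
# open SSH connection so the command can run through conn.run(). The function name and
# structure below are assumptions, not the project's implementation.
def _filter_env_defined_local(df, var_names, pid_column: str = 'PID') -> pd.DataFrame:
    import subprocess  # local import to keep this illustrative sketch self-contained

    variable_regex = '|'.join(f'^{variable}=' for variable in var_names)

    def has_any_var(pid) -> bool:
        # grep -Eq exits 0 when at least one of the variables is defined in the environ file
        result = subprocess.run(['grep', '-Eq', variable_regex, f'/proc/{int(pid)}/environ'])
        return result.returncode == 0

    return df[df[pid_column].apply(has_any_var)]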


async def terminate_errant_processes(
@@ -88,23 +156,18 @@ async def terminate_errant_processes(
async with ssh_limit, asyncssh.connect(node, options=ssh_options) as conn:
logging.info(f'[{node}] Scanning for processes')

# Fetch running process data from the remote machine
# Add 1 to column widths when parsing ps output to account for space between columns
ps_return = await conn.run('ps -eo pid:10,ppid:10,pgid:10,uid:10,cmd:500', check=True)
process_df = pd.read_fwf(StringIO(ps_return.stdout), widths=[11, 11, 11, 11, 500])

# Identify orphaned processes and filter them by the UID whitelist
orphaned = process_df[process_df.PPID == INIT_PROCESS_ID]
whitelist_index = orphaned['UID'].apply(id_in_whitelist, whitelist=uid_whitelist)
to_terminate = orphaned[whitelist_index]
# Identify orphaned processes and filter them by whitelist criteria
process_df = await get_remote_processes(conn)
process_df = filter_orphaned_processes(process_df, 'PPID')
process_df = filter_user_processes(process_df, uid_whitelist, 'UID')

for _, row in to_terminate.iterrows():
for _, row in process_df.iterrows():
logging.info(f'[{node}] Marking for termination {dict(row)}')

if to_terminate.empty:
if process_df.empty:
logging.info(f'[{node}] no processes found')

elif not debug:
proc_id_str = ','.join(to_terminate.PGID.unique().astype(str))
proc_id_str = ','.join(process_df.PGID.unique().astype(str))
logging.info(f"[{node}] Sending termination signal for process groups {proc_id_str}")
await conn.run(f"pkill --signal 9 --pgroup {proc_id_str}", check=True)
