diff --git a/shinigami/utils.py b/shinigami/utils.py index 9df9b94..eb32505 100755 --- a/shinigami/utils.py +++ b/shinigami/utils.py @@ -59,12 +59,80 @@ def get_nodes(cluster: str, ignore_nodes: Collection[str] = tuple()) -> set: return set(node for node in all_nodes if node not in ignore_nodes) -def build_grep_command(vars: Collection[str], proc: Collection[int]) -> str: +async def get_remote_processes(conn) -> pd.DataFrame: + """Fetch running process data from the remote machine + + Args: + conn: Open SSH connection to the machine + + Returns: + A pandas DataFrame + """ + + # Add 1 to column widths when parsing ps output to account for space between columns + ps_return = await conn.run('ps -eo pid:10,ppid:10,pgid:10,uid:10,cmd:500', check=True) + return pd.read_fwf(StringIO(ps_return.stdout), widths=[11, 11, 11, 11, 500]) + + +def filter_orphaned_processes(df, ppid_column: str = 'PPID'): + """Filter a DataFrame to only include orphaned processes + + Given a DataFrame with system process data, return a subset of the data + containing processes parented by `INIT_PROCESS_ID`. + + Args: + df: DataFrame to filter + ppid_column: Column name containing parent process ID (PPID) values + + Returns: + A filtered copy of the given DataFrame + """ + + return df[df[ppid_column] == INIT_PROCESS_ID] + + +def filter_user_processes(df, uid_whitelist, uid_column: str = 'UID'): + """Filter a DataFrame to only include whitelisted users + + Given a DataFrame with system process data, return a subset of the data + containing processes owned by given user IDs. + + Args: + df: DataFrame to filter + uid_whitelist: List of user IDs to whitelist + uid_column: Column name containing user ID (UID) values + + Returns: + A filtered copy of the given DataFrame + """ + + whitelist_index = df['UID'].apply(id_in_whitelist, whitelist=uid_whitelist) + return df[whitelist_index] + + +def filter_env_defined(df, var_names, pid_column: str = 'PID') -> pd.DataFrame: + """Filter a DataFrame to only include processes with variable definitions + + Given a DataFrame with system process data, return a subset of the data + containing processes where one or more of the given environmental variables are defined. + + Args: + df: DataFrame to filter + var_names: Variable names to check for + pid_column: Column name containing process ID (PID) values + + Returns: + A filtered copy of the given DataFrame + """ # grep -Eq '^DBUS_SESSION_BUS_ADDRESS=|^some_other_var=' /proc/44703/environ /proc/2235/environ; - variable_regex = '|'.join(f'^{variable}=' for variable in vars) - proc_files = ' '.join(f'/proc/{proc_id}/environ' for proc_id in proc) - return f"grep -Eq '{variable_regex}' {proc_files}" + variable_regex = '|'.join(f'^{variable}=' for variable in var_names) + proc_files = ' '.join(f'/proc/{proc_id}/environ' for proc_id in df[pid_column]) + cmd = f"grep -Eq '{variable_regex}' {proc_files}" + + # TODO: Filter dataframe + + return df async def terminate_errant_processes( @@ -88,23 +156,18 @@ async def terminate_errant_processes( async with ssh_limit, asyncssh.connect(node, options=ssh_options) as conn: logging.info(f'[{node}] Scanning for processes') - # Fetch running process data from the remote machine - # Add 1 to column widths when parsing ps output to account for space between columns - ps_return = await conn.run('ps -eo pid:10,ppid:10,pgid:10,uid:10,cmd:500', check=True) - process_df = pd.read_fwf(StringIO(ps_return.stdout), widths=[11, 11, 11, 11, 500]) - - # Identify orphaned processes and filter them by the UID whitelist - orphaned = process_df[process_df.PPID == INIT_PROCESS_ID] - whitelist_index = orphaned['UID'].apply(id_in_whitelist, whitelist=uid_whitelist) - to_terminate = orphaned[whitelist_index] + # Identify orphaned processes and filter them by whitelist criteria + process_df = await get_remote_processes(conn) + process_df = filter_orphaned_processes(process_df, 'PPID') + process_df = filter_user_processes(process_df, uid_whitelist, 'UID') - for _, row in to_terminate.iterrows(): + for _, row in process_df.iterrows(): logging.info(f'[{node}] Marking for termination {dict(row)}') - if to_terminate.empty: + if process_df.empty: logging.info(f'[{node}] no processes found') elif not debug: - proc_id_str = ','.join(to_terminate.PGID.unique().astype(str)) + proc_id_str = ','.join(process_df.PGID.unique().astype(str)) logging.info(f"[{node}] Sending termination signal for process groups {proc_id_str}") await conn.run(f"pkill --signal 9 --pgroup {proc_id_str}", check=True)