Skip to content

Commit

Permalink
Outline logic for filtering valid slurm users
Browse files Browse the repository at this point in the history
  • Loading branch information
djperrefort committed Dec 6, 2023
1 parent b09eabf commit e8833ae
Showing 1 changed file with 22 additions and 23 deletions.
45 changes: 22 additions & 23 deletions shinigami/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,29 @@ async def get_remote_processes(conn: asyncssh.SSHClientConnection) -> pd.DataFra
conn: Open SSH connection to the machine
Returns:
A pandas DataFrame
A pandas DataFrame with process data
"""

# Add 1 to column widths when parsing ps output to account for space between columns
ps_return = await conn.run('ps -eo pid:10,ppid:10,pgid:10,uid:10,cmd:500', check=True)
return pd.read_fwf(StringIO(ps_return.stdout), widths=[11, 11, 11, 11, 500])


async def get_slurm_job_users(conn: asyncssh.SSHClientConnection) -> pd.Series:
    """Fetch the IDs of users with running Slurm jobs on the remote machine

    Parsing of the remote command output is not implemented yet, so the
    returned Series is currently always empty.

    Args:
        conn: Open SSH connection to the machine

    Returns:
        A pandas Series with user IDs
    """

    # Todo: Optimize ps command and parse command output into series
    # The exit status of the pipeline is the status of the final `grep`, which
    # is 1 when no line matches. Finding no matching processes is an expected
    # outcome rather than a failure, so a non-zero status must not raise here.
    ps_return = await conn.run('ps -efH | grep slurmd | grep job', check=False)

    # Use an explicit dtype because the pandas default for an empty Series is
    # version dependent. Integer IDs are assumed here to line up with the
    # numeric `uid` column requested from `ps` elsewhere in this module
    # (confirm once the parsing above is implemented).
    return pd.Series([], dtype='int64')


def filter_orphaned_processes(df: pd.DataFrame, ppid_column: str = 'PPID') -> pd.DataFrame:
"""Filter a DataFrame to only include orphaned processes
Expand Down Expand Up @@ -112,28 +127,10 @@ def filter_user_processes(df: pd.DataFrame, uid_whitelist: Whitelist, uid_column
return df[whitelist_index]


def filter_env_defined(df: pd.DataFrame, var_names: list[str], pid_column: str = 'PID') -> pd.DataFrame:
    """Filter a DataFrame to only include processes with variable definitions

    Given a DataFrame with system process data, return a subset of the data
    containing processes where one or more of the given environmental variables are defined.

    The filtering step is not implemented yet: the shell command is assembled
    but never executed, and the given DataFrame is returned unchanged.

    Args:
        df: DataFrame to filter
        var_names: Variable names to check for
        pid_column: Column name containing process ID (PID) values

    Returns:
        The given DataFrame (unfiltered until the TODO below is resolved)
    """

    # Example of the assembled command:
    # grep -Eq '^DBUS_SESSION_BUS_ADDRESS=|^some_other_var=' /proc/44703/environ /proc/2235/environ;
    # NOTE(review): the variable names are interpolated into the shell string
    # unquoted; escape them (e.g. with shlex.quote) before this is ever executed.
    # NOTE(review): `grep -q` only reports whether any of the files matched, so
    # a single invocation cannot tell which PIDs to keep - confirm the approach.
    variable_regex = '|'.join(f'^{variable}=' for variable in var_names)
    proc_files = ' '.join(f'/proc/{proc_id}/environ' for proc_id in df[pid_column])
    cmd = f"grep -Eq '{variable_regex}' {proc_files}"

    # TODO: Filter dataframe
    # Return the input so callers always receive a DataFrame, as annotated,
    # instead of an implicit None.
    return df
def filter_valid_slurm_users(df: pd.DataFrame, uid_blacklist: pd.Series, uid_column: str = 'UID') -> pd.DataFrame:
    """Filter a DataFrame to exclude a list of user IDs

    Rows whose value in ``uid_column`` appears in ``uid_blacklist`` are
    dropped. An empty blacklist leaves every row in place.

    Args:
        df: DataFrame to filter
        uid_blacklist: User IDs whose rows are excluded from the result
        uid_column: Column name containing user ID (UID) values

    Returns:
        A filtered copy of the given DataFrame
    """

    # Boolean mask marking the rows that belong to a blacklisted user
    blacklist_index = df[uid_column].isin(uid_blacklist)
    return df[~blacklist_index]


Expand All @@ -157,11 +154,13 @@ async def terminate_errant_processes(
logging.debug(f'[{node}] Waiting for SSH pool')
async with ssh_limit, asyncssh.connect(node, options=ssh_options) as conn:
logging.info(f'[{node}] Scanning for processes')
process_df = await get_remote_processes(conn)
slurm_pids = await get_slurm_job_users(conn)

# Identify orphaned processes and filter them by whitelist criteria
process_df = await get_remote_processes(conn)
process_df = filter_orphaned_processes(process_df, 'PPID')
process_df = filter_user_processes(process_df, uid_whitelist, 'UID')
process_df = filter_valid_slurm_users(process_df, slurm_pids, 'UID')

for _, row in process_df.iterrows():
logging.info(f'[{node}] Marking for termination {dict(row)}')
Expand Down

0 comments on commit e8833ae

Please sign in to comment.