Skip to content

Commit

Permalink
Fills in missing filter logic
Browse files Browse the repository at this point in the history
  • Loading branch information
djperrefort committed Dec 7, 2023
1 parent e8833ae commit 99e2740
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions shinigami/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,62 +76,53 @@ async def get_remote_processes(conn: asyncssh.SSHClientConnection) -> pd.DataFra
return pd.read_fwf(StringIO(ps_return.stdout), widths=[11, 11, 11, 11, 500])


async def get_slurm_job_users(conn: asyncssh.SSHClientConnection) -> pd.Series:
    """Query the remote machine for slurm job processes

    Runs a ``ps`` listing filtered down to ``slurmd`` job entries over the
    given connection. The command output is not parsed yet, so the returned
    Series is always empty.

    Args:
        conn: Open SSH connection to the machine

    Returns:
        A pandas Series with user IDs (currently always empty)
    """

    # Todo: Optimize ps command and parse command output into series
    slurm_job_query = 'ps -efH | grep slurmd | grep job'
    await conn.run(slurm_job_query, check=True)
    return pd.Series([])


def filter_orphaned_processes(df: pd.DataFrame) -> pd.DataFrame:
    """Select the orphaned processes from a table of process data

    A row is kept when the value in its ``PPID`` column equals
    `INIT_PROCESS_ID`; every other row is dropped.

    Args:
        df: DataFrame to filter; must have a ``PPID`` column

    Returns:
        A filtered copy of the given DataFrame
    """

    is_orphaned = df['PPID'].eq(INIT_PROCESS_ID)
    return df[is_orphaned]


def filter_user_whitelist(df: pd.DataFrame, uid_whitelist: Whitelist) -> pd.DataFrame:
    """Filter a DataFrame to only include a subset of user IDs

    Given a DataFrame with system process data, return a subset of the data
    containing processes owned by the given user IDs. A row is kept when
    `id_in_whitelist` returns a truthy value for its ``UID``.

    Args:
        df: DataFrame to filter; must have a ``UID`` column
        uid_whitelist: List of user IDs to whitelist

    Returns:
        A filtered copy of the given DataFrame
    """

    # ``Series.apply`` on an empty Series returns an empty Series that keeps
    # the original (non-boolean) dtype. Indexing the frame with that is
    # treated as a column selection and yields a frame with no columns, so
    # the mask is cast to bool to guarantee row-wise boolean indexing.
    whitelist_index = df['UID'].apply(id_in_whitelist, whitelist=uid_whitelist).astype(bool)
    return df[whitelist_index]


def filter_active_slurm_users(df: pd.DataFrame) -> pd.DataFrame:
    """Filter a DataFrame to exclude user IDs tied to a running slurm job

    A user ID counts as tied to a slurm job when at least one row of ``df``
    carrying that ``UID`` has a ``CMD`` value containing ``slurmd``. Every row
    belonging to such a user is dropped, not only the matching rows. Only
    the rows present in ``df`` are inspected.

    Args:
        df: DataFrame to filter; must have ``UID`` and ``CMD`` columns

    Returns:
        A filtered copy of the given DataFrame
    """

    # Keep the row-wise mask: collapsing it with ``.any()`` leaves a single
    # boolean that cannot be used to look up UIDs. Missing CMD values count
    # as non-matches so the mask stays strictly boolean.
    is_slurm = df['CMD'].str.contains('slurmd', na=False, regex=False)
    slurm_uids = df.loc[is_slurm, 'UID']
    return df[~df['UID'].isin(slurm_uids)]


async def terminate_errant_processes(
Expand All @@ -155,12 +146,11 @@ async def terminate_errant_processes(
async with ssh_limit, asyncssh.connect(node, options=ssh_options) as conn:
logging.info(f'[{node}] Scanning for processes')
process_df = await get_remote_processes(conn)
slurm_pids = await get_slurm_job_users(conn)

# Identify orphaned processes and filter them by whitelist criteria
process_df = filter_orphaned_processes(process_df, 'PPID')
process_df = filter_user_processes(process_df, uid_whitelist, 'UID')
process_df = filter_valid_slurm_users(process_df, slurm_pids, 'UID')
process_df = filter_orphaned_processes(process_df)
process_df = filter_user_whitelist(process_df, uid_whitelist)
process_df = filter_active_slurm_users(process_df)

for _, row in process_df.iterrows():
logging.info(f'[{node}] Marking for termination {dict(row)}')
Expand Down

0 comments on commit 99e2740

Please sign in to comment.