From 0fbf7df631c963aa2e2556c7846a9680d6ecd1c3 Mon Sep 17 00:00:00 2001 From: Pat Gunn Date: Fri, 17 Nov 2023 16:28:15 -0500 Subject: [PATCH] Remove SLURM integration --- SLURM/README | 11 -- SLURM/demo_slurm.py | 135 ------------------- SLURM/slurmStart.sh | 33 ----- caiman/cluster.py | 164 ++++++----------------- caiman/source_extraction/cnmf/spatial.py | 2 +- 5 files changed, 44 insertions(+), 301 deletions(-) delete mode 100644 SLURM/README delete mode 100644 SLURM/demo_slurm.py delete mode 100755 SLURM/slurmStart.sh diff --git a/SLURM/README b/SLURM/README deleted file mode 100644 index 5e34ab04e..000000000 --- a/SLURM/README +++ /dev/null @@ -1,11 +0,0 @@ -Caiman generally works fine under SLURM, on a single node allocated for use with an interactive run. - -The code in this directory is for a past attempt at something different: SLURM integration with the ipyparallel -backend, resulting in SLURM running caiman in parallel over a number of nodes. This mode was never documented and -likely never saw use outside where Caiman was developed, possibly never seeing use after about 2017. - -It is unclear whether this mode ever provided performance benefits over running on a single reasonably fast node. - -If you're using this, we likely will not be able to support you if anything goes wrong, and support for this -mode may be removed from Caiman at some point. To try it out, you will need to know SLURM very well and be -comfortable reading the code. diff --git a/SLURM/demo_slurm.py b/SLURM/demo_slurm.py deleted file mode 100644 index 3d4e587dc..000000000 --- a/SLURM/demo_slurm.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python - - -import ca_source_extraction as cse -import glob -from ipyparallel import Client -import matplotlib as mpl -mpl.use('TKAgg') -from matplotlib import pyplot as plt -import numpy as np -import os -import psutil -import pylab as pl -from scipy.sparse import coo_matrix -import shutil -import subprocess -import sys -from time import time -import tifffile -import time as tm -from time import time - - -############# -# FIXME This demo is ancient and uses APIs that predate the modern codebase; it will not work - -slurm_script = 'slurmStart.sh' # Replace this with a full path to your slurmStart script -cse.utilities.start_server(ncpus=None, slurm_script=slurm_script) - -# see if running on slurm, you need to source the file slurmAlloc.rc before starting in order to start controller and engines -cse.utilities.stop_server(is_slurm=True) - - -# roughly number of cores on your machine minus 1 -n_processes = np.maximum(psutil.cpu_count() - 2, 1) -# print 'using ' + str(n_processes) + ' processes' -p = 2 # order of the AR model (in general 1 or 2) - -# start cluster for efficient computation -# print "Stopping cluster to avoid unnencessary use of memory...." 
-# sys.stdout.flush() -# cse.utilities.stop_server() - -# LOAD MOVIE AND MAKE DIMENSIONS COMPATIBLE WITH CNMF -reload = 0 -filename = 'movies/demoMovie.tif' -t = tifffile.TiffFile(filename) -Yr = t.asarray().astype(dtype=np.float32) -Yr = np.transpose(Yr, (1, 2, 0)) -d1, d2, T = Yr.shape -Yr = np.reshape(Yr, (d1 * d2, T), order='F') -# np.save('Y',Y) -np.save('Yr', Yr) -# Y=np.load('Y.npy',mmap_mode='r') -Yr = np.load('Yr.npy', mmap_mode='r') -Y = np.reshape(Yr, (d1, d2, T), order='F') -Cn = cse.utilities.local_correlations(Y) -# n_pixels_per_process=d1*d2/n_processes # how to subdivide the work among processes - - -options = cse.utilities.CNMFSetParms(Y, p=p, gSig=[4, 4], K=30) -# cse.utilities.start_server(options['spatial_params']['n_processes']) - -# PREPROCESS DATA AND INITIALIZE COMPONENTS -t1 = time() -Yr, sn, g = cse.pre_processing.preprocess_data( - Yr, **options['preprocess_params']) -Atmp, Ctmp, b_in, f_in, center = cse.initialization.initialize_components( - Y, **options['init_params']) -print((time() - t1)) - -# Refine manually component by clicking on neurons -refine_components = False -if refine_components: - Ain, Cin = cse.utilities.manually_refine_components( - Y, options['init_params']['gSig'], coo_matrix(Atmp), Ctmp, Cn, thr=0.9) -else: - Ain, Cin = Atmp, Ctmp -# plot estimated component -crd = cse.utilities.plot_contours(coo_matrix(Ain), Cn, thr=0.9) -pl.show() -# UPDATE SPATIAL COMPONENTS -pl.close() -t1 = time() -A, b, Cin, f_in = cse.spatial.update_spatial_components( - Yr, Cin, f_in, Ain, sn=sn, **options['spatial_params']) -t_elSPATIAL = time() - t1 -print(t_elSPATIAL) -plt.figure() -crd = cse.utilities.plot_contours(A, Cn, thr=0.9) -# update_temporal_components -pl.close() -t1 = time() -# set this to zero for fast updating without deconvolution -options['temporal_params']['p'] = 0 -C, A, b, f, S, bl, c1, neurons_sn, g, YrA = cse.temporal.update_temporal_components( - Yr, A, b, Cin, f_in, bl=None, c1=None, sn=None, g=None, **options['temporal_params']) -t_elTEMPORAL = time() - t1 -print(t_elTEMPORAL) -# merge components corresponding to the same neuron -t1 = time() -A_m, C_m, nr_m, merged_ROIs, S_m, bl_m, c1_m, sn_m, g_m = cse.merging.merge_components( - Yr, A, b, C, f, S, sn, options['temporal_params'], options['spatial_params'], bl=bl, c1=c1, sn=neurons_sn, g=g, thr=0.8, mx=50, fast_merge=True) -t_elMERGE = time() - t1 -print(t_elMERGE) - - -plt.figure() -crd = cse.plot_contours(A_m, Cn, thr=0.9) -# refine spatial and temporal -pl.close() -t1 = time() -A2, b2, C2, f = cse.spatial.update_spatial_components( - Yr, C_m, f, A_m, sn=sn, **options['spatial_params']) -# set it back to original value to perform full deconvolution -options['temporal_params']['p'] = p -C2, A2, b2, f2, S2, bl2, c12, neurons_sn2, g21, YrA = cse.temporal.update_temporal_components( - Yr, A2, b2, C2, f, bl=None, c1=None, sn=None, g=None, **options['temporal_params']) -print((time() - t1)) - -A_or, C_or, srt = cse.utilities.order_components(A2, C2) -#cse.utilities.view_patches(Yr,coo_matrix(A_or),C_or,b2,f2,d1,d2,YrA = YrA[srt,:], secs=1) -cse.utilities.view_patches_bar(Yr, coo_matrix( - A_or), C_or, b2, f2, d1, d2, YrA=YrA[srt, :]) -# plt.show(block=True) -plt.show() - - - -plt.figure() -crd = cse.utilities.plot_contours(A_or, Cn, thr=0.9) - -# STOP CLUSTER -pl.close() -cse.utilities.stop_server() diff --git a/SLURM/slurmStart.sh b/SLURM/slurmStart.sh deleted file mode 100755 index cdd482482..000000000 --- a/SLURM/slurmStart.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash - -[[ 
${SLURM_NODELIST} ]] || { echo "SLURM environment not detected." ; exit 1 ; } - -pdir=$(pwd) -profile="${SLURM_JOBID}_profile" -clog="ipcontroller_${SLURM_JOBID}.log" - -ipcontroller --location=$(hostname -i) --profile=${profile} --ipython-dir=${pdir} --ip='*' > ${clog} 2>&1 & -cpid=$! - -started=0 -for d in 1 1 2 4 -do - grep -q 'Scheduler started' ${clog} && { started=1 ; break ; } - sleep $d -done - -[[ ${started} == 1 ]] || { echo "ipcontroller took too long to start. Exiting." ; exit 1 ; } - -srun bash -c 'ipengine --profile='${profile}' --ipython-dir='${pdir}' > ipengine_${SLURM_JOBID}_${SLURM_PROCID}.log 2>&1' & - -started=0 -for d in 2 2 4 4 8 8 8 -do - [[ $(grep 'engine::Engine Connected: ' ${clog} | wc -l) == ${SLURM_NTASKS} ]] && { started=1 ; break ; } - grep 'engine::Engine Connected: ' ${clog} - sleep $d -done -[[ ${started} == 1 ]] || { echo "ipengines took too long to start. Exiting." ; exit 1 ; } - -export IPPPDIR=${pdir} IPPPROFILE=${profile} - diff --git a/caiman/cluster.py b/caiman/cluster.py index f2afee4f0..139164951 100644 --- a/caiman/cluster.py +++ b/caiman/cluster.py @@ -100,15 +100,15 @@ def extract_patch_coordinates(dims: tuple, return list(map(np.sort, coords_flat)), shapes -def start_server(slurm_script: str = None, ipcluster: str = "ipcluster", ncpus: int = None) -> None: +def start_server(ipcluster: str = "ipcluster", ncpus: int = None) -> None: """ programmatically start the ipyparallel server Args: - ncpus: int + ncpus number of processors - ipcluster : str + ipcluster ipcluster binary file name; requires 4 path separators on Windows. ipcluster="C:\\\\Anaconda3\\\\Scripts\\\\ipcluster.exe" Default: "ipcluster" """ @@ -116,56 +116,22 @@ def start_server(slurm_script: str = None, ipcluster: str = "ipcluster", ncpus: if ncpus is None: ncpus = psutil.cpu_count() - if slurm_script is None: - - if ipcluster == "ipcluster": - subprocess.Popen(f"ipcluster start -n {ncpus}", shell=True, close_fds=(os.name != 'nt')) - else: - subprocess.Popen(shlex.split(f"{ipcluster} start -n {ncpus}"), - shell=True, - close_fds=(os.name != 'nt')) - time.sleep(1.5) - # Check that all processes have started - client = ipyparallel.Client() - time.sleep(1.5) - while len(client) < ncpus: - sys.stdout.write(".") # Give some visual feedback of things starting - sys.stdout.flush() # (de-buffered) - time.sleep(0.5) - logger.debug('Making sure everything is up and running') - client.direct_view().execute('__a=1', block=True) # when done on all, we're set to go + if ipcluster == "ipcluster": + subprocess.Popen(f"ipcluster start -n {ncpus}", shell=True, close_fds=(os.name != 'nt')) else: - shell_source(slurm_script) - pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE'] - logger.debug([pdir, profile]) - c = Client(ipython_dir=pdir, profile=profile) - ee = c[:] - ne = len(ee) - logger.info(f'Running on {ne} engines.') - c.close() - sys.stdout.write("start_server: done\n") - - -def shell_source(script: str) -> None: - """ Run a source-style bash script, copy resulting env vars to current process. """ - # XXX This function is weird and maybe not a good idea. People easily might expect - # it to handle conditionals. Maybe just make them provide a key-value file - #introduce echo to indicate the end of the output - pipe = subprocess.Popen(f". 
{script}; env; echo 'FINISHED_CLUSTER'", stdout=subprocess.PIPE, shell=True) - - env = dict() - while True: - line = pipe.stdout.readline().decode('utf-8').rstrip() - if 'FINISHED_CLUSTER' in line: # find the keyword set above to determine the end of the output stream - break - logger.debug("shell_source parsing line[" + str(line) + "]") - lsp = str(line).split("=", 1) - if len(lsp) > 1: - env[lsp[0]] = lsp[1] - - os.environ.update(env) - pipe.stdout.close() - + subprocess.Popen(shlex.split(f"{ipcluster} start -n {ncpus}"), + shell=True, + close_fds=(os.name != 'nt')) + time.sleep(1.5) + # Check that all processes have started + client = ipyparallel.Client() + time.sleep(1.5) + while len(client) < ncpus: + sys.stdout.write(".") # Give some visual feedback of things starting + sys.stdout.flush() # (de-buffered) + time.sleep(0.5) + logger.debug('Making sure everything is up and running') + client.direct_view().execute('__a=1', block=True) # when done on all, we're set to go def stop_server(ipcluster: str = 'ipcluster', pdir: str = None, profile: str = None, dview=None) -> None: """ @@ -185,61 +151,33 @@ def stop_server(ipcluster: str = 'ipcluster', pdir: str = None, profile: str = N dview.terminate() else: logger.info("Stopping cluster...") - try: - pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE'] - is_slurm = True - except: - logger.debug('stop_server: not a slurm cluster') - is_slurm = False - - if is_slurm: - if pdir is None and profile is None: - pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE'] - c = Client(ipython_dir=pdir, profile=profile) - ee = c[:] - ne = len(ee) - logger.info(f'Shutting down {ne} engines.') - c.close() - c.shutdown(hub=True) - shutil.rmtree('profile_' + str(profile)) - try: - shutil.rmtree('./log/') - except: - logger.info('creating log folder') # FIXME Not what this means - - files = glob.glob('*.log') - os.mkdir('./log') - - for fl in files: - shutil.move(fl, './log/') + if ipcluster == "ipcluster": + proc = subprocess.Popen("ipcluster stop", + shell=True, + stderr=subprocess.PIPE, + close_fds=(os.name != 'nt')) else: - if ipcluster == "ipcluster": - proc = subprocess.Popen("ipcluster stop", - shell=True, - stderr=subprocess.PIPE, - close_fds=(os.name != 'nt')) - else: - proc = subprocess.Popen(shlex.split(ipcluster + " stop"), - shell=True, - stderr=subprocess.PIPE, - close_fds=(os.name != 'nt')) - - line_out = proc.stderr.readline() - if b'CRITICAL' in line_out: - logger.info("No cluster to stop...") - elif b'Stopping' in line_out: - st = time.time() - logger.debug('Waiting for cluster to stop...') - while (time.time() - st) < 4: - sys.stdout.write('.') - sys.stdout.flush() - time.sleep(1) - else: - logger.error(line_out) - logger.error('**** Unrecognized syntax in ipcluster output, waiting for server to stop anyways ****') + proc = subprocess.Popen(shlex.split(ipcluster + " stop"), + shell=True, + stderr=subprocess.PIPE, + close_fds=(os.name != 'nt')) + + line_out = proc.stderr.readline() + if b'CRITICAL' in line_out: + logger.info("No cluster to stop...") + elif b'Stopping' in line_out: + st = time.time() + logger.debug('Waiting for cluster to stop...') + while (time.time() - st) < 4: + sys.stdout.write('.') + sys.stdout.flush() + time.sleep(1) + else: + logger.error(line_out) + logger.error('**** Unrecognized syntax in ipcluster output, waiting for server to stop anyways ****') - proc.stderr.close() + proc.stderr.close() logger.info("stop_cluster(): done") @@ -256,7 +194,6 @@ def setup_cluster(backend:str = 'multiprocessing', 
'multiprocessing' - Use multiprocessing library 'ipyparallel' - Use ipyparallel instead (better on Windows?) 'single' - Don't be parallel (good for debugging, slow) - 'SLURM' - Try to use SLURM batch system (untested, involved). Most backends will try, by default, to stop a running cluster if it is running before setting up a new one, or throw an error if they find one. @@ -273,7 +210,7 @@ def setup_cluster(backend:str = 'multiprocessing', Returns: - c: ipyparallel.Client object; only used for ipyparallel and SLURM backends, else None + c: ipyparallel.Client object; only used for ipyparallel backends, else None dview: multicore processing engine that is used for parallel processing. If backend is 'multiprocessing' then dview is Pool object. If backend is 'ipyparallel' then dview is a DirectView object. @@ -325,21 +262,6 @@ def setup_cluster(backend:str = 'multiprocessing', c = None n_processes = 1 - elif backend == 'SLURM': - # Override n_processes from above because with slurm you're using cluster resources, not machine-local resources - # Warning: This code may no longer work; it has not been tested in a very, very long time - n_processes = int(os.environ.get('SLURM_NPROCS')) - try: - stop_server() - except: - logger.debug('Nothing to stop') - slurm_script = os.environ.get('SLURMSTART_SCRIPT') # An example of this is in the source repo under 'SLURM/slurmStart.sh' - logger.info([str(n_processes), slurm_script]) - start_server(slurm_script=slurm_script, ncpus=n_processes) - pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE'] - logger.info([pdir, profile]) - c = Client(ipython_dir=pdir, profile=profile) - dview = c[:] else: raise Exception('Unknown Backend') diff --git a/caiman/source_extraction/cnmf/spatial.py b/caiman/source_extraction/cnmf/spatial.py index 329dff2d3..e3c655f19 100644 --- a/caiman/source_extraction/cnmf/spatial.py +++ b/caiman/source_extraction/cnmf/spatial.py @@ -92,7 +92,6 @@ def update_spatial_components(Y, C=None, f=None, A_in=None, sn=None, dims=None, 'ipyparallel', 'single_thread' single_thread:no parallelization. It can be used with small datasets. ipyparallel: uses ipython clusters and then send jobs to each of them - SLURM: use the slurm scheduler n_pixels_per_process: [optional] int number of pixels to be processed by each thread @@ -160,6 +159,7 @@ def update_spatial_components(Y, C=None, f=None, A_in=None, sn=None, dims=None, Exception "Failed to delete: " + folder """ + # TODO fix documentation on backend #logging.info('Initializing update of Spatial Components') if expandCore is None:
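
A minimal usage sketch of the cluster API that remains after this change (a sketch only, not part of the patch itself: the n_processes keyword and the placeholder workload in the try block are assumptions based on the setup_cluster docstring above; setup_cluster and stop_server are the functions defined in caiman/cluster.py):

    #!/usr/bin/env python
    # Sketch: start and stop a machine-local worker pool with one of the
    # backends kept by this patch ('multiprocessing', 'ipyparallel', 'single').
    from caiman.cluster import setup_cluster, stop_server

    # n_processes=None lets setup_cluster pick a core count for this machine.
    # c is an ipyparallel.Client only for the 'ipyparallel' backend, else None;
    # dview is the handle to pass into parallelized Caiman routines.
    c, dview, n_processes = setup_cluster(backend='multiprocessing',
                                          n_processes=None)
    try:
        # ... hypothetical workload: hand dview/n_processes to motion
        # correction or CNMF calls here ...
        pass
    finally:
        # Release the workers even if the workload above raises.
        stop_server(dview=dview)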