From 7d878d12a38156eabf158a67f11ce58c818a0342 Mon Sep 17 00:00:00 2001 From: GarethCabournDavies Date: Wed, 8 Mar 2023 08:59:14 -0800 Subject: [PATCH 1/2] Move SNR optimisation functions and arguments into their own module --- bin/pycbc_live | 15 +- bin/pycbc_optimize_snr | 234 ++--------------------------- pycbc/live/__init__.py | 5 + pycbc/live/snr_optimizer.py | 288 ++++++++++++++++++++++++++++++++++++ 4 files changed, 315 insertions(+), 227 deletions(-) create mode 100644 pycbc/live/__init__.py create mode 100644 pycbc/live/snr_optimizer.py diff --git a/bin/pycbc_live b/bin/pycbc_live index c5bf3b07b3d..98bd36c61e2 100755 --- a/bin/pycbc_live +++ b/bin/pycbc_live @@ -41,6 +41,7 @@ from pycbc import mchirp_area from pycbc.detector import ppdets from pycbc.filter import resample from pycbc.psd import estimate +from pycbc.live import snr_optimizer # Use cached class-based FFTs in the resample and estimate module resample.USE_CACHING_FOR_LFILTER = True @@ -114,12 +115,15 @@ class LiveEventManager(object): bg_cores = len(tuple(itertools.combinations(ifos, 2))) analysis_cores = 1 + bg_cores self.fu_cores = available_cores - analysis_cores + self.optimizer = args.optimizer if self.fu_cores <= 0: logging.warning('Insufficient number of CPU cores (%d) to ' 'run search and trigger followups. Uploaded ' 'triggers will momentarily increase the lag', available_cores) self.fu_cores = 1 + # Convert SNR optimizer options into a string + self.snr_opt_options = snr_optimizer.args_to_string(args) if args.enable_embright_has_massgap: if args.embright_massgap_max < self.mc_area_args['mass_bdary']['ns_max']: @@ -313,6 +317,11 @@ class LiveEventManager(object): cmd = 'timeout {} '.format(args.snr_opt_timeout) exepath = which('pycbc_optimize_snr') cmd += exepath + ' ' + + # Add SNR optimization options to the command + cmd += self.snr_opt_options + + # Add data files according to the event we are optimizing data_fils_str = '--data-files ' psd_fils_str = '--psd-files ' for ifo in live_ifos: @@ -757,7 +766,7 @@ parser.add_argument('--psd-samples', type=int, required=True, help="Number of PSD segments to use in the rolling estimate") parser.add_argument('--psd-segment-length', type=int, required=True, help="Length in seconds of each PSD segment") -parser.add_argument('--psd-inverse-length', type=float, +sparser.add_argument('--psd-inverse-length', type=float, help="Length in time for the equivalent FIR filter") parser.add_argument('--psd-recompute-length', type=positive_int, default=1, help="If given only recompute the PSD after this number of " @@ -894,15 +903,12 @@ parser.add_argument('--size-override', type=int, metavar='N', parser.add_argument('--fftw-planning-limit', type=float, help="Time in seconds to allow for a plan to be created") parser.add_argument('--run-snr-optimization', action='store_true', - default=False, help='Run spawned followup processes to maximize SNR for ' 'any trigger uploaded to GraceDB') parser.add_argument('--snr-opt-timeout', type=int, default=400, metavar='SECONDS', help='Maximum allowed duration of followup process to maximize SNR') parser.add_argument('--snr-opt-label', type=str, default='SNR_OPTIMIZED', help='Label to apply to snr-optimized GraceDB uploads') - - parser.add_argument('--enable-embright-has-massgap', action='store_true', default=False, help='Estimate HasMassGap probability for EMBright info. 
Lower limit ' 'of the mass gap is equal to the maximum NS mass used for ' @@ -918,6 +924,7 @@ Coincer.insert_args(parser) SingleDetSGChisq.insert_option_group(parser) mchirp_area.insert_args(parser) livepau.insert_live_pastro_option_group(parser) +snr_optimizer.insert_snr_optimizer_options(parser) args = parser.parse_args() scheme.verify_processing_options(args, parser) diff --git a/bin/pycbc_optimize_snr b/bin/pycbc_optimize_snr index 8c6b05059d1..c5ff0e76fc4 100755 --- a/bin/pycbc_optimize_snr +++ b/bin/pycbc_optimize_snr @@ -4,17 +4,12 @@ import os import argparse, numpy, h5py -import time -import types import logging -from scipy.optimize import differential_evolution, shgo import pycbc from pycbc import ( - DYN_RANGE_FAC, fft, scheme, version, waveform + fft, scheme, version ) -from pycbc.types import MultiDetOptionAction, zeros, load_frequencyseries -from pycbc.filter import matched_filter_core -import pycbc.waveform.bank +from pycbc.types import MultiDetOptionAction, load_frequencyseries from pycbc.conversions import ( mchirp_from_mass1_mass2, mtotal_from_mchirp_eta, mass1_from_mtotal_eta, mass2_from_mtotal_eta @@ -23,183 +18,7 @@ from pycbc.io import live from pycbc.io.hdf import load_hdf5_to_dict from pycbc.detector import Detector from pycbc.psd import interpolate - -try: - import pyswarms as ps -except: - ps = None - - -Nfeval = 0 -start_time = time.time() - -def callback_func(Xi, convergence=0): - global Nfeval - logging.info("Currently at %d %s", Nfeval, convergence) - if (time.time() - start_time) > 360: - return True - Nfeval += 1 - - -def compute_network_snr_core(v, *argv, raise_err=False): - """ - Compute network SNR as a function over mchirp, eta and two aligned - spin components, stored in that order in the sequence v - """ - data = argv[0] - coinc_times = argv[1] - ifos = argv[2] - flen = argv[3] - approximant = argv[4] - flow = argv[5] - f_end = argv[6] - delta_f = argv[7] - sample_rate = argv[8] - distance = 1.0 / DYN_RANGE_FAC - mtotal = mtotal_from_mchirp_eta(v[0], v[1]) - mass1 = mass1_from_mtotal_eta(mtotal, v[1]) - mass2 = mass2_from_mtotal_eta(mtotal, v[1]) - - # enforce broadly accepted search space boundaries - if mass1 < 1 or mass2 < 1 or mtotal > 500: - return -numpy.inf, {} - - try: - htilde = waveform.get_waveform_filter( - zeros(flen, dtype=numpy.complex64), - approximant=approximant, - mass1=mass1, mass2=mass2, spin1z=v[2], spin2z=v[3], - f_lower=flow, f_final=f_end, delta_f=delta_f, - delta_t=1.0/sample_rate, distance=distance) - except RuntimeError: - if raise_err: - raise - # assume a failure in the waveform approximant - # due to the choice of parameters - else: - return -numpy.inf, {} - - if not hasattr(htilde, 'params'): - htilde.params = dict(mass1=mass1, mass2=mass2, - spin1z=v[2], spin2z=v[3]) - if not hasattr(htilde, 'end_idx'): - htilde.end_idx = int(f_end / htilde.delta_f) - htilde.approximant = approximant - htilde.sigmasq = types.MethodType(pycbc.waveform.bank.sigma_cached, - htilde) - htilde.min_f_lower = flow - htilde.end_frequency = f_end - htilde.f_lower = flow - network_snrsq = 0 - snr_series_dict = {} - for ifo in ifos: - sigmasq = htilde.sigmasq(data[ifo].psd) - snr, _, norm = matched_filter_core(htilde, data[ifo], - h_norm=sigmasq) - duration = 0.095 - half_dur_samples = int(snr.sample_rate * duration / 2) - onsource_idx = float(coinc_times[ifo] - snr.start_time) * snr.sample_rate - onsource_idx = int(round(onsource_idx)) - onsource_slice = slice(onsource_idx - half_dur_samples, - onsource_idx + half_dur_samples + 1) - snr_series = 
snr[onsource_slice] * norm - snr_series_dict[ifo] = snr * norm - snr_series_dict['sigmasq_' + ifo] = sigmasq - network_snrsq += max(abs(snr_series._data)) ** 2. - - return network_snrsq ** 0.5, snr_series_dict - - -def compute_minus_network_snr(v, *argv): - if len(argv) == 1: - argv = argv[0] - nsnr, _ = compute_network_snr_core(v, *argv) - return -nsnr - - -def compute_minus_network_snr_pso(v, *argv, **kwargs): - argv = kwargs['args'] - nsnr_array = numpy.array([-compute_network_snr_core(v_i, *argv)[0] for v_i in v]) - return nsnr_array - - -def optimize_di(bounds, cli_args, extra_args): - bounds = [ - bounds['mchirp'], - bounds['eta'], - bounds['spin1z'], - bounds['spin2z'] - ] - results = differential_evolution( - compute_minus_network_snr, - bounds, - maxiter=cli_args.di_maxiter, - workers=(cli_args.cores or -1), - popsize=cli_args.di_popsize, - mutation=(0.5, 1), - recombination=0.7, - callback=callback_func, - args=extra_args - ) - return results.x - - -def optimize_shgo(bounds, cli_args, extra_args): - bounds = [ - bounds['mchirp'], - bounds['eta'], - bounds['spin1z'], - bounds['spin2z'] - ] - results = shgo( - compute_minus_network_snr, - bounds=bounds, - args=extra_args, - iters=args.shgo_iters, - n=args.shgo_samples, - sampling_method="sobol" - ) - return results.x - - -def optimize_pso(bounds, cli_args, extra_args): - options = { - 'c1': cli_args.pso_c1, - 'c2': cli_args.pso_c2, - 'w': cli_args.pso_w - } - min_bounds = [ - bounds['mchirp'][0], - bounds['eta'][0], - bounds['spin1z'][0], - bounds['spin2z'][0] - ] - max_bounds = [ - bounds['mchirp'][1], - bounds['eta'][1], - bounds['spin1z'][1], - bounds['spin2z'][1] - ] - optimizer = ps.single.GlobalBestPSO( - n_particles=cli_args.pso_particles, - dimensions=4, - options=options, - bounds=(min_bounds, max_bounds) - ) - _, results = optimizer.optimize( - compute_minus_network_snr_pso, - iters=cli_args.pso_iters, - n_processes=cli_args.cores, - args=extra_args - ) - return results - - -optimize_funcs = { - 'differential_evolution': optimize_di, - 'shgo': optimize_shgo, - 'pso': optimize_pso -} +from pycbc.live import snr_optimizer parser = argparse.ArgumentParser(description=__doc__) @@ -241,49 +60,18 @@ parser.add_argument('--output-path', required=True, help='Path to a directory to store results in') parser.add_argument('--cores', type=int, help='Restrict calculation to given number of CPU cores') -parser.add_argument('--optimizer', type=str, default='differential_evolution', - choices=sorted(optimize_funcs), - help='The optimizer to use, differential_evolution, shgo or pso') -parser.add_argument('--di-maxiter', type=int, default=50, - help='Only relevant for --optimizer differential_evolution: ' - 'The maximum number of generations over which the entire ' - 'population is evolved.') -parser.add_argument('--di-popsize', type=int, default=100, - help='Only relevant for --optimizer differential_evolution: ' - 'A multiplier for setting the total population size.') -parser.add_argument('--shgo-samples', type=int, default=76, - help='Only relevant for --optimizer shgo: ' - 'Number of sampling points used in the construction of the ' - 'simplicial complex.') -parser.add_argument('--shgo-iters', type=int, default=3, - help='Only relevant for --optimizer shgo: ' - 'Number of iterations used in the construction of the simplicial complex.') -parser.add_argument('--pso-iters', type=int, default=5, - help='Only relevant for --optimizer pso: ' - 'Number of iterations used in the particle swarm optimization.') 
-parser.add_argument('--pso-particles', type=int, default=250, - help='Only relevant for --optimizer pso: ' - 'Number of particles used in the swarm.') -parser.add_argument('--pso-c1', type=float, default=0.5, - help='Only relevant for --optimizer pso: ' - 'The hyperparameter c1: the cognitive parameter.') -parser.add_argument('--pso-c2', type=float, default=2.0, - help='Only relevant for --optimizer pso: ' - 'The hyperparameter c2: the social parameter.') -parser.add_argument('--pso-w', type=float, default=0.01, - help='Only relevant for --optimizer pso: ' - 'The hyperparameter w: the inertia parameter.') - +snr_optimizer.insert_snr_optimizer_options(parser) scheme.insert_processing_option_group(parser) fft.insert_fft_option_group(parser) -# Input checking args = parser.parse_args() + +pycbc.init_logging(args.verbose) + +# Input checking if args.snr_threshold != 4: parser.error("Sorry, the SNR threshold option doesn't work yet") -if args.optimizer == 'pso' and ps == None: - parser.error('You need to install pyswarms to use the pso optimizer.') -pycbc.init_logging(args.verbose) +snr_optimizer.check_snr_optimizer_options(args, parser) scheme.verify_processing_options(args, parser) fft.verify_fft_options(args, parser) @@ -347,7 +135,7 @@ bounds = { with scheme_context: logging.info('Starting optimization') - optimize_func = optimize_funcs[args.optimizer] + optimize_func = snr_optimizer.optimize_funcs[args.optimizer] opt_params = optimize_func(bounds, args, extra_args) logging.info('Optimization complete') @@ -358,7 +146,7 @@ with scheme_context: extra_args[2] = ifos - _, snr_series_dict = compute_network_snr_core(opt_params, *extra_args) + _, snr_series_dict = snr_optimizer.compute_network_snr_core(opt_params, *extra_args) mtotal = mtotal_from_mchirp_eta(opt_params[0], opt_params[1]) mass1 = mass1_from_mtotal_eta(mtotal, opt_params[1]) diff --git a/pycbc/live/__init__.py b/pycbc/live/__init__.py new file mode 100644 index 00000000000..c1f3cddf6c8 --- /dev/null +++ b/pycbc/live/__init__.py @@ -0,0 +1,5 @@ +""" +This packages contains modules to help with pycbc live running +""" + +from .snr_optimizer import * diff --git a/pycbc/live/snr_optimizer.py b/pycbc/live/snr_optimizer.py new file mode 100644 index 00000000000..a5f338b29a7 --- /dev/null +++ b/pycbc/live/snr_optimizer.py @@ -0,0 +1,288 @@ +import argparse, numpy +import time +import logging +import types +from scipy.optimize import differential_evolution, shgo +from pycbc import ( + DYN_RANGE_FAC, waveform +) +from pycbc.types import zeros +import pycbc.waveform.bank +from pycbc.filter import matched_filter_core +from pycbc.conversions import ( + mchirp_from_mass1_mass2, mtotal_from_mchirp_eta, + mass1_from_mtotal_eta, mass2_from_mtotal_eta +) + +try: + import pyswarms as ps +except: + ps = None + +Nfeval = 0 +start_time = time.time() + +def callback_func(Xi, convergence=0): + global Nfeval + logging.info("Currently at %d %s", Nfeval, convergence) + if (time.time() - start_time) > 360: + return True + Nfeval += 1 + +def compute_network_snr_core(v, *argv, raise_err=False): + """ + Compute network SNR as a function over mchirp, eta and two aligned + spin components, stored in that order in the sequence v + """ + data = argv[0] + coinc_times = argv[1] + ifos = argv[2] + flen = argv[3] + approximant = argv[4] + flow = argv[5] + f_end = argv[6] + delta_f = argv[7] + sample_rate = argv[8] + distance = 1.0 / DYN_RANGE_FAC + mtotal = mtotal_from_mchirp_eta(v[0], v[1]) + mass1 = mass1_from_mtotal_eta(mtotal, v[1]) + mass2 = 
mass2_from_mtotal_eta(mtotal, v[1]) + + # enforce broadly accepted search space boundaries + if mass1 < 1 or mass2 < 1 or mtotal > 500: + return -numpy.inf, {} + + try: + htilde = waveform.get_waveform_filter( + zeros(flen, dtype=numpy.complex64), + approximant=approximant, + mass1=mass1, mass2=mass2, spin1z=v[2], spin2z=v[3], + f_lower=flow, f_final=f_end, delta_f=delta_f, + delta_t=1.0/sample_rate, distance=distance) + except RuntimeError: + if raise_err: + raise + # assume a failure in the waveform approximant + # due to the choice of parameters + else: + return -numpy.inf, {} + + if not hasattr(htilde, 'params'): + htilde.params = dict(mass1=mass1, mass2=mass2, + spin1z=v[2], spin2z=v[3]) + if not hasattr(htilde, 'end_idx'): + htilde.end_idx = int(f_end / htilde.delta_f) + htilde.approximant = approximant + htilde.sigmasq = types.MethodType(pycbc.waveform.bank.sigma_cached, + htilde) + htilde.min_f_lower = flow + htilde.end_frequency = f_end + htilde.f_lower = flow + network_snrsq = 0 + snr_series_dict = {} + for ifo in ifos: + sigmasq = htilde.sigmasq(data[ifo].psd) + snr, _, norm = matched_filter_core(htilde, data[ifo], + h_norm=sigmasq) + duration = 0.095 + half_dur_samples = int(snr.sample_rate * duration / 2) + onsource_idx = float(coinc_times[ifo] - snr.start_time) * snr.sample_rate + onsource_idx = int(round(onsource_idx)) + onsource_slice = slice(onsource_idx - half_dur_samples, + onsource_idx + half_dur_samples + 1) + snr_series = snr[onsource_slice] * norm + snr_series_dict[ifo] = snr * norm + snr_series_dict['sigmasq_' + ifo] = sigmasq + network_snrsq += max(abs(snr_series._data)) ** 2. + + return network_snrsq ** 0.5, snr_series_dict + + +def compute_minus_network_snr(v, *argv): + if len(argv) == 1: + argv = argv[0] + nsnr, _ = compute_network_snr_core(v, *argv) + return -nsnr + + +def compute_minus_network_snr_pso(v, *argv, **kwargs): + argv = kwargs['args'] + nsnr_array = numpy.array([-compute_network_snr_core(v_i, *argv)[0] for v_i in v]) + return nsnr_array + + +def optimize_di(bounds, cli_args, extra_args): + logging.info("Using differential evolution optimizer") + bounds = [ + bounds['mchirp'], + bounds['eta'], + bounds['spin1z'], + bounds['spin2z'] + ] + results = differential_evolution( + compute_minus_network_snr, + bounds, + maxiter=cli_args.differential_evolution_maxiter, + workers=(cli_args.cores or -1), + popsize=cli_args.differential_evolution_popsize, + mutation=(0.5, 1), + recombination=0.7, + callback=callback_func, + args=tuple(extra_args) + ) + return results.x + + +def optimize_shgo(bounds, cli_args, extra_args): + logging.info("Using SHGO optimizer") + bounds = [ + bounds['mchirp'], + bounds['eta'], + bounds['spin1z'], + bounds['spin2z'] + ] + results = shgo( + compute_minus_network_snr, + bounds=bounds, + args=tuple(extra_args), + iters=cli_args.shgo_iters, + n=cli_args.shgo_samples, + callback=callback_func, + sampling_method="sobol" + ) + return results.x + + +def optimize_pso(bounds, cli_args, extra_args): + logging.info("Using PSO optimizer") + options = { + 'c1': cli_args.pso_c1, + 'c2': cli_args.pso_c2, + 'w': cli_args.pso_w + } + min_bounds = [ + bounds['mchirp'][0], + bounds['eta'][0], + bounds['spin1z'][0], + bounds['spin2z'][0] + ] + max_bounds = [ + bounds['mchirp'][1], + bounds['eta'][1], + bounds['spin1z'][1], + bounds['spin2z'][1] + ] + optimizer = ps.single.GlobalBestPSO( + n_particles=cli_args.pso_particles, + dimensions=4, + options=options, + bounds=(min_bounds, max_bounds) + ) + _, results = optimizer.optimize( + 
compute_minus_network_snr_pso, + iters=cli_args.pso_iters, + n_processes=cli_args.cores, + args=extra_args + ) + return results + + +optimize_funcs = { + 'differential_evolution': optimize_di, + 'shgo': optimize_shgo, + 'pso': optimize_pso +} + +# The following sets the default values of the options, but allows us to check +# if the option has been given on the command line + +# For each optimizer, we have a dictionary of the options, its help +# message and default value + +option_dict = { + 'differential_evolution': { + 'maxiter': ('The maximum number of generations over which the entire ' + 'population is evolved.', 50), + 'popsize': ('A multiplier for setting the total population size.', + 100), + }, + 'shgo': { + 'samples': ('Number of sampling points used in the construction of ' + 'the simplicial complex.', 76), + 'iters': ('Number of iterations used in the construction of the ' + 'simplicial complex.', 3), + }, + 'pso': { + 'iters': ('Number of iterations used in the particle swarm ' + 'optimization.', 5), + 'particles': ('Number of particles used in the swarm.', 250), + 'c1': ('The hyperparameter c1: the cognitive parameter.', 0.5), + 'c2': ('The hyperparameter c2: the social parameter.', 2.0), + 'w': ('The hyperparameter w: the inertia parameter.', 0.01), + }, +} + +def insert_snr_optimizer_options(parser): + opt_opt_group = parser.add_argument_group( + "SNR optimizer " + "configuration options." + ) + # Option to choose which optimizer to use: + optimizer_choices = sorted(list(option_dict.keys())) + opt_opt_group.add_argument('--optimizer', + type=str, + default='differential_evolution', + choices=optimizer_choices, + help='The optimizer to use, choose from ' + ', '.join(optimizer_choices)) + + # For each optimizer, add the possible options + for optimizer, option_subdict in option_dict.items(): + optimizer_name = optimizer.replace('_','-') + for opt_name, opt_help_default in option_subdict.items(): + option_name = f"--{optimizer_name}-{opt_name}" + opt_opt_group.add_argument(option_name, + type=float, + help=f'Only relevant for --optimizer {optimizer}: ' + + opt_help_default[0] + + f' Default = {opt_help_default[1]}') + +def check_snr_optimizer_options(args, parser): + """ + Deal with default options and required parameters given optimizer option + """ + options = {} + options['differential_evolution'] = [args.differential_evolution_maxiter, + args.differential_evolution_popsize] + options['shgo'] = [args.shgo_samples, args.shgo_iters] + options['pso'] = [args.pso_particles, args.pso_c1, args.pso_c2, args.pso_w] + + if args.optimizer == 'pso' and ps == None: + parser.error('You need to install pyswarms to use the pso optimizer.') + + # Have any options been given for a different optimizer? 
+ # If so, raise a warning + for k in options.keys(): + if args.optimizer == k: continue + if any(options[k]): + parser.error("Argument has been supplied which is not suitable " + + f"for the optimizer given ({args.optimizer})") + + # Give the arguments the default values according to the dictionary + for key, value in option_dict[args.optimizer].items(): + key_name = f'{args.optimizer}_{key}' + if not getattr(args, key_name): + setattr(args, key_name, value[1]) + +def args_to_string(args): + """ + Convert the supplied arguments for SNR optimization config into + a string - this is to be used when running subprocesses + """ + argstr = f' --optimizer {args.optimizer} ' + for opt in option_dict[args.optimizer]: + option_fullname = f"--{args.optimizer}-{opt}" + key_name = f'{args.optimizer}_{opt}' + option_value = getattr(args, key_name) + argstr += f'{option_fullname} {option_value} ' + return argstr From 3d4dae2053bd4d60cb548048e47a8a7590be9e94 Mon Sep 17 00:00:00 2001 From: GarethCabournDavies Date: Fri, 5 May 2023 02:10:04 -0700 Subject: [PATCH 2/2] typo --- bin/pycbc_live | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/pycbc_live b/bin/pycbc_live index 98bd36c61e2..b1d007f6320 100755 --- a/bin/pycbc_live +++ b/bin/pycbc_live @@ -766,7 +766,7 @@ parser.add_argument('--psd-samples', type=int, required=True, help="Number of PSD segments to use in the rolling estimate") parser.add_argument('--psd-segment-length', type=int, required=True, help="Length in seconds of each PSD segment") -sparser.add_argument('--psd-inverse-length', type=float, +parser.add_argument('--psd-inverse-length', type=float, help="Length in time for the equivalent FIR filter") parser.add_argument('--psd-recompute-length', type=positive_int, default=1, help="If given only recompute the PSD after this number of "
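
The net effect of this series is that pycbc_live and pycbc_optimize_snr now share a single module, pycbc.live.snr_optimizer, for declaring, validating and re-serialising the SNR-optimizer options. Below is a minimal sketch of that round trip, assuming an installation that includes these patches; the argument values are illustrative only and the printed command is approximate.

    import argparse
    from pycbc.live import snr_optimizer

    parser = argparse.ArgumentParser()
    snr_optimizer.insert_snr_optimizer_options(parser)

    # Parse an illustrative command line; all optimizer flags are floats.
    args = parser.parse_args(['--optimizer', 'differential_evolution',
                              '--differential-evolution-maxiter', '20'])

    # Fill in defaults from option_dict and reject flags that belong to a
    # different optimizer than the one selected.
    snr_optimizer.check_snr_optimizer_options(args, parser)

    # pycbc_live uses args_to_string() to rebuild these options when it
    # spawns the pycbc_optimize_snr followup process for an uploaded trigger.
    cmd = 'pycbc_optimize_snr' + snr_optimizer.args_to_string(args)
    print(cmd)
    # Roughly: pycbc_optimize_snr --optimizer differential_evolution
    #          --differential-evolution-maxiter 20.0 --differential-evolution-popsize 100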
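
One behavioural consequence worth noting: because flag names are now generated from the option_dict keys, the old --di-maxiter and --di-popsize flags removed from pycbc_optimize_snr reappear as --differential-evolution-maxiter and --differential-evolution-popsize. A short sketch (not part of the patch) of the name generation performed by insert_snr_optimizer_options:

    # Underscores in the optimizer key become hyphens, followed by the
    # per-optimizer option name, matching option_dict in snr_optimizer.py.
    optimizers = {'differential_evolution': ['maxiter', 'popsize'],
                  'shgo': ['samples', 'iters'],
                  'pso': ['iters', 'particles', 'c1', 'c2', 'w']}
    for optimizer, opts in optimizers.items():
        for opt in opts:
            print(f"--{optimizer.replace('_', '-')}-{opt}")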