diff --git a/bin/pycbc_live b/bin/pycbc_live index 8a808ca5a4f..606f081cf9a 100755 --- a/bin/pycbc_live +++ b/bin/pycbc_live @@ -82,7 +82,13 @@ class LiveEventManager(object): def __init__(self, args, bank): self.low_frequency_cutoff = args.low_frequency_cutoff self.bank = bank - self.skymap_only_ifos = [] if args.skymap_only_ifos is None else list(set(args.skymap_only_ifos)) + # all interferometers involved in the analysis, whether for generating + # candidates or doing localization only + self.ifos = set(args.channel_name.keys()) + # interferometers used for localization only + self.skymap_only_ifos = set(args.skymap_only_ifos or []) + # subset of the interferometers allowed to produce candidates + self.trigg_ifos = self.ifos - self.skymap_only_ifos # Figure out what we are supposed to process within the pool of MPI processes self.comm = mpi.COMM_WORLD @@ -117,7 +123,7 @@ class LiveEventManager(object): if self.run_snr_optimization: # preestimate the number of CPU cores that we can afford giving # to followup processes without slowing down the main search - bg_cores = len(tuple(itertools.combinations(ifos, 2))) + bg_cores = len(tuple(itertools.combinations(self.trigg_ifos, 2))) analysis_cores = 1 + bg_cores if platform.system() != 'Darwin': available_cores = len(os.sched_getaffinity(0)) @@ -502,7 +508,7 @@ class LiveEventManager(object): logging.info('computing followup data for coinc') coinc_ifos = coinc_results['foreground/type'].split('-') followup_ifos = set(ifos) - set(coinc_ifos) - followup_ifos = list(followup_ifos | set(self.skymap_only_ifos)) + followup_ifos = list(followup_ifos | self.skymap_only_ifos) double_ifar = coinc_results['foreground/ifar'] if double_ifar < args.ifar_double_followup_threshold: @@ -537,13 +543,20 @@ class LiveEventManager(object): logging.info('Coincident candidate! Saving as %s', fname) # Verbally explain some details not obvious from the other info - comment = ('Trigger produced as a {} coincidence. ' - 'FAR is based on all listed detectors.
' - 'Two-detector ranking statistic: {}
' - 'Followup detectors: {}') - comment = comment.format(ppdets(coinc_ifos), - args.ranking_statistic, - ppdets(followup_ifos)) + comment = ( + 'Trigger produced as a {} coincidence.
' + 'Two-detector ranking statistic: {}
' + 'Detectors used for FAR calculation: {}.
' + 'Detectors used for localization: {}.
' + 'Detectors used only for localization: {}.' + ) + comment = comment.format( + ppdets(coinc_ifos), + args.ranking_statistic, + set(ifos) - self.skymap_only_ifos, + ppdets(followup_ifos), + ppdets(self.skymap_only_ifos) + ) ifar = coinc_results['foreground/ifar'] upload_checks = self.enable_gracedb_upload and self.ifar_upload_threshold < ifar @@ -604,7 +617,7 @@ class LiveEventManager(object): logging.info(f'Found {ifo} single with ifar {sifar}') followup_ifos = [i for i in active if i is not ifo] - followup_ifos = list(set(followup_ifos) | set(self.skymap_only_ifos)) + followup_ifos = list(set(followup_ifos) | self.skymap_only_ifos) # Don't recompute ifar considering other ifos sld = self.compute_followup_data( [ifo], @@ -636,10 +649,15 @@ class LiveEventManager(object): logging.info('Single-detector candidate! Saving as %s', fname) # Verbally explain some details not obvious from the other info - comment = ('Trigger produced as a {0} single. ' - 'FAR is based on {0} only.
' - 'Followup detectors: {1}') - comment = comment.format(ifo, ppdets(followup_ifos)) + comment = ( + 'Trigger produced as a {0} single.
' + 'Detectors used for FAR calculation: {0}.
' + 'Detectors used for localization: {1}.
' + 'Detectors used only for localization: {2}.' + ) + comment = comment.format( + ifo, ppdets(followup_ifos), ppdets(self.skymap_only_ifos) + ) # Has a coinc event at this time been uploaded recently? # If so, skip upload - Note that this means that we _always_ @@ -1017,8 +1035,6 @@ args = parser.parse_args() scheme.verify_processing_options(args, parser) fft.verify_fft_options(args, parser) Coincer.verify_args(args, parser) -ifos = set(args.channel_name.keys()) -analyze_singles = LiveSingle.verify_args(args, parser, ifos) if args.output_background is not None and len(args.output_background) != 2: parser.error('--output-background takes two parameters: period and path') @@ -1045,14 +1061,16 @@ if bank.min_f_lower < args.low_frequency_cutoff: 'minimum f_lower across all templates ' '({} Hz)'.format(args.low_frequency_cutoff, bank.min_f_lower)) -logging.info('Analyzing data from detectors %s', ppdets(ifos)) - evnt = LiveEventManager(args, bank) -logging.info('Detectors that only aid in the sky localization %s', ppdets(evnt.skymap_only_ifos)) + +logging.info('Analyzing data from detectors %s', ppdets(evnt.ifos)) +logging.info('Using %s for localization only', ppdets(evnt.skymap_only_ifos)) + +analyze_singles = LiveSingle.verify_args(args, parser, evnt.trigg_ifos) # include MPI rank and functional description into proctitle task_name = 'root' if evnt.rank == 0 else 'filtering' -setproctitle('PyCBC Live rank {:d} [{}]'.format(evnt.rank, task_name)) +setproctitle(f'PyCBC Live rank {evnt.rank:d} [{task_name}]') sg_chisq = SingleDetSGChisq.from_cli(args, bank, args.chisq_bins) @@ -1111,23 +1129,27 @@ with ctx: args.round_start_time logging.info('Starting from: %s', args.start_time) - # initialize the data readers for all detectors + # Initialize the data readers for all detectors. For rank 0, we need data + # from all detectors, including the localization-only ones. For higher + # ranks, we only need the detectors that can generate candidates. if args.max_length is not None: maxlen = args.max_length maxlen = int(maxlen) - data_reader = {ifo: StrainBuffer.from_cli(ifo, args, maxlen) - for ifo in ifos} + data_reader = { + ifo: StrainBuffer.from_cli(ifo, args, maxlen) + for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos) + } evnt.data_readers = data_reader # create single-detector background "estimators" if analyze_singles and evnt.rank == 0: sngl_estimator = {ifo: LiveSingle.from_cli(args, ifo) - for ifo in ifos} + for ifo in evnt.trigg_ifos} - # Create double coincident background estimator for every combo + # Create double coincident background estimator + # for every pair of triggering interferometers if args.enable_background_estimation and evnt.rank == 0: - trigg_ifos = [ifo for ifo in ifos if ifo not in evnt.skymap_only_ifos] - ifo_combos = itertools.combinations(trigg_ifos, 2) + ifo_combos = itertools.combinations(evnt.trigg_ifos, 2) estimators = [] for combo in ifo_combos: logging.info('Will calculate %s background', ppdets(combo, "-")) @@ -1167,12 +1189,14 @@ with ctx: # main analysis loop data_end = lambda: data_reader[tuple(data_reader.keys())[0]].end_time last_bg_dump_time = int(data_end()) - psd_count = {ifo:0 for ifo in ifos} + psd_count = {ifo:0 for ifo in evnt.ifos} # Create dicts to track whether the psd has been recalculated and to hold # psd variation filters - psd_recalculated = {ifo: True for ifo in ifos} - psd_var_filts = {ifo: None for ifo in ifos} + psd_recalculated = { + ifo: True for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos) + } + psd_var_filts = {ifo: None for ifo in evnt.trigg_ifos} while data_end() < args.end_time: t1 = pycbc.gps_now() @@ -1181,7 +1205,7 @@ with ctx: results = {} evnt.live_detectors = set() - for ifo in ifos: + for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos): results[ifo] = False status = data_reader[ifo].advance( valid_pad, @@ -1259,23 +1283,27 @@ with ctx: # Calculate and add the psd variation for the results if args.psd_variation: - for ifo in results: - logging.info(f"Calculating PSD Variation Statistic for {ifo}") + logging.info("Calculating PSD Variation Statistic for %s", ifo) # A new filter is needed if the PSD has been recalculated if psd_recalculated[ifo] is True: - psd_var_filts[ifo] = variation.live_create_filter(data_reader[ifo].psd, - args.psd_segment_length, - int(args.sample_rate)) + psd_var_filts[ifo] = variation.live_create_filter( + data_reader[ifo].psd, + args.psd_segment_length, + int(args.sample_rate) + ) psd_recalculated[ifo] = False - psd_var_ts = variation.live_calc_psd_variation(data_reader[ifo].strain, - psd_var_filts[ifo], - args.increment) + psd_var_ts = variation.live_calc_psd_variation( + data_reader[ifo].strain, + psd_var_filts[ifo], + args.increment + ) - psd_var_vals = variation.live_find_var_value(results[ifo], - psd_var_ts) + psd_var_vals = variation.live_find_var_value( + results[ifo], psd_var_ts + ) results[ifo]['psd_var_val'] = psd_var_vals @@ -1295,7 +1323,7 @@ with ctx: gates = {ifo: data_reader[ifo].gate_params for ifo in data_reader} # map the results file to an hdf file - prefix = '{}-{}-{}-{}'.format(''.join(sorted(ifos)), + prefix = '{}-{}-{}-{}'.format(''.join(sorted(evnt.ifos)), args.file_prefix, data_end() - args.analysis_chunk, valid_pad) @@ -1310,8 +1338,9 @@ with ctx: data_end() - last_bg_dump_time > float(args.output_background[0]): last_bg_dump_time = int(data_end()) bg_dists = coinc_pool.broadcast(output_background, None) - bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(''.join(sorted(ifos)), - last_bg_dump_time) + bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format( + ''.join(sorted(evnt.trigg_ifos)), last_bg_dump_time + ) bg_fn = os.path.join(args.output_background[1], bg_fn) with h5py.File(bg_fn, 'w') as bgf: for bg_ifos, bg_data, bg_time in bg_dists: