From efa5483e96f9bd3994e2d14aaad6a26fdce0b861 Mon Sep 17 00:00:00 2001 From: Francesco Pannarale Date: Mon, 11 Nov 2024 20:50:42 +0100 Subject: [PATCH] Preparing for veto file handling in PyGRB (#4929) * Missing empty line * grb_utils.py preparation to handle veto-files and segment files * pycbc_make_offline_grb_workflow update based on grb_utils.py upgrade * Updated hacky pygrb stuff in jobsetup.py * First batch of updates to handle the VDF in PyGRB and streamline pygrb_postprocessing_utils.py * Removing build_veto_filelist from pycbc_pygrb_minifollowups * Better variable naming and string search in jobsetup.py * Removing redundant variables * Corrected call to segments plots * Better argparse syntax * No f-string in logging * assert instead of if --- bin/pycbc_multi_inspiral | 1 + bin/pygrb/pycbc_make_offline_grb_workflow | 35 +- bin/pygrb/pycbc_pygrb_minifollowups | 12 +- pycbc/results/pygrb_postprocessing_utils.py | 340 +++++++------------- pycbc/workflow/grb_utils.py | 90 +++--- pycbc/workflow/jobsetup.py | 31 +- 6 files changed, 199 insertions(+), 310 deletions(-) diff --git a/bin/pycbc_multi_inspiral b/bin/pycbc_multi_inspiral index 655b5ce33e2..dd33cf75f1d 100755 --- a/bin/pycbc_multi_inspiral +++ b/bin/pycbc_multi_inspiral @@ -74,6 +74,7 @@ def slide_limiter(args): num_slides = 1 return num_slides + # The following block of lines sets up the command-line interface (CLI) for the # pycbc_multi_inspiral executable. time_init = time.time() diff --git a/bin/pygrb/pycbc_make_offline_grb_workflow b/bin/pygrb/pycbc_make_offline_grb_workflow index 23a3d262e54..f704c55b570 100644 --- a/bin/pygrb/pycbc_make_offline_grb_workflow +++ b/bin/pygrb/pycbc_make_offline_grb_workflow @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (C) 2015 Andrew R. Williamson +# Copyright (C) 2015 Andrew R. 
Williamson, Francesco Pannarale # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the @@ -79,7 +79,6 @@ sciSegsFile = _workflow.get_segments_file(wflow, 'science', 'segments-science', seg_dir) - sciSegs = {} for ifo in wflow.ifos: sciSegs[ifo] = sciSegsFile.segment_dict[ifo+':science'] @@ -130,14 +129,15 @@ else: sciSegs = segmentlistdict(sciSegs) if onSrc is None: - plot_met = make_grb_segments_plot(wflow, sciSegs, triggertime, triggername, - seg_dir, fail_criterion=offSrc) + plot_met = make_grb_segments_plot( + wflow, sciSegs, triggertime, triggername, + seg_dir, fail_criterion=offSrc) logging.info("Making segment plot and exiting.") sys.exit() else: plot_met = make_grb_segments_plot( - wflow, sciSegs, triggertime, triggername, seg_dir, - coherent_seg=offSrc[tuple(offSrc.keys())[0]][0]) + wflow, sciSegs, triggertime, triggername, seg_dir, + coherent_seg=offSrc[tuple(offSrc.keys())[0]][0]) segs_plot = _workflow.File(plot_met[0], plot_met[1], plot_met[2], file_url=plot_met[3]) segs_plot.add_pfn(segs_plot.cache_entry.path, site="local") @@ -193,10 +193,12 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"): int(sciSegs[ifo][0][1])) ifos = sorted(sciSegs.keys()) -ifo = ifos[0] wflow.ifos = ifos datafind_veto_files = _workflow.FileList([]) +# Convenience variable to operate on sciSegs within this executable +ifo = ifos[0] + # GATING if wflow.cp.has_option("workflow-condition_strain", "do-gating"): logging.info("Creating gating jobs.") @@ -234,12 +236,17 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"): gated_frames.convert_to_lal_cache().tofile( open(gated_cache.storage_path, "w")) datafind_files.append(gated_cache) - datafind_veto_files.extend(datafind_files) -ifo_list = sorted(sciSegs.keys()) -ifo = ifo_list[0] -ifos = ''.join(ifo_list) -wflow.ifos = ifos + +# Retrieve vetoes +veto_file = None +if wflow.cp.has_option("workflow-segments", 
"segments-vetoes"): + veto_file = _workflow.get_segments_file(wflow, + 'vetoes', + 'segments-vetoes', + seg_dir, + tags=['veto']) + datafind_veto_files.append(veto_file) # Config file consistency check for IPN GRBs if wflow.cp.has_option("workflow-inspiral", "ipn-search-points") \ @@ -262,6 +269,7 @@ else: if wflow.cp.has_option('workflow-inspiral', 'bank-veto-bank-file'): bank_veto_file = configparser_value_to_file(wflow.cp, 'workflow-inspiral', 'bank-veto-bank-file') + bank_veto_file.description += '_BANK_VETO_BANK' bank_veto_file = _workflow.FileList([bank_veto_file]) datafind_veto_files.extend(bank_veto_file) @@ -453,7 +461,8 @@ if post_proc_method == "PYGRB_OFFLINE": clustered_files, inj_find_files, full_bank_file, - seg_dir) + seg_dir, + veto_file=veto_file) logging.info('Leaving results module') all_files.extend(pp_files) diff --git a/bin/pygrb/pycbc_pygrb_minifollowups b/bin/pygrb/pycbc_pygrb_minifollowups index 1f13c549199..78c2b4c97e5 100644 --- a/bin/pygrb/pycbc_pygrb_minifollowups +++ b/bin/pygrb/pycbc_pygrb_minifollowups @@ -36,7 +36,6 @@ import pycbc.events import pycbc.results.pygrb_postprocessing_utils as ppu from pycbc.results import layout from pycbc.workflow.plotting import PlotExecutable -from pycbc.workflow.grb_utils import build_veto_filelist from pycbc.io.hdf import HFile __author__ = "Francesco Pannarale " @@ -72,10 +71,6 @@ def make_timeseries_plot(workflow, trig_file, snr_type, central_time, ifos=workflow.ifos, out_dir=out_dir, tags=tags+extra_tags).create_node() node.add_input_opt('--trig-file', trig_file) - # Pass the veto files - if workflow.cp.has_option('workflow', 'veto-files'): - veto_files = build_veto_filelist(workflow) - node.add_input_list_opt('--veto-files', veto_files) node.new_output_file_opt(workflow.analysis_time, '.png', '--output-file', tags=extra_tags) # Quantity to be displayed on the y-axis of the plot @@ -112,9 +107,6 @@ parser.add_argument('--followups-file', help="HDF file with the triggers/injections to follow up") 
parser.add_argument('--wiki-file', help="Name of file to save wiki-formatted table in") -parser.add_argument('--veto-files', nargs='+', action="store", - default=None, help="The location of the CATX veto " + - "files provided as a list of space-separated values.") wf.add_workflow_command_line_group(parser) wf.add_workflow_settings_cli(parser, include_subdax_opts=True) ppu.pygrb_add_bestnr_cut_opt(parser) @@ -143,8 +135,10 @@ num_events = int(workflow.cp.get_opt_tags('workflow-minifollowups', 'num-events', '')) num_events = min(num_events, len(fp['BestNR'][:])) -# Determine ifos used in the analysis +# File instance of the input trigger file trig_file = resolve_url_to_file(os.path.abspath(args.trig_file)) + +# Determine ifos used in the analysis ifos = ppu.extract_ifos(os.path.abspath(args.trig_file)) num_ifos = len(ifos) diff --git a/pycbc/results/pygrb_postprocessing_utils.py b/pycbc/results/pygrb_postprocessing_utils.py index 99e562f1df5..2869e62db6b 100644 --- a/pycbc/results/pygrb_postprocessing_utils.py +++ b/pycbc/results/pygrb_postprocessing_utils.py @@ -38,15 +38,7 @@ logger = logging.getLogger('pycbc.results.pygrb_postprocessing_utils') -# All/most of these final imports will become obsolete with hdf5 switch -try: - from ligo.lw import utils - from ligo.lw.table import Table - from ligo.segments.utils import fromsegwizard - from glue.ligolw import lsctables as glsctables - from glue.ligolw.ligolw import LIGOLWContentHandler -except ImportError: - pass +from ligo.segments.utils import fromsegwizard # ============================================================================= @@ -65,32 +57,28 @@ def pygrb_initialize_plot_parser(description=None): add_common_pycbc_options(parser) parser.add_argument("-o", "--output-file", default=None, help="Output file.") - parser.add_argument("--x-lims", action="store", default=None, - help="Comma separated minimum and maximum values " + - "for the horizontal axis. 
When using negative " + + parser.add_argument("--x-lims", default=None, + help="Comma separated minimum and maximum values " + "for the horizontal axis. When using negative " "values an equal sign after --x-lims is necessary.") - parser.add_argument("--y-lims", action="store", default=None, - help="Comma separated minimum and maximum values " + - "for the vertical axis. When using negative values " + + parser.add_argument("--y-lims", default=None, + help="Comma separated minimum and maximum values " + "for the vertical axis. When using negative values " "an equal sign after --y-lims is necessary.") parser.add_argument("--use-logs", default=False, action="store_true", help="Produce a log-log plot") - parser.add_argument("-i", "--ifo", default=None, help="IFO used for IFO " + + parser.add_argument("-i", "--ifo", default=None, help="IFO used for IFO " "specific plots") - parser.add_argument("-a", "--seg-files", nargs="+", action="store", - default=None, help="The location of the buffer, " + - "onsource and offsource segment files.") - parser.add_argument("-V", "--veto-files", nargs="+", action="store", - default=None, help="The location of the CATX veto " + - "files provided as a list of space-separated values.") - parser.add_argument("-b", "--veto-category", action="store", type=int, - default=None, help="Apply vetoes up to this level " + - "inclusive.") + parser.add_argument("-a", "--seg-files", nargs="+", + default=[], help="The location of the buffer, " + "onsource and offsource txt segment files.") + parser.add_argument("-V", "--veto-file", + help="The location of the xml veto file.") parser.add_argument('--plot-title', default=None, - help="If provided, use the given string as the plot " + + help="If provided, use the given string as the plot " "title.") parser.add_argument('--plot-caption', default=None, - help="If provided, use the given string as the plot " + + help="If provided, use the given string as the plot " "caption") return parser @@ -98,8 +86,8 @@ 
def pygrb_initialize_plot_parser(description=None): def pygrb_add_slide_opts(parser): """Add to parser object arguments related to short timeslides""" parser.add_argument("--slide-id", type=str, default='0', - help="If all, the plotting scripts will use triggers" + - "from all short slides.") + help="Select a specific slide or set to all to plot " + "results from all short slides.") def slide_opts_helper(args): @@ -119,31 +107,31 @@ def pygrb_add_injmc_opts(parser): """Add to parser object the arguments used for Monte-Carlo on distance.""" if parser is None: parser = argparse.ArgumentParser() - parser.add_argument("-M", "--num-mc-injections", action="store", - type=int, default=100, help="Number of Monte " + + parser.add_argument("-M", "--num-mc-injections", + type=int, default=100, help="Number of Monte " "Carlo injection simulations to perform.") - parser.add_argument("-S", "--seed", action="store", type=int, + parser.add_argument("-S", "--seed", type=int, default=1234, help="Seed to initialize Monte Carlo.") - parser.add_argument("-U", "--upper-inj-dist", action="store", - type=float, default=1000, help="The upper distance " + + parser.add_argument("-U", "--upper-inj-dist", + type=float, default=1000, help="The upper distance " "of the injections in Mpc, if used.") - parser.add_argument("-L", "--lower-inj-dist", action="store", - type=float, default=0, help="The lower distance of " + + parser.add_argument("-L", "--lower-inj-dist", + type=float, default=0, help="The lower distance of " "the injections in Mpc, if used.") - parser.add_argument("-n", "--num-bins", action="store", type=int, - default=0, help="The number of bins used to " + + parser.add_argument("-n", "--num-bins", type=int, + default=0, help="The number of bins used to " "calculate injection efficiency.") - parser.add_argument("-w", "--waveform-error", action="store", - type=float, default=0, help="The standard deviation " + + parser.add_argument("-w", "--waveform-error", + type=float, default=0, 
help="The standard deviation " "to use when calculating the waveform error.") for ifo in ["g1", "h1", "k1", "l1", "v1"]: - parser.add_argument(f"--{ifo}-cal-error", action="store", type=float, - default=0, help="The standard deviation to use " + - f"when calculating the {ifo.upper()} " + + parser.add_argument(f"--{ifo}-cal-error", type=float, + default=0, help="The standard deviation to use " + f"when calculating the {ifo.upper()} " "calibration amplitude error.") - parser.add_argument(f"--{ifo}-dc-cal-error", action="store", - type=float, default=1.0, help="The scaling " + - "factor to use when calculating the " + + parser.add_argument(f"--{ifo}-dc-cal-error", + type=float, default=1.0, help="The scaling " + "factor to use when calculating the " f"{ifo.upper()} calibration amplitude error.") @@ -151,36 +139,36 @@ def pygrb_add_bestnr_opts(parser): """Add to the parser object the arguments used for BestNR calculation""" if parser is None: parser = argparse.ArgumentParser() - parser.add_argument("-Q", "--chisq-index", action="store", type=float, - default=6.0, help="chisq_index for newSNR " + + parser.add_argument("-Q", "--chisq-index", type=float, + default=6.0, help="chisq_index for newSNR " "calculation (default: 6)") - parser.add_argument("-N", "--chisq-nhigh", action="store", type=float, - default=2.0, help="chisq_nhigh for newSNR " + + parser.add_argument("-N", "--chisq-nhigh", type=float, + default=2.0, help="chisq_nhigh for newSNR " "calculation (default: 2") def pygrb_add_null_snr_opts(parser): """Add to the parser object the arguments used for null SNR calculation and null SNR cut.""" - parser.add_argument("-A", "--null-snr-threshold", action="store", + parser.add_argument("-A", "--null-snr-threshold", default=5.25, type=float, help="Null SNR threshold for null SNR cut " "(default: 5.25)") - parser.add_argument("-T", "--null-grad-thresh", action="store", type=float, - default=20., help="Threshold above which to " + + parser.add_argument("-T", 
"--null-grad-thresh", type=float, + default=20., help="Threshold above which to " "increase the values of the null SNR cut") - parser.add_argument("-D", "--null-grad-val", action="store", type=float, - default=0.2, help="Rate the null SNR cut will " + + parser.add_argument("-D", "--null-grad-val", type=float, + default=0.2, help="Rate the null SNR cut will " "increase above the threshold") def pygrb_add_single_snr_cut_opt(parser): """Add to the parser object an argument to place a threshold on single detector SNR.""" - parser.add_argument("-B", "--sngl-snr-threshold", action="store", - type=float, default=4.0, help="Single detector SNR " + - "threshold, the two most sensitive detectors " + + parser.add_argument("-B", "--sngl-snr-threshold", + type=float, default=4.0, help="Single detector SNR " + "threshold, the two most sensitive detectors " "should have SNR above this.") @@ -190,14 +178,15 @@ def pygrb_add_bestnr_cut_opt(parser): parser = argparse.ArgumentParser() parser.add_argument("--newsnr-threshold", type=float, metavar='THRESHOLD', default=0., - help="Cut triggers with NewSNR less than THRESHOLD" + + help="Cut triggers with NewSNR less than THRESHOLD. " "Default 0: all events are considered.") # ============================================================================= -# Wrapper to pick triggers with certain slide_ids +# Wrapper to pick triggers with a given slide_id # ============================================================================= -def slide_filter(trig_file, data, slide_id=None): +# Underscore starts name of functions not called outside this file +def _slide_filter(trig_file, data, slide_id=None): """ This function adds the capability to select triggers with specific slide_ids during the postprocessing stage of PyGRB. 
@@ -214,12 +203,13 @@ def slide_filter(trig_file, data, slide_id=None): def _read_seg_files(seg_files): """Read segments txt files""" - if len(seg_files) != 3 or seg_files is None: + if len(seg_files) != 3: err_msg = "The location of three segment files is necessary." err_msg += "[bufferSeg.txt, offSourceSeg.txt, onSourceSeg.txt]" raise RuntimeError(err_msg) times = {} + # Needs to be in this order for consistency with build_segment_filelist keys = ["buffer", "off", "on"] for key, seg_file in zip(keys, seg_files): @@ -232,127 +222,32 @@ def _read_seg_files(seg_files): return times -# ============================================================================= -# Function to load a table from an xml file -# ============================================================================= -def load_xml_table(file_name, table_name): - """Load xml table from file.""" - - xml_doc = utils.load_filename( - file_name, - compress='auto', - contenthandler=glsctables.use_in(LIGOLWContentHandler) - ) - return Table.get_table(xml_doc, table_name) - - -# ============================================================================= -# Function to load segments from an xml file -# ============================================================================= -def _load_segments_from_xml(xml_doc, return_dict=False, select_id=None): - """Read a ligo.segments.segmentlist from the file object file containing an - xml segment table. - - Parameters - ---------- - xml_doc: name of segment xml file - - Keyword Arguments: - return_dict : [ True | False ] - return a ligo.segments.segmentlistdict containing coalesced - ligo.segments.segmentlists keyed by seg_def.name for each entry - in the contained segment_def_table. 
Default False - select_id : int - return a ligo.segments.segmentlist object containing only - those segments matching the given segment_def_id integer - - """ - - # Load SegmentDefTable and SegmentTable - seg_def_table = load_xml_table(xml_doc, - glsctables.SegmentDefTable.tableName) - seg_table = load_xml_table(xml_doc, glsctables.SegmentTable.tableName) - - if return_dict: - segs = segments.segmentlistdict() - else: - segs = segments.segmentlist() - - seg_id = {} - for seg_def in seg_def_table: - seg_id[int(seg_def.segment_def_id)] = str(seg_def.name) - if return_dict: - segs[str(seg_def.name)] = segments.segmentlist() - - for seg in seg_table: - if return_dict: - segs[seg_id[int(seg.segment_def_id)]]\ - .append(segments.segment(seg.start_time, seg.end_time)) - continue - if select_id and int(seg.segment_def_id) == select_id: - segs.append(segments.segment(seg.start_time, seg.end_time)) - continue - segs.append(segments.segment(seg.start_time, seg.end_time)) - - if return_dict: - for seg_name in seg_id.values(): - segs[seg_name] = segs[seg_name].coalesce() - else: - segs = segs.coalesce() - - return segs - - # ============================================================================= # Function to extract vetoes # ============================================================================= -def _extract_vetoes(all_veto_files, ifos, veto_cat): - """Extracts vetoes from veto filelist""" - - if all_veto_files and (veto_cat is None): - err_msg = "Must supply veto category to apply vetoes." 
- raise RuntimeError(err_msg) +def _extract_vetoes(veto_file, ifos, offsource): + """Extracts the veto segments from the veto File""" - # Initialize veto containers + clean_segs = {} vetoes = segments.segmentlistdict() - for ifo in ifos: - vetoes[ifo] = segments.segmentlist() - - veto_files = [] - veto_cats = range(2, veto_cat+1) - for cat in veto_cats: - veto_files += [vf for vf in all_veto_files if "CAT"+str(cat) in vf] - n_found = len(veto_files) - n_expected = len(ifos)*len(veto_cats) - if n_found != n_expected: - err_msg = f"Found {n_found} veto files instead of the expected " - err_msg += f"{n_expected}; check the options." - raise RuntimeError(err_msg) - - # Construct veto list from veto filelist - if veto_files: - for veto_file in veto_files: - ifo = os.path.basename(veto_file)[:2] - if ifo in ifos: - # This returns a coalesced list of the vetoes - tmp_veto_segs = _load_segments_from_xml(veto_file) - for entry in tmp_veto_segs: - vetoes[ifo].append(entry) - for ifo in ifos: - vetoes[ifo].coalesce() - return vetoes - - -# ============================================================================= -# Function to get the ID numbers from a LIGO-LW table -# ============================================================================= -def _get_id_numbers(ligolw_table, column): - """Grab the IDs of a LIGO-LW table""" + if veto_file: + for ifo in ifos: + segs = veto.select_segments_by_definer(veto_file, ifo=ifo) + segs.coalesce() + clean_segs[ifo] = segs - ids = [int(getattr(row, column)) for row in ligolw_table] + if clean_segs: + for ifo in ifos: + vetoes[ifo] = segments.segmentlist([offsource]) - clean_segs[ifo] + vetoes.coalesce() + for ifo in ifos: + for v in vetoes[ifo]: + v_span = v[1] - v[0] + logging.info("%ds of data vetoed at GPS time %d", + v_span, v[0]) - return ids + return vetoes # ============================================================================= @@ -362,7 +257,7 @@ def _slide_vetoes(vetoes, slide_dict_or_list, slide_id, ifos): 
"""Build a dictionary (indexed by ifo) of time-slid vetoes""" # Copy vetoes - if vetoes is not None: + if vetoes: slid_vetoes = copy.deepcopy(vetoes) # Slide them for ifo in ifos: @@ -373,25 +268,25 @@ def _slide_vetoes(vetoes, slide_dict_or_list, slide_id, ifos): return slid_vetoes -# -# Used (also) in executables -# - # ============================================================================= -# Functions to load triggers +# Recursive function to reach all datasets in an HDF file handle # ============================================================================= -def dataset_iterator(g, prefix=''): - """Reach all datasets in and HDF file""" +def _dataset_iterator(g, prefix=''): + """Reach all datasets in an HDF file handle""" for key, item in g.items(): # Avoid slash as first character - path = prefix[1:] + '/' + key + pref = prefix[1:] if prefix.startswith('/') else prefix + path = pref + '/' + key if isinstance(item, h5py.Dataset): yield (path, item) elif isinstance(item, h5py.Group): - yield from dataset_iterator(item, path) + yield from _dataset_iterator(item, path) +# ============================================================================= +# Functions to load triggers +# ============================================================================= def load_triggers(input_file, ifos, vetoes, rw_snr_threshold=None, slide_id=None): """Loads triggers from PyGRB output file, returning a dictionary""" @@ -404,10 +299,6 @@ def load_triggers(input_file, ifos, vetoes, rw_snr_threshold=None, ifo_ids[ifo] = trigs[ifo+'/event_id'][:] trigs.close() - if vetoes is not None: - # Developers: see PR 3972 for previous implementation - raise NotImplementedError - # Apply the reweighted SNR cut on the reweighted SNR if rw_snr_threshold is not None: rw_snr = reweightedsnr_cut(rw_snr, rw_snr_threshold) @@ -485,7 +376,8 @@ def get_antenna_dist_factor(antenna, ra, dec, geocent_time, inc=0.0): # Construct sorted triggers from trials # 
============================================================================= def sort_trigs(trial_dict, trigs, slide_dict, seg_dict): - """Constructs sorted triggers from a trials dictionary""" + """Constructs sorted triggers from a trials dictionary for the slides + requested via slide_dict.""" sorted_trigs = {} @@ -494,7 +386,8 @@ def sort_trigs(trial_dict, trigs, slide_dict, seg_dict): sorted_trigs[slide_id] = [] for slide_id, event_id in zip(trigs['network/slide_id'], trigs['network/event_id']): - sorted_trigs[slide_id].append(event_id) + if slide_id in slide_dict: + sorted_trigs[slide_id].append(event_id) for slide_id in slide_dict: # These can only *reduce* the analysis time @@ -518,17 +411,25 @@ def sort_trigs(trial_dict, trigs, slide_dict, seg_dict): # END OF CHECK # # Keep triggers that are in trial_dict + num_trigs_before = len(sorted_trigs[slide_id]) sorted_trigs[slide_id] = [event_id for event_id in sorted_trigs[slide_id] if trigs['network/end_time_gc'][ trigs['network/event_id'] == event_id][0] in trial_dict[slide_id]] + # Check that the number of triggers has not increased after vetoes + assert len(sorted_trigs[slide_id]) <= num_trigs_before,\ + f"Slide {slide_id} has {num_trigs_before} triggers before the "\ + f"trials dictionary was used and {len(sorted_trigs[slide_id])} "\ + "after. This should not happen." 
+ # END OF CHECK # + return sorted_trigs # ============================================================================= -# Extract basic trigger properties and store them as dictionaries +# Extract trigger properties and store them as dictionaries # ============================================================================= def extract_basic_trig_properties(trial_dict, trigs, slide_dict, seg_dict, opts): @@ -567,8 +468,8 @@ def extract_basic_trig_properties(trial_dict, trigs, slide_dict, seg_dict, # ============================================================================= # Function to extract ifos from hdfs # ============================================================================= -def extract_ifos(trig_file): - """Extracts IFOs from hdf file""" +def extract_ifos(trig_file, ifo=None): + """Extracts IFOs from hdf file and checks for presence of a specific IFO""" # Load hdf file hdf_file = HFile(trig_file, 'r') @@ -576,31 +477,17 @@ def extract_ifos(trig_file): # Extract IFOs ifos = sorted(list(hdf_file.keys())) - # Remove 'network' key from list of ifos - if 'network' in ifos: - ifos.remove('network') + # Remove unwanted keys from key list to reduce it to the ifos + for key in ['network', 'found', 'missed']: + if key in ifos: + ifos.remove(key) - return ifos - - -# ============================================================================= -# Function to extract IFOs and vetoes -# ============================================================================= -def extract_ifos_and_vetoes(trig_file, veto_files, veto_cat): - """Extracts IFOs from HDF files and vetoes from a directory""" - - logger.info("Extracting IFOs and vetoes.") - - # Extract IFOs - ifos = extract_ifos(trig_file) - - # Extract vetoes - if veto_files is not None: - vetoes = _extract_vetoes(veto_files, ifos, veto_cat) - else: - vetoes = None + # Exit gracefully if the requested IFO is not available + if ifo and ifo not in ifos: + err_msg = "The IFO selected with --ifo is unavailable in 
the data." + raise RuntimeError(err_msg) - return ifos, vetoes + return ifos # ============================================================================= @@ -608,6 +495,7 @@ def extract_ifos_and_vetoes(trig_file, veto_files, veto_cat): # ============================================================================= def load_time_slides(hdf_file_path): """Loads timeslides from PyGRB output file as a dictionary""" + logging.info("Loading timeslides.") hdf_file = HFile(hdf_file_path, 'r') ifos = extract_ifos(hdf_file_path) ids = numpy.arange(len(hdf_file[f'{ifos[0]}/search/time_slides'])) @@ -641,6 +529,8 @@ def load_segment_dict(hdf_file_path): {slide_id: segmentlist(segments analyzed)} """ + logging.info("Loading segments.") + # Long time slides will require mapping between slides and segments hdf_file = HFile(hdf_file_path, 'r') ifos = extract_ifos(hdf_file_path) @@ -710,7 +600,10 @@ def construct_trials(seg_files, seg_dict, ifos, slide_dict, vetoes): iter_int += 1 - return trial_dict + total_trials = sum([len(trial_dict[slide_id]) for slide_id in slide_dict]) + logging.info("%d trials generated.", total_trials) + + return trial_dict, total_trials # ============================================================================= @@ -730,7 +623,7 @@ def sort_stat(time_veto_max_stat): # Find max and median of loudest SNRs or BestNRs # ============================================================================= def max_median_stat(slide_dict, time_veto_max_stat, trig_stat, total_trials): - """Deterine the maximum and median of the loudest SNRs/BestNRs""" + """Return maximum and median of trig_stat and sorted time_veto_max_stat""" max_stat = max([trig_stat[slide_id].max() if trig_stat[slide_id].size else 0 for slide_id in slide_dict]) @@ -776,9 +669,12 @@ def mc_cal_wf_errs(num_mc_injs, inj_dists, cal_err, wf_err, max_dc_cal_err): def get_coinc_snr(trigs_or_injs): """ Calculate coincident SNR using coherent and null SNRs""" - coh_snr_sq = 
numpy.square(trigs_or_injs['network/coherent_snr'][:]) - null_snr_sq = numpy.square(trigs_or_injs['network/null_snr'][:]) - coinc_snr = numpy.sqrt(coh_snr_sq + null_snr_sq) + coinc_snr = numpy.array([]) + if 'network/coherent_snr' in trigs_or_injs.keys() and \ + 'network/null_snr' in trigs_or_injs.keys(): + coh_snr_sq = numpy.square(trigs_or_injs['network/coherent_snr'][:]) + null_snr_sq = numpy.square(trigs_or_injs['network/null_snr'][:]) + coinc_snr = numpy.sqrt(coh_snr_sq + null_snr_sq) return coinc_snr diff --git a/pycbc/workflow/grb_utils.py b/pycbc/workflow/grb_utils.py index 9ceb6b17d4c..c62a2241e9e 100644 --- a/pycbc/workflow/grb_utils.py +++ b/pycbc/workflow/grb_utils.py @@ -314,7 +314,7 @@ def setup_pygrb_pp_workflow(wf, pp_dir, seg_dir, segment, bank_file, exe_class = _select_grb_pp_class(wf, "trig_combiner") job_instance = exe_class(wf.cp, "trig_combiner") # Create node for coherent no injections jobs - node, trig_files = job_instance.create_node(wf.ifos, seg_dir, segment, + node, trig_files = job_instance.create_node(wf.ifo_string, seg_dir, segment, insp_files, pp_dir, bank_file) wf.add_node(node) @@ -445,20 +445,10 @@ def create_node(self, inj_files, inj_insp_files, bank_file, return node, out_file -def build_veto_filelist(workflow): - """Construct a FileList instance containing all veto xml files""" - - veto_dir = workflow.cp.get('workflow', 'veto-directory') - veto_files = glob.glob(veto_dir + '/*CAT*.xml') - veto_files = [resolve_url_to_file(vf) for vf in veto_files] - veto_files = FileList(veto_files) - - return veto_files - - def build_segment_filelist(seg_dir): """Construct a FileList instance containing all segments txt files""" + # Needs to be in this order for consistency with _read_seg_files file_names = ["bufferSeg.txt", "offSourceSeg.txt", "onSourceSeg.txt"] seg_files = [os.path.join(seg_dir, fn) for fn in file_names] seg_files = [resolve_url_to_file(sf) for sf in seg_files] @@ -470,7 +460,7 @@ def build_segment_filelist(seg_dir): def 
make_pygrb_plot(workflow, exec_name, out_dir, ifo=None, inj_file=None, trig_file=None, onsource_file=None, bank_file=None, - seg_files=None, tags=None, **kwargs): + seg_files=None, veto_file=None, tags=None, **kwargs): """Adds a node for a plot of PyGRB results to the workflow""" tags = [] if tags is None else tags @@ -486,18 +476,13 @@ def make_pygrb_plot(workflow, exec_name, out_dir, node = PlotExecutable(workflow.cp, exec_name, ifos=workflow.ifos, out_dir=out_dir, tags=tags+extra_tags).create_node() - if trig_file is not None: + if trig_file: node.add_input_opt('--trig-file', trig_file) # Pass the veto and segment files and options - if workflow.cp.has_option('workflow', 'veto-category'): - node.add_opt('--veto-category', - workflow.cp.get('workflow', 'veto-category')) - # FIXME: move to next if within previous one and else Raise error? - if workflow.cp.has_option('workflow', 'veto-files'): - veto_files = build_veto_filelist(workflow) - node.add_input_list_opt('--veto-files', veto_files) - # TODO: check this for pygrb_plot_stats_distribution - # They originally wanted seg_files + if seg_files: + node.add_input_list_opt('--seg-files', seg_files) + if veto_file: + node.add_input_opt('--veto-file', veto_file) if exec_name in ['pygrb_plot_injs_results', 'pygrb_plot_snr_timeseries']: trig_time = workflow.cp.get('workflow', 'trigger-time') @@ -505,15 +490,13 @@ def make_pygrb_plot(workflow, exec_name, out_dir, # Pass the injection file as an input File instance if inj_file is not None and exec_name not in \ ['pygrb_plot_skygrid', 'pygrb_plot_stats_distribution']: - fm_file = inj_file - node.add_input_opt('--found-missed-file', fm_file) + node.add_input_opt('--found-missed-file', inj_file) # IFO option if ifo: node.add_opt('--ifo', ifo) # Output files and final input file (passed as a File instance) if exec_name == 'pygrb_efficiency': # In this case tags[0] is the offtrial number - node.add_input_list_opt('--seg-files', seg_files) node.add_input_opt('--bank-file', 
bank_file) node.add_opt('--trial-name', tags[0]) node.add_opt('--injection-set-name', tags[1]) @@ -544,7 +527,6 @@ def make_pygrb_plot(workflow, exec_name, out_dir, node.add_opt('--y-variable', tags[0]) # Quantity to be displayed on the x-axis of the plot elif exec_name == 'pygrb_plot_stats_distribution': - node.add_input_list_opt('--seg-files', seg_files) node.add_opt('--x-variable', tags[0]) elif exec_name == 'pygrb_plot_injs_results': # Variables to plot on x and y axes @@ -600,7 +582,8 @@ def make_pygrb_info_table(workflow, exec_name, out_dir, in_files=None, def make_pygrb_injs_tables(workflow, out_dir, bank_file, off_file, seg_files, - inj_file=None, on_file=None, tags=None): + inj_file=None, on_file=None, veto_file=None, + tags=None): """ Adds a job to make quiet-found and missed-found injection tables, or loudest trigger(s) table.""" @@ -620,16 +603,14 @@ def make_pygrb_injs_tables(workflow, out_dir, bank_file, off_file, seg_files, # Offsource input file (or equivalently trigger file for injections) offsource_file = off_file node.add_input_opt('--offsource-file', offsource_file) - # Pass the veto and segment files and options - if workflow.cp.has_option('workflow', 'veto-files'): - veto_files = build_veto_filelist(workflow) - node.add_input_list_opt('--veto-files', veto_files) + # Pass the veto and segment files (as File instances) + if veto_file: + node.add_input_opt('--veto-file', veto_file) node.add_input_list_opt('--seg-files', seg_files) # Handle input/output for injections - if inj_file is not None: + if inj_file: # Found-missed injection file (passed as File instance) - fm_file = inj_file - node.add_input_opt('--found-missed-file', fm_file) + node.add_input_opt('--found-missed-file', inj_file) # Missed-found and quiet-found injections html output files for mf_or_qf in ['missed-found', 'quiet-found']: mf_or_qf_tags = [mf_or_qf.upper().replace('-', '_')] @@ -643,11 +624,10 @@ def make_pygrb_injs_tables(workflow, out_dir, bank_file, off_file, seg_files, 
# Handle input/output for onsource/offsource else: src_type = 'offsource-trigs' - if on_file is not None: + if on_file: src_type = 'onsource-trig' - # Onsource input file (passed as File instance) - onsource_file = on_file - node.add_input_opt('--onsource-file', onsource_file) + # Pass onsource input File instance + node.add_input_opt('--onsource-file', on_file) # Loudest offsource/onsource triggers html and h5 output files src_type_tags = [src_type.upper().replace('-', '_')] node.new_output_file_opt(workflow.analysis_time, '.html', @@ -665,7 +645,8 @@ def make_pygrb_injs_tables(workflow, out_dir, bank_file, off_file, seg_files, # Based on setup_single_det_minifollowups def setup_pygrb_minifollowups(workflow, followups_file, trigger_file, - dax_output, out_dir, tags=None): + dax_output, out_dir, seg_files=None, + veto_file=None, tags=None): """ Create plots that followup the the loudest PyGRB triggers or missed injections from an HDF file. @@ -680,6 +661,10 @@ def setup_pygrb_minifollowups(workflow, followups_file, trigger_file, dax_output: The directory that will contain the dax file out_dir: path The directory to store minifollowups result plots and files + seg_files: {pycbc.workflow.FileList, optional} + The list of segments Files + veto_file: {pycbc.workflow.File, optional} + The veto definer file tags: {None, optional} Tags to add to the minifollowups executables """ @@ -713,10 +698,11 @@ def setup_pygrb_minifollowups(workflow, followups_file, trigger_file, node.add_input_opt('--trig-file', trigger_file) - # Grab and pass all necessary files - if workflow.cp.has_option('workflow', 'veto-files'): - veto_files = build_veto_filelist(workflow) - node.add_input_list_opt('--veto-files', veto_files) + # Grab and pass all necessary files as File instances + if seg_files: + node.add_input_list_opt('--seg-files', seg_files) + if veto_file: + node.add_input_opt('--veto-file', veto_file) node.add_input_opt('--config-files', config_file) 
node.add_input_opt('--followups-file', followups_file) node.add_opt('--wiki-file', wikifile) @@ -748,7 +734,8 @@ def setup_pygrb_minifollowups(workflow, followups_file, trigger_file, def setup_pygrb_results_workflow(workflow, res_dir, trig_files, - inj_files, bank_file, seg_dir, tags=None, + inj_files, bank_file, seg_dir, + veto_file=None, tags=None, explicit_dependencies=None): """Create subworkflow to produce plots, tables, and results webpage for a PyGRB analysis. @@ -762,9 +749,13 @@ def setup_pygrb_results_workflow(workflow, res_dir, trig_files, trig_files: FileList of trigger files inj_files: FileList of injection results bank_file: The template bank File object + seg_dir: The directory path with the segments files + veto_file: {None, optional} + The veto File object tags: {None, optional} Tags to add to the executables - explicit_dependencies: nodes that must precede this + explicit_dependencies: {None, optional} + nodes that must precede this """ tags = [] if tags is None else tags @@ -774,18 +765,17 @@ def setup_pygrb_results_workflow(workflow, res_dir, trig_files, # Create the node exe = Executable(workflow.cp, 'pygrb_results_workflow', - ifos=workflow.ifos, out_dir=dax_output, + ifos=workflow.ifo_string, out_dir=dax_output, tags=tags) node = exe.create_node() # Grab and pass all necessary files node.add_input_list_opt('--trig-files', trig_files) - if workflow.cp.has_option('workflow', 'veto-files'): - veto_files = build_veto_filelist(workflow) - node.add_input_list_opt('--veto-files', veto_files) # node.add_input_opt('--config-files', config_file) node.add_input_list_opt('--inj-files', inj_files) node.add_input_opt('--bank-file', bank_file) node.add_opt('--segment-dir', seg_dir) + if veto_file: + node.add_input_opt('--veto-file', veto_file) if tags: node.add_list_opt('--tags', tags) diff --git a/pycbc/workflow/jobsetup.py b/pycbc/workflow/jobsetup.py index dcd771f8ff1..34b20bfdbdc 100644 --- a/pycbc/workflow/jobsetup.py +++ b/pycbc/workflow/jobsetup.py @@ 
-275,21 +275,20 @@ def multi_ifo_coherent_job_setup(workflow, out_files, curr_exe_job, tags = [] data_seg, job_valid_seg = curr_exe_job.get_valid_times() curr_out_files = FileList([]) - if 'IPN' in datafind_outs[-1].description \ - and 'bank_veto_bank' in datafind_outs[-2].description: - # FIXME: This looks like a really nasty hack for the GRB code. - # This should be fixed properly to avoid strange behaviour! - ipn_sky_points = datafind_outs[-1] - bank_veto = datafind_outs[-2] - frame_files = datafind_outs[:-2] - else: - ipn_sky_points = None - if 'bank_veto_bank' in datafind_outs[-1].name: - bank_veto = datafind_outs[-1] - frame_files = datafind_outs[:-1] - else: - bank_veto = None - frame_files = datafind_outs + ipn_sky_points = None + veto_file = None + bank_veto = None + input_files = FileList(datafind_outs) + for f in datafind_outs: + if 'IPN_SKY_POINTS' in f.description: + ipn_sky_points = f + input_files.remove(f) + elif 'vetoes' in f.description: + veto_file = f + input_files.remove(f) + elif 'INPUT_BANK_VETO_BANK' in f.description: + bank_veto = f + input_files.remove(f) split_bank_counter = 0 @@ -298,7 +297,7 @@ def multi_ifo_coherent_job_setup(workflow, out_files, curr_exe_job, tag = list(tags) tag.append(split_bank.tag_str) node = curr_exe_job.create_node(data_seg, job_valid_seg, - parent=split_bank, dfParents=frame_files, + parent=split_bank, dfParents=input_files, bankVetoBank=bank_veto, ipn_file=ipn_sky_points, slide=slide_dict, tags=tag) workflow.add_node(node)