bin/gwin_make_inj_workflow
#!/usr/bin/env python # Copyright (C) 2017 Christopher M. Biwer, Alexander Harvey Nitz## This program is free software; you can redistribute it and/or modify it# under the terms of the GNU General Public License as published by the# Free Software Foundation; either version 3 of the License, or (at your# option) any later version.## This program is distributed in the hope that it will be useful, but# WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General# Public License for more details.## You should have received a copy of the GNU General Public License along# with this program; if not, write to the Free Software Foundation, Inc.,# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.""" Creates a DAX for a parameter estimation injection study.""" import argparseimport loggingimport osimport Pegasus.DAX3 as daximport socketimport sysfrom pycbc.results import layoutfrom pycbc.results import metadatafrom pycbc.results import versioningfrom pycbc.workflow import configurationfrom pycbc.workflow import corefrom pycbc.workflow import jobsetupfrom pycbc.workflow import plotting from gwin import (__version__, workflow as gwin_workflow) # set command line parserparser = argparse.ArgumentParser(description=__doc__[1:]) # version optionparser.add_argument( "--version", action="version", version=__version__, help="Prints version information.") # workflow optionsparser.add_argument( "--workflow-name", default="inference_injection_run", help="Name of the workflow to append in various places.")parser.add_argument( "--output-dir", default=None, help="Path that will contain output data files from the workflow.")parser.add_argument( "--output-map", default="output.map", help="Path to output map file.")parser.add_argument( "--output-file", required=True, help="Path to DAX file.") # inference optionsparser.add_argument( "--inference-config-file", type=str, required=True, help="WorkflowConfigParser parsable file with proir information.") # input data for workflow optionsparser.add_argument( "--data-type", choices=["analytical", "simulated_data", "detector_data"], default="detector_data", help="Set to 'analytical' to use test likelihood or set to " "'simulated_data' to use simulated detector data.")parser.add_argument( "--gps-end-time", type=float, nargs="+", default=None, help="Trigger time to analyze. You only need to use this option " "with ``--data-type detector_data``.") # add option groupsconfiguration.add_workflow_command_line_group(parser) # parser command lineopts = parser.parse_args() # log to stdout until we know where the path to log output filelog_format = "%(asctime)s:%(levelname)s : %(message)s"logging.basicConfig(format=log_format, level=logging.INFO) # create workflow and sub-workflowscontainer = core.Workflow(opts, opts.workflow_name)workflow = core.Workflow(opts, opts.workflow_name + "-main")finalize_workflow = core.Workflow(opts, opts.workflow_name + "-finalization") # sections for output HTML pagesrdir = layout.SectionNumber("results", ["posteriors", "samples", "workflow"]) # make data output directorycore.makedir(opts.output_dir) # make results directoriescore.makedir(rdir.base)core.makedir(rdir["workflow"]) # create files to store workflow loglog_file_txt = core.File(workflow.ifos, "WORKFLOW-LOG", workflow.analysis_time, extension=".txt", directory=rdir["workflow"])log_file_html = core.File(workflow.ifos, "WORKFLOW-LOG", workflow.analysis_time, extension=".html", directory=rdir["workflow"]) # switch saving log from stdout to filelogging.basicConfig(format=log_format, level=logging.INFO, filename=log_file_txt.storage_path, filemode="w")log_file = logging.FileHandler(filename=log_file_txt.storage_path, mode="w")log_file.setLevel(logging.INFO)formatter = logging.Formatter(log_format)log_file.setFormatter(formatter)logging.getLogger("").addHandler(log_file)logging.info("Created log file {}".format(log_file_txt.storage_path)) # typecast str from command line to File instancesconfig_file = core.File.from_path(opts.inference_config_file) # construct Executable for creating injectionscreate_injections_exe = jobsetup.PycbcCreateInjectionsExecutable( workflow.cp, "create_injections", ifo=workflow.ifos, out_dir=opts.output_dir) # construct Executable for running samplerinference_exe = jobsetup.PycbcInferenceExecutable( workflow.cp, "inference", ifo=workflow.ifos, out_dir=opts.output_dir) # get channel names from workflow configuration filechannel_names = {}for ifo in workflow.ifos: channel_names[ifo] = workflow.cp.get_opt_tags( "workflow", "%s-channel-name" % ifo.lower(), "") # figure out what parameters user wants to plot from workflow configurationplot_groups = {}for option in workflow.cp.options("workflow-inference"): if option.startswith("plot-group-"): group = option.replace("plot-group-", "").replace("-", "_") plot_groups[group] = workflow.cp.get_opt_tag( "workflow", option, "inference").split(" ")all_plot_parameters = sorted([param for group in plot_groups.values() for param in group])unique_plot_parameters = set(all_plot_parameters) # formatted strings for output dirspost_group_fmt = "posteriors/{}_posteriors"sample_group_fmt = "samples/{}_samples" # loop over number of injectionsn_injections = int(workflow.cp.get_opt_tags( "workflow-inference", "num-injections", ""))inference_files = core.FileList([])post_files = core.FileList([])post_group_files = {g : core.FileList([]) for g in plot_groups.keys()}post_table_files = core.FileList([])accept_files = core.FileList([])sample_group_files = {g : core.FileList([]) for g in plot_groups.keys()}for i in range(n_injections): # make node for drawing injection parameter values if not opts.data_type == "analytical": node, injection_file = create_injections_exe.create_node(config_file, seed=i, tags=[str(i)]) workflow += node else: injection_file = None # set fake strain seed if opts.data_type == "simulated_data": fake_strain_seed = {ifo : j + i * len(workflow.ifos) for j, ifo in enumerate(workflow.ifos)} else: fake_strain_seed = None # make node for running sampler node, inference_file = inference_exe.create_node( channel_names, config_file, injection_file=injection_file, seed=i, fake_strain_seed=fake_strain_seed, tags=[str(i)]) inference_files.append(inference_file) workflow += node # make node for writing HTML table of parameters post_table_files += gwin_workflow.make_inference_summary_table( workflow, inference_file, rdir["posteriors"], variable_params=unique_plot_parameters, analysis_seg=workflow.analysis_time, tags=[str(i)]) # make node for plotting all parameters posteriors post_files += gwin_workflow.make_inference_posterior_plot( workflow, inference_file, rdir["posteriors"], parameters=unique_plot_parameters, analysis_seg=workflow.analysis_time, tags=[str(i)]) # make node for plotting acceptance rate accept_files += gwin_workflow.make_inference_acceptance_rate_plot( workflow, inference_file, rdir["samples"], analysis_seg=workflow.analysis_time, tags=[str(i)]) # plot grouped parameters for group in plot_groups.keys(): # make nodes for plotting grouped-parameters posteriors parameters = plot_groups[group] post_group_files[group] += \ gwin_workflow.make_inference_posterior_plot( workflow, inference_file, rdir[post_group_fmt.format(group)], parameters=parameters, analysis_seg=workflow.analysis_time, tags=[str(i), group]) # make nodes for plotting sample as function of sampler iterations for j, parameter in enumerate(plot_groups[group]): sample_group_files[group] += \ gwin_workflow.make_inference_samples_plot( workflow, inference_file, rdir[sample_group_fmt.format(group)], parameters=[parameter], analysis_seg=workflow.analysis_time, tags=[str(i), group, str(j)]) # add plots to HTML pageslayout.single_layout(rdir["posteriors"], sum(zip(post_table_files, post_files), ()))layout.single_layout(rdir["samples"], accept_files)for group in sorted(plot_groups.keys()): layout.single_layout(rdir[post_group_fmt.format(group)], post_group_files[group]) layout.single_layout(rdir[sample_group_fmt.format(group)], sample_group_files[group]) # read inference configuration filecp = configuration.WorkflowConfigParser([opts.inference_config_file]) # add injection recovery plotsif not opts.data_type == "analytical": inj_int_files = gwin_workflow.make_inference_inj_plots( workflow, inference_files, rdir.base, cp.options("variable_params"), name="inference_intervals") inj_rec_files = gwin_workflow.make_inference_inj_plots( workflow, inference_files, rdir.base, cp.options("variable_params"), name="inference_recovery") layout.two_column_layout(rdir.base, [(a, b) for a, b in zip(inj_int_files, inj_rec_files)]) # create versioning HTML pagesversioning.create_versioning_page(rdir["workflow/version"], container.cp) # create node for making HTML pagesplotting.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(), rdir.base)) # add sub-workflows to super-workflowcontainer += workflowcontainer += finalize_workflow # make finalize sub-workflow depend on main sub-workflowdep = dax.Dependency(parent=workflow.as_job, child=finalize_workflow.as_job)container._adag.addDependency(dep) # write daxcontainer.save(filename=opts.output_file, output_map_path=opts.output_map) # save workflow configuration filebase = rdir["workflow/workflow_configuration"]core.makedir(base)workflow_ini = workflow.save_config("workflow.ini", base, container.cp)layout.single_layout(base, workflow_ini) # save inference configuration filebase = rdir["workflow/inference_configuration"]core.makedir(base)prior_ini = workflow.save_config("gwin.ini", base, cp)layout.single_layout(base, prior_ini) # close the log and flush to the html filelogging.shutdown()with open (log_file_txt.storage_path, "r") as log_file: log_data = log_file.read()log_str = """<p>Workflow generation script created workflow in output directory: %s</p><p>Workflow name is: %s</p><p>Workflow generation script run on host: %s</p><pre>%s</pre>""" % (os.getcwd(), opts.workflow_name, socket.gethostname(), log_data)kwds = {"title" : "Workflow Generation Log", "caption" : "Log of the workflow script %s" % sys.argv[0], "cmd" : " ".join(sys.argv)}metadata.save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)layout.single_layout(rdir["workflow"], ([log_file_html]))