Source code for esp.models.analysis

# -*- coding: utf-8 -*-
#
# Analysis-related models.
#
# ------------------------------------------------


# imports
# -------
import os
import time
import logging
from operator import itemgetter

from gems import cached, composite
import six

import esp.base as base
from .__base__ import BaseModel, LinkedModel
from .__base__ import raw, yaml_str_list, yaml_str


# models
# ------
class Pipeline(BaseModel):
    """
    Object for interacting with Pipelines from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
    of the documentation for more context and comprehensive examples of how
    to create and use this type of object.

    Configuration:

        Create pipeline with single task:

        .. code-block:: yaml

            name: Generate Illumina Runsheet
            desc: Run script to generate illumina runsheet and generate runsheet report
            tasks:
              - Create Illumina Runsheet

        Create pipeline with nested tasks:

        .. code-block:: yaml

            name: Generate Illumina Runsheet
            desc: Run script to generate illumina runsheet and generate runsheet report
            tasks:
              - Create Illumina Runsheet:
                  desc: Run script to generate illumina runsheet.
                  cmd: /path/to/script.py {{project}} > runsheet.xml
                  files:
                    - Illumina Runsheet:
                        file_type: xml
                        filename_template: '{{ "runsheet.xml" }}'

        Create pipeline with task dependencies:

        .. code-block:: yaml

            name: Generate Illumina Runsheet
            desc: Run script to generate illumina runsheet and generate runsheet report
            tasks:
              - Create Illumina Runsheet
              - Upload Illumina Runsheet
            deps:
              Upload Illumina Runsheet: Create Illumina Runsheet

        Create pipeline with report:

        .. code-block:: yaml

            name: Generate Illumina Runsheet
            desc: Run script to generate illumina runsheet and generate runsheet report
            report:
              Runsheet Report:
                desc: Report showing details of runsheet generation
                elements:
                  - - type: file_details
                      depends:
                        file: Illumina Runsheet
                        tasknumber: 1
                  - []
            tasks:
              - Create Illumina Runsheet
              - Upload Illumina Runsheet
            deps:
              Upload Illumina Runsheet: Create Illumina Runsheet

        Create pipeline with report and failure report:

        .. code-block:: yaml

            name: Generate Illumina Runsheet
            desc: Run script to generate illumina runsheet and generate runsheet report
            report:
              Runsheet Report:
                desc: Report showing details of runsheet generation
                elements:
                  - - type: file_details
                      depends:
                        file: Illumina Runsheet
                        tasknumber: 1
                  - []
            failure_report:
              Runsheet Failure Report:
                desc: Report showing details of failed runsheet generation
                elements:
                  - - type: file_details
                      depends:
                        file: stderr
                        tasknumber: 1
                  - []
            tasks:
              - Create Illumina Runsheet
              - Upload Illumina Runsheet
            deps:
              Upload Illumina Runsheet: Create Illumina Runsheet

    Configuration Notes:

        * The current implementation requires special syntax (``- -`` and
          ``- []``) when creating report elements. This is due to payload
          requirements and will be made more user-friendly in future versions.

    Examples:

        .. code-block:: python

            >>> from esp.models import Pipeline
            >>> pipe = Pipeline('Generate Illumina Runsheet')
            >>> pipe.name, pipe.created_at
            ('Generate Illumina Runsheet', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> pipe.tasks
            [<Task(name=Create Illumina Runsheet)>, <Task(name=Upload Illumina Runsheet)>]
            >>> pipe.report
            <ReportTemplate(name=Runsheet Report)>

            >>> # run pipeline (wait to finish)
            >>> analysis = pipe.run(project='My Project')
            >>> analysis.task[0].stdout
            'stdout from the first analysis task'
            >>> analysis.task[0].files
            [<File(name=Illumina Runsheet)>]

            >>> # run pipeline (in background)
            >>> analysis = pipe.run(project='My Project', background=True)
            >>> analysis.pause()
            >>> analysis.restart()
            >>> analysis.kill()

    Arguments:
        ident (str): Name or uuid for object.
""" __api__ = "pipelines" __api_cls__ = "Pipeline" __version_api__ = "pipeline_definitions" __defaults__ = {"dependencies": {}} __mutable__ = BaseModel.__mutable__ + ["tasks", "dependencies", "schedule", "run_condition"] __push_format__ = { # note that the backend needs to receive the "tasks": lambda x: [ [step.row, step.col, step.name, step.uuid] for step in sorted(x.task_steps, key=lambda step: step.local_id) ], "dependencies": lambda x: x._push_dependencies(), } __exportable__ = BaseModel.__base_exportable__ + [ "tasks", "report", "failure_report", "dependencies", "schedule", "run_condition", ] __versioned_exportable__ = ["uuid", "def_uuid"] __shallow_format__ = { "tasks": lambda x: x._task_layout(False, False), "report": lambda x: x._export_report_type(None), "failure_report": lambda x: x._export_report_type("failed"), "dependencies": lambda x: x._export_dependencies(), } __deep_format__ = {"tasks": lambda x, versioned: x._task_layout(True, versioned=versioned)} __export_format__ = __shallow_format__ def _task_layout(self, deep, versioned): rows = [x.row for x in self.task_steps] cols = [x.col for x in self.task_steps] max_rows = max(rows) max_cols = max(cols) min_rows = min(rows) grid = [[None] * (max_cols + 1) for x in range(min_rows, max_rows + 1)] for step in self.task_steps: # in fact that exporting has only task list without negative rows position # and assuming that tasks may have negative indexes in row value, export each task based on it's order, # mapping to the grid list starting from 0 row till to max_rows+1 grid[step.row + abs(min_rows)][step.col] = ( step.task.export(deep=deep, versioned=versioned) if deep else step.name ) for row in grid: while row and row[-1] is None: row.pop() return grid def _export_dependencies(self): parent_map = {step.local_id: step for step in self.task_steps} deps = {} # trying to generate: # deps: # child_task_name: [parent_task_name,...] # for temporal dependencies and # deps: # child_task_name: # - parent_task_name: # parent_task_label: child_task_variable for step in self.task_steps: # Note, parents from the server is 1-based... # Note 2: steps don't have the names for some reason, but self.tasks does... mod_map = {} for mod in step.mods: mod_map.setdefault(mod[2], {})[mod[-1]] = mod[1] for j in step.parents: if j == 0: # no parents. continue parent = parent_map[j] if j in mod_map: deps.setdefault(step.name, []).append({parent.name: mod_map[j]}) else: deps.setdefault(step.name, []).append(parent.name) return deps def _export_report_type(self, type_): templates = [x for x in self.reports if x.report_type == type_] if not templates: return None # Reports are returned from the backend in no particular order. # Sort them via resource_id (the highest resource id is the # most recent) if self.data.report_templates[0].get("resource_id", None): target_uuid = sorted(list(self.data.report_templates), key=itemgetter("resource_id"), reverse=True)[0].uuid # Find/Return the matching report template for template in templates: if template.uuid == target_uuid: return template._export(False, True) else: # Templates are not sortable :( return templates[0]._export(False, True) def _push_dependencies(self): lookup = {x.local_id: x for x in self.task_steps} deps = {} for step in self.task_steps: mod_map = {} for mod in step.mods: mod_map.setdefault(mod[2], {})[mod[-1]] = mod[1] for parent in step.parents: # 0 -> no parent. 
if parent == 0: continue if parent not in lookup: raise ValueError( "Unable to find parent with local_id {} in steps of pipeline {}!".format(parent, self.name) ) parent = lookup[parent] key = ",".join(map(str, [parent.row, parent.col, step.row, step.col])) if parent.local_id in mod_map: deps[key] = [ {"type": "param", "variable": variable, "label": label} for label, variable in mod_map[parent.local_id].items() ] else: deps[key] = [{"type": "temp", "coord": [parent.row, parent.col]}] return deps
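    # Illustrative sketch (values are hypothetical, not from a server): for a
    # two-task pipeline where "Upload" at grid position (1, 0) depends on
    # "Create" at (0, 0), _push_dependencies keys each dependency by the
    # "parent_row,parent_col,child_row,child_col" coordinates:
    #
    #   {"0,0,1,0": [{"type": "temp", "coord": [0, 0]}]}
    #
    # and, when a parameter modification links the two tasks, emits instead:
    #
    #   {"0,0,1,0": [{"type": "param", "variable": "runsheet", "label": "Illumina Runsheet"}]}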
    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.

        Args:
            config (str, dict, list): Config file or information to use in creating new object.
            overwrite (bool): Whether or not to delete current entry in the ESP database.
        """
        # prepare task matrix
        if not isinstance(config["tasks"][0], list):
            config["tasks"] = [[tsk] for tsk in config["tasks"]]

        # create pipeline task matrix
        # IMPROVEMENT: This data structure is weird and the api should
        # not look like this.
        # The data structure looks like this to support the grid-based layout of
        # pipeline tasks, where tasks on the same row can be executed concurrently,
        # tasks on rows above must run first, and tasks on rows below may depend on
        # the current row of tasks or any row above.
        tasks, locs = [], {}
        for ridx, row in enumerate(config["tasks"]):
            for cidx, col in enumerate(config["tasks"][ridx]):
                tconfig = config["tasks"][ridx][cidx]
                if tconfig:
                    if isinstance(tconfig, six.string_types):
                        obj = Task(tconfig)
                    else:
                        obj = Task.create(
                            config["tasks"][ridx][cidx],
                            overwrite=overwrite,
                            allow_snapshot_uuid_remap=allow_snapshot_uuid_remap,
                        )
                else:
                    continue
                tasks.append([ridx, cidx, obj.name, obj.uuid])
                # KNOWN LIMITATION: including the same task in a pipeline twice
                if obj.name in locs:
                    raise NotImplementedError(
                        "Currently cannot include a task multiple times in a "
                        f"pipeline. This feature has yet to be implemented. Duplicate object: {obj.name}"
                    )
                locs[obj.name] = [ridx, cidx]
        config["tasks"] = tasks

        # configure task dependencies
        deps = {}
        for child, parents in config["dependencies"].items():
            if not isinstance(parents, (list, tuple)):
                parents = [parents]
            for parent in parents:
                if isinstance(parent, str):
                    deps[",".join(map(str, locs[parent] + locs[child]))] = [{"type": "temp", "coord": locs[parent]}]
                elif not isinstance(parent, dict):
                    raise ValueError("Dependency parents must be str or dict.")
                else:
                    parent_name = parent.pop("name")
                    key = ",".join(map(str, locs[parent_name] + locs[child]))
                    deps[key] = [{"type": "param", "variable": v, "label": k} for k, v in parent.items()]
        config["dependencies"] = deps
        cls._reports = [config.pop("report", None), config.pop("failure_report", None)]
        return config
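    # Illustrative sketch (uuids are hypothetical): parse_import flattens the
    # task grid into [row, col, name, uuid] entries and rewrites the "deps"
    # mapping into coordinate keys, so a config of
    #
    #   tasks: [Create Illumina Runsheet, Upload Illumina Runsheet]
    #   dependencies: {Upload Illumina Runsheet: Create Illumina Runsheet}
    #
    # becomes roughly:
    #
    #   config["tasks"] == [[0, 0, "Create Illumina Runsheet", "<uuid-1>"],
    #                       [1, 0, "Upload Illumina Runsheet", "<uuid-2>"]]
    #   config["dependencies"] == {"0,0,1,0": [{"type": "temp", "coord": [0, 0]}]}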
    @classmethod
    def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
        """
        Parse response from server when creating new pipeline.

        :param allow_snapshot_uuid_remap:
        """
        # NOTE: MORE BACKEND INCONSISTENCY THAT RETURNS {'id': UUID}
        # OR {'uuid': UUID}. ACCOUNTING FOR VARIABILITY HERE, BUT
        # THE BACKEND NEEDS TO CHANGE.
        report, failure_report = cls._reports
        del cls._reports
        obj = cls(data.get("uuid", data.get("id", None)))
        if not obj.exists():
            return
        if report is not None:
            report["pipeline"] = obj
            PipelineReport.create(report, overwrite=overwrite, prompt=prompt)
        if failure_report is not None:
            failure_report["pipeline"] = obj
            failure_report["report_type"] = "failed"
            PipelineReport.create(failure_report, overwrite=overwrite, prompt=prompt)
        obj.refresh()
        return obj
    def drop(self, deep=False):
        """
        Drop all associated reports and pipeline.
        """
        for report in self.reports:
            if report.exists():
                report.drop()
        tasks = self.tasks
        super(Pipeline, self).drop()
        if deep:
            for task in tasks:
                try:
                    task.drop(deep=deep)
                except Exception:
                    logging.info("Cannot drop nested task {}. Used by multiple pipelines.".format(task.name))
                    continue
        return
    @cached
    def dependencies(self):
        """
        Show task dependency graph. Edits to this graph will be
        reflected when a user issues self.push().
        """
        raise NotImplementedError

    @cached
    def tasks(self):
        """
        Return tasks associated with pipeline.
        """
        # vast majority of our pipelines are single-task pipelines right now,
        # so using standard constructor for the time being to support pinning.
        # TODO: support snapshots within items_for_uuids,
        # return [x for x in Task.items_for_uuids(step.uuid for step in self.steps)]
        snapshot = self._snapshot or {}
        tasks_snap = {x["uuid"]: x for x in snapshot.get("tasks", [])}
        return [
            Task(step.uuid, version=tasks_snap.get(step.uuid, {}).get("def_uuid"), snapshot=tasks_snap.get(step.uuid))
            for step in self.steps
        ]

    @cached
    def task_steps(self):
        """
        Return tasks associated with pipeline along with their
        pipeline-specific step information.
        """
        lookup = {task.uuid: task for task in self.tasks}
        return [TaskStep(lookup.get(obj.uuid), obj) for obj in self.steps]

    @cached
    def report(self):
        """
        Return the single PipelineReport object associated with pipeline.
        """
        if len(self.reports) != 1:
            raise AssertionError(
                "Pipeline {} does not have exactly one PipelineReport object. "
                "Use self.reports instead.".format(self.name)
            )
        return self.reports[0]

    @cached
    def reports(self):
        """
        Return PipelineReport objects associated with pipeline.
        """
        return [PipelineReport(x["uuid"]) for x in self.report_templates]

    @property
    def schedule(self):
        return self.data.get("schedule")

    @schedule.setter
    def schedule(self, schedule):
        self.data["schedule"] = schedule

    @property
    def run_condition(self):
        return self.meta.get("run_condition")
    def run(self, **kwargs):
        """
        Run pipeline with specified parameters.

        Args:
            **kwargs: Keyword arguments to pass in for pipeline execution.

        Examples:
            >>> pipe = Pipeline('NGS Pipeline')
            >>> print(pipe.meta.tasks)
            ['Preprocess FASTQ', 'Align BAM', 'Call Variants']
            >>> print(pipe.arguments)
            'fastq'
            >>> pipe.run(fastq='/path/to/file.fastq')
        """
        logging.info("Running pipeline: {}".format(self.name))
        bkg = kwargs.pop("background", False)

        # add support for specifying objects
        from .file import File

        for key in kwargs:
            if isinstance(kwargs[key], File):
                kwargs[key] = kwargs[key].path
            elif isinstance(kwargs[key], BaseModel):
                kwargs[key] = kwargs[key].name

        # dispatch pipeline
        payload = {"env": kwargs, "instances": True, "pipeline": self.uuid}
        try:
            result = base.SESSION.post("/api/pipeline_instances", json=payload)
        except AssertionError as err:
            message = (
                "Pipeline could not be executed! Please make sure all arguments "
                "for the pipeline are specified as inputs to run()."
            )
            raise AssertionError("{} \n\nCLIENT ERROR:\n\n {}".format(err, message))
        data = result.json()

        # set up Analysis object for monitoring
        pool = Analysis(data[0]["uuid"])

        # don't wait to return analysis
        if bkg:
            return pool

        # wait for pipeline to finish
        pool.join()
        return pool
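# A minimal sketch of the request run() dispatches (the uuid value is
# hypothetical): keyword arguments become the "env" of a POST to
# /api/pipeline_instances.
#
#   >>> pipe = Pipeline('Generate Illumina Runsheet')
#   >>> pipe.run(project='My Project')
#   # POST /api/pipeline_instances with:
#   #   {"env": {"project": "My Project"}, "instances": True, "pipeline": "<pipeline-uuid>"}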
class TaskStep:
    def __init__(self, task, step):
        self.task = task
        self.step = step

    @property
    def uuid(self):
        return self.task.uuid

    @property
    def name(self):
        return self.task.name

    @property
    def row(self):
        return self.step.row

    @property
    def col(self):
        return self.step.col

    @property
    def parents(self):
        return self.step.parents

    @property
    def mods(self):
        return self.step.mods

    @property
    def local_id(self):
        return self.step.local_id
class PipelineReport(BaseModel):
    """
    Object for interacting with PipelineReports from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
    of the documentation for more context and comprehensive examples of how
    to create and use this type of object.

    Configuration:

        Create pipeline with report:

        .. code-block:: yaml

            name: Generate Illumina Runsheet
            desc: Run script to generate illumina runsheet and generate runsheet report
            report:
              Runsheet Report:
                desc: Report showing details of runsheet generation
                elements:
                  - - type: file_details
                      depends:
                        file: Illumina Runsheet
                        tasknumber: 1
                  - []
            tasks:
              - Create Illumina Runsheet

        Create multi-step pipeline with report:

        .. code-block:: yaml

            name: Generate Illumina Runsheet
            desc: Run script to generate illumina runsheet and generate runsheet report
            report:
              Runsheet Report:
                desc: Report showing details of runsheet generation
                elements:
                  - - type: file_details
                      depends:
                        file: Illumina Runsheet
                        tasknumber: 1
                  - - type: raw_file
                      depends:
                        file: Runsheet Upload Report
                        tasknumber: 2
                  - - type: html
                      contents: |+
                        <h1>Report Header</h1>
                        <p>Report Body</p>
                  - []
            tasks:
              - Create Illumina Runsheet
              - Upload Illumina Runsheet
            deps:
              Upload Illumina Runsheet: Create Illumina Runsheet

    .. TODO - Refactor report configuration to look like this instead:
    ..
    .. .. code-block:: yaml
    ..
    ..     name: Generate Illumina Runsheet
    ..     desc: Run script to generate illumina runsheet and generate runsheet report
    ..     report:
    ..       Runsheet Report:
    ..         desc: Report showing details of runsheet generation
    ..         elements:
    ..           - type: file_details
    ..             file: Illumina Runsheet
    ..           - type: raw_file
    ..             file: /path/to/local/file.pdf
    ..           - type: html
    ..             contents: |+
    ..               <h1>Report Header</h1>
    ..               <p>Report Body</p>
    ..     tasks:
    ..       - Create Illumina Runsheet
    ..       - Upload Illumina Runsheet
    ..     deps:
    ..       Upload Illumina Runsheet: Create Illumina Runsheet

    Configuration Notes:

        * Report configuration will generally happen in the context of a
          pipeline. Accordingly, this documentation references report
          generation in that context.

        * The current implementation requires special syntax (``- -`` and
          ``- []``) when creating report elements. This is due to payload
          requirements and will be made more user-friendly in future versions.

    Examples:

        .. code-block:: python

            >>> from esp.models import Pipeline
            >>> pipe = Pipeline('Generate Illumina Runsheet')
            >>> pipe.name, pipe.created_at
            ('Generate Illumina Runsheet', '2019-06-21T16:04:01.199076Z')
            >>> pipe.report.name, pipe.report.created_at
            ('Runsheet Report', '2019-06-21T16:04:01.199076Z')
            >>> pipe.report.pipeline.name, pipe.report.pipeline.created_at
            ('Generate Illumina Runsheet', '2019-06-21T16:04:01.199076Z')

    Arguments:
        ident (str): Name or uuid for object.
""" __api__ = "report_templates" __api_cls__ = "ReportTemplate" __mutable__ = BaseModel.__mutable__ + [ # NOTE: UUID SHOULDN'T BE REQUIRED BY THE BACKEND "uuid", "parent", "elements", ] __push_format__ = { # NOTE: THE BACKEND REQUIRING THIS DATA STRUCTURE IS UNECESSARY # AND THE HANDLER SHOULD BE CLEANED UP "parent": lambda x: { "uuid": x.pipeline.uuid, "url": "/api/pipelines/{}".format(x.pipeline.uuid), "cls": "Pipeline", } } __exportable__ = BaseModel.__base_exportable__ + [ "pipeline", "elements", ] def _export_elements(self): ret = [] for tab in self.elements: export_tab = [] ret.append(export_tab) for element in tab: eltype = element["type"] export_el = { "type": yaml_str(eltype), } export_tab.append(export_el) for key in self.__element_required__[eltype]: if key != "depends": # b/c the only other required is contents. # If that changes, this code needs to change, too. export_el[key] = yaml_str(element[key]) for key, val in self.__element_defaults__.get(eltype, ()): if key in element: if isinstance(element[key], composite): export_el[key] = element[key].json() else: export_el[key] = yaml_str(element[key]) if "depends" not in element: continue export_deps = [] export_el["depends"] = export_deps for dep in element["depends"]: export_dep = {} export_deps.append(export_dep) if "source" in dep: export_dep["source"] = dep["source"].json() elif "taskfile_name" in dep: export_dep["file"] = yaml_str(dep["taskfile_name"]) export_dep["tasknumber"] = dep["task_local_id"] else: raise ValueError("Unhandled dependency: {}".format(dep)) return ret __shallow_format__ = { "pipeline": lambda x: yaml_str(x.pipeline.name), "elements": _export_elements, } # Note: tread carefully here: the elements below should ONLY be used # as defaults in the create method, where the data will be posted # and replaced with values from the server. __element_defaults__ = { "annotation_list": (("visibleRows", {}),), "stacked_barchart": (("cols", "*"), ("delimiter", ",")), "table": (("delimiter", ","),), "text_choice": (("depends", []),), "text_edit": (("contents", ""),), "vcf_viewer": (("contents", ""),), } __element_required__ = { "annotation_list": ("depends",), "annotation_text": ("depends",), "file_details": ("depends",), "genome_browser": ("depends",), "html": ("contents",), "html_file": ("depends",), "iframe_file": ("depends",), "iframe_url": ("depends",), "illuminate_viewer": ("depends",), "image_file": ("depends",), "lifemap_varelect": ("depends",), "raw_file": ("depends",), "stacked_barchart": ("depends",), "table": ("depends",), "text": ("contents",), "text_choice": ("contents",), "text_edit": (), "vcf_viewer": ("depends",), } __element_datasources__ = {"text_edit", "vcf_viewer"} def _data_by_name(self): """ Override data querying by name. """ response = base.SESSION.get("/api/{}?name={}".format(self.__api__, self.ident)).json() if len(response) == 0: return {} # Return the most-recent one that has the same name if response[0].get("resource_id", None): response = sorted(response, key=itemgetter("resource_id"), reverse=True) result = {} for data in response: if data["name"] == self.ident: return data return result
    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.

        .. note:: A Pipeline with the following construct could have a
            corresponding config that looks like the following:

            If a Pipeline (named 'pipelineA') with Tasks "taskB" and "taskA"
            is set up as such (without a temporal connection) in the UI:

            .. code-block:: text

                - - - -
                taskB - -
                taskA - - - -

            and the Pipeline has a report that has the following construct:

            .. code-block:: yaml

                FileDetails: fileA_for_taskA (Step 2: taskA)
                FileDetails: fileB_for_taskB (Step 1: taskB)

            the config object could have (or has) the following structure:

            .. code-block:: json

                {
                    "elements": [
                        [
                            {
                                "depends": [
                                    {
                                        "file": "fileA_for_taskA",
                                        "tasknumber": 2
                                    }
                                ],
                                "type": "file_details"
                            },
                            {
                                "depends": [
                                    {
                                        "file": "fileB_for_taskB",
                                        "tasknumber": 1
                                    }
                                ],
                                "type": "file_details"
                            }
                        ],
                        []
                    ],
                    "name": "pipelineA",
                    "uuid": null,
                    "desc": ""
                }

        Args:
            config (str, dict, list): Config file or information to use in creating new object.
            overwrite (bool): Whether or not to delete current entry in the ESP database.
        """
        # Get/construct the pipeline object
        if "pipeline" not in config:
            raise ValueError("`pipeline` must be specified in PipelineReport configuration!")
        if isinstance(config["pipeline"], Pipeline):
            pipeline = config.pop("pipeline")
        else:
            pipeline = Pipeline(config.pop("pipeline"))

        # Set the 'parent' pipeline definition
        config["parent"] = {"uuid": pipeline.def_uuid}

        # Gather task files for parsing report elements
        pipe_task_files = {}
        for task in pipeline.tasks:
            for taskfile in task.task_files:
                pipe_task_files.setdefault(taskfile.name, []).append(taskfile)

        # Parse report elements
        for element_table in config["elements"]:
            for element in element_table:
                cls._verify_report_element(element)
                if "depends" not in element:
                    continue
                for dependency in element["depends"]:
                    dependency["pipeline"] = config["parent"]
                    if "file" in dependency:
                        cls._build_taskfile_dependency(dependency, pipeline, pipe_task_files)
                    elif "source" in dependency:
                        cls._build_element_dependency(dependency)
                    else:
                        raise ValueError("Unhandled dependency type: {}".format(dependency))
        return config
    @classmethod
    def _verify_report_element(cls, element):
        # Check that the element declares a 'type'
        if "type" not in element:
            raise ValueError("PipelineReport elements must declare type.")
        # Check that the 'type' defined in the element is one of the valid
        # types as defined in __element_required__
        if element["type"] not in cls.__element_required__:
            raise ValueError(
                "Invalid report element type `{}`. "
                "Valid types: {}".format(element["type"], list(cls.__element_required__.keys()))
            )
        # Check that each required key for the element's 'type' is defined in the element
        for key in cls.__element_required__[element["type"]]:
            if key not in element:
                raise ValueError(
                    "PipelineReport element of type `{}` must have key `{}`".format(element["type"], key)
                )
        # Assign default values for the applicable element types to the element
        # (leave already-defined defaults as-is)
        if element["type"] in cls.__element_defaults__:
            for key, val in cls.__element_defaults__[element["type"]]:
                element.setdefault(key, val)

    @classmethod
    def _build_taskfile_dependency(cls, dependency, pipeline, pipe_task_files):
        # Assign the resource class of the dependency
        # TODO: There is no cls for 'TaskFile'... ??
        dependency["cls"] = "TaskFile"
        # Get the dependency's file name
        filename = dependency.pop("file")
        # Assign the dependency's task file name
        dependency["taskfile_name"] = filename
        # Look up the file type unless the file is stdout/stderr
        if filename not in ["stdout", "stderr"]:
            # Verify the filename exists in the pipeline's task file list
            if filename not in pipe_task_files:
                raise ValueError("Task file `{}` not defined for pipeline `{}`".format(filename, pipeline.name))
            # Assign the dependency's task file type
            dependency["taskfile_type"] = pipe_task_files[filename][0]["file_type"]
        # Verify the dependency has a tasknumber assigned to it
        if "tasknumber" not in dependency:
            raise ValueError(
                "File `{}` does not contain tasknumber "
                "in pipeline `{}`: must define tasknumber".format(filename, pipeline.name)
            )
        # Get the tasknumber
        tasknumber = dependency.pop("tasknumber")
        # Verify the tasknumber is within the range of how many tasks the pipeline has
        if tasknumber > len(pipeline.tasks):
            raise ValueError(
                "Tasknumber ({}) for task `{}` exceeds number of tasks in pipeline!".format(tasknumber, filename)
            )
        # Assign the dependency's task number
        dependency["task_local_id"] = tasknumber
        # Find the name of the task
        taskname = "null"
        for step in pipeline.task_steps:
            if step.local_id == tasknumber:
                taskname = step.name
                break
        # Build the UI label for this dependency
        dependency["ui_label"] = "{} (Step {}: {})".format(filename, tasknumber, taskname)

    @classmethod
    def _build_element_dependency(cls, dep):
        dep["cls"] = "ReportElement"
        source = dep["source"]
        for key in ("element", "report", "type"):
            if key not in source:
                raise ValueError(
                    "PipelineReport element dependencies of type ReportElement must "
                    "have a source dict with keys element, report, and type"
                )
        dep["ui_label"] = "{} ({}.{})".format(source["type"], source["report"] + 1, source["element"] + 1)

    @cached
    def pipeline(self):
        """
        Relationship to pipeline object.
        """
        # backend DS _does_ have a ref to the parent, but it's the pipeline _definition_.
        return Pipeline.from_definition(self.parent.uuid)
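# Illustrative sketch (the file type and task name follow the examples above):
# _build_taskfile_dependency rewrites a config-level dependency in place, e.g.
#
#   {"file": "Illumina Runsheet", "tasknumber": 1, "pipeline": {"uuid": "<def-uuid>"}}
#
# into the payload the backend expects:
#
#   {"cls": "TaskFile", "taskfile_name": "Illumina Runsheet", "taskfile_type": "xml",
#    "task_local_id": 1, "ui_label": "Illumina Runsheet (Step 1: Create Illumina Runsheet)",
#    "pipeline": {"uuid": "<def-uuid>"}}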
class Task(BaseModel):
    """
    Object for interacting with Tasks from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
    of the documentation for more context and comprehensive examples of how
    to create and use this type of object.

    Configuration:

        Simple task (no variables):

        .. code-block:: yaml

            name: Ping Internal Server
            desc: Run command to ping internal server.
            cmd: curl http://internal-server

        Simple task with variables:

        .. code-block:: yaml

            name: Ping Internal Server
            desc: Run command to ping internal server.
            cmd: curl '{{ server }}'

        Create task and track outputs:

        .. code-block:: yaml

            name: Create Illumina Runsheet
            desc: Run script to generate illumina runsheet.
            cmd: /path/to/generate_runsheet.py {{project}} > runsheet.xml
            files:
              - Illumina Runsheet:
                  file_type: xml
                  filename_template: '{{ "runsheet.xml" }}'

        Create task with inline code:

        .. code-block:: yaml

            name: Report DNA Type
            desc: Run bash script to generate simple report.
            cmd: |+
              # Simple task that reports whether the sample type is DNA or RNA
              if [ "{{ type }}" = "DNA" ]; then
                echo "<b>This is a <font color='green'>DNA</font> sample</b>" > report.html
              elif [ "{{ type }}" = "RNA" ]; then
                echo "<b>This is a <font color='green'>RNA</font> sample</b>" > report.html
              fi
            files:
              - Type Report:
                  file_type: html
                  filename_template: '{{ "report.html" }}'

    Examples:

        .. code-block:: python

            >>> from esp.models import Task
            >>> task = Task('Create Illumina Runsheet')
            >>> task.name, task.created_at
            ('Create Illumina Runsheet', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> task.variables
            ['project']
            >>> task.cmd
            '/path/to/generate_runsheet.py {{project}} > runsheet.xml'

    Arguments:
        ident (str): Name or uuid for object.
    """

    __api__ = "tasks"
    __api_cls__ = "Task"
    # NOTE: /api/task_definitions doesn't exist
    __version_api__ = "task_definitions"
    __mutable__ = BaseModel.__mutable__ + ["cmd", "task_files"]
    __push_format__ = {"task_files": lambda x: raw(x.files)}
    __exportable__ = BaseModel.__base_exportable__ + ["cmd", "files"]
    __versioned_exportable__ = ["uuid", "def_uuid"]

    def _export_files(self):
        ret = []
        for x in self.task_files:
            y = {}
            for key in ("name", "file_type", "filename_template"):
                y[yaml_str(key)] = yaml_str(x[key])
            if "tags" in x and x["tags"]:
                y["tags"] = yaml_str_list(x["tags"])
            ret.append(y)
        return ret

    __shallow_format__ = {"files": _export_files}

    @cached
    def files(self):
        return self.task_files
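# Example of the shallow export produced by Task._export_files, using the
# file tracked in the Task docstring above (a sketch of the shape, not a
# captured server response):
#
#   [{"name": "Illumina Runsheet", "file_type": "xml",
#     "filename_template": '{{ "runsheet.xml" }}'}]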
class Analysis(LinkedModel):
    """
    Object for interacting with analyses from the ESP database.

    Examples:

        .. code-block:: python

            >>> from esp.models import Pipeline
            >>> pipe = Pipeline('Generate Illumina Runsheet')

            >>> # run pipeline to return analysis object
            >>> analysis = pipe.run(project='My Project')
            >>> analysis.task[0].stdout
            'stdout from the first analysis task'
            >>> analysis.task[0].files
            [<File(name=Illumina Runsheet)>]

            >>> # run pipeline (in background)
            >>> analysis = pipe.run(project='My Project', background=True)
            >>> analysis.pause()
            >>> analysis.restart()
            >>> analysis.kill()

            >>> # get current analysis (within script)
            >>> analysis = Analysis.current()
            >>> analysis.pause()

    Args:
        ident (str): Name or uuid for object.
    """

    __api__ = "pipeline_instances"
    period = 0.25

    @classmethod
    def current(cls):
        uu = os.environ.get("LAB7_PIPELINE_UUID")
        if uu is None:
            raise AssertionError("LAB7_PIPELINE_UUID is not set -- this can only be used within a running pipeline.")
        return cls(uu)

    @cached
    def tasks(self):
        """
        Return objects for each step in pipeline.
        """
        return [AnalysisTask(x["uuid"]) for x in self.steps]

    def pause(self):
        base.SESSION.put("/api/{}/{}?action={}".format(self.__api__, self.uuid, "pause"))
        return self.join(["done", "paused"], timeout=5)

    def kill(self):
        base.SESSION.put("/api/{}/{}?action={}".format(self.__api__, self.uuid, "kill"))
        return self.join(["done", "failed"], timeout=5)

    def restart(self):
        base.SESSION.put("/api/{}/{}?action={}".format(self.__api__, self.uuid, "restart"))
        return self.join(["running", "pending"], timeout=5)
    def join(self, expect=["failed", "done", "paused"], timeout=1e6):
        """
        Wait for analysis to finish, and return when done.
        """
        count = 0
        while True:
            done = self.state in expect
            if done or count > timeout:
                break
            time.sleep(self.period)
            count += self.period
            self.refresh()
        return self
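# Example usage of join() (a sketch; the expect/timeout values are arbitrary):
# block up to an hour for a background run to either finish or fail.
#
#   >>> analysis = pipe.run(project='My Project', background=True)
#   >>> analysis.join(expect=["done", "failed"], timeout=3600)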
class LimsAnalysis(Analysis):
    """
    Object for interacting with pipelines run via LIMS application.

    Examples:

        .. code-block:: python

            >>> from esp.models import LimsAnalysis

            >>> # get current analysis (within script)
            >>> analysis = LimsAnalysis.current()
            >>> analysis.pause()

            >>> # get samples associated with current worksheet
            >>> analysis.samples
            [<Sample(ESP0001)>, <Sample(ESP0002)>]

            >>> # get sample sheet associated with current worksheet
            >>> analysis.sample_sheet
            <SampleSheet(My-LIMS-Sample-Sheet)>

    Args:
        ident (str): Name or uuid for object.
    """

    @cached
    def samples(self):
        """
        Fetch sample dependencies of current LIMS analysis.
        """
        from .sample import Sample

        return [Sample.from_data(x[0]) for x in self.priors if x[0].cls == "Sample" and x[1] == "processed"]

    @cached
    def sample_sheet(self):
        """
        Fetch sample sheet dependency of current LIMS analysis.
        """
        from .project import SampleSheet

        ss = [SampleSheet.from_data(x[0]) for x in self.priors if x[0].cls == "SampleSheet" and x[1] == "ran"]
        if ss:
            ss = ss[0]
            ss.refresh()
            return ss
        return None
class AnalysisTask(BaseModel):
    """
    Object for interacting with task instances from the ESP database.

    Examples:

        .. code-block:: python

            >>> from esp.models import Pipeline
            >>> pipe = Pipeline('Generate Illumina Runsheet')

            >>> # run pipeline to return analysis object
            >>> analysis = pipe.run(project='My Project')

            >>> # get first task object from analysis
            >>> atask = analysis.task[0]
            >>> atask.stdout
            'stdout from the first analysis task'
            >>> atask.stderr
            'stderr from the first analysis task'
            >>> atask.files
            [<File(name=Illumina Runsheet)>]

    Args:
        ident (str): Name or uuid for object.
    """

    __api__ = "task_instances"

    @cached
    def files(self):
        """
        Property for accessing all files associated with task.
        """
        from .file import File

        res = []
        for fi in self.task_files:
            if fi["taskfile_name"] not in ["job_script", "stdout", "stderr"]:
                res.append(File(fi["uuid"]))
        return res

    @cached
    def script(self):
        """
        Property for accessing the task's job script.
        """
        for fi in self.task_files:
            if fi["taskfile_name"] == "job_script":
                res = base.SESSION.get("/api/files/{}/contents".format(fi["uuid"]))
                return res.json().get("data")
        return

    @cached
    def stdout(self):
        """
        Property for accessing task stdout.
        """
        for fi in self.task_files:
            if fi["taskfile_name"] == "stdout":
                res = base.SESSION.get("/api/files/{}/contents".format(fi["uuid"]))
                return res.json().get("data")
        return

    @cached
    def stderr(self):
        """
        Property for accessing task stderr.
        """
        for fi in self.task_files:
            if fi["taskfile_name"] == "stderr":
                res = base.SESSION.get("/api/files/{}/contents".format(fi["uuid"]))
                return res.json().get("data")
        return
class AnalysisPool(object):
    """
    Object for interacting with a collection of analyses from the ESP database.

    Examples:

        .. code-block:: python

            >>> from esp.models import Pipeline
            >>> pipe = Pipeline('Generate Illumina Runsheet')

            >>> # run multiple analyses to gather in pool
            >>> foo_analysis = pipe.run(project='Foo Project')
            >>> bar_analysis = pipe.run(project='Bar Project')

            >>> # create analysis pool to control execution
            >>> pool = AnalysisPool([foo_analysis, bar_analysis])
            >>> pool.state
            'running'

            >>> # wait for all analyses to finish in pool
            >>> pool.join()
            >>> pool.state
            'done'

    Args:
        analyses (list): List of analyses to include in pool.
    """

    def __init__(self, analyses):
        self.analyses = analyses
        return

    def __iter__(self):
        for obj in self.analyses:
            yield obj
        return

    @property
    def state(self):
        """
        Return "state" of the analysis pool.
        """
        states = [a.state for a in self.analyses]
        if "failed" in states:
            return "failed"
        elif "paused" in states:
            return "paused"
        elif "running" in states:
            return "running"
        return "done"
    def join(self, period=0.25):
        """
        Wait for all analyses to finish, and return when they're done.

        Arguments:
            period (float): Period (seconds) for polling results on a join operation.
        """
        while True:
            done = True
            for obj in self.analyses:
                if obj.state not in ["failed", "done"]:
                    done = False
                    break
            if done:
                break
            time.sleep(period)
            for obj in self.analyses:
                obj.refresh()
        return self.analyses
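# Example usage (a sketch with hypothetical names): poll a batch of background
# runs once per second until every analysis reaches "done" or "failed".
#
#   >>> runs = [pipe.run(project=name, background=True) for name in projects]
#   >>> pool = AnalysisPool(runs)
#   >>> pool.join(period=1.0)
#   >>> pool.state
#   'done'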