# -*- coding: utf-8 -*-
#
# Analysis-related models.
#
# ------------------------------------------------
# imports
# -------
import os
import time
import logging
from operator import itemgetter
from gems import cached, composite
import six
import esp.base as base
from .__base__ import BaseModel, LinkedModel
from .__base__ import raw, yaml_str_list, yaml_str
# models
# ------
class Pipeline(BaseModel):
"""
Object for interacting with Pipelines from the ESP database.
See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
of the documentation for more context and comprehensive examples of
how to create and use this type of object.
Configuration:
Create pipeline with single task:
.. code-block:: yaml
name: Generate Illumina Runsheet
desc: Run script to generate illumina runsheet and generate runsheet report
tasks:
- Create Illumina Runsheet
Create pipeline with nested tasks:
.. code-block:: yaml
name: Generate Illumina Runsheet
desc: Run script to generate illumina runsheet and generate runsheet report
tasks:
- Create Illumina Runsheet:
desc: Run script to generate illumina runsheet.
cmd: /path/to/script.py {{project}} > runsheet.xml
files:
- Illumina Runsheet:
file_type: xml
filename_template: '{{ "runsheet.xml" }}'
Create pipeline with task dependencies:
.. code-block:: yaml
name: Generate Illumina Runsheet
desc: Run script to generate illumina runsheet and generate runsheet report
tasks:
- Create Illumina Runsheet
- Upload Illumina Runsheet
deps:
Upload Illumina Runsheet: Create Illumina Runsheet
Create pipeline with report:
.. code-block:: yaml
name: Generate Illumina Runsheet
desc: Run script to generate illumina runsheet and generate runsheet report
report:
Runsheet Report:
desc: Report showing details of runsheet generation
elements:
- - type: file_details
depends:
file: Illumina Runsheet
tasknumber: 1
- []
tasks:
- Create Illumina Runsheet
- Upload Illumina Runsheet
deps:
Upload Illumina Runsheet: Create Illumina Runsheet
Create pipeline with report and failure report:
.. code-block:: yaml
name: Generate Illumina Runsheet
desc: Run script to generate illumina runsheet and generate runsheet report
report:
Runsheet Report:
desc: Report showing details of runsheet generation
elements:
- - type: file_details
depends:
file: Illumina Runsheet
tasknumber: 1
- []
failure_report:
Runsheet Failure Report:
desc: Report showing details of failed runsheet generation
elements:
- - type: file_details
depends:
file: stderr
tasknumber: 1
- []
tasks:
- Create Illumina Runsheet
- Upload Illumina Runsheet
deps:
Upload Illumina Runsheet: Create Illumina Runsheet
Configuration Notes:
* Current implementation requires special syntax around creating report
elements (``- -`` and ``- []``), as sketched below. This is due to payload
requirements and will be updated in future versions to be more user-friendly.
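A minimal sketch of the nesting (illustrative only; real element types and
keys follow the examples above): the outer list holds report tabs, each
inner list holds that tab's elements, and a trailing ``- []`` denotes an
empty tab:
.. code-block:: yaml
    elements:
      - - type: html
          contents: '<h1>Report Header</h1>'
      - []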
Examples:
.. code-block:: python
>>> from esp.models import Pipeline
>>> pipe = Pipeline('Generate Illumina Runsheet')
>>> pipe.name, pipe.created_at
('Generate Illumina Runsheet', '2019-06-21T16:04:01.199076Z')
>>> # show relationships
>>> pipe.tasks
[<Task(name=Create Illumina Runsheet)>, <Task(name=Upload Illumina Runsheet)>]
>>> pipe.report
<ReportTemplate(name=Runsheet Report)>
>>> # run pipeline (wait to finish)
>>> analysis = pipe.run(project='My Project')
>>> analysis.tasks[0].stdout
'stdout from the first analysis task'
>>> analysis.tasks[0].files
[<File(name=Illumina Runsheet)>]
>>> # run pipeline (in background)
>>> analysis = pipe.run(project='My Project', background=True)
>>> analysis.pause()
>>> analysis.restart()
>>> analysis.kill()
Arguments:
ident (str): Name or uuid for object.
"""
__api__ = "pipelines"
__api_cls__ = "Pipeline"
__version_api__ = "pipeline_definitions"
__defaults__ = {"dependencies": {}}
__mutable__ = BaseModel.__mutable__ + ["tasks", "dependencies", "schedule", "run_condition"]
__push_format__ = {
# note that the backend needs to receive tasks as [row, col, name, uuid] entries, sorted by local_id
"tasks": lambda x: [
[step.row, step.col, step.name, step.uuid] for step in sorted(x.task_steps, key=lambda step: step.local_id)
],
"dependencies": lambda x: x._push_dependencies(),
}
__exportable__ = BaseModel.__base_exportable__ + [
"tasks",
"report",
"failure_report",
"dependencies",
"schedule",
"run_condition",
]
__versioned_exportable__ = ["uuid", "def_uuid"]
__shallow_format__ = {
"tasks": lambda x: x._task_layout(False, False),
"report": lambda x: x._export_report_type(None),
"failure_report": lambda x: x._export_report_type("failed"),
"dependencies": lambda x: x._export_dependencies(),
}
__deep_format__ = {"tasks": lambda x, versioned: x._task_layout(True, versioned=versioned)}
__export_format__ = __shallow_format__
def _task_layout(self, deep, versioned):
rows = [x.row for x in self.task_steps]
cols = [x.col for x in self.task_steps]
max_rows = max(rows)
max_cols = max(cols)
min_rows = min(rows)
grid = [[None] * (max_cols + 1) for x in range(min_rows, max_rows + 1)]
for step in self.task_steps:
# tasks may have negative row indexes, but the exported grid is 0-based,
# so shift each task's row by abs(min_rows) when mapping into the grid
# (grid rows run from 0 through max_rows - min_rows)
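# e.g. steps at rows [-1, 0, 1] land in grid rows [0, 1, 2]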
grid[step.row + abs(min_rows)][step.col] = (
step.task.export(deep=deep, versioned=versioned) if deep else step.name
)
for row in grid:
while row and row[-1] is None:
row.pop()
return grid
def _export_dependencies(self):
parent_map = {step.local_id: step for step in self.task_steps}
deps = {}
# trying to generate:
# deps:
# child_task_name: [parent_task_name,...]
# for temporal dependencies and
# deps:
# child_task_name:
# - parent_task_name:
# parent_task_label: child_task_variable
for step in self.task_steps:
# Note: parents from the server are 1-based...
# Note 2: steps don't carry names for some reason, but self.tasks does...
mod_map = {}
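# mod layout (inferred from usage below and in _push_dependencies):
# mod[1] = child variable, mod[2] = parent local_id, mod[-1] = parent label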
for mod in step.mods:
mod_map.setdefault(mod[2], {})[mod[-1]] = mod[1]
for j in step.parents:
if j == 0: # no parents.
continue
parent = parent_map[j]
if j in mod_map:
deps.setdefault(step.name, []).append({parent.name: mod_map[j]})
else:
deps.setdefault(step.name, []).append(parent.name)
return deps
def _export_report_type(self, type_):
templates = [x for x in self.reports if x.report_type == type_]
if not templates:
return None
# Reports are returned from the backend in no particular order.
# Sort them via resource_id (the highest resource id is the
# most recent)
if self.data.report_templates[0].get("resource_id", None):
target_uuid = sorted(list(self.data.report_templates), key=itemgetter("resource_id"), reverse=True)[0].uuid
# Find/Return the matching report template
for template in templates:
if template.uuid == target_uuid:
return template._export(False, True)
else:
# Templates are not sortable :(
return templates[0]._export(False, True)
def _push_dependencies(self):
lookup = {x.local_id: x for x in self.task_steps}
deps = {}
for step in self.task_steps:
mod_map = {}
for mod in step.mods:
mod_map.setdefault(mod[2], {})[mod[-1]] = mod[1]
for parent in step.parents:
# 0 -> no parent.
if parent == 0:
continue
if parent not in lookup:
raise ValueError(
"Unable to find parent with local_id {} in steps of pipeline {}!".format(parent, self.name)
)
parent = lookup[parent]
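# the key encodes "parent_row,parent_col,child_row,child_col", e.g. "0,0,1,0"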
key = ",".join(map(str, [parent.row, parent.col, step.row, step.col]))
if parent.local_id in mod_map:
deps[key] = [
{"type": "param", "variable": variable, "label": label}
for label, variable in mod_map[parent.local_id].items()
]
else:
deps[key] = [{"type": "temp", "coord": [parent.row, parent.col]}]
return deps
@classmethod
def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
"""
Create new object in ESP database using config file or other data.
Args:
config (str, dict, list): Config file or information to use in
creating new object.
overwrite (bool): Whether or not to delete current entry in
the ESP database.
"""
# prepare task matrix
if not isinstance(config["tasks"][0], list):
config["tasks"] = [[tsk] for tsk in config["tasks"]]
# create pipeline task matrix
# IMPROVEMENT: This data structure is weird and the api should
# not look like this
# The data structure looks like this to support the grid-based layout of pipeline
# tasks where tasks on the same row can be concurrently executed, tasks above must come
# prior, tasks below may have dependencies on the current row of tasks or any row above, etc.
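# e.g. config["tasks"] == [["taskA"], ["taskB", "taskC"]] places taskA alone
# on row 0, with taskB and taskC running concurrently on row 1 after taskA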
tasks, locs = [], {}
for ridx, row in enumerate(config["tasks"]):
for cidx, col in enumerate(config["tasks"][ridx]):
tconfig = config["tasks"][ridx][cidx]
if tconfig:
if isinstance(tconfig, six.string_types):
obj = Task(tconfig)
else:
obj = Task.create(
config["tasks"][ridx][cidx],
overwrite=overwrite,
allow_snapshot_uuid_remap=allow_snapshot_uuid_remap,
)
else:
continue
tasks.append([ridx, cidx, obj.name, obj.uuid])
# KNOWN LIMITATION: including the same task in a pipeline twice is not supported
if obj.name in locs:
raise NotImplementedError(
"Currently cannot include a task multiple times in a "
f"pipeline. This feature has yet to be implemented. Duplicate object: {obj.name}"
)
locs[obj.name] = [ridx, cidx]
config["tasks"] = tasks
# configure task dependencies
deps = {}
for child, parents in config["dependencies"].items():
if not isinstance(parents, (list, tuple)):
parents = [parents]
for parent in parents:
if isinstance(parent, str):
deps[",".join(map(str, locs[parent] + locs[child]))] = [{"type": "temp", "coord": locs[parent]}]
elif not isinstance(parent, dict):
raise ValueError("Dependency parents must be str or dict.")
else:
parent_name = parent.pop("name")
key = ",".join(map(str, locs[parent_name] + locs[child]))
deps[key] = [{"type": "param", "variable": v, "label": k} for k, v in parent.items()]
config["dependencies"] = deps
cls._reports = [config.pop("report", None), config.pop("failure_report", None)]
return config
@classmethod
def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
"""
Parse response from server when creating new pipeline.
:param allow_snapshot_uuid_remap:
"""
# NOTE: MORE BACKEND INCONSISTENCY THAT RETURNS {'id': UUID}
# OR {'uuid': UUID}. ACCOUNTING FOR VARIABILITY HERE, BUT
# THE BACKEND NEEDS TO CHANGE.
report, failure_report = cls._reports
del cls._reports
obj = cls(data.get("uuid", data.get("id", None)))
if not obj.exists():
return
if report is not None:
report["pipeline"] = obj
PipelineReport.create(report, overwrite=overwrite, prompt=prompt)
if failure_report is not None:
failure_report["pipeline"] = obj
failure_report["report_type"] = "failed"
PipelineReport.create(failure_report, overwrite=overwrite, prompt=prompt)
obj.refresh()
return obj
def drop(self, deep=False):
"""
Drop all associated reports and pipeline.
"""
for report in self.reports:
if report.exists():
report.drop()
tasks = self.tasks
super(Pipeline, self).drop()
if deep:
for task in tasks:
try:
task.drop(deep=deep)
except Exception:
logging.info("Cannot drop nested task {}. Used by multiple pipelines.".format(task.name))
continue
return
@cached
def dependencies(self):
"""
Show task dependency graph. Edits to this graph
will be reflected when a user issues self.push().
"""
raise NotImplementedError
@cached
def tasks(self):
"""
Return tasks associated with pipeline.
"""
# vast majority of our pipelines are single-task pipelines right now,
# so using standard constructor for the time being to support pinning.
# TODO: support snapshots within items_for_uuids,
# return [x for x in Task.items_for_uuids(step.uuid for step in self.steps)]
snapshot = self._snapshot or {}
tasks_snap = {x["uuid"]: x for x in snapshot.get("tasks", [])}
return [
Task(step.uuid, version=tasks_snap.get(step.uuid, {}).get("def_uuid"), snapshot=tasks_snap.get(step.uuid))
for step in self.steps
]
@cached
def task_steps(self):
"""
Return tasks associated with pipeline along with their pipeline-specific step information.
"""
lookup = {task.uuid: task for task in self.tasks}
return [TaskStep(lookup.get(obj.uuid), obj) for obj in self.steps]
@cached
def report(self):
"""
Return PipelineReport objects associated with pipeline.
"""
if len(self.reports) != 1:
raise AssertionError(
"Pipeline {} has multiple PipelineReport objects." "Use self.reports instead.".format(self.name)
)
return self.reports[0]
@cached
def reports(self):
"""
Return PipelineReport objects associated with pipeline.
"""
return [PipelineReport(x["uuid"]) for x in self.report_templates]
@property
def schedule(self):
return self.data.get("schedule")
@schedule.setter
def schedule(self, schedule):
self.data["schedule"] = schedule
@property
def run_condition(self):
return self.meta.get("run_condition")
def run(self, **kwargs):
"""
Run pipeline with specified parameters.
Args:
**kwargs: Keyword arguments to pass in for pipeline execution.
Examples:
>>> pipe = Pipeline('NGS Pipeline')
>>> print(pipe.meta.tasks)
['Preprocess FASTQ', 'Align BAM', 'Call Variants']
>>> pipe.arguments
'fastq'
>>> pipe.run(fastq='/path/to/file.fastq')
"""
logging.info("Running pipeline: {}".format(self.name))
bkg = kwargs.pop("background", False)
# add support for specifying objects
from .file import File
for key in kwargs:
if isinstance(kwargs[key], File):
kwargs[key] = kwargs[key].path
elif isinstance(kwargs[key], BaseModel):
kwargs[key] = kwargs[key].name
# dispatch pipeline
payload = {"env": kwargs, "instances": True, "pipeline": self.uuid}
try:
result = base.SESSION.post("/api/pipeline_instances", json=payload)
except AssertionError as err:
message = "Pipeline could not be executed! Please make sure all arguments for the pipeline are specified as inputs to run()."
raise AssertionError("{} \n\nCLIENT ERROR:\n\n {}".format(err, message))
data = result.json()
# set up Analysis object for monitoring
pool = Analysis(data[0]["uuid"])
# don't wait to return analysis
if bkg:
return pool
# wait for pipeline to finish
pool.join()
return pool
class TaskStep:
def __init__(self, task, step):
self.task = task
self.step = step
@property
def uuid(self):
return self.task.uuid
@property
def name(self):
return self.task.name
@property
def row(self):
return self.step.row
@property
def col(self):
return self.step.col
@property
def parents(self):
return self.step.parents
@property
def mods(self):
return self.step.mods
@property
def local_id(self):
return self.step.local_id
class PipelineReport(BaseModel):
"""
Object for interacting with PipelineReports from the ESP database.
See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
of the documentation for more context and comprehensive examples of
how to create and use this type of object.
Configuration:
Create pipeline with report:
.. code-block:: yaml
name: Generate Illumina Runsheet
desc: Run script to generate illumina runsheet and generate runsheet report
report:
Runsheet Report:
desc: Report showing details of runsheet generation
elements:
- - type: file_details
depends:
file: Illumina Runsheet
tasknumber: 1
- []
tasks:
- Create Illumina Runsheet
Create multi-step pipeline with report:
.. code-block:: yaml
name: Generate Illumina Runsheet
desc: Run script to generate illumina runsheet and generate runsheet report
report:
Runsheet Report:
desc: Report showing details of runsheet generation
elements:
- - type: file_details
depends:
file: Illumina Runsheet
tasknumber: 1
- - type: raw_file
depends:
file: Runsheet Upload Report
tasknumber: 2
- - type: html
contents: |+
<h1>Report Header</h1>
<p>Report Body</p>
- []
tasks:
- Create Illumina Runsheet
- Upload Illumina Runsheet
deps:
Upload Illumina Runsheet: Create Illumina Runsheet
.. TODO - Refactor report configuration to look like this instead:
.. .. code-block:: yaml
.. name: Generate Illumina Runsheet
.. desc: Run script to generate illumina runsheet and generate runsheet report
.. report:
.. Runsheet Report:
.. desc: Report showing details of runsheet generation
.. elements:
.. - type: file_details
.. file: Illumina Runsheet
.. - type: raw_file
.. file: /path/to/local/file.pdf
.. - type: html
.. contents: |+
.. <h1>Report Header</h1>
.. <p>Report Body</p>
.. tasks:
.. - Create Illumina Runsheet
.. - Upload Illumina Runsheet
.. deps:
.. Upload Illumina Runsheet: Create Illumina Runsheet
Configuration Notes:
* Report configuration will generally happen in the context of a pipeline;
accordingly, this documentation describes report generation in that context.
* Current implementation requires special syntax around creating report
elements ``- -`` and ``- []``. This is due to payload requirements and will
be updated in future versions to be more user-friendly.
Examples:
.. code-block:: python
>>> from esp.models import Pipeline
>>> pipe = Pipeline('Generate Illumina Runsheet')
>>> pipe.name, pipe.created_at
('Generate Illumina Runsheet', '2019-06-21T16:04:01.199076Z')
>>> pipe.report.name, pipe.report.created_at
('Runsheet Report', '2019-06-21T16:04:01.199076Z')
>>> pipe.report.pipeline.name, pipe.report.pipeline.created_at
('Generate Illumina Runsheet', '2019-06-21T16:04:01.199076Z')
Arguments:
ident (str): Name or uuid for object.
"""
__api__ = "report_templates"
__api_cls__ = "ReportTemplate"
__mutable__ = BaseModel.__mutable__ + [
# NOTE: UUID SHOULDN'T BE REQUIRED BY THE BACKEND
"uuid",
"parent",
"elements",
]
__push_format__ = {
# NOTE: THE BACKEND REQUIRING THIS DATA STRUCTURE IS UNNECESSARY
# AND THE HANDLER SHOULD BE CLEANED UP
"parent": lambda x: {
"uuid": x.pipeline.uuid,
"url": "/api/pipelines/{}".format(x.pipeline.uuid),
"cls": "Pipeline",
}
}
__exportable__ = BaseModel.__base_exportable__ + [
"pipeline",
"elements",
]
def _export_elements(self):
ret = []
for tab in self.elements:
export_tab = []
ret.append(export_tab)
for element in tab:
eltype = element["type"]
export_el = {
"type": yaml_str(eltype),
}
export_tab.append(export_el)
for key in self.__element_required__[eltype]:
if key != "depends":
# because the only other required key is 'contents'.
# If that changes, this code needs to change, too.
export_el[key] = yaml_str(element[key])
for key, val in self.__element_defaults__.get(eltype, ()):
if key in element:
if isinstance(element[key], composite):
export_el[key] = element[key].json()
else:
export_el[key] = yaml_str(element[key])
if "depends" not in element:
continue
export_deps = []
export_el["depends"] = export_deps
for dep in element["depends"]:
export_dep = {}
export_deps.append(export_dep)
if "source" in dep:
export_dep["source"] = dep["source"].json()
elif "taskfile_name" in dep:
export_dep["file"] = yaml_str(dep["taskfile_name"])
export_dep["tasknumber"] = dep["task_local_id"]
else:
raise ValueError("Unhandled dependency: {}".format(dep))
return ret
__shallow_format__ = {
"pipeline": lambda x: yaml_str(x.pipeline.name),
"elements": _export_elements,
}
# Note: tread carefully here: the elements below should ONLY be used
# as defaults in the create method, where the data will be posted
# and replaced with values from the server.
__element_defaults__ = {
"annotation_list": (("visibleRows", {}),),
"stacked_barchart": (("cols", "*"), ("delimiter", ",")),
"table": (("delimiter", ","),),
"text_choice": (("depends", []),),
"text_edit": (("contents", ""),),
"vcf_viewer": (("contents", ""),),
}
__element_required__ = {
"annotation_list": ("depends",),
"annotation_text": ("depends",),
"file_details": ("depends",),
"genome_browser": ("depends",),
"html": ("contents",),
"html_file": ("depends",),
"iframe_file": ("depends",),
"iframe_url": ("depends",),
"illuminate_viewer": ("depends",),
"image_file": ("depends",),
"lifemap_varelect": ("depends",),
"raw_file": ("depends",),
"stacked_barchart": ("depends",),
"table": ("depends",),
"text": ("contents",),
"text_choice": ("contents",),
"text_edit": (),
"vcf_viewer": ("depends",),
}
__element_datasources__ = {"text_edit", "vcf_viewer"}
def _data_by_name(self):
"""
Override data querying by name.
"""
response = base.SESSION.get("/api/{}?name={}".format(self.__api__, self.ident)).json()
if len(response) == 0:
return {}
# Return the most-recent one that has the same name
if response[0].get("resource_id", None):
response = sorted(response, key=itemgetter("resource_id"), reverse=True)
result = {}
for data in response:
if data["name"] == self.ident:
return data
return result
@classmethod
def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
"""
Create new object in ESP database using config file or other data.
.. note:: A Pipeline with the following construct could have a corresponding
config that looks like the following:
If a Pipeline (named 'pipelineA') with Tasks "taskB" and "taskA" is
set up as follows (without a temporal connection) in the UI:
.. code-block:: text
- - -
- taskB -
- taskA -
- - -
and the Pipeline has a report that has the following construct:
.. code-block:: yaml
FileDetails:
fileA_for_taskA (Step 2: taskA)
FileDetails:
fileB_for_taskB (Step 1: taskB)
the config object has the following structure:
.. code-block:: json
{
"elements": [
[
{
"depends": [
{
"file": "fileA_for_taskA",
"tasknumber": 2
}
],
"type": "file_details"
},
{
"depends": [
{
"file": "fileB_for_taskB",
"tasknumber": 1
}
],
"type": "file_details"
}
],
[]
],
"name": "pipelineA",
"uuid": null,
"desc": ""
}
Args:
config (str, dict, list): Config file or information to use in
creating new object.
overwrite (bool): Whether or not to delete current entry in
the ESP database.
"""
# Get/Construct the pipeline object
if "pipeline" not in config:
raise ValueError("`pipeline` must be specified in PipelineReport configuration!")
if isinstance(config["pipeline"], Pipeline):
pipeline = config.pop("pipeline")
else:
pipeline = Pipeline(config.pop("pipeline"))
# Set the 'parent' pipeline definition
config["parent"] = {"uuid": pipeline.def_uuid}
# Gather task files for parsing report elements
pipe_task_files = {}
for task in pipeline.tasks:
for taskfile in task.task_files:
pipe_task_files.setdefault(taskfile.name, []).append(taskfile)
# Parse report elements
for element_table in config["elements"]:
for element in element_table:
cls._verify_report_element(element)
if "depends" not in element:
continue
for dependency in element["depends"]:
dependency["pipeline"] = config["parent"]
if "file" in dependency:
cls._build_taskfile_dependency(dependency, pipeline, pipe_task_files)
elif "source" in dependency:
cls._build_element_dependency(dependency)
else:
raise ValueError("Unhandled dependency type: {}".format(dependency))
return config
@classmethod
def _verify_report_element(cls, element):
# Check that parameter element has 'type'
if "type" not in element:
raise ValueError("PipelineReport elements must declare type.")
# Check that the 'type' defined in element is a part of the valid
# types as defined in __element_required__
if element["type"] not in cls.__element_required__:
raise ValueError(
"Invalid report element type `{}`. "
"Valid types: {}".format(element["type"], list(cls.__element_required__.keys()))
)
# Check that each required key for the element's 'type' is defined in the element
for key in cls.__element_required__[element["type"]]:
if key not in element:
raise ValueError(
"PipelineReport element of type `{}` " "must have key `{}`".format(element["type"], key)
)
# Assign default values for the applicable element types to the element
# (leave already-defined defaults as-is)
if element["type"] in cls.__element_defaults__:
for key, val in cls.__element_defaults__[element["type"]]:
element.setdefault(key, val)
@classmethod
def _build_taskfile_dependency(cls, dependency, pipeline, pipe_task_files):
# Assign the resource class of the dependency
# TODO: There is no cls for 'TaskFile'... ??
dependency["cls"] = "TaskFile"
# Get the dependency's file name
filename = dependency.pop("file")
# Assign the dependency's task file's name
dependency["taskfile_name"] = filename
# stdout/stderr have no user-defined file type; look one up only for other files
if filename not in ["stdout", "stderr"]:
# Verify the filename exists in the pipeline's task file list
if filename not in pipe_task_files:
raise ValueError("Task file `{}` not " "defined for pipeline `{}`".format(filename, pipeline.name))
# Assign the dependency's task file type
dependency["taskfile_type"] = pipe_task_files[filename][0]["file_type"]
# Verify the dependency has a tasknumber assigned to it
if "tasknumber" not in dependency:
raise ValueError(
"File `{}` does not contain tasknumber "
"in pipeline `{}`: must define tasknumber".format(filename, pipeline.name)
)
# Get the tasknumber
tasknumber = dependency.pop("tasknumber")
# Verify the tasknumber is within the range of how many tasks the pipeline has
if tasknumber > len(pipeline.tasks):
raise ValueError(
"Tasknumber ({}) for task `{}` " "exceeds number of tasks in pipeline!".format(tasknumber, filename)
)
# Assign the dependency's task number
dependency["task_local_id"] = tasknumber
# Find the name of the task
taskname = "null"
for step in pipeline.task_steps:
if step.local_id == tasknumber:
taskname = step.name
break
# Build the UI Label for this dependency
dependency["ui_label"] = "{} (Step {}: {})".format(filename, tasknumber, taskname)
@classmethod
def _build_element_dependency(cls, dep):
dep["cls"] = "ReportElement"
source = dep["source"]
for key in ("element", "report", "type"):
if key not in dep["source"]:
raise ValueError(
"PipelineReport element dependencies of type ReportElement must "
"have a source dict with keys element, report, and type"
)
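# e.g. source {"type": "table", "report": 0, "element": 1} yields ui_label "table (1.2)"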
dep["ui_label"] = "{} ({}.{})".format(source["type"], source["report"] + 1, source["element"] + 1)
@cached
def pipeline(self):
"""
Relationship to pipeline object.
"""
# backend DS _does_ have a ref to the parent, but it's the pipeline _definition_.
return Pipeline.from_definition(self.parent.uuid)
class Task(BaseModel):
"""
Object for interacting with Tasks from the ESP database.
See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
of the documentation for more context and comprehensive examples of
how to create and use this type of object.
Configuration:
Simple task (no variables):
.. code-block:: yaml
name: Ping Internal Server
desc: Run command to ping internal server.
cmd: curl http://internal-server
Simple task with variables:
.. code-block:: yaml
name: Ping Internal Server
desc: Run command to ping internal server.
cmd: curl '{{ server }}'
Create task and track outputs:
.. code-block:: yaml
name: Create Illumina Runsheet
desc: Run script to generate illumina runsheet.
cmd: /path/to/generate_runsheet.py {{project}} > runsheet.xml
files:
- Illumina Runsheet:
file_type: xml
filename_template: '{{ "runsheet.xml" }}'
Create task with inline code:
.. code-block:: yaml
name: Report DNA Type
desc: Run bash script to generate simple report.
cmd: |+
# Simple task that reports whether the sample type is DNA or RNA
if [ "{{ type }}" = "DNA" ]; then
echo "<b>This is a <font color='green'>DNA</font> sample</b>" > result.html
elif [ "{{ type }}" = "RNA" ]; then
echo "<b>This is a <font color='green'>RNA</font> sample</b>" > result.html
fi
files:
- Type Report:
file_type: html
filename_template: '{{ "report.html" }}'
Examples:
.. code-block:: python
>>> from esp.models import Task
>>> task = Task('Create Illumina Runsheet')
>>> task.name, task.created_at
('Create Illumina Runsheet', '2019-06-21T16:04:01.199076Z')
>>> # show relationships
>>> task.variables
['project']
>>> task.cmd
'/path/to/generate_runsheet.py {{project}} > runsheet.xml'
Arguments:
ident (str): Name or uuid for object.
"""
__api__ = "tasks"
__api_cls__ = "Task"
# NOTE: /api/task_definitions doesn't exist
__version_api__ = "task_definitions"
__mutable__ = BaseModel.__mutable__ + ["cmd", "task_files"]
__push_format__ = {"task_files": lambda x: raw(x.files)}
__exportable__ = BaseModel.__base_exportable__ + ["cmd", "files"]
__versioned_exportable__ = ["uuid", "def_uuid"]
def _export_files(self):
ret = []
for x in self.task_files:
y = {}
for key in ("name", "file_type", "filename_template"):
y[yaml_str(key)] = yaml_str(x[key])
if "tags" in x and x["tags"]:
y["tags"] = yaml_str_list(x["tags"])
ret.append(y)
return ret
__shallow_format__ = {"files": _export_files}
@cached
def files(self):
return self.task_files
class Analysis(LinkedModel):
"""
Object for interacting with analyses from the ESP database.
Examples:
.. code-block:: python
>>> from esp.models import Pipeline
>>> pipe = Pipeline('Generate Illumina Runsheet')
>>> # run pipeline to return analysis object
>>> analysis = pipe.run(project='My Project')
>>> analysis.tasks[0].stdout
'stdout from the first analysis task'
>>> analysis.tasks[0].files
[<File(name=Illumina Runsheet)>]
>>> # run pipeline (in background)
>>> analysis = pipe.run(project='My Project', background=True)
>>> analysis.pause()
>>> analysis.restart()
>>> analysis.kill()
>>> # get current analysis (within script)
>>> analysis = Analysis.current()
>>> analysis.pause()
Args:
ident (str): Name or uuid for object.
"""
__api__ = "pipeline_instances"
period = 0.25
@classmethod
def current(cls):
uu = os.environ.get("LAB7_PIPELINE_UUID")
if uu is None:
raise AssertionError("LAB7_PIPELINE_UUID is not set -- this can only " "be used within a running pipeline.")
return cls(uu)
@cached
def tasks(self):
"""
Return objects for each step in pipeline.
"""
return [AnalysisTask(x["uuid"]) for x in self.steps]
def pause(self):
result = base.SESSION.put("/api/{}/{}?action={}".format(self.__api__, self.uuid, "pause"))
return self.join(["done", "paused"], timeout=5)
def kill(self):
result = base.SESSION.put("/api/{}/{}?action={}".format(self.__api__, self.uuid, "kill"))
return self.join(["done", "failed"], timeout=5)
def restart(self):
result = base.SESSION.put("/api/{}/{}?action={}".format(self.__api__, self.uuid, "restart"))
return self.join(["running", "pending"], timeout=5)
def join(self, expect=["failed", "done", "paused"], timeout=1e6):
"""
Wait for analysis to finish, and return when done.
"""
count = 0
while True:
done = False
if self.state in expect:
done = True
if done or count > timeout:
break
time.sleep(self.period)
count += self.period
self.refresh()
return self
class LimsAnalysis(Analysis):
"""
Object for interacting with pipelines run via LIMS application.
Examples:
.. code-block:: python
>>> from esp.models import LimsAnalysis
>>> # get current analysis (within script)
>>> analysis = LimsAnalysis.current()
>>> analysis.pause()
>>> # get samples associated with current worksheet
>>> analysis.samples
[<Sample(ESP0001)>, <Sample(ESP0002)>]
>>> # get sample sheet associated with current worksheet
>>> analysis.sample_sheet
<SampleSheet(My-LIMS-Sample-Sheet)>
Args:
ident (str): Name or uuid for object.
"""
@cached
def samples(self):
"""
Fetch sample dependencies of current LIMS analysis.
"""
from .sample import Sample
return [Sample.from_data(x[0]) for x in self.priors if x[0].cls == "Sample" and x[1] == "processed"]
@cached
def sample_sheet(self):
"""
Fetch sample sheet dependency of current LIMS analysis.
"""
from .project import SampleSheet
ss = [SampleSheet.from_data(x[0]) for x in self.priors if x[0].cls == "SampleSheet" and x[1] == "ran"]
if ss:
ss = ss[0]
ss.refresh()
return ss
return None
class AnalysisTask(BaseModel):
"""
Object for interacting with task instances from the ESP database.
Examples:
.. code-block:: python
>>> from esp.models import Pipeline
>>> pipe = Pipeline('Generate Illumina Runsheet')
>>> # run pipeline to return analysis object
>>> analysis = pipe.run(project='My Project')
>>> # get first task object from analysis
>>> atask = analysis.tasks[0]
>>> atask.stdout
'stdout from the first analysis task'
>>> atask.stderr
'stderr from the first analysis task'
>>> atask.files
[<File(name=Illumina Runsheet)>]
Args:
ident (str): Name or uuid for object.
"""
__api__ = "task_instances"
@cached
def files(self):
"""
Property for accessing all files associated with task.
"""
from .file import File
res = []
for fi in self.task_files:
if fi["taskfile_name"] not in ["job_script", "stdout", "stderr"]:
res.append(File(fi["uuid"]))
return res
@cached
def script(self):
"""
Property for accessing the task's job script.
"""
for fi in self.task_files:
if fi["taskfile_name"] == "job_script":
res = base.SESSION.get("/api/files/{}/contents".format(fi["uuid"]))
return res.json().get("data")
return
@cached
def stdout(self):
"""
Property for accessing task stdout.
"""
for fi in self.task_files:
if fi["taskfile_name"] == "stdout":
res = base.SESSION.get("/api/files/{}/contents".format(fi["uuid"]))
return res.json().get("data")
return
@cached
def stderr(self):
"""
Property for accessing task stderr.
"""
for fi in self.task_files:
if fi["taskfile_name"] == "stderr":
res = base.SESSION.get("/api/files/{}/contents".format(fi["uuid"]))
return res.json().get("data")
return
class AnalysisPool(object):
"""
Object for interacting with analyses from the ESP database.
Examples:
.. code-block:: python
>>> from esp.models import Pipeline
>>> pipe = Pipeline('Generate Illumina Runsheet')
>>> # run multiple scripts to gather in pool
>>> foo_analysis = pipe.run(project='Foo Project')
>>> bar_analysis = pipe.run(project='Bar Project')
>>> # create analysis pool to control execution
>>> pool = AnalysisPool([foo_analysis, bar_analysis])
>>> pool.state
'running'
>>> # wait for all analyses to finish in pool
>>> pool.join()
>>> pool.state
'done'
Args:
analyses (str): List of analyses to include in pool.
"""
def __init__(self, analyses):
self.analyses = analyses
return
def __iter__(self):
for obj in self.analyses:
yield obj
return
@property
def state(self):
"""
Return "state" of the analysis pool.
"""
states = [a.state for a in self.analyses]
if "failed" in states:
return "failed"
elif "paused" in states:
return "paused"
elif "running" in states:
return "running"
return "done"
def join(self, period=0.25):
"""
Wait for all analyses to finish, and return when they're done.
Arguments:
period (float): Period (seconds) for polling results
on a join operation.
"""
while True:
done = True
for obj in self.analyses:
if obj.state not in ["failed", "done"]:
done = False
break
if done:
break
time.sleep(period)
for obj in self.analyses:
obj.refresh()
return self.analyses