# -*- coding: utf-8 -*-
#
# Workflow-related models.
#
# ------------------------------------------------
# imports
# -------
import datetime
import hashlib
import json
import logging
import random
from urllib.parse import quote_plus
import uuid
import dateparser
from gems import cached, composite
import six
import esp.base as base
from .exception import MissingVersionError
from .sample import SampleType, EntityType, WorkflowableClass
from .__base__ import BaseModel, UUID_REMAP_CACHE
from .__base__ import (
list_dispatch,
create_and_return,
export_variable_definitions,
push_variable_definitions,
create_and_return_uuid,
yaml_str_list,
yaml_str,
raw,
export_fixed_id,
)
from esp.utils import is_uuid, data_to_uuid
def _import_signature_flow(flow, overwrite, allow_snapshot_uuid_remap):
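"""
Import a signature flow config, re-using an existing SignatureFlow (matched
by uuid and, when possible, def_uuid) and registering any uuid remapping in
UUID_REMAP_CACHE. Returns the (possibly updated) flow config.
"""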
from .signatureflow import SignatureFlow, SignatureFlowDefinition
# sf, version_was_resolved = SignatureFlow._resolve_existing(flow, allow_snapshot_uuid_remap)
# if sf.exists():
# flow.pop('uuid', None)
# if version_was_resolved:
# return flow
name = flow["name"]
uuid = flow.get("uuid")
def_uuid = flow.get("def_uuid")
if uuid is None:
sf = SignatureFlow(name)
else:
if def_uuid is not None:
try:
sf = SignatureFlow(uuid, version=def_uuid)
# force resolution of data so we can catch
# the missing version error here.
sf.exists()
except MissingVersionError:
sf = SignatureFlow(uuid)
else:
sf = SignatureFlow(uuid)
if sf.exists():
flow["uuid"] = sf["uuid"]
if uuid is not None:
UUID_REMAP_CACHE[uuid] = sf["uuid"]
# update signature (create new version)
create_and_return(SignatureFlow, flow, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap)
else:
new_signature_flow = create_and_return(
SignatureFlow, flow, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
)
flow["uuid"] = new_signature_flow["uuid"]
# map the original uuid (when supplied) to the newly created flow
if uuid is not None:
UUID_REMAP_CACHE[uuid] = new_signature_flow["uuid"]
return flow
def _import_pipeline_with_snapshot(pipeline, snapshot, overwrite, allow_snapshot_uuid_remap):
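"""
Import a pipeline config, attaching the matching pipeline snapshot (matched
by uuid) from the parent snapshot before creating the pipeline.
"""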
from .analysis import Pipeline
pipe_snaps = {x["uuid"]: x for x in snapshot.get("pipelines", [])}
if "uuid" in pipeline and pipeline["uuid"] in pipe_snaps:
pipeline["snapshot"] = pipe_snaps[pipeline["uuid"]]
create_and_return(Pipeline, pipeline, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap)
# models
# ------
class Workflow(BaseModel):
"""
Object for interacting with Workflows from the ESP database.
See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
of the documentation for more context and comprehensive examples of
how to create and use this type of object.
Configuration:
Simple workflow:
.. code-block:: yaml
name: Illumina Sequencing
desc: Workflow for filling in data during illumina sequencing.
tags: [demo, test]
protocols:
- Set Samples
- Create Illumina Library
- Analyze Sequencing Results
Workflow with embedded protocols:
.. code-block:: yaml
name: Illumina Sequencing
desc: Workflow for filling in data during illumina sequencing.
tags: [demo, test]
protocols:
- Set Samples:
protocol: standard
variables:
- Attachment:
rule: attachment
- Note:
rule: string
- Create Illumina Library:
desc: Create Illumina library and assign index codes.
protocol: sample
sample: Illumina Library
relation: 1-to-1
- Analyze Sequencing Results:
protocol: pipeline
group: All
pipeline: Miseq Analysis Pipeline
Workflow with sample types (embedded and referenced):
.. code-block:: yaml
name: Illumina Sequencing
desc: Workflow for filling in data during illumina sequencing.
tags: [demo, test]
sample_types:
- Generic sample
- Illumina Library:
desc: Sample type for illumina library.
sequences:
- ESP SEQUENCE
protocols:
- Set Samples
- Create Illumina Library
- Analyze Sequencing Results
Workflow with data links and sample groupings:
.. code-block:: yaml
name: Illumina Sequencing
desc: Workflow for filling in data during illumina sequencing.
tags: [demo, test]
links:
Analyze Sequencing Results:
Instrument: '{{ column_value("Instrument", "Create Illumina Library") }}'
groups:
Analyze Sequencing Results: Set Samples
sample_types:
- Illumina Library
protocols:
- Set Samples
- Create Illumina Library
- Analyze Sequencing Results
Configuration Notes:
* See the ESP documentation for information about data linking, sample groups, and
available protocol configuration options.
* More information on types of protocol configuration can be found in the Protocol
object documentation.
* Nesting configuration for protocols is usually an easier way of defining
configuration for protocols that won't be used across workflows.
Examples:
.. code-block:: python
>>> from esp.models import Workflow
>>> wf = Workflow('Illumina Sequencing')
>>> wf.name, wf.created_at
('Illumina Sequencing', '2019-06-21T16:04:01.199076Z')
>>> # show relationships
>>> wf.experiments
[<Experiment(name=Experiment 1)>, <Experiment(name=Experiment 2)>]
>>> wf.protocols
[<Protocol(name=Set Samples)>,
<Protocol(name=Create Illumina Library)>,
<Protocol(name=Analyze Sequencing Results)>]
>>> wf.sample_types
[<SampleType(name=Illumina Library)>]
Arguments:
ident (str): Name or uuid for object.
"""
__api__ = "workflows"
__api_cls__ = "Workflow"
__version_api__ = "workflow_definitions"
__mutable__ = BaseModel.__mutable__ + [
"template_type",
"sample_types",
"sample_group_indices",
"parent_group_indices",
"protocols",
"fixed_id",
]
__versioned_exportable__ = ["uuid", "def_uuid", "snapshot"]
@property
def snapshot(self):
return self.meta.get("snapshot", {})
def _push_sample_group_indices(self):
"""
Helper method for formatting sample_group indices.
"""
extra = [self.sample_group_indices[-1]] * (len(self.protocols) - len(self.sample_group_indices))
return raw(self.sample_group_indices) + extra
def _push_parent_group_indices(self):
"""
Helper method for formatting parent_group indices.
"""
extra = [self.parent_group_indices[-1]] * (len(self.protocols) - len(self.parent_group_indices))
return raw(self.parent_group_indices) + extra
__push_format__ = {
# account for changes to protocol array
"sample_group_indices": _push_sample_group_indices,
"parent_group_indices": _push_parent_group_indices,
"sample_types": lambda x: list(
map(lambda y: {"uuid": y.uuid, "def_uuid": y.def_uuid, "name": y.name}, x.sample_types)
),
"protocols": lambda x: list(map(lambda y: y.uuid, x.protocols)),
}
def _export_protocols(self, filter_empty, versioned):
if self._snapshot is None:
return [p.export(deep=True, filter_empty=filter_empty, versioned=versioned) for p in self.protocols]
protocol_snapshots = {x["uuid"]: x for x in self._snapshot.get("protocols", [])}
ret = []
for p in self.protocols:
# only used for deep export, so deep is true.
prot_snapshot = protocol_snapshots.get(p.uuid)
if prot_snapshot:
p = Protocol(p.uuid, version=prot_snapshot["def_uuid"], snapshot=prot_snapshot)
ret.append(p.export(deep=True, filter_empty=filter_empty, versioned=versioned))
return ret
__exportable__ = [x for x in __mutable__ if x != "barcode"] + [
"links",
"sequence_only",
"sequence",
"fixed_id",
"template_type",
]
__deep_format__ = {"protocols": _export_protocols}
def _export_sample_group_indices(self):
"""convert from a list of indices to a dict where the key is the protocol
that should be using a sample set and the value is the protocol that created
the sample set."""
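# e.g. protocols [Set Samples (standard), Create Illumina Library (sample),
# Analyze Sequencing Results (pipeline)] with indices [0, 1, 1] export as
# {"Set Samples": "Set Samples", "Create Illumina Library": "Create Illumina Library",
#  "Analyze Sequencing Results": "Create Illumina Library"}.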
samplesets = self.sample_group_indices
sampleset, sampleset_to_protocol = 0, {}
sampleset_to_protocol[sampleset] = yaml_str(self.protocols[0].name)
for protocol in self.protocols:
if protocol.protocol_type == "sample":
sampleset += 1
sampleset_to_protocol[sampleset] = yaml_str(protocol.name)
ret = {}
for index, sampleset in enumerate(samplesets):
ret[yaml_str(self.protocols[index].name)] = yaml_str(sampleset_to_protocol[sampleset])
return ret
def _export_parent_group_indices(self):
"""convert from a list of indices to a dict where the key is the protocol
that should be using a sample set and the value is the protocol that created
# the sample set."""
ret = {}
for index, parentgroup in enumerate(self.parent_group_indices):
ret[yaml_str(self.protocols[index].name)] = parentgroup
return ret
def _export_datalinks(self):
pname_to_links = {}
for i, links in enumerate(self.columns):
if not links:
continue
pname_to_links[yaml_str(self.protocols[i].name)] = {yaml_str(k): yaml_str(v) for k, v in links.items()}
return pname_to_links
def _export_meta(self):
meta = self.meta
if isinstance(self.meta, composite):
meta = meta.json()
meta.pop("sequence", None)
meta.pop("sequence_only", None)
return meta
def _export_sequence_only(self):
sonly = self.meta.get("sequence_only", False)
seq = self.meta.get("sequence", "")
if seq:
return sonly
return None
__export_format__ = {
"template_type": lambda x: yaml_str(x.template_type),
"protocols": lambda x: [yaml_str(i.name) for i in x.protocols],
"sample_types": lambda x: [yaml_str(i.name) for i in x.sample_types],
"sample_group_indices": _export_sample_group_indices,
"links": _export_datalinks,
"meta": _export_meta,
"sequence": lambda x: x.meta.get("sequence", []),
"sequence_only": _export_sequence_only,
"fixed_id": export_fixed_id,
"parent_group_indices": _export_parent_group_indices,
}
@classmethod
def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
"""
Create new object in ESP database using config file or other data.
This method should be overridden by subclasses of LinkedModel for
model-specific logic.
Args:
config (str, dict, list): Config file or information to use in
creating new object.
overwrite (bool): Whether or not to delete current entry in
the ESP database.
"""
# manage required sample types?
from .sample import WorkflowableType
snapshot = config.get("snapshot", {})
# workflows can receive a snapshot directly, from self-pinning,
# or they can receive a snapshot from a higher-level pinned WFC.
# In the former case, the snapshot will have a name. In the latter
# case, it won't. We only send the snapshot if it has a name.
if snapshot and "name" not in snapshot:
config.pop("snapshot")
entity_types = config.pop("entity_types", config.pop("sample_types", []))
if entity_types:
config["sample_types"] = create_and_return_uuid(WorkflowableType, entity_types, overwrite=overwrite)
if not isinstance(config["sample_types"], (list, tuple)):
config["sample_types"] = [config["sample_types"]]
else:
config["sample_types"] = []
config["template_type"] = config.get("template_type", "standard")
# create protocols
if "protocols" not in config or len(config["protocols"]) == 0:
raise AssertionError('Workflow "{}" must have at least one protocol to be created!'.format(config["name"]))
if snapshot:
prot_snaps = {x["uuid"]: x for x in snapshot.get("protocols", [])}
for protocol in config["protocols"]:
if "uuid" in protocol and protocol["uuid"] in prot_snaps:
protocol["snapshot"] = prot_snaps[protocol["uuid"]]
protocols = create_and_return(
Protocol, config["protocols"], overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
)
config["protocols"] = [obj.uuid for obj in protocols]
# add data linking
if "links" in config:
links = config.pop("links")
cols = []
for obj in protocols:
if obj.name in links:
cols.append(links[obj.name])
else:
cols.append({})
config["columns"] = cols
# reassign sample group index using dictionary input (api niceness)
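# e.g. with protocols [Set Samples (standard), Create Illumina Library (sample),
# Analyze Sequencing Results (pipeline)], the defaults are [0, 1, 1]; supplying
# {"Analyze Sequencing Results": "Set Samples"} yields [0, 1, 0].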
group_indices = config.get("sample_group_indices", {})
if isinstance(group_indices, dict):
idx, indices = 0, {}
for p in protocols:
if p.protocol_type == "sample":
idx += 1
indices[p.name] = idx
for key in group_indices:
indices[key] = indices[group_indices[key]]
config["sample_group_indices"] = [indices[obj.name] for obj in protocols]
# set parent group indices as a list where list index is protocol index and value is zero-indexed
# sample group of parent. empty list results in each sample protocol using the previous sample group
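# e.g. {"Analyze Sequencing Results": 1} with three protocols yields [0, 0, 1].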
parent_group_indices = config.get("parent_group_indices", {})
if isinstance(parent_group_indices, dict):
parent_group_indices_list = []
if parent_group_indices:
for protocol in protocols:
if protocol.name in parent_group_indices:
parent_group_indices_list.append(parent_group_indices[protocol.name])
else:
parent_group_indices_list.append(0)
config["parent_group_indices"] = parent_group_indices_list
# create workflow and add protocols
return config
def drop(self, deep=False):
"""
Drop object from ESP database.
"""
protocols = self.protocols
super(Workflow, self).drop()
if deep:
for protocol in protocols:
try:
protocol.drop(deep=deep)
except Exception:
logging.info("Cannot drop nested protocol {}. Used by multiple workflows.".format(protocol.name))
continue
return
@cached
def experiments(self):
"""
Property for fluidly accessing ``Experiment`` objects from metadata (only returns experiments associated
with the current workflow version)
"""
from .project import Experiment
res = base.SESSION.get("/api/workflow_instances", params=dict(workflow_name=self.name)).json()
return [Experiment(item["uuid"], data=item) for item in res if item["workflow_uuid"] == self.def_uuid]
def all_experiments(self):
"""
Returns:
All experiments for this workflow, regardless of workflow version
"""
from .project import Experiment
res = base.SESSION.get("/api/workflow_instances", params=dict(workflow_name=self.name)).json()
return [Experiment.from_data(item) for item in res]
@cached
def protocols(self):
"""
Property for fluidly accessing ``Protocol`` objects from metadata.
"""
snapshot = self._snapshot or {}
protocol_snap = {x["uuid"]: x for x in snapshot.get("protocols", [])}
return [
Protocol(
proto.uuid,
version=protocol_snap.get(proto.uuid, {}).get("def_uuid"),
snapshot=protocol_snap.get(proto.uuid),
)
for proto in self.data.protocols
]
@cached
def sample_types(self):
"""
Property for fluidly accessing ``SampleType`` objects from metadata.
"""
from .sample import EntityType
return [EntityType(stype.def_uuid) for stype in self.data.sample_types]
@cached
def sample_type(self):
"""
Property for accessing single sample type object from Workflow (if only
one sample type is used for the workflow).
"""
if len(self.sample_types) != 1:
raise AssertionError(
"Workflow {} has multiple sample types. Use self.sample_types instead.".format(self.name)
)
return self.sample_types[0]
@list_dispatch
def add(self, item):
"""
Add new objects (Protocol or SampleType) to an existing workflow.
Args:
item (object): existing Protocol or SampleType object to add to workflow.
Examples:
>>> wf = Workflow('Test Workflow')
>>> print(wf.protocols)
['<Protocol(name=Set Samples)>',
'<Protocol(name=Run Analysis)>']
>>>
>>> pc = Protocol('Summarize Results')
>>> wf.add(pc)
>>> print(wf.protocols)
['<Protocol(name=Set Samples)>',
'<Protocol(name=Run Analysis)>',
'<Protocol(name=Summarize Results)>']
"""
from .sample import SampleType
if isinstance(item, SampleType):
self.sample_types.append(item)
self.push()
elif isinstance(item, Protocol):
self.protocols.append(item)
self.push()
else:
raise AssertionError("No rule for adding type {} to Workflow!".format(item.__class__.__name__))
return
@list_dispatch
def remove(self, item):
"""
Remove objects (Protocol or SampleType) from an existing workflow.
Args:
item (object): existing Protocol or SampleType object to remove from workflow.
Examples:
>>> wf = Workflow('Test Workflow')
>>> print(wf.protocols)
['<Protocol(name=Set Samples)>',
'<Protocol(name=Run Analysis)>',
'<Protocol(name=Summarize Results)>']
>>>
>>> pc = Protocol('Summarize Results')
>>> wf.remove(pc)
>>> print(wf.protocols)
['<Protocol(name=Set Samples)>',
'<Protocol(name=Run Analysis)>']
>>>
"""
from .sample import SampleType
if isinstance(item, SampleType):
self.sample_types = list(filter(lambda x: x.name != item.name, self.sample_types))
self.push()
elif isinstance(item, Protocol):
self.protocols = list(filter(lambda x: x.name != item.name, self.protocols))
self.push()
else:
raise AssertionError("No rule for removing type {} from Workflow!".format(item.__class__.__name__))
return
class Protocol(BaseModel):
"""
Object for interacting with Protocols from the ESP database.
See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
of the documentation for more context and comprehensive examples of
how to create and use this type of object.
Configuration:
Standard protocol:
.. code-block:: yaml
name: Set Samples
protocol: standard
desc: Example standard protocol with a few columns.
tags:
- illumina
- sequencing
variables:
- Attachment Column:
rule: attachment
- Container Column:
rule: location
container: Freezer
- Dropdown Column:
rule: dropdown
dropdown:
- foo
- bar
- Date Column:
rule: date
- Note:
rule: string
Sample protocol with expressions:
.. code-block:: yaml
name: Create Illumina Library
desc: Example sample protocol with expression columns.
protocol: sample
sample: Illumina Library
relation: 1-to-1
variables:
- Sample Type:
rule: string
onchange: |+
alert('Changed Sample Type')
- Instrument:
rule: string
value: MiSeq, HiSeq 2000/2500
- Index Type:
rule: dropdown
required: true
dropdown: '{{ ilmn_kits() }}'
- I7 Index ID:
rule: dropdown
required: true
dropdown: '{{ ilmn_adapter_names(column_value("Index Type"), position="i7") }}'
- I7 Index:
rule: string
read_only: true
value: '{{ ilmn_adapter_seq(column_value("Index Type"), column_value("I7 Index ID")) }}'
- I5 Index ID:
rule: dropdown
required: true
dropdown: '{{ ilmn_adapter_names(column_value("Index Type"), position="i5") }}'
- I5 Index:
rule: string
read_only: true
value: '{{ ilmn_adapter_seq(column_value("Index Type"), column_value("I5 Index ID"), position="i5") }}'
Pipeline protocol with grouping:
.. code-block:: yaml
name: Analyze Sequencing Results
protocol: pipeline
desc: Run bioinformatics pipelines to analyze sequencing data.
group: Reference
pipeline: Miseq Analysis Pipeline
variables:
- Sample:
rule: string
visible: false
pipeline_param: true
value: '{{ sample["name"] }}'
- Reference:
rule: dropdown
pipeline_param: true
default: GRCh38
dropdown:
- GRCh37
- GRCh38
Pipeline protocol with embedded pipeline:
.. code-block:: yaml
name: Analyze Sequencing Results
protocol: pipeline
desc: Run bioinformatics pipelines to analyze sequencing data.
group: All
pipeline:
Miseq Analysis Pipeline:
desc: Analyze sequenced library with MiSeq analysis pipeline.
tasks:
- BWA Align:
desc: Align fastq files to reference.
cmd: "echo 'This is a bam created with the command: bwa mem {{ Sample }}.fq.gz' > {{ Sample }}.bam"
files:
- Aligned BAM:
file_type: text
filename_template: "{{ Sample }}.bam"
- GATK Unified Genotyper:
desc: Use GATK to genotype bam file.
cmd: "echo 'This is a vcf created with the command: gatk UnifiedGenotyper -I {{ Sample }}.bam -R /references/{{ Reference }}.fa' > {{ Sample }}.vcf"
files:
- Genotype VCF:
file_type: text
filename_template: "{{ Sample }}.vcf"
deps:
GATK Unified Genotyper: BWA Align
Protocol with inheritance:
.. code-block:: yaml
name: Set Samples Override
inherit: Set Samples
desc: Inherited protocol with extra columns.
variables:
- Extra Column:
rule: numeric
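Inherited variables can also be removed or reordered; per-variable ``delete``
and ``index`` keys are honored on import (an illustrative sketch):
.. code-block:: yaml
name: Set Samples Override
inherit: Set Samples
variables:
- Note:
delete: true
- Extra Column:
rule: numeric
index: 0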
Configuration Notes:
* See the ESP documentation for information about the types of columns and associated
metadata available to use as protocol variables.
* Using the ``group`` parameter on protocol definitions allows developers to
group columns by a specified column or value.
* The sample protocol ``relation`` property can be one of ``1-to-1``,
``fan-out``, or ``fan-in``.
* Protocol inheritance is useful for creating slightly modified copies of existing
protocols without needing to re-define variable configuration. All parameters
are copied over from the inherited protocol.
* Nesting configuration for pipelines is usually an easier way of defining
configuration for pipelines that won't be used across protocols.
Examples:
.. code-block:: python
>>> from esp.models import Protocol
>>> pc = Protocol('Analyze Sequencing Results')
>>> pc.name, pc.created_at
('Analyze Sequencing Results', '2019-06-21T16:04:01.199076Z')
>>> # show relationships
>>> pc.pipeline
<Pipeline(name=Analyze Sequencing Results)>
Arguments:
ident (str): Name or uuid for object.
"""
__api__ = "protocols"
__api_cls__ = "Protocol"
__version_api__ = "protocol_definitions"
__defaults__ = {
"variables": [],
"parent_child_relation": "1-to-1",
}
__var_keys__ = [
"meta",
"name",
"default_val",
"dropdown_expr",
"in_sample_sheet",
"pipeline_param",
"var_type",
"required",
"read_only",
"dropdown",
"source",
"onchange",
]
__mutable__ = BaseModel.__mutable__ + [
"protocol_type",
"template_type",
"parent_child_relation",
"sample_type_uuid",
"pipeline_uuid",
"variables",
"group",
"instructions",
"fixed_id",
]
__versioned_exportable__ = ["uuid", "def_uuid"]
__push_format__ = {
"sample_type_uuid": lambda x: x.sample_type.uuid if x.protocol_type == "sample" else None,
"pipeline_uuid": lambda x: x.pipeline.uuid if x.protocol_type == "pipeline" else None,
"variables": lambda x: push_variable_definitions(raw(x.variables)),
"onrender": lambda x: x.meta.get("code", None),
}
@property
def __exportable__(self):
ret = BaseModel.__base_exportable__ + [
"protocol",
"template_type",
"group",
"onrender",
"instructions",
"fixed_id",
"signature_flows",
"sample_types",
"sample_classes",
]
if self.protocol_type == "sample":
ret += ["sample", "relation"]
elif self.protocol_type == "pipeline":
ret += ["pipeline"]
elif self.protocol_type == "standard":
ret += ["steps"]
if any(x["var_type"] == "pipelinebutton" for x in self.variables):
ret += ["pipelines"]
ret += ["variables"]
return ret
def _export_meta(self):
meta_copy = dict(self.meta.json())
# code handled as onrender.
meta_copy.pop("code", None)
meta_copy.pop("parent_child_relation", None)
meta_copy.pop("sample_type_uuid", None)
meta_copy.pop("pipeline_input", None)
meta_copy.pop("pipeline_run", None)
meta_copy.pop("pipeline_uuid", None)
return meta_copy
def _export_variables(self, variables):
def exclude(data):
if data["var_type"] == "complete" and not data.get("onchange", data.get("meta", {}).get("onchange")):
return True
if self.protocol_type == "sample" and (
data["name"] in ["Parent ID", "Sample ID", "Note"]
or (data["name"] == "Sample ID Sequence" and len(self.sample_type.sequences) < 2)
):
return True
# NOTE: this might change in the future if we convert
# "run a pipeline protocol" into "run a pipeline column".
if data["var_type"] == "pipeline_instance_uuid":
return True
return export_variable_definitions(variables, exclude)
def _export_steps(self):
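"""
Export step descriptions and variables for standard protocols; returns
None when the protocol defines no steps.
"""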
ret = []
if "steps" not in self.data or not self.steps:
return None
if self.protocol_type != "standard":
raise ValueError("Error: steps only allowed in standard protocol.")
for step in self.steps:
x = {}
for key in ("desc",):
if key in step and step[key]:
x[yaml_str(key)] = yaml_str(step[key])
x["variables"] = self._export_variable_definitions(step["variables"])
ret.append(x)
return ret
def _export_signature_flows(self, versioned=False):
from .signatureflow import SignatureFlow, SignatureFlowDefinition
exported_flows = set()
ret = []
snapshot = self._snapshot or {}
sf_snaps = {x["uuid"]: x for x in snapshot.get("signature_flows", [])}
for var in self.variables:
if var["var_type"] == "signatureflow":
# TODO: Rework this export when the signature flow payload is reworked
# to behave more like other versioned objects.
meta = var["meta"]
sig_flow_uuid = meta["signature_flow_uuid"]
sf_snap = sf_snaps.get(sig_flow_uuid, {})
signature_flow = SignatureFlow(sig_flow_uuid)
version_uuid = sf_snap.get("def_uuid", signature_flow["head"])
if version_uuid not in exported_flows:
signature_flow_def = SignatureFlowDefinition(version_uuid)
export = signature_flow_def.export(deep=True, versioned=versioned)
if versioned:
export_dict = list(export.values())[0]
export_dict["def_uuid"] = version_uuid
export_dict["uuid"] = sig_flow_uuid
ret.append(export)
exported_flows.add(version_uuid)
return ret
def export_sample_types(self):
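"""
Export entity types referenced by sample_point variables, de-duplicated
by entity type name.
"""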
ret = []
for var in self.variables:
if var["var_type"] == "sample_point":
sample_types = var["meta"].get("sample_types", [])
for sample_type in sample_types:
sample_type_obj = EntityType(sample_type["uuid"])
sample_type_added = [s for s in ret if sample_type_obj.name in s]
if len(sample_type_added) == 0:
sample_type_exp = sample_type_obj.export()
ret.append(sample_type_exp)
return ret
def export_sample_classes(self):
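"""
Export the workflowable classes of entity types referenced by
sample_point variables, de-duplicated by class name.
"""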
ret = []
for var in self.variables:
if var["var_type"] == "sample_point":
sample_types = var["meta"].get("sample_types", [])
for sample_type in sample_types:
sample_type_obj = EntityType(sample_type["uuid"])
sample_class = sample_type_obj.workflowable_class
sample_class_added = [s for s in ret if sample_class.name in s]
if len(sample_class_added) == 0:
ret.append(sample_class.export())
return ret
__shallow_format__ = {
"protocol": lambda x: yaml_str(x.protocol_type),
"template_type": lambda x: yaml_str(x.template_type),
"sample": lambda x: yaml_str(x.sample_type.name),
"relation": lambda x: yaml_str(x.parent_child_relation),
"pipeline": lambda x: yaml_str(x.pipeline.name),
"pipelines": lambda x: [yaml_str(pipe.name) for pipe in x.pipelines],
"variables": lambda x: x._export_variables(x.variables),
"group": lambda x: yaml_str(x.group) if x.group else None,
"onrender": lambda x: x.meta.get("code", ""),
"meta": _export_meta,
"steps": _export_steps,
"fixed_id": export_fixed_id,
# TODO: don't export the full signature flows in a shallow
# export; only export references to them.
"signature_flows": _export_signature_flows,
}
__deep_format__ = {
"pipeline": lambda x, versioned: x.pipeline.export(deep=True, versioned=versioned),
"pipelines": lambda x, versioned: [pipe.export(deep=True, versioned=versioned) for pipe in x.pipelines],
}
__export_format__ = __shallow_format__
@property
def __droppable__(self):
# TODO: need a more efficient way to query this, but
# generally this is only used for cleanup of tests.
if not self.exists():
return False
wfs = Workflow.all()
for wf in wfs:
if not wf.deleted and self.name in [x.name for x in wf.protocols]:
return False
return True
@classmethod
def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
"""
Create new object in ESP database using config file or other data.
This method should be overridden by subclasses of LinkedModel for
model-specific logic.
Args:
config (str, dict, list): Config file or information to use in
creating new object.
overwrite (bool): Whether or not to delete current entry in
the ESP database.
"""
from .sample import SampleType
from .inventory import ItemType
from .analysis import Pipeline
from .signatureflow import SignatureFlow, SignatureFlowDefinition
import markdown
snapshot = config.pop("snapshot", {})
# manage protocol inheritance
if "inherit" in config:
inherit = config.pop("inherit")
pc = Protocol(inherit)
if not pc.exists():
raise AssertionError("Could not find existing protocol {} to inherit from!".format(inherit))
# TODO: change this process to an export, dict.update()/re-import
# high-level metadata
iconfig = {
"name": config.get("name", pc.name),
"desc": config.get("desc", pc.desc),
"protocol_type": config.get("protocol_type", pc.protocol_type),
"template_type": config.get("template_type", pc.template_type),
"variables": list(map(lambda x: {k: x[k] for k in x if k in cls.__var_keys__}, pc.variables.json())),
"group": config.get("group", pc.group),
"tags": list(set(config.get("tags", []) + list(pc.tags))),
}
if "onrender" in pc.data or "onrender" in config:
iconfig["onrender"] = config.pop("onrender", pc.data.get("onrender"))
# updating variables
reindex = {}
for var in config.get("variables", []):
changed = False
if "index" in var:
reindex[var["name"]] = var.pop("index")
for idx, ivar in enumerate(iconfig["variables"]):
if ivar["name"] == var["name"]:
if var.get("delete", False):
iconfig["variables"].pop(idx)
else:
iconfig["variables"][idx].update(var)
changed = True
break
if not changed:
iconfig["variables"].append(var)
# reindexing variables (if specified)
for key, idx in sorted(list(reindex.items()), key=lambda x: x[1], reverse=True):
vnames = [x["name"] for x in iconfig["variables"]]
cidx = vnames.index(key)
var = iconfig["variables"].pop(cidx)
iconfig["variables"].insert(idx, var)
# other metadata
if iconfig["protocol_type"] == "sample":
iconfig["sample_type"] = config.pop("sample_type") if "sample_type" in config else pc.sample_type.name
iconfig["parent_child_relation"] = (
config.pop("parent_child_relation")
if "parent_child_relation" in config
else pc.parent_child_relation
)
if "sample_classes" in config:
iconfig["sample_classes"] = config.pop("sample_classes", [])
if "sample_types" in config:
iconfig["sample_types"] = config.pop("sample_types", [])
if iconfig["protocol_type"] == "pipeline":
iconfig["pipeline"] = config.pop("pipeline") if "pipeline" in config else pc.pipeline.name
if "signature_flows" in config:
iconfig["signature_flows"] = config.pop("signature_flows")
config = iconfig
# set default protocol type (if not specified)
config["protocol_type"] = config.get("protocol_type", "standard")
# set default template type (if not specified)
config["template_type"] = config.get("template_type", "standard")
# The backend will break if the "all" grouping is not exactly the text
# "All", so protect ourselves against silly mistakes.
if "group" in config and str(config["group"]).lower() == "all":
config["group"] = "All"
# assertions
if "steps" in config and config["protocol_type"] != "standard":
raise AssertionError("Error: steps only allowed in standard protocols!")
# make better descriptions
if config.get("desc") is not None and "\n" in config.get("desc"):
config["desc"] = markdown.markdown(config["desc"].strip(), extensions=["extra"], tab_length=2)
# interpret variable strings
for idx, var in enumerate(config["variables"]):
if isinstance(var, six.string_types):
config["variables"][idx] = {"name": var, "var_type": "string"}
# sample protocol logic
if config["protocol_type"] == "sample":
st = SampleType(config.pop("sample_type", "Generic sample"))
if not st.exists():
# localized fix for APPS-7823.
# Longer-term, we need to properly "fix" the client to be
# aware of the fact that inventory items and containers are all
# workflowable and that container and inventory item are now
# specialized sample classes. Note, however, that container types
# aren't currently supported for sample protocols.
if is_uuid(st.ident):
key = "uuid"
else:
key = "name"
# note: as of 3.0 where this branch comes into play, name filtering
# is exact match, case-insensitive.
data = base.SESSION.get('/api/sample_types?clses=["ItemType"]&{}={}'.format(key, st.ident)).json()
if not data:
raise AssertionError(
"Cannot create sample protocol with non-existent SampleType: {}".format(st.ident)
)
st = SampleType.from_data(data[0])
# NOTE: BACKEND DOESN'T CREATE PROTOCOL-SPECIFIC REQUIRED
# COLUMNS IMPLICITLY
one_def = {
"name": "Parent ID",
"in_sample_sheet": True,
"read_only": True,
"pipeline_param": True,
"var_type": "string",
}
group_def = {
"name": "Group",
"in_sample_sheet": True,
"read_only": True,
"pipeline_param": True,
"var_type": "string",
"default_val": "All",
}
required = [
{
"name": "Sample ID Sequence",
"in_sample_sheet": False,
"read_only": True,
"pipeline_param": True,
"var_type": "text",
"default_val": st.sequences[0],
},
group_def if config["parent_child_relation"] == "fan-in" else one_def,
{
"name": "Sample ID",
"in_sample_sheet": False,
"read_only": True,
"pipeline_param": True,
"var_type": "string",
},
{
"name": "Note",
"in_sample_sheet": True,
"pipeline_param": True,
"var_type": "string",
},
]
rnames = [x["name"] for x in required]
# Sample ID Sequence is a special case b/c the value can be any of the valid sample type sequences for
# this sample type OR an expression that will be resolved at runtime to one of the valid sample type
# sequences for this sample type. So if the user has specified Sample ID Sequence, take what they've got.
var = [x for x in config["variables"] if x["name"] == "Sample ID Sequence"]
if var and var[0].get("default_val"):
required[0]["default_val"] = var[0]["default_val"]
config["variables"] = required + list(filter(lambda x: x["name"] not in rnames, config["variables"]))
config["sample_type_uuid"] = st.uuid
# pipeline protocol logic
if config["protocol_type"] == "pipeline":
required = [{"name": "Start", "in_sample_sheet": True, "var_type": "pipeline_instance_uuid"}]
rnames = [x["name"] for x in required]
config["variables"] = required + list(filter(lambda x: x["name"] not in rnames, config["variables"]))
pipeline_config = config.pop("pipeline")
if not isinstance(pipeline_config, six.string_types) and snapshot and "pipeline" in snapshot:
pipeline_config["snapshot"] = snapshot["pipeline"]
config["pipeline_uuid"] = create_and_return_uuid(
Pipeline, pipeline_config, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
)
# create sample classes
sample_classes = config.pop("sample_classes", [])
for sample_class in sample_classes:
create_and_return_uuid(WorkflowableClass, sample_class, overwrite=overwrite)
# create sample types
entity_types = config.pop("sample_types", [])
if len(entity_types) > 0:
for entity_type in entity_types:
if entity_type.get("class", "") == "":
entity_type["class"] = "Sample"
create_and_return_uuid(EntityType, entity_type, overwrite=overwrite)
# create signature flows
signatureflows = config.pop("signature_flows", [])
for signatureflow in signatureflows:
create_and_return(SignatureFlow, signatureflow, overwrite, allow_snapshot_uuid_remap)
# TODO: determine whether signature flows here still need the
# special-case handling in _import_signature_flow above.
# create pipelines.
pipelines = config.pop("pipelines", [])
pipe_snaps = {x["uuid"]: x for x in snapshot.get("pipelines", [])}
for pipeline in pipelines:
pipeline_id = pipeline if isinstance(pipeline, six.string_types) else pipeline.get("uuid")
_import_pipeline_with_snapshot(
pipeline,
pipe_snaps.get(pipeline_id, {}),
overwrite,
allow_snapshot_uuid_remap=allow_snapshot_uuid_remap,
)
# format specified variables
config["variables"] = push_variable_definitions(config["variables"])
if "onrender" in config:
config.setdefault("meta", {})["code"] = config.pop("onrender")
# Because format_entry() in __base__.py renames "value" to "default_val", the Flex view
# designer doesn't render on the frontend. The "value" key in meta.report needs to stay
# "value", so we walk back through meta and undo that rename here. It's a little ugly,
# but we can't change format_entry itself because it affects too much else.
if "meta" in config and "report" in config["meta"]:
fix_meta_for_flex(config["meta"]["report"])
return config
def drop(self, deep=False):
"""
Drop object from ESP database.
"""
pipeline = self.pipeline if self.protocol_type == "pipeline" else None
super(Protocol, self).drop()
if deep and pipeline:
try:
pipeline.drop(deep=deep)
except Exception:
logging.info("Cannot drop nested pipeline {}. Used by multiple protocols.".format(pipeline.name))
return
@property
def onrender(self):
return self.meta.code
@cached
def sample_type(self):
"""
Return associated sampletype object.
"""
# Note: when fetching sample protocols from the server, the associated
# sample type uuid currently points to a definition uuid rather than the
# sample type uuid, so account for that.
from .sample import SampleType
if self.protocol_type != "sample":
raise AttributeError("Cannot access sample_type property from non-sample protocol!")
return SampleType(self.sample_type_uuid)
@cached
def pipeline(self):
"""
Return associated pipeline object.
"""
from .analysis import Pipeline
if self.protocol_type != "pipeline":
raise AttributeError("Cannot access pipeline property from non-pipeline protocol!")
snapshot = self._pipeline_snapshots.get(self.pipeline_uuid, {})
return Pipeline(self.pipeline_uuid, version=snapshot.get("def_uuid"), snapshot=snapshot)
@cached.tag("refresh")
def pipelines(self):
"""
Return all pipeline objects associated with pipeline button fields.
"""
from .analysis import Pipeline
pipelines = []
snapshots = self._pipeline_snapshots
for var in self.variables:
if var.var_type == "pipelinebutton":
pipe_uuid = var.meta.pipeline
pipelines.append(
Pipeline(
pipe_uuid,
version=snapshots.get(pipe_uuid, {}).get("def_uuid"),
snapshot=snapshots.get(pipe_uuid),
)
)
return pipelines
@cached
def _pipeline_snapshots(self):
if self._snapshot is None:
return {}
return {x["uuid"]: x for x in self._snapshot.get("pipelines", [])}
@cached
def workflows(self):
"""
Query Workflows containing protocol.
"""
raise NotImplementedError
[docs]class WorkflowChain(BaseModel):
"""
Object for interacting with WorkflowChains from the ESP database.
See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
of the documentation for more context and comprehensive examples of
how to create and use this type of object.
Configuration:
Simple workflow chain:
.. code-block:: yaml
name: Manufacturing Chain
desc: Workflow chain for manufacturing process.
tags: [demo, test]
chain:
- Initiate Manufacturing:
to:
- Process A
- Process B
- Process A:
head: false
workflow: My Process
virtual: true
- Process B:
head: false
workflow: My Process
resubmit: true
Workflow chain with conditionals and strategies:
.. code-block:: yaml
name: Manufacturing Chain
desc: Workflow chain for manufacturing process.
tags: [demo, test]
chain:
- Initiate Manufacturing:
to:
- Process A:
if: '{{ column_value("Column 1") == "foo" }}'
sample_type: Specimen
- Process A:
head: false
to: Process B
workflow: My Process
strategy: named_strategy
- Process B:
head: false
workflow: My Process
Workflow chain with node positioning and swimlanes:
.. code-block:: yaml
Demo WFC 3:
chain:
- Node 1:
head: true
position:
x: 290.75
y: 100
startable: true
swimlane: sl-1641460171013
to:
- Checkbox Test:
if: '{{ True }}'
- Checkbox Test:
position:
x: 570.8214285714287
y: 300
startable: true
swimlane: sl-1641460181862
to:
- Approval Instructions:
if: ''
swimlanes:
- color: '#3d85c6'
height: 200
id: sl-1641460171013
label: lane 1
y: 0
- color: '#ff9900'
height: 200
id: sl-1641460181862
label: lane 2
y: 200
Configuration Notes:
* See the ESP documentation for information about how to configure workflow
transition strategies.
* See the conditional transition logic above for an example of how to send
subsets of samples to downstream nodes in a workflow chain.
* More information on types of workflow configuration can be found in the Workflow
object documentation.
* Nesting configuration for workflows is usually an easier way of defining
configuration for workflows that won't be used across workflow chains.
* Note that on the backend, rules are evaluated for transitions in the
context of the worksheet that triggers the transition, except there is
no "current"/"active" protocol, so expressions like column_value must
specify both the column and the protocol (see the example below).
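For example, a conditional transition rule that names both the column and
the protocol it reads from (protocol and column names here are illustrative):
.. code-block:: yaml
- Process A:
to:
- Process B:
if: '{{ column_value("QC Result", "Run QC") == "Pass" }}'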
Examples:
.. code-block:: python
>>> from esp.models import WorkflowChain
>>> chain = WorkflowChain('Illumina Sequencing')
>>> chain.name, chain.created_at
('Illumina Sequencing', '2019-06-21T16:04:01.199076Z')
>>> # show relationships
>>> chain.workflows
[<Workflow(name=Initiate Manufacturing)>,
<Workflow(name=Process A)>,
<Workflow(name=Process B)>]
Arguments:
ident (str): Name or uuid for object.
"""
__api__ = "workflow_chains"
__version_api__ = "workflow_chain_definitions"
__mutable__ = BaseModel.__mutable__ + ["fixed_id", "template_type", "variables"]
__defaults__ = {"variables": []}
__push_format__ = {"variables": lambda x: push_variable_definitions(raw(x.variables))}
@property
def variables(self):
"""Returns the list of resource vars"""
return self.resource_vars
def _data_by_uuid(self):
try:
url = "/api/{}/{}?ignore_deleted=false".format(self.__api__, self.ident)
return base.SESSION.get(url).json()
except Exception:
return {}
@classmethod
def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
"""
Create new object in ESP database using config file or other data.
This method should be overridden by subclasses of LinkedModel for
model-specific logic.
Args:
config (str, dict, list): Config file or information to use in
creating new object.
overwrite (bool): Whether or not to delete current entry in
the ESP database.
"""
# assertions
if isinstance(config, six.string_types):
raise ValueError("Workflow Chain {} does not exist".format(config))
if "chain" not in config or not len(config["chain"]):
raise ValueError("Chain has name but no definition!")
chain_name = config["name"]
config["template_type"] = config.get("template_type", "standard")
snapshot = config.get("snapshot", {})
wf_snaps = {x["uuid"]: x for x in snapshot.get("workflows", [])}
# create members
members, transitions = [], []
head, xfrom, xto = 0, set(), set()
swimlane_definitions = config.pop("swimlanes", [])
swimlane_mapping_list = []
swimlanes = {"mapping": swimlane_mapping_list, "definitions": swimlane_definitions}
for idx, member in enumerate(config.pop("chain", [])):
# assert and normalize
if isinstance(member, six.string_types):
member = {"workflow": member}
if "name" in member and "workflow" not in member:
member["workflow"] = member["name"]
if "workflow" not in member:
raise AssertionError("Expected `workflow` property to be defined on WFC members.")
workflow = member["workflow"]
obj = None
if isinstance(workflow, six.string_types):
obj = Workflow.search(member["workflow"])
if obj:
obj = obj[0]
else:
obj = None
elif "uuid" in workflow and "def_uuid" in workflow:
wf_snap = wf_snaps.get(workflow["uuid"])
try:
obj = Workflow(workflow["uuid"], version=workflow["def_uuid"], snapshot=wf_snap)
if not obj.exists():
obj = None
if wf_snap:
member["snapshot"] = wf_snap
except MissingVersionError:
obj = None
if obj is None:
obj = create_and_return(
Workflow,
member["workflow"],
overwrite=overwrite,
allow_snapshot_uuid_remap=allow_snapshot_uuid_remap,
)
name = member.get("name", obj.name)
if "position" in member:
member.setdefault("meta", {})["position"] = member.pop("position")
# save transitions
to = member.get("to", [])
if isinstance(to, six.string_types):
to = [to]
elif isinstance(to, dict):
# case: to: contains a single entry. In this case, the auto-reformatting remaps it to {'name': 'foo', ...}
if "name" in to:
to = [to]
# case: to: contains multiple entries. In this case, the reformatting does not remap it, so we do it.
else:
to = [dict(name=k, **to[k]) for k in to]
elif not isinstance(to, (list, tuple)):
raise ValueError("Unhandled transition format: {}".format(to))
for cfg in to:
if isinstance(cfg, six.string_types):
cfg = {"name": cfg}
# basic configuration
xfrom.add(name)
xto.add(cfg["name"])
data = dict(
workflow_chain_member_from=name,
workflow_chain_member_to=cfg["name"],
rule=cfg["if"] if "if" in cfg else "{{True}}",
duration=cfg.get("duration", 0),
)
if "uuid" in cfg:
data["uuid"] = cfg["uuid"]
# optional configuration
if "virtual" in cfg:
data["virtual"] = cfg["virtual"]
if "strategy" in cfg:
data["strategy"] = cfg["strategy"]
elif "sample_type" in cfg:
data["strategy"] = "createsample"
data.setdefault("parameters", {})["sample_type"] = cfg.pop("sample_type")
elif "resubmit" in cfg and cfg["resubmit"]:
data["strategy"] = "resubmit"
else:
data["strategy"] = "identity"
parameters = cfg.pop("parameters", data.get("parameters", {}))
if data["strategy"] == "createsample":
sample_type = cfg.pop("sample_type", parameters.pop("sample_type", None))
if not sample_type:
raise ValueError("createsample strategy required sample_type!")
st = SampleType(sample_type)
if not st.exists():
raise ValueError(("Specified sample type `{}` does not " "exist!").format(st.ident))
parameters["sample_type_uuid"] = st.uuid
if parameters:
data["parameters"] = parameters
transitions.append(data)
# add to list
if member.get("head", False):
head += 1
fixed_id = member.get("fixed_id", member.get("barcode", name))
member = dict(
workflow=obj.uuid,
name=name,
is_head=member.get("head", False),
start=member.get("startable", member.get("start", False)),
meta=member.get("meta"),
swimlane=member.get("swimlane"),
)
# For backwards compatibility with existing chains (b/c UUID is computed):
# only add the fixed_id attribute if it is present and different from the name.
if fixed_id != name:
member["fixed_id"] = fixed_id
# pre-generate uuid for member based on data
# We add "chain_name" here to ensure a client-side-generated member can only belong to
# a single WFC.
member["chain_name"] = chain_name
if "uuid" not in member:
member["uuid"] = data_to_uuid(member)
if "swimlane" in member:
swimlane_mapping_list.append({"id": member["uuid"], "row": member.pop("swimlane")})
member.pop("chain_name")
members.append(member)
# check for head or infer
if not head:
heads = xfrom - xto
if not heads:
raise ValueError("Cyclic chains must declare `head` on one chain member.")
# this might be slightly different than the original logic, which probably tolerated two nodes with no incoming transitions.
elif len(heads) > 1:
raise ValueError("Multiple head nodes detected. Only a single head node allowed.")
head = list(heads)[0]
[x for x in members if x["name"] == head][0]["is_head"] = True
elif head > 1:
raise ValueError("Multiple head nodes declared. Only a single head node allowed.")
# re-configure transition names
member_uuid = {x["name"]: x["uuid"] for x in members}
for transition in transitions:
transition["workflow_chain_member_from"] = member_uuid[transition["workflow_chain_member_from"]]
transition["workflow_chain_member_to"] = member_uuid[transition["workflow_chain_member_to"]]
transition["chain_name"] = chain_name
if "uuid" not in transition:
transition["uuid"] = data_to_uuid(transition)
transition.pop("chain_name")
config["members"] = members
config["transitions"] = transitions
config["swimlanes"] = swimlanes
# check for variables.
for idx, var in enumerate(config.get("variables", [])):
if isinstance(var, six.string_types):
config["variables"][idx] = {"name": var, "vary_type": "string"}
config["resource_vars"] = push_variable_definitions(config.pop("variables", []))
# TODO: check signature flows and pipelines. Skip for now.
pipe_snaps = {x["uuid"]: x for x in snapshot.get("pipelines", [])}
for pipeline in config.pop("pipelines", []):
_import_pipeline_with_snapshot(
pipeline, pipe_snaps.get(pipeline.get("uuid"), {}), overwrite, allow_snapshot_uuid_remap
)
from esp.models import SignatureFlow
for flow in config.pop("signature_flows", []):
# _import_signature_flow(flow, overwrite, allow_snapshot_uuid_remap)
create_and_return(SignatureFlow, flow, overwrite, allow_snapshot_uuid_remap)
return config
@cached
def workflows(self):
return [Workflow(x["workflow"]) for x in self.members]
def _export_transition(self, t, members_map, versioned):
if t.deleted:
raise AssertionError("Should not have deleted transitions in Workflow Chain definition!")
texport = {}
texport["if"] = t["rule"]
if versioned:
texport["uuid"] = t["uuid"]
texport["duration"] = t.get("duration", 0)
strategy = t.meta.get("strategy", "identity")
if strategy != "identity":
texport["strategy"] = strategy
if strategy == "createsample":
texport["strategy"] = "createsample"
# older versions of ESP (< 2.3.2) use t.meta['sample_type_uuid']. Newer versions (> 2.3.2) use
# t.meta['parameters']['sample_type_uuid']. Support either in the client for now.
# Also, we now have more parameters possible.
parameters = {}
if "parameters" in t.meta:
parameters.update(t.meta["parameters"])
else:
parameters["sample_type_uuid"] = t.meta.get("sample_type_uuid")
parameters["lab7_id_sequence"] = t.meta.get("lab7_id_sequence")
parameters["Number of Children"] = t.mget.get("Number of Children", 1)
sample_type_uuid = parameters.pop("sample_type_uuid", None)
if not sample_type_uuid:
raise ValueError("Should have sample_type_uuid in transition meta or transition.meta.parameters!")
stype = SampleType(sample_type_uuid)
parameters["sample_type"] = stype.name
if not parameters.get("lab7_id_sequence"):
seqs = stype.sequences
if seqs:
parameters["lab7_id_sequence"] = seqs[0]
else:
parameters.pop("lab7_id_sequence", None)
texport["parameters"] = parameters
elif t.meta.get("parameters"):
texport["parameters"] = t.meta["parameters"]
if t.desc:
texport["desc"] = t.desc
if t.tags:
texport["tags"] = t.tags
if t.virtual:
texport["virtual"] = True
return {members_map[t.workflow_chain_member_to].name: texport}
def _simple_transition(self, t):
attrib = list(t.values())[0]
return (
"virtual" not in attrib
and ("if" not in attrib or attrib["if"] == "true")
and "resubmit" not in attrib
and "strategy" not in attrib
and "sample_type" not in attrib
and "desc" not in attrib
and "tags" not in attrib
)
# TODO: tech debt reduction: WFCs are gaining more properties.
# It's worth creating a pseudo property "chain" and refactoring
# WFC export to use the standard export code with a formatter
# for the chain, where we don't directly export nodes or transition,
# just the chain.
def _export(self, deep, filter_empty=False, versioned=False):
"""Export a yaml-formatted workflow chain.
Args:
filename (str): Filename to write yaml output to
deep (bool): Perform a deep (True) or shallow (False) export
filter_empty (bool): Whether ot filter out empty properties.
Note:
A deep export embeds all nested objects (e.g.: protocols of a workflow).
A deep export is therefore the yaml analog to the .lab7 file.
A shallow export uses names as references to nested objects.
This method ignores filter_empty and always filters to the simplest
possible text representation of the data.
"""
# The backend wfc structure and the yaml config are different enough
# that overriding _export makes the most sense.
export = {}
if versioned and self._snapshot is not None:
export["snapshot"] = raw(self._snapshot)
# grandfather 3.1 interim snapshot structure for SF.
# TODO: Remove this code for 3.2.
if "signature_flow" in export["snapshot"]:
flow = export["snapshot"].pop("signature_flow")
if flow and "signature_flows" not in export["snapshot"]:
export["snapshot"]["signature_flows"] = [flow]
export["name"] = self.name
if self.desc:
export["desc"] = self.desc
if self.tags:
export["tags"] = self.tags.json()
if self.variables:
export["variables"] = export_variable_definitions(self.variables)
fixed_id = self.data.get("fixed_id", self.data.get("barcode", self.name))
if fixed_id not in (self.name, self.uuid):
export["fixed_id"] = fixed_id
export["template_type"] = self.template_type if self.template_type else "standard"
export["chain"] = []
transition_map = {}
for transition in self.transitions:
transition_map.setdefault(transition.workflow_chain_member_from, []).append(transition)
member_map = {x.uuid: x for x in self.members}
swimlanes = self.meta.get("swimlanes", {"mapping": [], "definitions": []})
# "mapping" is a list of dicts, not a map... so make it a map. :/
swimlane_mapping = {x["id"]: x["row"] for x in swimlanes.get("mapping", [])}
if swimlanes and swimlanes.get("definitions"):
export["swimlanes"] = raw(swimlanes["definitions"])
for member in self.nodes:
mexport = {}
if member.uuid == self.head:
mexport["head"] = True
if member.start:
mexport["startable"] = member["start"]
position = member.position
if position:
mexport["position"] = raw(position)
fixed_id = member.fixed_id
if fixed_id != member.name and fixed_id != member.uuid:
mexport["fixed_id"] = fixed_id
member_id = fixed_id
else:
member_id = member.name
if versioned:
mexport["uuid"] = member.uuid
wf = member.workflow # member is now a WorkflowChainNode, which handles resolving the right version
wf_id = wf.name if wf.fixed_id == wf.uuid else wf.fixed_id
if deep:
mexport["workflow"] = wf.export(deep=deep, filter_empty=filter_empty, versioned=versioned)
elif member_id != wf_id:
mexport["workflow"] = wf_id
if member.uuid in transition_map:
mexport["to"] = []
for transition in transition_map[member.uuid]:
texport = self._export_transition(transition, member_map, versioned)
mexport["to"].append(texport)
# collapse simple transitions.
if all(self._simple_transition(x) for x in mexport["to"]):
if len(mexport["to"]) == 1:
mexport["to"] = list(mexport["to"][0].keys())[0]
else:
mexport["to"] = [list(x.keys())[0] for x in mexport["to"]]
if member.uuid in swimlane_mapping:
mexport["swimlane"] = swimlane_mapping[member.uuid]
if mexport:
export["chain"].append({member.name: mexport})
else:
export["chain"].append(member.name)
if self.signature_flows and deep:
# export as a list for consistency with protocol.
export["signature_flows"] = [
self._export_signature_flow(sf, deep=deep, versioned=versioned) for sf in self.signature_flows
]
if self.pipelines and deep:
export["pipelines"] = [p.export(deep=deep, versioned=versioned) for p in self.pipelines]
if versioned:
export["uuid"] = self.uuid
export["def_uuid"] = self.def_uuid
return export
def _export_signature_flow(self, flow, deep, versioned):
# note: flow definition should already be correctly resolved...
from .signatureflow import SignatureFlowDefinition
if versioned:
sf_def = SignatureFlowDefinition(flow._version)
elif hasattr(flow, "head"):
sf_def = SignatureFlowDefinition(flow.head)
else:
sf_def = sorted(flow.versions, key=lambda x: x.created_timestamp)[-1]
export = sf_def.export(deep=deep, versioned=versioned)
if versioned:
export_dict = list(export.values())[0]
export_dict["def_uuid"] = sf_def.uuid
export_dict["uuid"] = flow.uuid
return export
@cached.tag("refresh")
def signature_flows(self):
from .signatureflow import SignatureFlow
flows = []
added_flows = set()
snapshot = self._snapshot or {}
# unlike other props, older snapshots store a single dict under
# "signature_flow" instead of a list under "signature_flows".
sf_snaps = snapshot.get("signature_flows", [])
sf_snap_old = snapshot.get("signature_flow", {})
if sf_snap_old and not sf_snaps:
sf_snaps = [sf_snap_old]
sf_snaps = {x["uuid"]: x for x in sf_snaps}
for var in self.variables:
if var.var_type != "signatureflow":
continue
meta = var["meta"]
sig_flow_uuid = meta["signature_flow_uuid"]
sf_snap = sf_snaps.get(sig_flow_uuid, {})
version = sf_snap.get("def_uuid", None)
if version is None or version not in added_flows:
flows.append(SignatureFlow(sig_flow_uuid, version=version))
added_flows.add(sig_flow_uuid if version is None else version)
return flows
@cached.tag("refresh")
def pipelines(self):
"""
Return all pipeline objects associated with pipeline button fields.
"""
from .analysis import Pipeline
pipelines = []
snapshot = self._snapshot or {}
pipe_snap_old = snapshot.get("pipeline", {})
pipe_snaps = snapshot.get("pipelines", [])
if pipe_snap_old and not pipe_snaps:
pipe_snaps = [pipe_snap_old]
snapshots = {x["uuid"]: x for x in pipe_snaps}
for var in self.variables:
if var.var_type == "pipelinebutton":
pipe_uuid = var.meta.pipeline
pipelines.append(
Pipeline(
pipe_uuid,
version=snapshots.get(pipe_uuid, {}).get("def_uuid"),
snapshot=snapshots.get(pipe_uuid),
)
)
return pipelines
@cached.tag("refresh")
def nodes(self):
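"""
Return WorkflowChainNode wrappers for chain members, threading each
member's workflow snapshot through from the chain snapshot.
"""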
snapshot = self._snapshot or {}
wf_snap = {x["uuid"]: x for x in snapshot.get("workflows", [])}
return [WorkflowChainNode(x["uuid"], data=x, snapshot=wf_snap.get(x["workflow"], {})) for x in self.members]
def pin(self, version_name: str) -> None:
"""
Pin a workflow chain.
Args:
version_name: The name to give the pinned version
"""
base.SESSION.put(f"/api/{self.__api__}/{self.uuid}/pin/{version_name}")
class WorkflowChainNode(BaseModel):
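"""
Thin wrapper around a workflow chain member that resolves the member's
workflow (at the pinned version, when a snapshot is present), start flag,
position, and fixed_id.
"""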
@property
def workflow(self):
version = self._snapshot.get("def_uuid")
return Workflow(self.data.workflow, version=version, snapshot=self._snapshot)
@property
def start(self):
return self.data.get("start", False)
@property
def position(self):
return self.data.get("meta", {}).get("position", {})
@property
def fixed_id(self):
return self.data.get("fixed_id", self.data.get("barcode", self.name))
def fix_meta_for_flex(config):
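"""
Recursively rename "default_val" keys back to "value" so Flex view designer
metadata renders on the frontend (undoes the format_entry() rename; see the
note in Protocol.parse_import).
"""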
if isinstance(config, dict):
for key in list(config.keys()):
if key == "default_val":
config["value"] = config.pop(key)
elif isinstance(config[key], dict) or isinstance(config[key], list):
fix_meta_for_flex(config[key])
elif isinstance(config, list):
for entry in config:
fix_meta_for_flex(entry)