# -*- coding: utf-8 -*-
#
# Workflow-related models.
#
# ------------------------------------------------


# imports
# -------
import datetime
import hashlib
import json
import logging
import random
from urllib.parse import quote_plus
import uuid

import dateparser
from gems import cached, composite
import six

import esp.base as base
from .exception import MissingVersionError
from .sample import SampleType, EntityType, WorkflowableClass
from .__base__ import BaseModel, UUID_REMAP_CACHE
from .__base__ import (
    list_dispatch,
    create_and_return,
    export_variable_definitions,
    push_variable_definitions,
    create_and_return_uuid,
    yaml_str_list,
    yaml_str,
    raw,
    export_fixed_id,
)
from esp.utils import is_uuid, data_to_uuid


def _import_signature_flow(flow, overwrite, allow_snapshot_uuid_remap):
    from .signatureflow import SignatureFlow, SignatureFlowDefinition

    # sf, version_was_resolved = SignatureFlow._resolve_existing(flow, allow_snapshot_uuid_remap)
    # if sf.exists():
    #     flow.pop('uuid', None)
    #     if version_was_resolved:
    #         return flow

    name = flow["name"]
    uuid = flow.get("uuid")
    def_uuid = flow.get("def_uuid")
    if uuid is None:
        sf = SignatureFlow(name)
    else:
        if def_uuid is not None:
            try:
                sf = SignatureFlow(uuid, version=def_uuid)
                # force resolution of data so we can catch
                # the missing version error here.
                sf.exists()
            except MissingVersionError:
                sf = SignatureFlow(uuid)
        else:
            sf = SignatureFlow(uuid)
    if sf.exists():
        flow["uuid"] = sf["uuid"]
        if uuid is not None:
            UUID_REMAP_CACHE[uuid] = sf["uuid"]
        # update signature (create new version)
        create_and_return(SignatureFlow, flow, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap)
    else:
        new_signature_flow = create_and_return(
            SignatureFlow, flow, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
        )
        flow["uuid"] = new_signature_flow["uuid"]
        UUID_REMAP_CACHE[flow["uuid"]] = flow["uuid"]
    return flow
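

# Illustrative sketch (not exercised by the library): how the helper above is
# typically driven from an exported config. The flow payload below is
# hypothetical; real payloads come from exported signature flow definitions.
def _example_import_signature_flow():  # pragma: no cover
    flow = {
        "name": "QC Review",  # hypothetical flow name
        "uuid": "00000000-0000-0000-0000-000000000001",
        "def_uuid": "00000000-0000-0000-0000-000000000002",
    }
    # On return, flow["uuid"] points at the server-side SignatureFlow, and
    # UUID_REMAP_CACHE lets later references to the old uuid resolve correctly.
    return _import_signature_flow(flow, overwrite=False, allow_snapshot_uuid_remap=True)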


def _import_pipeline_with_snapshot(pipeline, snapshot, overwrite, allow_snapshot_uuid_remap):
    from .analysis import Pipeline

    pipe_snaps = {x["uuid"]: x for x in snapshot.get("pipelines", [])}
    if "uuid" in pipeline and pipeline["uuid"] in pipe_snaps:
        pipeline["snapshot"] = pipe_snaps[pipeline["uuid"]]
    create_and_return(Pipeline, pipeline, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap)


# models
# ------
class Workflow(BaseModel):
    """
    Object for interacting with Workflows from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages of the
    documentation for more context and comprehensive examples of how to create
    and use objects of this type.

    Configuration:

        Simple workflow:

        .. code-block:: yaml

            name: Illumina Sequencing
            desc: Workflow for filling in data during illumina sequencing.
            tags: [demo, test]
            protocols:
              - Set Samples
              - Create Illumina Library
              - Analyze Sequencing Results

        Workflow with embedded protocols:

        .. code-block:: yaml

            name: Illumina Sequencing
            desc: Workflow for filling in data during illumina sequencing.
            tags: [demo, test]
            protocols:
              - Set Samples:
                  protocol: standard
                  variables:
                    - Attachment:
                        rule: attachment
                    - Note:
                        rule: string
              - Create Illumina Library:
                  desc: Create Illumina library and assign index codes.
                  protocol: sample
                  sample: Illumina Library
                  relation: 1-to-1
              - Analyze Sequencing Results:
                  protocol: pipeline
                  group: All
                  pipeline: Miseq Analysis Pipeline

        Workflow with sample types (embedded and referenced):

        .. code-block:: yaml

            name: Illumina Sequencing
            desc: Workflow for filling in data during illumina sequencing.
            tags: [demo, test]
            sample_types:
              - Generic sample
              - Illumina Library:
                  desc: Sample type for illumina library.
                  sequences:
                    - ESP SEQUENCE
            protocols:
              - Set Samples
              - Create Illumina Library
              - Analyze Sequencing Results

        Workflow with data links and sample groupings:

        .. code-block:: yaml

            name: Illumina Sequencing
            desc: Workflow for filling in data during illumina sequencing.
            tags: [demo, test]
            links:
              Analyze Sequencing Results:
                Instrument: '{{ column_value("Instrument", "Create Illumina Library") }}'
            groups:
              Analyze Sequencing Results: Set Samples
            sample_types:
              - Illumina Library
            protocols:
              - Set Samples
              - Create Illumina Library
              - Analyze Sequencing Results

    Configuration Notes:

        * See the ESP documentation for information about data linking, sample
          groups, and available protocol configuration options.

        * More information on the types of protocol configuration can be found
          in the Protocol object documentation.

        * Nesting configuration for protocols is usually an easier way of
          defining configuration for protocols that won't be used across
          workflows.

    Examples:

        .. code-block:: python

            >>> from esp.models import Workflow
            >>> wf = Workflow('Illumina Sequencing')
            >>> wf.name, wf.created_at
            ('Illumina Sequencing', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> wf.experiments
            [<Experiment(name=Experiment 1)>, <Experiment(name=Experiment 2)>]
            >>> wf.protocols
            [<Protocol(name=Set Samples)>,
             <Protocol(name=Create Illumina Library)>,
             <Protocol(name=Analyze Sequencing Results)>]
            >>> wf.sample_types
            [<SampleType(name=Illumina Library)>]

    Arguments:
        ident (str): Name or uuid for object.
    """

    __api__ = "workflows"
    __api_cls__ = "Workflow"
    __version_api__ = "workflow_definitions"
    __mutable__ = BaseModel.__mutable__ + [
        "template_type",
        "sample_types",
        "sample_group_indices",
        "parent_group_indices",
        "protocols",
        "fixed_id",
    ]
    __versioned_exportable__ = ["uuid", "def_uuid", "snapshot"]

    @property
    def snapshot(self):
        return self.meta.get("snapshot", {})

    def _push_sample_group_indices(self):
        """
        Helper method for formatting sample_group indices.
        """
        extra = [self.sample_group_indices[-1]] * (len(self.protocols) - len(self.sample_group_indices))
        return raw(self.sample_group_indices) + extra

    def _push_parent_group_indices(self):
        """
        Helper method for formatting parent_group indices.
        """
        extra = [self.parent_group_indices[-1]] * (len(self.protocols) - len(self.parent_group_indices))
        return raw(self.parent_group_indices) + extra

    __push_format__ = {
        # account for changes to protocol array
        "sample_group_indices": _push_sample_group_indices,
        "parent_group_indices": _push_parent_group_indices,
        "sample_types": lambda x: list(
            map(lambda y: {"uuid": y.uuid, "def_uuid": y.def_uuid, "name": y.name}, x.sample_types)
        ),
        "protocols": lambda x: list(map(lambda y: y.uuid, x.protocols)),
    }

    def _export_protocols(self, filter_empty, versioned):
        if self._snapshot is None:
            return [p.export(deep=True, filter_empty=filter_empty, versioned=versioned) for p in self.protocols]
        protocol_snapshots = {x["uuid"]: x for x in self._snapshot.get("protocols", [])}
        ret = []
        for p in self.protocols:
            # only used for deep export, so deep is true.
            prot_snapshot = protocol_snapshots.get(p.uuid)
            if prot_snapshot:
                p = Protocol(p.uuid, version=prot_snapshot["def_uuid"], snapshot=prot_snapshot)
            ret.append(p.export(deep=True, filter_empty=filter_empty, versioned=versioned))
        return ret

    __exportable__ = [x for x in __mutable__ if x != "barcode"] + [
        "links",
        "sequence_only",
        "sequence",
        "fixed_id",
        "template_type",
    ]
    __deep_format__ = {"protocols": _export_protocols}

    def _export_sample_group_indices(self):
        """Convert from a list of indices to a dict where the key is the
        protocol that should be using a sample set and the value is the
        protocol that created the sample set."""
        samplesets = self.sample_group_indices
        sampleset, sampleset_to_protocol = 0, {}
        sampleset_to_protocol[sampleset] = yaml_str(self.protocols[0].name)
        for protocol in self.protocols:
            if protocol.protocol_type == "sample":
                sampleset += 1
                sampleset_to_protocol[sampleset] = yaml_str(protocol.name)
        ret = {}
        for index, sampleset in enumerate(samplesets):
            ret[yaml_str(self.protocols[index].name)] = yaml_str(sampleset_to_protocol[sampleset])
        return ret

    def _export_parent_group_indices(self):
        """Convert from a list of indices to a dict mapping each protocol name
        to the zero-indexed parent sample group that protocol consumes."""
        ret = {}
        for index, parentgroup in enumerate(self.parent_group_indices):
            ret[yaml_str(self.protocols[index].name)] = parentgroup
        return ret

    def _export_datalinks(self):
        pname_to_links = {}
        for i, links in enumerate(self.columns):
            if not links:
                continue
            pname_to_links[yaml_str(self.protocols[i].name)] = {yaml_str(k): yaml_str(v) for k, v in links.items()}
        return pname_to_links

    def _export_meta(self):
        meta = self.meta
        if isinstance(self.meta, composite):
            meta = meta.json()
        meta.pop("sequence", None)
        meta.pop("sequence_only", None)
        return meta

    def _export_sequence_only(self):
        sonly = self.meta.get("sequence_only", False)
        seq = self.meta.get("sequence", "")
        if seq:
            return sonly
        return None

    __export_format__ = {
        "template_type": lambda x: yaml_str(x.template_type),
        "protocols": lambda x: [yaml_str(i.name) for i in x.protocols],
        "sample_types": lambda x: [yaml_str(i.name) for i in x.sample_types],
        "sample_group_indices": _export_sample_group_indices,
        "links": _export_datalinks,
        "meta": _export_meta,
        "sequence": lambda x: x.meta.get("sequence", []),
        "sequence_only": _export_sequence_only,
        "fixed_id": export_fixed_id,
        "parent_group_indices": _export_parent_group_indices,
    }
    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.
        This method should be overwritten by subclasses of LinkedModel
        for model-specific logic.

        Args:
            config (str, dict, list): Config file or information to use in
                creating new object.
            overwrite (bool): Whether or not to delete the current entry in
                the ESP database.
        """
        # manage required sample types?
        from .sample import WorkflowableType

        snapshot = config.get("snapshot", {})
        # workflows can receive a snapshot directly, from self-pinning,
        # or they can receive a snapshot from a higher-level pinned WFC.
        # In the former case, the snapshot will have a name. In the latter
        # case, it won't. We only send the snapshot if it has a name.
        if snapshot and "name" not in snapshot:
            config.pop("snapshot")
        entity_types = config.pop("entity_types", config.pop("sample_types", []))
        if entity_types:
            config["sample_types"] = create_and_return_uuid(WorkflowableType, entity_types, overwrite=overwrite)
            if not isinstance(config["sample_types"], (list, tuple)):
                config["sample_types"] = [config["sample_types"]]
        else:
            config["sample_types"] = []
        config["template_type"] = config.get("template_type", "standard")

        # create protocols
        if "protocols" not in config or len(config["protocols"]) == 0:
            raise AssertionError('Workflow "{}" must have at least one protocol to be created!'.format(config["name"]))
        if snapshot:
            prot_snaps = {x["uuid"]: x for x in snapshot.get("protocols", [])}
            for protocol in config["protocols"]:
                if "uuid" in protocol and protocol["uuid"] in prot_snaps:
                    protocol["snapshot"] = prot_snaps[protocol["uuid"]]
        protocols = create_and_return(
            Protocol, config["protocols"], overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
        )
        config["protocols"] = [obj.uuid for obj in protocols]

        # add data linking
        if "links" in config:
            links = config.pop("links")
            cols = []
            for obj in protocols:
                if obj.name in links:
                    cols.append(links[obj.name])
                else:
                    cols.append({})
            config["columns"] = cols

        # reassign sample group index using dictionary input (api niceness)
        group_indices = config.get("sample_group_indices", {})
        if isinstance(group_indices, dict):
            idx, indices = 0, {}
            for p in protocols:
                if p.protocol_type == "sample":
                    idx += 1
                indices[p.name] = idx
            for key in group_indices:
                indices[key] = indices[group_indices[key]]
            config["sample_group_indices"] = [indices[obj.name] for obj in protocols]

        # set parent group indices as a list where list index is protocol index
        # and value is zero-indexed sample group of parent. empty list results
        # in each sample protocol using the previous sample group
        parent_group_indices = config.get("parent_group_indices", {})
        if isinstance(parent_group_indices, dict):
            parent_group_indices_list = []
            if parent_group_indices:
                for protocol in protocols:
                    if protocol.name in parent_group_indices:
                        parent_group_indices_list.append(parent_group_indices[protocol.name])
                    else:
                        parent_group_indices_list.append(0)
            config["parent_group_indices"] = parent_group_indices_list

        # create workflow and add protocols
        return config
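
    # Worked example (hypothetical protocols) for the sample-group conversion
    # above: given protocols [Set Samples (sample), Create Library (sample),
    # Analyze (pipeline)] and the config
    #
    #   sample_group_indices:
    #     Analyze: Set Samples
    #
    # the base mapping is {Set Samples: 1, Create Library: 2, Analyze: 2};
    # the override repoints Analyze at the group created by Set Samples,
    # yielding the list [1, 2, 1] that is sent to the server.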
    def drop(self, deep=False):
        """
        Drop object from ESP database.
        """
        protocols = self.protocols
        super(Workflow, self).drop()
        if deep:
            for protocol in protocols:
                try:
                    protocol.drop(deep=deep)
                except Exception:
                    logging.info("Cannot drop nested protocol {}. Used by multiple workflows.".format(protocol.name))
                    continue
        return
    @cached
    def experiments(self):
        """
        Property for fluidly accessing ``Experiment`` objects from metadata
        (only returns experiments associated with the current workflow version).
        """
        from .project import Experiment

        res = base.SESSION.get("/api/workflow_instances", params=dict(workflow_name=self.name)).json()
        return [Experiment(item["uuid"], data=item) for item in res if item["workflow_uuid"] == self.def_uuid]
    def all_experiments(self):
        """
        Returns:
            All experiments for this workflow, regardless of workflow version.
        """
        from .project import Experiment

        res = base.SESSION.get("/api/workflow_instances", params=dict(workflow_name=self.name)).json()
        return [Experiment.from_data(item) for item in res]
    @cached
    def protocols(self):
        """
        Property for fluidly accessing ``Protocol`` objects from metadata.
        """
        snapshot = self._snapshot or {}
        protocol_snap = {x["uuid"]: x for x in snapshot.get("protocols", [])}
        return [
            Protocol(
                proto.uuid,
                version=protocol_snap.get(proto.uuid, {}).get("def_uuid"),
                snapshot=protocol_snap.get(proto.uuid),
            )
            for proto in self.data.protocols
        ]

    @cached
    def sample_types(self):
        """
        Property for fluidly accessing ``SampleType`` objects from metadata.
        """
        from .sample import EntityType

        return [EntityType(stype.def_uuid) for stype in self.data.sample_types]

    @cached
    def sample_type(self):
        """
        Property for accessing single sample type object from Workflow
        (if only one sample type is used for the workflow).
        """
        if len(self.sample_types) != 1:
            raise AssertionError(
                "Workflow {} has multiple sample types. Use self.sample_types instead.".format(self.name)
            )
        return self.sample_types[0]
    @list_dispatch
    def add(self, item):
        """
        Add new objects (Protocol or SampleType) to existing workflow.

        Args:
            item (object): Existing Protocol or SampleType object to add to
                the workflow.

        Examples:
            >>> wf = Workflow('Test Workflow')
            >>> print(wf.protocols)
            ['<Protocol(name=Set Samples)>', '<Protocol(name=Run Analysis)>']
            >>>
            >>> pc = Protocol('Summarize Results')
            >>> wf.add(pc)
            >>> print(wf.protocols)
            ['<Protocol(name=Set Samples)>', '<Protocol(name=Run Analysis)>', '<Protocol(name=Summarize Results)>']
        """
        from .sample import SampleType

        if isinstance(item, SampleType):
            self.sample_types.append(item)
            self.push()
        elif isinstance(item, Protocol):
            self.protocols.append(item)
            self.push()
        else:
            raise AssertionError("No rule for adding type {} to Workflow!".format(item.__class__.__name__))
        return
    @list_dispatch
    def remove(self, item):
        """
        Remove objects (Protocol or SampleType) from an existing workflow.

        Args:
            item (object): Existing Protocol or SampleType object to remove
                from the workflow.

        Examples:
            >>> wf = Workflow('Test Workflow')
            >>> print(wf.protocols)
            ['<Protocol(name=Set Samples)>', '<Protocol(name=Run Analysis)>', '<Protocol(name=Summarize Results)>']
            >>>
            >>> pc = Protocol('Summarize Results')
            >>> wf.remove(pc)
            >>> print(wf.protocols)
            ['<Protocol(name=Set Samples)>', '<Protocol(name=Run Analysis)>']
        """
        from .sample import SampleType

        if isinstance(item, SampleType):
            self.sample_types = list(filter(lambda x: x.name != item.name, self.sample_types))
            self.push()
        elif isinstance(item, Protocol):
            self.protocols = list(filter(lambda x: x.name != item.name, self.protocols))
            self.push()
        else:
            raise AssertionError("No rule for removing type {} from Workflow!".format(item.__class__.__name__))
        return
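

# Illustrative sketch (not exercised by the library): composing the Workflow
# API above. 'Test Workflow' and 'Summarize Results' are hypothetical objects
# assumed to already exist on the target ESP instance.
def _example_workflow_roundtrip():  # pragma: no cover
    wf = Workflow("Test Workflow")
    pc = Protocol("Summarize Results")
    wf.add(pc)  # appends the protocol and pushes the change to the server
    config = wf.export(deep=True)  # dict mirroring the YAML configuration
    wf.remove(pc)  # removes it again by name and pushes
    return config

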
class Protocol(BaseModel):
    """
    Object for interacting with Protocols from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages of the
    documentation for more context and comprehensive examples of how to create
    and use objects of this type.

    Configuration:

        Standard protocol:

        .. code-block:: yaml

            name: Set Samples
            protocol: standard
            desc: Example standard protocol with a few columns.
            tags:
              - illumina
              - sequencing
            variables:
              - Attachment Column:
                  rule: attachment
              - Container Column:
                  rule: location
                  container: Freezer
              - Dropdown Column:
                  rule: dropdown
                  dropdown:
                    - foo
                    - bar
              - Date Column:
                  rule: date
              - Note:
                  rule: string

        Sample protocol with expressions:

        .. code-block:: yaml

            name: Create Illumina Library
            desc: Example sample protocol with expression columns.
            protocol: sample
            sample: Illumina Library
            relation: 1-to-1
            variables:
              - Sample Type:
                  rule: string
                  onchange: |+
                    alert('Changed Sample Type')
              - Instrument:
                  rule: string
                  value: MiSeq, HiSeq 2000/2500
              - Index Type:
                  rule: dropdown
                  required: true
                  dropdown: '{{ ilmn_kits() }}'
              - I7 Index ID:
                  rule: dropdown
                  required: true
                  dropdown: '{{ ilmn_adapter_names(column_value("Index Type"), position="i7") }}'
              - I7 Index:
                  rule: string
                  read_only: true
                  value: '{{ ilmn_adapter_seq(column_value("Index Type"), column_value("I7 Index ID")) }}'
              - I5 Index ID:
                  rule: dropdown
                  required: true
                  dropdown: '{{ ilmn_adapter_names(column_value("Index Type"), position="i5") }}'
              - I5 Index:
                  rule: string
                  read_only: true
                  value: '{{ ilmn_adapter_seq(column_value("Index Type"), column_value("I5 Index ID"), position="i5") }}'

        Pipeline protocol with grouping:

        .. code-block:: yaml

            name: Analyze Sequencing Results
            protocol: pipeline
            desc: Run bioinformatics pipelines to analyze sequencing data.
            group: Reference
            pipeline: Miseq Analysis Pipeline
            variables:
              - Sample:
                  rule: string
                  visible: false
                  pipeline_param: true
                  value: '{{ sample["name"] }}'
              - Reference:
                  rule: dropdown
                  pipeline_param: true
                  default: GRCh38
                  dropdown:
                    - GRCh37
                    - GRCh38

        Pipeline protocol with embedded pipeline:

        .. code-block:: yaml

            name: Analyze Sequencing Results
            protocol: pipeline
            desc: Run bioinformatics pipelines to analyze sequencing data.
            group: All
            pipeline:
              Miseq Analysis Pipeline:
                desc: Analyze sequenced library with MiSeq analysis pipeline.
                tasks:
                  - BWA Align:
                      desc: Align fastq files to reference.
                      cmd: "echo 'This is a bam created with the command: bwa mem {{ Sample }}.fq.gz' > {{ Sample }}.bam"
                      files:
                        - Aligned BAM:
                            file_type: text
                            filename_template: "{{ Sample }}.bam"
                  - GATK Unified Genotyper:
                      desc: Use GATK to genotype bam file.
                      cmd: "echo 'This is a vcf created with the command: gatk UnifiedGenotyper -I {{ Sample }}.bam -R /references/{{ Reference }}.fa' > {{ Sample }}.vcf"
                      files:
                        - Genotype VCF:
                            file_type: text
                            filename_template: "{{ Sample }}.vcf"
                deps:
                  GATK Unified Genotyper: BWA Align

        Protocol with inheritance:

        .. code-block:: yaml

            name: Set Samples Override
            inherit: Set Samples
            desc: Inherited protocol with extra columns.
            variables:
              - Extra Column:
                  rule: numeric

    Configuration Notes:

        * See the ESP documentation for information about the types of columns
          and associated metadata available to use as protocol variables.

        * Using the ``group`` parameter on protocol definitions allows
          developers to group columns by a specified column or value.

        * The sample protocol ``relation`` property can take either
          ``1-to-1``, ``fan-out``, or ``fan-in``.

        * Protocol inheritance is useful for creating slightly modified copies
          of existing protocols without needing to re-define variable
          configuration. All parameters are copied over from the inherited
          protocol.

        * Nesting configuration for pipelines is usually an easier way of
          defining configuration for pipelines that won't be used across
          protocols.

    Examples:

        .. code-block:: python

            >>> from esp.models import Protocol
            >>> pc = Protocol('Analyze Sequencing Results')
            >>> pc.name, pc.created_at
            ('Analyze Sequencing Results', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> pc.pipeline
            <Pipeline(name=Analyze Sequencing Results)>

    Arguments:
        ident (str): Name or uuid for object.
    """

    __api__ = "protocols"
    __api_cls__ = "Protocol"
    __version_api__ = "protocol_definitions"
    __defaults__ = {
        "variables": [],
        "parent_child_relation": "1-to-1",
    }
    __var_keys__ = [
        "meta",
        "name",
        "default_val",
        "dropdown_expr",
        "in_sample_sheet",
        "pipeline_param",
        "var_type",
        "required",
        "read_only",
        "dropdown",
        "source",
        "onchange",
    ]
    __mutable__ = BaseModel.__mutable__ + [
        "protocol_type",
        "template_type",
        "parent_child_relation",
        "sample_type_uuid",
        "pipeline_uuid",
        "variables",
        "group",
        "instructions",
        "fixed_id",
    ]
    __versioned_exportable__ = ["uuid", "def_uuid"]
    __push_format__ = {
        "sample_type_uuid": lambda x: x.sample_type.uuid if x.protocol_type == "sample" else None,
        "pipeline_uuid": lambda x: x.pipeline.uuid if x.protocol_type == "pipeline" else None,
        "variables": lambda x: push_variable_definitions(raw(x.variables)),
        "onrender": lambda x: x.meta.get("code", None),
    }

    @property
    def __exportable__(self):
        ret = BaseModel.__base_exportable__ + [
            "protocol",
            "template_type",
            "group",
            "onrender",
            "instructions",
            "fixed_id",
            "signature_flows",
            "sample_types",
            "sample_classes",
        ]
        if self.protocol_type == "sample":
            ret += ["sample", "relation"]
        elif self.protocol_type == "pipeline":
            ret += ["pipeline"]
        elif self.protocol_type == "standard":
            ret += ["steps"]
        if any(x["var_type"] == "pipelinebutton" for x in self.variables):
            ret += ["pipelines"]
        ret += ["variables"]
        return ret

    def _export_meta(self):
        meta_copy = dict(self.meta.json())
        # code handled as onrender.
        meta_copy.pop("code", None)
        meta_copy.pop("parent_child_relation", None)
        meta_copy.pop("sample_type_uuid", None)
        meta_copy.pop("pipeline_input", None)
        meta_copy.pop("pipeline_run", None)
        meta_copy.pop("pipeline_uuid", None)
        return meta_copy

    def _export_variables(self, variables):
        def exclude(data):
            if data["var_type"] == "complete" and not data.get("onchange", data.get("meta", {}).get("onchange")):
                return True
            if self.protocol_type == "sample" and (
                data["name"] in ["Parent ID", "Sample ID", "Note"]
                or (data["name"] == "Sample ID Sequence" and len(self.sample_type.sequences) < 2)
            ):
                return True
            # NOTE: this might change in the future if we convert
            # "run a pipeline protocol" into "run a pipeline column".
            if data["var_type"] == "pipeline_instance_uuid":
                return True

        return export_variable_definitions(variables, exclude)

    def _export_steps(self):
        ret = []
        if "steps" not in self.data or not self.steps:
            return None
        if self.protocol_type != "standard":
            raise ValueError("Error: steps only allowed in standard protocol.")
        for step in self.steps:
            x = {}
            for key in ("desc",):
                if key in step and step[key]:
                    x[yaml_str(key)] = yaml_str(step[key])
            x["variables"] = self._export_variables(step["variables"])
            ret.append(x)
        return ret

    def _export_signature_flows(self, versioned=False):
        from .signatureflow import SignatureFlow, SignatureFlowDefinition

        exported_flows = set()
        ret = []
        snapshot = self._snapshot or {}
        sf_snaps = {x["uuid"]: x for x in snapshot.get("signature_flows", [])}
        for var in self.variables:
            if var["var_type"] == "signatureflow":
                # TODO: Rework this export when the signature flow payload is
                # reworked to behave more like other versioned objects.
                meta = var["meta"]
                sig_flow_uuid = meta["signature_flow_uuid"]
                sf_snap = sf_snaps.get(sig_flow_uuid, {})
                signature_flow = SignatureFlow(sig_flow_uuid)
                version_uuid = sf_snap.get("def_uuid", signature_flow["head"])
                if version_uuid not in exported_flows:
                    signature_flow_def = SignatureFlowDefinition(version_uuid)
                    export = signature_flow_def.export(deep=True, versioned=versioned)
                    if versioned:
                        export_dict = list(export.values())[0]
                        export_dict["def_uuid"] = version_uuid
                        export_dict["uuid"] = sig_flow_uuid
                    ret.append(export)
                    exported_flows.add(version_uuid)
        return ret

    def export_sample_types(self):
        ret = []
        for var in self.variables:
            if var["var_type"] == "sample_point":
                sample_types = var["meta"].get("sample_types", [])
                for sample_type in sample_types:
                    sample_type_obj = EntityType(sample_type["uuid"])
                    sample_type_added = [s for s in ret if sample_type_obj.name in s]
                    if len(sample_type_added) == 0:
                        sample_type_exp = sample_type_obj.export()
                        ret.append(sample_type_exp)
        return ret

    def export_sample_classes(self):
        ret = []
        for var in self.variables:
            if var["var_type"] == "sample_point":
                sample_types = var["meta"].get("sample_types", [])
                for sample_type in sample_types:
                    sample_type_obj = EntityType(sample_type["uuid"])
                    sample_class = sample_type_obj.workflowable_class
                    sample_class_added = [s for s in ret if sample_class.name in s]
                    if len(sample_class_added) == 0:
                        ret.append(sample_class.export())
        return ret

    __shallow_format__ = {
        "protocol": lambda x: yaml_str(x.protocol_type),
        "template_type": lambda x: yaml_str(x.template_type),
        "sample": lambda x: yaml_str(x.sample_type.name),
        "relation": lambda x: yaml_str(x.parent_child_relation),
        "pipeline": lambda x: yaml_str(x.pipeline.name),
        "pipelines": lambda x: [yaml_str(pipe.name) for pipe in x.pipelines],
        "variables": lambda x: x._export_variables(x.variables),
        "group": lambda x: yaml_str(x.group) if x.group else None,
        "onrender": lambda x: x.meta.get("code", ""),
        "meta": _export_meta,
        "steps": _export_steps,
        "fixed_id": export_fixed_id,
        # TODO: signature flows probably shouldn't be exported in full in a
        # shallow export, only references to them.
        "signature_flows": _export_signature_flows,
    }
    __deep_format__ = {
        "pipeline": lambda x, versioned: x.pipeline.export(deep=True, versioned=versioned),
        "pipelines": lambda x, versioned: [pipe.export(deep=True, versioned=versioned) for pipe in x.pipelines],
    }
    __export_format__ = __shallow_format__

    @property
    def __droppable__(self):
        # TODO: need a more efficient way to query this, but
        # generally this is only used for cleanup of tests.
        if not self.exists():
            return False
        wfs = Workflow.all()
        for wf in wfs:
            if not wf.deleted and self.name in [x.name for x in wf.protocols]:
                return False
        return True
    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.
        This method should be overwritten by subclasses of LinkedModel
        for model-specific logic.

        Args:
            config (str, dict, list): Config file or information to use in
                creating new object.
            overwrite (bool): Whether or not to delete the current entry in
                the ESP database.
        """
        from .sample import SampleType
        from .inventory import ItemType
        from .analysis import Pipeline
        from .signatureflow import SignatureFlow, SignatureFlowDefinition
        import markdown

        snapshot = config.pop("snapshot", {})

        # manage protocol inheritance
        if "inherit" in config:
            inherit = config.pop("inherit")
            pc = Protocol(inherit)
            if not pc.exists():
                raise AssertionError("Could not find existing protocol {} to inherit from!".format(inherit))

            # TODO: change this process to an export, dict.update()/re-import

            # high-level metadata
            iconfig = {
                "name": config.get("name", pc.name),
                "desc": config.get("desc", pc.desc),
                "protocol_type": config.get("protocol_type", pc.protocol_type),
                "template_type": config.get("template_type", pc.template_type),
                "variables": list(map(lambda x: {k: x[k] for k in x if k in cls.__var_keys__}, pc.variables.json())),
                "group": config.get("group", pc.group),
                "tags": list(set(config.get("tags", []) + list(pc.tags))),
            }
            if "onrender" in pc.data or "onrender" in config:
                iconfig["onrender"] = config.pop("onrender", pc.data.get("onrender"))

            # updating variables
            reindex = {}
            for var in config.get("variables", []):
                changed = False
                if "index" in var:
                    reindex[var["name"]] = var.pop("index")
                for idx, ivar in enumerate(iconfig["variables"]):
                    if ivar["name"] == var["name"]:
                        if var.get("delete", False):
                            iconfig["variables"].pop(idx)
                        else:
                            iconfig["variables"][idx].update(var)
                        changed = True
                        break
                if not changed:
                    iconfig["variables"].append(var)

            # reindexing variables (if specified)
            for key, idx in sorted(list(reindex.items()), key=lambda x: x[1], reverse=True):
                vnames = [x["name"] for x in iconfig["variables"]]
                cidx = vnames.index(key)
                var = iconfig["variables"].pop(cidx)
                iconfig["variables"].insert(idx, var)

            # other metadata
            if iconfig["protocol_type"] == "sample":
                iconfig["sample_type"] = config.pop("sample_type") if "sample_type" in config else pc.sample_type.name
                iconfig["parent_child_relation"] = (
                    config.pop("parent_child_relation") if "parent_child_relation" in config else pc.parent_child_relation
                )
            if "sample_classes" in config:
                iconfig["sample_classes"] = config.pop("sample_classes", [])
            if "sample_types" in config:
                iconfig["sample_types"] = config.pop("sample_types", [])
            if iconfig["protocol_type"] == "pipeline":
                iconfig["pipeline"] = config.pop("pipeline") if "pipeline" in config else pc.pipeline.name
            if "signature_flows" in config:
                iconfig["signature_flows"] = config.pop("signature_flows")
            config = iconfig

        # set default protocol type (if not specified)
        config["protocol_type"] = config.get("protocol_type", "standard")

        # set default template type (if not specified)
        config["template_type"] = config.get("template_type", "standard")

        # The backend will break if the "all" grouping is not exactly the text
        # All, so protect ourselves against silly mistakes.
        if "group" in config and str(config["group"]).lower() == "all":
            config["group"] = "All"

        # assertions
        if "steps" in config and config["protocol_type"] != "standard":
            raise AssertionError("Error: steps only allowed in standard protocols!")

        # make better descriptions
        if config.get("desc") is not None and "\n" in config.get("desc"):
            config["desc"] = markdown.markdown(config["desc"].strip(), extensions=["extra"], tab_length=2)

        # interpret variable strings
        for idx, var in enumerate(config["variables"]):
            if isinstance(var, six.string_types):
                config["variables"][idx] = {"name": var, "var_type": "string"}

        # sample protocol logic
        if config["protocol_type"] == "sample":
            st = SampleType(config.pop("sample_type", "Generic sample"))
            if not st.exists():
                # localized fix for APPS-7823.
                # Longer-term, we need to properly "fix" the client to be
                # aware of the fact that inventory items and containers are all
                # workflowable and that container and inventory item are now
                # specialized sample classes. Note, however, that container
                # types aren't currently supported for sample protocols.
                if is_uuid(st.ident):
                    key = "uuid"
                else:
                    key = "name"
                # note: as of 3.0 where this branch comes into play, name
                # filtering is exact match, case-insensitive.
                data = base.SESSION.get('/api/sample_types?clses=["ItemType"]&{}={}'.format(key, st.ident)).json()
                if not data:
                    raise AssertionError(
                        "Cannot create sample protocol with non-existent SampleType: {}".format(st.ident)
                    )
                st = SampleType.from_data(data[0])

            # NOTE: BACKEND DOESN'T CREATE PROTOCOL-SPECIFIC REQUIRED
            # COLUMNS IMPLICITLY
            one_def = {
                "name": "Parent ID",
                "in_sample_sheet": True,
                "read_only": True,
                "pipeline_param": True,
                "var_type": "string",
            }
            group_def = {
                "name": "Group",
                "in_sample_sheet": True,
                "read_only": True,
                "pipeline_param": True,
                "var_type": "string",
                "default_val": "All",
            }
            required = [
                {
                    "name": "Sample ID Sequence",
                    "in_sample_sheet": False,
                    "read_only": True,
                    "pipeline_param": True,
                    "var_type": "text",
                    "default_val": st.sequences[0],
                },
                group_def if config["parent_child_relation"] == "fan-in" else one_def,
                {
                    "name": "Sample ID",
                    "in_sample_sheet": False,
                    "read_only": True,
                    "pipeline_param": True,
                    "var_type": "string",
                },
                {
                    "name": "Note",
                    "in_sample_sheet": True,
                    "pipeline_param": True,
                    "var_type": "string",
                },
            ]
            rnames = [x["name"] for x in required]
            # Sample ID Sequence is a special case b/c the value can be any of
            # the valid sample type sequences for this sample type OR an
            # expression that will be resolved at runtime to one of the valid
            # sample type sequences for this sample type. So if the user has
            # specified Sample ID Sequence, take what they've got.
            var = [x for x in config["variables"] if x["name"] == "Sample ID Sequence"]
            if var and var[0].get("default_val"):
                required[0]["default_val"] = var[0]["default_val"]
            config["variables"] = required + list(filter(lambda x: x["name"] not in rnames, config["variables"]))
            config["sample_type_uuid"] = st.uuid

        # pipeline protocol logic
        if config["protocol_type"] == "pipeline":
            required = [{"name": "Start", "in_sample_sheet": True, "var_type": "pipeline_instance_uuid"}]
            rnames = [x["name"] for x in required]
            config["variables"] = required + list(filter(lambda x: x["name"] not in rnames, config["variables"]))
            pipeline_config = config.pop("pipeline")
            if not isinstance(pipeline_config, six.string_types) and snapshot and "pipeline" in snapshot:
                pipeline_config["snapshot"] = snapshot["pipeline"]
            config["pipeline_uuid"] = create_and_return_uuid(
                Pipeline, pipeline_config, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
            )

        # create sample classes
        sample_classes = config.pop("sample_classes", [])
        for sample_class in sample_classes:
            create_and_return_uuid(WorkflowableClass, sample_class, overwrite=overwrite)

        # create sample types
        entity_types = config.pop("sample_types", [])
        if len(entity_types) > 0:
            for entity_type in entity_types:
                if entity_type.get("class", "") == "":
                    entity_type["class"] = "Sample"
                create_and_return_uuid(EntityType, entity_type, overwrite=overwrite)

        # create signature flows
        signatureflows = config.pop("signature_flows", [])
        for signatureflow in signatureflows:
            create_and_return(SignatureFlow, signatureflow, overwrite, allow_snapshot_uuid_remap)
            # why special case this???
            # _import_signature_flow(
            #     signatureflow,
            #     overwrite,
            #     allow_snapshot_uuid_remap)

        # create pipelines.
        pipelines = config.pop("pipelines", [])
        pipe_snaps = {x["uuid"]: x for x in snapshot.get("pipelines", [])}
        for pipeline in pipelines:
            pipeline_id = pipeline if isinstance(pipeline, six.string_types) else pipeline.get("uuid")
            _import_pipeline_with_snapshot(
                pipeline,
                pipe_snaps.get(pipeline_id, {}),
                overwrite,
                allow_snapshot_uuid_remap=allow_snapshot_uuid_remap,
            )

        # format specified variables
        config["variables"] = push_variable_definitions(config["variables"])
        if "onrender" in config:
            config.setdefault("meta", {})["code"] = config.pop("onrender")

        # Because format_entry() in __base__.py renames "value" to
        # "default_val", Flex view designer doesn't render on the frontend.
        # The key "value" in meta.report needs to stay "value" and not be
        # default_val. It's a little ugly, but we need to go back through meta
        # and undo that change. We can't change format_entry because it
        # affects way too much.
        if "meta" in config and "report" in config["meta"]:
            fix_meta_for_flex(config["meta"]["report"])
        return config
    def drop(self, deep=False):
        """
        Drop object from ESP database.
        """
        pipeline = self.pipeline if self.protocol_type == "pipeline" else None
        super(Protocol, self).drop()
        if deep and pipeline:
            try:
                pipeline.drop(deep=deep)
            except Exception:
                logging.info("Cannot drop nested pipeline {}. Used by multiple protocols.".format(pipeline.name))
        return
    @property
    def onrender(self):
        return self.meta.code

    @cached
    def sample_type(self):
        """
        Return associated sampletype object.
        """
        # Note: when fetching sample protocols from the server, the associated
        # sample type uuid currently points to a definition uuid rather than
        # the sample type uuid, so account for that.
        from .sample import SampleType

        if self.protocol_type != "sample":
            raise AttributeError("Cannot access sample_type property from non-sample protocol!")
        return SampleType(self.sample_type_uuid)

    @cached
    def pipeline(self):
        """
        Return associated pipeline object.
        """
        from .analysis import Pipeline

        if self.protocol_type != "pipeline":
            raise AttributeError("Cannot access pipeline property from non-pipeline protocol!")
        snapshot = self._pipeline_snapshots.get(self.pipeline_uuid, {})
        return Pipeline(self.pipeline_uuid, version=snapshot.get("def_uuid"), snapshot=snapshot)

    @cached.tag("refresh")
    def pipelines(self):
        """
        Return all pipeline objects associated with pipeline button fields.
        """
        from .analysis import Pipeline

        pipelines = []
        snapshots = self._pipeline_snapshots
        for var in self.variables:
            if var.var_type == "pipelinebutton":
                pipe_uuid = var.meta.pipeline
                pipelines.append(
                    Pipeline(
                        pipe_uuid,
                        version=snapshots.get(pipe_uuid, {}).get("def_uuid"),
                        snapshot=snapshots.get(pipe_uuid),
                    )
                )
        return pipelines

    @cached
    def _pipeline_snapshots(self):
        if self._snapshot is None:
            return {}
        return {x["uuid"]: x for x in self._snapshot.get("pipelines", [])}

    @cached
    def workflows(self):
        """
        Query Workflows containing protocol.
        """
        raise NotImplementedError
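

# Illustrative sketch (hypothetical names): a config consumed by the `inherit`
# branch of Protocol.parse_import above. All variables are copied from
# 'Set Samples', then the overrides below are applied by name.
_EXAMPLE_INHERITED_PROTOCOL_CONFIG = {
    "name": "Set Samples Override",
    "inherit": "Set Samples",
    "desc": "Inherited protocol with extra columns.",
    "variables": [
        {"name": "Extra Column", "rule": "numeric"},  # no name match: appended
        {"name": "Note", "index": 0},  # reindexed to the front of the sheet
        {"name": "Attachment Column", "delete": True},  # removed from the copy
    ],
}

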
class WorkflowChain(BaseModel):
    """
    Object for interacting with WorkflowChains from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages of the
    documentation for more context and comprehensive examples of how to create
    and use objects of this type.

    Configuration:

        Simple workflow chain:

        .. code-block:: yaml

            name: Manufacturing Chain
            desc: Workflow chain for manufacturing process.
            tags: [demo, test]
            chain:
              - Initiate Manufacturing:
                  to:
                    - Process A
                    - Process B
              - Process A:
                  head: false
                  workflow: My Process
                  virtual: true
              - Process B:
                  head: false
                  workflow: My Process
                  resubmit: true

        Workflow chain with conditionals and strategies:

        .. code-block:: yaml

            name: Manufacturing Chain
            desc: Workflow chain for manufacturing process.
            tags: [demo, test]
            chain:
              - Initiate Manufacturing:
                  to:
                    - Process A:
                        if: '{{ column_value("Column 1") == "foo" }}'
                        sample_type: Specimen
              - Process A:
                  head: false
                  to: Process B
                  workflow: My Process
                  strategy: named_strategy
              - Process B:
                  head: false
                  workflow: My Process

        Workflow chain with node positioning and swimlanes:

        .. code-block:: yaml

            Demo WFC 3:
              chain:
                - Node 1:
                    head: true
                    position:
                      x: 290.75
                      y: 100
                    startable: true
                    swimlane: sl-1641460171013
                    to:
                      - Checkbox Test:
                          if: '{{ True }}'
                - Checkbox Test:
                    position:
                      x: 570.8214285714287
                      y: 300
                    startable: true
                    swimlane: sl-1641460181862
                    to:
                      - Approval Instructions:
                          if: ''
              swimlanes:
                - color: '#3d85c6'
                  height: 200
                  id: sl-1641460171013
                  label: lane 1
                  y: 0
                - color: '#ff9900'
                  height: 200
                  id: sl-1641460181862
                  label: lane 2
                  y: 200

    Configuration Notes:

        * See the ESP documentation for information about how to configure
          workflow transition strategies.

        * See the conditional transition logic above for an example of how to
          send subsets of samples to downstream nodes in a workflow chain.

        * More information on types of workflow configuration can be found in
          the Workflow object documentation.

        * Nesting configuration for workflows is usually an easier way of
          defining configuration for workflows that won't be used across
          workflow chains.

        * Note that on the backend, rules for transitions are evaluated in the
          context of the worksheet that triggers the transition, except there
          is no "current"/"active" protocol, so expressions like column_value
          must specify both the column and the protocol.

    Examples:

        .. code-block:: python

            >>> from esp.models import WorkflowChain
            >>> chain = WorkflowChain('Illumina Sequencing')
            >>> chain.name, chain.created_at
            ('Illumina Sequencing', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> chain.workflows
            [<Workflow(name=Initiate Manufacturing)>,
             <Workflow(name=Process A)>,
             <Workflow(name=Process B)>]

    Arguments:
        ident (str): Name or uuid for object.
    """

    __api__ = "workflow_chains"
    __version_api__ = "workflow_chain_definitions"
    __mutable__ = BaseModel.__mutable__ + ["fixed_id", "template_type", "variables"]
    __defaults__ = {"variables": []}
    __push_format__ = {"variables": lambda x: push_variable_definitions(raw(x.variables))}

    @property
    def variables(self):
        """Returns the list of resource vars."""
        return self.resource_vars

    def _data_by_uuid(self):
        try:
            url = "/api/{}/{}?ignore_deleted=false".format(self.__api__, self.ident)
            return base.SESSION.get(url).json()
        except Exception:
            return {}
    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.
        This method should be overwritten by subclasses of LinkedModel
        for model-specific logic.

        Args:
            config (str, dict, list): Config file or information to use in
                creating new object.
            overwrite (bool): Whether or not to delete the current entry in
                the ESP database.
        """
        # assertions
        if isinstance(config, six.string_types):
            raise ValueError("Workflow Chain {} does not exist".format(config))
        if "chain" not in config or not len(config["chain"]):
            raise ValueError("Chain has name but no definition!")
        chain_name = config["name"]
        config["template_type"] = config.get("template_type", "standard")
        snapshot = config.get("snapshot", {})
        wf_snaps = {x["uuid"]: x for x in snapshot.get("workflows", [])}

        # create members
        members, transitions = [], []
        head, xfrom, xto = 0, set(), set()
        swimlane_definitions = config.pop("swimlanes", [])
        swimlane_mapping_list = []
        swimlanes = {"mapping": swimlane_mapping_list, "definitions": swimlane_definitions}
        for idx, member in enumerate(config.pop("chain", [])):
            # assert and normalize
            if isinstance(member, six.string_types):
                member = {"workflow": member}
            if "name" in member and "workflow" not in member:
                member["workflow"] = member["name"]
            if "workflow" not in member:
                raise AssertionError("Expected `workflow` property to be defined on WFC members.")
            workflow = member["workflow"]
            obj = None
            if isinstance(workflow, six.string_types):
                obj = Workflow.search(member["workflow"])
                if obj:
                    obj = obj[0]
                else:
                    obj = None
            elif "uuid" in workflow and "def_uuid" in workflow:
                wf_snap = wf_snaps.get(workflow["uuid"])
                try:
                    obj = Workflow(workflow["uuid"], version=workflow["def_uuid"], snapshot=wf_snap)
                    if not obj.exists():
                        obj = None
                    if wf_snap:
                        member["snapshot"] = wf_snap
                except MissingVersionError:
                    obj = None
            if obj is None:
                obj = create_and_return(
                    Workflow,
                    member["workflow"],
                    overwrite=overwrite,
                    allow_snapshot_uuid_remap=allow_snapshot_uuid_remap,
                )
            name = member.get("name", obj.name)
            if "position" in member:
                member.setdefault("meta", {})["position"] = member.pop("position")

            # save transitions
            to = member.get("to", [])
            if isinstance(to, six.string_types):
                to = [to]
            elif isinstance(to, dict):
                # case: to: contains a single entry. In this case, the
                # auto-reformatting remaps it to {'name': 'foo', ...}
                if "name" in to:
                    to = [to]
                # case: to: contains multiple entries. In this case, the
                # reformatting does not remap it, so we do it.
                else:
                    to = [dict(name=k, **to[k]) for k in to]
            elif not isinstance(to, (list, tuple)):
                raise ValueError("Unhandled transition format: {}".format(to))
            for cfg in to:
                if isinstance(cfg, six.string_types):
                    cfg = {"name": cfg}

                # basic configuration
                xfrom.add(name)
                xto.add(cfg["name"])
                data = dict(
                    workflow_chain_member_from=name,
                    workflow_chain_member_to=cfg["name"],
                    rule=cfg["if"] if "if" in cfg else "{{True}}",
                    duration=cfg.get("duration", 0),
                )
                if "uuid" in cfg:
                    data["uuid"] = cfg["uuid"]

                # optional configuration
                if "virtual" in cfg:
                    data["virtual"] = cfg["virtual"]
                if "strategy" in cfg:
                    data["strategy"] = cfg["strategy"]
                elif "sample_type" in cfg:
                    data["strategy"] = "createsample"
                    data.setdefault("parameters", {})["sample_type"] = cfg.pop("sample_type")
                elif "resubmit" in cfg and cfg["resubmit"]:
                    data["strategy"] = "resubmit"
                else:
                    data["strategy"] = "identity"
                parameters = cfg.pop("parameters", data.get("parameters", {}))
                if data["strategy"] == "createsample":
                    sample_type = cfg.pop("sample_type", parameters.pop("sample_type", None))
                    if not sample_type:
                        raise ValueError("createsample strategy requires a sample_type!")
                    st = SampleType(sample_type)
                    if not st.exists():
                        raise ValueError("Specified sample type `{}` does not exist!".format(st.ident))
                    parameters["sample_type_uuid"] = st.uuid
                if parameters:
                    data["parameters"] = parameters
                transitions.append(data)

            # add to list
            if member.get("head", False):
                head += 1
            fixed_id = member.get("fixed_id", member.get("barcode", name))
            member = dict(
                workflow=obj.uuid,
                name=name,
                is_head=member.get("head", False),
                start=member.get("startable", member.get("start", False)),
                meta=member.get("meta"),
                swimlane=member.get("swimlane"),
            )
            # For backwards compatibility with existing chains (b/c UUID is
            # computed): only add the fixed_id attribute if it is present and
            # different from the name.
            if fixed_id != name:
                member["fixed_id"] = fixed_id
            # pre-generate uuid for member based on data.
            # We add "chain_name" here to ensure a client-side-generated member
            # can only belong to a single WFC.
            member["chain_name"] = chain_name
            if "uuid" not in member:
                member["uuid"] = data_to_uuid(member)
            if "swimlane" in member:
                swimlane_mapping_list.append({"id": member["uuid"], "row": member.pop("swimlane")})
            member.pop("chain_name")
            members.append(member)

        # check for head or infer
        if not head:
            heads = xfrom - xto
            if not heads:
                raise ValueError("Cyclic chains must declare `head` on one chain member.")
            # this might be slightly different than the original logic, which
            # probably tolerated two nodes with no incoming transitions.
            elif len(heads) > 1:
                raise ValueError("Multiple head nodes detected. Only a single head node allowed.")
            head = list(heads)[0]
            [x for x in members if x["name"] == head][0]["is_head"] = True
        elif head > 1:
            raise ValueError("Multiple head nodes declared. Only a single head node allowed.")

        # re-configure transition names
        member_uuid = {x["name"]: x["uuid"] for x in members}
        for transition in transitions:
            transition["workflow_chain_member_from"] = member_uuid[transition["workflow_chain_member_from"]]
            transition["workflow_chain_member_to"] = member_uuid[transition["workflow_chain_member_to"]]
            transition["chain_name"] = chain_name
            if "uuid" not in transition:
                transition["uuid"] = data_to_uuid(transition)
            transition.pop("chain_name")
        config["members"] = members
        config["transitions"] = transitions
        config["swimlanes"] = swimlanes

        # check for variables.
        for idx, var in enumerate(config.get("variables", [])):
            if isinstance(var, six.string_types):
                config["variables"][idx] = {"name": var, "var_type": "string"}
        config["resource_vars"] = push_variable_definitions(config.pop("variables", []))

        # TODO: check signature flows and pipelines. Skip for now.
        pipe_snaps = {x["uuid"]: x for x in snapshot.get("pipelines", [])}
        for pipeline in config.pop("pipelines", []):
            _import_pipeline_with_snapshot(
                pipeline, pipe_snaps.get(pipeline.get("uuid"), {}), overwrite, allow_snapshot_uuid_remap
            )
        from esp.models import SignatureFlow

        for flow in config.pop("signature_flows", []):
            create_and_return(SignatureFlow, flow, overwrite, allow_snapshot_uuid_remap)
            # _import_signature_flow(flow, overwrite, allow_snapshot_uuid_remap)
        return config
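
    # Worked example (hypothetical nodes) for the transition normalization in
    # parse_import above: the member entry
    #
    #   - Initiate:
    #       to:
    #         Process A: {if: '{{ ok }}'}
    #         Process B: {}
    #
    # expands to two transition dicts with rules '{{ ok }}' and '{{True}}'.
    # Member and transition uuids are derived client-side from their data plus
    # the chain name via data_to_uuid, so re-importing the same config yields
    # stable identifiers.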
    @cached
    def workflows(self):
        return [Workflow(x["workflow"]) for x in self.members]

    def _export_transition(self, t, members_map, versioned):
        if t.deleted:
            raise AssertionError("Should not have deleted transitions in Workflow Chain definition!")
        texport = {}
        texport["if"] = t["rule"]
        if versioned:
            texport["uuid"] = t["uuid"]
        texport["duration"] = t.get("duration", 0)
        strategy = t.meta.get("strategy", "identity")
        if strategy != "identity":
            texport["strategy"] = strategy
        if strategy == "createsample":
            texport["strategy"] = "createsample"
            # older versions of ESP (< 2.3.2) use t.meta['sample_type_uuid'].
            # Newer versions (> 2.3.2) use t.meta['parameters']['sample_type_uuid'].
            # Support either in the client for now.
            # Also, we now have more parameters possible.
            parameters = {}
            if "parameters" in t.meta:
                parameters.update(t.meta["parameters"])
            else:
                parameters["sample_type_uuid"] = t.meta.get("sample_type_uuid")
                parameters["lab7_id_sequence"] = t.meta.get("lab7_id_sequence")
                parameters["Number of Children"] = t.meta.get("Number of Children", 1)
            sample_type_uuid = parameters.pop("sample_type_uuid", None)
            if not sample_type_uuid:
                raise ValueError("Should have sample_type_uuid in transition meta or transition.meta.parameters!")
            stype = SampleType(sample_type_uuid)
            parameters["sample_type"] = stype.name
            if not parameters.get("lab7_id_sequence"):
                seqs = stype.sequences
                if seqs:
                    parameters["lab7_id_sequence"] = seqs[0]
                else:
                    parameters.pop("lab7_id_sequence", None)
            texport["parameters"] = parameters
        elif t.meta.get("parameters"):
            texport["parameters"] = t.meta["parameters"]
        if t.desc:
            texport["desc"] = t.desc
        if t.tags:
            texport["tags"] = t.tags
        if t.virtual:
            texport["virtual"] = True
        return {members_map[t.workflow_chain_member_to].name: texport}

    def _simple_transition(self, t):
        attrib = list(t.values())[0]
        return (
            "virtual" not in attrib
            and ("if" not in attrib or attrib["if"] == "true")
            and "resubmit" not in attrib
            and "strategy" not in attrib
            and "sample_type" not in attrib
            and "desc" not in attrib
            and "tags" not in attrib
        )

    # TODO: tech debt reduction: WFCs are gaining more properties.
    # It's worth creating a pseudo property "chain" and refactoring
    # WFC export to use the standard export code with a formatter
    # for the chain, where we don't directly export nodes or transitions,
    # just the chain.
    def _export(self, deep, filter_empty=False, versioned=False):
        """Export a yaml-formatted workflow chain.

        Args:
            deep (bool): Perform a deep (True) or shallow (False) export.
            filter_empty (bool): Whether to filter out empty properties.
            versioned (bool): Whether to include versioning information.

        Note:
            A deep export embeds all nested objects (e.g.: protocols of a
            workflow). A deep export is therefore the yaml analog to the
            .lab7 file. A shallow export uses names as references to nested
            objects.

            This method ignores filter_empty and always filters to the
            simplest possible text representation of the data.
        """
        # The backend wfc structure and the yaml config are different enough
        # that overriding _export makes the most sense.
        export = {}
        if versioned and self._snapshot is not None:
            export["snapshot"] = raw(self._snapshot)
            # grandfather 3.1 interim snapshot structure for SF.
            # TODO: Remove this code for 3.2.
            if "signature_flow" in export["snapshot"]:
                flow = export["snapshot"].pop("signature_flow")
                if flow and "signature_flows" not in export["snapshot"]:
                    export["snapshot"]["signature_flows"] = [flow]
        export["name"] = self.name
        if self.desc:
            export["desc"] = self.desc
        if self.tags:
            export["tags"] = self.tags.json()
        if self.variables:
            export["variables"] = export_variable_definitions(self.variables)
        fixed_id = self.data.get("fixed_id", self.data.get("barcode", self.name))
        if fixed_id not in (self.name, self.uuid):
            export["fixed_id"] = fixed_id
        export["template_type"] = self.template_type if self.template_type else "standard"
        export["chain"] = []
        transition_map = {}
        for transition in self.transitions:
            transition_map.setdefault(transition.workflow_chain_member_from, []).append(transition)
        member_map = {x.uuid: x for x in self.members}
        swimlanes = self.meta.get("swimlanes", {"mapping": [], "definitions": []})
        # "mapping" is a list of dicts, not a map... so make it a map. :/
        swimlane_mapping = {x["id"]: x["row"] for x in swimlanes.get("mapping", [])}
        if swimlanes and swimlanes.get("definitions"):
            export["swimlanes"] = raw(swimlanes["definitions"])
        for member in self.nodes:
            mexport = {}
            if member.uuid == self.head:
                mexport["head"] = True
            if member.start:
                mexport["startable"] = member["start"]
            position = member.position
            if position:
                mexport["position"] = raw(position)
            fixed_id = member.fixed_id
            if fixed_id != member.name and fixed_id != member.uuid:
                mexport["fixed_id"] = fixed_id
                member_id = fixed_id
            else:
                member_id = member.name
            if versioned:
                mexport["uuid"] = member.uuid
            # member is now a WorkflowChainNode, which handles resolving the
            # right version.
            wf = member.workflow
            wf_id = wf.name if wf.fixed_id == wf.uuid else wf.fixed_id
            if deep:
                mexport["workflow"] = wf.export(deep=deep, filter_empty=filter_empty, versioned=versioned)
            elif member_id != wf_id:
                mexport["workflow"] = wf_id
            if member.uuid in transition_map:
                mexport["to"] = []
                for transition in transition_map[member.uuid]:
                    texport = self._export_transition(transition, member_map, versioned)
                    mexport["to"].append(texport)
                # collapse simple transitions.
                if all(self._simple_transition(x) for x in mexport["to"]):
                    if len(mexport["to"]) == 1:
                        mexport["to"] = list(mexport["to"][0].keys())[0]
                    else:
                        mexport["to"] = [list(x.keys())[0] for x in mexport["to"]]
            if member.uuid in swimlane_mapping:
                mexport["swimlane"] = swimlane_mapping[member.uuid]
            if mexport:
                export["chain"].append({member.name: mexport})
            else:
                export["chain"].append(member.name)
        if self.signature_flows and deep:
            # export as a list for consistency with protocol.
            export["signature_flows"] = [
                self._export_signature_flow(sf, deep=deep, versioned=versioned) for sf in self.signature_flows
            ]
        if self.pipelines and deep:
            export["pipelines"] = [p.export(deep=deep, versioned=versioned) for p in self.pipelines]
        if versioned:
            export["uuid"] = self.uuid
            export["def_uuid"] = self.def_uuid
        return export

    def _export_signature_flow(self, flow, deep, versioned):
        # note: flow definition should already be correctly resolved...
        from .signatureflow import SignatureFlowDefinition

        if versioned:
            sf_def = SignatureFlowDefinition(flow._version)
        elif hasattr(flow, "head"):
            sf_def = SignatureFlowDefinition(flow.head)
        else:
            sf_def = sorted(flow.versions, key=lambda x: x.created_timestamp)[-1]
        export = sf_def.export(deep=deep, versioned=versioned)
        if versioned:
            export_dict = list(export.values())[0]
            export_dict["def_uuid"] = sf_def.uuid
            export_dict["uuid"] = flow.uuid
        return export

    @cached.tag("refresh")
    def signature_flows(self):
        from .signatureflow import SignatureFlow

        flows = []
        added_flows = set()
        snapshot = self._snapshot or {}
        # unlike other props, the old structure snapshotted this directly as
        # key: dict instead of key: list_of_dicts.
        sf_snaps = snapshot.get("signature_flows", [])
        sf_snap_old = snapshot.get("signature_flow", {})
        if sf_snap_old and not sf_snaps:
            sf_snaps = [sf_snap_old]
        sf_snaps = {x["uuid"]: x for x in sf_snaps}
        for var in self.variables:
            if var.var_type != "signatureflow":
                continue
            meta = var["meta"]
            sig_flow_uuid = meta["signature_flow_uuid"]
            sf_snap = sf_snaps.get(sig_flow_uuid, {})
            version = sf_snap.get("def_uuid", None)
            if version is None or version not in added_flows:
                flows.append(SignatureFlow(sig_flow_uuid, version=version))
                added_flows.add(sig_flow_uuid if version is None else version)
        return flows

    @cached.tag("refresh")
    def pipelines(self):
        """
        Return all pipeline objects associated with pipeline button fields.
        """
        from .analysis import Pipeline

        pipelines = []
        snapshot = self._snapshot or {}
        pipe_snap_old = snapshot.get("pipeline", {})
        pipe_snaps = snapshot.get("pipelines", [])
        if pipe_snap_old and not pipe_snaps:
            pipe_snaps = [pipe_snap_old]
        snapshots = {x["uuid"]: x for x in pipe_snaps}
        for var in self.variables:
            if var.var_type == "pipelinebutton":
                pipe_uuid = var.meta.pipeline
                pipelines.append(
                    Pipeline(
                        pipe_uuid,
                        version=snapshots.get(pipe_uuid, {}).get("def_uuid"),
                        snapshot=snapshots.get(pipe_uuid),
                    )
                )
        return pipelines

    @cached.tag("refresh")
    def nodes(self):
        snapshot = self._snapshot or {}
        wf_snap = {x["uuid"]: x for x in snapshot.get("workflows", [])}
        return [WorkflowChainNode(x["uuid"], data=x, snapshot=wf_snap.get(x["workflow"], {})) for x in self.members]
    def pin(self, version_name: str) -> None:
        """
        Pin a workflow chain.

        Args:
            version_name: The name to give the pinned version.
        """
        base.SESSION.put(f"/api/{self.__api__}/{self.uuid}/pin/{version_name}")
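

# Illustrative sketch (hypothetical chain name): pinning a chain under a
# human-readable version name. Per parse_import above, nested workflows,
# protocols, and pipelines resolve against the snapshot a pinned chain carries.
def _example_pin_chain():  # pragma: no cover
    chain = WorkflowChain("Manufacturing Chain")
    chain.pin("v1.0")  # PUT /api/workflow_chains/<uuid>/pin/v1.0

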
class WorkflowChainNode(BaseModel):
    @property
    def workflow(self):
        version = self._snapshot.get("def_uuid")
        return Workflow(self.data.workflow, version=version, snapshot=self._snapshot)

    @property
    def start(self):
        return self.data.get("start", False)

    @property
    def position(self):
        return self.data.get("meta", {}).get("position", {})

    @property
    def fixed_id(self):
        return self.data.get("fixed_id", self.data.get("barcode", self.name))


def fix_meta_for_flex(config):
    if isinstance(config, dict):
        for key in list(config.keys()):
            if key == "default_val":
                config["value"] = config.pop(key)
            elif isinstance(config[key], dict) or isinstance(config[key], list):
                fix_meta_for_flex(config[key])
    elif isinstance(config, list):
        for entry in config:
            fix_meta_for_flex(entry)
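

# Illustrative sketch: fix_meta_for_flex undoes the "value" -> "default_val"
# renaming applied by format_entry() inside nested report metadata. The input
# data below is hypothetical.
def _example_fix_meta_for_flex():  # pragma: no cover
    report = {"widgets": [{"default_val": 5, "style": {"default_val": "auto"}}]}
    fix_meta_for_flex(report)
    assert report == {"widgets": [{"value": 5, "style": {"value": "auto"}}]}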