# -*- coding: utf-8 -*-
#
# Project-related models.
#
# ------------------------------------------------
# imports
# -------
from collections import defaultdict, OrderedDict
from functools import reduce
import json
import logging
import os
try:
from urllib.parse import quote_plus
except ImportError:
from urllib import quote_plus
from gems import cached, composite
from datetime import datetime, date
import six
import time
from .. import base
from ..utils import object_for_value, format_resourcelink_value, format_attachment_value, timeit
from .__base__ import BaseModel, LinkedModel
from .__base__ import raw, is_uuid, create_and_return, create_and_return_uuid
# models
# ------
class Project(BaseModel):
    """
    Object for interacting with Projects from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
    of the documentation for more context and comprehensive examples of
    how to create and use this type of objects.

    Configuration:
        Simple project:

        .. code-block:: yaml

            name: My Project
            desc: Project for a series of experiments.
            tags: [demo, test]

        Project with nested experiments:

        .. code-block:: yaml

            name: My Project
            desc: Project for a series of experiments.
            tags: [demo, test]
            experiments:
              - My Experiment 1:
                  workflow: Workflow 1
                  submit: true
                  samples: [ESP001, ESP002]

        Project with nested experiment chains:

        .. code-block:: yaml

            name: My Project
            desc: Project for a series of experiments.
            tags: [demo, test]
            chains:
              - My Experiment Chain 1:
                  chain: Workflow Chain 1
                  samples: [ESP001, ESP002]

    Configuration Notes:
        * Experiments and Experiment Chains can be nested within project
          configuration, which is particularly useful for seeding data in
          bulk or creating test resources.

    Examples:
        .. code-block:: python

            >>> from esp.models import Project
            >>> project = Project('Project 1')
            >>> project.name, project.created_at
            ('Project 1', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> project.experiments
            [<Experiment(name=Experiment 1)>, <Experiment(name=Experiment 2)>]
            >>> project.samples
            [<Sample(name=ESP001)>, <Sample(name=ESP001)>, ...]

    Arguments:
        ident (str): Name or uuid for object.
    """

    __api__ = "projects"
    __api_cls__ = "Project"
    __mutable__ = BaseModel.__mutable__ + []
    __push_format__ = {}
    __defaults__ = {"experiments": []}

    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.
        This method should be overwritten by subclasses of LinkedModel for
        model-specific logic.

        Args:
            config (str, dict, list): Config file or information to use in
                creating new object.
            overwrite (bool): Whether or not to delete current entry in
                the ESP database.
        """
        # stash nested experiment and chain definitions on the class for
        # parse_response to pick up after the project itself is created
        cls._experiments = config.pop("experiments", [])
        cls._chains = config.pop("chains", [])
        return config

    @classmethod
    def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
        """
        Parse response from server when creating new project.
        This method is primarily responsible for creating nested
        experiments once a project is created.

        :param allow_snapshot_uuid_remap:
        """
        # NOTE: BACKEND RETURNS {'id': UUID}
        # INSTEAD OF PIPELINE DATA STRUCTURE.
        self = cls.from_data(data)
        uuid = data.get("uuid")
        enames = [x.name for x in self.workflow_instances]
        # pop both class-level stashes up front so neither can leak into a
        # subsequent create() call if experiment creation below raises
        experiments = cls._experiments
        chains = cls._chains
        del cls._experiments
        del cls._chains
        if uuid is not None:
            # create nested experiments, filling existing ones on overwrite
            for item in experiments:
                if item["name"] in enames:
                    obj = self.experiments[enames.index(item["name"])]
                    if overwrite and "protocols" in item:
                        obj.fill(item["protocols"])
                else:
                    item["project"] = self
                    Experiment.create(item, overwrite=overwrite, prompt=prompt)
            # create nested experiment chains
            for item in chains:
                item["project"] = self
                ExperimentChain.create(item, overwrite=overwrite, prompt=prompt)
        return self

    def drop(self, deep=False):
        """
        Issue DELETE request to remove object in ESP database, dropping
        nested experiments first.
        """
        if self.exists():
            for exp in self.experiments:
                exp.refresh()
                # By default, fetching experiments by UUID returns the experiment
                # even for archived experiments, so must check deleted flag in
                # addition to exists.
                if exp.exists() and not exp.deleted:
                    exp.drop(deep=deep)
        return super(Project, self).drop()

    def new_experiment(self, **data):
        """
        Add new Experiment to existing workflow.

        Args:
            **data (object): Keyword arguments for creating experiment.

        Examples:
            >>> obj = Project('Test Workflow')
            >>> print(obj.experiments)
            []
            >>>
            >>> exp = obj.new_experiment(name='test', samples=['one', 'two'])
            >>> print(obj.experiments)
            ['<Experiment(name=test)']
            >>> print(exp.samples)
            ['<Sample(name=one)>',
             '<Sample(name=two)>']
        """
        if "workflow" not in data:
            raise AssertionError("Error: workflow= must be defined in " "call to Project.new_experiment!")
        data["project"] = self.data.uuid
        return Experiment.create(data, overwrite=False)

    @cached.tag("refresh")
    def experiments(self):
        """
        Property for fluidly accessing ``Experiment`` objects from metadata.
        """
        return [Experiment(obj["uuid"]) for obj in self.workflow_instances]

    @cached.tag("refresh")
    def samples(self):
        """
        Property for fluidly accessing ``Sample`` objects from projects.
        """
        from .sample import Sample

        res = base.SESSION.get("/api/projects/{}/samples".format(self.uuid)).json()
        return [Sample(obj["sample_uuid"]) for obj in res["results"]]
class ExperimentChain(LinkedModel):
    """
    Object for interacting with ExperimentChains from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
    of the documentation for more context and comprehensive examples of
    how to create and use this type of objects.

    Configuration:
        Create simple chain:

        .. code-block:: yaml

            name: MF Chain 1
            chain: Manufacturing Chain
            project: Manufacturing
            samples:
                count: 3
                type: Tissue

        Create chain with nested experiments:

        .. code-block:: yaml

            name: MF Chain 1
            chain: Manufacturing Chain
            project: Manufacturing
            samples:
                count: 3
                type: Tissue
            workflows:
              - Initiate Manufacturing:
                  submit: true
                  transition: true
                  protocols:
                    - Initiate Manufacturing:
                        complete: true
                        data:
                          Start Date: 01Jan18
                          Signature: true
              - Finish Manufacturing:
                  submit: true
                  protocols:
                    - Process A:
                        data:
                          Completion Date: 01Jan18
                          Signature: true

    Configuration Notes:
        * The ``workflows`` parameter in the config definition references nested
          configuration for specific Experiment objects.
        * To automatically transition across workflows via configuration, nodes
          with possible transitions need to include the ``transition: true``
          configuration (seen above).
        * Experiment Chain configuration must include a referenced project, per
          constraints on Experiment Chains in ESP.

    Examples:
        .. code-block:: python

            >>> from esp.models import ExperimentChain
            >>> chain = ExperimentChain('MF Chain 1')
            >>> chain.name, chain.created_at
            ('MF Chain 1', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> chain.samples
            [<Sample(name=ESP001)>, <Sample(name=ESP001)>, ...]
            >>> chain.project
            <Project(name=Project 1)>
            >>> chain.experiments
            [<Experiment(name=Experiment 1)>, <Experiment(name=Experiment 2)>]
            >>> chain.chain
            <WorkflowChain(name=Manufacturing Chain)>
            >>> chain.workflows
            [<Workflow(name=Initiate Manufacturing)>, <Workflow(name=Finish Manufacturing>]

            >>> # helper methods
            >>> chain.active
            True
            >>> chain.submit()  # submit workflow chain
            >>> chain.fill(...)  # fill with workflows config above

    Arguments:
        ident (str): Name or uuid for object.
    """

    __api__ = "workflow_chain_instances"
    __api_cls__ = "WorkflowChainInstance"
    __defaults__ = {
        "samples": [],
        "workflows": [],
    }

    @classmethod
    @timeit(info="{config[chain]}:::{config[name]}")
    def create(cls, config=None, overwrite=False, prompt=False, object_names=None, **kwargs):
        """Create a new ExperimentChain, timing the operation for diagnostics."""
        return super().create(config, overwrite, prompt, object_names, **kwargs)

    def drop(self, deep=False):
        """
        Issue DELETE request to remove object in ESP database, dropping
        member experiments first.
        """
        if self.exists():
            for exp in self.experiments:
                exp.refresh()
                # By default, fetching experiments by UUID returns the experiment
                # even for archived experiments, so must check deleted flag in
                # addition to exists.
                if exp.exists() and not exp.deleted:
                    exp.drop(deep=deep)
        return super(ExperimentChain, self).drop()

    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.
        This method should be overwritten by subclasses of LinkedModel for
        model-specific logic.

        Args:
            config (str, dict, list): Config file or information to use in
                creating new object.
            overwrite (bool): Whether or not to delete current entry in
                the ESP database.
        """
        from .workflow import WorkflowChain

        # store sample data and sheet data
        project = config.pop("project")
        chain = config.pop("chain")
        cls._workflows = config.pop("workflows", [])
        cls._samples = config.pop("entities", config.pop("samples", []))
        # assertions
        if len(cls._samples) == 0:
            raise AssertionError("Cannot submit a WorkflowChain without samples!")
        # get workflow information
        wfc = WorkflowChain(chain)
        if not wfc.exists():
            raise AssertionError("Cannot create an experiment tied to non-existent " "workflow chain {}!".format(chain))
        config["workflow_chain_uuid"] = wfc.uuid
        if "start_member" in config:
            start_node = config.pop("start_member")
            node = [x for x in wfc.members if x.name == start_node]
            if not node:
                raise AssertionError(
                    "Cannot create an experiment chain with non-existent start node `{}`!".format(start_node)
                )
            # should only match if found
            if not node[0].start:
                # include the offending node name in the message (format call
                # was previously missing)
                raise AssertionError(
                    "Cannot start an experiment chain from `{}`: this node is not startable".format(start_node)
                )
            config["start_member_uuid"] = node[0].uuid
        # get project information
        config["project_uuid"] = create_and_return_uuid(Project, project, overwrite=True)
        # submit initial experiment
        config["workflow_instance"] = {
            "name": config["name"],
            "submit": False,
        }
        return config

    @classmethod
    def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
        """
        Parse response from experiment creation and manage submission
        and protocol data filling.

        :param allow_snapshot_uuid_remap:
        """
        from .sample import Sample, SampleType

        self = cls.from_data(data)
        workflows, samples = cls._workflows, cls._samples
        del cls._workflows, cls._samples
        # do sample creation for first wfi
        # TODO: CONSOLIDATE WITH EXPERIMENT MODEL CODE
        wf = self.experiments[0].workflow
        for idx, sample in list(enumerate(samples)):
            if isinstance(sample, str) and not is_uuid(sample):
                # bare sample name: type it with the workflow's first sample
                # type, falling back to the generic type
                if len(wf.sample_types) > 0:
                    typ = wf.sample_types[0]
                else:
                    typ = SampleType("Generic sample")
                samples[idx] = {"name": sample, "type": typ.name}
        samples = create_and_return(Sample, samples, overwrite=False)
        if not isinstance(samples, (list, tuple)):
            samples = [samples]
        self.experiments[0].samples = samples
        self.experiments[0].push()
        # fill workflow data
        if len(workflows) > 0:
            self.fill(workflows)
        return self

    def submit(self, sheet=True):
        """
        Submit experiment to lab for processing.
        """
        logging.info("Submitting Workflow Chain: {}".format(self.name))
        self.experiments[0].submit(sheet=sheet)
        return self

    def fill(self, workflows):
        """
        Fill experiment with protocol data.
        """
        from .workflow import Workflow

        # format input
        if isinstance(workflows, (dict)):
            workflows = [workflows]
        # go through workflows and fill data
        nmap = {item["name"]: Workflow(item["workflow"]) for item in self.chain.members}
        processed = []
        processed_items = [False] * len(workflows)
        for i, item in enumerate(workflows):
            for exp in self.experiments:
                if item["name"] not in nmap:
                    raise AssertionError("Error: specified chain member {} not in chain.".format(item["name"]))
                # process if the workflow matches and it hasn't been
                # processed in this block (accounting for node name vs wf name differences)
                wf = nmap[item["name"]]
                if exp.workflow.name == wf.name and exp.uuid not in processed:
                    # manage submissions and specific types of sample sheet creation
                    if item.get("submit"):
                        exp = exp.submit(sheet=item.get("sheet", self.name + " - " + item["name"]))
                    # update experiment name
                    if item["name"] not in exp.name:
                        exp.name = self.name + " - " + item["name"]
                        exp.push()
                    # fill data and transition (if specified)
                    pc = item.get("protocols", [])
                    if exp.active:
                        if len(pc) > 0:
                            exp.fill(pc)
                        if item.get("transition"):
                            base.SESSION.put("/api/sample_sheets/{}/transition_chains".format(exp.sheet.uuid))
                            self.refresh()
                    processed.append(exp.uuid)
                    processed_items[i] = True
                    break
        if not all(processed_items):
            # avoid shadowing the `processed` uuid list above
            unprocessed = [entry["name"] for done, entry in zip(processed_items, workflows) if not done]
            raise AssertionError("Unprocessed workflow entries: {}".format(", ".join(unprocessed)))
        return self

    @cached.tag("refresh")
    def samples(self):
        """
        Property for accessing initial sample objects in experiment.
        """
        return self.experiments[0].samples

    @cached.tag("refresh")
    def project(self):
        """
        Property for accessing experiment project.
        """
        return self.experiments[0].project

    @cached.tag("refresh")
    def experiments(self):
        """
        Property for accessing member experiments of the chain.
        """
        exps = [Experiment(x["uuid"]) for x in self.members]
        for exp in exps:
            exp.experiment_chain = self
        return exps

    @cached.tag("refresh")
    def chain(self):
        """
        Property for accessing experiment workflow chain definition.
        """
        from .workflow import WorkflowChain

        return WorkflowChain.from_definition(self._data["workflow_chain"])

    @cached.tag("refresh")
    def workflows(self):
        """
        Property for accessing experiment workflow definition.
        """
        return self.chain.workflows

    @property
    def sample_types(self):
        """
        Proxy for returning sample types from workflows.
        """
        return self.workflows[0].sample_types

    @property
    def active(self):
        """
        Check if experiment is currently active.
        """
        if len(self.experiments) == 0:
            return False
        return self.experiments[0].state in ["running", "done"]
class Experiment(LinkedModel):
    """
    Object for interacting with Experiments from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
    of the documentation for more context and comprehensive examples of
    how to create and use this type of objects.

    Configuration:
        Create simple experiment:

        .. code-block:: yaml

            name: MS001
            workflow: Illumina Sequencing
            project: Miseq Sequencing
            samples:
                count: 3
                type: Extract

        Create experiment and fill in data:

        .. code-block:: yaml

            name: MS001
            workflow: Illumina Sequencing
            project: Miseq Sequencing
            samples:
                count: 2
                type: Extract
            protocols:
              - Set Samples:
                  complete: true
                  data:
                    Attachment: ${CWD}/runsheet.txt
                    Note: ['Ready to go!', 'Ready!']
              - Create Illumina Library:
                  complete: true
                  load: ${CWD}/spreadsheet.txt'
              - Analyze Sequencing Results:
                  run: true
                  complete: true
                  data:
                    Container: 'MiSeq: Lane 1'
                    Reference: ['GRCh38', 'GRCh37']

        Create experiment and fill in data with verification:

        .. code-block:: yaml

            name: MS001
            workflow: Illumina Sequencing
            project: Miseq Sequencing
            samples:
                count: 2
                type: Extract
            protocols:
              - Set Samples:
                  complete: true
                  before:
                    verify:
                      Attachment: '{{ value is None }}'
                  data:
                    Attachment: ${CWD}/runsheet.txt
                    Note: ['Ready to go!', 'Ready!']
                  after:
                    verify:
                      Attachment: '{{ value is not None }}'
              - Create Illumina Library:
                  complete: true
                  cols:
                    - Index Type
                    - I7 Index ID
                    - I5 Index ID
                  actions:
                    - data:
                        - ['TruSeq Amplicon', 'A701', 'A501']
                        - ['TruSeq Amplicon', 'A702', 'A502']
                      verify:
                        Index Type: 'TruSeq Amplicon'
              - Analyze Sequencing Results:
                  run: true
                  complete: true
                  data:
                    Amount: 10
                    Reference: ['GRCh38', 'GRCh37']
                  verify:
                    Amount: '{{ value >= 10 }}'
                    Reference: ['GRCh38', 'GRCh37']

    Configuration Notes:
        * There are several hooks for entering data in workflows, and each of them
          are useful in different contexts. At a high-level, data are entered in order
          of the block they're specified in. An overview of the blocks are as follows:

            * ``before`` - Perform actions before any other actions are perform.
            * ``actions`` - List of action blocks to perform (as a list).
            * ``after`` - Perform actions after any other actions are perform.

          If no ``before``, ``actions``, or ``after`` blocks are specified, then the
          parameters included in the config will be run in a single action. See the
          examples above for context.
        * Available actions that can be performed when filling in data are:

            * ``data`` - Fill in data specified in configuration. Data can take
              the format of a dictionary of values (fill in all rows
              with the specified value), or a dictionary of lists (use the
              list indices to fill in individual rows).
            * ``load`` - Load tabular file into worksheet, matching file headers
              with protocol columns.
            * ``user``/``password`` - Log in as specific user before entering data.
            * ``verify`` - Run assertions on data after data are entered into
              a worksheet. This action is particularly useful for verifying
              that expressions work as expected.
            * ``run`` - Start a pipeline from a pipeline protocol. This option will
              run all pipelines specified for the worksheet protocol.
            * ``wait`` - Wait a number of seconds before performing the next action.

        * To automatically create Worksheets from experiments, use the ``submit: true``
          parameter.
        * To complete a protocol at the end of filling in data, use the ``complete: true``
          parameter.

    Examples:
        .. code-block:: python

            >>> from esp.models import Experiment
            >>> exp = Experiment('MS001')
            >>> exp.name, exp.created_at
            ('MS001', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> exp.samples
            [<Sample(name=ESP001)>, <Sample(name=ESP001)>, ...]
            >>> exp.project
            <Project(name=Project 1)>
            >>> exp.workflow
            <Workflow(name=Initiate Manufacturing)>

            >>> # accessing worksheet data
            >>> exp.protocols[0]
               Sample ID   Column 1  Column 2
                  ESP001        1.2       foo
                  ESP002        1.3       bar
            >>> exp.protocol('Set Samples')['Column 1']
            [1.2, 1.3]

            >>> # helper methods
            >>> exp.active
            True
            >>> exp.submit()  # submit workflow
            >>> exp.fill(...)  # fill with workflow config above
            >>> exp.archive()

    Arguments:
        ident (str): Name or uuid for object.
    """

    # backend API endpoint and class for this model
    __api__ = "workflow_instances"
    __api_cls__ = "WorkflowInstance"
    # defaults merged into configuration before import
    __defaults__ = {
        "samples": [],
        "protocols": [],
    }
    # fields that may be modified locally and pushed back to the server
    __mutable__ = BaseModel.__mutable__ + [
        "samples",
    ]
    # payload formatting applied to mutable fields on push
    __push_format__ = {"samples": lambda x: [s.uuid for s in x.samples]}
[docs] @classmethod
@timeit(info="{config[workflow]}:::{config[name]}")
def create(cls, config=None, overwrite=False, prompt=False, object_names=None, **kwargs):
return super().create(config, overwrite, prompt, object_names, **kwargs)
    @classmethod
    def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
        """
        Create new object in ESP database using config file or other data.
        This method should be overwritten by subclasses of LinkedModel for
        model-specific logic.

        Args:
            config (str, dict, list): Config file or information to use in
                creating new object.
            overwrite (bool): Whether or not to delete current entry in
                the ESP database.
        """
        from .sample import Sample, SampleType
        from .container import Container
        from .inventory import Item
        from .workflow import Workflow

        # store sample data and sheet data
        if "entities" in config:
            entities = config.pop("entities")
        else:
            entities = config.pop("samples")
        # stash submission options on the class; parse_response reads and
        # deletes these after the experiment is created
        cls._protocols = config.pop("protocols")
        cls._submit = config.pop("submit", False)
        cls._sheet = config.pop("sheet", True)
        cls._append_samples = config.pop("append_sheet_samples", False)
        # get workflow information
        if "workflow" in config:
            wf = create_and_return(Workflow, config["workflow"], overwrite=overwrite)
            config["workflow"] = wf.uuid
        # get project information
        config["project"] = create_and_return_uuid(Project, config["project"], overwrite=True)
        # config samples
        if len(entities) > 0 and isinstance(entities, (list, tuple)):
            for idx, sample in list(enumerate(entities)):
                # If the entity is a non-uuid string but a Container or Item this may not work.
                if isinstance(sample, str) and not is_uuid(sample):
                    # NOTE(review): `wf` is only bound when "workflow" was in the
                    # config -- bare sample names without a workflow would raise
                    # NameError here; confirm whether that combination is valid.
                    if len(wf.sample_types) > 0:
                        typ = wf.sample_types[0]
                    else:
                        typ = SampleType("Generic sample")
                    entities[idx] = {"name": sample, "type": typ.name}
            for idx, entity in list(enumerate(entities)):
                if is_uuid(entity):
                    continue
                if isinstance(entity, dict):
                    entity_class = entity.get("cls", "Sample")
                else:  # It's a specific Entity object
                    entity_class = entity.data.get("cls", "Sample")
                if entity_class == "Container":
                    entities[idx] = create_and_return_uuid(Container, entity, overwrite=False)
                elif entity_class == "Item":
                    entities[idx] = create_and_return_uuid(Item, entity, overwrite=False)
                else:
                    entities[idx] = create_and_return_uuid(Sample, entity, overwrite=False)
        elif len(entities) > 0:  # It's an object so create a Sample from the object (i.e. {count:x, type: Y})
            entities = create_and_return_uuid(Sample, entities, overwrite=False)
        config["samples"] = entities
        return config
[docs] @classmethod
def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
"""
Parse response from experiment creation and manage submission
and protocol data filling.
:param allow_snapshot_uuid_remap:
"""
self = cls.from_data(data)
sheet, submit, protocols = cls._sheet, cls._submit, cls._protocols
append_samples = cls._append_samples
del cls._submit, cls._protocols
if submit:
self.submit(sheet=sheet, append_samples=append_samples)
if self.active and len(protocols) > 0:
self.fill(protocols)
return self
def _data_by_name(self):
"""
Custom query for data by experiment name.
.. note: Developer note - look into removing the need for
this resolution - querying by name should be fine.
"""
# Notes:
# 1) the default params from querying are not the same as for
# fetching by uuid. For some reason. In particular, querying
# does not return the full set of resource vals. Since we
# have to do a double hit anyway, supply params=["name"] to
# make this first query faster.
# 2) On the backend, name is using a double-ended wildcard like
# expression, but names uses exact matching, so use names.
# TODO: Fix the backend so the default params for querying and fetching
# by ID are the same + fix querying by name to be exact match.
data = base.SESSION.get(
"/api/{}?names={}¶ms={}".format(
self.__api__, quote_plus('["' + self.ident + '"]'), quote_plus('["name"]')
)
).json()
for x in data:
if x.get("name") == self.ident:
uuid = x.get("uuid")
return base.SESSION.get("/api/{}/{}".format(self.__api__, uuid)).json()
return {}
[docs] def fill(self, protocols):
"""
Fill experiment with protocol data.
"""
import pandas
from .admin import User
# format input
if isinstance(protocols, (dict)):
protocols = [protocols]
# set data for protocols
user = None
action_conflict_keys = set(["data", "verify", "before", "after"])
action_keys = set(["data", "verify", "load", "user", "approve", "wait", "password", "endpoint"])
for obj in protocols:
param = obj.get("cols")
ignore = obj.get("ignore")
# parse specified actions
actions = obj.get("actions", [])
keys = list(obj.keys())
if len(actions) > 0 and len(action_conflict_keys.intersection(set(keys))) > 0:
raise AssertionError(
"Cannot use `before`, `after`, `data`, or `verify` alongside `action` block. "
"Please define those items in the `action` block."
)
if "before" in obj:
actions.append(obj.get("before"))
if len(action_keys.intersection(set(keys))) > 0:
actions.append({key: obj.get(key) for key in action_keys})
if "after" in obj:
actions.append(obj.get("after"))
# add run command to final action
if len(actions) == 0:
actions.append({})
actions[-1].update({"run": obj.get("run")})
# get current sheet for processing
sheet = self.protocol(obj["name"], index=obj.get("index", 0))
if sheet is None:
continue
sheet.refresh()
# iterate through and process actions
for action in actions:
data = action.get("data")
verify = action.get("verify")
approve = action.get("approve")
run = action.get("run")
load = action.get("load")
user = action.get("user")
wait = action.get("wait")
password = action.get("password")
endpoint = action.get("endpoint")
# set up specified user
if user:
if password is None:
raise AssertionError(
"Password must be specified to become specific user "
"for filling protocol (password: <password>)."
)
user = User.current()
user.password = base.SESSION.password
User(user).login(password)
# load from file
if load:
sheet.load(load)
# load data specified as data frame or string
if isinstance(data, six.string_types) or isinstance(data, pandas.DataFrame):
sheet.load(data)
# parameterized definition
elif param and data:
for idx in range(len(data)):
if len(param) != len(data[idx]):
raise AssertionError(
"Error: length of column definition is different than "
"length of data array in worksheet import."
)
sheet.df.loc[idx, param] = sheet.convert(param, data[idx])
# column-level definition
elif data and isinstance(data, dict):
for key, val in data.items():
if key not in sheet.columns:
raise AssertionError(
("Expected `{}` to be in sheet but wasn't. " "Available columns: {}").format(
key, sheet.columns
)
)
# dict can be:
# column: scalar
# column: list
# column:
# sample: scalar
if isinstance(val, dict):
sample_names = [x.name for x in sheet.samples]
for sampkey, sampval in val.items():
if sampkey not in sample_names:
raise ValueError(
("Expected sample `{}` to be in sheet " "but wasn't. Valid samples: {}").format(
sampkey, sample_names
)
)
idx = sample_names.index(sampkey)
sheet.df.loc[idx, key] = sheet.convert(key, sampval)
else:
sheet[key] = sheet.convert(key, val)
# don't know
elif data:
raise AssertionError(
"No rules for adding data as specified " "for protocol {}!".format(data["name"])
)
# save sheet data
sheet.save()
# wait for specified about of time before refresh
if wait:
time.sleep(wait)
sheet.refresh()
# run pipeline (if part of action)
if run is not None:
result = ""
if isinstance(run, bool) and run is True:
result = sheet.run()
else:
result = sheet.run(col_name=run)
if result.state == "failed" and not ignore:
raise OSError("Pipeline failed to successfully execute.")
# call an endpoint (if part of action)
if endpoint is not None:
import esp.base
def evaluate(value, context):
if not isinstance(value, str):
return value
if not value.startswith("{{") or not value.endswith("}}"):
return value
value = value[2:-2].strip()
return eval(value, context)
context = {
"sample_uuids": [x.uuid for x in self.samples],
"samples": self.samples,
"experiment": self,
"experiment_uuid": self.uuid,
# will break for multi-sheet experiments... but that edge case should be handled by custom
# test code anyway.
"sample_sheet_uuid": self.sheet.uuid,
"sample_sheet_name": self.sheet.name,
"sample_sheet": self.sheet,
}
if isinstance(endpoint, six.string_types):
endpoint = {"url": endpoint, "method": "get"}
if not isinstance(endpoint, dict):
raise ValueError("endpoint value must be a string (the endpoint to GET) or a dict!")
if "url" not in endpoint:
raise ValueError("endpoint dict must have url!")
url = endpoint["url"]
method = str(endpoint.get("method", "post" if endpoint.get("body") else "get")).lower()
if method not in ("put", "post", "get", "delete"):
raise ValueError("method must be one of put, post, get, or delete!")
body = endpoint.get("body", {})
queryargs = endpoint.get("queryargs", {})
if body and method in ("get", "delete"):
raise ValueError("body not supported for get or delete at this time. Try queryargs")
for key, value in queryargs.items():
value = evaluate(value, context)
if isinstance(value, (list, dict)):
value = json.dumps(value)
url += "{key}={value}".format(key=key, value=value)
for key, value in body.items():
value = evaluate(value, context)
body[key] = value
method = getattr(esp.base.SESSION, method)
result = method(url, json=body if body else None)
# do what for the result? Print it for now for posterity
print("Result of endpoint {}: ".format(url))
try:
result = result.json()
print(result)
except:
result = result.text
print(result.text)
# may be additional wait clause...
if "wait" in endpoint:
wait_conditions = endpoint["wait"]
if "time" in wait_conditions:
time.sleep(wait_conditions["time"])
if "pipeline_status" in wait_conditions:
pipeline_info = wait_conditions["pipeline_status"]
pipeline = pipeline_info["pipeline"]
pipeline = evaluate(pipeline, {"result": result, "experiment": self, "worksheet": sheet})
self._monitor_pipeline(pipeline, pipeline_info, sheet)
# log back in as original user
if user is not None:
user.login(user.password)
user = None
# approve protocol at end of actions
if approve:
if isinstance(approve, dict):
for key in approve:
sheet.approve(approve[key], column=key)
else:
sheet.approve(approve)
# run verify block
if verify:
sheet.verify(verify)
# complete protocol at end of actions
if obj.get("complete"):
sheet.complete()
return self
@timeit(info="{self.workflow.name}:::{sheet.name}:::{pipeline}")
def _monitor_pipeline(self, pipeline, pipeline_info, sheet):
status_targets = pipeline_info["status"]
limit = pipeline_info.get("time_limit", 300)
from esp.models import Analysis
analysis = Analysis(pipeline)
if not analysis.exists():
raise ValueError("Could not determine pipeline status: invalid pipeline: {}".format(pipeline))
start_time = datetime.now()
while analysis.state not in status_targets:
if analysis.state in ["done", "failed"]:
break # can't go any further than that...
if (datetime.now() - start_time).total_seconds() > limit:
raise AssertionError(
"Pipeline status `{}` not in `{}` in `{}` seconds".format(analysis.state, status_targets, limit)
)
time.sleep(pipeline_info.get("sleep_time", 10))
analysis.refresh()
[docs] def archive(self):
"""
Archive experiment once sample sheet is finished. This provides
guarantees around experiment completeness by explicitly checking
for protocol completion before dropping.
"""
logging.info("Archiving SampleSheet for Experiment: {}".format(self.name))
# check if all protocols are complete
self.refresh()
if self.active:
if len(self.workflow.protocols) != len(self.protocols):
raise AssertionError("Cannot archive incomplete sample sheet!")
for obj in self.protocols:
if not obj.completed:
raise AssertionError("Cannot archive incomplete sample sheet!")
self.drop()
return
    def push(self):
        """
        Overwrite push method to account for inconsistencies in PUT
        payloads vs GET payloads.
        """
        super(Experiment, self).push()
        # re-fetch so local state reflects the server's GET representation
        self.refresh()
        return
    def drop(self, deep=False):
        """
        Issue DELETE request to remove object in ESP database.

        Args:
            deep (bool): If True, also drop the experiment's samples.
        """
        # drop the sample sheet first unless the experiment never started
        if self.state not in ["pending", "queued"] and self.sheet.exists():
            # if len(self.sheet.experiments) == 1:
            self.sheet.drop()
        if deep:
            # If there are Containers or Items in the samples, they will get deleted too.
            # Leaving that alone for now but we may want to check for that.
            for sample in self.samples:
                sample.refresh()
                if sample.exists() and not sample.deleted:
                    sample.drop(deep=deep)
        return super(Experiment, self).drop()
[docs] @timeit(info="{self.name}")
def submit(self, sheet=True, append_samples=False):
"""
Submit experiment to lab for processing.
"""
logging.info("Submitting Experiment: {}".format(self.name))
# submit the experiment
base.SESSION.put(
"/api/workflow_instances/{}".format(self.uuid),
json={"submit": True, "samples": [s.uuid for s in self.samples]},
)
# create sheet if specified
if isinstance(sheet, bool):
if sheet:
sheet = self.name
if isinstance(sheet, six.string_types):
self.create_sheet(name=sheet, append_samples=append_samples)
self.refresh()
return self
@timeit(info="{self.name}:::{name}")
def create_sheet(self, name=None, append_samples=False):
if self.active and self.sheet.name == self.name:
return self.sheet
sheet_name = self.name if name is None else name
sheet = SampleSheet(sheet_name)
if sheet.exists() and append_samples:
sheet.add_experiment(self)
return sheet
else:
return SampleSheet.create(name=self.name if name is None else name, experiments=[self], overwrite=True)
[docs] def sample_group(self, idx=0):
"""
Property for accessing sample groups within experiment.
"""
return [Experiment._create_entity_from_data(s) for s in self.sample_groups[idx]]
@classmethod
def _create_entity_from_data(cls, entity):
    """
    Instantiate the model matching the entity's "cls" field; anything
    other than Container or Item (including a missing field) becomes
    a Sample.
    """
    from .sample import Sample
    from .container import Container
    from .inventory import Item

    model = {"Container": Container, "Item": Item}.get(entity.get("cls", "Sample"), Sample)
    return model.from_data(entity)
@cached.tag("refresh")
def sheet(self):
    """
    Property for accessing sample sheet related to experiment.

    Raises:
        AssertionError: If the experiment has not been submitted yet
            (sheets only exist for active experiments).
    """
    # NOTE: The esp client only allows for one sample sheet per experiment. This
    # is a design decision meant to simplify assumptions about how the API
    # could be structured.
    if not self.active:
        raise AssertionError("Experiment must be submitted to access sample sheet!")
    return SampleSheet(self.sample_sheets[0])
@cached.tag("refresh")
def protocols(self):
    """
    Return an ExperimentWorksheet for each step instance of this
    experiment, in step order.

    Raises:
        AssertionError: If the experiment has not been submitted yet.
    """
    # NOTE: The esp client only allows for one sample sheet per experiment. This
    # is a design decision meant to simplify assumptions about how the API
    # could be structured.
    if not self.active:
        raise AssertionError("Experiment must be submitted to retrieve Worksheet objects!")
    # (dropped an unused enumerate() index from the original comprehension)
    return [ExperimentWorksheet(obj.uuid, experiment=self) for obj in self.step_instances]
def protocol(self, name, index=0):
    """
    Return the index-th protocol instance named `name`, or None when
    there are fewer than index + 1 matches.
    """
    matches = (obj for obj in self.protocols if obj.name == name)
    for position, obj in enumerate(matches):
        if position == index:
            return obj
    return None
@cached.tag("refresh")
def samples(self):
    """
    Property for accessing initial sample objects in experiment
    (the entities of sample group zero).
    """
    return self.sample_group(0)
def add_samples(self, new_samples):
    """
    Add additional samples to the experiment.

    NOTE: If the experiment is pending, the samples will replace the current list of samples rather
    than add to it.

    Args:
        new_samples (List[Sample]): list of samples to add
    """
    # pending experiments only accept a full "samples" list (replace);
    # submitted ones support incremental "add_samples".
    sample_key = "add_samples"
    if self.state == "pending":
        sample_key = "samples"
    base.SESSION.put("/api/{}/{}".format(self.__api__, self.uuid), json={sample_key: [x.uuid for x in new_samples]})
@cached.tag("refresh")
def project(self):
    """
    Return the Project this experiment belongs to.
    """
    project_uuid = self.project_uuid
    return Project(project_uuid)
@cached.tag("refresh")
def workflow(self):
    """
    Return the Workflow definition this experiment was created from.
    """
    from .workflow import Workflow

    definition_uuid = self.workflow_uuid
    return Workflow.from_definition(definition_uuid)
@property
def sample_types(self):
    """
    Sample types allowed by this experiment's workflow definition
    (proxied straight from the workflow).
    """
    workflow = self.workflow
    return workflow.sample_types
@property
def active(self):
    """
    True when the experiment has been submitted (state is loading,
    running, done, or failed); False otherwise.
    """
    return self.state in {"loading", "running", "done", "failed"}
class SampleSheet(BaseModel):
    """
    Object for interacting with SampleSheets from the ESP database.

    See the `Usage <./usage.html>`_ and `Examples <./examples.html>`_ pages
    of the documentation for more context and comprehensive examples of
    how to create and use this type of objects.

    Configuration:
        Create sample sheet with submitted experiments:

        .. code-block:: yaml

            name: Sheet 001
            desc: Sample sheet with experiments
            experiments:
              - Experiment 1
              - Experiment 2

        Create sample sheet with nested experiments:

        .. code-block:: yaml

            name: Sheet 001
            desc: Sample sheet with experiments
            experiments:
              - Experiment 1
              - Experiment 2:
                  samples:
                    count: 2
                  submit: True
                  sheet: False

    Configuration Notes:
        * It's recommended to use Experiment objects for accessing worksheet
          data. This model is primarily useful for creating multi-experiment
          sample sheets.

    Examples:
        .. code-block:: python

            >>> from esp.models import SampleSheet
            >>> sheet = SampleSheet('Sheet 001')
            >>> sheet.name, sheet.created_at
            ('Sheet 001', '2019-06-21T16:04:01.199076Z')

            >>> # show relationships
            >>> sheet.samples
            [<Sample(name=ESP001)>, <Sample(name=ESP001)>, ...]
            >>> sheet.experiments
            [<Experiment(name=Experiment 1)>, <Experiment(name=Experiment 2)>]
            >>> sheet.project
            <Project(name=My Project)>

    Arguments:
        ident (str): Name or uuid for object.
    """

    __api__ = "sample_sheets"
    # fields that may be modified locally and pushed back via PUT
    __mutable__ = BaseModel.__mutable__ + ["workflow_uuid", "samples"]
    # strip composite wrappers from sample data when building PUT payloads
    __push_format__ = {
        "samples": lambda x: raw(x.data.samples),
    }
    # version 2 builds worksheet tabs from the /bulk endpoint
    _worksheet_class_version = 2
@classmethod
def all(cls, **kwargs):
    """
    Return all instances related to a given model.

    NOTE(review): **kwargs is accepted but never used below — confirm
    whether query filtering was intended here.
    """
    # NOTE: BACKEND DOESN'T INCLUDE WORKFLOW INSTANCES IF QUERYING
    # BASE URL FOR MODEL, SO WE HAVE TO RE-INSTANTIATE OBJECTS.
    result = base.SESSION.get("/api/{}".format(cls.__api__))
    return [cls(data["uuid"]) for data in result.json()]
@classmethod
def parse_import(cls, config, overwrite=False, allow_snapshot_uuid_remap=False):
    """
    Create new object in ESP database using config file or other data.
    This method should be overwritten by subclasses of LinkedModel for
    model-specific logic.

    Args:
        config (str, dict, list): Config file or information to use in
            creating new object.
        overwrite (bool): Whether or not to delete current entry in
            the ESP database.

    Raises:
        ValueError: If neither `experiments` nor `workflow` is given,
            or `workflow` cannot be resolved to an existing Workflow.
    """
    from .sample import Sample

    experiments = config.pop("experiments", None)
    workflow = config.pop("workflow", None)
    if not experiments and not workflow:
        raise ValueError("Must specify at least one of `experiments` or `workflow` properties to config.")
    # NOTE(review): cls._experiments is initialized here but never
    # reassigned to the experiments created below, so parse_response's
    # add_experiment path appears to never fire — confirm intent.
    cls._experiments = None
    if experiments is not None and len(experiments) > 0:
        experiments = create_and_return(Experiment, experiments, overwrite=overwrite)
        if "samples" in config:
            samples = config.pop("samples")
            # NOTE(review): the created `samples` are not referenced
            # after this line (config["samples"] below is rebuilt from
            # each experiment's samples) — confirm this is intended.
            samples = create_and_return(Sample, samples, overwrite=overwrite)
        for exp in experiments:
            # NOTE(review): these two keys are overwritten on every
            # iteration, so only the last experiment's uuids survive.
            config["workflow_uuid"] = exp.workflow.uuid
            config["workflow_instance_uuid"] = exp.uuid
            config["samples"] = config.get("samples", []) + [
                {"uuid": s.uuid, "workflow_instance_id": exp.uuid} for s in exp.samples
            ]
    if workflow is not None:
        from .workflow import Workflow

        if isinstance(workflow, str):
            resolved_workflow = Workflow(workflow)
        else:
            resolved_workflow = workflow
        if isinstance(resolved_workflow, Workflow) and resolved_workflow.exists():
            config["workflow_uuid"] = resolved_workflow.def_uuid
        else:
            raise ValueError(
                "Property `workflow` must be a valid workflow name, uuid, or Workflow object, but was: {}".format(
                    workflow
                )
            )
    return config
@classmethod
def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
    """
    Do multi-experiment adding after sample sheet creation.

    :param allow_snapshot_uuid_remap: accepted for interface parity;
        unused here.
    """
    # NOTE: THE BACKEND SHOULD BE ABLE TO TAKE MULTIPLE WORKFLOW
    # INSTANCE UUIDS ON THE FIRST CREATE
    self = cls.from_data(data)
    # consume the experiments stashed on the class by parse_import
    experiments = cls._experiments
    del cls._experiments
    if experiments is not None:
        self.add_experiment(experiments)
    return self
def drop(self, deep=False):
    """
    Drop the sample sheet using the standard model DELETE.

    The `deep` flag is accepted for interface parity with other models
    but has no additional effect here.
    """
    return super(SampleSheet, self).drop()
def archive(self):
    """
    Gracefully archive the sample sheet via a DELETE request. Assumes
    all Worksheets in the SampleSheet are complete and will fail when
    any are incomplete.
    """
    return super(SampleSheet, self).drop()
def remove(self, item):
    """
    Remove samples or experiments from existing sample sheet.

    Args:
        item: An Experiment (removes all of its rows), a Sample, or a
            list/tuple of Samples.

    Raises:
        AssertionError: For any other input type.
    """
    from .sample import Sample

    # experiment
    if isinstance(item, Experiment):
        uuids = [item.uuid]
    # samples
    else:
        if isinstance(item, Sample):
            item = [item]
        elif not isinstance(item, (list, tuple)):
            raise AssertionError("Input to SampleSheet.remove() must be Sample, Experiment, or list of samples!")
        uuids = list(set([s.uuid for s in item]))
    # iterate through data structure and remove entries
    for gidx, grp in enumerate(self.data.samples):
        # find items to remove
        remove = []
        for sidx, sid in enumerate(grp):
            # match on either the sample uuid or the row's workflow-instance uuid
            if (sid in uuids) or (self.data.workflow_instances[gidx][sidx] in uuids):
                remove.append(sidx)
        # get rid of them in reverse order
        for idx in remove[::-1]:
            del self.data.samples[gidx][idx]
            del self.data.workflow_instances[gidx][idx]
    self.push()
    return
def add_experiment(self, experiment, samples=None):
    """
    Add experiment(s) to existing sample sheet.

    If samples is provided, only samples in the provided list will be added.
    Otherwise, all samples from all experiments will be added.

    Args:
        experiment (Experiment or list): Experiment(s) to add.
        samples (list): Optional whitelist of Sample objects.

    Raises:
        AssertionError: If any experiment uses a different workflow
            definition than this sheet.
    """
    if not isinstance(experiment, (list, tuple)):
        experiment = [experiment]
    # build the row filter once (removed two dead `sample_uuids`
    # assignments from the original that were never read)
    if samples is None:
        should_include = lambda x: True
    else:
        ids = set(x.uuid for x in samples)
        should_include = lambda x: x.uuid in ids
    payload = []
    for exp in experiment:
        if self.workflow_uuid != exp.workflow.uuid:
            raise AssertionError(
                "Error: all Experiments in a SampleSheet " "must use the same Workflow definition!"
            )
        payload += [
            {"sample": {"uuid": s.uuid}, "workflowInstance": {"uuid": exp.uuid}}
            for s in exp.samples
            if should_include(s)
        ]
    base.SESSION.put("/api/sample_sheets/{}/add_samples".format(self.uuid), json=dict(samples=payload))
    self.refresh()
    return
@cached.tag("refresh")
def experiment(self):
    """
    Return the single experiment behind this sheet; raises when the
    sheet aggregates more than one experiment.
    """
    experiments = self.experiments
    if len(experiments) != 1:
        raise AssertionError(
            "Worksheet {} has samples from multiple experiments. " "Use self.experiments instead.".format(self.name)
        )
    return experiments[0]
@cached.tag("refresh")
def experiments(self):
    """
    Return experiment objects associated with sample sheet.

    Workflow-instance cells may hold pipe-delimited uuid lists (fan-in
    rows); these are split so each experiment appears exactly once.
    """
    # flatten the per-group workflow-instance grid into a unique set
    wfis = set(reduce(lambda x, y: x + y, self.workflow_instances, composite([])))
    split_wfis = []
    for wfi in wfis:
        if "|" in wfi:
            split_wfis = split_wfis + wfi.split("|")
        else:
            split_wfis.append(wfi)
    wfis = list(set(split_wfis))
    return [Experiment(x) for x in wfis]
@cached.tag("refresh")
def _sample_to_wfi(self):
    """
    Map sample uuid -> {row index -> workflow instance uuid}, built
    from the parallel samples / workflow_instances grids.
    """
    mapping = {}
    for group_idx, group in enumerate(self.data.samples):
        wfi_row = self.data.workflow_instances[group_idx]
        for row_idx, sample_uuid in enumerate(group):
            mapping.setdefault(sample_uuid, {})[row_idx] = wfi_row[row_idx]
    return mapping
def _is_mes(self):
return True if self.sample_sheet_type == "batch" else False
def experiment_for_sample(self, sample, sample_index=None):
    """
    Superceded by experiments_for_sample().
    Included for backwards compatibility.

    Args:
        sample (str or Sample): Sample uuid or object.
        sample_index (int): Row index; required only when the sample
            appears more than once on the sheet.

    Returns:
        The Experiment owning the sample's row.
    """
    if not isinstance(sample, six.string_types):
        sample = sample.uuid
    if sample not in self._sample_to_wfi:
        # bugfix: `sample` is always a uuid string at this point, so the
        # old message's sample.name / sample.uuid raised AttributeError
        # instead of the intended ValueError.
        raise ValueError("Sample {} is not in worksheet {} ({})".format(sample, self.name, self.uuid))
    # use the cache of experiments to ensure we don't
    # hit the backend N times...
    wfi = self._sample_to_wfi[sample]
    if len(wfi) == 1:
        wfi = list(wfi.values())[0]
    elif sample_index is None:
        raise ValueError("Same sample multiple times in the worksheet: must specify the sample index")
    elif sample_index not in wfi:
        # bugfix: row indexes are ints; join their string forms
        raise ValueError(
            "Invalid sample_index. Got `{}` but sample `{}` present at `{}`".format(
                sample_index, sample, ", ".join(str(k) for k in wfi)
            )
        )
    else:
        wfi = wfi[sample_index]
    # Since we first verify sample exists in _sample_to_wfi, below is
    # safe.
    return [x for x in self.experiments if x.uuid == wfi][0]
def experiments_for_sample(self, sample, sample_index=None):
    """
    Get list of Experiments corresponding to a Sample row. This usually
    returns a list with a single Experiment, but if a fan-in Sample
    Protocol has fanned in Samples from multiple Experiments, these
    rows pass a pipe-delimited list of Experiment UUIDs for "rolled-up"
    rows that actually correspond to separate StepInstanceBaseSamples
    maintained by separate Experiments. These rows are typically updated
    in tandem so that they have the same contents.

    Args:
        sample (str): Sample UUID.
        sample_index (int): Index of Sample row. Only needed for
            first (submitted) SampleGroup which may have the same
            Sample included from more than one Experiment.

    Returns:
        List of Experiments.
    """
    if not isinstance(sample, six.string_types):
        sample = sample.uuid
    if sample not in self._sample_to_wfi:
        # bugfix: `sample` is always a uuid string at this point, so the
        # old message's sample.name / sample.uuid raised AttributeError
        # instead of the intended ValueError.
        raise ValueError("Sample {} is not in worksheet {} ({})".format(sample, self.name, self.uuid))
    # use the cache of experiments to ensure we don't
    # hit the backend N times...
    wfi = self._sample_to_wfi[sample]
    if len(wfi) == 1:
        wfi = list(wfi.values())[0]
    elif sample_index is None:
        raise ValueError("Same sample multiple times in the worksheet: must specify the sample index")
    elif sample_index not in wfi:
        # bugfix: row indexes are ints; join their string forms
        raise ValueError(
            "Invalid sample_index. Got `{}` but sample `{}` present at `{}`".format(
                sample_index, sample, ", ".join(str(k) for k in wfi)
            )
        )
    else:
        wfi = wfi[sample_index]
    # fan-in rows carry pipe-delimited uuid lists
    if "|" in wfi:
        wfi = wfi.split("|")
    else:
        wfi = [wfi]
    # Since we first verify sample exists in _sample_to_wfi, below is
    # safe.
    return [x for x in self.experiments if x.uuid in wfi]
@cached.tag("refresh")
def project(self):
    """
    Return the single project behind this sheet; raises when samples
    span more than one project.
    """
    projects = self.projects
    if len(projects) != 1:
        raise AssertionError(
            "Worksheet {} has samples from multiple projects. " "Use self.projects instead.".format(self.name)
        )
    return projects[0]
@cached.tag("refresh")
def projects(self):
    """
    Return all projects associated with sample sheet, deduplicated by
    project name in first-seen order.
    """
    seen = {}
    for exp in self.experiments:
        project = exp.project
        seen[project.name] = project
    return list(seen.values())
@cached.tag("refresh")
def samples(self):
    """
    Return all samples added to sample sheet. Only includes the initial
    sample set (ie the samples directly added by the user). Does not
    include child samples created by protocols within the worksheet.
    Samples will be returned in the order they were added to the worksheet.
    """
    # lookup table of samples b/c experiment has full sample data ref,
    # but sample sheet does not, so we get the samples from the experiment
    # to avoid individual sample fetching.
    samples = {}
    for exp in self.experiments:
        samples.update({x.uuid: x for x in exp.samples if x.uuid in self._sample_to_wfi})
    # ensure the ordering is correct.
    if self.data.samples:
        # group 0 holds the user-submitted rows, in insertion order
        return [samples[uuid] for uuid in self.data.samples[0]]
    return []
@cached.tag("refresh")
def workflow(self):
    """
    Return the workflow definition for this sample sheet.

    .. note: There is no workflows property, because all experiments in
        a sample sheet MUST have the same workflow definition.
    """
    from .workflow import Workflow

    definition_uuid = self.workflow_uuid
    return Workflow.from_definition(definition_uuid)
@cached.tag("refresh")
def bulk_data(self):
    """Fetch the sheet's full contents in a single request via /bulk."""
    # do this by using the bulk endpoint.
    import esp.base

    data = esp.base.SESSION.get("/api/sample_sheets/{}/bulk".format(self.uuid)).json()
    return composite(data)
@cached.tag("refresh")
def protocols(self):
    """Get all protocols in a worksheet."""
    # v2 builds tabs from the /bulk payload; v1 walks the workflow's
    # protocol definitions by name.
    if self._worksheet_class_version == 2:
        return [SampleSheetWorksheet2(self, tab) for tab in self.bulk_data.tabs]
    else:
        return [SampleSheetWorksheet(self, x["name"]) for x in self.workflow.data["protocols"]]
def protocol(self, protocol_name):
    """Return the worksheet protocol tab whose name matches `protocol_name`."""
    for candidate in self.protocols:
        if candidate.name == protocol_name:
            return candidate
    raise ValueError(
        "Worksheet `{}` ({}) does not contain protocol `{}`. Valid protocols: {}".format(
            self.name, self.uuid, protocol_name, ", ".join([x.name for x in self.protocols])
        )
    )
def transition(self):
    """Transition samples in the SampleSheet that are part of a workflow chain."""
    # server-side operation; returns the raw response object
    return base.SESSION.put("/api/sample_sheets/{}/transition_chains".format(self.uuid))
@cached.tag("refresh")
def _samples_to_groups(self):
    """Map sample uuid -> index of the sample group containing it
    (later groups win for duplicated samples)."""
    return {
        sample: group_idx
        for group_idx, sample_set in enumerate(self.data.samples)
        for sample in sample_set
    }
def sample_to_group(self, sample):
    """
    Return the sample-group index containing `sample` (a Sample object
    or uuid string), or None when it is not on the sheet.

    Raises:
        TypeError: If sample is neither uuid-bearing nor a string.
    """
    # (removed an unused `from .sample import Sample` import)
    if hasattr(sample, "uuid"):
        sample = sample.uuid
    elif not isinstance(sample, six.string_types):
        raise TypeError("sample must be a Sample or a string (UUID)")
    return self._samples_to_groups.get(sample)
class Worksheet(BaseModel):
    """
    Object for managing sample sheets within ESP. This object is not
    configurable, and operates like a pandas DataFrame for each
    protocol in a workflow. You can access these worksheet objects
    from an experiment via:

    .. code-block:: python

        >>> from esp.models import Experiment
        >>> exp = Experiment('My Experiment')
        >>> ws = exp.protocol('My Protocol')
        >>> ws = exp.protocols[0]

    Also, since the Worksheet object acts as a pandas DataFrame, you can
    interact with it like a data frame:

    .. code-block:: python

        >>> ws.columns
        ['Column 1', 'Column 2']
        >>> ws['Column 1'] = ws['Column 1'].apply(lambda x: x.replace('foo', 'bar'))
        >>> ws.melt(...)

    After changing the data in the worksheet object, you can save it back
    to the L7 database and proceed to the next worksheet using:

    .. code-block:: python

        >>> ws.save()
        >>> ws.complete()
        >>> ws.next()

    Args:
        ident (str): UUID for worksheet instance.
        experiment (Experiment): Experiment object related to worksheet.
    """

    # attributes stored directly on the instance, bypassing the df/_data
    # delegation implemented in __getattr__/__setattr__ below
    __prime_attributes__ = BaseModel.__prime_attributes__ + [
        # empty string is a carryover from original implementation. Not sure why it's necessary.
        "df",
        "_data",
        "_version",
        "",
        "experiment",
        "protocol_name",
        "protocol",
        "cache",
        "urows",
        "var_types",
        "_sample_sheet",
        "columns_fixed_id_to_name",
        "add_samplecols",
        "colmap",
    ]
# make things feel like a data frame
def __repr__(self):
    # display as the underlying dataframe
    return repr(self.df)

def __len__(self):
    # row count of the underlying dataframe
    return len(self.df)

def __getattr__(self, name):
    """
    Delegate unknown attributes: prime/class attributes first, then
    the worksheet row data (_data), then the base model, then the
    dataframe.
    """
    # special-case df and _data b/c we use them directly in __getattr__,
    # making it way too easy to endlessly recurse.
    if name in self.__class__.__dict__ or name in self.__dict__ or name in self.__prime_attributes__:
        return self.__getattribute__(name)
    elif self._data is not None and name in self._data:
        return self._data[name]
    elif not hasattr(self.df, name):
        return super(Worksheet, self).__getattr__(name)
    else:
        return getattr(self.df, name)
def __setattr__(self, name, value):
    """
    Mirror __getattr__: prime and class attributes are set directly on
    the instance dict, existing dataframe attributes are forwarded to
    df, everything else goes to the base model.
    """
    if name in self.__prime_attributes__:
        self.__dict__[name] = value
    elif name in self.__class__.__dict__:
        self.__dict__[name] = value
    elif hasattr(self.df, name):
        setattr(self.df, name, value)
    else:
        super(Worksheet, self).__setattr__(name, value)

def __contains__(self, item):
    # membership delegates to the dataframe (column names)
    return item in self.df
def __getitem__(self, item):
    """
    Column access: fixed ids are remapped to display names; dataframe
    columns (and non-string keys) are served from df, anything else
    from the row data.
    """
    # Force resolution of df - it may not
    # be resolved yet.
    self.df
    if item in self.columns_fixed_id_to_name:
        item = self.columns_fixed_id_to_name[item]
    if item in self.df.columns or not isinstance(item, str):
        return self.df[item]
    return self.data[item]

def __setitem__(self, item, value):
    """
    Column assignment, symmetric with __getitem__.
    """
    # Force resolution of df - it may not
    # be resolved yet.
    self.df
    if item in self.columns_fixed_id_to_name:
        item = self.columns_fixed_id_to_name[item]
    if item in self.df.columns or not isinstance(item, str):
        self.df[item] = value
    else:
        self.data[item] = value
@cached.tag("refresh")
def sample_sheet(self):
    """Return a reference to the sample sheet for this worksheet."""
    raise NotImplementedError("Subclasses of Worksheet must implement sample_sheet")

def refresh(self):
    """Refresh the object."""
    raise NotImplementedError("Subclasses of Worksheet must implement refresh.")

@cached.tag("refresh")
def index(self):
    """Return the protocol index within the workflow for this worksheet."""
    # bugfix: message previously said "must implement refresh." (copy/paste)
    raise NotImplementedError("Subclasses of Worksheet must implement index.")

@cached.tag("refresh")
def sample_group(self):
    """Return the sample group for this worksheet."""
    raise NotImplementedError("Subclasses of Worksheet must implement sample_group.")

@cached.tag("refresh")
def samples(self):
    """Return the samples for this worksheet."""
    raise NotImplementedError("Subclasses of Worksheet must implement samples.")
def convert_attachment(self, value):
    """
    Create column value data structure for attachment column by
    delegating to the shared formatting helper.
    """
    converted = format_attachment_value(value)
    return converted
def convert_location(self, value):
    """
    Create column value data structure for location column.

    Accepts a pre-formatted json string (returned unchanged), a string
    of the form "<containername>: <slot>[, ...]", or a dict keyed by
    container name (see raise_format_error for the full grammar).

    Raises:
        ValueError: On malformed input or unknown container.
    """
    # already-valid json passes straight through
    try:
        json.loads(value)
        return value
    except Exception:
        pass
    from .container import Container

    def raise_format_error():
        raise ValueError(
            "location values must either be the appropriate json string "
            "or a string in the format: <containername>: <slot>[,...] "
            "as: Plate0001: A01, or as a dictionary in the format "
            "<containername>: [slot, ...] or as a dictionary in the "
            "<containername>: [{{slot: {{field: value, field: value}}], "
            "but was: {}".format(value)
        )

    if isinstance(value, str):
        if ":" not in value:
            raise_format_error()
        value = value.replace(": ", ":")
        cname, slots = value.split(":")
        slots = slots.split(",")
    elif isinstance(value, dict):
        # note: the format used for specifying fields triggers the client's
        # standard behavior of remapping the lone key of the parent dict
        # into a "name" key within the child dict and passing that through.
        # convenient here, since it simplifies the data structure.
        if "name" not in value:
            raise_format_error()
        # make a local copy to avoid modifying the original. Otherwise,
        # the conversion check of val[0] == converter(val[0]) drops the
        # name from the first entry, and the subsequent
        # [converter(x) for x in val] fails.
        value = dict(value)
        cname = value.pop("name")
        slots = list(value.items())
    else:
        raise_format_error()
    cont = Container(cname)
    if not cont.exists():
        raise ValueError("No such container in DB: {}".format(cname))
    # normalize slots into {"label", "fields"} entries
    locs = []
    for slot in slots:
        if isinstance(slot, str):
            fields = {}
        elif isinstance(slot, tuple):
            fields = slot[1]
            if not isinstance(fields, dict):
                raise_format_error()
            slot = slot[0]
        else:
            raise_format_error()
        locs.append({"label": slot, "fields": fields})
    # resolve the owning workflow instance for the payload, if any
    if hasattr(self, "workflow_instance_uuid"):
        wfi_uuid = self.workflow_instance_uuid
    elif hasattr(self, "experiment"):
        wfi_uuid = self.experiment.uuid
    else:
        wfi_uuid = None
    return json.dumps(
        [
            {
                "container": cont.uuid,
                "name": cont.name,
                # TODO: currently assuming a single WFI!
                "workflow_instance": wfi_uuid,
                "locations": locs,
            }
        ]
    )
def convert_approval(self, value):
    """
    Create column value data structure for approval column.

    NOTE(review): `value` is ignored — the approval is always stamped
    with the current session user and current time. datetime.utcnow()
    is naive and a literal "Z" suffix is appended; confirm the server
    expects UTC here.
    """
    return json.dumps({"user": base.SESSION.user.uuid, "timestamp": str(datetime.utcnow()) + "Z"})
def convert_item_qty_adjust(self, value):
    """
    Create column value data structure for itemqtyadj column.

    Accepts a pre-formatted json string (returned unchanged) or a
    single "<itemname>: <qty>" entry.

    Raises:
        ValueError: On malformed input or unknown item.
    """
    # already-valid json passes straight through
    try:
        json.loads(value)
        return value
    except Exception:
        pass
    from .inventory import Item

    if ":" not in value:
        raise ValueError(
            "itemqtyadj values must either be the appropriate json string "
            "or a string in the format: <itemname>: <qty> "
            "as: Batch 1: 3. Note that the simplified form only supports "
            "setting a single entry at this time."
        )
    # split on the first colon only, so item names may contain colons... no:
    # qty is everything after the first colon, stripped
    itemid, qty = [x.strip() for x in value.split(":", 1)]
    item = Item(itemid)
    if not item.exists():
        raise ValueError("item {} does not exist".format(itemid))
    return json.dumps([{"item": item.uuid, "name": item.name, "note": "", "qty": float(qty), "new_entry": True}])
def convert_resource_link(self, val, var=None):
    # delegate to the shared formatting helper for resource-link columns
    return format_resourcelink_value(val, var)

def convert_rand(self, val):
    # NOTE(review): this function returns None — the rewritten string is
    # never evaluated or returned, and `gen` is unused. Looks unfinished;
    # confirm intended behavior before relying on it.
    import esp.data.generation as gen

    val = val.replace("rand", "generate_data")
[docs] def convert(self, col, val):
"""
Convert column value from sheet.
"""
if isinstance(col, (list, tuple)):
return [self.convert(k, v) for k, v in zip(col, val)]
if not self.var_types or col not in self.var_types:
return val
var_type = self.var_types[col]
def random(**kwargs):
import esp.data.generation as gen
n = kwargs.pop("n", len(self.samples))
type_ = kwargs.pop("type_", var_type)
if type_ == "dropdown":
type_ = "choice"
kwargs["choices"] = self._get_dropdown_options(col)[0]
return gen.generate_data(n, type_, **kwargs)
if str(val).startswith("{{") and val.endswith("}}"):
val = val[2:-2].strip()
env = {
"random": random,
}
val = eval(val, env, {})
converters = {
"attachment": self.convert_attachment,
"location": self.convert_location,
"approval": self.convert_approval,
"itemqtyadj": self.convert_item_qty_adjust,
"itemadj": self.convert_item_qty_adjust,
"resource_link": self.convert_resource_link,
}
if var_type not in converters:
return val
converter = converters[var_type]
if isinstance(val, six.string_types):
return converter(val)
import numpy as np
if isinstance(val, (np.object, list, tuple)):
if len(val) == 0:
return val
if val[0] == converter(val[0]):
return val
return [converter(x) for x in val]
raise ValueError("Don't know how to convert {}!".format(val))
@cached.tag("refresh")
def resource_vals(self):
    """Return the grid of resource values (one list of value objects per sample row)."""
    raise NotImplementedError("Subclasses of worksheet must implement resource_vals.")
@cached.tag("refresh")
def df(self):
    """
    Return sheet data for data editing/fetching.

    Builds a pandas DataFrame (one row per sample, one column per
    resource value), recording var_types, fixed-id->name mapping,
    row uuids (urows), and the colmap used by save(). A copy of the
    frame is kept in self.cache so save() can diff against it. Falls
    back to an empty dict when pandas is not installed.
    """
    try:
        import pandas

        nopandas = False
    except ImportError:
        nopandas = True
    rows = []
    vals = self.resource_vals
    # grab the ESP var types.
    if vals:
        self.var_types = {x.name: x.var_type for x in vals[0]}
        self.columns_fixed_id_to_name = {x.fixed_id if x.fixed_id else x.name: x.name for x in vals[0]}
    else:
        self.var_types = {}
        self.columns_fixed_id_to_name = {}
    # set up df rows
    self.urows = [s.uuid for s in self.samples]
    for i, samp in enumerate(vals):
        record = OrderedDict((obj.name, obj.value) for obj in samp)
        if self.add_samplecols:
            # expose sample identity columns unless already present
            if "Sample ID" not in record:
                record["Sample ID"] = self.samples[i].name
            if "Sample UUID" not in record:
                record["Sample UUID"] = self.urows[i]
        rows.append(record)
    if len(rows) > 0:
        columns = list(rows[0].keys())
    else:
        columns = []
    # map row index -> {column name -> resource value uuid} for saves
    self.colmap = {idx: {rv["name"]: rv["uuid"] for rv in vals[idx]} for idx, uu in enumerate(self.urows)}
    # TODO: think about using sample names or uuid for index. UUID would
    # be easier, but not as nice for usability.
    # RDZ Note: probably can't use either since the same sample can appear
    # in a worksheet > 1x if it comes from 2 different experiments, unless we
    # autoadd a suffix, but...
    if nopandas:
        self.cache = {}
        return {}
    else:
        df = pandas.DataFrame(rows, columns=columns)
        # NOTE(review): the result of df.where(...) is discarded here —
        # likely intended `df = df.where(df.notnull(), None)`; confirm.
        df.where(df.notnull(), None)
        for col in df.columns:
            # convert numeric types
            if self.var_types.get(col) == "numeric":
                try:
                    df[col] = df[col].astype(float)
                except Exception:
                    continue
            # initially unchecked checkboxes -> null, so make sure all
            # checkbox-type columns are appropriately boolean.
            if self.var_types.get(col) in ["complete", "checkbox"]:
                df[col] = [x is not None and x.lower() == "true" for x in df[col].astype(str)]
        self.cache = df.copy()
        return df
def load(self, filename, sep="\t"):
    """
    Update sheet with data from specified file.

    Args:
        filename (str or pandas.DataFrame): Path to a delimited file,
            or a dataframe to load directly.
        sep (str): Field separator used when reading from file.

    Raises:
        AssertionError: If row count mismatches, samples are duplicated,
            or a column is not present in the sheet.
    """
    import pandas

    if isinstance(filename, pandas.DataFrame):
        df = filename
    else:
        df = pandas.read_csv(filename, sep=sep)
    # check that data are complete
    if len(df) != len(self.df):
        raise AssertionError("Error: data frame number of rows must match " "number of samples")
    # reorder by potential Sample ID column
    if "Sample ID" in df.columns:
        # check for redundant entries
        names = [s.name for s in self.samples]
        order = list(df["Sample ID"])
        if len(set(names)) != len(set(order)):
            raise AssertionError("Error: multiple definitions for same " "sample in input data frame!")
        # calculate order and sort input
        order = sorted(range(len(df)), key=lambda x: names.index(order[x]))
        df = df.iloc[order]
        df = df.drop(["Sample ID"], axis=1).reset_index(drop=True)
    # set the column values
    for col in df.columns:
        if col not in self.df.columns:
            raise AssertionError("Error: column {} from file import not " "in sheet data!".format(col))
        self.df[col] = self.convert(col, df[col])
    return
def fail(self, sample, message=""):
    """
    Fail specified sample or samples in sample sheet.

    Args:
        sample (str or list): Sample name(s) to fail.
        message (str): Failure message recorded on the server.
    """
    if isinstance(sample, str):
        sample = [sample]
    remove = []
    for idx, obj in enumerate(self.samples):
        if obj.name in sample:
            remove.append(idx + 1)  # 0-based index in the client; 1-based on the server
    if len(remove) > 0:
        payload = {
            "message": message,
            "protocol_index": self.index + 1,  # 0-based index in the client; 1-based on the server.
            "rows": remove,
        }
        base.SESSION.put("/api/lims/worksheets/{}/fail_samples".format(self.sample_sheet.uuid), json=payload)
    return
@property
def completed(self):
    """
    True when the sheet has at least one row and every row's Complete
    column is True.
    """
    total = len(self.df)
    finished = self.df[self.df.Complete == True]  # noqa
    return total != 0 and len(finished) == total
def complete(self, value=True, save_first=True, samples=None):
    """
    Mark complete column on workflow and save.

    Args:
        value (bool): Value written into each row's Complete column.
        save_first (bool): Save pending edits before completing.
        samples (list): Optional sample-name filter; default all rows.

    Returns:
        self, for chaining.
    """
    # force resolution of dataframe, which is required for the code below to work right.
    self.df
    if save_first:
        self.save()
    # delta keyed by (sample uuid, row index), as _save_changes expects
    col = {
        (s, i): {self.colmap[i]["Complete"]: value}
        for i, s in enumerate(self.urows)
        if (samples is None or self.samples[i].name in samples)
    }
    self._save_changes(col)
    self.refresh()
    return self
def approve(self, password, sample=None, column=None):
    """
    Approve sample and column in worksheet.

    Args:
        password (str): Password to approve with.
        sample (str): Sample name to approve (default is all samples)
        column (str): Column name to approve (default is all columns)
    """
    # default to every approval-type column
    # NOTE(review): resource_vals entries are subscripted here
    # (col["name"]) but accessed by attribute elsewhere (x.name in df)
    # — confirm the value type supports both access styles.
    if column is None:
        column = [col["name"] for col in self.resource_vals[0] if col["var_type"] == "approval"]
    elif not isinstance(column, (list, tuple)):
        column = [column]
    # construct payload for one column at a time
    if isinstance(password, bool) and password:
        # password=True reuses the session password
        password = base.SESSION.password
    else:
        password = str(password)
    for col in column:
        rvar = [val["resource_var"] for val in self.resource_vals[0] if val["name"] == col][0]
        if sample is None:
            sample = [s.name for s in self.samples]
        elif not isinstance(sample, (list, tuple)):
            sample = [sample]
        sample_uuids = [(s.uuid, i) for i, s in enumerate(self.samples) if s.name in sample]
        # NOTE(review): pi_uuids is computed but never used in the
        # payload below — confirm whether it should be included.
        pi_uuids = self._pi_uuids(sample_uuids)
        payload = {
            "workflow_instance_uuids": self._wfi_uuids(sample_uuids),
            "resource_val_uuids": [self.colmap[i][col] for uu, i in sample_uuids],
            "resource_var_uuid": rvar,
            "username": base.SESSION.user.email,
            "password": password,
        }
        base.SESSION.put("/api/sample_sheets/{}/approve".format(self.sample_sheet.uuid), json=payload)
    self.refresh()
    return self
@timeit(info="{self.ident}:::{self.name}:::eval={evaluate}:::refresh={refresh}")
def save(self, evaluate=True, refresh=True):
    """
    Set sample values that have changed.

    Diffs self.df against the cached copy taken when df was built and
    sends only changed cells through _save_changes.

    Args:
        evaluate (bool): Forwarded to _save_changes.
        refresh (bool): Refresh the worksheet after saving.

    Returns:
        self, for chaining.
    """
    # NOTE(review): this pandas import appears unused in the body —
    # confirm before removing.
    import pandas

    if len(self.df) != len(self.samples):
        raise AssertionError(
            "Error: number of samples does not match number of entries "
            "in data frame for Worksheet {}".format(self.name)
        )
    changed = []
    for ridx in range(len(self.df)):
        for col in self.df.columns:
            # numeric columns -> nan for null, and nan != nan, so we need additional checks here.
            if self.df[col][ridx] != self.cache[col][ridx] and (
                not self.df[col].isna()[ridx] or not self.cache[col].isna()[ridx]
            ):
                changed.append((ridx, col))
    # construct payload
    delta = {}
    js = self.df.to_dict()
    for idx, col in changed:
        # cast date objects as string (not serializeable)
        if isinstance(js[col][idx], date):
            js[col][idx] = str(js[col][idx])
        delta.setdefault((self.urows[idx], idx), {}).update({self.colmap[idx][col]: js[col][idx]})
    # send data if there was a change
    if len(delta) > 0:
        # do not implicitly uncomplete samples. Instead, script authors must
        # explicitly ensure samples are uncompleted when making changes.
        # Note that uncompleted samples was turned off in 2.3.1, but the residual
        # code was causing problems.
        self._save_changes(delta, evaluate=evaluate)
    if refresh:
        self.refresh()
    return self
def _save_changes(self, delta, evaluate=True):
    """Given a delta keyed by (sample_uuid, row_index) tuples mapping to
    Dict[resource_val_uuid, value] (see save() and complete()),
    save the changes to the backend. Guaranteed to only be called if
    there are values to update."""
    raise NotImplementedError("Subclasses of worksheet must implement _save_changes.")
[docs] def next(self):
"""
Return next protocol instance for editing.
"""
raise NotImplementedError("Subclasses of worksheet must implement the next method")
def _wfi_uuids(self, sample_uuids):
return [self.experiment.uuid] * len(sample_uuids)
def _pi_uuids(self, sample_uuids):
return [self.uuid] * len(sample_uuids)
def grouped(self):
    """True if this worksheet tab is for a grouped protocol."""
    group = self.group
    return group is not None and group != ""
def run(self, background=False, run_groups=None, col_name="Start"):
    """
    Run pipeline for columns and wait for results to finish.

    Arguments:
        background(bool): When true, return immediately after submitting
            the pipelines. Otherwise, wait for pipelines to complete before returning.
        run_groups(optional[List|Dict|Set]): If not none, only samples with a group
            value included in run_groups will be executed. For ungrouped pipeline
            protocols, run_groups should use the sample names.
        col_name(str): Name of the pipeline-button column used to start runs.

    Returns:
        AnalysisPool for submitted analyses, or None if no analyses were submitted.
    """
    from .analysis import Analysis, AnalysisPool

    # configure pipeline groups
    groups = {}
    for idx, row in self.df.iterrows():
        # ungrouped protocols run per-sample; grouped ones per group value
        if not self.grouped():
            val = self.samples[idx].name
        elif self.group in row:
            val = row[self.group]
        else:
            val = self.group
        if run_groups is None or val in run_groups:
            groups.setdefault(val, []).append((self.samples[idx].uuid, idx))
    # configure pipeline runs by group
    pipes = []
    for group in groups:
        uuids = groups[group]
        pi_uuids = self._pi_uuids(uuids)
        payload = {
            "protocol_instance_uuids": pi_uuids,
            "sample_uuids": [x[0] for x in uuids],
            "workflow_instance_uuids": self._wfi_uuids(uuids),
        }
        resource_var_uuid, resource_var_type = self._resource_var_for_column_name(col_name)
        if resource_var_type in ["pipelinebutton", "pipeline_instance_uuid"]:
            payload["start_resource_var"] = resource_var_uuid
        else:
            raise AssertionError(
                "`{}`: Expected `{}` to be of type `pipelinebutton` or `pipeline_instance_uuid` but got `{}`".format(
                    self.name, col_name, resource_var_type
                )
            )
        # mark the start column True for each (sample, row) pair
        payload["step_instance_values"] = [
            {pi_uuid: {uu[0]: {self.colmap[uu[1]][col_name]: True}}} for pi_uuid, uu in zip(pi_uuids, uuids)
        ]
        logging.info("Running analysis pipeline for sample group {}".format(group))
        logging.info("Pipeline payload {}".format(payload))
        result = base.SESSION.put(
            "/api/sample_sheets/{}/start_pipeline".format(self.sample_sheet.uuid), json=payload
        )
        data = result.json()
        # NOTE: BACKEND RETURNS VARIOUS RESPONSES BASED ON ERROR - THIS CHECKS
        # FOR ALL OF THEM
        if "pipeline_instances" not in data:
            raise AssertionError(
                "Pipeline failed to execute, either because of "
                "insufficient privileges or some other reason. Response data:\n{}".format(data)
            )
        pipes.append(data)
    if len(pipes) > 0:
        pool = AnalysisPool([Analysis.from_data(pipe["pipeline_instances"][0]) for pipe in pipes])
        if background:
            return pool
        pool.join()
        self.refresh()
        return pool
    return
def verify(self, data):
    """
    Verify that the data in the sheet matches the data specification passed in.

    Data can be formatted as:

    .. code-block:: yaml

        // all values in colname have the same value.
        colname: value

        // observed values must match this vector element-wise.
        colname: [1, 2, 3, 4]

        // Specify values for each sample (keyed by sample name).
        colname:
            SAM000001: 1
            SAM000002: 2
            SAM000003: 3
            SAM000004: 4

    Value is usually treated literally, but if the value is a string that
    starts with {{ and ends with }}, everything within {{ }} is treated as
    a python expression to evaluate. The expression must evaluate to a
    boolean and may reference the variables:

    * value - the observed value for one row/sample for the column being verified.
    * row - the complete (pandas) row under evaluation.

    Expressions can be simple, such as ``{{bool(value)}}``, or more
    complicated, such as ``{{value == row['colA'] + '123' + row['colB']}}``.

    Verify makes one utility function available to the expression: "eq".

    .. code-block:: python

        {{ eq(value, 123.5, 2) }}

    This is equivalent to:

    .. code-block:: python

        {{ round(float(value), 2) == round(float(123.5), 2) }}

    Columns named ``<col>__options`` are dispatched to dropdown-option
    verification instead of cell-value verification.

    Returns:
        True when every expectation passes.

    Raises:
        AssertionError: listing every failed column/cell expectation.
    """
    def check(expect, observe, row):
        # {{...}} expectations: strip the braces and eval the expression as-is.
        if isinstance(expect, six.string_types) and expect.startswith("{{") and expect.endswith("}}"):
            expect = expect[2:-2]
        elif isinstance(expect, (int, float)):
            expect = "float(value) == float({})".format(expect)
        else:
            # NOTE(review): a literal containing a single quote would break this
            # generated expression — confirm callers never pass such values.
            expect = "str(value) == '{}'".format(expect)

        def eq(a, b, c):
            # rounded float comparison helper exposed to expressions
            return round(float(a), c) == round(float(b), c)

        def valid_expression(value):
            # truthy and not an error string (e.g. backend expression errors)
            return bool(value) and "Error" not in str(value)

        def match(regex, value):
            import re

            return re.match(regex, value)

        # NOTE(review): expectations are executed with eval(); only trusted
        # specs (test/seed configs) should ever be passed here.
        return eval(
            expect, {"value": observe, "row": row, "eq": eq, "valid_expression": valid_expression, "match": match}
        )

    # go through data and do assertions
    df = self.df
    errors = []
    for var in data:
        # make sure columns exist
        if var not in df.columns:
            if var.endswith("__options"):
                # dropdown-option expectation, not a cell-value expectation
                self._verify_dropdown_options(var, df, data[var], errors)
                continue
            errors.append("Expected column `{}` to be in protocol `{}`".format(var, self.name))
            continue
        # coerce data to standardized structure: one expected value per row
        expect = data[var]
        if isinstance(expect, dict):
            # dict keyed by sample name -> reorder to match worksheet rows
            expect = [expect[s.name] for s in self.samples]
        elif not isinstance(expect, list):
            expect = [expect] * len(df)
        observe = list(df[var])
        # calculate and assert on results
        # NOTE(review): assumes df has a positional RangeIndex so `i` can
        # index expect/observe — confirm.
        results = [check(expect[i], observe[i], row) for i, row in df.iterrows()]
        if not all(results):
            errors.append(
                "Expected `{}` for column `{}`. Observed: `{}` (eval results: `{}`)".format(
                    expect, var, observe, results
                )
            )
    if len(errors) > 0:
        raise AssertionError("Errors in cell validation: \n\n{}".format("\n".join(errors)))
    return True
def _vals_for_var(self, var):
if var not in self.df:
raise AssertionError("Expected column `{}` to be in protocol `{}`".format(var, self.name))
rvals = [y for x in self.resource_vals for y in x if y["name"] == var]
if not rvals:
raise AssertionError("Expected column `{}` to be in protocol `{}`".format(var, self.name))
return rvals
def _resource_var_for_column_name(self, col_name):
if self.resource_vals is None or len(self.resource_vals) == 0:
raise AssertionError("`{}`: Expected resource_vals but got None".format(self.name))
if col_name not in self.df:
raise AssertionError("Expected column `{}` to be in protocol `{}`".format(col_name, self.name))
vals = self.resource_vals[0]
resource_var_uuid = None
resource_var_type = None
for val in vals:
if val["name"] == col_name:
resource_var_uuid = val["resource_var"]
resource_var_type = val["var_type"]
return resource_var_uuid, resource_var_type
def _get_dropdown_options(self, var):
rvals = self._vals_for_var(var)
type_ = self.var_types.get(var)
if type_ not in ["dropdown", "multiselect"]:
raise AssertionError(
("Expected column `{}` of protocol `{}` to be type " "dropdown or multiselect but was: `{} `").format(
var, self.name, type_
)
)
return [x.get("dropdown") for x in rvals]
def _verify_dropdown_options(self, var, df, expect, errors):
"""Verify dropdown options for a column.
Valid formats of expectations:
* varname__options: [opt1, opt2, opt3]
* varname__options:
- [opt1, opt2, opt3]
- [opt4, opt5, opt6]
The first case asserts all cells in the column have the listed options
as their dropdown values. THe second case requires the same number of
outer list entries as there are samples in the worksheet.
"""
if var.endswith("__options"):
var = var[: -len("__options")]
if var not in df:
errors.append("Expected column `{}` to be in protocol `{}`".format(var, self.name))
return
try:
option_lists = self._get_dropdown_options(var)
except Exception as e:
errors.append(str(e))
return
if not isinstance(expect, list):
errors.append(
"Expected dropdown value for " "`{}` must be a list or a list of lists. Got: `{}`".format(var, expect)
)
return
if isinstance(expect[0], str):
# normalize expectations
expect = [expect for i in range(len(df))]
for options, dropdown in zip(option_lists, expect):
if options != dropdown:
errors.append("{}: Incorrect dropdown: `{}` != `{}` (expected)".format(var, options, dropdown))
class ExperimentWorksheet(Worksheet):
    """
    Worksheet for a single protocol instance within a single experiment,
    backed by the ``protocol_instances`` API. Saving changes routes through
    the experiment's first sample sheet.

    Args:
        ident (str): Protocol instance UUID, or protocol name (resolved via
            the experiment).
        experiment (str|Experiment): Experiment (or its name) the protocol
            instance belongs to.
        add_samplecols (bool): Whether to add implicit "Sample ID"/"Sample UUID"
            columns to the data frame.
    """

    __api__ = "protocol_instances"
    __api_cls__ = "ProtocolInstance"
    __allow_update__ = False

    def __init__(self, ident, experiment, add_samplecols=False, **kwargs):
        self.add_samplecols = add_samplecols
        if is_uuid(ident):
            # ident is already a protocol-instance uuid; resolve the
            # experiment separately.
            # NOTE: super(Worksheet, self) deliberately skips Worksheet.__init__.
            super(Worksheet, self).__init__(ident, **kwargs)
            if isinstance(experiment, six.string_types):
                self.experiment = Experiment(experiment)
            else:
                self.experiment = experiment
            if not self.experiment.exists():
                raise AssertionError("Specified experiment {} does not exist!".format(experiment))
        else:
            # ident is a protocol name; look the instance up on the experiment
            exp = Experiment(experiment)
            pc = exp.protocol(ident)
            super(Worksheet, self).__init__(pc.uuid, **kwargs)
            self.experiment = exp
        return

    def refresh(self):
        """Refresh the owning experiment, then this protocol instance."""
        self.experiment.refresh()
        super(Worksheet, self).refresh()
        return

    @cached.tag("refresh")
    def index(self):
        """
        Return the step index of this protocol within the experiment, or
        None when no step matches this instance's name.
        """
        idx = 0
        for idx, item in enumerate(self.experiment.step_instances):
            if item.name == self.data["name"]:
                return idx
        return None

    @cached.tag("refresh")
    def sample_group(self):
        """
        Return sample group index for sheet (0 when no step matches).
        """
        group = 0
        for idx, item in enumerate(self.experiment.step_instances):
            if item.name == self.data["name"]:
                group = self.experiment.sample_group_indices[idx]
                break
        return group

    @cached.tag("refresh")
    def samples(self):
        """
        Return list of sample objects in protocol instance.
        """
        return self.experiment.sample_group(self.sample_group)

    @cached.tag("refresh")
    def resource_vals(self):
        # raw per-row resource values from the protocol-instance payload
        return self.data["values"]

    @cached.tag("refresh")
    def sample_sheet(self):
        # the experiment's first sample sheet backs all save operations
        return SampleSheet(self.experiment.sample_sheets[0])

    def next(self):
        """
        Return next protocol instance for editing, or None when this is the
        last protocol of the experiment.
        """
        for idx, obj in enumerate(self.experiment.protocols):
            if obj.name == self.name:
                break
        if (idx + 1) > (len(self.experiment.protocols) - 1):
            return None
        obj = self.experiment.protocols[idx + 1]
        if len(obj.df) == 0:
            # force a data load for the next sheet if it looks empty
            obj.refresh()
        return obj

    def _save_changes(self, delta, evaluate=True):
        # construct payload
        logging.info("Saving data in Experiment {} for " "Protocol: {}.".format(self.experiment.name, self.name))
        # re-key delta from (sample_uuid, row_idx) tuples to plain sample uuids
        _delta = {}
        for samp, i in delta:
            _delta[samp] = delta[(samp, i)]
        payload = {
            "step_instance_values": [{self.uuid: _delta}],
            "workflow_instance_uuids": [self.experiment.uuid],
            "evaluate": evaluate,
        }
        res = base.SESSION.put(
            "/api/sample_sheets/{}/set_sample_values".format(self.experiment.sample_sheets[0]), json=payload
        )
        return res.json()
class SampleSheetWorksheet2(Worksheet):
    """
    New implementation of SampleSheetWorksheet that utilizes data returned from
    the /api/sample_sheets/:uuid/bulk endpoint for better performance and a richer
    data set.

    Args:
        sample_sheet (str|SampleSheet): SampleSheet (or identifier) backing this tab.
        tab: Bulk-payload tab object describing a single protocol's rows/columns.
        add_samplecols (bool): Whether to add implicit "Sample ID"/"Sample UUID"
            columns to the data frame.
    """

    __allow_update__ = False
    __prime_attributes__ = Worksheet.__prime_attributes__ + ["tab"]

    def __init__(self, sample_sheet, tab, add_samplecols=True, **kwargs):
        self.add_samplecols = add_samplecols
        self._sample_sheet = object_for_value(sample_sheet, "sample_sheet", SampleSheet)
        self.tab = tab
        self.protocol_name = tab.protocol_name
        super().__init__(self._sample_sheet.ident, data=self._sample_sheet.data, **kwargs)

    @cached.tag("refresh")
    def sample_sheet(self):
        # SampleSheet object backing this worksheet
        return self._sample_sheet

    @property
    def group(self):
        # group name comes straight from the bulk tab
        return self.tab.group

    @property
    def name(self):
        # expose the protocol name (not the sample sheet name) as .name
        return self.protocol_name

    def refresh(self):
        """Refresh the backing sample sheet and re-bind this tab by position."""
        import esp.models.__base__ as ebase

        # capture the tab position before caches are invalidated (index is cached)
        index = self.index
        self._sample_sheet.refresh()
        if "uuid" in self.data and self.uuid in ebase.CACHE:
            del ebase.CACHE[self.uuid]
        self._data = {}
        cached.invalidate(self, "refresh")
        self.tab = self._sample_sheet.bulk_data.tabs[index]

    @cached.tag("refresh")
    def index(self):
        # position of this tab within the bulk payload
        return self._sample_sheet.bulk_data.tabs.index(self.tab)

    @cached.tag("refresh")
    def sample_group(self):
        # sample-group index from the workflow definition for this tab position
        return self._sample_sheet.workflow.sample_group_indices[self.index]

    @cached.tag("refresh")
    def samples(self):
        """Return list of sample objects for the rows of this tab."""
        from .sample import Entity

        return [x for x in Entity.items_for_uuids(s.sample_uuid for s in self.tab.rows)]

    @cached.tag("refresh")
    def resource_vals(self):
        """Return per-row lists of resource-value descriptors for this tab."""
        vals = []
        for row in self.tab.rows:
            samp_vals = []
            for col, cell in zip(self.tab.columns, row.cells):
                samp_vals.append(
                    composite(
                        {
                            "name": col.name,
                            "fixed_id": col.barcode,
                            "resource_var": col.resource_var_uuid,
                            "var_type": col.var_type,
                            "value": cell.value,
                            "uuid": cell.resource_val_uuid,
                            "in_sample_sheet": col.in_sample_sheet,
                        }
                    )
                )
            vals.append(samp_vals)
        return vals

    @cached.tag("refresh")
    def fields_metadata(self):
        """Return one metadata dict per column, with aliases for common keys."""
        return [
            {
                "barcode": field.barcode,
                "fixed_id": field.barcode,  # alias for barcode.
                "default_val": field.default_val,
                "desc": field.desc,
                "description": field.desc,  # alias for desc.
                "dropdown": field.dropdown,
                "dropdown_options": field.dropdown,  # alias for dropdown.
                "dropdown_expr": field.dropdown_expr,
                "dropdown_expression": field.dropdown_expr,  # alias for dropdown_expr.
                "expected": field.expected,
                "group_name": field.group_name,
                "group_sort_order": field.group_sort_order,
                "in_sample_sheet": field.in_sample_sheet,
                "hidden": not field.in_sample_sheet,  # alias for in_sample_sheet.
                "instructions": field.instructions,
                "meta": field.meta,
                "pipeline_param": field.pipeline_param,
                "read_only": field.read_only,
                "required": field.required,
                "resource_var_name": field.resource_var_name,
                # fix: the dict previously listed "name": field.name earlier,
                # but this duplicate key always overwrote it; the dead entry
                # is removed (effective behavior unchanged).
                "name": field.resource_var_name,  # alias for resource_var_name.
                "shared": field.shared,
                "sort_order": field.sort_order,
                "source": field.source,
                "units": field.units,
                "uuid": field.uuid,
                "var_group": field.var_group,
                "var_type": field.var_type,
                "type": field.var_type,  # alias for var_type.
            }
            for field in self.tab.columns
        ]

    def _save_changes(self, delta, evaluate=True):
        """Persist changed cells via set_sample_values (or the MES endpoint)."""
        logging.info("Saving data in SampleSheet(2) {} for " "Protocol: {}.".format(self._sample_sheet.name, self.name))
        step_instance_dicts = defaultdict(dict)
        resource_vals = defaultdict(dict)
        for (uuid, i), value in delta.items():
            row = self.tab.rows[i]
            # should already be concatenated for multiple PIs...
            protocol_uuid = row.protocol_instance_uuid
            step_instance_dicts[protocol_uuid][uuid] = value
            resource_vals.update(value)
        step_instance_values = [{x: step_instance_dicts[x]} for x in step_instance_dicts]
        if self.sample_sheet._is_mes():
            payload = {"resource_vals": resource_vals}
            res = base.SESSION.put(
                "/api/electronic_batch_records/{}/set_sample_values".format(self.sample_sheet.uuid), json=payload
            )
        else:
            payload = {
                "step_instance_values": step_instance_values,
                "evaluate": evaluate,
            }
            res = base.SESSION.put(
                "/api/sample_sheets/{}/set_sample_values".format(self.sample_sheet.uuid), json=payload
            )
        return res.json()

    def next(self):
        """Return the next protocol worksheet, or None when on the last tab."""
        idx = self.index
        # fix: use >= so the last tab returns None instead of raising
        # IndexError on protocols[idx + 1] (was: `(idx + 1) > len(tabs)`).
        if (idx + 1) >= len(self.sample_sheet.bulk_data.tabs):
            return None
        obj = self.sample_sheet.protocols[idx + 1]
        if len(obj.df) == 0:
            obj.refresh()
        return obj

    def _wfi_uuids(self, sample_uuids):
        # workflow-instance (experiment) uuid for each (sample_uuid, row_idx) pair
        return [self._sample_sheet.experiment_for_sample(uuid, i).uuid for uuid, i in sample_uuids]

    def _pi_uuids(self, sample_uuids):
        # protocol-instance uuid of this protocol's step in each sample's experiment
        ret = []
        for uu, i in sample_uuids:
            exp = self._sample_sheet.experiment_for_sample(uu, i)
            ret.append(exp.step_instances[self.index]["uuid"])
        return ret

    @cached.tag("refresh")
    def df(self):
        """
        Return sheet data for data editing/fetching. Falls back to a plain
        dict when pandas is unavailable.
        """
        try:
            import pandas

            nopandas = False
        except ImportError:
            nopandas = True
        rows = []
        tab = self.tab
        self.var_types = {x.name: x.var_type for x in tab.columns}
        self.columns_fixed_id_to_name = {x.barcode: x.name for x in tab.columns}
        # set up df rows (urows tracks the sample uuid for each row)
        self.urows = [s.sample_uuid for s in tab.rows]
        colmap = {}
        for i, row in enumerate(tab.rows):
            record = {cell.resource_var_name: cell.value for cell in row.cells}
            colmap[i] = {cell.resource_var_name: cell.resource_val_uuid for cell in row.cells}
            if self.add_samplecols:
                if "Sample ID" not in record:
                    record["Sample ID"] = row.name
                if "Sample UUID" not in record:
                    record["Sample UUID"] = row.sample_uuid
            rows.append(record)
        columns = [x.name for x in tab.columns]
        if self.add_samplecols:
            if "Sample ID" not in columns:
                columns.append("Sample ID")
            if "Sample UUID" not in columns:
                columns.append("Sample UUID")
        self.colmap = colmap
        # TODO: think about using sample names or uuid for index. UUID would
        # be easier, but not as nice for usability.
        # RDZ Note: probably can't use either since the same sample can appear
        # in a worksheet > 1x if it comes from 2 different experiments, unless we
        # autoadd a suffix, but...
        if nopandas:
            self.cache = {}
            return {}
        else:
            df = pandas.DataFrame(rows, columns=columns)
            # NOTE(review): result of df.where(...) is discarded — likely
            # intended `df = df.where(df.notnull(), None)`; confirm before
            # changing, since downstream code may rely on NaN semantics.
            df.where(df.notnull(), None)
            for col in df.columns:
                # convert numeric types
                if self.var_types.get(col) == "numeric":
                    try:
                        df[col] = df[col].astype(float)
                    except Exception:
                        continue
                # initially unchecked checkboxes -> null, so make sure all
                # checkbox-type columns are appropriately boolean.
                if self.var_types.get(col) in ["complete", "checkbox"]:
                    df[col] = [x is not None and x.lower() == "true" for x in df[col].astype(str)]
            self.cache = df.copy()
            return df
class SampleSheetWorksheet(Worksheet):
    """
    Object for managing sample sheets within ESP. This object is not
    configurable, and operates like a pandas DataFrame for each
    protocol in a workflow. You can access these worksheet objects
    from an experiment via:

    .. code-block:: python

        >>> from esp.models import Experiment
        >>> exp = Experiment('My Experiment')
        >>> ws = exp.protocol('My Protocol')
        >>> ws = exp.protocols[0]

    Also, since the Worksheet object acts as a pandas DataFrame, you can
    interact with it like a data frame:

    .. code-block:: python

        >>> ws.columns
        ['Column 1', 'Column 2']
        >>> ws['Column 1'] = ws['Column 1'].apply(lambda x: x.replace('foo', 'bar'))
        >>> ws.melt(...)

    After changing the data in the worksheet object, you can save it back
    to the L7 database and proceed to the next worksheet using:

    .. code-block:: python

        >>> ws.save()
        >>> ws.complete()
        >>> ws.next()

    Args:
        sample_sheet (str|SampleSheet): SampleSheet or UUID for SampleSheet instance.
        protocol (str): Name of the protocol this worksheet represents.
        add_samplecols (bool): Whether to add implicit "Sample ID"/"Sample UUID"
            columns to the data frame.
    """

    __api__ = "sample_sheets"
    __api_cls__ = "SampleSheet"
    __allow_update__ = False
def __init__(self, sample_sheet, protocol, add_samplecols=True, **kwargs):
    """Bind a protocol-level worksheet to a sample sheet."""
    self.add_samplecols = add_samplecols
    self._sample_sheet = object_for_value(sample_sheet, "sample_sheet", SampleSheet)
    # keep both attributes: .protocol for lookups, .protocol_name so that
    # .name resolves to the protocol rather than the sample sheet name.
    self.protocol = protocol
    self.protocol_name = protocol
    super(SampleSheetWorksheet, self).__init__(self._sample_sheet.ident, data=self._sample_sheet.data, **kwargs)
@property
def group(self):
    """Group name of this protocol, or None when the sheet has no experiments."""
    experiments = self._sample_sheet.experiments
    if not experiments:
        return None
    return experiments[0].protocol(self.protocol_name).group
@property
def name(self):
    # expose the protocol name (not the sample sheet name) as .name
    return self.protocol_name
def refresh(self):
    """Refresh the sheet and all of its experiments, then this object's data."""
    self._sample_sheet.refresh()
    for experiment in self._sample_sheet.experiments:
        experiment.refresh()
    # hop over worksheet.refresh since it's not implemented...
    super(Worksheet, self).refresh()
    return
@cached.tag("refresh")
def index(self):
    """
    Return protocol index for sheet, or None when the protocol is absent
    from the workflow definition.
    """
    for position, entry in enumerate(self._sample_sheet.workflow.data["protocols"]):
        if entry.name == self.protocol:
            return position
    return None
@cached.tag("refresh")
def sample_group(self):
    """
    Return sample group index for sheet.
    """
    # index into the workflow's sample-group list at this protocol's position
    return self._sample_sheet.workflow.sample_group_indices[self.index]
@cached.tag("refresh")
def samples(self):
    """
    Return list of sample objects in protocol instance, ordered as the
    sample sheet lists them.
    """
    by_uuid = {}
    for experiment in self._sample_sheet.experiments:
        for sample in experiment.sample_group(self.sample_group):
            by_uuid[sample.uuid] = sample
    ordered_uuids = self.sample_sheet.data.samples[self.sample_group]
    return [by_uuid[uuid] for uuid in ordered_uuids]
def _merge_protocol_values(self, prot_vals1, prot_vals2):
    """
    Take ProtocolInstance Sample rows from different ProtocolInstances
    and merge them into a single representation with pipe-delimited
    ResourceVar UUIDs. Needed when Samples from multiple Experiments
    are fanned-in to the same row. Each Experiment maintains its own
    copy of the data, but the copies are rolled-up during processing
    because they should always have the same data.

    Args:
        prot_vals1 (List[ResourceVal]): List of ResourceVals for a single ProtocolInstance row.
        prot_vals2 (List[ResourceVal]): List of ResourceVals for a single ProtocolInstance row.

    Returns:
        List of ResourceVals for multiple SampleSheet rows that have been
        rolled up into a single row representation.

    Raises:
        AssertionError: when paired entries reference different ResourceVars.
    """
    # NOTE(review): zip() silently truncates when the two rows have different
    # lengths — assumes both rows always carry the same columns; confirm.
    new_prot_vals = []
    for prot_val1, prot_val2 in zip(prot_vals1, prot_vals2):
        # start from a copy of the first row's entry
        new_prot_val = composite({k: v for k, v in prot_val1.items()})
        if prot_val1["resource_var"] != prot_val2["resource_var"]:
            raise AssertionError(
                f"Error: ResourceVals ({prot_val1['uuid']}, {prot_val2['uuid']}) "
                f"do not have matching ResourceVars "
                f"({prot_val1['resource_var']}, {prot_val2['resource_var']})"
            )
        # pipe-delimit the per-experiment ResourceVal uuids
        new_prot_val["uuid"] += "|" + prot_val2["uuid"]
        new_prot_vals.append(new_prot_val)
    return new_prot_vals
@cached.tag("refresh")
def resource_vals(self):
    """
    Return the resource vals for all samples in the worksheet.

    Rows whose sample was fanned-in from multiple experiments are merged
    via :meth:`_merge_protocol_values` into one pipe-delimited row.

    Note: there is potentially an issue if the same sample is placed into
    the same SampleSheet derived from two different experiments. Users
    will hit a ValueError in that case.
    """
    values = []
    # map experiment uuid -> {sample uuid -> row position within the group}
    exp_to_sample_idx = {}
    for exp in self._sample_sheet.experiments:
        # Some test executions resulted in exp data being only partially
        # hydrated. Guarding against that here, but
        # TODO: Figure out why experiments are sometimes only partially hydrated.
        if "definition" not in exp.data:
            exp.refresh()
        exp_to_sample_idx[exp.uuid] = {}
        for i, sample in enumerate(exp.sample_groups[self.sample_group]):
            exp_to_sample_idx[exp.uuid][sample.uuid] = i
    used_sample_uuids = set()
    for i, sample in enumerate(self.samples):
        # group 0 may legitimately repeat samples; later groups dedupe
        if self.sample_group == 0 or sample.uuid not in used_sample_uuids:
            experiments = self._sample_sheet.experiments_for_sample(sample, i)
            sample_row_data = None
            for experiment in experiments:
                protocol_data = experiment.step_instances[self.index]
                idx = exp_to_sample_idx[experiment.uuid][sample.uuid]
                if sample_row_data is None:
                    sample_row_data = protocol_data["values"][idx]
                else:
                    # same sample from multiple experiments: roll copies up
                    sample_row_data = self._merge_protocol_values(sample_row_data, protocol_data["values"][idx])
            values.append(sample_row_data)
            used_sample_uuids.add(sample.uuid)
    return values
@cached.tag("refresh")
def sample_sheet(self):
    # SampleSheet object this worksheet was constructed from
    return self._sample_sheet
def _wfi_uuids(self, sample_uuids):
    # workflow-instance (experiment) uuid for each (sample_uuid, row_idx) pair
    resolved = []
    for sample_uuid, row_idx in sample_uuids:
        experiment = self._sample_sheet.experiment_for_sample(sample_uuid, row_idx)
        resolved.append(experiment.uuid)
    return resolved
def _pi_uuids(self, sample_uuids):
    # protocol-instance uuid of this protocol's step in each sample's experiment
    return [
        self._sample_sheet.experiment_for_sample(sample_uuid, row_idx).step_instances[self.index]["uuid"]
        for sample_uuid, row_idx in sample_uuids
    ]
def next(self):
    """
    Return next protocol for editing, or None when this is the last one.
    """
    # locate this protocol's position by name
    for position, candidate in enumerate(self.sample_sheet.protocols):
        if candidate.name == self.name:
            break
    if position + 1 >= len(self.sample_sheet.protocols):
        return None
    following = self.sample_sheet.protocols[position + 1]
    if len(following.df) == 0:
        # force a data load for the next sheet if it looks empty
        following.refresh()
    return following
def _save_changes(self, delta, evaluate=True):
    # construct payload
    logging.info("Saving data in SampleSheet {} for " "Protocol: {}.".format(self._sample_sheet.name, self.name))
    # group changed cells by the pipe-joined protocol-instance uuid(s)
    # owning each row (one row can fan in from multiple experiments)
    step_instance_dicts = defaultdict(dict)
    for uuid, i in delta:
        experiments = self.sample_sheet.experiments_for_sample(uuid, i)
        protocol_uuid = "|".join([experiment.step_instances[self.index].uuid for experiment in experiments])
        step_instance_dicts[protocol_uuid][uuid] = delta[(uuid, i)]
    step_instance_values = [{x: step_instance_dicts[x]} for x in step_instance_dicts]
    payload = {
        "step_instance_values": step_instance_values,
        "evaluate": evaluate,
    }
    res = base.SESSION.put("/api/sample_sheets/{}/set_sample_values".format(self.sample_sheet.uuid), json=payload)
    return res.json()