# -*- coding: utf-8 -*-
#
# Session management.
#
# ------------------------------------------------
# imports
# -------
import datetime
import difflib
from functools import wraps
import inspect
import json
import logging
import os
import re
import sys
from uuid import UUID
from xml.etree import ElementTree
import esp
from ..utils import normalize_resource_link_type, restrict_to_type_mapper
try:
from urllib.parse import quote_plus, urlencode
except ImportError:
from urllib import quote_plus, urlencode
import dateparser
from gems import composite, cached
from jinja2 import Environment, FileSystemLoader, select_autoescape
from markupsafe import Markup
from jinja2.exceptions import TemplateNotFound
import six
import yaml
import pkg_resources
from .. import base
from .. import utils
from .exception import MissingVersionError
# config
# ------
ALIASES = {
"columns": "variables",
"default": "default_val",
"deps": "dependencies",
"description": "desc",
"dropdown_expression": "dropdown_expr",
"visible": "in_sample_sheet",
"files": "task_files",
"protocol": "protocol_type",
"relation": "parent_child_relation",
"rule": "var_type",
"custom_field_type": "var_type",
"rules": "var_type",
"sample": "sample_type",
"sequence": "lab7_id_sequence",
"value": "default_val",
"default_value": "default_val",
"groups": "sample_group_indices",
"icon": "icon_svg",
"custom_fields": "variables",
# In 3.1.1, it was report_key, report_display,
# but in 3.2, we've changed over to reportable_key
# and reportable_display for clarity that the key
# is for the field, not for a specific report.
"report_key": "reportable_key",
"report_display": "reportable_display",
}
CACHE = {}
NAME_CACHE = {}
UUID_REMAP_CACHE = {}
TEMPLATE_PATH = pkg_resources.resource_filename("esp", "templates")
TEMPLATES = Environment(loader=FileSystemLoader(TEMPLATE_PATH), autoescape=select_autoescape(["html", "xml"]))
DYNAMIC_EXECUTION_PLAN = "Dynamic Execution Plan"
WORKFLOW_CHAIN_VERSION = "Workflow Chain Version"
SYSTEM_SETTING = "System Setting"
dep_types_mapper = {"samplePlan": "Sample Plan", "specificationPlan": "Specification Plan"}
# decorators
# ----------
def list_dispatch(func):
@wraps(func)
def _(cls, data, **kwargs):
if isinstance(data, (list, tuple)):
return [func(cls, d, **kwargs) for d in data]
else:
return func(cls, data, **kwargs)
return _
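# Example (illustrative): wrapping a classmethod with @list_dispatch makes it
# map transparently over list inputs, so decorated(cls, [a, b]) behaves like
# [decorated(cls, a), decorated(cls, b)], while scalar inputs pass through unchanged.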
# utils
# -----
def raw(x):
if isinstance(x, composite):
return x.json()
return x
def is_uuid(name):
try:
UUID(name)
return True
except Exception:
return False
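# Example: is_uuid("8552afc3-882b-41e3-b0cf-1a4cf3f28d33") -> True, while
# is_uuid("My Sample") -> False, so name-or-uuid idents can be disambiguated.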
def load_config(config, aliases=None, class_=None):
if aliases is None:
aliases = ALIASES
# load yaml if config is string
if isinstance(config, six.string_types) and os.path.exists(config):
config = os.path.abspath(config)
with open(config, "r") as fi:
data = fi.read()
data = data.replace("${CWD}", os.path.dirname(config))
data = yaml.load(data, Loader=yaml.SafeLoader)
else:
data = config
# format result and return
if class_ is not None and issubclass(class_, BaseModel):
path = class_.__name__
else:
path = ""
return format_entry(data, aliases, path=path, cls=class_)
def _format_excluded(cls, path, key, data):
if key in ["links", "entities"]:
return True
if hasattr(cls, "__pristine__") and (key in cls.__pristine__ or path in cls.__pristine__):
return True
return False
def format_entry(data, aliases=None, path=None, cls=None):
if aliases is None:
aliases = ALIASES
if path is None:
path = "unknown"
if cls is None:
cls = BaseModel
# re-format and re-label dictionary
if isinstance(data, dict):
# rearrange dict to include name field
# possibilities:
# name is in data, but fixed_id isn't. Then outer key is fixed_id.
# name and fixed_id aren't in data. Then outer key == name == fixed_id
# fixed_id is in data but name isn't. Then outer key == name.
if len(data.keys()) == 1 and ("name" not in data or "fixed_id" not in data):
ident = list(data.keys())[0]
if (
isinstance(data[ident], dict)
and ident
not in ["steps", "variables", "meta", "sequence", "before", "after", "verify", "pipeline_status"]
and not _format_excluded(cls, path, ident, data)
):
data = data[ident]
if "name" not in data.keys():
data["name"] = ident
# be more conservative about supplying fixed_id - only do it for models that explicitly
# support it as a mutable parameter. And also only if it's different from name.
name = data["name"]
if (
hasattr(cls, "__mutable__")
and "fixed_id" in cls.__mutable__
and "fixed_id" not in data.keys()
and ident != name
):
data["fixed_id"] = ident
# re-label keys for backend
for key in list(data.keys()):
if key in aliases:
data[aliases[key]] = data.pop(key)
# dropdown expressions
if "dropdown" in data and isinstance(data["dropdown"], str):
data["dropdown_expr"] = data.pop("dropdown")
# go deeper down rabbit hole
for key in list(data.keys()):
if not _format_excluded(cls, path, key, data):
data[key] = format_entry(data[key], aliases, path + "." + str(key), cls)
# iterate through list and recursively call
elif isinstance(data, list):
data = [
x if _format_excluded(cls, path, str(x), data) else format_entry(x, aliases, path + "." + str(i), cls)
for i, x in enumerate(data)
]
return data
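# Examples (illustrative): aliases are re-labeled for the backend and single-key
# name mappings are unwrapped:
#   format_entry({"columns": [{"rule": "text"}]}) -> {"variables": [{"var_type": "text"}]}
#   format_entry({"My Protocol": {"desc": "x"}}) -> {"name": "My Protocol", "desc": "x"}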
def create_and_return(model, data, overwrite=False, allow_snapshot_uuid_remap=False):
"""
Create set of models and return objects. This is used throughout
several model creation processes, especially where models can
create related sub-models (i.e. Workflow > Protocol)
"""
single = not isinstance(data, (list, tuple))
if single:
data = [data]
ret = []
for item in data:
if isinstance(item, six.string_types):
obj = model(item)
if not obj.exists():
raise AssertionError("Cannot create object with non-existent {} {}".format(model.__name__, item))
ret.append(obj)
elif not isinstance(item, BaseModel):
ret.append(model.create(item, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap))
else:
ret.append(item)
return ret[0] if single else ret
def create_and_return_uuid(model, data, overwrite=False, allow_snapshot_uuid_remap=False):
"""
Create set of models and return UUIDs. This is used throughout
several model creation processes, especially where models can
create related sub-models (i.e. Workflow > Protocol)
"""
single = not isinstance(data, (list, tuple))
if single:
data = [data]
ret = []
for item in data:
if is_uuid(item):
ret.append(item)
else:
res = create_and_return(
model, item, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
)
if isinstance(res, (list, tuple)):
ret.extend([x.uuid for x in res])
else:
ret.append(res.uuid)
single = len(ret) == 1
return ret[0] if single else ret
def cached_uuid(model, ident):
for uuid in CACHE:
if model.__api_cls__ == CACHE[uuid].get("cls") and CACHE[uuid].get("name") == ident:
return uuid
raise ValueError("Cannot find {} '{}'.".format(model.__name__, ident))
def yaml_str(item):
# return the best representation of the item such that:
# 1. The item is represented by a !!str node in both python 2 and python 3
# 2. Avoid encoding errors/surprises.
if six.PY2:
try:
return str(item)
except Exception:
return item
return item
def yaml_str_list(items):
# return a list of strings.
return [yaml_str(x) for x in items]
def espfile_or_value(value, ashref=False, resolve_existing=False):
"""
Expand vars in value. If value is a path to a local file,
create an ESP File object from the file and return the File object.
Args:
value (string): A string that might point to a local file
ashref (bool): If true, only a url suitable for an image src or file href
is returned. Otherwise, the file object itself is returned. If value is
not a file, then value is still returned.
resolve_existing (bool): If true and the value does not resolve to an
existing OS path, attempt to try the value as a file name or uuid
and return it if it exists (still honoring ashref)
"""
from .file import File
value = os.path.expandvars(value)
if os.path.exists(value):
value = File.create({"name": os.path.basename(value), "uri": value, "upload": True})
if ashref:
return value.linkable_uri
elif resolve_existing:
existing = File(value)
if existing.exists():
if ashref:
return existing.linkable_uri
return existing
return value
def compile_report(html, compiled=False):
"""
Compile report using fragments available in the client
templates directory. This is similar to composing html
using components in modern front-end frameworks.
This function also supports reading external paths into
reports.
Examples:
Creating a report with a variable form panel and
a sample type overview:
.. code-block:: yaml
Extract:
view_template: |+
<panel header="Set Variables">
<variable-form></variable-form>
</panel>
<panel header="Linked Data">
<dependency-table type="Library"/>
</panel>
"""
global TEMPLATES
# handle path data
path = os.path.expandvars(html)
if os.path.exists(path):
with open(path, "r") as fi:
html = fi.read()
# return non-compiled html
if not compiled:
return html
# try to load xml content if it is xml
try:
tree = ElementTree.fromstring("<div>" + html + "</div>")
except Exception as exe:
logging.warning("Error in rendering html - " + str(exe))
return html
# parse and render elements
def _(tree):
result = ""
for child in tree:
# recurse if the element has children to
# retrieve inner html
text = child.text if child.text is not None else ""
if len(child):  # truth-testing an Element is deprecated; check for children explicitly
body = text + _(child)
else:
body = text
# render the template
try:
template = TEMPLATES.get_template(child.tag + ".html")
data = child.attrib.copy()
data["body"] = Markup(body)
result += "\n" + template.render(**data)
# otherwise use xml as template
except TemplateNotFound as exe:
attrib = ['{}="{}"'.format(key, value) for key, value in child.attrib.items()]
attrib = " ".join(attrib)
result += "<{} {}>\n{}\n</{}>".format(child.tag, attrib, body, child.tag)
if child.tail is not None:
result += child.tail
return result
result = _(tree)
return result
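# Example (illustrative): assuming a `panel.html` fragment exists in the client
# templates directory, compile_report('<panel header="QC">hi</panel>', compiled=True)
# renders panel.html with header="QC" and body="hi"; tags with no matching
# template fragment are passed through as raw xml.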
def _export_approval(data):
from .admin import Workgroup
ret = {}
wg = data.get("meta", {}).get("workgroup")
if not wg:
return ret
wg = Workgroup(wg)
ret["workgroup"] = wg.name
labels = data["meta"].get("approval_labels", {})
if "button" in labels and labels["button"] != "Approve":
ret["button"] = labels["button"]
if "action_text" in labels and labels["action_text"] != "Approved":
ret["action_text"] = labels["action_text"]
return ret
def _import_approval(data):
data.setdefault("meta", {})
if "workgroup" in data:
data["meta"]["workgroup"] = data.pop("workgroup")
if "workgroup" in data["meta"]:
from esp.models import Workgroup
wg = Workgroup(data["meta"]["workgroup"])
data["meta"]["workgroup"] = wg.uuid
else:
data["meta"]["workgroup"] = None
default_labels = {"button": "Approve", "action_text": "Approved"}
button = data.pop("button", data["meta"].get("approval_labels", default_labels).get("button", "Approve"))
action_text = data.pop(
"action_text", data["meta"].get("approval_labels", default_labels).get("action_text", "Approved")
)
data["meta"]["approval_labels"] = {"button": button, "action_text": action_text}
def _export_signatureflow(data):
ret = {}
signature_def_uuid = data["meta"].get("signature_flow_uuid", None)
if signature_def_uuid:
from .signatureflow import SignatureFlow
signature_flow = SignatureFlow(signature_def_uuid)
ret["signature_flow"] = yaml_str(signature_flow.name)
ret["signature_flow_uuid"] = yaml_str(signature_flow.uuid)
autocomplete_protocol_when = data["meta"].get("signature_flow_complete_row_when", None)
if autocomplete_protocol_when:
ret["autocomplete_protocol_when"] = autocomplete_protocol_when
return ret
def _import_signatureflow(data):
data.setdefault("meta", {})
signatureflow_name = data.pop("signature_flow", "")
# Should we be doing something with signature_flow_uuid?
data.pop("signature_flow_uuid", "")
if signatureflow_name:
from esp.models import SignatureFlow
sf = SignatureFlow(signatureflow_name)
data["meta"]["signature_flow_uuid"] = sf.uuid
else:
data["meta"]["signature_flow_uuid"] = None
autocomplete_protocol_when = data.pop("autocomplete_protocol_when", None)
if autocomplete_protocol_when:
data["meta"]["signature_flow_complete_row_when"] = autocomplete_protocol_when
def _export_sample_point(data):
from esp.models import EntityType
ret = {}
sample_types = data["meta"].get("sample_types", None)
for sample_type in sample_types:
sample_type_obj = EntityType(sample_type.pop("uuid"))
sample_type["name"] = sample_type_obj.name
if sample_type_obj.fixed_id:
sample_type["fixed_id"] = sample_type_obj.fixed_id
ret["sample_types"] = sample_types
ret["label"] = data.get("meta", {}).get("button_label", "Add")
return ret
def _import_sample_point(data):
from esp.models import SampleType
data.setdefault("meta", {})
sample_types = data.pop("sample_types", [])
for sample_type in sample_types:
name_or_fixed_id = sample_type.get("fixed_id", sample_type.get("name"))
# remove not needed data
sample_type.pop("fixed_id", "")
sample_type.pop("name", "")
sample_type_obj = SampleType(name_or_fixed_id)
sample_type["uuid"] = sample_type_obj.uuid
label = data.pop("button_label", data.pop("label", "Add"))
data["meta"]["sample_types"] = sample_types
data["meta"]["button_label"] = label
def _export_pipelinebutton(data):
ret = {}
# TODO: Simplify this code block during 3.2 development.
# There was a period of time during 3.1 when the way pipeline button fields
# stored their metadata was different between protocols and workflow chains,
# so this code exists to smooth over the difference and can be simplified
# to the unified format when we start 3.2 development.
labels = data["meta"].get(
"button_labels",
{
"button": data["meta"].pop("button_label", "Start"),
"action_text": data["meta"].pop("button_action_label", "Done"),
"failed_text": data["meta"].pop("button_failed_action_label", "Failed"),
},
)
pipeline_uuid = data["meta"].get("pipeline", None)
pipeline_variables = data["meta"].get("pipelineVariables", data["meta"].get("pipeline_variables"))
ret["pipeline_variables"] = pipeline_variables
if pipeline_uuid:
from .analysis import Pipeline
ret["pipeline"] = yaml_str(Pipeline(pipeline_uuid).name)
if "button" in labels:
ret["button"] = labels["button"]
if "action_text" in labels:
ret["action_text"] = labels["action_text"]
if "failed_text" in labels:
ret["failed_text"] = labels["failed_text"]
return ret
def _import_pipelinebutton(data):
data.setdefault("meta", {})
if "pipeline" in data:
data["meta"]["pipeline"] = data.pop("pipeline")
if "pipeline_variables" in data:
data["meta"]["pipeline_variables"] = data.pop("pipeline_variables")
if "pipeline" in data["meta"]:
from esp.models import Pipeline
pi = Pipeline(data["meta"]["pipeline"])
data["meta"]["pipeline"] = pi.uuid
else:
data["meta"]["pipeline"] = None
default_labels = {"button": "Start", "action_text": "Done", "failed_text": "Failed"}
labels = data["meta"].get("button_labels", default_labels)
button = data.pop("button", labels.get("button", "Start"))
action_text = data.pop("action_text", labels.get("action_text", "Done"))
failed_text = data.pop("failed_text", labels.get("failed_text", "Failed"))
data["meta"]["button_labels"] = {"button": button, "action_text": action_text, "failed_text": failed_text}
def _export_attachment(data):
ret = {}
meta = data.get("meta", {})
extensions = meta.get("allowed_extensions", None)
allow_multiple = meta.get("allow_multiple_files", False)
if extensions:
ret["allowed_extensions"] = extensions
if allow_multiple:
ret["multiple"] = True
return ret
def _import_attachment(data):
meta = data.setdefault("meta", {})
meta["allowed_extensions"] = data.pop("allowed_extensions", [])
meta["allow_multiple_files"] = data.pop("multiple", False)
def _export_barcode(data):
ret = {}
ret["barcode_type"] = data.get("meta", {}).get("barcode_type", "QR")
resource_barcode = data.get("meta", {}).get("resource_barcode", False)
if resource_barcode:
ret["resource_barcode"] = True
return ret
def _import_barcode(data):
data.setdefault("meta", {})
if "barcode_type" in data:
data["meta"]["barcode_type"] = data.pop("barcode_type")
elif "barcode_type" not in data["meta"]:
raise ValueError(
(
"Column `{}` is a barcode column but is missing the required `barcode_type` field (definition: {}`"
).format(data.get("name"), data)
)
# TODO: Move to a constants definition.
# list of valid types from lab7/resource/__init__.py::BarcodeTypes.
valid_barcode_types = ["QR", "1D", "mini data matrix", "UUID", "text"]
if data["meta"]["barcode_type"] not in valid_barcode_types:
raise ValueError(
"Invalid barcode type `{}`. Valid types: {}".format(data["meta"]["barcode_type"], valid_barcode_types)
)
if data.get("resource_barcode"):
data["meta"]["resource_barcode"] = data.pop("resource_barcode")
def _import_checkbox(data):
if str(data.get("name")).lower() == "complete":
logging.info('Re-classifying type of column "Complete" from "checkbox" to "complete"')
data["var_type"] = "complete"
if data.get("default_val") is None:
data["default_val"] = "false"
def _import_complete(data):
if data.get("default_val") is None:
data["default_val"] = "false"
def _export_date(data):
ret = {}
if "format" in data["meta"] and data["meta"]["format"]:
ret["format"] = data["meta"]["format"]
return ret
# Use the same exporter for time and datetime types
_export_time = _export_date
_export_datetime = _export_date
def _date_or_time_importer(default_format):
def importer(data):
if "format" in data:
format_ = data.pop("format")
user_supplied_format = True
else:
format_ = default_format # default
user_supplied_format = False
meta = data.setdefault("meta", {})
if "format" not in meta or (user_supplied_format and format_ != meta["format"]):
meta["format"] = format_
# '{{' check: don't muck with expression-based default values.
if "default_val" in data and data["default_val"] is not None and "{{" not in data["default_val"]:
val = data["default_val"]
if not isinstance(val, (datetime.datetime, datetime.date, datetime.time)):
logging.info("Ensuring date string `{}` conforms to backend expectations".format(val))
val = dateparser.parse(val)
if isinstance(val, datetime.time):
data["default_val"] = val.isoformat(timespec="auto")
elif isinstance(val, (datetime.datetime, datetime.date)):
data["default_val"] = val.isoformat()
return importer
# Use _date_or_time_importer for date, time, and datetime imports; each falls back to its default format when none is specified
_import_date = _date_or_time_importer("YYYY/MM/DD")
_import_datetime = _date_or_time_importer("YYYY/MM/DDTHH:mm Z")
_import_time = _date_or_time_importer("HH:mm a Z")
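# Example (illustrative): _import_date({"name": "Received", "default_val": "Jan 31, 2024"})
# sets meta["format"] to the "YYYY/MM/DD" default and normalizes default_val to
# the ISO string "2024-01-31T00:00:00" via dateparser.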
def _export_dropdown(data):
if data["dropdown_expr"]:
return {"dropdown": yaml_str(data["dropdown_expr"])}
return {"dropdown": yaml_str_list(data["dropdown"])}
def _export_itemqtyadj(data):
ret = {}
item_qty_aliases = {
"item_status": "include_statuses",
"minquant": "minimum_quantity",
}
itype = data["meta"].get("item_type")
if itype:
from .inventory import ItemType
if hasattr(itype, "name"):
ret["item"] = yaml_str(itype.name)
else:
ret["item"] = yaml_str(ItemType(itype).name)
for key in [
"item_display",
"include_expired",
"include_statuses",
"item_tags",
"render_code",
"default_quantity",
"minimum_quantity",
"default_item",
"item_status",
"minquant",
]:
if key not in data and key not in data["meta"]:
continue
ret[item_qty_aliases.get(key, key)] = data.get(key, data["meta"].get(key))
dropdown_expr = data.get("dropdown_expr", None)
if dropdown_expr:
ret["only_use_items_within_containers_expression"] = dropdown_expr
if ret.get("item_display") is None:
ret["item_display"] = "name"
return ret
def _import_itemqtyadj(data):
meta = data.setdefault("meta", {})
if "item_type" in data:
meta["item_type"] = data.pop("item_type")
# B/c it looks like client-based "export" uses item, not
# item_type, so be sure to handle importing exports.
elif "item" in data:
meta["item_type"] = data.pop("item")
if "item_type" not in data["meta"]:
raise ValueError("itemqtyadj column MUST define item_type!")
from .inventory import ItemType
item_type = ItemType(data["meta"]["item_type"])
if not item_type.exists():
raise ValueError("Inventory ItemType {} does not exist!".format(item_type.ident))
meta["item_type"] = item_type.uuid
item_qty_aliases = {
"item_status": "include_statuses",
"minquant": "minimum_quantity",
}
for key in [
"item_display",
"include_expired",
"include_statuses",
"item_tags",
"render_code",
"default_quantity",
"minimum_quantity",
"default_item",
"item_status",
"minquant",
]:
if key not in data:
continue
meta[item_qty_aliases.get(key, key)] = data.pop(key)
if "include_statuses" in meta and not isinstance(meta["include_statuses"], list):
meta["include_statuses"] = list(meta["include_statuses"])
if meta.get("item_display") is None:
meta["item_display"] = "name"
if "only_use_items_within_containers_expression" in data:
dropdown_expr = data.pop("only_use_items_within_containers_expression")
data["dropdown_expr"] = dropdown_expr
display = meta.setdefault("item_display", "name")
if display not in ["custom", "name", "serial"]:
raise ValueError("item_display must be one of custom, name, or serial")
if display == "custom" and not meta.get("render_code"):
raise ValueError("render_code must be specified if item_display is custom")
def _export_itemadj(data):
return _export_itemqtyadj(data)
def _import_itemadj(data):
_import_itemqtyadj(data)
def _export_location(data):
from .container import ContainerType
if "container_type" not in data["meta"]:
return {}
ctype = data["meta"]["container_type"]
ret = {}
if isinstance(ctype, ContainerType):
ret["container"] = yaml_str(data["meta"]["container_type"].name)
# temporary workaround for glitch in saving location type in GUI.
# TODO: Is this still necessary? I believe the GUI has been fixed...
elif ctype is None:
ret["container"] = None
else:
ret["container"] = yaml_str(ContainerType(ctype).name)
if "location_fields" in data["meta"] and data["meta"]["location_fields"]:
ret["fields"] = yaml_str_list(data["meta"]["location_fields"])
return ret
# TODO: 3.1+ refactor - there's a weird asymmetry in the import vs. export.
# export takes data from the server and returns a dict that is used to update
# the exported object. Import takes data from a json/yaml construct and modifies
# it in-place to produce the desired output structure. The behavior should be symmetric
# Either modify in-place universally or return a dictionary universally.
def _import_location(data):
from .container import ContainerType
vmeta = data.setdefault("meta", {})
if "container" not in data and "container_type" not in vmeta:
raise ValueError("Location variables must define container")
if "container" in data:
vmeta["container_type"] = ContainerType(data.pop("container")).uuid
if "fields" in data:
vmeta["location_fields"] = data.pop("fields")
_export_multiselect = _export_dropdown
# Should redefine these in terms of enums
SIG_FIG_PRECISION_TYPE = "significant figures"
DECIMAL_PLACE_PRECISION_TYPE = "decimal places"
NUMERIC_PRECISION_TYPES = [None, SIG_FIG_PRECISION_TYPE, DECIMAL_PLACE_PRECISION_TYPE]
PERCENTAGE_FORMAT = "percentage"
CURRENCY_FORMAT = "currency"
NUMERIC_FORMAT_TYPES = [None, PERCENTAGE_FORMAT, CURRENCY_FORMAT]
def _api_to_client(value, ary):
if value < 0 or value >= len(ary):
value = 0
return ary[value]
def _client_to_api(value, ary, prop):
if value is None:
return 0
if str(value).lower() in ary:
return ary.index(str(value).lower())
raise ValueError(f"Unknown value {value} for property {prop}. Known values: {ary}")
def _export_numeric(data):
ret = {}
# forward compatibility.
if "units" in data and data["units"]:
ret["units"] = data["units"]
meta = data.get("meta", {})
max_value = meta.get("maximum_value")
min_value = meta.get("minimum_value")
hard_max = meta.get("hard_maximum", False)
hard_min = meta.get("hard_minimum", False)
workgroups = meta.get("notify_workgroups", [])
prec_type = _api_to_client(meta.get("number_precision_type", 0), NUMERIC_PRECISION_TYPES)
format_type = _api_to_client(meta.get("number_formatting_type", 0), NUMERIC_FORMAT_TYPES)
prec_count = meta.get("number_precision_count", 0)
format_options = meta.get("number_formatting_options", {})
spec_type = meta.get("field_spec_type", None)
unit_of_measure = meta.get("unit_of_measure", None)
if max_value is not None:
ret["max"] = f'{max_value}{" hard" if hard_max else ""}'
if min_value is not None:
ret["min"] = f'{min_value}{" hard" if hard_min else ""}'
if workgroups:
ret["notify"] = [x["name"] for x in workgroups]
if prec_type:
if prec_type == SIG_FIG_PRECISION_TYPE:
ret["sigfigs"] = prec_count
else:
ret["decimals"] = prec_count
if format_type:
ret["format"] = format_type
if format_type == CURRENCY_FORMAT:
ret.update(format_options)
if spec_type:
ret["spec_type"] = spec_type
if unit_of_measure:
ret["unit_of_measure"] = unit_of_measure
return ret
def _parse_limit(limit):
parts = str(limit).split(" ")  # tolerate unquoted numeric YAML values like `min: 5`
if len(parts) == 1:
return parts[0], "soft"
if len(parts) == 2 and parts[1] in ("soft", "hard"):
return parts
raise ValueError(
f"Unable to parse limit: {limit}: expected a string formatted "
'":limit" or ":limit :type" where type is one of "soft" or "hard"'
)
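# Example: _parse_limit("5") -> ("5", "soft") and _parse_limit("5 hard") -> ["5", "hard"];
# both shapes unpack as (value, type).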
def _import_numeric(data):
meta = data.get("meta", {})
if "sigfigs" in data:
prec_count = data.pop("sigfigs")
prec_type = _client_to_api(SIG_FIG_PRECISION_TYPE, NUMERIC_PRECISION_TYPES, "precision")
elif "decimals" in data:
prec_count = data.pop("decimals")
prec_type = _client_to_api(DECIMAL_PLACE_PRECISION_TYPE, NUMERIC_PRECISION_TYPES, "precision")
else:
prec_count = None
prec_type = 0
meta["number_precision_type"] = prec_type
if prec_type:
meta["number_precision_count"] = prec_count
if "min" in data:
val, type_ = _parse_limit(data.pop("min"))
meta["minimum_value"] = val
meta["hard_minimum"] = type_ == "hard"
else:
meta["hard_minimum"] = False
if "max" in data:
val, type_ = _parse_limit(data.pop("max"))
meta["maximum_value"] = val
meta["hard_maximum"] = type_ == "hard"
else:
meta["hard_maximum"] = False
meta["number_formatting_type"] = _client_to_api(data.pop("format", None), NUMERIC_FORMAT_TYPES, "format")
if "currencyCode" in data:
meta["number_formatting_options"] = {"currencyCode": data.pop("currencyCode")}
else:
meta["number_formatting_options"] = {}
if "spec_type" in data:
meta["field_spec_type"] = data.pop("spec_type")
if "unit_of_measure" in data:
meta["unit_of_measure"] = data.pop("unit_of_measure")
notify = data.pop("notify", [])
if notify:
from esp.models import Workgroup
notify = [{"name": x} for x in notify]
meta["notify_workgroups"] = notify
data["meta"] = meta
# rewrite string -> text.
def _import_string(data):
data["var_type"] = "text"
return data
def _export_resource_link(data):
link_type = data.get("resource_link_type", "Sample")
normalized_link_type = normalize_resource_link_type(link_type)
if link_type not in [DYNAMIC_EXECUTION_PLAN, WORKFLOW_CHAIN_VERSION] and not hasattr(
esp.models, normalized_link_type
):
# link_type can be a pre-supplied/Out-of-box Entity Class.
# Customers and implementations may create other entity classes like MESProduct
normalized_link_type = "Sample"
restrict_to = []
restrict_to_wfc = []
meta = data.get("meta", {})
# resource link container meta properties
valid_container_statuses = meta.get("status", [])
after_use_container_status = meta.get("afterUseContainerStatus", None)
meta_restrict_to = meta.get("restrictTo", [])
meta_restrict_to_wfc = meta.get("restrictToWfc", [])
if link_type == DYNAMIC_EXECUTION_PLAN:
restrict_to = [dep_types_mapper.get(dep_type) for dep_type in meta_restrict_to]
restrict_to_wfc = _get_restrict_to_list_for_export(normalized_link_type, meta_restrict_to_wfc)
else:
restrict_to = _get_restrict_to_list_for_export(normalized_link_type, meta_restrict_to)
if link_type == WORKFLOW_CHAIN_VERSION:
allow_unpinned_versions = meta.get("allowUnpinnedVersions", False)
allow_unpinned_versions_from_setting = meta.get("allowUnpinnedVersionsFromSystemSetting", True)
if allow_unpinned_versions and not allow_unpinned_versions_from_setting:
allow_unpinned_versions_button = "Yes"
elif not allow_unpinned_versions and not allow_unpinned_versions_from_setting:
allow_unpinned_versions_button = "No"
else:
# any other combination defers to the system setting, so the button
# value is always bound before it is read below.
allow_unpinned_versions_button = SYSTEM_SETTING
result = {}
if link_type == "Container" and len(valid_container_statuses):
result["valid_container_statuses"] = valid_container_statuses
if after_use_container_status:
result["after_use_container_status"] = after_use_container_status
if link_type == WORKFLOW_CHAIN_VERSION:
result["allow_unpinned_versions_button"] = allow_unpinned_versions_button
if len(restrict_to):
result["restrict_to"] = restrict_to
if len(restrict_to_wfc):
result["restrict_to_wfc"] = restrict_to_wfc
if link_type and link_type != "Sample":
result["resource_link_type"] = link_type
return result
def _get_restrict_to_list_for_export(normalized_link_type, restrict_to_uuids):
restrict_to_type = restrict_to_type_mapper(normalized_link_type)
restrict_to = []
for uuid in restrict_to_uuids:
if not hasattr(esp.models, restrict_to_type):
raise ValueError("Restrict to type `{}` is not a valid esp model.".format(restrict_to_type))
obj = getattr(esp.models, restrict_to_type)(uuid)
restrict_to_obj = {"name": obj.name}
if obj.fixed_id is not None:
restrict_to_obj["fixed_id"] = obj.fixed_id
restrict_to.append(restrict_to_obj)
return restrict_to
def _import_resource_link(data):
link_type = data.get("resource_link_type", "Sample")
restrict_to = data.pop("restrict_to", [])
restrict_to_uuids = []
restrict_to_wfc_uuids = []
if link_type == DYNAMIC_EXECUTION_PLAN:
restrict_to_wfc = data.pop("restrict_to_wfc", [])
restrict_to_wfc_uuids = _get_restrict_to_list_for_import(restrict_to_wfc, link_type)
inverse_dep_types_mapper = {v: k for k, v in dep_types_mapper.items()}
restrict_to_uuids = [inverse_dep_types_mapper.get(dep_type) for dep_type in restrict_to]
else:
restrict_to_uuids = _get_restrict_to_list_for_import(restrict_to, link_type)
if link_type == WORKFLOW_CHAIN_VERSION:
# exports write `allow_unpinned_versions_button`; accept both spellings on import.
allow_unpinned_versions_button = data.pop(
"allow_unpinned_versions_button", data.pop("allow_unpinned_versions", SYSTEM_SETTING)
)
if allow_unpinned_versions_button in ["Yes", True]:
allow_unpinned_versions = True
allow_unpinned_versions_from_setting = False
elif allow_unpinned_versions_button in ["No", False]:
allow_unpinned_versions = False
allow_unpinned_versions_from_setting = False
else:
# anything else (including SYSTEM_SETTING) defers to the system setting,
# so both flags are always bound before use below.
allow_unpinned_versions = False
allow_unpinned_versions_from_setting = True
meta = data.get("meta", {})
if link_type == WORKFLOW_CHAIN_VERSION:
meta["allowUnpinnedVersions"] = allow_unpinned_versions
meta["allowUnpinnedVersionsFromSystemSetting"] = allow_unpinned_versions_from_setting
if len(restrict_to_uuids) > 0:
meta["restrictTo"] = restrict_to_uuids
if len(restrict_to_wfc_uuids) > 0:
meta["restrictToWfc"] = restrict_to_wfc_uuids
# set resource link container meta properties
if link_type == "Container":
if "valid_container_statuses" in data:
meta["status"] = data.pop("valid_container_statuses")
if "after_use_container_status" in data:
meta["afterUseContainerStatus"] = data.pop("after_use_container_status")
data["meta"] = meta
if not data.get("resource_link_type"):
data["resource_link_type"] = "Sample"
def _get_restrict_to_list_for_import(restrict_to, link_type):
restrict_to_uuids = []
normalized_link_type = normalize_resource_link_type(link_type)
if not hasattr(esp.models, normalized_link_type):
# link_type can be a pre-supplied/Out-of-box Entity Class.
# Customers and implementations may create other entity classes like MESProduct
normalized_link_type = "Sample"
restrict_to_type = restrict_to_type_mapper(normalized_link_type)
# accepted import formats: array of objects (containing name and fixed id properties) or array of names
# ex:
# 1. [{ name: The Name, fixed_id: The Fixed Id }, { name: The Name, fixed_id: The Fixed Id } ... ]
# 2. [Name 1, Name 2, Name 3 ...]
for item in restrict_to:
if isinstance(item, str):
name_or_fixed_id = item
else:
name_or_fixed_id = item.get("fixed_id", item.get("name"))
obj = getattr(esp.models, restrict_to_type)(name_or_fixed_id)
restrict_to_uuids.append(obj.uuid)
return restrict_to_uuids
def export_variable_definitions(variables, exclude=lambda x: False):
"""exporter function that can be used for resource var export."""
# TODO: rework the logic below in a cleaner manner.
if isinstance(variables, composite):
variables = variables.json()
else:
variables = list(variables)
ret = []
alias_inverted = {v: k for k, v in ALIASES.items()}
for data in variables:
if exclude(data):
continue
var = {"rule": yaml_str(data["var_type"])}
# universal properties.
if data["tags"]:
var["tags"] = sorted(yaml_str_list(data["tags"]))
for flag in ("required", "pipeline_param", "read_only"):
if data[flag]:
var[yaml_str(flag)] = True
if data["in_sample_sheet"] is False:
var["visible"] = False
if data["default_val"] is not None:
var["value"] = yaml_str(data["default_val"])
if data.get("meta") and "onchange" in data["meta"]:
var["onchange"] = data["meta"]["onchange"]
if data.get("meta") and "augment" in data["meta"]:
var["augment"] = data["meta"]["augment"]
augment = var.get("augment", {})
# for 3.1 series, pull these up from augment to top-level.
# For 3.2+ ESP, they will be at the top level, anyway.
for key in ["reportable", "report_key", "reportable_key", "report_display", "reportable_display"]:
if key in data:
var[ALIASES.get(key, key)] = data[key]
elif augment and key in augment:
var[ALIASES.get(key, key)] = augment.pop(key)
fixed_id = data.get("fixed_id", data.get("barcode", data["name"]))
if fixed_id not in (data["name"], data.get("uuid")):
var["fixed_id"] = fixed_id
name = data.get("name")
var["name"] = name
for key in ["desc", "instructions", "var_group", "ontology_path", "shared"]:
if data.get(key):
var[alias_inverted.get(key, key)] = data[key]
# vartype-specific exports
exporter = "_export_{}".format(var["rule"])
if exporter in globals():
var.update(globals()[exporter](data))
key = data.get("fixed_id", name)
ret.append({yaml_str(key): var})
return ret
def push_variable_definitions(data):
"""
Format variables data structure for parsing import and
pushing updates. This is particularly useful for updating complex
variables like containers or other inventory items.
Args:
data: list[dict[str,object]] - List of variable dicts to push.
"""
res = []
for var in data:
var_type = var.setdefault("var_type", "text")
importer = "_import_{}".format(var_type)
# onchange
if "onchange" in var:
var.setdefault("meta", {})["onchange"] = var.pop("onchange")
# 3.1.1 and rest of 3.1.x series: reportable information is in
# augment.
# 3.2 and later, this will stay as top-level fields in the var,
# and standard alias handling will re-map from report_key,
# report_display -> reportable_key, reportable_display.
# just need to handle the case where they were written under augment.
if "augment" in var:
for key in ["reportable", "report_key", "report_display", "reportable_key", "reportable_display"]:
if key in var["augment"]:
var[ALIASES.get(key, key)] = var["augment"].pop(key)
if importer in globals():
globals()[importer](var)
res.append(var)
return res
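# Example (illustrative): push_variable_definitions([{"name": "Qty", "var_type": "numeric",
# "min": "0 hard"}]) dispatches to _import_numeric, which yields
# meta["minimum_value"] == "0" and meta["hard_minimum"] == True.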
def export_fixed_id(obj):
"""
Exports fixed_id for an object.
If the fixed_id is the same as the object name, this will export None for the value
"""
fixed_id = getattr(obj, "fixed_id", None)
if fixed_id and fixed_id != obj.name:
return fixed_id
return None
# objects
# -------
class BaseModel(object):
"""
Abstract class for managing initialization methods for
common objects.
Args:
ident (str): Name or uuid to use for initializing object.
data (dict): Data to use for instantiating object. This only needs
to be specified if you already have all metadata related to an
object.
**kwargs (dict): Additional data to include in model query. For example,
including sample_type='Generic sample' will subset the query by sample type.
"""
__api__ = None # the backend api. Ie in /api/samples/:uuid, "samples" is the api.
__api_cls__ = None # the (string) "class" returned by the backend for the cls property.
__version_api__ = None # The endpoint that returns "version" (definition) objects for versioned object types.
__allow_update__ = True # If false, a "create" operation cannot overwrite an existing object.
__extend_aliases__ = True
__droppable__ = True # If false, the model cannot be archived
__aliases__ = {} # class-specific aliases for mapping between config <--> backend.
__type_aliases__ = [] # For evolving the codebase over time
__defaults__ = { # class specific value defaults.
"name": None,
"uuid": None,
"desc": "",
}
__mutable__ = [ # properties that can be altered via PUT.
"name",
"desc",
"tags",
"meta",
"augment",
"barcode",
]
# by default, don't export barcode. Subclasses can override as-needed.
__base_exportable__ = [x for x in __mutable__ if x != "barcode"] # which model properties are exported.
__create_params__ = [] # parameters passed from Model.create() -> Model.__init__.
__push_format__ = {} # property -> callable map for mapping from client format to backend PUT format.
__pristine__ = [] # class properties whose values will not be munged by standard config handling rules.
__versioned_exportable__ = []
# Class constant for avoiding recursion errors when setting attributes.
__prime_attributes__ = ["ident", "_data", "_params", "_version", "_snapshot"]
def __init__(self, ident, data=None, version=None, snapshot=None, **kwargs):
if not isinstance(ident, six.string_types):
raise AssertionError("Error: ident argument to BaseModel must be string type!")
self.ident = ident
self._data = composite(data) if data is not None else None
self._version = version
self._snapshot = snapshot
self._params = kwargs
return
def __eq__(self, other):
try:
return self.uuid == other.uuid
except Exception:
return False
def __repr__(self):
if "name" in self.data:
return "<{}(name={})>".format(self.__class__.__name__, self.data.name)
elif "desc" in self.data:
return "<{}(desc={})>".format(self.__class__.__name__, self.data.desc)
else:
return "<{}({})>".format(self.__class__.__name__, self.ident)
def __getattr__(self, name):
if name in self.__class__.__dict__ or name in self.__dict__:
return self.__getattribute__(name)
elif name in self.data:
return self.data[name]
elif name in dir(self):
# This check is needed to handle any @cached methods that haven't
# been replaced with fields yet.
return self.__getattribute__(name)
elif len(self.data) == 0:
raise AssertionError("Error: No data available for object {}.".format(self))
else:
raise AttributeError("{} object has no attribute {}".format(self.__class__.__name__, name))
def __setattr__(self, name, value):
if name in self.__prime_attributes__:
self.__dict__[name] = value
elif name in self.__class__.__dict__:
if isinstance(self.__class__.__dict__[name], property):
self.__class__.__dict__[name].fset(self, value)
else:
self.__dict__[name] = value
elif hasattr(self.__class__, name):
prop = getattr(self.__class__, name)
if isinstance(prop, property):
prop.fset(self, value)
else:
self.__dict__[name] = value
elif name in self.data:
self.data[name] = value
else:
self.__dict__[name] = value
def __getitem__(self, item):
return self.data[item]
def __setitem__(self, item, value):
self.data[item] = value
return
@classmethod
def all(cls, **kwargs):
"""
Return all instances related to a given model.
Examples:
>>> samples = Sample.all()
>>> print(samples[0].name)
'ESP0000001'
"""
result = base.SESSION.get("/api/{}".format(cls.__api__))
return [cls.from_data(data) for data in result.json()]
@classmethod
def from_data(cls, data):
"""
Instantiate object from data.
Examples:
>>> import esp.base as base
>>> from esp.models import Sample
>>> response = base.SESSION.get('/api/sample?name=ESP000001')
>>> samp = Sample.from_data(response.json())
>>> print(samp.name)
'ESP0000001'
"""
return cls(ident=data["uuid"], data=data)
@classmethod
def from_definition(cls, uuid):
"""
Instantiate object definition from uuid.
Examples:
>>> import esp.base as base
>>> from esp.models import SampleType
>>> st = SampleType.from_definition('8552afc3-882b-41e3-b0cf-1a4cf3f28d33')
>>> print(st.name)
'Illumina Sample'
"""
if cls.__version_api__ is None:
raise AssertionError(
"Error: __version_api__ must be specified for subclasses for from_definition() to resolve."
)
# check the cache first...
if uuid in CACHE:
return cls.from_data(CACHE[uuid])
result = base.SESSION.get("/api/{}/{}".format(cls.__version_api__, uuid))
CACHE[uuid] = result.json()
return cls.from_data(CACHE[uuid])
@classmethod
def items_for_uuids(cls, uuids, max_query_string_size=8000):
"""
Fetch objects by UUID. This is similar to search but uses a different fetch strategy and uses generators,
so was split out for backwards compatibility.
Args:
uuids (List[str]): List of UUIDs of objects to fetch.
max_query_string_size (int): Max number of characters allowed in the query string.
Differences from search:
1. Simpler interface. Only "uuids" is provided.
2. search does a POST to avoid long URLs; items_for_uuids does a "GET" but splits the UUID payload up
so items_for_uuids may make several requests. However, items_for_uuids calls object-type specific
endpoints that typically provide better performance than the general-purpose resource endpoint used
by search.
3. items_for_uuids is less likely to cause an OOM exception for a web worker by asking for thousands of
objects at once. In scenarios where you are simply iterating through the results and do not need to
hold the entire collection of objects in memory at once, items_for_uuids provides better memory
performance because it returns a generator so you never have to hold the entire result data in memory
at once.
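Examples:
A minimal sketch of typical usage (the uuid is hypothetical):
>>> gen = Sample.items_for_uuids(['8552afc3-882b-41e3-b0cf-1a4cf3f28d33'])
>>> for obj in gen:
...     print(obj.name if obj is not None else 'missing')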
"""
uuids = list(uuids)
# we can compute the batch size from max_query_string_size since UUIDs are fixed length...
STATIC_LEN = 9 # accounts for ?uuids=[]
# divide by 38 b/c 36 characters for uuid4 uuid + two quotes per uuid.
# int() rounds down so we stay under the limit.
batch_size = int((max_query_string_size - STATIC_LEN) / 38)
# Entities use the Sample api which was not designed for Containers and Items
# additional_query_params can be removed once Entities get a separate api
additional_query_params = ""
from esp.models import Entity
if cls == Entity:
additional_query_params = '&clses=["Sample","Container","Item"]'
while uuids:
current_uuids = uuids[:batch_size]
result = base.SESSION.get(f"/api/{cls.__api__}?uuids={json.dumps(current_uuids)}{additional_query_params}")
# ensure entities are returned in uuid order.
data_map = {x["uuid"]: x for x in result.json()}
for uuid in current_uuids:
if uuid in data_map:
obj = cls.from_data(data_map[uuid])
else:
obj = None
yield obj
uuids = uuids[batch_size:]
@classmethod
def items_for_names(cls, names, max_query_string_size=4000):
"""
Fetch objects by name. This is similar to search but uses a different fetch strategy and uses generators,
so was split out for backwards compatibility.
Args:
names (List[str]): List of names of objects to fetch.
max_query_string_size (int): Max number of characters allowed in the query string.
Differences from search:
1. Simpler interface. Only "names" is provided.
2. search does a POST to avoid long URLs; items_for_names does a "GET" but splits the names payload up
so items_for_names may make several requests. However, items_for_names calls object-type specific
endpoints that typically provide better performance than the general-purpose resource endpoint used
by search.
3. items_for_names is less likely to cause an OOM exception for a web worker by asking for thousands of
objects at once. In scenarios where you are simply iterating through the results and do not need to
hold the entire collection of objects in memory at once, items_for_names provides better memory
performance because it returns a generator so you never have to hold the entire result data in memory
at once.
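Examples:
A minimal sketch of typical usage (the names are hypothetical):
>>> for obj in Sample.items_for_names(['ESP0000001', 'ESP0000002']):
...     print(obj.name)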
"""
def _find_batch_size(name_list):
index = len(name_list)
while index > 1 and len(json.dumps(name_list[:index])) + 7 > max_query_string_size:
index //= 2
# note that this is only _half_ a true binary search, so we will consistently undershoot our target
# index by as much as 2x. But most use-cases are small batch sizes...
# a floor of 1 guarantees forward progress even if a single name exceeds the limit.
return max(index, 1)
names = list(names)
while names:
# determine the next batch size
batch_size = _find_batch_size(names)
current_names = names[:batch_size]
result = base.SESSION.get(f"/api/{cls.__api__}?names={json.dumps(current_names)}")
for x in result.json():
yield cls.from_data(x)
names = names[batch_size:]
@classmethod
def search(cls, name=None, tags=None, method="or", uuids=None, names=None):
"""
Search for model by name wildcard or tags.
Args:
name (str|List[str]): Wildcard with sample name. For searching with
an exact match, simply instantiate the model with the
name (i.e. Sample('My Sample')).
tags (list): List of tags to search for.
method (str): How to filter the search by tags. Can either be
'and' (intersection) or 'or' (union).
uuids (List[str]): UUID(s) to search for. For fetching a single object
by uuid, instantiate the object as ModelClass(<uuid>)
(e.g.: Sample(<uuid>))
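Examples:
Illustrative queries; the wildcard syntax is assumed to follow the
backend's `name.like` matching:
>>> hits = Sample.search(name='ESP%')
>>> qc_failures = Sample.search(tags=['qc', 'failed'], method='and')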
"""
if cls.__api_cls__ is None:
raise AssertionError("Search not supported for type {}".format(cls.__name__))
if name is None and tags is None and uuids is None and names is None:
raise AssertionError("Please specify name= or tags= or uuids= or names= to search!")
data = {}
if name is not None:
data["name.like"] = name
if uuids is not None:
data["uuids"] = json.dumps(uuids)
if names is not None:
data["names"] = json.dumps(names)
if tags is not None:
if not isinstance(tags, (list, tuple)):
tags = [tags]
tags = json.dumps(tags)
# note: resources query supports both AND and OR for tags.
# tags= is OR. alltags= is AND.
if method == "and":
data["alltags"] = tags
else:
data["tags"] = tags
data["cls"] = cls.__api_cls__
res = base.SESSION.post("/api/resources/query", json=data)
obj = res.json()
# filter resources using api url and instantiate
uobj = {}
for o in obj:
if cls.__api_cls__ == o["cls"]:
# account for an or query returning multiple
# entries for the same thing
uobj[o["uuid"]] = o
# filter by unique entries
ret = []
for o in sorted(uobj.values(), key=lambda x: x["updated_at"]):
ret.append(cls.from_data(o))
return ret
@classmethod
def _resolve_existing(cls, item, allow_snapshot_uuid_remap):
"""Resolve an existing model object.
Args:
item: configuration to resolve
allow_snapshot_uuid_remap: allow for pinned UUID remaps to occur
"""
uuid = item.get("uuid")
def_uuid = item.get("def_uuid")
name = item.get("name")
fixed_id = item.get("fixed_id")
def _vet_potential_remap(obj):
# no uuid supplied... so we don't care anyway.
if not uuid:
return
UUID_REMAP_CACHE[uuid] = obj.uuid
if uuid == obj.uuid or allow_snapshot_uuid_remap:
return
raise ValueError(
"Importing a pinned WFC where a uuid remap is required, but not allowed. "
"Add the `--allow-snapshot-uuid-remap` to enable UUID remapping of pinned objects. "
f"(Existing object uuid: {obj.uuid}; expected uuid: {uuid})"
)
kwargs = {}
# adding additional query params (i.e. sample type for sample)
for param in cls.__create_params__:
if param in item:
kwargs[param] = item[param]
kwargs["version"] = def_uuid
lookups = [("uuid", uuid), ("fixed_id", fixed_id), ("name", name)]
lookups = [x for x in lookups if x[1]]
for type_, ident in lookups:
# note: this try/except encompasses three conditions:
# 1. version is not specified at all (covered in main try block)
# 2. version _is_ specified and it exists (covered in main try block)
# 3. version _is_ specified but does not exist (covered in except block).
try:
# note: it's okay to try to resolve an object with a version of None.
kwargs["ident"] = ident
obj = cls(**kwargs)
if obj.exists():
if obj.data and hasattr(obj.data, "deleted") and obj.data.deleted:
try:
url = "/api/{}/{}/undelete".format(cls.__api__, obj.data.uuid)
base.SESSION.put(url)
except Exception:
raise ValueError(
"Undelete failed for {} identified by {}: {} . Object exists, but was archived".format(
cls.__name__, type_, ident
)
)
# and for the purpose of vetting downstream behaviors... we don't
# care if we resolved the object with a version specified or not here...
_vet_potential_remap(obj)
logging.info(
"{} with {} {}{} already exists - using existing object".format(
cls.__name__, type_, ident, ", def_uuid {}".format(def_uuid) if def_uuid else ""
)
)
return obj, def_uuid is not None
except MissingVersionError:
# only get MissingVersionError if the object exists but version uuid doesn't.
# In that case, we can do from existing head object and do the
# PUT later
kwargs.pop("version")
obj = cls(**kwargs)
_vet_potential_remap(obj)
logging.info(
"{} with {} {} already exists - using existing object".format(
cls.__name__,
type_,
ident,
)
)
return obj, False
# if we made it here, the object just doesn't exist by any mechanism
# we can find.
return composite({"exists": lambda: False}), False
@classmethod
def create(
cls,
config=None,
overwrite=False,
prompt=False,
object_names=None,
force_overwrite=False,
allow_snapshot_uuid_remap=False,
**kwargs,
):
"""
Create new object in ESP database using config file or other data.
This method should be overwritten by subclasses of BaseModel for
model-specific logic.
Args:
config (str, dict, list): Config file or information to use in
creating new object.
overwrite (bool): Whether or not to delete the current entry in
the ESP database.
prompt (bool): If current entry for config exists, attempt
to detect conflicts and present interactive prompt for
resolving conflicts.
object_names (list[str]): When config refers to a list of objects
to load, only load the items named by object_names.
Default is to load all objects in config.
force_overwrite (bool): Will force an overwrite, even when no change
is detected between the existing DB configuration and the configuration loaded from file.
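Examples:
A hedged sketch of config-based creation (file and values are hypothetical):
>>> from esp.models import SampleType
>>> st = SampleType.create('sample_types.yml', overwrite=True)
>>> st = SampleType.create({'name': 'My Sample Type', 'desc': 'Example type'})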
"""
if config is None:
config = {}
if isinstance(config, dict):
config.update(kwargs)
if not config:
raise AssertionError(
"Incorrect arguments specified to create()! Please see documentation for more information."
)
# load config
aliases = ALIASES.copy() if cls.__extend_aliases__ else {}
aliases.update(cls.__aliases__)
data = load_config(config, aliases, cls)
# manage single/multiple defs
if not isinstance(data, (list, tuple)):
data = [data]
# traverse data and create/update
created = []
for item in data:
# only load items listed explicitly by name
# if the object_names argument is provided.
if object_names is not None and "name" in item and item["name"] not in object_names:
continue
# totally bypass model creation logic if we're doing a hub import
# set defaults
if isinstance(item, dict):
if item.get("_format", None) == "hub":
created.append(cls._import_hub(item))
continue
for key in cls.__defaults__:
if key not in item:
default = cls.__defaults__[key]
if isinstance(default, (list, dict)):
item[key] = type(default)(default)
else:
item[key] = default
item = item.copy()
item = cls.parse_import(item, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap)
# break on no create or model-specific override
if item is None:
continue
elif isinstance(item, BaseModel):
created.append(item)
continue
elif isinstance(item, (list, tuple)) and all(isinstance(x, BaseModel) for x in item):
created.extend(item)
continue
# Handle object format
item = dict(item)
# load object (or phony object) for determining if it exists
# check for versioned imports...
obj, exists_for_version = cls._resolve_existing(item, allow_snapshot_uuid_remap)
# generate json hash of item for update checking
hash_ = json.dumps({k: v for k, v in item.items() if k != "uuid"}, sort_keys=True)
hash_time = datetime.datetime.now(datetime.timezone.utc)
hash_time = hash_time.isoformat()
hash_time = hash_time.replace("+00:00", "")
item.setdefault("augment", {})
item["augment"].update(
dict(
imported_content=hash_,
imported_at=hash_time,
)
)
if obj.exists():
# however we found the object, we now have it, so we'll be
# using the object's UUID to run a PUT, so we don't need
# a UUID in item if there ever was one.
item.pop("uuid", None)
if exists_for_version:
# then we want to immediately abort processing.
created.append(obj)
continue
# object doesn't exist - create
if not obj.exists():
ident = item.get("name") or item.get("fixed_id") or item.get("uuid") or "<UNKNOWN>"
logging.info("Creating {}: {}".format(cls.__name__, ident))
if "snapshot" in item:
item["snapshot"] = cls._remap_snapshot_uuids(item["snapshot"])
created.append(cls._create_model(item, overwrite=overwrite, prompt=prompt))
# object exists - don't update
elif obj.exists() and not overwrite:
logging.info("Object {} already exists; will not overwrite.".format(repr(obj)))
cls.parse_response(obj.data.json(), overwrite=overwrite, prompt=prompt)
created.append(obj)
# check if update is allowed for model
elif not cls.__allow_update__:
raise AssertionError(
"Update not allowed for model {}! "
"Either no backend support or model assumptions "
"would be violated!".format(cls.__name__)
)
# object exists and overwrite specified
else:
change = "Config"
different = False
# pre-prompt or updated through ui
if "imported_at" not in obj.augment or "imported_content" not in obj.augment:
different = True
change = "Remote"
# otherwise, detect changes with previous import hash
else:
# keeping this block here since it's a more robust solution than the else block.
# We can remove the else block when we complete the python 3.9 migration.
if hasattr(datetime.datetime, "fromisoformat"):
last_update = datetime.datetime.fromisoformat(obj.updated_at.replace("Z", ""))
imported_at = datetime.datetime.fromisoformat(obj.augment.imported_at)
else:
last_update = datetime.datetime.strptime(obj.updated_at, "%Y-%m-%dT%H:%M:%S.%fZ")
imported_at = datetime.datetime.strptime(obj.augment.imported_at, "%Y-%m-%dT%H:%M:%S.%f")
previous_hash = obj.augment.get("imported_content", "{}")
import_delta = int(abs((last_update - imported_at).total_seconds()))
# if the last update was more than 60s after the last import or the content is different
# TODO: the 60-second threshold seems arbitrary; revisit during a future refactor.
change = "Remote" if import_delta > 60 else "Config"
different = import_delta > 60 or previous_hash != hash_
# content not different -> don't overwrite
if not different:
if force_overwrite:
logging.info(
"Force-overwriting existing object {} despite no changes detected".format(repr(obj))
)
created.append(cls._overwrite_model(obj, item, overwrite=overwrite, prompt=prompt))
else:
logging.info("No change detected for object {}; will not overwrite.".format(repr(obj)))
cls.parse_response(obj.data.json(), overwrite=overwrite, prompt=prompt)
created.append(obj)
# content different but no prompt -> force overwrite
elif not prompt:
logging.info("Overwriting existing object {}".format(repr(obj)))
created.append(
cls._overwrite_model(
obj,
item,
overwrite=overwrite,
prompt=prompt,
allow_snapshot_uuid_remap=allow_snapshot_uuid_remap,
)
)
# content different and prompt -> print diff and prompt
else:
# generate diff hash for config change
if change == "Config":
prev = yaml.dump(json.loads(previous_hash)).split("\n")
# generate diff hash for remote change
elif change == "Remote":
jdata = obj.data.json()
# remove old keys for diff
jdata.setdefault("meta", {})
if "augment" in jdata["meta"]:
if "imported_content" in jdata["meta"]["augment"]:
del jdata["meta"]["augment"]["imported_content"]
del jdata["meta"]["augment"]["imported_at"]
# generate remote hash
rhash = {key: jdata.get(key) for key in item if key != "uuid"}
rhash = json.dumps(rhash, sort_keys=True)
prev = yaml.dump(json.loads(rhash)).split("\n")
# take diff of object
current = yaml.dump(json.loads(hash_)).split("\n")
diff = list(difflib.unified_diff(prev, current))[2:]
# notify user about change
sys.stderr.write(f"\n=======\n{change} change detected for object {repr(obj)}.\n=======\n")
if len(diff):
sys.stderr.write("\n--> start diff {} <--\n".format(repr(obj)))
sys.stderr.write("\n".join(diff))
sys.stderr.write("\n\n--> end diff {} <--\n\n".format(repr(obj)))
else:
sys.stderr.write(
"\nContent delta could not be determined locally for {}. "
"Please check ESP for potential content differences. "
"See below for content that will be imported after the prompt:\n".format(repr(obj))
)
sys.stderr.write("\n{}\n".format("\n".join(current)))
# prompt for input until correct
while True:
inp = input("Keep previous (-) or new (+) version? ")
# keep previous
if inp == "-":
logging.info("Keeping existing version for object {}".format(repr(obj)))
created.append(obj)
break
# update with new
elif inp == "+":
logging.info("Overwriting existing object {}".format(repr(obj)))
created.append(cls._overwrite_model(obj, item, overwrite=overwrite, prompt=prompt))
break
# input error
else:
sys.stderr.write(
"\n Error -> `{}` isn't a supported option. Please enter `-` or `+`\n\n".format(inp)
)
return created if len(created) > 1 else created[0]
@classmethod
def _remap_snapshot_uuids(cls, snapshot):
# note: snapshot metadata is only recorded in the top-level pin. So if you
# export a pinned WFC, it's in the WFC; pin and export a WF, and it's in the WF.
# Likewise for Pipelines and signature flows. Also, the client imports nested dependencies
# before outer dependencies, so if we have a snapshot, we should have _already_ imported
# all dependencies and therefore the UUID_REMAP_CACHE should have everything we need by now.
if isinstance(snapshot, list):
return [cls._remap_snapshot_uuids(x) for x in snapshot]
if not isinstance(snapshot, dict):
return snapshot
remapped = dict()
for key, value in snapshot.items():
# we only touch the uuid keys.
if key == "uuid":
if value in UUID_REMAP_CACHE:
value = UUID_REMAP_CACHE[value]
else:
import warnings
warnings.warn(
(
"Houston, we have a potential logic problem. "
"The UUID_REMAP_CACHE should have been fully populated, but `{}` was missing. "
"assuming the root cause of the miss is a nested definition where the parent "
"def_uuid already exists"
).format(value)
)
# raise AssertionError(
# 'Houston, we have a logic problem. '
# 'The UUID_REMAP_CACHE should have been fully populated, but `{}` was missing'.format(
# value
# ))
else:
value = cls._remap_snapshot_uuids(value)
remapped[key] = value
return remapped
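# Illustrative behavior of the remap (UUIDs hypothetical):
#
#   UUID_REMAP_CACHE['old-def-uuid'] = 'new-def-uuid'
#   cls._remap_snapshot_uuids(
#       {'uuid': 'old-def-uuid', 'steps': [{'uuid': 'old-def-uuid'}]})
#   # -> {'uuid': 'new-def-uuid', 'steps': [{'uuid': 'new-def-uuid'}]}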
@classmethod
def _overwrite_model(cls, obj, item, overwrite, prompt=False, allow_snapshot_uuid_remap=False):
"""Overwrite an existing model object with new data from item"""
if "snapshot" in item and allow_snapshot_uuid_remap:
item["snapshot"] = obj._remap_snapshot_uuids(item["snapshot"])
result = base.SESSION.put("/api/{}/{}".format(cls.__api__, obj.uuid), json=item)
data = result.json()
# clear cache on overwrite
global CACHE
if "uuid" in data and data["uuid"] in CACHE:
del CACHE[data["uuid"]]
key = (cls.__name__, obj.name)
if key in NAME_CACHE:
del NAME_CACHE[key]
return cls.parse_response(data, overwrite=overwrite, prompt=prompt)
@classmethod
def _create_model(cls, item, overwrite=False, prompt=False):
"""Create a new model object where none exists already given the information in item"""
result = base.SESSION.post("/api/{}".format(cls.__api__), json=item)
return cls.parse_response(result.json(), overwrite=overwrite, prompt=prompt)
@classmethod
def parse_import(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
"""
Method for formatting import-style data to data that can be
used in a backend POST/PUT request.
Returns:
A data structure to use in a POST/PUT request for
creating or updating the model.
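Example:
An illustrative subclass hook (``client_only_key`` is a hypothetical
field, not part of this module):
>>> class MyModel(BaseModel):
...     @classmethod
...     def parse_import(cls, data, **kwargs):
...         data.pop('client_only_key', None)
...         return data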
"""
return data
@classmethod
def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False):
"""
Method for parsing request returned from POST/PUT during
object creation. This method is responsible for returning an
object after create() is called.
:param allow_snapshot_uuid_remap:
"""
return cls.from_data(data)
def export(
self,
filename=None,
deep=False,
filter_empty=False,
use_dump_format=False,
use_hub_format=False,
versioned=False,
):
"""Export a yaml-formatted object.
Args:
filename (str): Filename to write yaml output to
deep (bool): Perform a deep (True) or shallow (False) export
filter_empty (bool): Attributes where the value is None or
empty string are filtered from export.
use_dump_format (bool): Whether to use the "dump" yaml format.
use_hub_format (bool): Whether to use the hub yaml format
versioned (bool): If true, attempts to export a snapshotted/pinned
export. Note: not all models support pinning.
Note:
A deep export embeds all nested objects (e.g.: protocols of a workflow).
A deep export is therefore the yaml analog to the .lab7 file.
A shallow export uses names as references to nested objects.
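Example:
A minimal usage sketch, where ``Workflow`` stands in for any
exportable model in this package:
>>> obj = Workflow('My Workflow')
>>> obj.export('workflow.yml', deep=True)  # embed nested objects
>>> obj.export(filename='-')               # print yaml to stdout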
"""
if versioned:
deep = True
if use_hub_format:
export_dict = self._export_hub(deep, versioned)
else:
export_dict = self._export(deep, filter_empty, versioned)
# remove 'augment' metadata
export_dict.get("augment", {}).pop("imported_content", None)
export_dict.get("augment", {}).pop("imported_at", None)
export_dict.get("meta", {}).get("augment", {}).pop("imported_content", None)
export_dict.get("meta", {}).get("augment", {}).pop("imported_at", None)
if not use_hub_format:
fixed_id = export_dict.pop("fixed_id", None)
name = export_dict.get("name", None)
if fixed_id:
if name and (is_uuid(fixed_id) or fixed_id == name):
key = name
export_dict.pop("name", None)
else:
key = fixed_id
else:
key = name
export_dict.pop("name", None)
export_dict = {key: export_dict}
from esp.export import ModelDumper, str_presenter, DEFAULT_DUMP_CONF
if use_dump_format:
yaml.add_representer(str, str_presenter)
def dump(obj, out=None):
if out:
yaml.dump(obj, out, Dumper=ModelDumper, **DEFAULT_DUMP_CONF["dump_config"])
else:
return yaml.dump(obj, Dumper=ModelDumper, **DEFAULT_DUMP_CONF["dump_config"])
else:
dump = yaml.dump
if filename is None:
return export_dict
elif filename == "-":
print(dump(export_dict))
else:
with open(filename, "w") as out:
dump(export_dict, out)
def _isempty(self, value):
if value is None:
return True
if hasattr(value, "__len__") and len(value) == 0:
return True
return False
def _export_hub(self, deep, versioned=False):
"""Export model in hub yaml format. versioned not currently supported"""
result = base.SESSION.get("/api/{}/{}/export?format=yaml".format(self.__api__, self.uuid))
export_dict = yaml.safe_load(result.content)
export_dict["_format"] = "hub"
augment = export_dict.get("augment", {})
if augment is None:
export_dict["augment"] = {}
return export_dict
@classmethod
def _import_hub(cls, data):
data = cls.parse_import_hub(data)
result = base.SESSION.post("/api/{}/import".format(cls.__api__), data=json.dumps(data, default=str))
return cls.from_data(result.json())
@classmethod
def parse_import_hub(cls, data):
if "_format" in data:
data.pop("_format")
return data
def _export(self, deep, filter_empty, versioned=False):
# try/except is needed to handle cases like Worksheet which will
# try and fail to load @cached fields when hasattr() is called.
try:
if not hasattr(self, "__exportable__"):
raise NotImplementedError()
except Exception:
raise NotImplementedError("{} is not exportable.".format(type(self)))
export_dict = {}
formatters = {}
if hasattr(self, "__export_format__"):
formatters.update(self.__export_format__)
formatter_prop = "__{}_format__".format("deep" if deep else "shallow")
if hasattr(self, formatter_prop):
formatters.update(getattr(self, formatter_prop))
# don't use global ALIAS for this as the impacts
# would be too far-reaching. But class-specific alias
# works as a fine-grained control mechanism on output
# keys.
rev_alias = {v: k for k, v in self.__aliases__.items()}
if versioned and self.__versioned_exportable__:
props_to_export = list(self.__exportable__) + list(self.__versioned_exportable__)
else:
props_to_export = list(self.__exportable__)
for prop in props_to_export:
if prop in formatters:
aspec = inspect.getfullargspec(formatters[prop])
kwargs = {}
if "deep" in aspec.args:
kwargs["deep"] = deep
if "filter_empty" in aspec.args:
kwargs["filter_empty"] = filter_empty
if "versioned" in aspec.args:
kwargs["versioned"] = versioned
value = formatters[prop](self, **kwargs)
elif hasattr(self, prop):
value = getattr(self, prop)
if isinstance(value, composite):
value = value.json()
if isinstance(value, (list, tuple)):
value = yaml_str_list(value)
else:
value = yaml_str(value)
else:
continue
if not self._isempty(value) or not filter_empty:
export_dict[rev_alias.get(prop, prop)] = value
return export_dict
def _data_by_uuid(self):
try:
return base.SESSION.get("/api/{}/{}".format(self.__api__, self.ident)).json()
except Exception:
return {}
def _data_by_name(self):
"""
Method for querying data by specified name.
.. note:: There are a lot of backend inconsistencies here that
require the logic below. Ideally, a query with name={}
should just resolve correctly without additional parsing.
"""
# format query
self._params.update({"name": self.ident})
query = ""
for key, val in self._params.items():
param = self._param_to_queryarg(val)
query += key + "=" + quote_plus(param) + "&"
# query
ret = base.SESSION.get("/api/{}?{}".format(self.__api__, query)).json()
# return nothing if no results
if len(ret) == 0:
return {}
# look for associated name in results
for dat in ret:
# Note: this shouldn't be necessary starting in 2.4.1, because the name
# filter reverted to exact-match behavior, but it's kept just in case.
if dat["name"] == self.ident:
return dat
return {}
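# For illustration, with self._params == {'name': 'My Object'} this
# method issues (path schematic):
#   GET /api/<__api__>?name=My+Object&
# and then exact-matches 'name' against the returned records.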
def _data_by_fixed_id(self):
"""
Method for querying data by fixed_id
"""
# in 2.4.1, fixed_id is captured as the barcode under the hood...
query = ""
for key, val in self._params.items():
param = self._param_to_queryarg(val)
query += key + "=" + quote_plus(param) + "&"
query += "barcode={}".format(quote_plus(self.ident))
ret = base.SESSION.get("/api/{}?{}".format(self.__api__, query)).json()
if len(ret) == 0:
return {}
return ret[0]
@classmethod
def _param_to_queryarg(cls, val):
if isinstance(val, six.string_types):
return val
elif isinstance(val, BaseModel):
return val.name
else:
raise AssertionError(
"No rule for using parameter {} as a query parameter for {}.".format(val, cls.__name__)
)
@property
def augment(self):
"""
Proxy for accessing Model.data.meta.augment.
"""
if "meta" not in self.data:
self.data["meta"] = {}
if "augment" not in self.data["meta"]:
self.data["meta"]["augment"] = {}
return self.data["meta"]["augment"]
@augment.setter
def augment(self, value):
"""
Proxy for setting Model.data.meta.augment.
"""
if "meta" not in self.data:
self.data["meta"] = {}
self.data["meta"]["augment"] = value
return
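# Illustrative use of the proxy (timestamp hypothetical):
#   obj.augment['imported_at'] = '2021-01-01T00:00:00.000000'
#   # equivalent to obj.data['meta']['augment']['imported_at'] = ...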
def _set_data_version(self):
"""
Ensures that the data loaded is correct given the requested version.
Should only be called after _data has been initially/tentatively resolved.
"""
if not self._version or not self.__version_api__:
return
version_is_uuid = is_uuid(self._version)
try:
pointer_uuid = self.uuid
# AssertionError will be raised if the ESP object is not found,
# in which case, just return.
except AssertionError:
return
if (
# Have to use get for the def_uuid until the signature flow API
# returns a structure more like other versioned objects.
(version_is_uuid and self._data.get("def_uuid") != self._version)
or self._data.meta.get("snapshot", {}).get("name") != self._version
):
# should be able to speed this up...
for version in self.versions:
if self._version == version.uuid or self._version == version.meta.get("snapshot", {}).get("name"):
self._data = version.data
break
else:
raise MissingVersionError(type(self), self.uuid, self._version)
# def_uuid will be the version uuid... we still need the wrapper object UUID to be the correct UUID.
# This extra conditional is to handle the fact that signature flows API doesn't work
# quite like other versioned objects. Can be removed once the API functions more similarly.
if "def_uuid" not in self._data:
self._data["def_uuid"] = self._data["uuid"]
self._data["uuid"] = pointer_uuid
if self._snapshot is None and "snapshot" in self._data.get("meta", {}):
self._snapshot = self._data.meta.snapshot
@property
def data(self):
"""
Property containing all information about an object. This is
the information typically returned from a GET request using
the object endpoint and uuid.
"""
if self._data is None:
data = {}
ident_uuid = is_uuid(self.ident)
# look in cache for existing object
if base.SESSION.cache:
if ident_uuid:
if self.ident in CACHE:
self._data = composite(CACHE[self.ident])
self._set_data_version()
return self._data
else:
# Careful: tread carefully here, because we could have a _definition_
# cached by name and class, or the _wrapper type_ cached by name and
# class, and those may differ. Note: this branch is a forward look at
# what could be; nothing is actually put into NAME_CACHE yet.
uuid = NAME_CACHE.get((self.__class__.__name__, self.ident), None)
if uuid and uuid in CACHE:
self._data = composite(CACHE[uuid])
self._set_data_version()
return self._data
# do query if cache doesn't exist
if len(data) == 0:
if ident_uuid:
data = self._data_by_uuid()
elif self.ident is not None:
# prefer data by name for 2.4.1 to aid in the transition.
data = self._data_by_name()
if not data:
self._params.pop("name", None)
data = self._data_by_fixed_id()
else:
data = {}
# set empty data structure for no results
if data is None or len(data) == 0:
data = {}
# cache data if specified
if base.SESSION.cache:
if "uuid" in data:
CACHE[data["uuid"]] = data
if "name" in data:
# Commented out for now because it triggers a lot of
# test errors to use the name cache, likely because of
# version mismatches. _maybe_ not a problem for a production
# ESP, but definitely a problem for the test suite.
# the rest of the NAME_CACHE code can continue on as-is
# since it's never actually triggered if the cache isn't built
# here anyway.
# NAME_CACHE[(self.__class__.__name__, data['name'])] = data['uuid']
pass
self._data = composite(data)
self._set_data_version()
return self._data
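# Cache interaction sketch (grounded in the property above and refresh()
# below):
#   first access  -> GET by uuid/name/fixed_id, result stored in CACHE
#   later access  -> served from CACHE[uuid] when base.SESSION.cache is set
#   obj.refresh() -> evicts CACHE/NAME_CACHE entries and re-fetches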
def refresh(self):
if "uuid" in self.data and self.uuid in CACHE:
del CACHE[self.uuid]
key = (self.__class__.__name__, self.name)
if key in NAME_CACHE:
del NAME_CACHE[key]
self._data = None
self.data
# there might not be a need for _data with cached.invalidate
cached.invalidate(self, "refresh")
return
def exists(self):
"""
Test if object exists in esp database.
TODO: THINK ABOUT HOW THIS SHOULD BE USED -- fill in
after usage documentation.
"""
try:
return self.data.uuid is not None
except AttributeError:
return False
def drop(self, deep=False):
"""
Issue DELETE request to remove object in ESP database.
"""
if self.exists():
if self.__droppable__:
logging.info(
"Dropping {}: {}".format(self.__class__.__name__, self.name if "name" in self.data else self.uuid)
)
result = base.SESSION.delete("/api/{}/{}".format(self.__api__, quote_plus(self.uuid)))
return result
else:
raise AssertionError(f"Dropping a {type(self).__name__} is not allowed!")
return None
def json(self):
"""
Return dictionary object with object metadata.
"""
return self.data.json()
def summary(self):
"""
Print organized description of entity.
"""
for key in sorted(self.data.keys()):
print("{}: {}".format(key, self.data[key]))
return
def push(self, dry=False):
"""
Run put with metadata to update entry.
"""
# find and format mutable data
data = self.json()
payload = {}
for key in self.__mutable__:
if key in self.__push_format__:
if callable(self.__push_format__[key]):
payload[key] = self.__push_format__[key](self)
else:
payload[key] = self.__push_format__[key]
elif key in data:
payload[key] = data[key]
# return 'dry' push with payload only
if dry:
return payload
# push data
result = base.SESSION.put("/api/{}/{}".format(self.__api__, self.uuid), json=payload)
self.data.update(result.json())
cached.invalidate(self, "refresh")
# update cache
if base.SESSION.cache:
CACHE[self.uuid] = dict(self.data)
return
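# Usage sketch (field name hypothetical; it must be listed in __mutable__):
#   obj.data['desc'] = 'updated description'
#   obj.push(dry=True)  # inspect the PUT payload without sending it
#   obj.push()          # PUT /api/<__api__>/<uuid> and refresh the cache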
@cached.tag("refresh")
def history(self):
actions = base.SESSION.get("/api/resources/{}/actions".format(self.uuid)).json()["actions"]
return sorted([ResourceAction(x) for x in actions], key=lambda x: x.timestamp)
@cached.tag("refresh")
def versions(self):
"""
Return full history of item in ESP database
as objects of this type. The order of the list is
most recent (index 0) to least recent (index -1).
"""
if self.__version_api__ is None:
raise AssertionError(
"Versioning not supported for model. "
"Please set __version_api__ property in model "
"to enable versioning."
)
ver = []
res = base.SESSION.get("/api/{}".format(self.__version_api__))
for item in res.json():
# name can change from one version to another;
# maybe we should just use barcode
if item.get("name") == self.name or item.get("barcode") == self.barcode:
ver.append(type(self).from_data(item))
return sorted(ver, key=lambda x: x.created_timestamp, reverse=True)
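# Usage sketch (assumes the model defines __version_api__):
#   history = obj.versions  # most recent first
#   history[0].created_timestamp >= history[-1].created_timestamp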
@cached
def created_timestamp(self):
"""
Return datetime object with creation timestamp.
"""
return utils.parse_esp_timestamp(self.created_at)
@cached
def updated_timestamp(self):
"""
Return datetime object with last update timestamp.
"""
return utils.parse_esp_timestamp(self.updated_at)
@cached
def owner(self):
"""
Relationship to owner (creator) of object.
"""
from .admin import User
ownerstr = self.data.get("owner")
if not ownerstr:
return None
match = re.match(r"(.*) \((.*@.*)\)", ownerstr)
if not match:
raise ValueError(
"Expected owner to be formatted `user name (handle@domain)` " "but was: {}".format(ownerstr)
)
return User(match.group(1))
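# The regex above expects the backend owner string to look like
# (name and address hypothetical):
#   'Jane Doe (jdoe@example.com)'  ->  User('Jane Doe')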
class LinkedModel(BaseModel):
"""
Subclass of BaseModel for decorating parent-child
relationship functionality. All items in the ESP database /can/
be linked, but there needs to be more formalism around what /should/
be linked in the database. Because of this, we should restrict
access to linking functionality to specific models until the
pattern for provenance is more fleshed out on the backend.
Args:
ident (str): Name or uuid to use for initializing object.
data (dict): Data to use for instantiating object. This only needs
to be specified if you already have all metadata related to an
object.
"""
# def push(self):
# # TODO: OVERWRITE PUSH METHOD TO ALSO UPDATE DEPENDENCIES/LINKS
# # this would create for a much better api for managing
# # parent/child relationships, but it needs a route on the
# # backend (.../dependencies/set):
# # x.children.append(Sample('x'))
# # x.children = x.children[:-1]
# # x.push()
# # ... instead of ...
# # x.add_children(Sample('x'))
# # x.remove_children(x.children[-1])
# return
def update_link(self, items, link, method="add", label=None):
"""
Helper method for managing redundancy across
prior/dependent link generation.
"""
if not isinstance(items, (list, tuple)):
items = [items]
def uuid(x):
return x if isinstance(x, str) else x["uuid"]
payload = {
"dependencies": {
link: [
# TODO: CLEAN UP DEPENDENCY DATA STRUCTURE
[uuid(x)] if label is None else [uuid(x), label]
for x in items
],
}
}
base.SESSION.post("/api/resources/{}/dependencies/{}".format(self.uuid, method), json=payload)
cached.invalidate(self, "refresh")
return
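# For example, update_link(items, link='dependents', method='add',
# label='begat') posts a payload shaped like (uuid hypothetical):
#   {'dependencies': {'dependents': [['<child-uuid>', 'begat']]}}
# to /api/resources/<self.uuid>/dependencies/add.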
def add_dependents(self, items, label=None):
"""
Add resource dependencies for item or list of items.
"""
return self.update_link(items, link="dependents", method="add", label=label)
def remove_dependents(self, items, label=None):
"""
Remove resource dependencies for item or list of items.
"""
return self.update_link(items, link="dependents", method="remove", label=label)
def add_priors(self, items, label=None):
"""
Add resource priors for item or list of items.
"""
return self.update_link(items, link="priors", method="add", label=label)
def remove_priors(self, items, label=None):
"""
Remove resource priors for item or list of items.
"""
return self.update_link(items, link="priors", method="remove", label=label)
def add_children(self, items):
"""
Add specified items as children of the object.
"""
return self.add_dependents(items, "begat")
def remove_children(self, items):
"""
Remove specified items as children of the object.
"""
return self.remove_dependents(items, "begat")
def add_parents(self, items):
"""
Add specified items as parents of the object.
"""
return self.add_priors(items, "begat")
def remove_parents(self, items):
"""
Remove specified items as parents of the object.
"""
return self.remove_priors(items, "begat")
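# Usage sketch for the begat helpers (objects hypothetical):
#   parent.add_children(child)    # tags the dependency with label 'begat'
#   parent.children               # -> [child, ...] (cached; see refresh())
#   parent.remove_children(child)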
@cached.tag("refresh")
def dependencies(self):
"""
Return dependency information for item.
"""
res = base.SESSION.get("/api/resources/{}/dependencies".format(self.uuid))
return composite(res.json()["dependencies"])
@property
def priors(self):
"""
Proxy for accessing prior item dependencies.
TODO: Re-map the dependencies to their related
class types in the client. This currently just
returns the JSON associated with the dependency data.
"""
return self.dependencies.priors if "priors" in self.dependencies else []
@property
def dependents(self):
"""
Proxy for accessing dependent item dependencies.
TODO: Re-map the dependencies to their related
class types in the client. This currently just
returns the JSON associated with the dependency data.
"""
return self.dependencies.dependents if "dependents" in self.dependencies else []
def get_priors(self, label):
"""
Method for getting priors tagged with
specific label.
Args:
label (str): Label to filter priors with.
"""
return [x[0] for x in self.priors if x[1] == label]
def get_dependents(self, label):
"""
Method for getting dependencies tagged with
specific label.
Args:
label (str): Label to filter dependencies with.
"""
return [x[0] for x in self.dependents if x[1] == label]
@cached.tag("refresh")
def parents(self):
"""
Return sample parent objects.
.. note:: Changes to these data do not push changes to the
backend. Use self.add_parents for that functionality.
"""
return [type(self)(x["uuid"]) for x in self.get_priors("begat")]
@cached.tag("refresh")
def children(self):
"""
Return sample children.
.. note:: Changes to these data do not push changes to the
backend. Use self.add_children for that functionality.
"""
return [type(self)(x["uuid"]) for x in self.get_dependents("begat")]
def generation(self, entity_type, which="closestup"):
"""
Efficiently return generation of a particular type.
Args:
entity_type (str): The type of entity to search for in the hierarchy.
which (str): One of "closestup", "furthestup", "allup" for the nearest, furthest, or all ancestors of the
given type, respectively, or "closestdown", "furthestdown", "alldown" for the nearest, furthest, or all
descendants of the given type, respectively.
Returns:
List[Sample] - List of samples of the appropriate type. May be empty. Never null.
"""
valid_which = ("closestup", "furthestup", "allup", "closestdown", "furthestdown", "alldown")
if which not in valid_which:
raise ValueError("Invalid `which`. Must be one of `{}".format(valid_which))
results = utils.eval_expression(
"sample_generation('{}:{}', sample_uuid='{}')".format(which, entity_type, self.uuid)
)
return [type(self).from_data(x) for x in results]
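# Usage sketch (entity type names hypothetical):
#   sample.generation('Library', which='closestup')  # nearest ancestors
#   sample.generation('Aliquot', which='alldown')    # all descendants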
class ResourceAction(object):
def __init__(self, data):
self.data = composite(data)
@cached
def agent(self):
from .admin import User
return User(self.data.agent)
@property
def message(self):
return self.data.desc
desc = message
@cached
def timestamp(self):
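# note: datetime.fromisoformat cannot parse the trailing 'Z', so the
# timestamp is sliced before parsing.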
return datetime.datetime.fromisoformat(self.data.timestamp[:-1])
@property
def resource_uuid(self):
return self.data.resource