Source code for esp.models.__base__

# -*- coding: utf-8 -*-
#
# Session management.
#
# ------------------------------------------------


# imports
# -------
import datetime
import difflib
from functools import wraps
import inspect
import json
import logging
import os
import re
import sys
from uuid import UUID
from xml.etree import ElementTree

import esp
from ..utils import normalize_resource_link_type, restrict_to_type_mapper

try:
    from urllib.parse import quote_plus, urlencode
except ImportError:
    from urllib import quote_plus, urlencode

import dateparser
from gems import composite, cached
from jinja2 import Environment, FileSystemLoader, select_autoescape
from jinja2.utils import markupsafe
from markupsafe import Markup
from jinja2.exceptions import TemplateNotFound
import six
import yaml
import pkg_resources

from .. import base
from .. import utils
from .exception import MissingVersionError


# config
# ------
ALIASES = {
    "columns": "variables",
    "default": "default_val",
    "deps": "dependencies",
    "description": "desc",
    "dropdown_expression": "dropdown_expr",
    "visible": "in_sample_sheet",
    "files": "task_files",
    "protocol": "protocol_type",
    "relation": "parent_child_relation",
    "rule": "var_type",
    "custom_field_type": "var_type",
    "rules": "var_type",
    "sample": "sample_type",
    "sequence": "lab7_id_sequence",
    "value": "default_val",
    "default_value": "default_val",
    "groups": "sample_group_indices",
    "icon": "icon_svg",
    "custom_fields": "variables",
    # In 3.1.1, it was report_key, report_display,
    # but in 3.2, we've changed over to reportable_key
    # and reportable_display for clarity that the key
    # is for the field, not for a specific report.
    "report_key": "reportable_key",
    "report_display": "reportable_display",
}
CACHE = {}
NAME_CACHE = {}
UUID_REMAP_CACHE = {}
TEMPLATE_PATH = pkg_resources.resource_filename("esp", "templates")
TEMPLATES = Environment(loader=FileSystemLoader(TEMPLATE_PATH), autoescape=select_autoescape(["html", "xml"]))
DYNAMIC_EXECUTION_PLAN = "Dynamic Execution Plan"
WORKFLOW_CHAIN_VERSION = "Workflow Chain Version"
SYSTEM_SETTING = "System Setting"

dep_types_mapper = {"samplePlan": "Sample Plan", "specificationPlan": "Specification Plan"}


# decorators
# ----------
def list_dispatch(func):
    @wraps(func)
    def _(cls, data, **kwargs):
        if isinstance(data, (list, tuple)):
            return [func(cls, d, **kwargs) for d in data]
        else:
            return func(cls, data, **kwargs)

    return _


# utils
# -----
def raw(x):
    if isinstance(x, composite):
        return x.json()
    return x


def is_uuid(name):
    try:
        UUID(name)
        return True
    except Exception:
        return False


def load_config(config, aliases=None, class_=None):
    if aliases is None:
        aliases = ALIASES
    # load yaml if config is string
    if isinstance(config, six.string_types) and os.path.exists(config):
        config = os.path.abspath(config)
        with open(config, "r") as fi:
            data = fi.read()
            data = data.replace("${CWD}", os.path.dirname(config))
            data = yaml.load(data, Loader=yaml.SafeLoader)
    else:
        data = config

    # format result and return
    if class_ is None:
        path = ""
    elif issubclass(class_, BaseModel):
        path = class_.__name__
    return format_entry(data, aliases, path=path, cls=class_)


def _format_excluded(cls, path, key, data):
    if key in ["links", "entities"]:
        return True
    if hasattr(cls, "__pristine__") and (key in cls.__pristine__ or path in cls.__pristine__):
        return True
    return False


def format_entry(data, aliases=None, path=None, cls=None):
    if aliases is None:
        aliases = ALIASES
    if path is None:
        path = "unknown"
    if cls is None:
        cls = BaseModel
    # re-format and re-label dictionary
    if isinstance(data, dict):
        # rearrange dict to include name field
        # possibilities:
        # name is in data, but fixed_id isn't. Then outer key is fixed_id.
        # name and fixed_id aren't in data. Then outer key == name == fixed_id
        # fixed_id is in data but name isn't. Then outer key == name.
        if len(data.keys()) == 1 and ("name" not in data or "fixed_id" not in data):
            ident = list(data.keys())[0]
            if (
                isinstance(data[ident], dict)
                and ident
                not in ["steps", "variables", "meta", "sequence", "before", "after", "verify", "pipeline_status"]
                and not _format_excluded(cls, path, ident, data)
            ):
                data = data[ident]
                if "name" not in data.keys():
                    data["name"] = ident
                # be more conservative about supplying fixed_id - only do it for models that explicitly
                # support it as a mutable parameter. And also only if it's different from name.
                name = data["name"]
                if (
                    hasattr(cls, "__mutable__")
                    and "fixed_id" in cls.__mutable__
                    and "fixed_id" not in data.keys()
                    and ident != name
                ):
                    data["fixed_id"] = ident

        # re-label keys for backend
        for key in list(data.keys()):
            if key in aliases:
                data[aliases[key]] = data.pop(key)

        # dropdown expressions
        if "dropdown" in data and isinstance(data["dropdown"], str):
            data["dropdown_expr"] = data.pop("dropdown")

        # go deeper down rabbit hole
        for key in list(data.keys()):
            if not _format_excluded(cls, path, key, data):
                data[key] = format_entry(data[key], aliases, path + "." + str(key), cls)

    # iterate through list and recursively call
    elif isinstance(data, list):
        data = [
            x if _format_excluded(cls, path, str(x), data) else format_entry(x, aliases, path + "." + str(i), cls)
            for i, x in enumerate(data)
        ]

    return data


def create_and_return(model, data, overwrite=False, allow_snapshot_uuid_remap=False):
    """
    Create set of models and return objects. This is used throughout
    several model creation processes, especially where models can
    create related sub-models (i.e. Workflow > Protocol)
    """
    single = not isinstance(data, (list, tuple))
    if single:
        data = [data]
    ret = []
    for item in data:
        if isinstance(item, six.string_types):
            obj = model(item)
            if not obj.exists():
                raise AssertionError("Cannot create object with non-existent {} {}".format(model.__name__, item))
            ret.append(obj)
        elif not isinstance(item, BaseModel):
            ret.append(model.create(item, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap))
        else:
            ret.append(item)
    return ret[0] if single else ret


def create_and_return_uuid(model, data, overwrite=False, allow_snapshot_uuid_remap=False):
    """
    Create set of models and return UUIDs. This is used throughout
    several model creation processes, especially where models can
    create related sub-models (i.e. Workflow > Protocol)
    """
    single = not isinstance(data, (list, tuple))
    if single:
        data = [data]
    ret = []
    for item in data:
        if is_uuid(item):
            ret.append(item)
        else:
            res = create_and_return(
                model, item, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap
            )
            if isinstance(res, (list, tuple)):
                ret.extend([x.uuid for x in res])
            else:
                ret.append(res.uuid)
    single = len(ret) == 1
    return ret[0] if single else ret


def cached_uuid(model, ident):
    for uuid in CACHE:
        if model.__api_cls__ == CACHE[uuid].get("cls") and CACHE[uuid].get("name") == ident:
            return uuid
    raise ValueError("Cannot find {} '{}'.".format(model.__name__, ident))


def yaml_str(item):
    # return the best representation of the item such that:
    # 1. The item is represented by a !!str node in both python 2 and python 3
    # 2. Avoid encoding errors/surprises.
    import six

    if six.PY2:
        try:
            return str(item)
        except:
            return item
    return item


def yaml_str_list(items):
    # return a list of strings.
    return [yaml_str(x) for x in items]


def espfile_or_value(value, ashref=False, resolve_existing=False):
    """
    Expand vars in value. If value is a path to a locale file,
    create an ESP File object from the file and return the File object.

    Args:
        value (string): A string that might point to a local file
        ashref (bool): If true, only a url suitable for an image src or file href
            is returned. Otherwise, the file object itself is returned. If value is
            not a file, then value is still returned.
        resolve_existing(bool): If true and the value does not resolve to an
            existing OS path, attempt to try the value as a file name or uuid
            and return it if it exists (still honoring ashref)
    """
    from .file import File

    value = os.path.expandvars(value)
    if os.path.exists(value):
        value = File.create({"name": os.path.basename(value), "uri": value, "upload": True})
        if ashref:
            return value.linkable_uri
    elif resolve_existing:
        existing = File(value)
        if existing.exists():
            if ashref:
                return existing.linkable_uri
            return existing
    return value


def compile_report(html, compiled=False):
    """
    Compile report using fragments available in the client
    templates directory. This is similar to composing html
    using components in modern front-end frameworks.

    This function also supports reading external paths into
    reports.

    Examples:

        Creating a report with a variable form panel and
        a sample type overview:

        .. code-block:: yaml

            Extract:
                view_template: |+

                    <panel header="Set Variables">
                      <variable-form></variable-form>
                    </panel>
                    <panel header="Linked Data">
                        <dependency-table type="Library"/>
                    </panel>

    """
    global TEMPLATES

    # handle path data
    path = os.path.expandvars(html)
    if os.path.exists(path):
        with open(path, "r") as fi:
            html = fi.read()

    # return non-compiled html
    if not compiled:
        return html

    # try to load xml content if it is xml
    try:
        tree = ElementTree.fromstring("<div>" + html + "</div>")
    except Exception as exe:
        logging.warning("Error in rendering html - " + str(exe))
        return html

    # parse and render elements
    def _(tree):
        result = ""
        for child in tree:
            # recurse if the element has children to
            # retrieve inner html
            text = child.text if child.text is not None else ""
            if child:
                body = text + _(child)
            else:
                body = text

            # render the template
            try:
                template = TEMPLATES.get_template(child.tag + ".html")
                data = child.attrib.copy()
                data["body"] = Markup(body)
                result += "\n" + template.render(**data)

            # otherwise use xml as template
            except TemplateNotFound as exe:
                attrib = ['{}="{}"'.format(key, value) for key, value in child.attrib.items()]
                attrib = " ".join(attrib)
                result += "<{} {}>\n{}\n</{}>".format(child.tag, attrib, body, child.tag)

            if child.tail is not None:
                result += child.tail

        return result

    result = _(tree)
    return result


def _export_approval(data):
    from .admin import Workgroup

    ret = {}
    wg = data.get("meta", {}).get("workgroup")
    if not wg:
        return ret
    wg = Workgroup(wg)
    ret["workgroup"] = wg.name
    labels = data["meta"].get("approval_labels", {})
    if "button" in labels and labels["button"] != "Approve":
        ret["button"] = labels["button"]
    if "action_text" in labels and labels["action_text"] != "Approved":
        ret["action_text"] = labels["action_text"]
    return ret


def _import_approval(data):
    data.setdefault("meta", {})
    if "workgroup" in data:
        data["meta"]["workgroup"] = data.pop("workgroup")
    if "workgroup" in data["meta"]:
        from esp.models import Workgroup

        wg = Workgroup(data["meta"]["workgroup"])
        data["meta"]["workgroup"] = wg.uuid
    else:
        data["meta"]["workgroup"] = None

    default_labels = {"button": "Approve", "action_text": "Approved"}
    button = data.pop("button", data["meta"].get("approval_labels", default_labels).get("button", "Approve"))
    action_text = data.pop(
        "action_text", data["meta"].get("approval_labels", default_labels).get("action_text", "Approved")
    )
    data["meta"]["approval_labels"] = {"button": button, "action_text": action_text}


def _export_signatureflow(data):
    ret = {}
    signature_def_uuid = data["meta"].get("signature_flow_uuid", None)
    if signature_def_uuid:
        from .signatureflow import SignatureFlow

        signature_flow = SignatureFlow(signature_def_uuid)
        ret["signature_flow"] = yaml_str(signature_flow.name)
        ret["signature_flow_uuid"] = yaml_str(signature_flow.uuid)

    autocomplete_protocol_when = data["meta"].get("signature_flow_complete_row_when", None)
    if autocomplete_protocol_when:
        ret["autocomplete_protocol_when"] = autocomplete_protocol_when
    return ret


def _import_signatureflow(data):
    data.setdefault("meta", {})
    signatureflow_name = data.pop("signature_flow", "")
    # Should we be doing something with signature_flow_uuid?
    data.pop("signature_flow_uuid", "")
    if signatureflow_name:
        from esp.models import SignatureFlow

        sf = SignatureFlow(signatureflow_name)
        data["meta"]["signature_flow_uuid"] = sf.uuid
    else:
        data["meta"]["signature_flow_uuid"] = None

    autocomplete_protocol_when = data.pop("autocomplete_protocol_when", None)
    if autocomplete_protocol_when:
        data["meta"]["signature_flow_complete_row_when"] = autocomplete_protocol_when


def _export_sample_point(data):
    from esp.models import EntityType

    ret = {}
    sample_types = data["meta"].get("sample_types", None)
    for sample_type in sample_types:
        sample_type_obj = EntityType(sample_type.pop("uuid"))
        sample_type["name"] = sample_type_obj.name
        if sample_type_obj.fixed_id:
            sample_type["fixed_id"] = sample_type_obj.fixed_id
    ret["sample_types"] = sample_types
    ret["label"] = data.get("meta", {}).get("button_label", "Add")
    return ret


def _import_sample_point(data):
    from esp.models import SampleType

    data.setdefault("meta", {})
    sample_types = data.pop("sample_types", [])
    for sample_type in sample_types:
        name_or_fixed_id = sample_type.get("fixed_id", sample_type.get("name"))
        # remove not needed data
        sample_type.pop("fixed_id", "")
        sample_type.pop("name", "")

        sample_type_obj = SampleType(name_or_fixed_id)
        sample_type["uuid"] = sample_type_obj.uuid
    label = data.pop("button_label", data.pop("label", "Add"))
    data["meta"]["sample_types"] = sample_types
    data["meta"]["button_label"] = label


def _export_pipelinebutton(data):
    ret = {}
    # TODO: Simplify this code block during 3.2 development.
    # There was a period of time during 3.1 when the way pipeline button fields
    # stored their metadata was different between protocols and workflow chains,
    # so this code exists to smooth over the difference and can be simplified
    # to the unified format when we start 3.2 development.
    labels = data["meta"].get(
        "button_labels",
        {
            "button": data["meta"].pop("button_label", "Start"),
            "action_text": data["meta"].pop("button_action_label", "Done"),
            "failed_text": data["meta"].pop("button_failed_action_label", "Failed"),
        },
    )
    pipeline_uuid = data["meta"].get("pipeline", None)
    pipeline_variables = data["meta"].get("pipelineVariables", data["meta"].get("pipeline_variables"))
    ret["pipeline_variables"] = pipeline_variables
    if pipeline_uuid:
        from .analysis import Pipeline

        ret["pipeline"] = yaml_str(Pipeline(pipeline_uuid).name)

    if "button" in labels:
        ret["button"] = labels["button"]
    if "action_text" in labels:
        ret["action_text"] = labels["action_text"]
    if "failed_text" in labels:
        ret["failed_text"] = labels["failed_text"]
    return ret


def _import_pipelinebutton(data):
    data.setdefault("meta", {})
    if "pipeline" in data:
        data["meta"]["pipeline"] = data.pop("pipeline")
    if "pipeline_variables" in data:
        data["meta"]["pipeline_variables"] = data.pop("pipeline_variables")

    if "pipeline" in data["meta"]:
        from esp.models import Pipeline

        pi = Pipeline(data["meta"]["pipeline"])
        data["meta"]["pipeline"] = pi.uuid
    else:
        data["meta"]["pipeline"] = None

    default_labels = {"button": "Start", "action_text": "Done", "failed_text": "Failed"}
    labels = data["meta"].get("button_labels", default_labels)
    button = data.pop("button", labels.get("button", "Start"))
    action_text = data.pop("action_text", labels.get("action_text", "Done"))
    failed_text = data.pop("failed_text", labels.get("failed_text", "Failed"))
    data["meta"]["button_labels"] = {"button": button, "action_text": action_text, "failed_text": failed_text}


def _export_attachment(data):
    ret = {}
    meta = data.get("meta", {})
    extensions = meta.get("allowed_extensions", None)
    allow_multiple = meta.get("allow_multiple_files", False)
    if extensions:
        ret["allowed_extensions"] = extensions
    if allow_multiple:
        ret["multiple"] = True
    return ret


def _import_attachment(data):
    meta = data.setdefault("meta", {})
    meta["allowed_extensions"] = data.pop("allowed_extensions", [])
    meta["allow_multiple_files"] = data.pop("multiple", False)


def _export_barcode(data):
    ret = {}
    ret["barcode_type"] = data.get("meta", {}).get("barcode_type", "QR")
    resource_barcode = data.get("meta", {}).get("resource_barcode", False)
    if resource_barcode:
        ret["resource_barcode"] = True
    return ret


def _import_barcode(data):
    data.setdefault("meta", {})
    if "barcode_type" in data:
        data["meta"]["barcode_type"] = data.pop("barcode_type")
    elif "barcode_type" not in data["meta"]:
        raise ValueError(
            (
                "Column `{}` is a barcode column but is missing the required `barcode_type` field (definition: {}`"
            ).format(data.get("name"), data)
        )
    # TODO: Move to a constants definition.
    # list of valid types from lab7/resource/__init__.py::BarcodeTypes.
    valid_barcode_types = ["QR", "1D", "mini data matrix", "UUID", "text"]
    if data["meta"]["barcode_type"] not in valid_barcode_types:
        raise ValueError(
            "Invalid barcode type `{}`. Valid types: {}".format(data["meta"]["barcode_type"], valid_barcode_types)
        )
    if data.get("resource_barcode"):
        data["meta"]["resource_barcode"] = data.pop("resource_barcode")


def _import_checkbox(data):
    if str(data.get("name")).lower() == "complete":
        logging.info('Re-classifying type of column "Complete" from "checkbox" to "complete"')
        data["var_type"] = "complete"
        if data.get("default_val") is None:
            data["default_val"] = "false"


def _import_complete(data):
    if data.get("default_val") is None:
        data["default_val"] = "false"


def _export_date(data):
    ret = {}
    if "format" in data["meta"] and data["meta"]["format"]:
        ret["format"] = data["meta"]["format"]
    return ret


# Use the same exporter for time and datetime types
_export_time = _export_date
_export_datetime = _export_date


def _date_or_time_importer(default_format):
    def importer(data):
        if "format" in data:
            format_ = data.pop("format")
            user_supplied_format = True
        else:
            format_ = default_format  # default
            user_supplied_format = False
        meta = data.setdefault("meta", {})
        if "format" not in meta or (user_supplied_format and format_ != meta["format"]):
            meta["format"] = format_
        # '{{' check: don't muck with expression-based default values.
        if "default_val" in data and data["default_val"] is not None and "{{" not in data["default_val"]:
            val = data["default_val"]
            if not isinstance(val, (datetime.datetime, datetime.date, datetime.time)):
                logging.info("Ensuring date string `{}` conforms to backend expectations".format(val))
                val = dateparser.parse(val)
            if isinstance(val, datetime.time):
                data["default_val"] = val.isoformat(timespec="auto")
            elif isinstance(val, (datetime.datetime, datetime.date)):
                data["default_val"] = val.isoformat()

    return importer


# Use _date_or_time_importer for date, time and datetime import with default format if any other was not specified
_import_date = _date_or_time_importer("YYYY/MM/DD")
_import_datetime = _date_or_time_importer("YYYY/MM/DDTHH:mm Z")
_import_time = _date_or_time_importer("HH:mm a Z")


def _export_dropdown(data):
    if data["dropdown_expr"]:
        return {"dropdown": yaml_str(data["dropdown_expr"])}
    return {"dropdown": yaml_str_list(data["dropdown"])}


def _export_itemqtyadj(data):
    ret = {}
    item_qty_aliases = {
        "item_status": "include_statuses",
        "minquant": "minimum_quantity",
    }
    itype = data["meta"].get("item_type")
    if itype:
        from .inventory import ItemType

        if hasattr(itype, "name"):
            ret["item"] = yaml_str(itype.name)
        else:
            ret["item"] = yaml_str(ItemType(itype).name)
    for key in [
        "item_display",
        "include_expired",
        "include_statuses",
        "item_tags",
        "render_code",
        "default_quantity",
        "minimum_quantity",
        "default_item",
        "item_status",
        "minquant",
    ]:
        if key not in data and key not in data["meta"]:
            continue
        ret[item_qty_aliases.get(key, key)] = data.get(key, data["meta"].get(key))

    dropdown_expr = data.get("dropdown_expr", None)
    if dropdown_expr:
        ret["only_use_items_within_containers_expression"] = dropdown_expr
    if ret.get("item_display") is None:
        ret["item_display"] = "name"
    return ret


def _import_itemqtyadj(data):
    meta = data.setdefault("meta", {})
    if "item_type" in data:
        meta["item_type"] = data.pop("item_type")
    # B/c it looks like client-based "export" uses item, not
    # item_type, so be sure to handle importing exports.
    elif "item" in data:
        meta["item_type"] = data.pop("item")
    if "item_type" not in data["meta"]:
        raise ValueError("itemqtyadj column MUST define item_type!")
    from .inventory import ItemType

    item_type = ItemType(data["meta"]["item_type"])
    if not item_type.exists():
        raise ValueError("Inventory ItemType {} does not exist!".format(item_type.ident))
    meta["item_type"] = item_type.uuid

    item_qty_aliases = {
        "item_status": "include_statuses",
        "minquant": "minimum_quantity",
    }

    for key in [
        "item_display",
        "include_expired",
        "include_statuses",
        "item_tags",
        "render_code",
        "default_quantity",
        "minimum_quantity",
        "default_item",
        "item_status",
        "minquant",
    ]:
        if key in data:
            if key not in data:
                continue
            meta[item_qty_aliases.get(key, key)] = data.pop(key)

    if "include_statuses" in meta and not isinstance(meta["include_statuses"], list):
        meta["include_statuses"] = list(meta["include_statuses"])

    if meta.get("item_display") is None:
        meta["item_display"] = "name"

    if "only_use_items_within_containers_expression" in data:
        dropdown_expr = data.pop("only_use_items_within_containers_expression")
        data["dropdown_expr"] = dropdown_expr

    display = meta.setdefault("item_display", "name")
    if display not in ["custom", "name", "serial"]:
        raise ValueError("item_display must be one of custom, name, or serial")
    if display == "custom" and not meta.get("render_code"):
        raise ValueError("render_code must be specified if item_display is custom")


def _export_itemadj(data):
    return _export_itemqtyadj(data)


def _import_itemadj(data):
    _import_itemqtyadj(data)


def _export_location(data):
    from .container import ContainerType

    if "container_type" not in data["meta"]:
        return {}
    ctype = data["meta"]["container_type"]
    ret = {}
    if isinstance(ctype, ContainerType):
        ret["container"] = yaml_str(data["meta"]["container_type"].name)
    # temporary workaround for glitch in saving location type in GUI.
    # TODO: Is this still necessary? I believe the GUI has been fixed...
    elif ctype is None:
        ret["container"] = None
    else:
        ret["container"] = yaml_str(ContainerType(ctype).name)
    if "location_fields" in data["meta"] and data["meta"]["location_fields"]:
        ret["fields"] = yaml_str_list(data["meta"]["location_fields"])
    return ret


# TODO: 3.1+ refactor - there's a weird asymetry in the import vs. export.
# export takes data from the server and returns a dict that is used to update
# the exported object. Import takes data from a json/yaml construct and modifies
# it in-place to produce the desired output structure. The behavior should be symmetric
# Either modify in-place universally or return a dictionary universally.
def _import_location(data):
    from .container import ContainerType

    vmeta = data.setdefault("meta", {})
    if "container" not in data and "container_type" not in vmeta:
        raise ValueError("Location variables must define container")
    if "container" in data:
        vmeta["container_type"] = ContainerType(data.pop("container")).uuid
    if "fields" in data:
        vmeta["location_fields"] = data.pop("fields")


_export_multiselect = _export_dropdown

# Should redefine these in terms of enums
SIG_FIG_PRECISION_TYPE = "significant figures"
DECIMAL_PLACE_PRECISION_TYPE = "decimal places"
NUMERIC_PRECISION_TYPES = [None, SIG_FIG_PRECISION_TYPE, DECIMAL_PLACE_PRECISION_TYPE]

PERCENTAGE_FORMAT = "percentage"
CURRENCY_FORMAT = "currency"
NUMERIC_FORMAT_TYPES = [None, PERCENTAGE_FORMAT, CURRENCY_FORMAT]


def _api_to_client(value, ary):
    if value < 0 or value >= len(ary):
        value = 0
    return ary[value]


def _client_to_api(value, ary, prop):
    if value is None:
        return 0
    if str(value).lower() in ary:
        return ary.index(str(value).lower())
    raise ValueError(f"Unknown value {value} for property {prop}. Known values: {ary}")


def _export_numeric(data):
    ret = {}
    # forward compatibility.
    if "units" in data and data["units"]:
        ret["units"] = data["units"]
    meta = data.get("meta", {})
    max_value = meta.get("maximum_value")
    min_value = meta.get("minimum_value")
    hard_max = meta.get("hard_maximum", False)
    hard_min = meta.get("hard_minimum", False)
    workgroups = meta.get("notify_workgroups", [])
    prec_type = _api_to_client(meta.get("number_precision_type", 0), NUMERIC_PRECISION_TYPES)
    format_type = _api_to_client(meta.get("number_formatting_type", 0), NUMERIC_FORMAT_TYPES)
    prec_count = meta.get("number_precision_count", 0)
    format_options = meta.get("number_formatting_options", {})
    spec_type = meta.get("field_spec_type", None)
    unit_of_measure = meta.get("unit_of_measure", None)

    if max_value is not None:
        ret["max"] = f'{max_value}{" hard" if hard_max else ""}'
    if min_value is not None:
        ret["min"] = f'{min_value}{" hard" if hard_min else ""}'
    if workgroups:
        ret["notify"] = [x["name"] for x in workgroups]
    if prec_type:
        if prec_type == SIG_FIG_PRECISION_TYPE:
            ret["sigfigs"] = prec_count
        else:
            ret["decimals"] = prec_count
    if format_type:
        ret["format"] = format_type
        if format_type == CURRENCY_FORMAT:
            ret.update(format_options)
    if spec_type:
        ret["spec_type"] = spec_type
    if unit_of_measure:
        ret["unit_of_measure"] = unit_of_measure
    return ret


def _parse_limit(limit):
    parts = limit.split(" ")
    if len(parts) == 1:
        return parts[0], "soft"
    if len(parts) == 2 and parts[1] in ("soft", "hard"):
        return parts
    raise ValueError(
        f"Unable to parse limit: {limit}: expected a string formatted "
        '":limit" or ":limit :type" where type is one of "soft" or "hard"'
    )


def _import_numeric(data):
    meta = data.get("meta", {})
    if "sigfigs" in data:
        prec_count = data.pop("sigfigs")
        prec_type = _client_to_api(SIG_FIG_PRECISION_TYPE, NUMERIC_PRECISION_TYPES, "precision")
    elif "decimals" in data:
        prec_count = data.pop("decimals")
        prec_type = _client_to_api(DECIMAL_PLACE_PRECISION_TYPE, NUMERIC_PRECISION_TYPES, "precision")
    else:
        prec_count = None
        prec_type = 0
    meta["number_precision_type"] = prec_type
    if prec_type:
        meta["number_precision_count"] = prec_count
    if "min" in data:
        val, type_ = _parse_limit(data.pop("min"))
        meta["minimum_value"] = val
        meta["hard_minimum"] = type_ == "hard"
    else:
        meta["hard_minimum"] = False
    if "max" in data:
        val, type_ = _parse_limit(data.pop("max"))
        meta["maximum_value"] = val
        meta["hard_maximum"] = type_ == "hard"
    else:
        meta["hard_maximum"] = False
    meta["number_formatting_type"] = _client_to_api(data.pop("format", None), NUMERIC_FORMAT_TYPES, "format")
    if "currencyCode" in data:
        meta["number_formatting_options"] = {"currencyCode": data.pop("currencyCode")}
    else:
        meta["number_formatting_options"] = {}
    if "spec_type" in data:
        meta["field_spec_type"] = data.pop("spec_type")
    if "unit_of_measure" in data:
        meta["unit_of_measure"] = data.pop("unit_of_measure")
    notify = data.pop("notify", [])
    if notify:
        from esp.models import Workgroup

        notify = [{"name": x} for x in notify]
    meta["notify_workgroups"] = notify
    data["meta"] = meta


# rewrite string -> text.
def _import_string(data):
    data["var_type"] = "text"
    return data


def _export_resource_link(data):
    link_type = data.get("resource_link_type", "Sample")
    normalized_link_type = normalize_resource_link_type(link_type)
    if link_type not in [DYNAMIC_EXECUTION_PLAN, WORKFLOW_CHAIN_VERSION] and not hasattr(
        esp.models, normalized_link_type
    ):
        # link_type can be a pre-supplied/Out-of-box Entity Class.
        # Customers and implementations may create other entity classes like MESProduct
        normalized_link_type = "Sample"

    restrict_to = []
    restrict_to_wfc = []
    meta = data.get("meta", {})
    # resource link container meta properties
    valid_container_statuses = meta.get("status", [])
    after_use_container_status = meta.get("afterUseContainerStatus", None)

    meta_restrict_to = meta.get("restrictTo", [])
    meta_restrict_to_wfc = meta.get("restrictToWfc", [])
    if link_type == DYNAMIC_EXECUTION_PLAN:
        restrict_to = [dep_types_mapper.get(dep_type) for dep_type in meta_restrict_to]
        restrict_to_wfc = _get_restrict_to_list_for_export(normalized_link_type, meta_restrict_to_wfc)
    else:
        restrict_to = _get_restrict_to_list_for_export(normalized_link_type, meta_restrict_to)
    if link_type == WORKFLOW_CHAIN_VERSION:
        allow_unpinned_versions = meta.get("allowUnpinnedVersions", False)
        allow_unpinned_versions_from_setting = meta.get("allowUnpinnedVersionsFromSystemSetting", True)
        if not allow_unpinned_versions and not allow_unpinned_versions_from_setting:
            allow_unpinned_versions_button = "No"
        if allow_unpinned_versions and not allow_unpinned_versions_from_setting:
            allow_unpinned_versions_button = "Yes"
        if not allow_unpinned_versions and allow_unpinned_versions_from_setting:
            allow_unpinned_versions_button = SYSTEM_SETTING

    result = {}
    if link_type == "Container" and len(valid_container_statuses):
        result["valid_container_statuses"] = valid_container_statuses
    if after_use_container_status:
        result["after_use_container_status"] = after_use_container_status
    if link_type == WORKFLOW_CHAIN_VERSION:
        result["allow_unpinned_versions_button"] = allow_unpinned_versions_button
    if len(restrict_to):
        result["restrict_to"] = restrict_to
    if len(restrict_to_wfc):
        result["restrict_to_wfc"] = restrict_to_wfc
    if link_type and link_type != "Sample":
        result["resource_link_type"] = link_type
    return result


def _get_restrict_to_list_for_export(normalized_link_type, restrict_to_uuids):
    restrict_to_type = restrict_to_type_mapper(normalized_link_type)
    restrict_to = []
    for uuid in restrict_to_uuids:
        if not hasattr(esp.models, restrict_to_type):
            raise ValueError("Restrict to type `{}` is not a valid esp model.".format(restrict_to_type))
        obj = getattr(esp.models, restrict_to_type)(uuid)
        restrict_to_obj = {"name": obj.name}
        if obj.fixed_id is not None:
            restrict_to_obj["fixed_id"] = obj.fixed_id
        restrict_to.append(restrict_to_obj)
    return restrict_to


def _import_resource_link(data):
    link_type = data.get("resource_link_type", "Sample")
    restrict_to = data.pop("restrict_to", [])
    restrict_to_uuids = []
    restrict_to_wfc_uuids = []
    if link_type == DYNAMIC_EXECUTION_PLAN:
        restrict_to_wfc = data.pop("restrict_to_wfc", [])
        restrict_to_wfc_uuids = _get_restrict_to_list_for_import(restrict_to_wfc, link_type)
        inverse_dep_types_mapper = {v: k for k, v in dep_types_mapper.items()}
        restrict_to_uuids = [inverse_dep_types_mapper.get(dep_type) for dep_type in restrict_to]
    else:
        restrict_to_uuids = _get_restrict_to_list_for_import(restrict_to, link_type)
    if link_type == WORKFLOW_CHAIN_VERSION:
        allow_unpinned_versions_button = data.pop("allow_unpinned_versions", SYSTEM_SETTING)
        if allow_unpinned_versions_button in ["No", False]:
            allow_unpinned_versions = False
            allow_unpinned_versions_from_setting = False
        if allow_unpinned_versions_button in ["Yes", True]:
            allow_unpinned_versions = True
            allow_unpinned_versions_from_setting = False
        if allow_unpinned_versions_button in [SYSTEM_SETTING]:
            allow_unpinned_versions = False
            allow_unpinned_versions_from_setting = True

    meta = data.get("meta", {})
    if link_type == WORKFLOW_CHAIN_VERSION:
        meta["allowUnpinnedVersions"] = allow_unpinned_versions
        meta["allowUnpinnedVersionsFromSystemSetting"] = allow_unpinned_versions_from_setting
    if len(restrict_to_uuids) > 0:
        meta["restrictTo"] = restrict_to_uuids
    if len(restrict_to_wfc_uuids) > 0:
        meta["restrictToWfc"] = restrict_to_wfc_uuids

    # set resource link container meta properties
    if link_type == "Container":
        if "valid_container_statuses" in data:
            meta["status"] = data.pop("valid_container_statuses")
        if "after_use_container_status" in data:
            meta["afterUseContainerStatus"] = data.pop("after_use_container_status")

    data["meta"] = meta
    if not data.get("resource_link_type"):
        data["resource_link_type"] = "Sample"


def _get_restrict_to_list_for_import(restrict_to, link_type):
    restrict_to_uuids = []
    normalized_link_type = normalize_resource_link_type(link_type)
    if not hasattr(esp.models, normalized_link_type):
        # link_type can be a pre-supplied/Out-of-box Entity Class.
        # Customers and implementations may create other entity classes like MESProduct
        normalized_link_type = "Sample"
    restrict_to_type = restrict_to_type_mapper(normalized_link_type)
    # accepted import formats: array of objects (containing name and fixed id properties) or array of names
    # ex:
    # 1. [{ name: The Name, fixed_id: The Fixed Id }, { name: The Name, fixed_id: The Fixed Id } ... ]
    # 2. [Name 1, Name 2, Name 3 ...]
    for item in restrict_to:
        if isinstance(item, str):
            name_or_fixed_id = item
        else:
            name_or_fixed_id = item.get("fixed_id", item.get("name"))
        obj = getattr(esp.models, restrict_to_type)(name_or_fixed_id)
        restrict_to_uuids.append(obj.uuid)
    return restrict_to_uuids


def export_variable_definitions(variables, exclude=lambda x: False):
    """exporter function that can be used for resource var export."""

    # TODO: rework the logic below in a bit more clean manner.
    if isinstance(variables, composite):
        variables = variables.json()
    else:
        variables = list(variables)
    ret = []
    alias_inverted = {v: k for k, v in ALIASES.items()}
    for data in variables:
        if exclude(data):
            continue
        var = {"rule": yaml_str(data["var_type"])}
        # universal properties.
        if data["tags"]:
            var["tags"] = sorted(yaml_str_list(data["tags"]))
        for flag in ("required", "pipeline_param", "read_only"):
            if data[flag]:
                var[yaml_str(flag)] = True
        if data["in_sample_sheet"] is False:
            var["visible"] = False
        if data["default_val"] is not None:
            var["value"] = yaml_str(data["default_val"])
        if data.get("meta") and "onchange" in data["meta"]:
            var["onchange"] = data["meta"]["onchange"]
        if data.get("meta") and "augment" in data["meta"]:
            var["augment"] = data["meta"]["augment"]
        augment = var.get("augment", {})
        # for 3.1 series, pull these up from augment to top-level.
        # For 3.2+ ESP, they will be at the top level, anyway.
        for key in ["reportable", "report_key", "reportable_key", "report_display", "reportable_display"]:
            if key in data:
                var[ALIASES.get(key, key)] = data[key]
            elif augment and key in augment:
                var[ALIASES.get(key, key)] = augment.pop(key)
        fixed_id = data.get("fixed_id", data.get("barcode", data["name"]))
        if fixed_id not in (data["name"], data["uuid"]):
            var["fixed_id"] = fixed_id
        name = data.get("name")
        var["name"] = name

        for key in ["desc", "instructions", "var_group", "ontology_path", "shared"]:
            if data.get(key):
                var[alias_inverted.get(key, key)] = data[key]

        # vartype-specific exports
        exporter = "_export_{}".format(var["rule"])
        if exporter in globals():
            var.update(globals()[exporter](data))

        key = data.get("fixed_id", name)
        ret.append({yaml_str(key): var})

    return ret


def push_variable_definitions(data):
    """
    Format variables data structure for parsing import and
    pushing updates. This is particularly useful for updating complex
    variables like containers or other inventory items.

    Args:
        data: list[dict[str,object]] - List of variable dicts to push.
    """
    res = []
    for var in data:
        var_type = var.setdefault("var_type", "text")
        importer = "_import_{}".format(var_type)
        # onchange
        if "onchange" in var:
            var.setdefault("meta", {})["onchange"] = var.pop("onchange")

        # 3.1.1 and rest of 3.1.x series: reportable information is in
        # augment.
        # 3.2 and later, this will stay as top-level fields in the var,
        # and standard alias handling will re-map from report_key,
        # report_display -> reportable_key, reportable_display.
        # just need to handle the case where they were written under augment.
        if "augment" in var:
            for key in ["reportable", "report_key", "report_display", "reportable_key", "reportable_display"]:
                if key in var["augment"]:
                    var[ALIASES.get(key, key)] = var["augment"].pop(key)

        if importer in globals():
            globals()[importer](var)

        res.append(var)
    return res


def export_fixed_id(obj):
    """
    Exports fixed_id for an object.
    If the fixed_id is the same as the object name, this will export None for the value
    """
    fixed_id = getattr(obj, "fixed_id", None)
    if fixed_id and fixed_id != obj.name:
        return fixed_id
    return None


# objects
# -------
[docs]class BaseModel(object): """ Abstract class for managing initialization methods for common objects. Args: ident (str): Name or uuid to use for initializing object. data (dict): Data to use for instantiating object. This only needs to be specified if you already have all metadata related to an object. **kwargs (dict): Additional data to include in model query. For example, including sample_type='Generic sample' will subset the query by sample type. """ __api__ = None # the backend api. Ie in /api/samples/:uuid, "samples" is the api. __api_cls__ = None # the (string) "class" returned by the backend for the cls property. __version_api__ = None # The endpoint that returns "version" (definition) objects for versioned object types. __allow_update__ = True # If false, a "create" operation cannot overwrite an existing object. __extend_aliases__ = True __droppable__ = True # If false, the model cannot be archived __aliases__ = {} # class-specific aliases for mapping between config <--> backend. __type_aliases__ = [] # For evolving the codebase over time __defaults__ = { # class specific value defaults. "name": None, "uuid": None, "desc": "", } __mutable__ = [ # properties that can be altered via PUT. "name", "desc", "tags", "meta", "augment", "barcode", ] # by default, don't export barcode. Subclasses can override as-needed. __base_exportable__ = [x for x in __mutable__ if x != "barcode"] # which model properties are exported. __create_params__ = [] # parameters passed from Model.create() -> Model.__init__. __push_format__ = {} # property -> callable map for mapping from client format to backend PUT format. __pristine__ = [] # class properties whose values will not be munged by standard config handling rules. __versioned_exportable__ = [] # Class constant for avoiding recursion errors when setting attributes. __prime_attributes__ = ["ident", "_data", "_params", "_version", "_snapshot"] def __init__(self, ident, data=None, version=None, snapshot=None, **kwargs): if not isinstance(ident, six.string_types): raise AssertionError("Error: ident argument to BaseModel must be string type!") self.ident = ident self._data = composite(data) if data is not None else None self._version = version self._snapshot = snapshot self._params = kwargs return def __eq__(self, other): try: return self.uuid == other.uuid except: return False def __repr__(self): if "name" in self.data: return "<{}(name={})>".format(self.__class__.__name__, self.data.name) elif "desc" in self.data: return "<{}(desc={})>".format(self.__class__.__name__, self.data.desc) else: return "<{}({})>".format(self.__class__.__name__, self.ident) def __getattr__(self, name): if name in self.__class__.__dict__ or name in self.__dict__: return self.__getattribute__(name) elif name in self.data: return self.data[name] elif name in dir(self): # This check is needed to handle any @cached methods that haven't # been replaced with fields yet. return self.__getattribute__(name) elif len(self.data) == 0: raise AssertionError("Error: No data available for object {}.".format(self)) else: raise AttributeError("{} object has no attribute {}".format(self.__class__.__name__, name)) def __setattr__(self, name, value): if name in self.__prime_attributes__: self.__dict__[name] = value elif name in self.__class__.__dict__: if isinstance(self.__class__.__dict__[name], property): self.__class__.__dict__[name].fset(self, value) else: self.__dict__[name] = value elif hasattr(self.__class__, name): prop = getattr(self.__class__, name) if isinstance(prop, property): prop.fset(self, value) else: self.__dict__[name] = value elif name in self.data: self.data[name] = value else: self.__dict__[name] = value def __getitem__(self, item): return self.data[item] def __setitem__(self, item, value): self.data[item] = value return
[docs] @classmethod def all(cls, **kwargs): """ Return all instances related to a given model. Examples: >>> samples = Sample.all() >>> print(samples[0].name) 'ESP0000001' """ result = base.SESSION.get("/api/{}".format(cls.__api__)) return [cls.from_data(data) for data in result.json()]
[docs] @classmethod def from_data(cls, data): """ Instantiate object from data. Examples: >>> import esp.base as base >>> from esp.models import Sample >>> response = base.SESSION.get('/api/sample?name=ESP000001') >>> samp = Sample.from_data(response.json()) >>> print(samples[0].name) 'ESP0000001' """ return cls(ident=data["uuid"], data=data)
[docs] @classmethod def from_definition(cls, uuid): """ Instantiate object definition from uuid. Examples: >>> import esp.base as base >>> from esp.models import SampleType >>> st = SampleType.from_definition('8552afc3-882b-41e3-b0cf-1a4cf3f28d33') >>> print(st[0].name) 'Illumina Sample' """ if cls.__version_api__ is None: raise AssertionError( "Error: __version_api__ must be specified for subclasses for from_definition() to resolve." ) # check the cache first... if uuid in CACHE: return cls.from_data(CACHE[uuid]) result = base.SESSION.get("/api/{}/{}".format(cls.__version_api__, uuid)) CACHE[uuid] = result.json() return cls.from_data(CACHE[uuid])
[docs] @classmethod def items_for_uuids(cls, uuids, max_query_string_size=8000): """ Fetch objects by UUID. This is similar to search but uses a different fetch strategy and uses generators, so was split out for backwards compatibility. Args: uuids (List[str]): List of UUIDs of objects to fetch. max_query_string_size (int): Max number of characters allowed in the query string. Differences from search: 1. Simpler interface. Only "uuids" is provided. 2. search does a POST to avoid long URLs; items_for_uuids does a "GET" but splits the UUID payload up so items_for_uuids may make several requests. However, items_for_uuids calls object-type specific endpoints that typically provide better performance than the general-purpose resource endpoint used by search. 3. items_for_uuids is less likely to cause an OOM exception for a web worker by asking for thousands of objects at once. In scenarios where you are simply iterating through the results and do not need to hold the entire collection of objects in memory at once, items_for_uuids provides better memory performance because it returns a generator so you never have to hold the entire result data in memory at once. """ uuids = list(uuids) # we can compute the uuid size based on teh max_query_string_size since UUIDs are fixed length... STATIC_LEN = 9 # accounts for ?uuids=[] # divide by 38 b/c 36 characters for uuid4 uuid + two quotes per uuid. # Int to ensure we skate under batch_size = int((max_query_string_size - STATIC_LEN) / 38) # Entities use the Sample api which was not designed for Containers and Items # additional_query_params can be removed once Entities get a separate api additional_query_params = "" from esp.models import Entity if cls == Entity: additional_query_params = '&clses=["Sample","Container","Item"]' while uuids: current_uuids = uuids[:batch_size] result = base.SESSION.get(f"/api/{cls.__api__}?uuids={json.dumps(current_uuids)}{additional_query_params}") # ensure entities are returned in uuid order. data_map = {x["uuid"]: x for x in result.json()} for uuid in current_uuids: if uuid in data_map: obj = cls.from_data(data_map[uuid]) else: obj = None yield obj uuids = uuids[batch_size:]
[docs] @classmethod def items_for_names(cls, names, max_query_string_size=4000): """ Fetch objects by name. This is similar to search but uses a different fetch strategy and uses generators, so was split out for backwards compatibility. Args: names (List[str]): List of names of objects to fetch. max_query_string_size (int): Max number of characters allowed in the query string. Differences from search: 1. Simpler interface. Only "names" is provided. 2. search does a POST to avoid long URLs; items_for_uuids does a "GET" but splits the names payload up so items_for_names may make several requests. However, items_for_names calls object-type specific endpoints that typically provide better performance than the general-purpose resource endpoint used by search. 3. items_for_names is less likely to cause an OOM exception for a web worker by asking for thousands of objects at once. In scenarios where you are simply iterating through the results and do not need to hold the entire collection of objects in memory at once, items_for_names provides better memory performance because it returns a generator so you never have to hold the entire result data in memory at once. """ def _find_batch_size(name_list): index = len(name_list) while len(json.dumps(name_list[:index])) + 7 > max_query_string_size: index //= 2 # note that this is only _half_ a true binary search, so we will consistently undershoot our target # index by as much as 2x. But most use-cases are small batch sizes... return index names = list(names) while names: # determine the next batch size batch_size = _find_batch_size(names) current_names = names[:batch_size] result = base.SESSION.get(f"/api/{cls.__api__}?names={json.dumps(current_names)}") for x in result.json(): yield cls.from_data(x) names = names[batch_size:]
[docs] @classmethod def search(cls, name=None, tags=None, method="or", uuids=None, names=None): """ Search for model by name wildcard or tags. Args: name (str|List[str]): Wildcard with sample name. For searching with an exact match, simply instantiate the model with the name (i.e. Sample('My Sample')). tags (list): List of tags to search for. method (str): How to filter the search by tags. Can either be 'and' (intersection) or 'or' (union). uuids (List[str]): UUID(s) search for. For fetching a single object by uuid, instantiate the object as ModelClass(<uuid>) (e.g.: Sample(<uuid>)) """ if cls.__api_cls__ is None: raise AssertionError("Search not supported for type {}".format(cls.__name__)) if name is None and tags is None and uuids is None and names is None: raise AssertionError("Please specify name= or tags= or uuids= or names= to search!") data = {} if name is not None: data["name.like"] = name if uuids is not None: data["uuids"] = json.dumps(uuids) if names is not None: data["names"] = json.dumps(names) if tags is not None: if not isinstance(tags, (list, tuple)): tags = [tags] tags = json.dumps(tags) # note: resources query supports both AND and OR for tags. # tags= is OR. alltags= is AND. if method == "and": data["alltags"] = tags else: data["tags"] = tags data["cls"] = cls.__api_cls__ res = base.SESSION.post("/api/resources/query", json=data) obj = res.json() # filter resources using api url and instantiate uobj = {} for o in obj: if cls.__api_cls__ == o["cls"]: # account for an or query returning multiple # entries for the same thing uobj[o["uuid"]] = o # filter by unique entries ret = [] for o in sorted(uobj.values(), key=lambda x: x["updated_at"]): ret.append(cls.from_data(o)) return ret
@classmethod def _resolve_existing(cls, item, allow_snapshot_uuid_remap): """Resolve an existing model object. Args: item: configuration to resolve allow_snapshot_uuid_remap: allow for pinned UUID remaps to occur """ uuid = item.get("uuid") def_uuid = item.get("def_uuid") name = item.get("name") fixed_id = item.get("fixed_id") def _vet_potential_remap(obj): # no uuid supplied... so we don't care anyway. if not uuid: return UUID_REMAP_CACHE[uuid] = obj.uuid if uuid == obj.uuid or allow_snapshot_uuid_remap: return raise ValueError( "Importing a pinned WFC where a uuid remap is required, but not allowed. " "Add the `--allow-snapshot-uuid-remap` to enable UUID remapping of pinned objects. " f"(Existing object uuid: {obj.uuid}; expected uuid: {uuid})" ) kwargs = {} # adding additional query params (i.e. sample type for sample) for param in cls.__create_params__: if param in item: kwargs[param] = item[param] kwargs["version"] = def_uuid lookups = [("uuid", uuid), ("fixed_id", fixed_id), ("name", name)] lookups = [x for x in lookups if x[1]] for type_, ident in lookups: # note: this try/except encompasses three conditions: # 1. version is not specified at all (covered in main try block) # 2. version _is_ specified and it exists (covered in main try block) # 3. version _is_ specified but does not exist (covered in except block). try: # note: it's okay to try to resolve an object with a version of None. kwargs["ident"] = ident obj = cls(**kwargs) if obj.exists(): if obj.data and hasattr(obj.data, "deleted") and obj.data.deleted: try: url = "/api/{}/{}/undelete".format(cls.__api__, obj.data.uuid) base.SESSION.put(url) except: raise ValueError( "Undelete failed for {} identified by {}: {} . Object exists, but was archived".format( cls.__name__, type_, ident ) ) # and for the purpose of vetting downstream behaviors... we don't # care if we resolved the object with a version specified or not here... _vet_potential_remap(obj) logging.info( "{} with {} {}{} already exists - using existing object".format( cls.__name__, type_, ident, ", def_uuid {}".format(def_uuid) if def_uuid else "" ) ) return obj, def_uuid is not None except MissingVersionError: # only get MissingVersionError if the object exists but version uuid doesn't. # In that case, we can do from existing head object and do the # PUT later kwargs.pop("version") obj = cls(**kwargs) _vet_potential_remap(obj) logging.info( "{} with {} {} already exists - using existing object".format( cls.__name__, type_, ident, ) ) return obj, False # if we made it here, the object is just doesn't exist by any mechanism # we can find. return composite({"exists": lambda: False}), False
[docs] @classmethod def create( cls, config=None, overwrite=False, prompt=False, object_names=None, force_overwrite=False, allow_snapshot_uuid_remap=False, **kwargs, ): """ Create new object in ESP database using config file or other data. This method should be overwritten by subclasses of BaseModel for model-specific logic. Args: config (str, dict, list): Config file or information to use in creating new object. overwrite (bool): Whether or not to delete the current entry in the ESP database. prompt (bool): If current entry for config exists, attempt to detect conflicts and present interactive prompt for resolving conflicts. object_names (list[str]): When config refers to a list of objects to load, only load the items named by object_names. Default is to load all objects in config. force_overwrite (bool): Will force an overwrite, even when no change is detected between the existing DB configuration and the configuration loaded from file. """ if config is None: config = {} if isinstance(config, dict): config.update(kwargs) if not config: raise AssertionError( "Incorrect arguments specified to create()! Please see documentation for more information." ) # load config aliases = ALIASES.copy() if cls.__extend_aliases__ else {} aliases.update(cls.__aliases__) data = load_config(config, aliases, cls) # manage single/multiple defs if not isinstance(data, (list, tuple)): data = [data] # traverse data and create/update created = [] for item in data: # only load items listed explicitly by name # if the object_names argument is provided. if object_names is not None and "name" in item and item["name"] not in object_names: continue # totally bypass model creation logic if we're doing a hub import # set defaults if isinstance(item, dict): if item.get("_format", None) == "hub": created.append(cls._import_hub(item)) continue for key in cls.__defaults__: if key not in item: default = cls.__defaults__[key] if isinstance(default, (list, dict)): item[key] = type(default)(default) else: item[key] = default item = item.copy() item = cls.parse_import(item, overwrite=overwrite, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap) # break on no create or model-specific override if item is None: continue elif isinstance(item, BaseModel): created.append(item) continue elif isinstance(item, (list, tuple)) and all(isinstance(x, BaseModel) for x in item): created.extend(item) continue # Handle object format item = dict(item) # load object (or phony object) for determining if it exists # check for versioned imports... obj, exists_for_version = cls._resolve_existing(item, allow_snapshot_uuid_remap) # generate json hash of item for update checking hash_ = json.dumps({k: v for k, v in item.items() if k != "uuid"}, sort_keys=True) hash_time = datetime.datetime.now(datetime.timezone.utc) hash_time = hash_time.isoformat() hash_time = hash_time.replace("+00:00", "") item.setdefault("augment", {}) item["augment"].update( dict( imported_content=hash_, imported_at=hash_time, ) ) if obj.exists(): # however we found the object, we now have it, so we'll be # using the object's UUID to run a PUT, so we don't need # a UUID in item if there ever was one. item.pop("uuid", None) if exists_for_version: # then we want to immediately abort processing. created.append(obj) continue # object doesn't exist - create if not obj.exists(): ident = item.get("name") or item.get("fixed_id") or item.get("uuid") or "<UNKNOWN>" logging.info("Creating {}: {}".format(cls.__name__, ident)) if "snapshot" in item: item["snapshot"] = cls._remap_snapshot_uuids(item["snapshot"]) created.append(cls._create_model(item, overwrite=overwrite, prompt=prompt)) # object exists - don't update elif obj.exists() and not overwrite: logging.info("Object {} already exists; will not overwrite.".format(repr(obj))) cls.parse_response(obj.data.json(), overwrite=overwrite, prompt=prompt) created.append(obj) # check if update is allowed for model elif not cls.__allow_update__: raise AssertionError( "Update not allowed for model {}! " "Either no backend support or model assumptions " "would be violated!".format(cls.__name__) ) # object exists and overwrite specified else: change = "Config" different = False # pre-prompt or updated through ui if "imported_at" not in obj.augment or "imported_content" not in obj.augment: different = True change = "Remote" # otherwise, detect changes with previous import hash else: # keeping this block here since it's a more robust solution than the else block. # We can remove the else block when we complete the python 3.9 migration. if hasattr(datetime.datetime, "fromisoformat"): last_update = datetime.datetime.fromisoformat(obj.updated_at.replace("Z", "")) imported_at = datetime.datetime.fromisoformat(obj.augment.imported_at) else: last_update = datetime.datetime.strptime(obj.updated_at, "%Y-%m-%dT%H:%M:%S.%fZ") imported_at = datetime.datetime.strptime(obj.augment.imported_at, "%Y-%m-%dT%H:%M:%S.%f") previous_hash = obj.augment.get("imported_content", "{}") import_delta = int(abs((last_update - imported_at).total_seconds())) # if the last update was 30s after the last import or the content is different # TODO: Why are we separating by 60 seconds (or 30s based on the comment above??)? # It seems completely arbitrary change = "Remote" if import_delta > 60 else "Config" different = import_delta > 60 or previous_hash != hash_ # content not different -> don't overwrite if not different: if force_overwrite: logging.info( "Force-overwriting existing object {} despite no changes detected".format(repr(obj)) ) created.append(cls._overwrite_model(obj, item, overwrite=overwrite, prompt=prompt)) else: logging.info("No change detected for object {}; will not overwrite.".format(repr(obj))) cls.parse_response(obj.data.json(), overwrite=overwrite, prompt=prompt) created.append(obj) # content different but no prompt -> force overwrite elif not prompt: logging.info("Overwriting existing object {}".format(repr(obj))) created.append( cls._overwrite_model( obj, item, overwrite=overwrite, prompt=prompt, allow_snapshot_uuid_remap=allow_snapshot_uuid_remap, ) ) # content different and prompt -> print diff and prompt else: # generate diff hash for config change if change == "Config": prev = yaml.dump(json.loads(previous_hash)).split("\n") # generate diff hash for remote change elif change == "Remote": jdata = obj.data.json() # remove old keys for diff jdata.setdefault("meta", {}) if "augment" in jdata["meta"]: if "imported_content" in jdata["meta"]["augment"]: del jdata["meta"]["augment"]["imported_content"] del jdata["meta"]["augment"]["imported_at"] # generate remote hash rhash = {key: jdata.get(key) for key in item if key != "uuid"} rhash = json.dumps(rhash, sort_keys=True) prev = yaml.dump(json.loads(rhash)).split("\n") # take diff of object current = yaml.dump(json.loads(hash_)).split("\n") diff = list(difflib.unified_diff(prev, current))[2:] # notify user about change sys.stderr.write(f"\n=======\n{change} change detected for object {repr(obj)}.\n=======\n") if len(diff): sys.stderr.write("\n--> start diff {} <--\n".format(repr(obj))) sys.stderr.write("\n".join(diff)) sys.stderr.write("\n\n--> end diff {} <--\n\n".format(repr(obj))) else: sys.stderr.write( "\nContent delta could not be determined locally for {}. " "Please check ESP for potential content differences. " "See below for content that will be imported after the prompt:\n".format(repr(obj)) ) sys.stderr.write("\n{}\n".format("\n".join(current))) # prompt for input until correct while True: inp = input("Keep previous (-) or new (+) version? ") # keep previous if inp == "-": logging.info("Keeping existing version for object {}".format(repr(obj))) created.append(obj) break # update with new elif inp == "+": logging.info("Overwriting existing object {}".format(repr(obj))) created.append(cls._overwrite_model(obj, item, overwrite=overwrite, prompt=prompt)) break # input error else: sys.stderr.write( "\n Error -> `{}` isn't a supported option. Please enter `-` or `+`\n\n".format(inp) ) return created if len(created) > 1 else created[0]
@classmethod def _remap_snapshot_uuids(cls, snapshot): # note: snapshot metadata is only recorded in the top-level pin. So if you # export a pinned WFC, it's in the WFC; pin and export a WF, and it's in the WF. # Likewise for Pipelines and signature flows. Also, the client imports nested dependencies # before outer dependencies, so if we have a snapshot, we should have _already_ imported # all dependencies and therefore the UUID_REMAP_CACHE should have everything we need by now. if isinstance(snapshot, list): return [cls._remap_snapshot_uuids(x) for x in snapshot] if not isinstance(snapshot, dict): return snapshot remapped = dict() for key, value in snapshot.items(): # we only touch the uuid keys. if key == "uuid": if value in UUID_REMAP_CACHE: value = UUID_REMAP_CACHE[value] else: import warnings warnings.warn( ( "Houston, we have a potential logic problem. " "The UUID_REMAP_CACHE should have been fully populated, but `{}` was missing. " "assuming the root cause of the miss is a nested definition where the parent " "def_uuid already exists" ).format(value) ) # raise AssertionError( # 'Houston, we have a logic problem. ' # 'The UUID_REMAP_CACHE should have been fully populated, but `{}` was missing'.format( # value # )) else: value = cls._remap_snapshot_uuids(value) remapped[key] = value return remapped @classmethod def _overwrite_model(cls, obj, item, overwrite, prompt=False, allow_snapshot_uuid_remap=False): """Overwrite an existing model object with new data from item""" if "snapshot" in item and allow_snapshot_uuid_remap: item["snapshot"] = obj._remap_snapshot_uuids(item["snapshot"]) result = base.SESSION.put("/api/{}/{}".format(cls.__api__, obj.uuid), json=item) data = result.json() # clear cache on overwrite global CACHE if "uuid" in data and data["uuid"] in CACHE: del CACHE[data["uuid"]] key = (cls.__name__, obj.name) if key in NAME_CACHE: del NAME_CACHE[key] return cls.parse_response(data, overwrite=overwrite, prompt=prompt) @classmethod def _create_model(cls, item, overwrite=False, prompt=False): """Create a new model object where none exists already given the information in item""" result = base.SESSION.post("/api/{}".format(cls.__api__), json=item) return cls.parse_response(result.json(), overwrite=overwrite, prompt=prompt)
[docs] @classmethod def parse_import(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False): """ Method for formatting import-style data to data that can be used in a backend POST/PUT request. Returns: A data structure to use in a POST/PUT request for creating or updating the model. """ return data
[docs] @classmethod def parse_response(cls, data, overwrite=False, prompt=False, allow_snapshot_uuid_remap=False): """ Method for parsing request returned from POST/PUT during object creation. This method is responsible for returning an object after create() is called. :param allow_snapshot_uuid_remap: """ return cls.from_data(data)
[docs] def export( self, filename=None, deep=False, filter_empty=False, use_dump_format=False, use_hub_format=False, versioned=False, ): """Export a yaml-formatted object. Args: filename (str): Filename to write yaml output to deep (bool): Perform a deep (True) or shallow (False) export filter_empty (bool): Attributes where the value is None or empty string are filtered from export. use_dump_format (bool): Whether to use the "dump" yaml format. use_hub_format (bool): Whether to use the hub yaml format versioned (bool): If true, attempts to export a snapshotted/pinned export. Note: not all models support pinning. Note: A deep export embeds all nested objects (e.g.: protocols of a workflow). A deep export is therefore the yaml analog to the .lab7 file. A shallow export uses names as references to nested objects. """ if versioned: deep = True if use_hub_format: export_dict = self._export_hub(deep, versioned) else: export_dict = self._export(deep, filter_empty, versioned) # remove 'augment' metadata export_dict.get("augment", {}).pop("imported_content", None) export_dict.get("augment", {}).pop("imported_at", None) export_dict.get("meta", {}).get("augment", {}).pop("imported_content", None) export_dict.get("meta", {}).get("augment", {}).pop("imported_at", None) if not use_hub_format: fixed_id = export_dict.pop("fixed_id", None) name = export_dict.get("name", None) if fixed_id: if name and (is_uuid(fixed_id) or fixed_id == name): key = name export_dict.pop("name", None) else: key = fixed_id else: key = name export_dict.pop("name", None) export_dict = {key: export_dict} from esp.export import ModelDumper, str_presenter, DEFAULT_DUMP_CONF import yaml if use_dump_format: yaml.add_representer(str, str_presenter) def dump(obj, out=None): if out: yaml.dump(obj, out, Dumper=ModelDumper, **DEFAULT_DUMP_CONF["dump_config"]) else: return yaml.dump(obj, Dumper=ModelDumper, **DEFAULT_DUMP_CONF["dump_config"]) else: dump = yaml.dump if filename is None: return export_dict elif filename == "-": print(dump(export_dict)) else: with open(filename, "w") as out: dump(export_dict, out)
def _isempty(self, value): if value is None: return True if hasattr(value, "__len__") and len(value) == 0: return True return False def _export_hub(self, deep, versioned=False): """Export model in hub yaml format. versioned not currently supported""" result = base.SESSION.get("/api/{}/{}/export?format=yaml".format(self.__api__, self.uuid)) export_dict = yaml.safe_load(result.content) export_dict["_format"] = "hub" augment = export_dict.get("augment", {}) if augment is None: export_dict["augment"] = {} return export_dict @classmethod def _import_hub(cls, data): data = cls.parse_import_hub(data) result = base.SESSION.post("/api/{}/import".format(cls.__api__), data=json.dumps(data, default=str)) return cls.from_data(result.json()) @classmethod def parse_import_hub(cls, data): if "_format" in data: data.pop("_format") return data def _export(self, deep, filter_empty, versioned=False): # try/except is needed to handle cases like Worksheet which will # try and fail to load @cached fields when hasattr() is called. try: if not hasattr(self, "__exportable__"): raise NotImplementedError() except: raise NotImplementedError("{} is not exportable.".format(type(self))) export_dict = {} formatters = {} if hasattr(self, "__export_format__"): formatters.update(self.__export_format__) formatter_prop = "__{}_format__".format("deep" if deep else "shallow") if hasattr(self, formatter_prop): formatters.update(getattr(self, formatter_prop)) # don't use global ALIAS for this as the impacts # would be too far-reaching. But class-specific alias # works as a fine-grained control mechanism on output # keys. rev_alias = {v: k for k, v in self.__aliases__.items()} if versioned and self.__versioned_exportable__: props_to_export = list(self.__exportable__) + list(self.__versioned_exportable__) else: props_to_export = list(self.__exportable__) for prop in props_to_export: if prop in formatters: aspec = inspect.getfullargspec(formatters[prop]) kwargs = {} if "deep" in aspec.args: kwargs["deep"] = deep if "filter_empty" in aspec.args: kwargs["filter_empty"] = filter_empty if "versioned" in aspec.args: kwargs["versioned"] = versioned value = formatters[prop](self, **kwargs) elif hasattr(self, prop): value = getattr(self, prop) if isinstance(value, composite): value = value.json() if isinstance(value, (list, tuple)): value = yaml_str_list(value) else: value = yaml_str(value) else: continue if not self._isempty(value) or not filter_empty: export_dict[rev_alias.get(prop, prop)] = value return export_dict def _data_by_uuid(self): try: return base.SESSION.get("/api/{}/{}".format(self.__api__, self.ident)).json() except: return {} def _data_by_name(self): """ Method for querying data by specified name. .. note:: There are a lot of backend inconsistencies here that require the logic below. Ideally, a query with name={} should just resolve correctly without additional parsing. """ # format query self._params.update({"name": self.ident}) query = "" for key, val in self._params.items(): param = self._param_to_queryarg(val) query += key + "=" + quote_plus(param) + "&" # query ret = base.SESSION.get("/api/{}?{}".format(self.__api__, query)).json() # return nothing if no results if len(ret) == 0: return {} # look for associated name in results for dat in ret: # Note: shouldn't be necessary starting in 2.4.1 b/c we reverted the name filter to be exact match # again, but keeping in case. if dat["name"] == self.ident: return dat return {} def _data_by_fixed_id(self): """ Method for querying data by fixed_id """ # in 2.4.1, fixed_id is captured as the barcode under the hood... query = "" for key, val in self._params.items(): param = self._param_to_queryarg(val) query += key + "=" + quote_plus(param) + "&" query += "barcode={}".format(quote_plus(self.ident)) ret = base.SESSION.get("/api/{}?{}".format(self.__api__, query)).json() if len(ret) == 0: return {} return ret[0] @classmethod def _param_to_queryarg(cls, val): if isinstance(val, six.string_types): return val elif isinstance(val, BaseModel): return val.name else: raise AssertionError( "No rule for using parameter {} as query " "parameter for {}.".format(val, cls.__name__) ) @property def augment(self): """ Proxy for setting Model.data.meta.augment. """ if "meta" not in self.data: self.data["meta"] = {} if "augment" not in self.data["meta"]: self.data["meta"]["augment"] = {} return self.data["meta"]["augment"] @augment.setter def augment(self, value): """ Proxy for setting Model.data.meta.augment. """ if "meta" not in self.data: self.data["meta"] = {} self.data["meta"]["augment"] = value return def _set_data_version(self): """ Ensures that the data loaded is correct given the requested version. Should only be called after _data has been initially/tentatively resolved. """ if not self._version or not self.__version_api__: return version_is_uuid = is_uuid(self._version) try: pointer_uuid = self.uuid # AssertionError will be raised if the ESP object is not found, # in which case, just return. except AssertionError: return if ( # Have to use get for the def_uuid until the signature flow API # returns a structure more like other versioned objects. (version_is_uuid and self._data.get("def_uuid") != self._version) or self._data.meta.get("snapshot", {}).get("name") != self._version ): # should be able to speed this up... for version in self.versions: if self._version == version.uuid or self._version == version.meta.get("snapshot", {}).get("name"): self._data = version.data break else: raise MissingVersionError(type(self), self.uuid, self._version) # def_uuid will be the version uuid... we still need the wrapper object UUID to be the correct UUID. # This extra conditional is to handle the fact that signature flows API doesn't work # quite like other versioned objects. Can be removed once the API functions more similarly. if "def_uuid" not in self._data: self._data["def_uuid"] = self._data["uuid"] self._data["uuid"] = pointer_uuid if self._snapshot is None and "snapshot" in self._data.get("meta", {}): self._snapshot = self._data.meta.snapshot @property def data(self): """ Property containing all information about an object. This is the information typically returned from a GET request using the object endpoint and uuid. """ if self._data is None: data = {} ident_uuid = is_uuid(self.ident) # look in cache for existing object if base.SESSION.cache: if ident_uuid: if self.ident in CACHE: self._data = composite(CACHE[self.ident]) self._set_data_version() return self._data else: # careful... tread very carefully here... # b/c we could have a _definition_ by name and class, # or we could have the _wrapper type_ by name and class... and those # may be different. Note: this code is in here as a forwards look # to what could be, but we never actually put anything into the NAME_CACHE # as yet. uuid = NAME_CACHE.get((self.__class__.__name__, self.ident), None) if uuid and uuid in CACHE: self._data = composite(CACHE[uuid]) self._set_data_version() return self._data # do query if cache doesn't exist if len(data) == 0: if ident_uuid: data = self._data_by_uuid() elif self.ident is not None: # prefer data by name for 2.4.1 to aid in the transition. data = self._data_by_name() if not data: self._params.pop("name", None) data = self._data_by_fixed_id() else: data = {} # set empty data structure for no results if data is None or len(data) == 0: data = {} # cache data if specified if base.SESSION.cache: if "uuid" in data: CACHE[data["uuid"]] = data if "name" in data: # Commented out for now because it triggers a lot of # test errors to use the name cache, likely because of # version mismatches. _maybe_ not a problem for a production # ESP, but definitely a problem for the test suite. # the rest of the NAME_CACHE code can continue on as-is # since it's never actually triggered if the cache isn't built # here anyway. # NAME_CACHE[(self.__class__.__name__, data['name'])] = data['uuid'] pass self._data = composite(data) self._set_data_version() return self._data def refresh(self): if "uuid" in self.data and self.uuid in CACHE: del CACHE[self.uuid] key = (self.__class__.__name__, self.name) if key in NAME_CACHE: del NAME_CACHE[key] self._data = None self.data # there might not be a need for _data with cached.invalidate cached.invalidate(self, "refresh") return
[docs] def exists(self): """ Test if object exists in esp database. TODO: THINK ABOUT HOW THIS SHOULD BE USED -- fill in after usage documentation. """ try: return self.data.uuid is not None except AttributeError: return False
[docs] def drop(self, deep=False): """ Issue DELETE request to remove object in ESP database. """ if self.exists(): if self.__droppable__: logging.info( "Dropping {}: {}".format(self.__class__.__name__, self.name if "name" in self.data else self.uuid) ) result = base.SESSION.delete("/api/{}/{}".format(self.__api__, quote_plus(self.uuid))) return result else: raise AssertionError(f"Dropping a {type(self).__name__} is not allowed!") return None
[docs] def json(self): """ Return dictionary object with object metadata. """ return self.data.json()
[docs] def summary(self): """ Print organized description of entity. """ for key in sorted(self.data.keys()): print("{}: {}".format(key, self.data[key])) return
[docs] def push(self, dry=False): """ Run put with metadata to update entry. """ # find and format mutable data data = self.json() payload = {} for key in self.__mutable__: if key in self.__push_format__: if callable(self.__push_format__[key]): payload[key] = self.__push_format__[key](self) else: payload[key] = self.__push_format__[key] elif key in data: payload[key] = data[key] # return 'dry' push with payload only if dry: return payload # push data result = base.SESSION.put("/api/{}/{}".format(self.__api__, self.uuid), json=payload) self.data.update(result.json()) cached.invalidate(self, "refresh") # update cache if base.SESSION.cache: CACHE[self.uuid] = dict(self.data) return
@cached.tag("refresh") def history(self): actions = base.SESSION.get("/api/resources/{}/actions".format(self.uuid)).json()["actions"] return sorted([ResourceAction(x) for x in actions], key=lambda x: x.timestamp) @cached.tag("refresh") def versions(self): """ Return full history of item in ESP database as objects of this type. The order of the list is most recent (index 0) to least recent (index -1). """ if self.__version_api__ is None: raise AssertionError( "Versioning not supported for model. " "Please set __version_api__ property in model " "to enable versioning." ) ver = [] res = base.SESSION.get("/api/{}".format(self.__version_api__)) for item in res.json(): # name can change from a version to other version # maybe we should use just barcode if item.get("name") == self.name or item.get("barcode") == self.barcode: ver.append(type(self).from_data(item)) return sorted(ver, key=lambda x: x.created_timestamp, reverse=True) @cached def created_timestamp(self): """ Return datetime object with creation timestamp. """ return utils.parse_esp_timestamp(self.created_at) @cached def updated_timestamp(self): """ Return datetime object with last update timestamp. """ return utils.parse_esp_timestamp(self.updated_at) @cached def owner(self): """ Relationship to owner (creator) of object. """ from .admin import User ownerstr = self.data.get("owner") if not ownerstr: return None match = re.match(r"(.*) \((.*@.*)\)", ownerstr) if not match: raise ValueError( "Expected owner to be formatted `user name (handle@domain)` " "but was: {}".format(ownerstr) ) return User(match.group(1))
[docs]class LinkedModel(BaseModel): """ Subclass of BaseModel for decorating parent-child relationship functionality. All items in the ESP database /can/ be linked, but there needs to be more formalism around what /should/ be linked in the database. Because of this, we should restrict access to linking functionality to specific models until the pattern for provenance is more flushed out on the backend. Args: ident (str): Name or uuid to use for initializing object. data (dict): Data to use for instantiating object. This only needs to be specified if you already have all metadata related to an object. """ # def push(self): # # TODO: OVERWRITE PUSH METHOD TO ALSO UPDATE DEPENDENCIES/LINKS # # this would create for a much better api for managing # # parent/child relationships, but it needs a route on the # # backend (.../dependencies/set): # # x.children.append(Sample('x')) # # x.children = x.children[:-1] # # x.push() # # ... instead of ... # # x.add_children(Sample('x')) # # x.remove_children(x.children[-1]) # return
[docs] def add_dependents(self, items, label=None): """ Add resource dependencies for item or list of items. """ return self.update_link(items, link="dependents", method="add", label=label)
[docs] def remove_dependents(self, items, label=None): """ Remove resource dependencies for item or list of items. """ return self.update_link(items, link="dependents", method="remove", label=label)
[docs] def add_priors(self, items, label=None): """ Add resource priors for item or list of items. """ return self.update_link(items, link="priors", method="add", label=label)
[docs] def remove_priors(self, items, label=None): """ Remove resource priors for item or list of items. """ return self.update_link(items, link="priors", method="remove", label=label)
[docs] def add_children(self, items): """ Add specified items as children to object. """ return self.add_dependents(items, "begat")
[docs] def remove_children(self, items): """ Remove specified items as children to object. """ return self.remove_dependents(items, "begat")
[docs] def add_parents(self, items): """ Add specified items as parent to object. """ return self.add_priors(items, "begat")
[docs] def remove_parents(self, items): """ Remove specified items as parent to object. """ return self.remove_priors(items, "begat")
@cached.tag("refresh") def dependencies(self): """ Return dependency information for item. """ res = base.SESSION.get("/api/resources/{}/dependencies".format(self.uuid)) return composite(res.json()["dependencies"]) @property def priors(self): """ Proxy for accessing prior item dependencies. TODO: Re-map the dependencies to their related class types in the client. This currently just returns the JSON associated with the dependency data. """ return self.dependencies.priors if "priors" in self.dependencies else [] @property def dependents(self): """ Proxy for accessing dependent item dependencies. TODO: Re-map the dependencies to their related class types in the client. This currently just returns the JSON associated with the dependency data. """ return self.dependencies.dependents if "dependents" in self.dependencies else []
[docs] def get_priors(self, label): """ Method for getting priors tagged with specific label. Args: label (str): Label to filter priors with. """ return [x[0] for x in self.priors if x[1] == label]
[docs] def get_dependents(self, label): """ Method for getting dependencies tagged with specific label. Args: label (str): Label to filter dependencies with. """ return [x[0] for x in self.dependents if x[1] == label]
@cached.tag("refresh") def parents(self): """ Return sample parent objects. .. note:: Changes to these data does not push changes to the backend. Use self.add_parents for that functionality. """ return [type(self)(x["uuid"]) for x in self.get_priors("begat")] @cached.tag("refresh") def children(self): """ Return sample children. .. note:: Changes to these data does not push changes to the backend. Use self.add_children for that functionality. """ return [type(self)(x["uuid"]) for x in self.get_dependents("begat")]
[docs] def generation(self, entity_type, which="closestup"): """ Efficiently return generation of a particular type. Args: entity_type (str): The type of entity to search for in the hierarchy. which (str): One of "closestup", "furthestup", "allup" for the nearest, furthest, or all ancestors of the given type, respectively, or "closestdown", 'furthestdown', 'alldown' for the nearest, furthest, or all descendants of the given type, respectively. Returns: List[Sample] - List of samples of the appropriate type. May be empty. Never null. """ valid_which = ("closestup", "furthestup", "allup", "closestdown", "furthestdown", "alldown") if which not in valid_which: raise ValueError("Invalid `which`. Must be one of `{}".format(valid_which)) results = utils.eval_expression( "sample_generation('{}:{}', sample_uuid='{}')".format(which, entity_type, self.uuid) ) return [type(self).from_data(x) for x in results]
class ResourceAction(object): def __init__(self, data): self.data = composite(data) @cached def agent(self): from .admin import User return User(self.data.agent) @property def message(self): return self.data.desc desc = message @cached def timestamp(self): return datetime.datetime.fromisoformat(self.data.timestamp[:-1]) @property def resource_uuid(self): return self.data.resource