Source code for schedula.utils.blue

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2019, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides a Blueprint class to construct a Dispatcher and SubDispatch objects.
"""
from .cst import EMPTY
from ..dispatcher import Dispatcher


def _call_kw(loc, skip=('self', 'kwargs')):
    return {k: v for k, v in loc.items()
            if not (k.startswith('__') or k in skip)}


def _init(obj, memo=None):
    """
    Registers `obj` when it is a :class:`Blueprint`, otherwise returns it.

    :param obj:
        Any object; Blueprints are turned into their registered object.
    :type obj: object

    :param memo:
        Cache forwarded to :meth:`Blueprint.register`.
    :type memo: dict, optional

    :return:
        The registered object, or `obj` itself if it is not a Blueprint.
    :rtype: object
    """
    if not isinstance(obj, Blueprint):
        return obj
    return obj.register(memo=memo)


def _safe_call(fn, *args, memo=None, **kwargs):
    """
    Calls `fn` after registering any Blueprint passed as an argument.

    :param fn:
        Callable to invoke.
    :type fn: callable

    :param args:
        Positional arguments; each one is passed through `_init` first.

    :param memo:
        Cache forwarded to `_init` for every argument.
    :type memo: dict, optional

    :param kwargs:
        Keyword arguments; each value is passed through `_init` first.

    :return:
        Whatever `fn` returns.
    """
    # Positional arguments are converted before keyword arguments, matching
    # the order in which they are passed to `fn`.
    call_args = [_init(arg, memo) for arg in args]
    call_kwargs = {key: _init(val, memo=memo) for key, val in kwargs.items()}
    return fn(*call_args, **call_kwargs)


class Blueprint:
    """
    Base Blueprint class.

    A Blueprint stores the constructor arguments of :attr:`cls` plus a list of
    deferred ``(method_name, kwargs)`` operations; :meth:`register` creates the
    object and replays those operations on it.
    """
    cls = Dispatcher

    def __init__(self, *args, **kwargs):
        self.args = args  # Positional arguments for `cls`.
        self.kwargs = kwargs  # Keyword arguments for `cls`.
        self.deferred = []  # Recorded (method name, kwargs) operations.

    def __getstate__(self):
        # Pickle only the attributes needed to rebuild the blueprint. `cls` is
        # included only when it is set on the instance (e.g. via `_set_cls`).
        d, keys = self.__dict__, ('args', 'kwargs', 'deferred', 'cls')
        return {k: d[k] for k in keys if k in d}

    def _set_cls(self, cls):
        # Overrides, for this instance, the class built by `register`.
        self.cls = cls
        return self

    def register(self, obj=None, memo=None):
        """
        Creates a :class:`Blueprint.cls` and calls each deferred operation.

        :param obj:
            The initialized object with which to call all deferred operations.
            If None, it is created calling `cls` with the stored `args` and
            `kwargs`.
        :type obj: object

        :param memo:
            A dictionary to cache registered Blueprints.
        :type memo: dict[Blueprint,T]

        :return:
            The initialized object.
        :rtype: Blueprint.cls | Blueprint

        **--------------------------------------------------------------------**

        Example::

            >>> import schedula as sh
            >>> blue = sh.BlueDispatcher().add_func(len, ['length'])
            >>> blue.register()
            <schedula.dispatcher.Dispatcher object at ...>
        """
        # A cached result short-circuits the construction. It is read into a
        # separate name so that a cached None does not discard a given `obj`.
        if memo and self in memo:
            cached = memo[self]
            if cached is not None:
                return cached

        if obj is None:
            # Blueprints nested in the constructor arguments are registered
            # first (see `_safe_call`).
            obj = _safe_call(self.cls, *self.args, memo=memo, **self.kwargs)
        for method, kwargs in self.deferred:
            _safe_call(getattr(obj, method), memo=memo, **kwargs)
        if memo is not None:
            memo[self] = obj
        return obj

    def extend(self, *blues, memo=None):
        """
        Extends deferred operations calling each operation of given Blueprints.

        :param blues:
            Blueprints or Dispatchers to extend deferred operations.
        :type blues: Blueprint | schedula.dispatcher.Dispatcher

        :param memo:
            A dictionary to cache Blueprints.
        :type memo: dict[T,Blueprint]

        :return:
            Self.
        :rtype: Blueprint

        **--------------------------------------------------------------------**

        Example::

            >>> import schedula as sh
            >>> blue = sh.BlueDispatcher()
            >>> blue.extend(
            ...     sh.BlueDispatcher().add_func(len, ['length']),
            ...     sh.BlueDispatcher().add_func(callable, ['is_callable'])
            ... )
            <schedula.utils.blue.BlueDispatcher object at ...>
        """
        memo = {} if memo is None else memo
        for blue in blues:
            if isinstance(blue, Dispatcher):
                # Dispatchers are converted into Blueprints via `blue`.
                blue = blue.blue(memo=memo)
            for method, kwargs in blue.deferred:
                getattr(self, method)(**kwargs)
        return self

    def __call__(self, *args, **kwargs):
        """Calls the registered Blueprint."""
        return self.register(memo={})(*args, **kwargs)
def _parent_blue(func, memo=None):
    """
    Replaces Dispatcher/SubDispatch objects found in `func` with Blueprints.

    :class:`functools.partial` wrappers (function, positional and keyword
    arguments) and `add_args` wrappers (`func`, `n` and `callback`) are
    rebuilt with their contents converted recursively. Any other object is
    returned unchanged.

    :param func:
        Object to convert.
    :type func: object

    :param memo:
        Dictionary shared across the recursion and forwarded to `blue`.
    :type memo: dict, optional

    :return:
        The converted object.
    :rtype: object
    """
    from .dsp import add_args, SubDispatch
    from functools import partial
    if memo is None:
        memo = {}

    def convert(value):
        return _parent_blue(value, memo)

    if isinstance(func, partial):
        positional = [convert(v) for v in (func.func,) + func.args]
        keywords = {k: convert(v) for k, v in func.keywords.items()}
        return func.__class__(*positional, **keywords)
    if isinstance(func, add_args):
        parts = (func.func, func.n, func.callback)
        return func.__class__(*[convert(v) for v in parts])
    if isinstance(func, (Dispatcher, SubDispatch)):
        return func.blue(memo)
    return func
class BlueDispatcher(Blueprint):
    """
    Blueprint object is a blueprint of how to construct or extend a Dispatcher.

    **------------------------------------------------------------------------**

    **Example**:

    Create a BlueDispatcher::

        >>> import schedula as sh
        >>> blue = sh.BlueDispatcher(name='Dispatcher')

    Add data/function/dispatcher nodes to the dispatcher map as usual::

        >>> blue.add_data(data_id='a', default_value=3)
        <schedula.utils.blue.BlueDispatcher object at ...>
        >>> @sh.add_function(blue, True, True, outputs=['c'])
        ... def diff_function(a, b=2):
        ...     return b - a
        ...
        >>> blue.add_function(function=max, inputs=['c', 'd'], outputs=['e'])
        <schedula.utils.blue.BlueDispatcher object at ...>
        >>> from math import log
        >>> sub_blue = sh.BlueDispatcher(name='Sub-Dispatcher')
        >>> sub_blue.add_data(data_id='a', default_value=2).add_function(
        ...     function=log, inputs=['a'], outputs=['b']
        ... )
        <schedula.utils.blue.BlueDispatcher object at ...>
        >>> blue.add_dispatcher(sub_blue, ('a',), {'b': 'f'})
        <schedula.utils.blue.BlueDispatcher object at ...>

    You can set the default values as usual::

        >>> blue.set_default_value(data_id='c', value=1, initial_dist=6)
        <schedula.utils.blue.BlueDispatcher object at ...>

    You can also create a `Blueprint` out of `SubDispatchFunction` and add it
    to the `Dispatcher` as follows::

        >>> func = sh.SubDispatchFunction(sub_blue, 'func', ['a'], ['b'])
        >>> blue.add_from_lists(fun_list=[
        ...     dict(function=func, inputs=['a'], outputs=['d']),
        ...     dict(function=func, inputs=['c'], outputs=['g']),
        ... ])
        <schedula.utils.blue.BlueDispatcher object at ...>

    Finally you can create the dispatcher object using the method `register`:

    .. dispatcher:: dsp
       :opt: graph_attr={'ratio': '1'}
       :code:

        >>> dsp = blue.register(memo={}); dsp
        <schedula.dispatcher.Dispatcher object at ...>

    Or dispatch, calling the Blueprint object:

    .. dispatcher:: sol
       :opt: graph_attr={'ratio': '1'}
       :code:

        >>> sol = blue({'a': 1}); sol
        Solution([('a', 1), ('b', 2), ('c', 1), ('d', 0.0), ('f', 0.0), ('e', 1), ('g', 0.0)])
    """

    def __init__(self, dmap=None, name='', default_values=None, raises=False,
                 description='', executor=None):
        """
        Stores the constructor arguments of the dispatcher.

        All arguments (except `self`) are saved as keyword arguments and
        forwarded to :attr:`cls` (by default
        :class:`~schedula.dispatcher.Dispatcher`) when the blueprint is
        registered.
        """
        # `_call_kw` drops `self` and dunder names, so what remains of
        # `locals()` here is exactly the set of constructor arguments.
        super(BlueDispatcher, self).__init__(**_call_kw(locals()))

    def add_data(self, data_id=None, default_value=EMPTY, initial_dist=0.0,
                 wait_inputs=False, wildcard=None, function=None,
                 callback=None, description=None, filters=None,
                 await_result=None, **kwargs):
        """
        Add a single data node to the dispatcher.

        :param data_id:
            Data node id. If None, it will be assigned automatically
            ('unknown<%d>') not in dmap.
        :type data_id: str, optional

        :param default_value:
            Data node default value. This will be used as input if it is not
            specified as inputs in the ArciDispatch algorithm.
        :type default_value: T, optional

        :param initial_dist:
            Initial distance in the ArciDispatch algorithm when the data node
            default value is used.
        :type initial_dist: float, int, optional

        :param wait_inputs:
            If True ArciDispatch algorithm stops on the node until it gets all
            input estimations.
        :type wait_inputs: bool, optional

        :param wildcard:
            If True, when the data node is used as input and target in the
            ArciDispatch algorithm, the input value will be used as input for
            the connected functions, but not as output.
        :type wildcard: bool, optional

        :param function:
            Data node estimation function.
            This can be any function that takes only one dictionary
            (key=function node id, value=estimation of data node) as input and
            returns one value that is the estimation of the data node.
        :type function: callable, optional

        :param callback:
            Callback function to be called after node estimation.
            This can be any function that takes only one argument that is the
            data node estimation output. It does not return anything.
        :type callback: callable, optional

        :param description:
            Data node's description.
        :type description: str, optional

        :param filters:
            A list of functions that are invoked after the invocation of the
            main function.
        :type filters: list[function], optional

        :param await_result:
            If True the Dispatcher waits data results before assigning them to
            the solution. If a number is defined this is used as `timeout` for
            `Future.result` method [default: False]. Note this is used when
            asynchronous or parallel execution is enabled.
        :type await_result: bool|int|float, optional

        :param kwargs:
            Set additional node attributes using key=value.
        :type kwargs: keyword arguments, optional

        :return:
            Self.
        :rtype: BlueDispatcher
        """
        # The call is only recorded here; `register` replays it later.
        kwargs.update(_call_kw(locals()))
        self.deferred.append(('add_data', kwargs))
        return self

    def add_function(self, function_id=None, function=None, inputs=None,
                     outputs=None, input_domain=None, weight=None,
                     inp_weight=None, out_weight=None, description=None,
                     filters=None, await_domain=None, await_result=None,
                     **kwargs):
        """
        Add a single function node to dispatcher.

        :param function_id:
            Function node id.
            If None will be assigned as <fun.__name__>.
        :type function_id: str, optional

        :param function:
            Data node estimation function.
        :type function: callable, optional

        :param inputs:
            Ordered arguments (i.e., data node ids) needed by the function.
        :type inputs: list, optional

        :param outputs:
            Ordered results (i.e., data node ids) returned by the function.
        :type outputs: list, optional

        :param input_domain:
            A function that checks if input values satisfy the function domain.
            This can be any function that takes the same inputs of the function
            and returns True if input values satisfy the domain, otherwise
            False. In this case the dispatch algorithm doesn't pass on the node.
        :type input_domain: callable, optional

        :param weight:
            Node weight. It is a weight coefficient that is used by the dispatch
            algorithm to estimate the minimum workflow.
        :type weight: float, int, optional

        :param inp_weight:
            Edge weights from data nodes to the function node.
            It is a dictionary (key=data node id) with the weight coefficients
            used by the dispatch algorithm to estimate the minimum workflow.
        :type inp_weight: dict[str, float | int], optional

        :param out_weight:
            Edge weights from the function node to data nodes.
            It is a dictionary (key=data node id) with the weight coefficients
            used by the dispatch algorithm to estimate the minimum workflow.
        :type out_weight: dict[str, float | int], optional

        :param description:
            Function node's description.
        :type description: str, optional

        :param filters:
            A list of functions that are invoked after the invocation of the
            main function.
        :type filters: list[function], optional

        :param await_domain:
            If True the Dispatcher waits all input results before executing the
            `input_domain` function. If a number is defined this is used as
            `timeout` for `Future.result` method [default: True]. Note this is
            used when asynchronous or parallel execution is enabled.
        :type await_domain: bool|int|float, optional

        :param await_result:
            If True the Dispatcher waits output results before assigning them to
            the workflow. If a number is defined this is used as `timeout` for
            `Future.result` method [default: False]. Note this is used when
            asynchronous or parallel execution is enabled.
        :type await_result: bool|int|float, optional

        :param kwargs:
            Set additional node attributes using key=value.
        :type kwargs: keyword arguments, optional

        :return:
            Self.
        :rtype: BlueDispatcher
        """
        # The call is only recorded here; `register` replays it later.
        kwargs.update(_call_kw(locals()))
        self.deferred.append(('add_function', kwargs))
        return self

    def add_func(self, function, outputs=None, weight=None,
                 inputs_kwargs=False, inputs_defaults=False, filters=None,
                 input_domain=None, await_domain=None, await_result=None,
                 inp_weight=None, out_weight=None, description=None,
                 inputs=None, function_id=None, **kwargs):
        """
        Add a single function node to dispatcher.

        :param inputs_kwargs:
            Do you want to include kwargs as inputs?
        :type inputs_kwargs: bool

        :param inputs_defaults:
            Do you want to set default values?
        :type inputs_defaults: bool

        :param function_id:
            Function node id.
            If None will be assigned as <fun.__name__>.
        :type function_id: str, optional

        :param function:
            Data node estimation function.
        :type function: callable, optional

        :param inputs:
            Ordered arguments (i.e., data node ids) needed by the function.
            If None it will take parameters names from function signature.
        :type inputs: list, optional

        :param outputs:
            Ordered results (i.e., data node ids) returned by the function.
        :type outputs: list, optional

        :param input_domain:
            A function that checks if input values satisfy the function domain.
            This can be any function that takes the same inputs of the function
            and returns True if input values satisfy the domain, otherwise
            False. In this case the dispatch algorithm doesn't pass on the node.
        :type input_domain: callable, optional

        :param weight:
            Node weight. It is a weight coefficient that is used by the dispatch
            algorithm to estimate the minimum workflow.
        :type weight: float, int, optional

        :param inp_weight:
            Edge weights from data nodes to the function node.
            It is a dictionary (key=data node id) with the weight coefficients
            used by the dispatch algorithm to estimate the minimum workflow.
        :type inp_weight: dict[str, float | int], optional

        :param out_weight:
            Edge weights from the function node to data nodes.
            It is a dictionary (key=data node id) with the weight coefficients
            used by the dispatch algorithm to estimate the minimum workflow.
        :type out_weight: dict[str, float | int], optional

        :param description:
            Function node's description.
        :type description: str, optional

        :param filters:
            A list of functions that are invoked after the invocation of the
            main function.
        :type filters: list[function], optional

        :param await_domain:
            If True the Dispatcher waits all input results before executing the
            `input_domain` function. If a number is defined this is used as
            `timeout` for `Future.result` method [default: True]. Note this is
            used when asynchronous or parallel execution is enabled.
        :type await_domain: bool|int|float, optional

        :param await_result:
            If True the Dispatcher waits output results before assigning them to
            the workflow. If a number is defined this is used as `timeout` for
            `Future.result` method [default: False]. Note this is used when
            asynchronous or parallel execution is enabled.
        :type await_result: bool|int|float, optional

        :param kwargs:
            Set additional node attributes using key=value.
        :type kwargs: keyword arguments, optional

        :return:
            Self.
        :rtype: BlueDispatcher
        """
        # The call is only recorded here; `register` replays it later.
        kwargs.update(_call_kw(locals()))
        self.deferred.append(('add_func', kwargs))
        return self

    def add_dispatcher(self, dsp, inputs, outputs, dsp_id=None,
                       input_domain=None, weight=None, inp_weight=None,
                       description=None, include_defaults=False,
                       await_domain=None, **kwargs):
        """
        Add a single sub-dispatcher node to dispatcher.

        :param dsp:
            Child dispatcher that is added as sub-dispatcher node to the parent
            dispatcher.
        :type dsp: Dispatcher | dict[str, list]

        :param inputs:
            Inputs mapping. Data node ids from parent dispatcher to child
            sub-dispatcher.
        :type inputs: dict[str, str | list[str]] | tuple[str] |
                      (str, ..., dict[str, str | list[str]])

        :param outputs:
            Outputs mapping. Data node ids from child sub-dispatcher to parent
            dispatcher.
        :type outputs: dict[str, str | list[str]] | tuple[str] |
                       (str, ..., dict[str, str | list[str]])

        :param dsp_id:
            Sub-dispatcher node id.
            If None will be assigned as <dsp.name>.
        :type dsp_id: str, optional

        :param input_domain:
            A function that checks if input values satisfy the function domain.
            This can be any function that takes a dictionary with the inputs
            of the sub-dispatcher node and returns True if input values satisfy
            the domain, otherwise False.

            .. note:: This function is invoked every time that a data node
               reaches the sub-dispatcher node.
        :type input_domain: (dict) -> bool, optional

        :param weight:
            Node weight. It is a weight coefficient that is used by the dispatch
            algorithm to estimate the minimum workflow.
        :type weight: float, int, optional

        :param inp_weight:
            Edge weights from data nodes to the sub-dispatcher node.
            It is a dictionary (key=data node id) with the weight coefficients
            used by the dispatch algorithm to estimate the minimum workflow.
        :type inp_weight: dict[str, int | float], optional

        :param description:
            Sub-dispatcher node's description.
        :type description: str, optional

        :param include_defaults:
            If True the default values of the sub-dispatcher are added to the
            current dispatcher.
        :type include_defaults: bool, optional

        :param await_domain:
            If True the Dispatcher waits all input results before executing the
            `input_domain` function. If a number is defined this is used as
            `timeout` for `Future.result` method [default: True]. Note this is
            used when asynchronous or parallel execution is enabled.
        :type await_domain: bool|int|float, optional

        :param kwargs:
            Set additional node attributes using key=value.
        :type kwargs: keyword arguments, optional

        :return:
            Self.
        :rtype: BlueDispatcher
        """
        # The call is only recorded here; `register` replays it later.
        kwargs.update(_call_kw(locals()))
        self.deferred.append(('add_dispatcher', kwargs))
        return self

    def add_from_lists(self, data_list=None, fun_list=None, dsp_list=None):
        """
        Add multiple function and data nodes to dispatcher.

        :param data_list:
            It is a list of data node kwargs to be loaded.
        :type data_list: list[dict], optional

        :param fun_list:
            It is a list of function node kwargs to be loaded.
        :type fun_list: list[dict], optional

        :param dsp_list:
            It is a list of sub-dispatcher node kwargs to be loaded.
        :type dsp_list: list[dict], optional

        :return:
            Self.
        :rtype: BlueDispatcher
        """
        # The call is only recorded here; `register` replays it later.
        self.deferred.append(('add_from_lists', _call_kw(locals())))
        return self

    def set_default_value(self, data_id, value=EMPTY, initial_dist=0.0):
        """
        Set the default value of a data node in the dispatcher.

        :param data_id:
            Data node id.
        :type data_id: str

        :param value:
            Data node default value.

            .. note:: If `EMPTY` the previous default value is removed.
        :type value: T, optional

        :param initial_dist:
            Initial distance in the ArciDispatch algorithm when the data node
            default value is used.
        :type initial_dist: float, int, optional

        :return:
            Self.
        :rtype: BlueDispatcher
        """
        # The call is only recorded here; `register` replays it later.
        self.deferred.append(('set_default_value', _call_kw(locals())))
        return self