Source code for schedula.utils.dsp

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2020, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides tools to create models with the
:class:`~schedula.dispatcher.Dispatcher`.
"""
import collections
import copy as _copy
import functools
import itertools
import math
from .base import Base
from .exc import DispatcherError
from .gen import Token

__author__ = 'Vincenzo Arcidiacono <vinci1it2000@gmail.com>'


def stlp(s):
    """
    Converts a string into a tuple.
    """
    if isinstance(s, str):
        return s,
    return s
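# A minimal usage sketch of `stlp`, following the implementation above:
#
#     >>> stlp('a')
#     ('a',)
#     >>> stlp(('a', 'b'))
#     ('a', 'b')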
def combine_dicts(*dicts, copy=False, base=None):
    """
    Combines multiple dicts in one.

    :param dicts:
        A sequence of dicts.
    :type dicts: dict

    :param copy:
        If True, it returns a deepcopy of input values.
    :type copy: bool, optional

    :param base:
        Base dict in which the given dicts are combined.
    :type base: dict, optional

    :return:
        A unique dict.
    :rtype: dict

    Example::

        >>> sorted(combine_dicts({'a': 3, 'c': 3}, {'a': 1, 'b': 2}).items())
        [('a', 1), ('b', 2), ('c', 3)]
    """

    if len(dicts) == 1 and base is None:  # Only one input dict.
        cd = dicts[0].copy()
    else:
        cd = {} if base is None else base  # Initialize empty dict.

        for d in dicts:  # Combine dicts.
            if d:
                # noinspection PyTypeChecker
                cd.update(d)

    # Return combined dict.
    return {k: _copy.deepcopy(v) for k, v in cd.items()} if copy else cd
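# A minimal sketch of the combination rule above: later dicts win on
# duplicated keys and, when `base` is given, it is updated in place:
#
#     >>> base = {'a': 0}
#     >>> combine_dicts({'a': 1}, {'b': 2}, base=base) is base
#     True
#     >>> sorted(base.items())
#     [('a', 1), ('b', 2)]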
def kk_dict(*kk, **adict):
    """
    Merges and defines dictionaries with values identical to keys.

    :param kk:
        A sequence of keys and/or dictionaries.
    :type kk: object | dict, optional

    :param adict:
        A dictionary.
    :type adict: dict, optional

    :return:
        Merged dictionary.
    :rtype: dict

    Example::

        >>> sorted(kk_dict('a', 'b', 'c').items())
        [('a', 'a'), ('b', 'b'), ('c', 'c')]

        >>> sorted(kk_dict('a', 'b', **{'a-c': 'c'}).items())
        [('a', 'a'), ('a-c', 'c'), ('b', 'b')]

        >>> sorted(kk_dict('a', {'b': 'c'}, 'c').items())
        [('a', 'a'), ('b', 'c'), ('c', 'c')]

        >>> sorted(kk_dict('a', 'b', **{'b': 'c'}).items())
        Traceback (most recent call last):
        ...
        ValueError: keyword argument repeated (b)

        >>> sorted(kk_dict({'a': 0, 'b': 1}, **{'b': 2, 'a': 3}).items())
        Traceback (most recent call last):
        ...
        ValueError: keyword argument repeated (a, b)
    """

    for k in kk:
        if isinstance(k, dict):
            if not set(k).isdisjoint(adict):
                k = ', '.join(sorted(set(k).intersection(adict)))
                raise ValueError('keyword argument repeated ({})'.format(k))
            adict.update(k)
        elif k in adict:
            raise ValueError('keyword argument repeated ({})'.format(k))
        else:
            adict[k] = k

    return adict
def bypass(*inputs, copy=False):
    """
    Returns the same arguments.

    :param inputs:
        Inputs values.
    :type inputs: T

    :param copy:
        If True, it returns a deepcopy of input values.
    :type copy: bool, optional

    :return:
        Same input values.
    :rtype: (T, ...), T

    Example::

        >>> bypass('a', 'b', 'c')
        ('a', 'b', 'c')
        >>> bypass('a')
        'a'
    """

    if len(inputs) == 1:
        inputs = inputs[0]  # Same inputs.

    return _copy.deepcopy(inputs) if copy else inputs  # Return inputs.
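# A minimal sketch of the `copy` flag above: without it the very same object
# is returned, with it a deep copy is:
#
#     >>> v = {'a': 1}
#     >>> bypass(v) is v
#     True
#     >>> bypass(v, copy=True) is v
#     False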
def summation(*inputs):
    """
    Sums inputs values.

    :param inputs:
        Inputs values.
    :type inputs: int, float

    :return:
        Sum of the input values.
    :rtype: int, float

    Example::

        >>> summation(1, 3.0, 4, 2)
        10.0
    """

    # Return the sum of the input values.
    return functools.reduce(lambda x, y: x + y, inputs)
def map_dict(key_map, *dicts, copy=False, base=None):
    """
    Returns a dict with new key values.

    :param key_map:
        A dictionary that maps the dict keys ({old key: new key}).
    :type key_map: dict

    :param dicts:
        A sequence of dicts.
    :type dicts: dict

    :param copy:
        If True, it returns a deepcopy of input values.
    :type copy: bool, optional

    :param base:
        Base dict in which the given dicts are combined.
    :type base: dict, optional

    :return:
        A unique dict with new key values.
    :rtype: dict

    Example::

        >>> d = map_dict({'a': 'c', 'b': 'd'}, {'a': 1, 'b': 1}, {'b': 2})
        >>> sorted(d.items())
        [('c', 1), ('d', 2)]
    """

    it = combine_dicts(*dicts).items()  # Combine dicts.

    get = key_map.get  # Namespace shortcut.

    # Return mapped dict.
    return combine_dicts({get(k, k): v for k, v in it}, copy=copy, base=base)
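# A minimal sketch: keys missing from `key_map` are kept unchanged:
#
#     >>> sorted(map_dict({'a': 'b'}, {'a': 1, 'c': 2}).items())
#     [('b', 1), ('c', 2)]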
def map_list(key_map, *inputs, copy=False, base=None):
    """
    Returns a new dict.

    :param key_map:
        A list that maps the dict keys ({old key: new key}).
    :type key_map: list[str | dict | list]

    :param inputs:
        A sequence of data.
    :type inputs: iterable | dict | int | float | list | tuple

    :param copy:
        If True, it returns a deepcopy of input values.
    :type copy: bool, optional

    :param base:
        Base dict in which the given dicts are combined.
    :type base: dict, optional

    :return:
        A unique dict with new values.
    :rtype: dict

    Example::

        >>> key_map = [
        ...     'a',
        ...     {'a': 'c'},
        ...     [
        ...         'a',
        ...         {'a': 'd'}
        ...     ]
        ... ]
        >>> inputs = (
        ...     2,
        ...     {'a': 3, 'b': 2},
        ...     [
        ...         1,
        ...         {'a': 4}
        ...     ]
        ... )
        >>> d = map_list(key_map, *inputs)
        >>> sorted(d.items())
        [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
    """

    d = {} if base is None else base  # Initialize empty dict.

    for m, v in zip(key_map, inputs):
        if isinstance(m, dict):
            map_dict(m, v, base=d)  # Apply a map dict.
        elif isinstance(m, list):
            map_list(m, *v, base=d)  # Apply a map list.
        else:
            d[m] = v  # Apply map.

    return combine_dicts(copy=copy, base=d)  # Return dict.
def selector(keys, dictionary, copy=False, output_type='dict',
             allow_miss=False):
    """
    Selects the chosen dictionary keys from the given dictionary.

    :param keys:
        Keys to select.
    :type keys: list, tuple, set

    :param dictionary:
        A dictionary.
    :type dictionary: dict

    :param copy:
        If True the output contains deep-copies of the values.
    :type copy: bool

    :param output_type:
        Type of function output:

            + 'list': a list with all values listed in `keys`.
            + 'dict': a dictionary with any outputs listed in `keys`.
            + 'values': if output length == 1 return a single value otherwise a
              tuple with all values listed in `keys`.
    :type output_type: str, optional

    :param allow_miss:
        If True it does not raise when some key is missing in the dictionary.
    :type allow_miss: bool

    :return:
        A dictionary, list, or tuple/value with the chosen dictionary keys,
        depending on `output_type`.
    :rtype: dict, list, tuple

    Example::

        >>> from functools import partial
        >>> fun = partial(selector, ['a', 'b'])
        >>> sorted(fun({'a': 1, 'b': 2, 'c': 3}).items())
        [('a', 1), ('b', 2)]
    """

    if not allow_miss:
        # noinspection PyUnusedLocal
        def check(key):
            return True
    else:
        def check(key):
            return key in dictionary

    if output_type == 'list':  # Select as list.
        res = [dictionary[k] for k in keys if check(k)]

        return _copy.deepcopy(res) if copy else res
    elif output_type == 'values':
        return bypass(*[dictionary[k] for k in keys if check(k)], copy=copy)

    # Select as dict.
    return bypass({k: dictionary[k] for k in keys if check(k)}, copy=copy)
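# A minimal sketch of the `output_type` and `allow_miss` options above:
#
#     >>> selector(['a', 'b'], {'a': 1, 'b': 2}, output_type='list')
#     [1, 2]
#     >>> selector(['a', 'c'], {'a': 1, 'b': 2}, allow_miss=True)
#     {'a': 1}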
def replicate_value(value, n=2, copy=True):
    """
    Replicates `n` times the input value.

    :param n:
        Number of replications.
    :type n: int

    :param value:
        Value to be replicated.
    :type value: T

    :param copy:
        If True the output contains deep-copies of the value.
    :type copy: bool

    :return:
        A tuple with the value replicated `n` times.
    :rtype: tuple

    Example::

        >>> from functools import partial
        >>> fun = partial(replicate_value, n=5)
        >>> fun({'a': 3})
        ({'a': 3}, {'a': 3}, {'a': 3}, {'a': 3}, {'a': 3})
    """

    return bypass(*[value] * n, copy=copy)  # Return replicated values.
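# A minimal sketch: with `copy=False` every replica is the same object:
#
#     >>> v = {'a': 1}
#     >>> r = replicate_value(v, n=2, copy=False)
#     >>> r[0] is r[1] is v
#     True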
def parent_func(func, input_id=None):
    """
    Return the parent function of a wrapped function (wrapped with
    :class:`functools.partial` and :class:`add_args`).

    :param func:
        Wrapped function.
    :type func: callable

    :param input_id:
        Index of the first input of the wrapped function.
    :type input_id: int

    :return:
        Parent function.
    :rtype: callable
    """
    if isinstance(func, functools.partial):
        if input_id is not None:
            # noinspection PyTypeChecker
            input_id += len(func.args)
        return parent_func(func.func, input_id=input_id)

    elif isinstance(func, add_args):
        if input_id is not None:
            input_id -= func.n
        return parent_func(func.func, input_id=input_id)

    if input_id is None:
        return func
    else:
        return func, input_id
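# A minimal sketch: nested `functools.partial`/`add_args` layers are unwrapped
# and, when `input_id` is given, it is shifted by the bound arguments:
#
#     >>> import functools
#     >>> def f(a, b):
#     ...     return a + b
#     >>> parent_func(functools.partial(add_args(f, n=2), 0)) is f
#     True
#     >>> parent_func(functools.partial(f, 1), input_id=0)[1]
#     1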
class add_args(object):
    """
    Adds arguments to a function (left side).

    :param func:
        Function to wrap.
    :type func: callable

    :param n:
        Number of unused arguments to add to the left side.
    :type n: int

    :return:
        Wrapped function.
    :rtype: callable

    Example::

        >>> import inspect
        >>> def original_func(a, b, *args, c=0):
        ...     '''Doc'''
        ...     return a + b + c
        >>> func = add_args(original_func, n=2)
        >>> func.__name__, func.__doc__
        ('original_func', 'Doc')
        >>> func(1, 2, 3, 4, c=5)
        12
        >>> str(inspect.signature(func))
        '(none, none, a, b, *args, c=0)'
    """
    def __init__(self, func, n=1, callback=None):
        self.n = n
        self.callback = callback
        self.func = func
        for i in range(2):
            # noinspection PyBroadException
            try:
                self.__name__ = func.__name__
                self.__doc__ = func.__doc__
                break
            except AttributeError:
                func = parent_func(func)
    @property
    def __signature__(self):
        return _get_signature(self.func, self.n)

    def __call__(self, *args, **kwargs):
        res = self.func(*args[self.n:], **kwargs)

        if self.callback:
            self.callback(res, *args, **kwargs)

        return res
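# A minimal sketch of the `callback` hook of `add_args`: it receives the
# result plus the original (padded) call arguments after each call:
#
#     >>> seen = []
#     >>> fun = add_args(lambda a, b: a + b, n=1,
#     ...                callback=lambda res, *args, **kw: seen.append(res))
#     >>> fun(None, 1, 2)
#     3
#     >>> seen
#     [3]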
def _get_signature(func, n=1):
    import inspect
    sig = inspect.signature(func)  # Get function signature.

    def ept_par():  # Return none signature parameter.
        name = Token('none')
        return name, inspect.Parameter(name, inspect._POSITIONAL_OR_KEYWORD)

    # Update signature parameters.
    par = itertools.chain(*([p() for p in itertools.repeat(ept_par, n)],
                            sig.parameters.items()))
    sig._parameters = sig._parameters.__class__(collections.OrderedDict(par))

    return sig
def stack_nested_keys(nested_dict, key=(), depth=-1):
    """
    Stacks the keys of nested-dictionaries into tuples and yields a list of
    k-v pairs.

    :param nested_dict:
        Nested dictionary.
    :type nested_dict: dict

    :param key:
        Initial keys.
    :type key: tuple, optional

    :param depth:
        Maximum keys depth.
    :type depth: int, optional

    :return:
        List of k-v pairs.
    :rtype: generator
    """

    if depth != 0 and hasattr(nested_dict, 'items'):
        for k, v in nested_dict.items():
            yield from stack_nested_keys(v, key=key + (k,), depth=depth - 1)
    else:
        yield key, nested_dict
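# A minimal sketch of the key stacking and of the `depth` limit:
#
#     >>> d = {'a': {'b': 1, 'c': {'d': 2}}}
#     >>> sorted(stack_nested_keys(d))
#     [(('a', 'b'), 1), (('a', 'c', 'd'), 2)]
#     >>> list(stack_nested_keys(d, depth=1))
#     [(('a',), {'b': 1, 'c': {'d': 2}})]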
def get_nested_dicts(nested_dict, *keys, default=None, init_nesting=dict):
    """
    Get/Initialize the value of nested-dictionaries.

    :param nested_dict:
        Nested dictionary.
    :type nested_dict: dict

    :param keys:
        Nested keys.
    :type keys: object

    :param default:
        Function used to initialize a new value.
    :type default: callable, optional

    :param init_nesting:
        Function used to initialize a new intermediate nesting dict.
    :type init_nesting: callable, optional

    :return:
        Value of the nested-dictionary.
    :rtype: object
    """

    if keys:
        default = default or init_nesting
        if keys[0] in nested_dict:
            nd = nested_dict[keys[0]]
        else:
            d = default() if len(keys) == 1 else init_nesting()
            nd = nested_dict[keys[0]] = d
        return get_nested_dicts(nd, *keys[1:], default=default,
                                init_nesting=init_nesting)
    return nested_dict
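# A minimal sketch: intermediate levels are built with `init_nesting`, while
# the leaf value is built with `default`:
#
#     >>> d = {}
#     >>> get_nested_dicts(d, 'a', 'b', default=list).append(1)
#     >>> d
#     {'a': {'b': [1]}}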
def are_in_nested_dicts(nested_dict, *keys):
    """
    Checks if the nested keys are inside of nested-dictionaries.

    :param nested_dict:
        Nested dictionary.
    :type nested_dict: dict

    :param keys:
        Nested keys.
    :type keys: object

    :return:
        True if nested keys are inside of nested-dictionaries, otherwise False.
    :rtype: bool
    """

    if keys:
        # noinspection PyBroadException
        try:
            return are_in_nested_dicts(nested_dict[keys[0]], *keys[1:])
        except Exception:  # Key error or not a dict.
            return False
    return True
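# A minimal sketch:
#
#     >>> are_in_nested_dicts({'a': {'b': 1}}, 'a', 'b')
#     True
#     >>> are_in_nested_dicts({'a': {'b': 1}}, 'a', 'c')
#     False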
def combine_nested_dicts(*nested_dicts, depth=-1, base=None):
    """
    Merge nested-dictionaries.

    :param nested_dicts:
        Nested dictionaries.
    :type nested_dicts: dict

    :param depth:
        Maximum keys depth.
    :type depth: int, optional

    :param base:
        Base dict in which the given dicts are combined.
    :type base: dict, optional

    :return:
        Combined nested-dictionary.
    :rtype: dict
    """

    if base is None:
        base = {}

    for nested_dict in nested_dicts:
        for k, v in stack_nested_keys(nested_dict, depth=depth):
            while k:
                # noinspection PyBroadException
                try:
                    get_nested_dicts(base, *k[:-1])[k[-1]] = v
                    break
                except Exception:
                    # A branch of the nested_dict is longer than the base.
                    k = k[:-1]
                    v = get_nested_dicts(nested_dict, *k)

    return base
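# A minimal sketch: leaves of later dicts overwrite those already in `base`:
#
#     >>> base = {'a': {'b': 1}}
#     >>> combine_nested_dicts({'a': {'c': 2}}, {'a': {'b': 3}}, base=base)
#     {'a': {'b': 3, 'c': 2}}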
class SubDispatch(Base):
    """
    It dispatches a given :class:`~schedula.dispatcher.Dispatcher` like a
    function.

    This function takes a sequence of dictionaries as input that will be
    combined before the dispatching.

    :return:
        A function that executes the dispatch of the given
        :class:`~schedula.dispatcher.Dispatcher`.
    :rtype: callable

    .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`,
       :func:`combine_dicts`

    Example:

    .. dispatcher:: dsp
       :opt: graph_attr={'ratio': '1'}, depth=-1
       :code:

        >>> from schedula import Dispatcher
        >>> sub_dsp = Dispatcher(name='Sub-dispatcher')
        ...
        >>> def fun(a):
        ...     return a + 1, a - 1
        ...
        >>> sub_dsp.add_function('fun', fun, ['a'], ['b', 'c'])
        'fun'
        >>> dispatch = SubDispatch(sub_dsp, ['a', 'b', 'c'], output_type='dict')
        >>> dsp = Dispatcher(name='Dispatcher')
        >>> dsp.add_function('Sub-dispatch', dispatch, ['d'], ['e'])
        'Sub-dispatch'

    The Dispatcher output is:

    .. dispatcher:: o
       :opt: graph_attr={'ratio': '1'}, depth=-1
       :code:

        >>> o = dsp.dispatch(inputs={'d': {'a': 3}})

    while the Sub-dispatch is:

    .. dispatcher:: sol
       :opt: graph_attr={'ratio': '1'}, depth=-1
       :code:

        >>> sol = o.workflow.nodes['Sub-dispatch']['solution']
        >>> sol
        Solution([('a', 3), ('b', 4), ('c', 2)])
        >>> sol == o['e']
        True
    """

    def __new__(cls, dsp=None, *args, **kwargs):
        from .blue import Blueprint
        if isinstance(dsp, Blueprint):
            return Blueprint(dsp, *args, **kwargs)._set_cls(cls)
        return Base.__new__(cls)

    def __getstate__(self):
        state = self.__dict__.copy()
        state['solution'] = state['solution'].__class__(state['dsp'])
        del state['__name__']
        return state

    def __setstate__(self, d):
        self.__dict__ = d
        self.__name__ = self.name
    def __init__(self, dsp, outputs=None, cutoff=None, inputs_dist=None,
                 wildcard=False, no_call=False, shrink=False,
                 rm_unused_nds=False, output_type='all'):
        """
        Initializes the Sub-dispatch.

        :param dsp:
            A dispatcher that identifies the model adopted.
        :type dsp: schedula.Dispatcher

        :param outputs:
            Ending data nodes.
        :type outputs: list[str], iterable

        :param cutoff:
            Depth to stop the search.
        :type cutoff: float, int, optional

        :param inputs_dist:
            Initial distances of input data nodes.
        :type inputs_dist: dict[str, int | float], optional

        :param wildcard:
            If True, when the data node is used as input and target in the
            ArciDispatch algorithm, the input value will be used as input for
            the connected functions, but not as output.
        :type wildcard: bool, optional

        :param no_call:
            If True data node estimation function is not used.
        :type no_call: bool, optional

        :param shrink:
            If True the dispatcher is shrunk before the dispatch.
        :type shrink: bool, optional

        :param rm_unused_nds:
            If True unused function and sub-dispatcher nodes are removed from
            the workflow.
        :type rm_unused_nds: bool, optional

        :param output_type:
            Type of function output:

                + 'all': a dictionary with all dispatch outputs.
                + 'list': a list with all outputs listed in `outputs`.
                + 'dict': a dictionary with any outputs listed in `outputs`.
        :type output_type: str, optional
        """
        self.dsp = dsp
        self.outputs = outputs
        self.cutoff = cutoff
        self.wildcard = wildcard
        self.no_call = no_call
        self.shrink = shrink
        self.output_type = output_type
        self.inputs_dist = inputs_dist
        self.rm_unused_nds = rm_unused_nds
        self.name = self.__name__ = dsp.name
        self.__doc__ = dsp.__doc__
        self.solution = dsp.solution.__class__(dsp)
    def blue(self, memo=None):
        """
        Constructs a Blueprint out of the current object.

        :param memo:
            A dictionary to cache Blueprints.
        :type memo: dict[T,schedula.utils.blue.Blueprint]

        :return:
            A Blueprint of the current object.
        :rtype: schedula.utils.blue.Blueprint
        """
        memo = {} if memo is None else memo
        if self not in memo:
            import inspect
            from .blue import Blueprint, _parent_blue
            keys = tuple(inspect.signature(self.__init__).parameters)
            memo[self] = Blueprint(**{
                k: _parent_blue(v, memo)
                for k, v in self.__dict__.items() if k in keys
            })._set_cls(self.__class__)
        return memo[self]
    def __call__(self, *input_dicts, copy_input_dicts=False, _stopper=None,
                 _executor=None, _sol_name=()):
        # Combine input dictionaries.
        i = combine_dicts(*input_dicts, copy=copy_input_dicts)

        # Dispatch the function calls.
        self.solution = self.dsp.dispatch(
            i, self.outputs, self.cutoff, self.inputs_dist, self.wildcard,
            self.no_call, self.shrink, self.rm_unused_nds, stopper=_stopper,
            executor=_executor, sol_name=_sol_name
        )

        return self._return(self.solution)

    def _return(self, solution):
        outs = self.outputs
        solution.result()
        solution.parent = self

        # Set output.
        if self.output_type != 'all':
            try:
                # Save outputs.
                return selector(outs, solution, output_type=self.output_type)
            except KeyError:
                # Outputs not reached.
                missed = set(outs).difference(solution)

                # Raise error
                msg = '\n  Unreachable output-targets: {}\n  Available ' \
                      'outputs: {}'.format(missed, list(solution.keys()))
                raise DispatcherError(msg, sol=solution)

        return solution  # Return outputs.
    def copy(self):
        return _copy.deepcopy(self)
class SubDispatchFunction(SubDispatch):
    """
    It converts a :class:`~schedula.dispatcher.Dispatcher` into a function.

    This function takes a sequence of arguments or key values as input of the
    dispatch.

    :return:
        A function that executes the dispatch of the given `dsp`.
    :rtype: callable

    .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`,
       :func:`~schedula.dispatcher.Dispatcher.shrink_dsp`

    **Example**:

    A dispatcher with two functions `max` and `log(x - 1)` and an unresolved
    cycle (i.e., `a` --> `max` --> `c` --> `log(x - 1)` --> `a`):

    .. dispatcher:: dsp
       :opt: graph_attr={'ratio': '1'}

        >>> from schedula import Dispatcher
        >>> dsp = Dispatcher(name='Dispatcher')
        >>> dsp.add_function('max', max, inputs=['a', 'b'], outputs=['c'])
        'max'
        >>> from math import log
        >>> def my_log(x):
        ...     return log(x - 1)
        >>> dsp.add_function('log(x - 1)', my_log, inputs=['c'],
        ...                  outputs=['a'], input_domain=lambda c: c > 1)
        'log(x - 1)'

    Extract a static function node, i.e. the inputs `a` and `b` and the output
    `a` are fixed::

        >>> fun = SubDispatchFunction(dsp, 'myF', ['a', 'b'], ['a'])
        >>> fun.__name__
        'myF'
        >>> fun(b=1, a=2)
        0.0

    .. dispatcher:: fun
       :opt: workflow=True, graph_attr={'ratio': '1'}

        >>> fun.dsp.name = 'Created function internal'

    The created function raises a DispatcherError if invalid inputs are
    provided:

    .. dispatcher:: fun
       :opt: workflow=True, graph_attr={'ratio': '1'}
       :code:

        >>> fun(1, 0)  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ...
        DispatcherError: Unreachable output-targets: ... Available outputs: ...
    """

    var_keyword = 'kw'
    def __init__(self, dsp, function_id=None, inputs=None, outputs=None,
                 cutoff=None, inputs_dist=None, shrink=True, wildcard=True):
        """
        Initializes the Sub-dispatch Function.

        :param dsp:
            A dispatcher that identifies the model adopted.
        :type dsp: schedula.Dispatcher

        :param function_id:
            Function name.
        :type function_id: str, optional

        :param inputs:
            Input data nodes.
        :type inputs: list[str], iterable, optional

        :param outputs:
            Ending data nodes.
        :type outputs: list[str], iterable, optional

        :param cutoff:
            Depth to stop the search.
        :type cutoff: float, int, optional

        :param inputs_dist:
            Initial distances of input data nodes.
        :type inputs_dist: dict[str, int | float], optional
        """
        if shrink:
            dsp = dsp.shrink_dsp(
                inputs, outputs, cutoff=cutoff, inputs_dist=inputs_dist,
                wildcard=wildcard
            )

        if outputs:
            missed = set(outputs).difference(dsp.nodes)  # Outputs not reached.

            if missed:  # If outputs are missing raise error.
                available = list(dsp.data_nodes.keys())  # Available data nodes.

                # Raise error
                msg = '\n  Unreachable output-targets: {}\n  Available ' \
                      'outputs: {}'.format(missed, available)
                raise ValueError(msg)

        # Set internal properties.
        self.inputs = inputs

        # Set dsp name equal to function id.
        self.function_id = dsp.name = function_id or dsp.name or 'fun'
        no_call = False

        self._sol = dsp.solution.__class__(
            dsp, dict.fromkeys(inputs or (), None), outputs, wildcard, None,
            inputs_dist, no_call, False
        )

        # Initialize as sub dispatch.
        super(SubDispatchFunction, self).__init__(
            dsp, outputs, cutoff, inputs_dist, wildcard, no_call, True, True,
            'list'
        )

        # Define the function to return outputs sorted.
        if outputs is None:
            self.output_type = 'all'
        elif len(outputs) == 1:
            self.output_type = 'values'
    @property
    def __signature__(self):
        import inspect
        dfl, p = self.dsp.default_values, []
        for name in self.inputs or ():
            par = inspect.Parameter('par', inspect._POSITIONAL_OR_KEYWORD)
            par._name = name
            if name in dfl:
                par._default = dfl[name]['value']
            p.append(par)
        if self.var_keyword:
            p.append(inspect.Parameter(self.var_keyword, inspect._VAR_KEYWORD))
        return inspect.Signature(p, __validate_parameters__=False)

    def __call__(self, *args, _stopper=None, _executor=False, _sol_name=(),
                 **kw):
        # Namespace shortcuts.
        self.solution = sol = self._sol._copy_structure()
        self.solution.full_name, dfl = _sol_name, self.dsp.default_values

        # Parse inputs.
        ba = self.__signature__.bind(*args, **kw)
        ba.apply_defaults()
        inp, extra = ba.arguments, ba.arguments.pop(self.var_keyword, {})
        i = set(extra) - set(self.dsp.data_nodes)
        if i:
            msg = "%s() got an unexpected keyword argument '%s'"
            raise TypeError(msg % (self.function_id, min(i)))
        inp.update(extra)

        inputs_dist = combine_dicts(
            sol.inputs_dist, dict.fromkeys(inp, 0), self.inputs_dist or {}
        )
        inp.update({k: dfl[k]['value'] for k in set(dfl) - set(inp)})

        # Initialize.
        sol._init_workflow(inp, inputs_dist=inputs_dist, clean=False)

        # Dispatch outputs.
        sol._run(stopper=_stopper, executor=_executor)

        # Return outputs sorted.
        return self._return(sol)
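# A minimal sketch, assuming the `dsp` built in the class example above
# (`max` feeding `c`); with a single output the wrapper returns the bare
# value instead of a list:
#
#     >>> fun = SubDispatchFunction(dsp, 'myF', ['a', 'b'], ['c'])
#     >>> fun(2, 3)
#     3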
class SubDispatchPipe(SubDispatchFunction):
    """
    It converts a :class:`~schedula.dispatcher.Dispatcher` into a function.

    This function takes a sequence of arguments as input of the dispatch.

    :return:
        A function that executes the pipe of the given `dsp`.
    :rtype: callable

    .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`,
       :func:`~schedula.dispatcher.Dispatcher.shrink_dsp`

    **Example**:

    A dispatcher with two functions `max` and `x - 1` and an unresolved cycle
    (i.e., `a` --> `max` --> `c` --> `x - 1` --> `a`):

    .. dispatcher:: dsp
       :opt: graph_attr={'ratio': '1'}

        >>> from schedula import Dispatcher
        >>> dsp = Dispatcher(name='Dispatcher')
        >>> dsp.add_function('max', max, inputs=['a', 'b'], outputs=['c'])
        'max'
        >>> def func(x):
        ...     return x - 1
        >>> dsp.add_function('x - 1', func, inputs=['c'], outputs=['a'])
        'x - 1'

    Extract a static function node, i.e. the inputs `a` and `b` and the output
    `a` are fixed::

        >>> fun = SubDispatchPipe(dsp, 'myF', ['a', 'b'], ['a'])
        >>> fun.__name__
        'myF'
        >>> fun(2, 1)
        1

    .. dispatcher:: fun
       :opt: workflow=True, graph_attr={'ratio': '1'}

        >>> fun.dsp.name = 'Created function internal'

    Differently from :class:`SubDispatchFunction`, the created function simply
    executes the pre-computed pipe, hence these inputs do not raise an error:

    .. dispatcher:: fun
       :opt: workflow=True, graph_attr={'ratio': '1'}
       :code:

        >>> fun(1, 0)
        0
    """

    var_keyword = None
    def __init__(self, dsp, function_id=None, inputs=None, outputs=None,
                 cutoff=None, inputs_dist=None, no_domain=True, wildcard=True):
        """
        Initializes the Sub-dispatch Pipe.

        :param dsp:
            A dispatcher that identifies the model adopted.
        :type dsp: schedula.Dispatcher

        :param function_id:
            Function name.
        :type function_id: str

        :param inputs:
            Input data nodes.
        :type inputs: list[str], iterable

        :param outputs:
            Ending data nodes.
        :type outputs: list[str], iterable, optional

        :param cutoff:
            Depth to stop the search.
        :type cutoff: float, int, optional

        :param inputs_dist:
            Initial distances of input data nodes.
        :type inputs_dist: dict[str, int | float], optional
        """
        self.solution = sol = dsp.solution.__class__(
            dsp, inputs, outputs, wildcard, cutoff, inputs_dist, True, True,
            no_domain=no_domain
        )
        sol._run()

        from .alg import _union_workflow, _convert_bfs
        bfs = _union_workflow(sol)
        o, bfs = outputs or sol, _convert_bfs(bfs)
        dsp = dsp._get_dsp_from_bfs(o, bfs_graphs=bfs)

        super(SubDispatchPipe, self).__init__(
            dsp, function_id, inputs, outputs=outputs, cutoff=cutoff,
            inputs_dist=inputs_dist, shrink=False, wildcard=wildcard
        )
        self._reset_sol()
        self.pipe = self._set_pipe()
    def _reset_sol(self):
        self._sol.no_call = True
        self._sol._init_workflow()
        self._sol._run()
        self._sol.no_call = False

    def _set_pipe(self):
        from .cst import START

        def _make_tks(task):
            v, s = task[-1]
            if v is START:
                nxt_nds = s.dsp.dmap[v]
            else:
                nxt_nds = s.workflow[v]
            nxt_dsp = [n for n in nxt_nds if s.nodes[n]['type'] == 'dispatcher']
            nxt_dsp = [(n, s._edge_length(s.dmap[v][n], s.nodes[n]))
                       for n in nxt_dsp]
            return (task[0], task[1], (v, s)), nxt_nds, nxt_dsp

        return [_make_tks(v['task']) for v in self._sol.pipe.values()]

    def _init_new_solution(self, full_name):
        key_map, sub_sol = {}, {}
        for k, s in self._sol.sub_sol.items():
            ns = s._copy_structure(dist=1)
            ns.sub_sol = sub_sol
            ns.full_name = full_name + s.full_name
            key_map[s] = ns
            sub_sol[ns.index] = ns
        return key_map[self._sol], lambda x: key_map[x]

    def _init_workflows(self, inputs):
        self.solution.inputs.update(inputs)
        for s in self.solution.sub_sol.values():
            s._init_workflow(clean=False)

    def _callback_pipe_failure(self):
        pass

    def _pipe_append(self):
        return self.solution._pipe.append

    def __call__(self, *args, _stopper=None, _executor=False, _sol_name=(),
                 **kw):
        self.solution, key_map = self._init_new_solution(_sol_name)
        pipe_append = self._pipe_append()

        ba = self.__signature__.bind(*args, **kw)
        ba.apply_defaults()
        self._init_workflows(ba.arguments)

        for x, nxt_nds, nxt_dsp in self.pipe:
            v, s = x[-1]
            s = key_map(s)
            pipe_append(x[:2] + ((v, s),))

            if not s._set_node_output(
                    v, False, next_nds=nxt_nds, stopper=_stopper,
                    executor=_executor):
                self._callback_pipe_failure()
                break

            for n, vw_d in nxt_dsp:
                s._set_sub_dsp_node_input(v, n, [], s.check_cutoff, False, vw_d)
            s._see_remote_link_node(v)

        # Return outputs sorted.
        return self._return(self.solution)
class NoSub:
    """Class to avoid adding a sub solution to the workflow."""
class DispatchPipe(NoSub, SubDispatchPipe):
    """
    It converts a :class:`~schedula.dispatcher.Dispatcher` into a function.

    This function takes a sequence of arguments as input of the dispatch.

    :return:
        A function that executes the pipe of the given `dsp`, updating its
        workflow.
    :rtype: callable

    .. note::
       This wrapper is not thread safe, because it overwrites the solution.

    .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`,
       :func:`~schedula.dispatcher.Dispatcher.shrink_dsp`

    **Example**:

    A dispatcher with two functions `max` and `x - 1` and an unresolved cycle
    (i.e., `a` --> `max` --> `c` --> `x - 1` --> `a`):

    .. dispatcher:: dsp
       :opt: graph_attr={'ratio': '1'}

        >>> from schedula import Dispatcher
        >>> dsp = Dispatcher(name='Dispatcher')
        >>> dsp.add_function('max', max, inputs=['a', 'b'], outputs=['c'])
        'max'
        >>> def func(x):
        ...     return x - 1
        >>> dsp.add_function('x - 1', func, inputs=['c'], outputs=['a'])
        'x - 1'

    Extract a static function node, i.e. the inputs `a` and `b` and the output
    `a` are fixed::

        >>> fun = DispatchPipe(dsp, 'myF', ['a', 'b'], ['a'])
        >>> fun.__name__
        'myF'
        >>> fun(2, 1)
        1

    .. dispatcher:: fun
       :opt: workflow=True, graph_attr={'ratio': '1'}

        >>> fun.dsp.name = 'Created function internal'

    Like :class:`SubDispatchPipe`, the created function follows the
    pre-computed pipe, hence these inputs do not raise an error:

    .. dispatcher:: fun
       :opt: workflow=True, graph_attr={'ratio': '1'}
       :code:

        >>> fun(1, 0)
        0
    """

    def __getstate__(self):
        self._init_workflows(dict.fromkeys(self.inputs or ()))
        self._reset_sol()
        state = super(DispatchPipe, self).__getstate__()
        del state['pipe']
        return state

    def __setstate__(self, d):
        super(DispatchPipe, self).__setstate__(d)
        self.pipe = self._set_pipe()

    def _pipe_append(self):
        return lambda *args: None

    def _init_new_solution(self, _sol_name):
        return self._sol, lambda x: x

    def _init_workflows(self, inputs):
        for s in self.solution.sub_sol.values():
            s._visited.clear()
        return super(DispatchPipe, self)._init_workflows(inputs)

    def _return(self, solution):
        # noinspection PyBroadException
        try:
            solution.result()
        except Exception:
            self._callback_pipe_failure()
        return super(DispatchPipe, self)._return(solution)

    def _callback_pipe_failure(self):
        raise DispatcherError("The pipe is not respected.", sol=self.solution)
    def plot(self, workflow=None, *args, **kwargs):
        if workflow:
            return self.solution.plot(*args, **kwargs)
        return super(DispatchPipe, self).plot(workflow, *args, **kwargs)
def _get_par_args(func, exl_kw=False):
    par = collections.OrderedDict()
    for k, v in _get_signature(func, 0)._parameters.items():
        if v.kind >= v.VAR_POSITIONAL or (exl_kw and v.default is not v.empty):
            break
        par[k] = v
    return par
def add_function(dsp, inputs_kwargs=False, inputs_defaults=False, **kw):
    """
    Decorator to add a function to a dispatcher.

    :param dsp:
        A dispatcher.
    :type dsp: schedula.Dispatcher

    :param inputs_kwargs:
        Do you want to include kwargs as inputs?
    :type inputs_kwargs: bool

    :param inputs_defaults:
        Do you want to set default values?
    :type inputs_defaults: bool

    :param kw:
        See :func:`~schedula.dispatcher.Dispatcher.add_function`.
    :type kw: dict

    :return:
        Decorator.
    :rtype: callable

    **Example**:

    .. dispatcher:: sol
       :opt: graph_attr={'ratio': '1'}
       :code:

        >>> import schedula as sh
        >>> dsp = sh.Dispatcher(name='Dispatcher')
        >>> @sh.add_function(dsp, outputs=['e'])
        ... @sh.add_function(dsp, False, True, outputs=['i'], inputs='ecah')
        ... @sh.add_function(dsp, True, outputs=['l'])
        ... def f(a, b, c, d=1):
        ...     return (a + b) - c + d
        >>> @sh.add_function(dsp, True, outputs=['d'])
        ... def g(e, i, *args, d=0):
        ...     return e + i + d
        >>> sol = dsp({'a': 1, 'b': 2, 'c': 3}); sol
        Solution([('a', 1), ('b', 2), ('c', 3), ('h', 1), ('e', 1), ('i', 4), ('d', 5), ('l', 5)])
    """

    def decorator(f):
        dsp.add_func(
            f, inputs_kwargs=inputs_kwargs, inputs_defaults=inputs_defaults,
            **kw
        )
        return f

    return decorator
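# A minimal decorator sketch on a fresh, illustrative dispatcher: the inputs
# are inferred from the function signature by `Dispatcher.add_func`:
#
#     >>> import schedula as sh
#     >>> d = sh.Dispatcher(name='demo')
#     >>> @sh.add_function(d, outputs=['c'])
#     ... def sum_ab(a, b):
#     ...     return a + b
#     >>> d({'a': 1, 'b': 2})['c']
#     3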
class inf(collections.namedtuple('_inf', ['inf', 'num'])):
    """Class to model infinite numbers for workflow distance."""

    _methods = {
        'add': {'func': lambda x, y: x + y, 'dfl': 0},
        'sub': {'func': lambda x, y: x - y, 'dfl': 0},
        'mul': {'func': lambda x, y: x * y},
        'truediv': {'func': lambda x, y: x / y},
        'pow': {'func': lambda x, y: x ** y},
        'mod': {'func': lambda x, y: x % y},
        'floordiv': {'func': lambda x, y: x // y},

        'neg': {'func': lambda x: -x, 'self': True},
        'pos': {'func': lambda x: +x, 'self': True},
        'abs': {'func': lambda x: abs(x), 'self': True},
        'round': {'func': lambda *a: round(*a), 'self': True},
        'trunc': {'func': math.trunc, 'self': True},
        'floor': {'func': math.floor, 'self': True},
        'ceil': {'func': math.ceil, 'self': True},
    }

    for k in ('add', 'sub', 'mul', 'mod', 'pow', 'truediv', 'floordiv'):
        _methods['r%s' % k] = combine_dicts(_methods[k], {'reverse': True})

    for k in ('ge', 'gt', 'eq', 'le', 'lt', 'ne'):
        _methods[k] = {'func': getattr(tuple, '__%s__' % k), 'dfl': 0, 'log': 1}

    # noinspection PyMethodParameters
    def _wrap(k, d):
        f = d['func']
        if d.get('log'):
            def method(self, other, *a):
                if not isinstance(other, self.__class__):
                    other = d.get('dfl', other), other
                return f(self, other, *a)
        elif d.get('self'):
            def method(self, *a):
                return inf(*(f(x, *a) for x in self))
        else:
            i = -1 if d.get('reverse') else 1

            def method(self, other, *a):
                if not isinstance(other, self.__class__):
                    other = d.get('dfl', other), other
                return inf(*(f(x, y, *a) for x, y in zip(*(self, other)[::i])))

        method.__name__ = k
        return method

    for k in _methods:
        exec('__{0}__ = _wrap("__{0}__", _methods["{0}"])'.format(k))

    del _wrap, _methods, k
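# A minimal sketch of `inf` arithmetic: plain numbers are promoted to
# ``(0, number)`` before the element-wise operation or the tuple comparison:
#
#     >>> inf(0, 2) + 1 == inf(0, 3)
#     True
#     >>> inf(1, 2) + inf(1, 1) == inf(2, 3)
#     True
#     >>> inf(1, 0) > 100
#     True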