Source code for schedula.utils.dsp

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2025, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides tools to create models with the
:class:`~schedula.dispatcher.Dispatcher`.
"""
import math
import inspect
import functools
import itertools
import collections
import copy as _copy
from .cst import START
from .gen import Token
from .base import Base
from .exc import DispatcherError
from dataclasses import dataclass

__author__ = 'Vincenzo Arcidiacono <vinci1it2000@gmail.com>'


[docs] def stlp(s): """ Converts a string in a tuple. """ if isinstance(s, str): return s, return s
[docs] def combine_dicts(*dicts, copy=False, base=None): """ Combines multiple dicts in one. :param dicts: A sequence of dicts. :type dicts: dict :param copy: If True, it returns a deepcopy of input values. :type copy: bool, optional :param base: Base dict where combine multiple dicts in one. :type base: dict, optional :return: A unique dict. :rtype: dict Example:: >>> sorted(combine_dicts({'a': 3, 'c': 3}, {'a': 1, 'b': 2}).items()) [('a', 1), ('b', 2), ('c', 3)] """ if len(dicts) == 1 and base is None: # Only one input dict. cd = dicts[0].copy() else: cd = {} if base is None else base # Initialize empty dict. for d in dicts: # Combine dicts. if d: # noinspection PyTypeChecker cd.update(d) # Return combined dict. return {k: _copy.deepcopy(v) for k, v in cd.items()} if copy else cd
[docs] def kk_dict(*kk, **adict): """ Merges and defines dictionaries with values identical to keys. :param kk: A sequence of keys and/or dictionaries. :type kk: object | dict, optional :param adict: A dictionary. :type adict: dict, optional :return: Merged dictionary. :rtype: dict Example:: >>> sorted(kk_dict('a', 'b', 'c').items()) [('a', 'a'), ('b', 'b'), ('c', 'c')] >>> sorted(kk_dict('a', 'b', **{'a-c': 'c'}).items()) [('a', 'a'), ('a-c', 'c'), ('b', 'b')] >>> sorted(kk_dict('a', {'b': 'c'}, 'c').items()) [('a', 'a'), ('b', 'c'), ('c', 'c')] >>> sorted(kk_dict('a', 'b', **{'b': 'c'}).items()) Traceback (most recent call last): ... ValueError: keyword argument repeated (b) >>> sorted(kk_dict({'a': 0, 'b': 1}, **{'b': 2, 'a': 3}).items()) Traceback (most recent call last): ... ValueError: keyword argument repeated (a, b) """ for k in kk: if isinstance(k, dict): if any(i in adict for i in k): k = ', '.join(sorted(set(k).intersection(adict))) raise ValueError('keyword argument repeated ({})'.format(k)) adict.update(k) elif k in adict: raise ValueError('keyword argument repeated ({})'.format(k)) else: adict[k] = k return adict
[docs] def bypass(*inputs, copy=False): """ Returns the same arguments. :param inputs: Inputs values. :type inputs: T :param copy: If True, it returns a deepcopy of input values. :type copy: bool, optional :return: Same input values. :rtype: (T, ...), T Example:: >>> bypass('a', 'b', 'c') ('a', 'b', 'c') >>> bypass('a') 'a' """ if len(inputs) == 1: inputs = inputs[0] # Same inputs. return _copy.deepcopy(inputs) if copy else inputs # Return inputs.
[docs] def summation(*inputs): """ Sums inputs values. :param inputs: Inputs values. :type inputs: int, float :return: Sum of the input values. :rtype: int, float Example:: >>> summation(1, 3.0, 4, 2) 10.0 """ # Return the sum of the input values. return functools.reduce(lambda x, y: x + y, inputs)
[docs] def map_dict(key_map, *dicts, copy=False, base=None): """ Returns a dict with new key values. :param key_map: A dictionary that maps the dict keys ({old key: new key}. :type key_map: dict :param dicts: A sequence of dicts. :type dicts: dict :param copy: If True, it returns a deepcopy of input values. :type copy: bool, optional :param base: Base dict where combine multiple dicts in one. :type base: dict, optional :return: A unique dict with new key values. :rtype: dict Example:: >>> d = map_dict({'a': 'c', 'b': 'd'}, {'a': 1, 'b': 1}, {'b': 2}) >>> sorted(d.items()) [('c', 1), ('d', 2)] """ it = combine_dicts(*dicts).items() # Combine dicts. get = key_map.get # Namespace shortcut. # Return mapped dict. return combine_dicts({get(k, k): v for k, v in it}, copy=copy, base=base)
[docs] def map_list(key_map, *inputs, copy=False, base=None): """ Returns a new dict. :param key_map: A list that maps the dict keys ({old key: new key} :type key_map: list[str | dict | list] :param inputs: A sequence of data. :type inputs: iterable | dict | int | float | list | tuple :param copy: If True, it returns a deepcopy of input values. :type copy: bool, optional :param base: Base dict where combine multiple dicts in one. :type base: dict, optional :return: A unique dict with new values. :rtype: dict Example:: >>> key_map = [ ... 'a', ... {'a': 'c'}, ... [ ... 'a', ... {'a': 'd'} ... ] ... ] >>> inputs = ( ... 2, ... {'a': 3, 'b': 2}, ... [ ... 1, ... {'a': 4} ... ] ... ) >>> d = map_list(key_map, *inputs) >>> sorted(d.items()) [('a', 1), ('b', 2), ('c', 3), ('d', 4)] """ d = {} if base is None else base # Initialize empty dict. for m, v in zip(key_map, inputs): if isinstance(m, dict): map_dict(m, v, base=d) # Apply a map dict. elif isinstance(m, list): map_list(m, *v, base=d) # Apply a map list. else: d[m] = v # Apply map. return combine_dicts(copy=copy, base=d) # Return dict.
[docs] def selector(keys, dictionary, copy=False, output_type='dict', allow_miss=False): """ Selects the chosen dictionary keys from the given dictionary. :param keys: Keys to select. :type keys: list, tuple, set :param dictionary: A dictionary. :type dictionary: dict :param copy: If True the output contains deep-copies of the values. :type copy: bool :param output_type: Type of function output: + 'list': a list with all values listed in `keys`. + 'dict': a dictionary with any outputs listed in `keys`. + 'values': if output length == 1 return a single value otherwise a tuple with all values listed in `keys`. :type output_type: str, optional :param allow_miss: If True it does not raise when some key is missing in the dictionary. :type allow_miss: bool :return: A dictionary with chosen dictionary keys if present in the sequence of dictionaries. These are combined with :func:`combine_dicts`. :rtype: dict Example:: >>> import schedula as sh >>> fun = sh.partial(selector, ['a', 'b']) >>> sorted(fun({'a': 1, 'b': 2, 'c': 3}).items()) [('a', 1), ('b', 2)] """ if not allow_miss: # noinspection PyUnusedLocal def check(key): return True else: def check(key): return key in dictionary if output_type == 'list': # Select as list. res = [dictionary[k] for k in keys if check(k)] return _copy.deepcopy(res) if copy else res elif output_type == 'values': return bypass(*[dictionary[k] for k in keys if check(k)], copy=copy) # Select as dict. return bypass({k: dictionary[k] for k in keys if check(k)}, copy=copy)
[docs] def replicate_value(value, n=2, copy=True): """ Replicates `n` times the input value. :param n: Number of replications. :type n: int :param value: Value to be replicated. :type value: T :param copy: If True the list contains deep-copies of the value. :type copy: bool :return: A list with the value replicated `n` times. :rtype: list Example:: >>> import schedula as sh >>> fun = sh.partial(replicate_value, n=5) >>> fun({'a': 3}) ({'a': 3}, {'a': 3}, {'a': 3}, {'a': 3}, {'a': 3}) """ return bypass(*[value] * n, copy=copy) # Return replicated values.
[docs] def parent_func(func, input_id=None): """ Return the parent function of a wrapped function (wrapped with :class:`functools.partial` and :class:`add_args`). :param func: Wrapped function. :type func: callable :param input_id: Index of the first input of the wrapped function. :type input_id: int :return: Parent function. :rtype: callable """ if isinstance(func, add_args): if input_id is not None: input_id -= func.n return parent_func(func.func, input_id=input_id) elif isinstance(func, partial): if input_id is not None: # noinspection PyTypeChecker input_id += len(func.args) return parent_func(func.func, input_id=input_id) if input_id is None: return func else: return func, input_id
if inspect.isclass(functools.partial): partial = functools.partial else: # MicroPython. class partial: def __init__(self, func, *args, **keywords): self.func = func self.args = args self.keywords = keywords def __call__(self, *args, **keywords): keywords = combine_dicts(self.keywords, keywords) return self.func(*(self.args + args), **keywords)
[docs] class add_args: """ Adds arguments to a function (left side). :param func: Function to wrap. :type func: callable :param n: Number of unused arguments to add to the left side. :type n: int :return: Wrapped function. :rtype: callable Example:: >>> import inspect >>> def original_func(a, b, *args, c=0): ... '''Doc''' ... return a + b + c >>> func = add_args(original_func, n=2) >>> func.__name__, func.__doc__ ('original_func', 'Doc') >>> func(1, 2, 3, 4, c=5) 12 >>> str(inspect.signature(func)) '(none, none, a, b, *args, c=0)' """ __name__ = __doc__ = None _args = ('func', 'n', 'callback')
[docs] def __init__(self, func, n=1, callback=None): self.n = n self.callback = callback self.func = func for i in range(2): # noinspection PyBroadException try: self.__name__ = func.__name__ self.__doc__ = func.__doc__ break except AttributeError: func = parent_func(func)
@property def __signature__(self): return _get_signature(self.func, self.n)
[docs] def __call__(self, *args, **kwargs): res = self.func(*args[self.n:], **kwargs) if self.callback: self.callback(res, *args, **kwargs) return res
[docs] def _get_signature(func, n=1): import inspect sig = inspect.signature(func) # Get function signature. def ept_par(): # Return none signature parameter. name = Token('none') return name, inspect.Parameter(name, inspect._POSITIONAL_OR_KEYWORD) # Update signature parameters. par = itertools.chain(*([p() for p in itertools.repeat(ept_par, n)], sig.parameters.items())) sig._parameters = sig._parameters.__class__(collections.OrderedDict(par)) return sig
[docs] def stack_nested_keys(nested_dict, key=(), depth=-1): """ Stacks the keys of nested-dictionaries into tuples and yields a list of k-v pairs. :param nested_dict: Nested dictionary. :type nested_dict: dict :param key: Initial keys. :type key: tuple, optional :param depth: Maximum keys depth. :type depth: int, optional :return: List of k-v pairs. :rtype: generator """ if depth != 0 and hasattr(nested_dict, 'items'): depth = depth - 1 for k, v in nested_dict.items(): yield from stack_nested_keys(v, key=key + (k,), depth=depth) else: yield key, nested_dict
[docs] def get_nested_dicts(nested_dict, *keys, default=None, init_nesting=dict): """ Get/Initialize the value of nested-dictionaries. :param nested_dict: Nested dictionary. :type nested_dict: dict :param keys: Nested keys. :type keys: object :param default: Function used to initialize a new value. :type default: callable, optional :param init_nesting: Function used to initialize a new intermediate nesting dict. :type init_nesting: callable, optional :return: Value of nested-dictionary. :rtype: generator """ if keys: default = default or init_nesting if keys[0] in nested_dict: nd = nested_dict[keys[0]] else: d = default() if len(keys) == 1 else init_nesting() nd = nested_dict[keys[0]] = d return get_nested_dicts(nd, *keys[1:], default=default, init_nesting=init_nesting) return nested_dict
[docs] def are_in_nested_dicts(nested_dict, *keys): """ Nested keys are inside of nested-dictionaries. :param nested_dict: Nested dictionary. :type nested_dict: dict :param keys: Nested keys. :type keys: object :return: True if nested keys are inside of nested-dictionaries, otherwise False. :rtype: bool """ if keys: # noinspection PyBroadException try: return are_in_nested_dicts(nested_dict[keys[0]], *keys[1:]) except Exception: # Key error or not a dict. return False return True
[docs] def combine_nested_dicts(*nested_dicts, depth=-1, base=None): """ Merge nested-dictionaries. :param nested_dicts: Nested dictionaries. :type nested_dicts: dict :param depth: Maximum keys depth. :type depth: int, optional :param base: Base dict where combine multiple dicts in one. :type base: dict, optional :return: Combined nested-dictionary. :rtype: dict """ if base is None: base = {} for nested_dict in nested_dicts: for k, v in stack_nested_keys(nested_dict, depth=depth): while k: # noinspection PyBroadException try: get_nested_dicts(base, *k[:-1])[k[-1]] = v break except Exception: # A branch of the nested_dict is longer than the base. k = k[:-1] v = get_nested_dicts(nested_dict, *k) return base
[docs] class SubDispatch(Base): """ It dispatches a given :class:`~schedula.dispatcher.Dispatcher` like a function. This function takes a sequence of dictionaries as input that will be combined before the dispatching. :return: A function that executes the dispatch of the given :class:`~schedula.dispatcher.Dispatcher`. :rtype: callable .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`, :func:`combine_dicts` Example: .. dispatcher:: dsp :opt: graph_attr={'ratio': '1'}, depth=-1 :code: >>> from schedula import Dispatcher >>> sub_dsp = Dispatcher(name='Sub-dispatcher') ... >>> def fun(a): ... return a + 1, a - 1 ... >>> sub_dsp.add_function('fun', fun, ['a'], ['b', 'c']) 'fun' >>> dispatch = SubDispatch(sub_dsp, ['a', 'b', 'c'], output_type='dict') >>> dsp = Dispatcher(name='Dispatcher') >>> dsp.add_function('Sub-dispatch', dispatch, ['d'], ['e']) 'Sub-dispatch' The Dispatcher output is: .. dispatcher:: o :opt: graph_attr={'ratio': '1'}, depth=-1 :code: >>> o = dsp.dispatch(inputs={'d': {'a': 3}}) while, the Sub-dispatch is: .. dispatcher:: sol :opt: graph_attr={'ratio': '1'}, depth=-1 :code: >>> sol = o.workflow.nodes['Sub-dispatch']['solution'] >>> sol Solution({'a': 3, 'b': 4, 'c': 2}) >>> sol == o['e'] True """
[docs] def __new__(cls, dsp=None, *args, **kwargs): from .blue import Blueprint if isinstance(dsp, Blueprint): return Blueprint(dsp, *args, **kwargs)._set_cls(cls) return super(SubDispatch, cls).__new__(cls)
[docs] def __getstate__(self): state = self.__dict__.copy() state['solution'] = state['solution'].__class__(state['dsp']) del state['__name__'] return state
[docs] def __setstate__(self, d): self.__dict__ = d self.__name__ = self.name
[docs] def __init__(self, dsp, outputs=None, inputs_dist=None, wildcard=False, no_call=False, shrink=False, rm_unused_nds=False, output_type='all', function_id=None, output_type_kw=None): """ Initializes the Sub-dispatch. :param dsp: A dispatcher that identifies the model adopted. :type dsp: schedula.Dispatcher | schedula.utils.blue.BlueDispatcher :param outputs: Ending data nodes. :type outputs: list[str], iterable :param inputs_dist: Initial distances of input data nodes. :type inputs_dist: dict[str, int | float], optional :param wildcard: If True, when the data node is used as input and target in the ArciDispatch algorithm, the input value will be used as input for the connected functions, but not as output. If it is equal to 2, the the data node that cannot be calculated are excluded by the wildcard condition. :type wildcard: bool, int, optional :param no_call: If True data node estimation function is not used. :type no_call: bool, optional :param shrink: If True the dispatcher is shrink before the dispatch. :type shrink: bool, optional :param rm_unused_nds: If True unused function and sub-dispatcher nodes are removed from workflow. :type rm_unused_nds: bool, optional :param output_type: Type of function output: + 'all': a dictionary with all dispatch outputs. + 'list': a list with all outputs listed in `outputs`. + 'dict': a dictionary with any outputs listed in `outputs`. :type output_type: str, optional :param output_type_kw: Extra kwargs to pass to the `selector` function. :type output_type_kw: dict, optional :param function_id: Function name. :type function_id: str, optional """ super(Base, self).__init__() self.dsp = dsp self.outputs = outputs self.wildcard = wildcard self.no_call = no_call self.shrink = shrink self.output_type = output_type self.output_type_kw = output_type_kw or {} self.inputs_dist = inputs_dist self.rm_unused_nds = rm_unused_nds self.name = self.__name__ = function_id or dsp.name self.__doc__ = dsp.__doc__ self.solution = dsp.solution.__class__(dsp)
[docs] def blue(self, memo=None, depth=-1): """ Constructs a Blueprint out of the current object. :param memo: A dictionary to cache Blueprints. :type memo: dict[T,schedula.utils.blue.Blueprint] :param depth: Depth of sub-dispatch blue. If negative all levels are bluprinted. :type depth: int, optional :return: A Blueprint of the current object. :rtype: schedula.utils.blue.Blueprint """ if depth == 0: return self depth -= 1 memo = {} if memo is None else memo if self not in memo: import inspect from .blue import Blueprint, _parent_blue keys = tuple(inspect.signature(self.__init__).parameters) memo[self] = Blueprint(**{ k: _parent_blue(v, memo, depth) for k, v in self.__dict__.items() if k in keys })._set_cls(self.__class__) return memo[self]
[docs] def __call__(self, *input_dicts, copy_input_dicts=False, _stopper=None, _executor=False, _sol_name=(), _verbose=False): # Combine input dictionaries. i = combine_dicts(*input_dicts, copy=copy_input_dicts) # Dispatch the function calls. self.solution = self.dsp.dispatch( i, self.outputs, self.inputs_dist, self.wildcard, self.no_call, self.shrink, self.rm_unused_nds, stopper=_stopper, executor=_executor, sol_name=_sol_name, verbose=_verbose ) return self._return(self.solution)
[docs] def _return(self, solution): outs = self.outputs solution.result() solution.parent = self # Set output. if self.output_type != 'all': try: # Save outputs. return selector( outs, solution, output_type=self.output_type, **self.output_type_kw ) except KeyError: # Outputs not reached. missed = {k for k in outs if k not in solution} # Raise error msg = '\n Unreachable output-targets: {}\n Available ' \ 'outputs: {}'.format(missed, list(solution.keys())) raise DispatcherError(msg, sol=solution) return solution # Return outputs.
[docs] def copy(self): return _copy.deepcopy(self)
[docs] class run_model: """ It is an utility function to execute dynamically generated function/models and - if Dispatcher based - add their workflows to the parent solution. :return: A function that executes the dispatch of the given `dsp`. :rtype: callable **Example**: Follows a simple example on how to use the :func:`~schedula.utils.dsp.run_model`: .. dispatcher:: dsp :opt: graph_attr={'ratio': '1'} >>> from schedula import Dispatcher >>> dsp = Dispatcher(name='Dispatcher') >>> dsp.add_function( ... function_id='execute_dsp', function=run_model, ... inputs=['dsp_model', 'inputs'], outputs=['outputs'] ... ) 'execute_dsp' >>> dsp_model = Dispatcher(name='Model') >>> dsp_model.add_function('max', max, inputs=['a', 'b'], outputs=['c']) 'max' >>> sol = dsp({'dsp_model': dsp_model, 'inputs': {'b': 1, 'a': 2}}) >>> sol['outputs'] Solution({'a': 2, 'b': 1, 'c': 2}) >>> sol.workflow.nodes['execute_dsp']['solution'] Solution({'a': 2, 'b': 1, 'c': 2}) Moreover, it can be used also with all :func:`~schedula.utils.dsp.SubDispatcher` like objects:: >>> sub_dsp = SubDispatch(dsp_model, outputs=['c'], output_type='list') >>> sol = dsp({'dsp_model': sub_dsp, 'inputs': {'b': 1, 'a': 2}}) >>> sol['outputs'] [2] >>> sol.workflow.nodes['execute_dsp']['solution'] Solution({'a': 2, 'b': 1, 'c': 2}) """
[docs] def __init__(self, func, *args, _init=None, **kwargs): from .blue import Blueprint if isinstance(func, Blueprint): func = func.register(memo={}) self.func = func if _init: args, kwargs = _init(*args, **kwargs) self.args = args self.kwargs = kwargs
[docs] def __call__(self, **kwargs): return self.func(*self.args, **self.kwargs, **kwargs)
[docs] class MapDispatch(SubDispatch): """ It dynamically builds a :class:`~schedula.dispatcher.Dispatcher` that is used to invoke recursivelly a *dispatching function* that is defined by a constructor function that takes a `dsp` base model as input. The created function takes a list of dictionaries as input that are used to invoke the mapping function and returns a list of outputs. :return: A function that executes the dispatch of the given :class:`~schedula.dispatcher.Dispatcher`. :rtype: callable .. seealso:: :func:`~schedula.utils.dsp.SubDispatch` Example: A simple example on how to use the :func:`~schedula.utils.dsp.MapDispatch`: .. dispatcher:: map_func :opt: graph_attr={'ratio': '1'}, depth=-1, workflow=True :code: >>> from schedula import Dispatcher, MapDispatch >>> dsp = Dispatcher(name='model') ... >>> def fun(a, b): ... return a + b, a - b ... >>> dsp.add_func(fun, ['c', 'd'], inputs_kwargs=True) 'fun' >>> map_func = MapDispatch(dsp, constructor_kwargs={ ... 'outputs': ['c', 'd'], 'output_type': 'list' ... }) >>> map_func([{'a': 1, 'b': 2}, {'a': 2, 'b': 2}, {'a': 3, 'b': 2}]) [[3, -1], [4, 0], [5, 1]] The execution model is created dynamically according to the length of the provided inputs. Moreover, the :func:`~schedula.utils.dsp.MapDispatch` has the possibility to define default values, that are recursively merged with the input provided to the *dispatching function* as follow: .. dispatcher:: map_func :opt: graph_attr={'ratio': '1'}, depth=-1, workflow=True :code: >>> map_func([{'a': 1}, {'a': 3, 'b': 3}], defaults={'b': 2}) [[3, -1], [6, 0]] The :func:`~schedula.utils.dsp.MapDispatch` can also be used as a partial reducing function, i.e., part of the outpus of the previous step are used as input for the successive execution of the *dispatching function*. For example: .. dispatcher:: map_func :opt: graph_attr={'ratio': '1'}, depth=-1, workflow=True :code: >>> map_func = MapDispatch(dsp, recursive_inputs={'c': 'b'}) >>> map_func([{'a': 1, 'b': 1}, {'a': 2}, {'a': 3}]) [Solution({'a': 1, 'b': 1, 'c': 2, 'd': 0}), Solution({'a': 2, 'b': 2, 'c': 4, 'd': 0}), Solution({'a': 3, 'b': 4, 'c': 7, 'd': -1})] """
[docs] def __init__(self, dsp, defaults=None, recursive_inputs=None, constructor=SubDispatch, constructor_kwargs=None, function_id=None, func_kw=lambda *args, **data: {}, input_label='inputs<{}>', output_label='outputs<{}>', data_label='data<{}>', cluster_label='task<{}>', **kwargs): """ Initializes the MapDispatch function. :param dsp: A dispatcher that identifies the base model. :type dsp: schedula.Dispatcher | schedula.utils.blue.BlueDispatcher :param defaults: Defaults values that are recursively merged with the input provided to the *dispatching function*. :type defaults: dict :param recursive_inputs: List of data node ids that are extracted from the outputs of the *dispatching function* and then merged with the inputs of the its successive evaluation. If a dictionary is given, this is used to rename the data node ids extracted. :type recursive_inputs: list | dict :param constructor: It initializes the *dispatching function*. :type constructor: function | class :param constructor_kwargs: Extra keywords passed to the constructor function. :type constructor_kwargs: function | class :param function_id: Function name. :type function_id: str, optional :param func_kw: Extra keywords to add the *dispatching function* to execution model. :type func_kw: function, optional :param input_label: Custom label formatter for recursive inputs. :type input_label: str, optional :param output_label: Custom label formatter for recursive outputs. :type output_label: str, optional :param data_label: Custom label formatter for recursive internal data. :type data_label: str, optional :param kwargs: Keywords to initialize the execution model. :type kwargs: object """ super(MapDispatch, self).__init__( dsp, function_id=function_id, output_type='list' ) self.func = constructor(dsp, **(constructor_kwargs or {})) self.kwargs = kwargs or {} self.defaults = defaults self.recursive_inputs = recursive_inputs self.input_label = input_label self.output_label = output_label self.data_label = data_label self.cluster_label = cluster_label self.func_kw = func_kw
[docs] @staticmethod def prepare_inputs(inputs, defaults): inputs = [combine_dicts(defaults, d) for d in inputs] return inputs if len(inputs) > 1 else inputs[0]
[docs] @staticmethod def recursive_data(recursive_inputs, input_data, outputs): data = selector(recursive_inputs, outputs or {}, allow_miss=True) if isinstance(recursive_inputs, dict): data = map_dict(recursive_inputs, data) data.update(input_data) return data
[docs] @staticmethod def format_labels(it, label): f = label.format return [f(k, **v) for k, v in it]
[docs] @staticmethod def format_clusters(it, label): f = label.format return [{'body': { 'label': f'"{f(k, **v)}"', 'labelloc': 'b' }} for k, v in it]
[docs] def _init_dsp(self, defaults, inputs, recursive_inputs=None): from ..dispatcher import Dispatcher defaults = combine_dicts(self.defaults or {}, defaults or {}) self.dsp = dsp = Dispatcher(**self.kwargs) add_data, add_func = dsp.add_data, dsp.add_func n = len(str(len(inputs) + 1)) it = [(str(k).zfill(n), v) for k, v in enumerate(inputs, 1)] inp = self.format_labels(it, self.input_label) clt = self.format_clusters(it, self.cluster_label) rl = self.format_labels(it, 'run<{}>') self.outputs = out = self.format_labels(it, self.output_label) add_func(self.prepare_inputs, inp, inputs=['inputs', 'defaults']) recursive = recursive_inputs or self.recursive_inputs if recursive: func = functools.partial(self.recursive_data, recursive) dat = self.format_labels(it, self.data_label) fl = self.format_labels(it, 'recursive_data<{}>') it = iter(zip(inp, dat, clt, fl)) i, d, c, fid = next(it) add_data(i, clusters=c) add_func(bypass, [d], inputs=[i], clusters=c) for (i, d, c, fid), o, in zip(it, out[:-1]): add_data(i, clusters=c) add_func(func, [d], inputs=[i, o], clusters=c, function_id=fid) inp = dat for i, o, c, fid, (k, v) in zip(inp, out, clt, rl, enumerate(inputs)): add_data(i, clusters=c) kw = {'clusters': c, 'function_id': fid} kw.update(self.func_kw(k, **v)) add_func(self.func, [o], inputs=[i], **kw) add_data(o, clusters=c) return {'inputs': inputs, 'defaults': defaults}
# noinspection PyMethodOverriding
[docs] def __call__(self, inputs, defaults=None, recursive_inputs=None, _stopper=None, _executor=False, _sol_name=(), _verbose=False): inputs = self._init_dsp(defaults, inputs, recursive_inputs) return super(MapDispatch, self).__call__( inputs, _stopper=_stopper, _executor=_executor, _verbose=_verbose, _sol_name=_sol_name )
[docs] class SubDispatchFunction(SubDispatch): """ It converts a :class:`~schedula.dispatcher.Dispatcher` into a function. This function takes a sequence of arguments or a key values as input of the dispatch. :return: A function that executes the dispatch of the given `dsp`. :rtype: callable .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`, :func:`~schedula.dispatcher.Dispatcher.shrink_dsp` **Example**: A dispatcher with two functions `max` and `min` and an unresolved cycle (i.e., `a` --> `max` --> `c` --> `min` --> `a`): .. dispatcher:: dsp :opt: graph_attr={'ratio': '1'} >>> from schedula import Dispatcher >>> dsp = Dispatcher(name='Dispatcher') >>> dsp.add_function('max', max, inputs=['a', 'b'], outputs=['c']) 'max' >>> from math import log >>> def my_log(x): ... return log(x - 1) >>> dsp.add_function('log(x - 1)', my_log, inputs=['c'], ... outputs=['a'], input_domain=lambda c: c > 1) 'log(x - 1)' Extract a static function node, i.e. the inputs `a` and `b` and the output `a` are fixed:: >>> fun = SubDispatchFunction(dsp, 'myF', ['a', 'b'], ['a']) >>> fun.__name__ 'myF' >>> fun(b=1, a=2) 0.0 .. dispatcher:: fun :opt: workflow=True, graph_attr={'ratio': '1'} >>> fun.dsp.name = 'Created function internal' The created function raises a ValueError if un-valid inputs are provided: .. dispatcher:: fun :opt: workflow=True, graph_attr={'ratio': '1'} :code: >>> fun(1, 0) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... DispatcherError: Unreachable output-targets: ... Available outputs: ... """ var_keyword = 'kw'
[docs] def __init__(self, dsp, function_id=None, inputs=None, outputs=None, inputs_dist=None, shrink=True, wildcard=True, output_type=None, output_type_kw=None, first_arg_as_kw=False): """ Initializes the Sub-dispatch Function. :param dsp: A dispatcher that identifies the model adopted. :type dsp: schedula.Dispatcher | schedula.utils.blue.BlueDispatcher :param function_id: Function name. :type function_id: str, optional :param inputs: Input data nodes. :type inputs: list[str], iterable, optional :param outputs: Ending data nodes. :type outputs: list[str], iterable, optional :param inputs_dist: Initial distances of input data nodes. :type inputs_dist: dict[str, int | float], optional :param shrink: If True the dispatcher is shrink before the dispatch. :type shrink: bool, optional :param wildcard: If True, when the data node is used as input and target in the ArciDispatch algorithm, the input value will be used as input for the connected functions, but not as output. If it is equal to 2, the the data node that cannot be calculated are excluded by the wildcard condition. :type wildcard: bool, int, optional :param output_type: Type of function output: + 'all': a dictionary with all dispatch outputs. + 'list': a list with all outputs listed in `outputs`. + 'dict': a dictionary with any outputs listed in `outputs`. :type output_type: str, optional :param output_type_kw: Extra kwargs to pass to the `selector` function. :type output_type_kw: dict, optional :param first_arg_as_kw: Uses the first argument of the __call__ method as `kwargs`. :type output_type_kw: bool """ if shrink: dsp = dsp.shrink_dsp( inputs, outputs, inputs_dist=inputs_dist, wildcard=wildcard ) if outputs: # Outputs not reached. missed = {k for k in outputs if k not in dsp.nodes} if missed: # If outputs are missing raise error. available = list(dsp.data_nodes.keys()) # Available data nodes. # Raise error msg = '\n Unreachable output-targets: {}\n Available ' \ 'outputs: {}'.format(missed, available) raise ValueError(msg) # Set internal proprieties self.inputs = inputs # Set dsp name equal to function id. self.function_id = dsp.name = function_id or dsp.name or 'fun' no_call = False self._sol = dsp.solution.__class__( dsp, dict.fromkeys(inputs or (), None), outputs, wildcard, None, inputs_dist, no_call, False ) # Initialize as sub dispatch. super(SubDispatchFunction, self).__init__( dsp, outputs, inputs_dist, wildcard, no_call, True, True, 'list', output_type_kw=output_type_kw ) # Define the function to return outputs sorted. if output_type is not None: self.output_type = output_type elif outputs is None: self.output_type = 'all' elif len(outputs) == 1: self.output_type = 'values' self.first_arg_as_kw = first_arg_as_kw
@property def __signature__(self): import inspect dfl, p = self.dsp.default_values, [] for name in self.inputs or (): par = inspect.Parameter('par', inspect._POSITIONAL_OR_KEYWORD) par._name = name if name in dfl: par._default = dfl[name]['value'] p.append(par) if self.var_keyword: p.append(inspect.Parameter(self.var_keyword, inspect._VAR_KEYWORD)) return inspect.Signature(p, __validate_parameters__=False)
[docs] def _parse_inputs(self, *args, **kw): if self.first_arg_as_kw: for k in sorted(args[0]): if k in kw: msg = 'multiple values for argument %r' raise TypeError(msg % k) from None kw.update(args[0]) args = args[1:] defaults, inputs = self.dsp.default_values, {} for i, k in enumerate(self.inputs or ()): try: inputs[k] = args[i] if k in kw: msg = 'multiple values for argument %r' raise TypeError(msg % k) from None except IndexError: if k in kw: inputs[k] = kw.pop(k) elif k in defaults: inputs[k] = defaults[k]['value'] else: msg = 'missing a required argument: %r' raise TypeError(msg % k) from None if len(inputs) < len(args): raise TypeError('too many positional arguments') from None if self.var_keyword: inputs.update(kw) elif not all(k in inputs for k in kw): k = next(k for k in sorted(kw) if k not in inputs) msg = 'got an unexpected keyword argument %r' raise TypeError(msg % k) from None return inputs
[docs] def __call__(self, *args, _stopper=None, _executor=False, _sol_name=(), _verbose=False, **kw): # Namespace shortcuts. self.solution = sol = self._sol._copy_structure() sol.verbose = _verbose self.solution.full_name, dfl = _sol_name, self.dsp.default_values # Parse inputs. inp = self._parse_inputs(*args, **kw) i = tuple(k for k in inp if k not in self.dsp.data_nodes) if i: msg = "%s() got an unexpected keyword argument '%s'" raise TypeError(msg % (self.function_id, min(i))) inputs_dist = combine_dicts( sol.inputs_dist, dict.fromkeys(inp, 0), self.inputs_dist or {} ) inp.update({k: v['value'] for k, v in dfl.items() if k not in inp}) # Initialize. sol._init_workflow(inp, inputs_dist=inputs_dist, clean=False) # Dispatch outputs. sol._run(stopper=_stopper, executor=_executor) # Return outputs sorted. return self._return(sol)
[docs] class SubDispatchPipe(SubDispatchFunction): """ It converts a :class:`~schedula.dispatcher.Dispatcher` into a function. This function takes a sequence of arguments as input of the dispatch. :return: A function that executes the pipe of the given `dsp`. :rtype: callable .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`, :func:`~schedula.dispatcher.Dispatcher.shrink_dsp` **Example**: A dispatcher with two functions `max` and `min` and an unresolved cycle (i.e., `a` --> `max` --> `c` --> `min` --> `a`): .. dispatcher:: dsp :opt: graph_attr={'ratio': '1'} >>> from schedula import Dispatcher >>> dsp = Dispatcher(name='Dispatcher') >>> dsp.add_function('max', max, inputs=['a', 'b'], outputs=['c']) 'max' >>> def func(x): ... return x - 1 >>> dsp.add_function('x - 1', func, inputs=['c'], outputs=['a']) 'x - 1' Extract a static function node, i.e. the inputs `a` and `b` and the output `a` are fixed:: >>> fun = SubDispatchPipe(dsp, 'myF', ['a', 'b'], ['a']) >>> fun.__name__ 'myF' >>> fun(2, 1) 1 .. dispatcher:: fun :opt: workflow=True, graph_attr={'ratio': '1'} >>> fun.dsp.name = 'Created function internal' The created function raises a ValueError if un-valid inputs are provided: .. dispatcher:: fun :opt: workflow=True, graph_attr={'ratio': '1'} :code: >>> fun(1, 0) 0 """ var_keyword = None
[docs] def __init__(self, dsp, function_id=None, inputs=None, outputs=None, inputs_dist=None, no_domain=True, wildcard=True, shrink=True, output_type=None, output_type_kw=None, first_arg_as_kw=False): """ Initializes the Sub-dispatch Function. :param dsp: A dispatcher that identifies the model adopted. :type dsp: schedula.Dispatcher | schedula.utils.blue.BlueDispatcher :param function_id: Function name. :type function_id: str :param inputs: Input data nodes. :type inputs: list[str], iterable :param outputs: Ending data nodes. :type outputs: list[str], iterable, optional :param inputs_dist: Initial distances of input data nodes. :type inputs_dist: dict[str, int | float], optional :param no_domain: Skip the domain check. :type no_domain: bool, optional :param shrink: If True the dispatcher is shrink before the dispatch. :type shrink: bool, optional :param wildcard: If True, when the data node is used as input and target in the ArciDispatch algorithm, the input value will be used as input for the connected functions, but not as output. If it is equal to 2, the the data node that cannot be calculated are excluded by the wildcard condition. :type wildcard: bool,int, optional :param output_type: Type of function output: + 'all': a dictionary with all dispatch outputs. + 'list': a list with all outputs listed in `outputs`. + 'dict': a dictionary with any outputs listed in `outputs`. :type output_type: str, optional :param output_type_kw: Extra kwargs to pass to the `selector` function. :type output_type_kw: dict, optional :param first_arg_as_kw: Converts first argument of the __call__ method as `kwargs`. :type output_type_kw: bool """ self.solution = sol = dsp.solution.__class__( dsp, inputs, outputs, wildcard, inputs_dist, True, True, no_domain=no_domain ) sol._run() if shrink: from .alg import _union_workflow, _convert_bfs bfs = _union_workflow(sol) o, bfs = outputs or sol, _convert_bfs(bfs) dsp = dsp._get_dsp_from_bfs(o, bfs_graphs=bfs) super(SubDispatchPipe, self).__init__( dsp, function_id, inputs, outputs=outputs, inputs_dist=inputs_dist, shrink=False, wildcard=wildcard, output_type=output_type, output_type_kw=output_type_kw, first_arg_as_kw=first_arg_as_kw ) self._reset_sol() self.pipe = self._set_pipe()
[docs] def _reset_sol(self): self._sol.no_call = True self._sol._init_workflow() self._sol._run() for s in self._sol.sub_sol.values(): s.no_call = False
[docs] def _set_pipe(self): def _make_tks(task): v, s = task[-1] if v is START: nxt_nds = s.dsp.dmap[v] else: nxt_nds = s.workflow[v] nxt_dsp = [n for n in nxt_nds if s.nodes[n]['type'] == 'dispatcher'] nxt_dsp = [(n, s._edge_length(s.dmap[v][n], s.nodes[n])) for n in nxt_dsp] return (task[0], task[1], (v, s)), nxt_nds, nxt_dsp return [_make_tks(v['task']) for v in self._sol.pipe.values()]
[docs] def _init_new_solution(self, full_name, verbose): key_map, sub_sol = {}, {} for k, s in self._sol.sub_sol.items(): ns = s._copy_structure(dist=1) ns.verbose = verbose ns.fringe = None ns.sub_sol = sub_sol ns.full_name = full_name + s.full_name key_map[s] = ns sub_sol[ns.index] = ns return key_map[self._sol], lambda x: key_map[x]
[docs] def _init_workflows(self, inputs): self.solution.inputs.update(inputs) for s in self.solution.sub_sol.values(): s._init_workflow(clean=False)
[docs] def _callback_pipe_failure(self): pass
[docs] def _pipe_append(self): return self.solution._pipe.append
[docs] def __call__(self, *args, _stopper=None, _executor=False, _sol_name=(), _verbose=False, **kw): self.solution, key_map = self._init_new_solution(_sol_name, _verbose) pipe_append = self._pipe_append() self._init_workflows(self._parse_inputs(*args, **kw)) for x, nxt_nds, nxt_dsp in self.pipe: v, s = x[-1] s = key_map(s) pipe_append(x[:2] + ((v, s),)) if not s._set_node_output( v, False, next_nds=nxt_nds, stopper=_stopper, executor=_executor): self._callback_pipe_failure() break for n, vw_d in nxt_dsp: s._set_sub_dsp_node_input(v, n, [], False, vw_d) s._see_remote_link_node(v) # Return outputs sorted. return self._return(self.solution)
[docs] class NoSub: """Class for avoiding to add a sub solution to the workflow."""
[docs] class DispatchPipe(NoSub, SubDispatchPipe): """ It converts a :class:`~schedula.dispatcher.Dispatcher` into a function. This function takes a sequence of arguments as input of the dispatch. :return: A function that executes the pipe of the given `dsp`, updating its workflow. :rtype: callable .. note:: This wrapper is not thread safe, because it overwrite the solution. .. seealso:: :func:`~schedula.dispatcher.Dispatcher.dispatch`, :func:`~schedula.dispatcher.Dispatcher.shrink_dsp` **Example**: A dispatcher with two functions `max` and `min` and an unresolved cycle (i.e., `a` --> `max` --> `c` --> `min` --> `a`): .. dispatcher:: dsp :opt: graph_attr={'ratio': '1'} >>> from schedula import Dispatcher >>> dsp = Dispatcher(name='Dispatcher') >>> dsp.add_function('max', max, inputs=['a', 'b'], outputs=['c']) 'max' >>> def func(x): ... return x - 1 >>> dsp.add_function('x - 1', func, inputs=['c'], outputs=['a']) 'x - 1' Extract a static function node, i.e. the inputs `a` and `b` and the output `a` are fixed:: >>> fun = DispatchPipe(dsp, 'myF', ['a', 'b'], ['a']) >>> fun.__name__ 'myF' >>> fun(2, 1) 1 .. dispatcher:: fun :opt: workflow=True, graph_attr={'ratio': '1'} >>> fun.dsp.name = 'Created function internal' The created function raises a ValueError if un-valid inputs are provided: .. dispatcher:: fun :opt: workflow=True, graph_attr={'ratio': '1'} :code: >>> fun(1, 0) 0 """
[docs] def __getstate__(self): self._init_workflows(dict.fromkeys(self.inputs or ())) self._reset_sol() state = super(DispatchPipe, self).__getstate__() del state['pipe'] return state
[docs] def __setstate__(self, d): super(DispatchPipe, self).__setstate__(d) self.pipe = self._set_pipe()
[docs] def _pipe_append(self): return lambda *args: None
[docs] def _init_new_solution(self, _sol_name, verbose): from .asy import EXECUTORS EXECUTORS.set_active(id(self._sol)) return self._sol, lambda x: x
[docs] def _init_workflows(self, inputs): for s in self.solution.sub_sol.values(): s._visited.clear() return super(DispatchPipe, self)._init_workflows(inputs)
[docs] def _return(self, solution): # noinspection PyBroadException try: solution.result() except Exception: self._callback_pipe_failure() return super(DispatchPipe, self)._return(solution)
[docs] def _callback_pipe_failure(self): raise DispatcherError("The pipe is not respected.", sol=self.solution)
[docs] def plot(self, workflow=None, *args, **kwargs): if workflow: return self.solution.plot(*args, **kwargs) return super(DispatchPipe, self).plot(workflow, *args, **kwargs)
[docs] def _get_par_args(func, exl_kw=False): par = collections.OrderedDict() for k, v in _get_signature(func, 0)._parameters.items(): if v.kind >= v.VAR_POSITIONAL or (exl_kw and v.default is not v.empty): break par[k] = v return par
[docs] def add_function(dsp, inputs_kwargs=False, inputs_defaults=False, **kw): """ Decorator to add a function to a dispatcher. :param dsp: A dispatcher. :type dsp: schedula.Dispatcher | schedula.blue.BlueDispatcher :param inputs_kwargs: Do you want to include kwargs as inputs? :type inputs_kwargs: bool :param inputs_defaults: Do you want to set default values? :type inputs_defaults: bool :param kw: See :func:~`schedula.dispatcher.Dispatcher.add_function`. :return: Decorator. :rtype: callable **------------------------------------------------------------------------** **Example**: .. dispatcher:: sol :opt: graph_attr={'ratio': '1'} :code: >>> import schedula as sh >>> dsp = sh.Dispatcher(name='Dispatcher') >>> @sh.add_function(dsp, outputs=['e']) ... @sh.add_function(dsp, False, True, outputs=['i'], inputs='ecah') ... @sh.add_function(dsp, True, outputs=['l']) ... def f(a, b, c, d=1): ... return (a + b) - c + d >>> @sh.add_function(dsp, True, outputs=['d']) ... def g(e, i, *args, d=0): ... return e + i + d >>> sol = dsp({'a': 1, 'b': 2, 'c': 3}); sol Solution({'a': 1, 'b': 2, 'c': 3, 'h': 1, 'e': 1, 'i': 4, 'd': 5, 'l': 5}) """ def decorator(f): dsp.add_func( f, inputs_kwargs=inputs_kwargs, inputs_defaults=inputs_defaults, **kw ) return f return decorator
[docs] @dataclass(repr=False, frozen=True, eq=False) class inf: """Class to model infinite numbers for workflow distance.""" _inf: float = 0 _num: float = 0
[docs] def __iter__(self): yield self._inf yield self._num
[docs] @staticmethod def format(val): if not isinstance(val, tuple): val = 0, val return inf(*val)
[docs] def __repr__(self): if self._inf == 0: return str(self._num) return 'inf(inf={}, num={})'.format(*self)
[docs] def __add__(self, other): if isinstance(other, self.__class__): return inf(self._inf + other._inf, self._num + other._num) return inf(self._inf, self._num + other)
[docs] def __sub__(self, other): other = isinstance(other, self.__class__) and other or (0, other) return inf(*(x - y for x, y in zip(self, other)))
[docs] def __rsub__(self, other): other = isinstance(other, self.__class__) and other or (0, other) return inf(*(x - y for x, y in zip(other, self)))
[docs] def __mul__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x * y for x, y in zip(self, other)))
[docs] def __truediv__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x / y for x, y in zip(self, other)))
[docs] def __rtruediv__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x / y for x, y in zip(other, self)))
[docs] def __pow__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x ** y for x, y in zip(self, other)))
[docs] def __rpow__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x ** y for x, y in zip(other, self)))
[docs] def __mod__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x % y for x, y in zip(self, other)))
[docs] def __rmod__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x % y for x, y in zip(other, self)))
[docs] def __floordiv__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x // y for x, y in zip(self, other)))
[docs] def __rfloordiv__(self, other): other = isinstance(other, self.__class__) and other or (other, other) return inf(*(x // y for x, y in zip(other, self)))
[docs] def __neg__(self): return inf(*(-x for x in self))
[docs] def __pos__(self): return inf(*(+x for x in self))
[docs] def __abs__(self): return inf(*(map(abs, self)))
[docs] def __trunc__(self): return inf(*(map(math.trunc, self)))
[docs] def __floor__(self): return inf(*(map(math.floor, self)))
[docs] def __ceil__(self): return inf(*(map(math.ceil, self)))
[docs] def __round__(self, n=None): return inf(*(round(x, n) for x in self))
__radd__ = __add__ __rmul__ = __mul__
[docs] def __eq__(self, other): try: return self._inf == other._inf and self._num == other._num except AttributeError: return self._inf == 0 and self._num == other
[docs] def __lt__(self, other): try: a = other._inf return self._inf < a or (self._inf == a and self._num < other._num) except AttributeError: return self._inf < 0 or (self._inf == 0 and self._num < other)
[docs] def __le__(self, other): try: a = other._inf return self._inf < a or (self._inf == a and self._num <= other._num) except AttributeError: return self._inf < 0 or (self._inf == 0 and self._num <= other)
[docs] def __gt__(self, other): try: a = other._inf return self._inf > a or (self._inf == a and self._num > other._num) except AttributeError: return self._inf > 0 or (self._inf == 0 and self._num > other)
[docs] def __ge__(self, other): try: a = other._inf return self._inf > a or (self._inf == a and self._num >= other._num) except AttributeError: return self._inf > 0 or (self._inf == 0 and self._num >= other)
[docs] def __ne__(self, other): return not self == other