8.2.8.19. MapDispatch

class MapDispatch(dsp=None, *args, **kwargs)[source]

It dynamically builds a Dispatcher that is used to invoke recursivelly a dispatching function that is defined by a constructor function that takes a dsp base model as input.

The created function takes a list of dictionaries as input that are used to invoke the mapping function and returns a list of outputs.

Returns:

A function that executes the dispatch of the given Dispatcher.

Return type:

callable

See also

SubDispatch()

Example:

A simple example on how to use the MapDispatch():

>>> from schedula import Dispatcher, MapDispatch
>>> dsp = Dispatcher(name='model')
...
>>> def fun(a, b):
...     return a + b, a - b
...
>>> dsp.add_func(fun, ['c', 'd'], inputs_kwargs=True)
'fun'
>>> map_func = MapDispatch(dsp, constructor_kwargs={
...     'outputs': ['c', 'd'], 'output_type': 'list'
... })
>>> map_func([{'a': 1, 'b': 2}, {'a': 2, 'b': 2}, {'a': 3, 'b': 2}])
[[3, -1], [4, 0], [5, 1]]

The execution model is created dynamically according to the length of the provided inputs. Moreover, the MapDispatch() has the possibility to define default values, that are recursively merged with the input provided to the dispatching function as follow:

>>> map_func([{'a': 1}, {'a': 3, 'b': 3}], defaults={'b': 2})
[[3, -1], [6, 0]]

The MapDispatch() can also be used as a partial reducing function, i.e., part of the outpus of the previous step are used as input for the successive execution of the dispatching function. For example:

>>> map_func = MapDispatch(dsp, recursive_inputs={'c': 'b'})
>>> map_func([{'a': 1, 'b': 1}, {'a': 2}, {'a': 3}])
[Solution({'a': 1, 'b': 1, 'c': 2, 'd': 0}),
 Solution({'a': 2, 'b': 2, 'c': 4, 'd': 0}),
 Solution({'a': 3, 'b': 4, 'c': 7, 'd': -1})]

Methods

__call__

Call self as a function.

__deepcopy__

__delattr__

Implement delattr(self, name).

__dir__

Default dir() implementation.

__eq__

Return self==value.

__format__

Default object formatter.

__ge__

Return self>=value.

__getattribute__

Return getattr(self, name).

__getstate__

Helper for pickle.

__gt__

Return self>value.

__hash__

Return hash(self).

__init__

Initializes the MapDispatch function.

__init_subclass__

This method is called when a class is subclassed.

__le__

Return self<=value.

__lt__

Return self<value.

__ne__

Return self!=value.

__new__

__reduce__

Helper for pickle.

__reduce_ex__

Helper for pickle.

__repr__

Return repr(self).

__setattr__

Implement setattr(self, name, value).

__setstate__

__sizeof__

Size of object in memory, in bytes.

__str__

Return str(self).

__subclasshook__

Abstract classes can override this to customize issubclass().

_init_dsp

_return

blue

Constructs a Blueprint out of the current object.

copy

form

Creates a dispatcher Form Flask app.

format_clusters

format_labels

get_node

Returns a sub node of a dispatcher.

plot

Plots the Dispatcher with a graph in the DOT language with Graphviz.

prepare_inputs

recursive_data

web

Creates a dispatcher Flask app.

__init__(dsp, defaults=None, recursive_inputs=None, constructor=<class 'schedula.utils.dsp.SubDispatch'>, constructor_kwargs=None, function_id=None, func_kw=<function MapDispatch.<lambda>>, input_label='inputs<{}>', output_label='outputs<{}>', data_label='data<{}>', cluster_label='task<{}>', **kwargs)[source]

Initializes the MapDispatch function.

Parameters:
  • dsp (schedula.Dispatcher | schedula.utils.blue.BlueDispatcher) – A dispatcher that identifies the base model.

  • defaults (dict) – Defaults values that are recursively merged with the input provided to the dispatching function.

  • recursive_inputs (list | dict) – List of data node ids that are extracted from the outputs of the dispatching function and then merged with the inputs of the its successive evaluation. If a dictionary is given, this is used to rename the data node ids extracted.

  • constructor (function | class) – It initializes the dispatching function.

  • constructor_kwargs (function | class) – Extra keywords passed to the constructor function.

  • function_id (str, optional) – Function name.

  • func_kw (function, optional) – Extra keywords to add the dispatching function to execution model.

  • input_label (str, optional) – Custom label formatter for recursive inputs.

  • output_label (str, optional) – Custom label formatter for recursive outputs.

  • data_label (str, optional) – Custom label formatter for recursive internal data.

  • kwargs (object) – Keywords to initialize the execution model.

Attributes

__annotations__

__dict__

__doc__

__module__

__weakref__

list of weak references to the object

__init__(dsp, defaults=None, recursive_inputs=None, constructor=<class 'schedula.utils.dsp.SubDispatch'>, constructor_kwargs=None, function_id=None, func_kw=<function MapDispatch.<lambda>>, input_label='inputs<{}>', output_label='outputs<{}>', data_label='data<{}>', cluster_label='task<{}>', **kwargs)[source]

Initializes the MapDispatch function.

Parameters:
  • dsp (schedula.Dispatcher | schedula.utils.blue.BlueDispatcher) – A dispatcher that identifies the base model.

  • defaults (dict) – Defaults values that are recursively merged with the input provided to the dispatching function.

  • recursive_inputs (list | dict) – List of data node ids that are extracted from the outputs of the dispatching function and then merged with the inputs of the its successive evaluation. If a dictionary is given, this is used to rename the data node ids extracted.

  • constructor (function | class) – It initializes the dispatching function.

  • constructor_kwargs (function | class) – Extra keywords passed to the constructor function.

  • function_id (str, optional) – Function name.

  • func_kw (function, optional) – Extra keywords to add the dispatching function to execution model.

  • input_label (str, optional) – Custom label formatter for recursive inputs.

  • output_label (str, optional) – Custom label formatter for recursive outputs.

  • data_label (str, optional) – Custom label formatter for recursive internal data.

  • kwargs (object) – Keywords to initialize the execution model.

static prepare_inputs(inputs, defaults)[source]
static recursive_data(recursive_inputs, input_data, outputs)[source]
static format_labels(it, label)[source]
static format_clusters(it, label)[source]
_init_dsp(defaults, inputs, recursive_inputs=None)[source]
__call__(inputs, defaults=None, recursive_inputs=None, _stopper=None, _executor=False, _sol_name=(), _verbose=False)[source]

Call self as a function.

__annotations__ = {}
__doc__ = "\n    It dynamically builds a :class:`~schedula.dispatcher.Dispatcher` that is\n    used to invoke recursivelly a *dispatching function* that is defined\n    by a constructor function that takes a `dsp` base model as input.\n\n    The created function takes a list of dictionaries as input that are used to\n    invoke the mapping function and returns a list of outputs.\n\n    :return:\n        A function that executes the dispatch of the given\n        :class:`~schedula.dispatcher.Dispatcher`.\n    :rtype: callable\n\n    .. seealso:: :func:`~schedula.utils.dsp.SubDispatch`\n\n    Example:\n\n    A simple example on how to use the :func:`~schedula.utils.dsp.MapDispatch`:\n\n    .. dispatcher:: map_func\n       :opt: graph_attr={'ratio': '1'}, depth=-1, workflow=True\n       :code:\n\n        >>> from schedula import Dispatcher, MapDispatch\n        >>> dsp = Dispatcher(name='model')\n        ...\n        >>> def fun(a, b):\n        ...     return a + b, a - b\n        ...\n        >>> dsp.add_func(fun, ['c', 'd'], inputs_kwargs=True)\n        'fun'\n        >>> map_func = MapDispatch(dsp, constructor_kwargs={\n        ...     'outputs': ['c', 'd'], 'output_type': 'list'\n        ... })\n        >>> map_func([{'a': 1, 'b': 2}, {'a': 2, 'b': 2}, {'a': 3, 'b': 2}])\n        [[3, -1], [4, 0], [5, 1]]\n\n    The execution model is created dynamically according to the length of the\n    provided inputs. Moreover, the :func:`~schedula.utils.dsp.MapDispatch` has\n    the possibility to define default values, that are recursively merged with\n    the input provided to the *dispatching function* as follow:\n\n    .. dispatcher:: map_func\n       :opt: graph_attr={'ratio': '1'}, depth=-1, workflow=True\n       :code:\n\n        >>> map_func([{'a': 1}, {'a': 3, 'b': 3}], defaults={'b': 2})\n        [[3, -1], [6, 0]]\n\n    The :func:`~schedula.utils.dsp.MapDispatch` can also be used as a partial\n    reducing function, i.e., part of the outpus of the previous step are used as\n    input for the successive execution of the *dispatching function*. For\n    example:\n\n    .. dispatcher:: map_func\n       :opt: graph_attr={'ratio': '1'}, depth=-1, workflow=True\n       :code:\n\n        >>> map_func = MapDispatch(dsp, recursive_inputs={'c': 'b'})\n        >>> map_func([{'a': 1, 'b': 1}, {'a': 2}, {'a': 3}])\n        [Solution({'a': 1, 'b': 1, 'c': 2, 'd': 0}),\n         Solution({'a': 2, 'b': 2, 'c': 4, 'd': 0}),\n         Solution({'a': 3, 'b': 4, 'c': 7, 'd': -1})]\n    "
__module__ = 'schedula.utils.dsp'