#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2024, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It contains functions to dispatch asynchronously and in parallel.
Sub-Modules:
.. currentmodule:: schedula.utils.asy
.. autosummary::
:nosignatures:
:toctree: asy/
executors
factory
"""
from ..imp import Future
from ..cst import EMPTY
from .factory import ExecutorFactory
from ..exc import DispatcherError, DispatcherAbort
from ..dsp import parent_func, SubDispatch, NoSub, run_model
def _sync_executor():
from .executors import PoolExecutor, Executor
# noinspection PyTypeChecker
return PoolExecutor(Executor())
def _async_executor():
from .executors import PoolExecutor, ThreadExecutor
return PoolExecutor(ThreadExecutor())
def _parallel_executor(*args, **kwargs):
from .executors import PoolExecutor, ThreadExecutor, ProcessExecutor
return PoolExecutor(ThreadExecutor(), ProcessExecutor(*args, **kwargs))
def _parallel_pool_executor(*args, **kwargs):
from .executors import PoolExecutor, ThreadExecutor, ProcessPoolExecutor
return PoolExecutor(
ThreadExecutor(), ProcessPoolExecutor(*args, **kwargs), False
)
def _parallel_dispatch_executor():
from .executors import PoolExecutor, ThreadExecutor, ProcessExecutor
return PoolExecutor(ThreadExecutor(), ProcessExecutor(), True)
EXECUTORS = ExecutorFactory({
'sync': _sync_executor,
'async': _async_executor,
'parallel': _parallel_executor,
'parallel-pool': _parallel_pool_executor,
'parallel-dispatch': _parallel_dispatch_executor
})
[docs]
def register_executor(name, init, executors=None):
"""
Register a new executor type.
:param name:
Executor name.
:type name: str
:param init:
Function to initialize the executor.
:type init: callable
:param executors:
Executor factory.
:type executors: ExecutorFactory
"""
if executors is None:
executors = EXECUTORS
executors[name] = init
[docs]
def shutdown_executor(name=EMPTY, sol_id=EMPTY, wait=True, executors=None):
"""
Clean-up the resources associated with the Executor.
:param name:
Executor name.
:type name: str
:param sol_id:
Solution id.
:type sol_id: int
:param wait:
If True then shutdown will not return until all running futures have
finished executing and the resources used by the executor have been
reclaimed.
:type wait: bool
:param executors:
Executor factory.
:type executors: ExecutorFactory
:return:
Shutdown pool executor.
:rtype: dict[concurrent.futures.Future,Thread|Process]
"""
if executors is None:
executors = EXECUTORS
return executors.shutdown_executor(name, sol_id, wait)
[docs]
def shutdown_executors(wait=True, executors=None):
"""
Clean-up the resources of all initialized executors.
:param wait:
If True then shutdown will not return until all running futures have
finished executing and the resources used by the executors have been
reclaimed.
:type wait: bool
:param executors:
Executor factory.
:type executors: ExecutorFactory
:return:
Shutdown pool executor.
:rtype: dict[str,dict]
"""
return shutdown_executor(wait=wait, executors=executors)
def _process_funcs(
exe_id, funcs, executor, *args, stopper=None, sol_name=None,
verbose=False, **kw):
from ...dispatcher import Dispatcher
res, sid = [], exe_id[-1]
for fn in funcs:
if stopper and stopper.is_set():
raise DispatcherAbort
pfunc, r = parent_func(fn), {}
if isinstance(pfunc, type) and issubclass(pfunc, run_model):
fn = fn(*args)
args, kw = (), {}
pfunc = fn.func
if isinstance(pfunc, (SubDispatch, Dispatcher)):
try:
if isinstance(pfunc, Dispatcher):
r['res'] = fn(*args, stopper=stopper, executor=executor,
sol_name=sol_name, verbose=verbose, **kw)
else:
r['res'] = fn(*args, _stopper=stopper, _executor=executor,
_sol_name=sol_name, _verbose=verbose, **kw)
except DispatcherError as ex:
if isinstance(pfunc, NoSub):
raise ex
r['err'] = ex
if not isinstance(pfunc, NoSub):
r['sol'] = pfunc.solution
else:
e = EXECUTORS.get_executor(exe_id)
r['res'] = e.process(sid, fn, *args, **kw) if e else fn(*args, **kw)
res.append(r)
if 'err' in r:
break
args, kw = (r['res'],), {}
return res
[docs]
def async_process(funcs, *args, executor=False, sol=None, callback=None, **kw):
"""
Execute `func(*args)` in an asynchronous parallel process.
:param funcs:
Functions to be executed.
:type funcs: list[callable]
:param args:
Arguments to be passed to first function call.
:type args: tuple
:param executor:
Pool executor to run the function.
:type executor: str | bool
:param sol:
Parent solution.
:type sol: schedula.utils.sol.Solution
:param callback:
Callback function to be called after all function execution.
:type callback: callable
:param kw:
Keywords to be passed to first function call.
:type kw: dict
:return:
Functions result.
:rtype: object
"""
exe_id = EXECUTORS.executor_id(executor, sol)
exe = EXECUTORS.get_executor(exe_id)
res = (exe and exe.process_funcs or _process_funcs)(
exe_id, funcs, executor, *args, **kw
)
for r in res:
callback and callback('sol' in r, r.get('sol', r.get('res')))
if 'err' in r:
raise r['err']
return res[-1]['res']
def _async_eval(sol, args, node_attr, *a, **kw):
try:
if node_attr['type'] == 'data' and (
node_attr['wait_inputs'] or 'function' in node_attr):
args = {k: await_result(v) for k, v in args[0].items()},
else:
args = tuple(map(await_result, args))
except BaseException as ex:
raise ex
else:
return sol._evaluate_node(args, node_attr, *a, **kw)
def _await_result(result, timeout, sol, node_id):
from ..exc import SkipNode
try:
return await_result(result, None if timeout is True else timeout)
except Exception as ex:
sol._ended(sol.workflow.nodes[node_id], node_id)
# Some error occurs.
msg = "Failed DISPATCHING '%s' due to:\n %r"
sol._warning(msg, node_id, ex)
raise SkipNode(ex=ex)
[docs]
def async_thread(sol, args, node_attr, node_id, *a, **kw):
"""
Execute `sol._evaluate_node` in an asynchronous thread.
:param sol:
Solution to be updated.
:type sol: schedula.utils.sol.Solution
:param args:
Arguments to be passed to node calls.
:type args: tuple
:param node_attr:
Dictionary of node attributes.
:type node_attr: dict
:param node_id:
Data or function node id.
:type node_id: str
:param a:
Extra args to invoke `sol._evaluate_node`.
:type a: tuple
:param kw:
Extra kwargs to invoke `sol._evaluate_node`.
:type kw: dict
:return:
Function result.
:rtype: concurrent.futures.Future | AsyncList
"""
name = kw.get('executor', False)
exe_id = EXECUTORS.executor_id(name, sol)
sid = exe_id[-1]
executor = EXECUTORS.get_executor(exe_id)
if not executor:
return sol._evaluate_node(args, node_attr, node_id, *a, **kw)
futures = args
if node_attr['type'] == 'data' and (
node_attr['wait_inputs'] or 'function' in node_attr):
futures = args[0].values()
futures = {v for v in futures if isinstance(v, Future)}
def _submit():
return EXECUTORS.get_executor(exe_id).thread(
sid, _async_eval, sol, args, node_attr, node_id, *a, **kw
)
if futures: # Chain results.
result = executor.add_future(sid, Future())
from .executors import _safe_set_exception, _safe_set_result
def _set_res(fut):
try:
_safe_set_result(result, fut.result())
except BaseException as ex:
_safe_set_exception(result, ex)
def _submit_task(fut=None):
futures.discard(fut)
if not (futures or result.done()):
_submit().add_done_callback(_set_res)
for f in list(futures):
f.add_done_callback(_submit_task)
else:
result = _submit()
timeout = node_attr.get('await_result', False)
if timeout is not False:
return _await_result(result, timeout, sol, node_id)
n = len(node_attr.get('outputs', []))
if n > 1:
result_list = AsyncList(future=result, n=n)
for r in result_list:
executor.add_future(sid, r)
return result_list
return result
[docs]
class AsyncList(list):
"""List of asynchronous results."""
[docs]
def __init__(self, *, future=None, n=1):
super(AsyncList, self).__init__()
self.extend(Future() for _ in range(n))
future.add_done_callback(self)
def __call__(self, future):
from .executors import _safe_set_result, _safe_set_exception
try:
res = tuple(future.result())
assert len(self) <= len(res)
except BaseException as ex:
for fut in self:
_safe_set_exception(fut, ex)
else:
for fut, value in zip(self, res):
_safe_set_result(fut, value)
return future
[docs]
def await_result(obj, timeout=None):
"""
Return the result of a `Future` object.
:param obj:
Value object.
:type obj: concurrent.futures.Future | object
:param timeout:
The number of seconds to wait for the result if the future isn't done.
If None, then there is no limit on the wait time.
:type timeout: int
:return:
Result.
:rtype: object
Example::
>>> from concurrent.futures import Future
>>> fut = Future()
>>> fut.set_result(3)
>>> await_result(fut), await_result(4)
(3, 4)
"""
return obj.result(timeout) if isinstance(obj, Future) else obj
[docs]
def atexit_register(*args, **kwargs):
try:
from atexit import register as _register
except ImportError:
try:
from atexit import atexit_register as _register
except ImportError: # MicroPython.
_register = None
if _register is not None:
_register(*args, **kwargs)
return _register
atexit_register(shutdown_executors, wait=False)