#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2020, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It contains functions to dispatch asynchronously and in parallel.

Sub-Modules:

.. currentmodule:: schedula.utils.asy

.. autosummary::
    :nosignatures:
    :toctree: asy/

    executors
    factory
"""
import atexit
from ..cst import EMPTY
from .factory import ExecutorFactory
def _async_executor():
    """
    Initialize the executor registered under the ``'async'`` name.

    :return:
        Pool executor wrapping only a thread executor.
    :rtype: PoolExecutor
    """
    # Imported lazily, at call time, rather than at module import.
    from .executors import ThreadExecutor, PoolExecutor
    thread_executor = ThreadExecutor()
    return PoolExecutor(thread_executor)
def _parallel_executor(*args, **kwargs):
    """
    Initialize the executor registered under the ``'parallel'`` name.

    :param args:
        Positional arguments forwarded to `ProcessExecutor`.
    :type args: tuple

    :param kwargs:
        Keyword arguments forwarded to `ProcessExecutor`.
    :type kwargs: dict

    :return:
        Pool executor built from a thread executor and a process executor.
    :rtype: PoolExecutor
    """
    from .executors import ProcessExecutor, ThreadExecutor, PoolExecutor
    # The thread executor is created first, then the process executor.
    thread_executor = ThreadExecutor()
    process_executor = ProcessExecutor(*args, **kwargs)
    return PoolExecutor(thread_executor, process_executor)
def _parallel_pool_executor(*args, **kwargs):
    """
    Initialize the executor registered under the ``'parallel-pool'`` name.

    :param args:
        Positional arguments forwarded to `ProcessPoolExecutor`.
    :type args: tuple

    :param kwargs:
        Keyword arguments forwarded to `ProcessPoolExecutor`.
    :type kwargs: dict

    :return:
        Pool executor built from a thread executor and a process-pool
        executor.
    :rtype: PoolExecutor
    """
    from .executors import ProcessPoolExecutor, ThreadExecutor, PoolExecutor
    thread_executor = ThreadExecutor()
    process_pool = ProcessPoolExecutor(*args, **kwargs)
    # NOTE(review): the third positional argument of `PoolExecutor` is passed
    # as `False` here; its meaning is defined in `.executors` -- confirm there.
    return PoolExecutor(thread_executor, process_pool, False)
def _parallel_dispatch_executor():
    """
    Initialize the executor registered under the ``'parallel-dispatch'`` name.

    :return:
        Pool executor built from a thread executor and a default-constructed
        process executor.
    :rtype: PoolExecutor
    """
    from .executors import ProcessExecutor, ThreadExecutor, PoolExecutor
    thread_executor = ThreadExecutor()
    process_executor = ProcessExecutor()
    # NOTE(review): the third positional argument of `PoolExecutor` is passed
    # as `True` here; its meaning is defined in `.executors` -- confirm there.
    return PoolExecutor(thread_executor, process_executor, True)
# Module-level registry of executor initializers, keyed by executor name.
# Each value is a callable that builds the corresponding executor; further
# entries can be added at runtime through `register_executor`.
EXECUTORS = ExecutorFactory({
    'async': _async_executor,
    'parallel': _parallel_executor,
    'parallel-pool': _parallel_pool_executor,
    'parallel-dispatch': _parallel_dispatch_executor
})
def register_executor(name, init):
    """
    Add (or replace) an executor initializer in the `EXECUTORS` registry.

    :param name:
        Name under which the executor is registered.
    :type name: str

    :param init:
        Callable that initializes the executor.
    :type init: callable
    """
    # Plain item assignment: an existing entry with the same name is replaced.
    EXECUTORS[name] = init
def shutdown_executor(name=EMPTY, sol_id=EMPTY, wait=True):
    """
    Release the resources held by an executor.

    The call is delegated unchanged to `EXECUTORS.shutdown_executor`.

    :param name:
        Executor name. Defaults to the `EMPTY` sentinel.
    :type name: str

    :param sol_id:
        Solution id. Defaults to the `EMPTY` sentinel.
    :type sol_id: int

    :param wait:
        If True, do not return until all running futures have finished
        executing and the resources used by the executor have been reclaimed.
    :type wait: bool

    :return:
        Shutdown pool executor.
    :rtype: dict[concurrent.futures.Future,Thread|Process]
    """
    # NOTE(review): how `EMPTY` is interpreted (presumably "match all") is
    # decided by `ExecutorFactory.shutdown_executor` -- confirm there.
    stopped = EXECUTORS.shutdown_executor(name, sol_id, wait)
    return stopped
def shutdown_executors(wait=True):
    """
    Release the resources of all initialized executors.

    Thin wrapper around `shutdown_executor` that leaves the executor name and
    the solution id at their defaults.

    :param wait:
        If True, do not return until all running futures have finished
        executing and the resources used by the executors have been reclaimed.
    :type wait: bool

    :return:
        Shutdown pool executor.
    :rtype: dict[str,dict]
    """
    stopped = shutdown_executor(wait=wait)
    return stopped
def async_process(funcs, *args, executor=False, sol=None, callback=None, **kw):
    """
    Run a sequence of functions through the selected executor.

    The functions are processed by the executor's `process_funcs` method
    when an executor is available, otherwise by the module-level
    `_process_funcs` fallback.

    :param funcs:
        Functions to be executed.
    :type funcs: list[callable]

    :param args:
        Arguments to be passed to first function call.
    :type args: tuple

    :param executor:
        Pool executor to run the function.
    :type executor: str | bool

    :param sol:
        Parent solution.
    :type sol: schedula.utils.sol.Solution

    :param callback:
        Optional function invoked once per result entry as
        ``callback(has_sol, value)``, where ``has_sol`` tells whether the
        entry has a ``'sol'`` key and ``value`` is the entry's ``'sol'`` (or,
        when missing, its ``'res'``).
    :type callback: callable

    :param kw:
        Keywords to be passed to first function call.
    :type kw: dict

    :return:
        The ``'res'`` value of the last result entry.
    :rtype: object

    :raises BaseException:
        The ``'err'`` value of the first result entry that carries one.
    """
    from .executors import _process_funcs
    exe_id = EXECUTORS.executor_id(executor, sol)
    exe = EXECUTORS.get_executor(exe_id)

    # Fall back to the plain helper when no executor is available.
    run = exe.process_funcs if exe else _process_funcs
    results = run(exe_id, funcs, executor, *args, **kw)

    for entry in results:
        if callback:
            has_sol = 'sol' in entry
            callback(has_sol, entry.get('sol', entry.get('res')))
        # The callback is notified before the error of the entry is raised.
        if 'err' in entry:
            raise entry['err']
    return results[-1]['res']
def _async_eval(sol, args, node_attr, *a, **kw):
    """
    Resolve the pending inputs and evaluate the node.

    Every input is passed through `await_result`, so `Future` inputs are
    replaced by their results (and their exceptions propagate to the caller)
    before `sol._evaluate_node` is invoked.

    :param sol:
        Solution used to evaluate the node.
    :type sol: schedula.utils.sol.Solution

    :param args:
        Node inputs. For a data node that has a truthy ``'wait_inputs'`` or a
        ``'function'`` attribute, the first item is a mapping of inputs;
        otherwise it is a sequence of inputs.
    :type args: tuple

    :param node_attr:
        Dictionary of node attributes.
    :type node_attr: dict

    :param a:
        Extra args to invoke `sol._evaluate_node`.
    :type a: tuple

    :param kw:
        Extra kwargs to invoke `sol._evaluate_node`.
    :type kw: dict

    :return:
        Whatever `sol._evaluate_node` returns.
    :rtype: object
    """
    if node_attr['type'] == 'data' and (
            node_attr['wait_inputs'] or 'function' in node_attr):
        # Single mapping of inputs: resolve each value and re-wrap the mapping
        # in an explicit one-element tuple.
        args = ({k: await_result(v) for k, v in args[0].items()},)
    else:
        args = tuple(map(await_result, args))
    # No try/except here: a handler that only re-raises adds nothing, errors
    # raised while awaiting the inputs propagate unchanged.
    return sol._evaluate_node(args, node_attr, *a, **kw)
def _await_result(result, timeout, sol, node_id):
    """
    Wait for a result, turning any failure into a `SkipNode`.

    :param result:
        Value or future to be awaited.
    :type result: concurrent.futures.Future | object

    :param timeout:
        Seconds to wait; `True` means wait without a time limit.
    :type timeout: bool | int | float

    :param sol:
        Solution owning the workflow node; used to record the duration and to
        emit the warning.
    :type sol: schedula.utils.sol.Solution

    :param node_id:
        Id of the node whose result is awaited.
    :type node_id: str

    :return:
        The awaited result.
    :rtype: object

    :raises SkipNode:
        When awaiting the result raises an `Exception`.
    """
    import time
    from ..exc import SkipNode

    # `True` is the "no limit" marker and maps to `timeout=None`.
    limit = None if timeout is True else timeout
    try:
        return await_result(result, limit)
    except Exception as ex:
        node = sol.workflow.nodes[node_id]
        if 'started' in node:
            # Record how long the node ran before failing.
            node['duration'] = time.time() - node['started']
        sol._warning("Failed DISPATCHING '%s' due to:\n %r", node_id, ex)
        raise SkipNode(ex=ex)
def async_thread(sol, args, node_attr, node_id, *a, **kw):
    """
    Schedule `sol._evaluate_node` on the executor's thread.

    When no executor is available the node is evaluated synchronously. When
    some inputs are still pending `Future` objects, the evaluation is
    submitted only after all of them are done.

    :param sol:
        Solution to be updated.
    :type sol: schedula.utils.sol.Solution

    :param args:
        Arguments to be passed to node calls.
    :type args: tuple

    :param node_attr:
        Dictionary of node attributes.
    :type node_attr: dict

    :param node_id:
        Data or function node id.
    :type node_id: str

    :param a:
        Extra args to invoke `sol._evaluate_node`.
    :type a: tuple

    :param kw:
        Extra kwargs to invoke `sol._evaluate_node`. The ``'executor'`` key
        (default `False`) selects the executor.
    :type kw: dict

    :return:
        A future (or an `AsyncList` when the node has more than one output);
        the plain result when there is no executor or when the node sets
        ``'await_result'``.
    :rtype: concurrent.futures.Future | AsyncList
    """
    exe_id = EXECUTORS.executor_id(kw.get('executor', False), sol)
    sid = exe_id[-1]
    executor = EXECUTORS.get_executor(exe_id)
    if not executor:
        # No executor: evaluate the node directly in the calling thread.
        return sol._evaluate_node(args, node_attr, node_id, *a, **kw)

    from concurrent.futures import Future

    # Data nodes with `wait_inputs`/`function` carry their inputs as a mapping
    # in the first argument; every other node carries them positionally.
    has_mapping_inputs = node_attr['type'] == 'data' and (
        node_attr['wait_inputs'] or 'function' in node_attr
    )
    candidates = args[0].values() if has_mapping_inputs else args
    pending = {v for v in candidates if isinstance(v, Future)}

    def _submit():
        # The executor is looked up again at submission time.
        pool = EXECUTORS.get_executor(exe_id)
        return pool.thread(
            sid, _async_eval, sol, args, node_attr, node_id, *a, **kw
        )

    if pending:  # Chain results.
        result = executor.add_future(sid, Future())
        from .executors import _safe_set_exception, _safe_set_result

        def _set_res(fut):
            # Mirror the outcome of the submitted task onto `result`.
            try:
                _safe_set_result(result, fut.result())
            except BaseException as ex:
                _safe_set_exception(result, ex)

        def _submit_task(fut=None):
            pending.discard(fut)
            if pending or result.done():
                return
            # Last pending input completed: submit the evaluation.
            _submit().add_done_callback(_set_res)

        # Iterate over a snapshot: callbacks may fire right away and mutate
        # `pending` while the loop is still running.
        for input_future in tuple(pending):
            input_future.add_done_callback(_submit_task)
    else:
        result = _submit()

    timeout = node_attr.get('await_result', False)
    if timeout is not False:
        return _await_result(result, timeout, sol, node_id)

    n = len(node_attr.get('outputs', []))
    if n <= 1:
        return result

    # One future per output, each one registered with the executor.
    result_list = AsyncList(future=result, n=n)
    for output_future in result_list:
        executor.add_future(sid, output_future)
    return result_list
class AsyncList(list):
    """
    List of asynchronous results.

    The list holds `n` fresh `Future` objects. When the source future
    completes, its (iterable) result is unpacked item by item into them; if
    the source fails, or yields fewer items than there are futures, every
    future in the list receives the exception instead.
    """

    def __init__(self, *, future=None, n=1):
        """
        Create `n` pending futures and subscribe to the source future.

        :param future:
            Source future whose result is split across the list. When `None`,
            the futures are created but nothing is subscribed.
        :type future: concurrent.futures.Future | None

        :param n:
            Number of futures to create.
        :type n: int
        """
        super(AsyncList, self).__init__()
        from concurrent.futures import Future
        # Populate first: if `future` is already done, the callback fires
        # immediately and needs the futures to be in place.
        self.extend(Future() for _ in range(n))
        if future is not None:
            future.add_done_callback(self)

    def __call__(self, future):
        """
        Done-callback: distribute the result of `future` over the list.

        :param future:
            Completed source future.
        :type future: concurrent.futures.Future

        :return:
            The source future.
        :rtype: concurrent.futures.Future
        """
        from .executors import _safe_set_result, _safe_set_exception
        try:
            res = tuple(future.result())
            if len(self) > len(res):
                # Raised explicitly (not via `assert`) so the check survives
                # `python -O`; otherwise the trailing futures would never be
                # resolved. The exception type is kept for compatibility.
                raise AssertionError(
                    'expected at least %d results, got %d'
                    % (len(self), len(res))
                )
        except BaseException as ex:
            for fut in self:
                _safe_set_exception(fut, ex)
        else:
            for fut, value in zip(self, res):
                _safe_set_result(fut, value)
        return future
def await_result(obj, timeout=None):
    """
    Unwrap a `Future`, passing any other object through untouched.

    :param obj:
        Value object.
    :type obj: concurrent.futures.Future | object

    :param timeout:
        The number of seconds to wait for the result if the future isn't done.
        If None, then there is no limit on the wait time.
    :type timeout: int

    :return:
        The result of the future, or `obj` itself when it is not a future.
    :rtype: object

    Example::

        >>> from concurrent.futures import Future
        >>> fut = Future()
        >>> fut.set_result(3)
        >>> await_result(fut), await_result(4)
        (3, 4)
    """
    from concurrent.futures import Future
    if not isinstance(obj, Future):
        # Plain values are already "resolved".
        return obj
    return obj.result(timeout)
# Shut down every initialized executor when the interpreter exits. `wait=False`
# asks the shutdown not to block until running futures have finished.
atexit.register(shutdown_executors, wait=False)