Source code for schedula.utils.asy

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2020, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It contains functions to dispatch asynchronously and  in parallel.
"""


def _async_executor():
    return PoolExecutor(ThreadExecutor())


def _parallel_executor():
    return PoolExecutor(ThreadExecutor(), ProcessExecutor())


def _parallel_pool_executor():
    return PoolExecutor(ThreadExecutor(), ProcessPoolExecutor(), False)


def _parallel_dispatch_executor():
    return PoolExecutor(ThreadExecutor(), ProcessExecutor(), True)


_EXECUTORS = {}
EXECUTORS = {
    'async': _async_executor,
    'parallel': _parallel_executor,
    'parallel-pool': _parallel_pool_executor,
    'parallel-dispatch': _parallel_dispatch_executor
}


def _executor_name(name, dsp):
    if name is True:
        name = dsp.executor
    return name


def _get_executor(name):
    if name is not False:
        if name not in _EXECUTORS and name in EXECUTORS:
            _EXECUTORS[name] = EXECUTORS[name]()
        return _EXECUTORS.get(name)


[docs]def register_executor(name, init): """ Register a new executor type. :param name: Executor name. :type name: str :param init: Function to initialize the executor. :type init: callable """ EXECUTORS[name] = init
[docs]def shutdown_executor(name, wait=True): """ Clean-up the resources associated with the Executor. :param name: Executor name. :type name: str :param wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed. :type wait: bool :return: Shutdown pool executor. :rtype: dict[concurrent.futures.Future,threading.Thread|multiprocess.Process] """ return _EXECUTORS.pop(name).shutdown(wait)
[docs]def shutdown_executors(wait=True): """ Clean-up the resources of all initialized executors. :param wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executors have been reclaimed. :type wait: bool :return: Shutdown pool executor. :rtype: dict[str,dict] """ return {k: shutdown_executor(k, wait) for k in list(_EXECUTORS.keys())}
def _process_funcs(name, funcs, executor, *args, stopper=None, sol_name=None, **kw): from .exc import DispatcherError, DispatcherAbort from .dsp import parent_func, SubDispatch, NoSub res, e = [], _get_executor(name) for fn in funcs: if stopper and stopper.is_set(): raise DispatcherAbort pfunc, r = parent_func(fn), {} if isinstance(pfunc, SubDispatch): try: r['res'] = fn(*args, _stopper=stopper, _executor=executor, _sol_name=sol_name, **kw) except DispatcherError as ex: if isinstance(pfunc, NoSub): raise ex r['err'] = ex r['sol'] = pfunc.solution else: r['res'] = e.process(fn, *args, **kw) if e else fn(*args, **kw) res.append(r) if 'err' in r: break args, kw = (r['res'],), {} return res
[docs]def async_process(funcs, *args, executor=False, sol=None, callback=None, **kw): """ Execute `func(*args)` in an asynchronous parallel process. :param funcs: Functions to be executed. :type funcs: list[callable] :param args: Arguments to be passed to first function call. :type args: tuple :param executor: Pool executor to run the function. :type executor: str | bool :param sol: Parent solution. :type sol: schedula.utils.sol.Solution :param callback: Callback function to be called after all function execution. :type callback: callable :param kw: Keywords to be passed to first function call. :type kw: dict :return: Functions result. :rtype: object """ name = _executor_name(executor, sol.dsp) e = _get_executor(name) res = (e and e.process_funcs or _process_funcs)( name, funcs, executor, *args, **kw ) for r in res: callback and callback('sol' in r, r.get('sol', r.get('res'))) if 'err' in r: raise r['err'] return res[-1]['res']
def _async_eval(sol, args, node_attr, *a, **kw): try: if node_attr['type'] == 'data' and ( node_attr['wait_inputs'] or 'function' in node_attr): args = {k: await_result(v) for k, v in args[0].items()}, else: args = tuple(map(await_result, args)) except BaseException as ex: raise ex else: return sol._evaluate_node(args, node_attr, *a, **kw) def _await_result(result, timeout, sol, node_id): from .exc import SkipNode try: return await_result(result, None if timeout is True else timeout) except Exception as ex: attr = sol.workflow.nodes[node_id] if 'started' in attr: import time attr['duration'] = time.time() - attr['started'] # Some error occurs. msg = "Failed DISPATCHING '%s' due to:\n %r" sol._warning(msg, node_id, ex) raise SkipNode(ex=ex)
[docs]def async_thread(sol, args, node_attr, node_id, *a, **kw): """ Execute `sol._evaluate_node` in an asynchronous thread. :param sol: Solution to be updated. :type sol: schedula.utils.sol.Solution :param args: Arguments to be passed to node calls. :type args: tuple :param node_attr: Dictionary of node attributes. :type node_attr: dict :param node_id: Data or function node id. :type node_id: str :param a: Extra args to invoke `sol._evaluate_node`. :type a: tuple :param kw: Extra kwargs to invoke `sol._evaluate_node`. :type kw: dict :return: Function result. :rtype: concurrent.futures.Future | AsyncList """ executor = _get_executor(_executor_name(kw.get('executor', False), sol.dsp)) if not executor: return sol._evaluate_node(args, node_attr, node_id, *a, **kw) futures = args if node_attr['type'] == 'data' and ( node_attr['wait_inputs'] or 'function' in node_attr): futures = args[0].values() from concurrent.futures import Future futures = {v for v in futures if isinstance(v, Future)} def _submit(): return executor.thread( _async_eval, sol, args, node_attr, node_id, *a, **kw ) if futures: # Chain results. result = Future() def _set_res(fut): try: result.set_result(fut.result()) except BaseException as ex: result.set_exception(ex) def _submit_task(fut=None): futures.discard(fut) not futures and _submit().add_done_callback(_set_res) for f in list(futures): f.add_done_callback(_submit_task) else: result = _submit() timeout = node_attr.get('await_result', False) if timeout is not False: return _await_result(result, timeout, sol, node_id) n = len(node_attr.get('outputs', [])) return AsyncList(future=result, n=n) if n > 1 else result
[docs]class Executor: """Base Executor"""
[docs] def __init__(self): self.tasks = {}
def __reduce__(self): return self.__class__, () def _set_future(self, fut, res): self.tasks.pop(fut) if 'err' in res: fut.set_exception(res['err']) else: fut.set_result(res['res']) return fut @staticmethod def _target(send, func, args, kwargs): try: send({'res': func(*args, **kwargs)}) except BaseException as ex: send({'err': ex})
[docs] def shutdown(self, wait=True): from .exc import ExecutorShutdown from concurrent.futures import wait as wait_fut tasks = dict(self.tasks) if wait: wait_fut(tasks) for fut, task in tasks.items(): not fut.done() and fut.set_exception(ExecutorShutdown) try: task.terminate() except AttributeError: pass return tasks
[docs] def submit(self, func, *args, **kwargs): raise NotImplemented
[docs]class ProcessExecutor(Executor): """Multi Process Executor"""
[docs] def submit(self, func, *args, **kwargs): # noinspection PyUnresolvedReferences from multiprocess import Process, Pipe from concurrent.futures import Future fut, (c0, c1) = Future(), Pipe(False) task = Process(target=self._target, args=(c1.send, func, args, kwargs)) self.tasks[fut] = task task.start() return self._set_future(fut, c0.recv())
[docs]class ThreadExecutor(Executor): """Multi Thread Executor"""
[docs] def submit(self, func, *args, **kwargs): from threading import Thread from concurrent.futures import Future fut, send = Future(), lambda res: self._set_future(fut, res) task = Thread(target=self._target, args=(send, func, args, kwargs)) self.tasks[fut], task.daemon = task, True task.start() return fut
[docs]class ProcessPoolExecutor(Executor): """Process Pool Executor"""
[docs] def __init__(self): super(ProcessPoolExecutor, self).__init__() import os from multiprocess import Pool self.pool = Pool(os.cpu_count() or 1)
[docs] def submit(self, func, *args, **kwargs): from concurrent.futures import Future fut = Future() self.tasks[fut] = self.pool.apply_async( func, args, kwargs, fut.set_result, fut.set_exception ) fut.add_done_callback(self.tasks.pop) return fut
[docs] def shutdown(self, wait=True): super(ProcessPoolExecutor, self).shutdown(wait) self.pool.terminate() self.pool.join()
[docs]class PoolExecutor: """General PoolExecutor to dispatch asynchronously and in parallel."""
[docs] def __init__(self, thread_executor, process_executor=None, parallel=None): """ :param thread_executor: Thread pool executor to dispatch asynchronously. :type thread_executor: ThreadExecutor :param process_executor: Process pool executor to execute in parallel the functions calls. :type process_executor: ProcessExecutor | ProcessPoolExecutor :param parallel: Run `_process_funcs` in parallel. :type parallel: bool """ self._thread = thread_executor self._process = process_executor self._parallel = parallel
[docs] def thread(self, *args, **kwargs): return self._thread.submit(*args, **kwargs)
[docs] def process_funcs(self, name, funcs, *args, **kw): from .dsp import parent_func, SubDispatch, NoSub not_sub = self._process and not any(map( lambda x: isinstance(x, SubDispatch) and not isinstance(x, NoSub), map(parent_func, funcs) )) if self._parallel is not False and not_sub or self._parallel: return self.process(_process_funcs, False, funcs, *args, **kw) return _process_funcs(name, funcs, *args, **kw)
[docs] def process(self, fn, *args, **kwargs): if self._process: fut = self._process.submit(fn, *args, **kwargs) return fut.result() return fn(*args, **kwargs)
[docs] def shutdown(self, wait=True): return { 'executor': self, 'tasks': { 'process': self._process and self._process.shutdown(wait) or {}, 'thread': self._thread.shutdown(wait) } }
[docs]class AsyncList(list): "List of asynchronous results."
[docs] def __init__(self, *, future=None, n=1): super(AsyncList, self).__init__() from concurrent.futures import Future self.extend(Future() for _ in range(n)) future.add_done_callback(self)
def __call__(self, future): try: res = tuple(future.result()) assert len(self) <= len(res) except BaseException as ex: for fut in self: fut.set_exception(ex) else: for fut, value in zip(self, res): fut.set_result(value) return future
[docs]def await_result(obj, timeout=None): """ Return the result of a `Future` object. :param obj: Value object. :type obj: concurrent.futures.Future | object :param timeout: The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time. :type timeout: int :return: Result. :rtype: object Example:: >>> from concurrent.futures import Future >>> fut = Future() >>> fut.set_result(3) >>> await_result(fut), await_result(4) (3, 4) """ from concurrent.futures import Future return obj.result(timeout) if isinstance(obj, Future) else obj