Source code for schedula.utils.asy.executors

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2024, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It defines the executor classes.
"""
import time
import functools
from ..cst import EMPTY
from . import _process_funcs
from ..exc import ExecutorShutdown
from ..imp import Future, finalize, Error
from ..dsp import parent_func, SubDispatch, NoSub, get_nested_dicts


def _safe_set_result(fut, value):
    try:
        not fut.done() and fut.set_result(value)
    except Error:
        pass


def _safe_set_exception(fut, value):
    try:
        not fut.done() and fut.set_exception(value)
    except Error:
        pass


class Executor:
    """
    Base Executor.

    `submit` runs the call synchronously in the calling thread and returns a
    future that has already been settled with its outcome.
    """

    def __init__(self):
        # Maps every pending future to its task handle (`None` when the call
        # is executed synchronously by this base class).
        self.tasks = {}
        # NOTE(review): if `finalize` is `weakref.finalize`, registering the
        # bound method `self.shutdown` makes the finalizer reference `self`,
        # which may prevent the instance from being collected before
        # interpreter exit -- confirm this is intended.
        finalize(self, self.shutdown, False)

    def __reduce__(self):
        # Rebuild the executor with no arguments: pending tasks are not kept.
        return self.__class__, ()

    def _set_future(self, fut, res):
        """
        Unregister `fut` and settle it with the outcome stored in `res`.

        :param fut:
            Future registered in `self.tasks` (a `KeyError` is raised if it
            is not).
        :type fut: Future

        :param res:
            Outcome produced by `_target`: `{'res': value}` on success or
            `{'err': exception}` on failure.
        :type res: dict

        :return:
            The same future.
        :rtype: Future
        """
        self.tasks.pop(fut)
        if 'err' not in res:
            _safe_set_result(fut, res['res'])
        else:
            _safe_set_exception(fut, res['err'])
        return fut

    @staticmethod
    def _target(send, func, args, kwargs):
        """
        Call `func` and pack its outcome in a dictionary.

        :param send:
            Callable that receives the outcome; when falsy, the outcome is
            returned instead.
        :type send: callable | None

        :param func:
            Function to call.
        :type func: callable

        :param args:
            Positional arguments of `func`.
        :type args: tuple

        :param kwargs:
            Keyword arguments of `func`.
        :type kwargs: dict

        :return:
            `{'res': value}` or `{'err': exception}` when `send` is falsy,
            otherwise `None`.
        :rtype: dict | None
        """
        try:
            outcome = {'res': func(*args, **kwargs)}
        except BaseException as ex:
            # Every failure (`BaseException` included) is reported through
            # the outcome rather than raised here.
            outcome = {'err': ex}
        if not send:
            return outcome
        send(outcome)

    def shutdown(self, wait=True):
        """
        Stop the pending tasks.

        :param wait:
            If True, wait for the pending futures before stopping them.
        :type wait: bool

        :return:
            Snapshot of the tasks that were pending ({future: task}).
        :rtype: dict

        :raises ValueError:
            If a task keeps raising `AttributeError` on `terminate` after
            100 attempts.
        """
        # Work on a copy: `self.tasks` can change while iterating.
        pending = dict(self.tasks)
        if wait:
            from concurrent.futures import wait as _wait_fut
            # noinspection PyCallingNonCallable
            _wait_fut(pending)
        for fut, task in pending.items():
            _safe_set_exception(fut, ExecutorShutdown)
            for _ in range(100):
                try:
                    if hasattr(task, 'terminate'):
                        task.terminate()
                except AttributeError:
                    # Presumably the task is not fully started yet: retry
                    # after a short pause.
                    time.sleep(0.01)
                else:
                    break
            else:
                raise ValueError('Task could not terminate!')
        return pending

    def submit(self, func, *args, **kwargs):
        """
        Run `func(*args, **kwargs)` immediately and return its future.

        :param func:
            Function to call.
        :type func: callable

        :return:
            Future settled with the result or the exception of the call.
        :rtype: Future
        """
        fut = Future()

        def send(res):
            return self._set_future(fut, res)

        self.tasks[fut] = None
        self._target(send, func, args, kwargs)
        return fut
class ThreadExecutor(Executor):
    """Multi Thread Executor: every submitted call runs in a daemon thread."""

    def submit(self, func, *args, **kwargs):
        """
        Start a daemon thread that runs `func(*args, **kwargs)`.

        :param func:
            Function to call.
        :type func: callable

        :return:
            Future settled by the thread with the result or the exception of
            the call.
        :rtype: Future
        """
        import threading
        fut = Future()

        def send(res):
            return self._set_future(fut, res)

        worker = threading.Thread(
            target=self._target, args=(send, func, args, kwargs), daemon=True
        )
        # Register the thread before starting it, so that `_set_future` finds
        # the future in `self.tasks` when the call completes.
        self.tasks[fut] = worker
        worker.start()
        return fut
class ProcessExecutor(Executor):
    """Process Executor: every submitted call runs in a new process."""

    # Optional hook invoked (under `self.lock`) by `init` before each submit.
    _init = None
    # Positional and keyword arguments made available to the `_init` hook.
    _init_args = ()
    _init_kwargs = {}
    # Optional hook invoked (under `self.lock`) at the end of `shutdown`.
    _shutdown = None

    def _submit(self, func, args, kwargs):
        """
        Run `func(*args, **kwargs)` in a new process and wait for its outcome.

        :param func:
            Function to call.
        :type func: callable

        :param args:
            Positional arguments of `func`.
        :type args: tuple

        :param kwargs:
            Keyword arguments of `func`.
        :type kwargs: dict

        :return:
            Future settled with the outcome received from the process.
        :rtype: Future
        """
        # noinspection PyUnresolvedReferences
        from multiprocess import get_context
        ctx = get_context()
        fut = Future()
        receiver, sender = ctx.Pipe(duplex=False)
        task = ctx.Process(
            target=self._target, args=(sender.send, func, args, kwargs)
        )
        self.tasks[fut] = task
        task.start()
        # Blocks the caller until the child sends its outcome.
        # NOTE(review): the parent keeps its own handle on the sending end,
        # so it looks like `recv` would never see EOF if the child is
        # terminated before sending -- confirm.
        return self._set_future(fut, receiver.recv())

    def __reduce__(self):
        # The hooks and their arguments travel as the state item of the
        # reduce tuple; the constructor itself is called with no arguments.
        names = '_init', '_submit', '_shutdown', '_init_args', '_init_kwargs'
        return self.__class__, (), {name: getattr(self, name) for name in names}

    def __init__(self, *args, **state):
        """
        :param args:
            Ignored positional arguments.
        :type args: tuple

        :param state:
            Attributes to set on the instance (e.g., the `_init`, `_submit`,
            and `_shutdown` hooks).
        :type state: dict
        """
        super().__init__()
        import threading
        # Serializes the calls to the `_init` and `_shutdown` hooks.
        self.lock = threading.Lock()
        for name, value in state.items():
            setattr(self, name, value)

    def init(self):
        """Call the `_init` hook, if any, while holding `self.lock`."""
        if not self._init:
            return
        with self.lock:
            self._init()

    def submit(self, func, *args, **kwargs):
        """
        Initialize the executor and submit `func(*args, **kwargs)`.

        :param func:
            Function to call.
        :type func: callable

        :return:
            Future of the call.
        :rtype: Future
        """
        self.init()
        return self._submit(func, args, kwargs)

    def shutdown(self, wait=True):
        """
        Stop the pending tasks, then call the `_shutdown` hook, if any.

        :param wait:
            If True, wait for the pending futures before stopping them.
        :type wait: bool

        :return:
            Snapshot of the tasks that were pending ({future: task}).
        :rtype: dict
        """
        tasks = super().shutdown(wait)
        if not self._shutdown:
            return tasks
        with self.lock:
            self._shutdown()
        return tasks
class ProcessPoolExecutor(ProcessExecutor):
    """Process Pool Executor: submitted calls run in a lazily created pool."""

    def _init(self):
        """Create the pool the first time it is needed."""
        if getattr(self, 'pool', None) is not None:
            return
        # noinspection PyUnresolvedReferences
        from multiprocess import get_context
        self.pool = get_context().Pool(*self._init_args, **self._init_kwargs)

    def _submit(self, func, args, kwargs):
        """
        Schedule `func(*args, **kwargs)` on the pool.

        :param func:
            Function to call.
        :type func: callable

        :param args:
            Positional arguments of `func`.
        :type args: tuple

        :param kwargs:
            Keyword arguments of `func`.
        :type kwargs: dict

        :return:
            Future settled by the pool callbacks.
        :rtype: Future
        """
        fut = Future()
        on_result = functools.partial(_safe_set_result, fut)
        on_error = functools.partial(_safe_set_exception, fut)
        self.tasks[fut] = self.pool.apply_async(
            func, args, kwargs, on_result, on_error
        )
        # Unregister the task as soon as the future is settled.
        fut.add_done_callback(self.tasks.pop)
        return fut

    def _shutdown(self):
        """Terminate and join the pool, if one has been created."""
        pool = getattr(self, 'pool', None)
        if pool:
            pool.terminate()
            pool.join()
class PoolExecutor:
    """General PoolExecutor to dispatch asynchronously and in parallel."""

    def __init__(self, thread_executor, process_executor=None, parallel=None):
        """
        :param thread_executor:
            Thread pool executor to dispatch asynchronously.
        :type thread_executor: ThreadExecutor

        :param process_executor:
            Process pool executor to execute in parallel the functions calls.
        :type process_executor: ProcessExecutor | ProcessPoolExecutor

        :param parallel:
            Run `_process_funcs` in parallel.
        :type parallel: bool
        """
        self._thread = thread_executor
        self._process = process_executor
        self._parallel = parallel
        # Without a thread executor the pool is considered shut down.
        self._running = bool(thread_executor)
        # Pending futures, each mapped to the solution ids registered for it
        # through `add_future`.
        self.futures = {}
        # NOTE(review): if `finalize` is `weakref.finalize`, registering the
        # bound method `self.shutdown` makes the finalizer reference `self`,
        # which may prevent the instance from being collected before
        # interpreter exit -- confirm this is intended.
        finalize(self, self.shutdown, False)

    def __reduce__(self):
        return self.__class__, (self._thread, self._process, self._parallel)

    def add_future(self, sol_id, fut):
        """
        Register `fut` as pending for the solution `sol_id`.

        The future is removed from `self.futures` as soon as it is done.

        :param sol_id:
            Solution id.
        :type sol_id: object

        :param fut:
            Future to track.
        :type fut: Future

        :return:
            The same future.
        :rtype: Future
        """
        get_nested_dicts(self.futures, fut, default=set).add(sol_id)
        fut.add_done_callback(self.futures.pop)
        return fut

    def get_futures(self, sol_id=EMPTY):
        """
        Return the pending futures.

        :param sol_id:
            Solution id used to filter the futures. If `EMPTY`, all pending
            futures are returned.
        :type sol_id: object

        :return:
            The live `self.futures` dictionary when `sol_id` is `EMPTY`,
            otherwise the set of futures registered for `sol_id`.
        :rtype: dict | set
        """
        if sol_id is EMPTY:
            return self.futures
        # Iterate a snapshot: done-callbacks pop entries from `self.futures`
        # in the threads that complete the futures, and iterating the live
        # dictionary could raise `RuntimeError` (size changed during
        # iteration).
        pending = dict(self.futures)
        return {fut for fut, ids in pending.items() if sol_id in ids}

    def thread(self, sol_id, *args, **kwargs):
        """
        Submit a call to the thread executor.

        :param sol_id:
            Solution id.
        :type sol_id: object

        :param args:
            Positional arguments forwarded to the `submit` method of the
            thread executor.
        :type args: tuple

        :param kwargs:
            Keyword arguments forwarded to the `submit` method of the thread
            executor.
        :type kwargs: dict

        :return:
            Future of the call; when the executor is shut down, a future
            already failed with `ExecutorShutdown`.
        :rtype: Future
        """
        if self._running:
            return self.add_future(sol_id, self._thread.submit(*args, **kwargs))
        fut = Future()
        fut.set_exception(ExecutorShutdown)
        return fut

    def process_funcs(self, exe_id, funcs, *args, **kw):
        """
        Run `_process_funcs`, through `process` when running in parallel.

        The parallel path is taken when `parallel` is truthy, or when it is
        not `False`, a process executor is set, and none of the parent
        functions is a `SubDispatch` (`NoSub` excluded).

        :param exe_id:
            Execution id; its last item is used as solution id.
        :type exe_id: tuple

        :param funcs:
            Functions to execute.
        :type funcs: list

        :return:
            The value returned by `_process_funcs`.
        :rtype: object
        """
        not_sub = self._process and not any(
            isinstance(func, SubDispatch) and not isinstance(func, NoSub)
            for func in map(parent_func, funcs)
        )
        if (self._parallel is not False and not_sub) or self._parallel:
            sid = exe_id[-1]
            return self.process(
                sid, _process_funcs, (False, sid), funcs, *args, **kw
            )
        return _process_funcs(exe_id, funcs, *args, **kw)

    def process(self, sol_id, fn, *args, **kwargs):
        """
        Call `fn(*args, **kwargs)` and return its result.

        The call goes through the process executor when one is set (waiting
        for its future), otherwise it is executed directly.

        :param sol_id:
            Solution id.
        :type sol_id: object

        :param fn:
            Function to call.
        :type fn: callable

        :return:
            Result of the call.
        :rtype: object

        :raises ExecutorShutdown:
            If the executor is shut down.
        """
        if not self._running:
            raise ExecutorShutdown
        if not self._process:
            return fn(*args, **kwargs)
        fut = self._process.submit(fn, *args, **kwargs)
        return self.add_future(sol_id, fut).result()

    def wait(self, timeout=None):
        """
        Wait for the pending futures.

        :param timeout:
            Maximum number of seconds to wait; `None` means no limit.
        :type timeout: float | None
        """
        from concurrent.futures import wait as _wait_fut
        _wait_fut(self.futures, timeout)

    def shutdown(self, wait=True):
        """
        Shut down the process and thread executors.

        :param wait:
            If True, wait for the pending futures before shutting down.
        :type wait: bool

        :return:
            `{'executor': self, 'tasks': {'process': ..., 'thread': ...}}`
            with the tasks returned by the two executors, or `None` when the
            executor is already shut down.
        :rtype: dict | None
        """
        if not self._running:
            return None
        if wait:
            self.wait()
        self._running = False
        # The process executor is optional: fall back to an empty dict.
        process_tasks = {}
        if self._process:
            process_tasks = self._process.shutdown(False) or {}
        tasks = {
            'executor': self,
            'tasks': {
                'process': process_tasks,
                'thread': self._thread.shutdown(False)
            }
        }
        self.futures = {}
        self._process = self._thread = None
        return tasks