#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2020, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It defines the executors classes.
"""
import weakref
import threading
import functools
from . import EXECUTORS
from ..cst import EMPTY
from concurrent.futures._base import Error
from concurrent.futures import Future, wait as _wait_fut
from ..dsp import parent_func, SubDispatch, NoSub, get_nested_dicts
from ..exc import ExecutorShutdown, DispatcherError, DispatcherAbort
def _safe_set_result(fut, value):
try:
not fut.done() and fut.set_result(value)
except Error:
pass
def _safe_set_exception(fut, value):
try:
not fut.done() and fut.set_exception(value)
except Error:
pass
def _process_funcs(
exe_id, funcs, executor, *args, stopper=None, sol_name=None, **kw):
res, sid = [], exe_id[-1]
for fn in funcs:
if stopper and stopper.is_set():
raise DispatcherAbort
pfunc, r = parent_func(fn), {}
if isinstance(pfunc, SubDispatch):
try:
r['res'] = fn(*args, _stopper=stopper, _executor=executor,
_sol_name=sol_name, **kw)
except DispatcherError as ex:
if isinstance(pfunc, NoSub):
raise ex
r['err'] = ex
r['sol'] = pfunc.solution
else:
e = EXECUTORS.get_executor(exe_id)
r['res'] = e.process(sid, fn, *args, **kw) if e else fn(*args, **kw)
res.append(r)
if 'err' in r:
break
args, kw = (r['res'],), {}
return res
[docs]class Executor:
"""Base Executor"""
[docs] def __init__(self):
self.tasks = {}
weakref.finalize(self, self.shutdown, False)
def __reduce__(self):
return self.__class__, ()
def _set_future(self, fut, res):
self.tasks.pop(fut)
if 'err' in res:
_safe_set_exception(fut, res['err'])
else:
_safe_set_result(fut, res['res'])
return fut
@staticmethod
def _target(send, func, args, kwargs):
try:
send({'res': func(*args, **kwargs)})
except BaseException as ex:
send({'err': ex})
[docs] def shutdown(self, wait=True):
tasks = dict(self.tasks)
if wait:
# noinspection PyCallingNonCallable
_wait_fut(tasks)
for fut, task in tasks.items():
_safe_set_exception(fut, ExecutorShutdown)
try:
task.terminate()
except AttributeError:
pass
return tasks
[docs] def submit(self, func, *args, **kwargs):
raise NotImplemented
[docs]class ProcessExecutor(Executor):
"""Multi Process Executor"""
def __reduce__(self):
return self.__class__, (self._ctx,)
[docs] def __init__(self, mp_context=None):
super(ProcessExecutor, self).__init__()
if not mp_context:
# noinspection PyUnresolvedReferences
from multiprocess import get_context
mp_context = get_context()
self._ctx = mp_context
[docs] def submit(self, func, *args, **kwargs):
# noinspection PyUnresolvedReferences
fut, (c0, c1) = Future(), self._ctx.Pipe(duplex=False)
self.tasks[fut] = task = self._ctx.Process(
target=self._target, args=(c1.send, func, args, kwargs)
)
task.start()
return self._set_future(fut, c0.recv())
[docs]class ThreadExecutor(Executor):
"""Multi Thread Executor"""
[docs] def submit(self, func, *args, **kwargs):
fut, send = Future(), lambda res: self._set_future(fut, res)
task = threading.Thread(
target=self._target, args=(send, func, args, kwargs)
)
self.tasks[fut], task.daemon = task, True
task.start()
return fut
[docs]class ProcessPoolExecutor(Executor):
"""Process Pool Executor"""
def __reduce__(self):
return self.__class__, (
self._max_workers, self._ctx, self._initializer, self._initargs
)
[docs] def __init__(self, max_workers=None, mp_context=None, initializer=None,
initargs=()):
super(ProcessPoolExecutor, self).__init__()
if not mp_context:
# noinspection PyUnresolvedReferences
from multiprocess import get_context
mp_context = get_context()
self._max_workers = max_workers
self._ctx = mp_context
self._initializer = initializer
self._initargs = initargs
self.lock = self._ctx.Lock()
self.pool = None
def _init(self):
with self.lock:
if self.pool is None:
self.pool = self._ctx.Pool(
processes=self._max_workers, initializer=self._initializer,
initargs=self._initargs,
)
[docs] def submit(self, func, *args, **kwargs):
self._init()
fut = Future()
callback = functools.partial(_safe_set_result, fut)
error_callback = functools.partial(_safe_set_exception, fut)
self.tasks[fut] = self.pool.apply_async(
func, args, kwargs, callback, error_callback
)
fut.add_done_callback(self.tasks.pop)
return fut
[docs] def shutdown(self, wait=True):
super(ProcessPoolExecutor, self).shutdown(wait)
with self.lock:
if self.pool:
self.pool.terminate()
self.pool.join()
[docs]class PoolExecutor:
"""General PoolExecutor to dispatch asynchronously and in parallel."""
[docs] def __init__(self, thread_executor, process_executor=None, parallel=None):
"""
:param thread_executor:
Thread pool executor to dispatch asynchronously.
:type thread_executor: ThreadExecutor
:param process_executor:
Process pool executor to execute in parallel the functions calls.
:type process_executor: ProcessExecutor | ProcessPoolExecutor
:param parallel:
Run `_process_funcs` in parallel.
:type parallel: bool
"""
self._thread = thread_executor
self._process = process_executor
self._parallel = parallel
self._running = bool(thread_executor)
self.futures = {}
weakref.finalize(self, self.shutdown, False)
def __reduce__(self):
return self.__class__, (self._thread, self._process, self._parallel)
[docs] def add_future(self, sol_id, fut):
get_nested_dicts(self.futures, fut, default=set).add(sol_id)
fut.add_done_callback(self.futures.pop)
return fut
[docs] def get_futures(self, sol_id=EMPTY):
if sol_id is EMPTY:
return self.futures
else:
return {k for k, v in self.futures.items() if sol_id in v}
[docs] def thread(self, sol_id, *args, **kwargs):
if self._running:
return self.add_future(sol_id, self._thread.submit(*args, **kwargs))
fut = Future()
fut.set_exception(ExecutorShutdown)
return fut
[docs] def process_funcs(self, exe_id, funcs, *args, **kw):
not_sub = self._process and not any(map(
lambda x: isinstance(x, SubDispatch) and not isinstance(x, NoSub),
map(parent_func, funcs)
))
if self._parallel is not False and not_sub or self._parallel:
sid = exe_id[-1]
exe_id = False, sid
return self.process(sid, _process_funcs, exe_id, funcs, *args, **kw)
return _process_funcs(exe_id, funcs, *args, **kw)
[docs] def process(self, sol_id, fn, *args, **kwargs):
if self._running:
if self._process:
fut = self._process.submit(fn, *args, **kwargs)
return self.add_future(sol_id, fut).result()
return fn(*args, **kwargs)
raise ExecutorShutdown
[docs] def wait(self, timeout=None):
_wait_fut(self.futures, timeout)
[docs] def shutdown(self, wait=True):
if self._running:
wait and self.wait()
self._running = False
tasks = dict(
executor=self,
tasks=dict(
process=self._process and self._process.shutdown(0) or {},
thread=self._thread.shutdown(0)
)
)
self.futures = {}
self._process = self._thread = None
return tasks