Source code for schedula.utils.asy.factory

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Copyright 2015-2020, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at:

It defines the `ExecutorFactory` class.
import weakref
import threading
from ..cst import EMPTY
from ..dsp import get_nested_dicts

[docs]class ExecutorFactory(dict):
[docs] def __init__(self, *args, **kwargs): super(ExecutorFactory, self).__init__(*args, **kwargs) self._executors = {} self._lock = threading.Lock() weakref.finalize(self, self.shutdown_executor, wait=False)
def __getstate__(self): it = self.__dict__.items() return {k: v for k, v in it if k != '_lock' and k != '_executors'} def __setstate__(self, state): self.__dict__ = state self._executors = {} self._lock = threading.Lock() weakref.finalize(self, self.shutdown_executor, wait=False)
[docs] @staticmethod def executor_id(name, sol): if name is True: name = sol.dsp.executor return name, id(sol)
[docs] def get_executor(self, exe_id): name, sol_id = exe_id if name is not False: d = get_nested_dicts(self._executors, name) if get_nested_dicts(d, 'active', sol_id, default=lambda: True): default = self.get(name, lambda: None) with self._lock: return get_nested_dicts(d, 'executor', default=default) else: from .executors import PoolExecutor # noinspection PyTypeChecker return PoolExecutor(None)
[docs] def set_executor(self, name, value): get_nested_dicts(self._executors, name)['executor'] = value
def _filter_executors(self, name=EMPTY, sol_id=EMPTY): bn, bs = name is EMPTY, sol_id is EMPTY for n, d in self._executors.items(): if (bn or n == name) and (bs or sol_id in d['active']): yield n, d
[docs] def set_active(self, sol_id, value=True): for k, d in self._filter_executors(sol_id=sol_id): d['active'][sol_id] = value
[docs] def pop_active(self, sol_id): for k, d in self._filter_executors(sol_id=sol_id): d['active'].pop(sol_id, None)
[docs] def shutdown_executor(self, name=EMPTY, sol_id=EMPTY, wait=True): data = dict(self._filter_executors(name=name, sol_id=sol_id)) if wait: from concurrent.futures import wait as _wait_fut futures = set() for d in data.values(): if d.get('executor'): futures.update(d['executor'].get_futures(sol_id)) # noinspection PyCallingNonCallable,PyUnboundLocalVariable _wait_fut(futures) with self._lock: tasks, force = {}, sol_id is EMPTY for k, d in self._filter_executors(name=name, sol_id=sol_id): active = d.get('active', {}) active[sol_id] = False if force or not any(active.values()): active.update(dict.fromkeys(active, False)) tasks[k] = d.pop('executor', None) return {k: e.shutdown(0) for k, e in tasks.items() if e}