#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2019, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It provides a solution class for dispatch result.
"""
import collections
import heapq
import logging
import time
from .alg import add_edge_fun, remove_edge_fun, get_full_pipe, _sort_sk_wait_in
from .cst import START, NONE, PLOT
from .dsp import stlp
from .exc import DispatcherError, DispatcherAbort, SkipNode, ExecutorShutdown
from .asy import async_thread, await_result, async_process, AsyncList
from .base import Base
log = logging.getLogger(__name__)
# noinspection PyTypeChecker
[docs]class Solution(Base, collections.OrderedDict):
"""Solution class for dispatch result."""
def __hash__(self):
return id(self)
def __init__(self, dsp=None, inputs=None, outputs=None, wildcard=False,
             cutoff=None, inputs_dist=None, no_call=False,
             rm_unused_nds=False, wait_in=None, no_domain=False,
             _empty=False, index=(-1,), full_name=()):
    """
    Initialize the solution of a dispatch.

    :param dsp:
        Dispatcher the solution refers to; a new ``Dispatcher`` is created
        when it is falsy.
    :param inputs:
        Initial input values, forwarded to ``_set_inputs``.
    :param outputs:
        Target data nodes; stored as a set.
    :param wildcard:
        If True, ``inputs`` and ``outputs`` are passed to ``_set_wildcards``,
        otherwise the wildcard set is simply emptied.
    :type wildcard: bool, optional
    :param cutoff:
        Stored as ``self.cutoff``.
    :param inputs_dist:
        Initial distances of the inputs, forwarded to ``_set_inputs``.
    :param no_call:
        Stored as ``self.no_call``.
    :param rm_unused_nds:
        Stored as ``self.rm_unused_nds``.
    :param wait_in:
        Stored as ``self._wait_in`` (an empty dict when falsy).
    :param no_domain:
        Stored as ``self.no_domain``.
    :param _empty:
        If True, inputs, wildcards and workflow are NOT initialized.
    :param index:
        Stored as ``self.index``.
    :type index: tuple
    :param full_name:
        Stored as ``self.full_name``.
    :type full_name: tuple
    """
    super(Solution, self).__init__()

    # Plain configuration copied from the arguments.
    self.index = index
    self.full_name = full_name
    self.cutoff = cutoff
    self.no_call = no_call
    self.no_domain = no_domain
    self.rm_unused_nds = rm_unused_nds
    self.outputs = set(outputs or ())
    self._wait_in = wait_in or {}
    self._pipe = []
    self.parent = dsp

    from ..dispatcher import Dispatcher
    self._set_dsp_features(dsp if dsp else Dispatcher())

    if _empty:
        # The caller fills inputs/wildcards/workflow on its own.
        return

    self._set_inputs(inputs, inputs_dist)

    # Wildcards are computed only on request, otherwise just reset.
    wildcard_args = (inputs, outputs) if wildcard else ()
    self._set_wildcards(*wildcard_args)

    # Workflow parameters.
    self._init_workflow()
def _input_value(self, inputs=None):
# Define a function that return the input value of a given data node.
if self.no_call:
# noinspection PyUnusedLocal
def input_value(k):
return {}
else:
inputs = self.inputs if inputs is None else inputs
def input_value(k):
return {'value': inputs[k]}
return input_value
def _set_dsp_features(self, dsp):
self.dsp = dsp
self.name = dsp.name
self.nodes = dsp.nodes
self.dmap = dsp.dmap
self.raises = dsp.raises
self._pred = dsp.dmap.pred
self._succ = dsp.dmap.succ
self._edge_length = dsp._edge_length
def _set_inputs(self, inputs, initial_dist):
if self.no_call:
# Set initial values.
initial_values = dict.fromkeys(self.dsp.default_values, NONE)
if inputs is not None: # Update initial values with input values.
initial_values.update(dict.fromkeys(inputs, NONE))
else:
# Set initial values.
initial_values = {k: v['value']
for k, v in self.dsp.default_values.items()}
if inputs is not None: # Update initial values with input values.
initial_values.update(inputs)
# Set initial values.
initial_distances = {k: v['initial_dist']
for k, v in self.dsp.default_values.items()
if not inputs or k not in inputs}
if initial_dist is not None: # Update initial distances.
initial_distances.update(initial_dist)
self.inputs, self.inputs_dist = initial_values, initial_distances
def _set_wildcards(self, inputs=None, outputs=None):
"""
Update wildcards set with the input data nodes that are also outputs.
:param inputs:
Input data nodes.
:type inputs: list[str], iterable, optional
:param outputs:
Ending data nodes.
:type outputs: list[str], iterable, optional
"""
w = self._wildcards = set() # Clean wildcards.
if outputs and inputs:
node, wi = self.nodes, self._wait_in.get # Namespace shortcut.
# Input data nodes that are in output_targets.
w_crd = {u: node[u] for u in inputs if u in outputs or wi(u, False)}
# Data nodes without the wildcard.
w.update([k for k, v in w_crd.items() if v.get('wildcard', True)])
def _update_methods(self):
    """
    (Re)build the helper callables bound to the current workflow and
    settings: edge add/remove functions and the wait-input, target and
    cutoff checkers.
    """
    workflow = self.workflow
    self._wf_add_edge = add_edge_fun(workflow)
    self._wf_remove_edge = remove_edge_fun(workflow)

    # Checkers are closures built from the current solution state.
    self.check_wait_in = self._check_wait_input_flag()
    self.check_targets = self._check_targets()
    self.check_cutoff = self._check_cutoff()
def _clean_set(self):
    """
    Reset the solution: stored values, workflow graph, visited nodes,
    errors, sub-solutions, fringe, distances and the function pipe.
    """
    self.clear()  # Drop every stored output.

    from networkx import DiGraph
    graph = self.workflow = DiGraph()
    self._wf_pred = graph.pred

    self._visited = set()
    self._errors = collections.OrderedDict()
    self.sub_sol = {self.index: self}
    self.fringe = []  # Use heapq with (distance, wait, label).

    # Distance bookkeeping, all seeded with the START node.
    self.dist = {START: -1}
    self.seen = {START: -1}
    self._meet = {START: -1}

    # The helper callables must point at the new workflow.
    self._update_methods()
    self._pipe = []
def _init_workflow(self, inputs=None, input_value=None, inputs_dist=None,
                   initial_dist=0.0, clean=True):
    """
    Seed the workflow with the START node and the initial values.

    :param inputs:
        Input data nodes; ``self.inputs`` when None.
    :param input_value:
        Function returning the workflow value of a data node; built with
        ``self._input_value(inputs)`` when falsy.
    :type input_value: callable, optional
    :param inputs_dist:
        Initial distances of the inputs; ``self.inputs_dist`` when None.
    :type inputs_dist: dict, optional
    :param initial_dist:
        Offset added to every input distance.
    :type initial_dist: float, int, optional
    :param clean:
        If True the previous state is wiped with ``_clean_set`` first.
    :type clean: bool, optional
    """
    if clean:
        self._clean_set()

    self._visited.add(START)  # START counts as visited from the beginning.
    self.workflow.add_node(START, type='start')

    if inputs_dist is None:
        inputs_dist = self.inputs_dist or {}
    if inputs is None:
        inputs = self.inputs
    if not input_value:
        input_value = self._input_value(inputs)

    # Initial values are added in order of increasing distance.
    queue = sorted(
        (inputs_dist.get(node_id, 0.0) + initial_dist, node_id)
        for node_id in inputs
    )
    add_value = self._add_initial_value
    for distance, node_id in queue:
        add_value(node_id, input_value(node_id), distance)

    self._add_out_dsp_inputs()
def _close(self, cached_ids):
p = self.index[:-1]
if p:
p = self.sub_sol[p]
if self.index in cached_ids:
k = cached_ids[self.index]
else:
i = self.index[-1:]
k = next(k for k, v in p.nodes.items() if v['index'] == i)
cached_ids[self.index] = k
return not set(p.dmap[k]).difference(p.dist)
return False
def result(self, timeout=None):
    """
    Set all asynchronous results.

    Every future found in the sub-solutions (stored values, workflow node
    ``'results'`` and workflow edge ``'value'``, including the futures held
    inside an ``AsyncList``) is awaited and replaced in place by its result.
    Entries whose future fails are deleted from their container.

    :param timeout:
        The number of seconds to wait for the result if the futures aren't
        done. If None, then there is no limit on the wait time.
    :type timeout: float

    :return:
        Update Solution.
    :rtype: Solution

    :raises Exception:
        The first collected error (for a ``SkipNode``, its ``ex``
        attribute), raised after all the futures have been processed.
    """
    # `it` holds (future, container, key) triples to be resolved.
    it, exceptions, future_lists = [], [], []
    from concurrent.futures import Future, wait as wait_fut

    def update(fut, data, key):
        # Register a future, or the futures contained in an AsyncList.
        if isinstance(fut, Future):
            it.append((fut, data, key))
        elif isinstance(fut, AsyncList) and fut not in future_lists:
            future_lists.append(fut)  # Each AsyncList is expanded once.
            # Items are queued from the highest index to the lowest.
            # NOTE(review): presumably so that `del d[k]` below does not
            # shift the indices still to be processed - confirm.
            it.extend([(j, fut, i)
                       for i, j in enumerate(fut)
                       if isinstance(j, Future)][::-1])

    for s in self.sub_sol.values():
        # Stored outputs (copy of the items: `s` is modified later on).
        for k, v in list(s.items()):
            update(v, s, k)

        # Function results stored on the workflow nodes.
        for d in s.workflow.nodes.values():
            if 'results' in d:
                update(d['results'], d, 'results')

        # Values travelling on the workflow edges.
        for d in s.workflow.edges.values():
            if 'value' in d:
                update(d['value'], d, 'value')

    # Block until the futures are done (or the timeout expires).
    wait_fut({v[0] for v in it}, timeout)

    for f, d, k in it:
        try:
            d[k] = await_result(f, 0)  # Replace the future with its result.
        except SkipNode as e:
            exceptions.append((f, d, k, e.ex))
            del d[k]
        except (Exception, ExecutorShutdown, DispatcherAbort) as ex:
            exceptions.append((f, d, k, ex))
            del d[k]

    if exceptions:
        raise exceptions[0][-1]  # Only the first error is propagated.
    return self
def _run(self, stopper=None, executor=False):
    """
    Run the dispatch loop, consuming the fringe heap until it is empty or
    the targets of this (root) solution are reached.

    :param stopper:
        Forwarded to the node visits as ``stopper`` keyword.
    :param executor:
        Forwarded to the node visits as ``executor`` keyword.

    :return:
        The solution itself.
    :rtype: Solution
    """
    # Initialized and terminated dispatcher sets.
    dsp_closed, dsp_init, cached_ids = set(), {self.index}, {}

    # Reset function pipe.
    pipe = self._pipe = []

    # A function to check if a dispatcher has been initialized.
    check_dsp = dsp_init.__contains__

    # Namespaces shortcuts
    dsp_init_add, pipe_append = dsp_init.add, pipe.append
    dsp_closed_add = dsp_closed.add
    fringe, check_cutoff = self.fringe, self.check_cutoff
    # Keyword arguments shared by every node visit.
    ctx = {
        'no_call': self.no_call, 'stopper': stopper, 'executor': executor
    }

    def _dsp_closed_add(s):
        # Mark a solution as closed, recursively with its sub-solutions.
        dsp_closed_add(s.index)
        for val in s.dsp.sub_dsp_nodes.values():
            _s = s.sub_sol.get(s.index + val['index'], None)
            if _s:
                _dsp_closed_add(_s)

    while fringe:
        # Visit the closest available node.
        n = (d, _, (v, sol)) = heapq.heappop(fringe)

        # Skip terminated sub-dispatcher or visited nodes.
        if sol.index in dsp_closed or (v is not START and v in sol.dist):
            continue

        # Close sub-dispatcher solution when all outputs are satisfied.
        if sol._close(cached_ids):
            _dsp_closed_add(sol)
            cached_ids.pop(sol.index)
            continue

        dsp_init_add(sol.index)  # Update initialized dispatcher sets.
        pipe_append(n)  # Add node to the pipe.

        # Set and visit nodes.
        if not sol._visit_nodes(v, d, fringe, check_cutoff, **ctx):
            if self is sol:
                break  # Reach all targets.
            else:
                _dsp_closed_add(sol)  # Terminated sub-dispatcher.

        # See remote link node.
        sol._see_remote_link_node(v, fringe, d, check_dsp)

    if self.rm_unused_nds:  # Remove unused func and sub-dsp nodes.
        self._remove_unused_nodes()

    return self  # Data outputs.
def get_sub_dsp_from_workflow(self, sources, reverse=False,
                              add_missing=False, check_inputs=True):
    """
    Returns the sub-dispatcher induced by the workflow from sources.

    The call is delegated to the dispatcher of the solution
    (``self.dsp.get_sub_dsp_from_workflow``) using the workflow graph of the
    solution; the induced sub-dispatcher contains the nodes and edges
    reachable on the workflow graph from the source nodes.

    :param sources:
        Source nodes for the search on the workflow.
    :type sources: list[str], iterable

    :param reverse:
        If True the workflow graph is assumed as reversed.
    :type reverse: bool, optional

    :param add_missing:
        If True, missing function' inputs are added to the sub-dispatcher.
    :type add_missing: bool, optional

    :param check_inputs:
        Forwarded unchanged to the dispatcher method.
    :type check_inputs: bool, optional

    :return:
        A sub-dispatcher.
    :rtype: schedula.dispatcher.Dispatcher
    """
    options = {
        'reverse': reverse,
        'add_missing': add_missing,
        'check_inputs': check_inputs,
    }
    return self.dsp.get_sub_dsp_from_workflow(
        sources, self.workflow, **options
    )
@property
def pipe(self):
    """Full pipe of the dispatch run, as computed by ``get_full_pipe``."""
    full_pipe = get_full_pipe(self)
    return full_pipe
def _copy_structure(self, **kwargs):
sol = self.__class__(
self.dsp, self.inputs, self.outputs, False, self.cutoff,
self.inputs_dist, self.no_call, self.rm_unused_nds, self._wait_in,
self.no_domain, True, self.index, self.full_name
)
sol._clean_set()
it = ['_wildcards', 'inputs', 'inputs_dist']
it += [k for k, v in kwargs.items() if v]
for k in it:
setattr(sol, k, getattr(self, k))
return sol
def __deepcopy__(self, memo):
    """
    Deep-copy the solution and rebuild the helper callables of the copy.

    :param memo:
        Memo dictionary of the deep-copy protocol.
    :type memo: dict

    :return:
        The copied solution.
    :rtype: Solution
    """
    clone = super(Solution, self).__deepcopy__(memo)
    # The cached helpers must be rebuilt on the copy.
    clone._update_methods()
    return clone
def _add_out_dsp_inputs(self):
# Nodes that are out of the dispatcher nodes.
o = sorted(set(self.inputs).difference(self.nodes))
# Add nodes that are out of the dispatcher nodes.
if self.no_call:
self.update(collections.OrderedDict.fromkeys(o, None))
else:
self.update(collections.OrderedDict((k, self.inputs[k]) for k in o))
def _check_targets(self):
"""
Returns a function to terminate the ArciDispatch algorithm when all
targets have been visited.
:return:
A function to terminate the ArciDispatch algorithm.
:rtype: (str) -> bool
"""
if self.outputs:
targets = self.outputs.copy() # Namespace shortcut for speed.
def check_targets(node_id):
"""
Terminates ArciDispatch algorithm when all targets have been
visited.
:param node_id:
Data or function node id.
:type node_id: str
:return:
True if all targets have been visited, otherwise False.
:rtype: bool
"""
try:
targets.remove(node_id) # Remove visited node.
return not targets # If no targets terminate the algorithm.
except KeyError: # The node is not in the targets set.
return False
else:
# noinspection PyUnusedLocal
def check_targets(node_id):
return False
return check_targets # Return the function.
def _check_cutoff(self):
"""
Returns a function to stop the search of the investigated node of the
ArciDispatch algorithm.
:return:
A function to stop the search.
:rtype: (int | float) -> bool
"""
if self.cutoff is not None:
cutoff = self.cutoff # Namespace shortcut for speed.
def check_cutoff(distance):
"""
Stops the search of the investigated node of the ArciDispatch
algorithm.
:param distance:
Distance from the starting node.
:type distance: float, int
:return:
True if distance > cutoff, otherwise False.
:rtype: bool
"""
return distance > cutoff # Check cutoff distance.
else: # cutoff is None.
# noinspection PyUnusedLocal
def check_cutoff(distance):
return False
return check_cutoff # Return the function.
def _check_wait_input_flag(self):
"""
Returns a function to stop the search of the investigated node of the
ArciDispatch algorithm.
:return:
A function to stop the search.
:rtype: (bool, str) -> bool
"""
wf_pred = self._wf_pred # Namespace shortcuts.
pred = {k: set(v).issubset for k, v in self._pred.items()}
if self._wait_in:
we = self._wait_in.get # Namespace shortcut.
def check_wait_input_flag(wait_in, n_id):
"""
Stops the search of the investigated node of the ArciDispatch
algorithm, until all inputs are satisfied.
:param wait_in:
If True the node is waiting input estimations.
:type wait_in: bool
:param n_id:
Data or function node id.
:type n_id: str
:return:
True if all node inputs are satisfied, otherwise False.
:rtype: bool
"""
# Return true if the node inputs are satisfied.
if we(n_id, wait_in):
return not pred[n_id](wf_pred[n_id])
return False
else:
def check_wait_input_flag(wait_in, n_id):
# Return true if the node inputs are satisfied.
return wait_in and not pred[n_id](wf_pred[n_id])
return check_wait_input_flag # Return the function.
def _get_node_estimations(self, node_attr, node_id):
    """
    Returns the data nodes estimations and `wait_inputs` flag.

    When the node has several estimations and is not waiting for its
    inputs, only the estimation with the minimum distance is returned and
    the workflow edges of the other (non-START) estimations are removed.

    :param node_attr:
        Dictionary of node attributes.
    :type node_attr: dict

    :param node_id:
        Data node's id.
    :type node_id: str

    :returns:

        - node estimations with minimum distance from the starting node, and
        - `wait_inputs` flag
    :rtype: (dict[str, T], bool)
    """
    # Get data node estimations.
    estimations = self._wf_pred[node_id]

    wait_in = node_attr['wait_inputs']  # Namespace shortcut.

    # Check if node has multiple estimations and it is not waiting inputs.
    if len(estimations) > 1 and not self._wait_in.get(node_id, wait_in):
        # Namespace shortcuts.
        dist, edg_length, adj = self.dist, self._edge_length, self.dmap.adj

        est = []  # Estimations' heap.

        for k, v in estimations.items():  # Calculate length.
            if k is not START:  # The START estimation is not ranked.
                d = dist[k] + edg_length(adj[k][node_id], node_attr)
                heapq.heappush(est, (d, k, v))

        # The estimation with minimum distance from the starting node.
        estimations = {est[0][1]: est[0][2]}

        # Remove unused workflow edges.
        self.workflow.remove_edges_from([(v[1], node_id) for v in est[1:]])

    return estimations, wait_in  # Return estimations and wait_inputs flag.
def _remove_wait_in(self):
    """
    Switch off the wait-in flag of the entries ranked first by
    ``_sort_sk_wait_in``.

    :return:
        - the node ids switched off in ``self._wait_in``, and
        - the full ranked list returned by ``_sort_sk_wait_in``.
    :rtype: (set, list)
    """
    ranked = _sort_sk_wait_in(self)
    released = set()

    for rank, key, _, wait_map in ranked:
        # Only the entries sharing the rank of the first one are released.
        if rank != ranked[0][0]:
            continue
        wait_map[key] = False
        if wait_map is self._wait_in:
            released.add(key)

    return released, ranked
def _set_node_output(self, node_id, no_call, next_nds=None, **kw):
"""
Set the node outputs from node inputs.
:param node_id:
Data or function node id.
:type node_id: str
:param no_call:
If True data node estimation function is not used.
:type no_call: bool
:return:
If the output have been evaluated correctly.
:rtype: bool
"""
# Namespace shortcuts.
node_attr = self.nodes[node_id]
node_type = node_attr['type']
if node_type == 'data': # Set data node.
return self._set_data_node_output(node_id, node_attr, no_call,
next_nds, **kw)
elif node_type == 'function': # Set function node.
return self._set_function_node_output(node_id, node_attr, no_call,
next_nds, **kw)
def _evaluate_function(self, args, node_id, node_attr, attr, stopper=None,
                       executor=False):
    """
    Evaluate the function of a node through ``async_process``.

    :param args:
        Positional arguments of the function.
    :type args: list
    :param node_id:
        Node id (appended to the solution full name).
    :type node_id: str
    :param node_attr:
        Dictionary of node attributes; its ``'function'`` is evaluated.
    :type node_attr: dict
    :param attr:
        Workflow attributes of the node; ``'started'`` is set if missing
        and ``'solution'`` is set by the callback when a solution is given.
    :type attr: dict
    :param stopper:
        Forwarded to ``async_process``.
    :param executor:
        Forwarded to ``async_process``.

    :return:
        The value returned by ``async_process``.
    """
    if 'started' not in attr:
        attr['started'] = time.time()

    def _callback(is_sol, sol):
        # Keep track of the solution produced by the function, if any.
        if is_sol:
            attr['solution'] = sol

    funcs = [node_attr['function']]
    options = {
        'stopper': stopper,
        'executor': executor,
        'sol': self,
        'callback': _callback,
        'sol_name': self.full_name + (node_id,),
    }
    return async_process(funcs, *args, **options)
def _check_function_domain(self, args, node_attr, node_id):
# noinspection PyUnresolvedReferences
attr = self.workflow.nodes[node_id]
if not self.no_domain and 'input_domain' in node_attr:
if node_attr.get('await_domain', True):
args = map(await_result, args)
args = [v for v in args if v is not NONE]
# noinspection PyCallingNonCallable
attr['solution_domain'] = bool(node_attr['input_domain'](*args))
if not attr['solution_domain']:
raise SkipNode
def _evaluate_node(self, args, node_attr, node_id, skip_func=False, **kw):
    """
    Evaluate a node: run its function (unless skipped), apply the filters,
    record the duration and invoke the node callback.

    :param args:
        Node arguments; with `skip_func` the first one is the node value.
    :type args: list | tuple
    :param node_attr:
        Dictionary of node attributes.
    :type node_attr: dict
    :param node_id:
        Node id.
    :type node_id: str
    :param skip_func:
        If True the node function is not evaluated.
    :type skip_func: bool, optional
    :param kw:
        Extra keywords forwarded to `_evaluate_function` and
        `_apply_filters`.

    :return:
        The (filtered) node value.

    :raises SkipNode:
        If any error occurs; the original error is attached as `ex`.
    """
    # noinspection PyUnresolvedReferences
    attr = self.workflow.nodes[node_id]

    try:
        if skip_func:
            value = args[0]
        else:
            # NONE placeholders are not passed to the function.
            args = [v for v in args if v is not NONE]
            value = self._evaluate_function(args, node_id, node_attr, attr,
                                            **kw)
        value = self._apply_filters(value, node_id, node_attr, attr, **kw)

        if 'started' in attr:
            attr['duration'] = time.time() - attr['started']

        if 'callback' in node_attr:  # Invoke callback func of data node.
            try:
                # noinspection PyCallingNonCallable
                node_attr['callback'](value)
            except Exception as ex:
                # A failing callback is reported through `_warning`; if that
                # call raises, the error is handled by the outer `except`.
                msg = "Failed CALLBACK '%s' due to:\n %s"
                self._warning(msg, node_id, ex)
        return value
    except Exception as ex:
        if 'started' in attr:
            attr['duration'] = time.time() - attr['started']
        # Some error occurs.
        msg = "Failed DISPATCHING '%s' due to:\n %r"
        self._warning(msg, node_id, ex)
        raise SkipNode(ex=ex)
def _set_data_node_output(self, node_id, node_attr, no_call, next_nds=None,
                          **kw):
    """
    Set the data node output from node estimations.

    :param node_id:
        Data node id.
    :type node_id: str

    :param node_attr:
        Dictionary of node attributes.
    :type node_attr: dict[str, T]

    :param no_call:
        If True data node estimations are not used.
    :type no_call: bool

    :param next_nds:
        If given, the workflow edges are added towards these nodes instead
        of the successors of the data node.
    :type next_nds: iterable, optional

    :param kw:
        Extra keywords forwarded to `async_thread`.

    :return:
        If the output have been evaluated correctly.
    :rtype: bool
    """
    # Get data node estimations.
    est, wait_in = self._get_node_estimations(node_attr, node_id)

    if not no_call:
        if node_id is PLOT:
            # The PLOT node receives the solution itself as extra input.
            est = est.copy()
            est[PLOT] = {'value': {'obj': self}}

        # By default the node function receives the dict of estimations.
        sf, args = False, ({k: v['value'] for k, v in est.items()},)
        if not (wait_in or 'function' in node_attr):
            # Data node that has just one estimation value.
            sf, args = True, tuple(args[0].values())
        try:
            # Final estimation of the node and node status.
            value = async_thread(self, args, node_attr, node_id, sf, **kw)
        except SkipNode:
            return False

        if value is not NONE:  # Set data output.
            self[node_id] = value

        value = {'value': value}  # Output value.
    else:
        self[node_id] = NONE  # Set data output.

        value = {}  # Output value.

    if next_nds:
        # namespace shortcuts for speed.
        wf_add_edge = self._wf_add_edge

        for u in next_nds:  # Set workflow.
            wf_add_edge(node_id, u, **value)

    else:
        # namespace shortcuts for speed.
        n, has, sub_sol = self.nodes, self.workflow.has_edge, self.sub_sol

        def no_visited_in_sub_dsp(i):
            # False only for a sub-dispatcher that gave this value and whose
            # linked input node has already been visited.
            node = n[i]
            if node['type'] == 'dispatcher' and has(i, node_id):
                visited = sub_sol[self.index + node['index']]._visited
                return node['inputs'][node_id] not in visited
            return True

        # List of functions.
        succ_fun = [u for u in self._succ[node_id]
                    if no_visited_in_sub_dsp(u)]

        # Check if it has functions as outputs and wildcard condition.
        if succ_fun and succ_fun[0] not in self._visited:
            # namespace shortcuts for speed.
            wf_add_edge = self._wf_add_edge

            for u in succ_fun:  # Set workflow.
                wf_add_edge(node_id, u, **value)

    return True  # Return that the output have been evaluated correctly.
def _apply_filters(self, res, node_id, node_attr, attr, stopper=None,
executor=False):
filters, funcs = [res], node_attr.get('filters', ())
if funcs:
if 'started' not in attr:
attr['started'] = time.time()
attr['solution_filters'] = filters
# noinspection PyUnusedLocal
def _callback(is_sol, sol):
filters.append(sol)
res = async_process(
funcs, res, stopper=stopper, executor=executor, sol=self,
callback=_callback, sol_name=self.full_name + (node_id,)
)
return res
def _set_function_node_output(self, node_id, node_attr, no_call,
next_nds=None, **kw):
"""
Set the function node output from node inputs.
:param node_id:
Function node id.
:type node_id: str
:param node_attr:
Dictionary of node attributes.
:type node_attr: dict[str, T]
:param no_call:
If True data node estimation function is not used.
:type no_call: bool
:return:
If the output have been evaluated correctly.
:rtype: bool
"""
# Namespace shortcuts for speed.
o_nds, dist = node_attr['outputs'], self.dist
# List of nodes that can still be estimated by the function node.
output_nodes = next_nds or set(self._succ[node_id]).difference(dist)
if not output_nodes: # This function is not needed.
self.workflow.remove_node(node_id) # Remove function node.
return False
wf_add_edge = self._wf_add_edge # Namespace shortcuts for speed.
if no_call:
for u in output_nodes: # Set workflow out.
wf_add_edge(node_id, u)
return True
args = self._wf_pred[node_id] # List of the function's arguments.
args = [args[k]['value'] for k in node_attr['inputs']]
try:
self._check_function_domain(args, node_attr, node_id)
res = async_thread(self, args, node_attr, node_id, **kw)
# noinspection PyUnresolvedReferences
self.workflow.nodes[node_id]['results'] = res
except SkipNode:
return False
# Set workflow.
for k, v in zip(o_nds, res if len(o_nds) > 1 else [res]):
if k in output_nodes and v is not NONE:
wf_add_edge(node_id, k, value=v)
return True # Return that the output have been evaluated correctly.
def _add_initial_value(self, data_id, value, initial_dist=0.0,
                       fringe=None, check_cutoff=None, no_call=None):
    """
    Add initial values updating workflow, seen, and fringe.

    :param fringe:
        Heapq of closest available nodes (`self.fringe` when None).
    :type fringe: list[(float | int, bool, (str, Dispatcher))]

    :param check_cutoff:
        Check the cutoff limit (`self.check_cutoff` when falsy).
    :type check_cutoff: (int | float) -> bool

    :param no_call:
        If True data node estimation function is not used
        (`self.no_call` when None).
    :type no_call: bool

    :param data_id:
        Data node id.
    :type data_id: str

    :param value:
        Data node value e.g., {'value': val}.
    :type value: dict[str, T]

    :param initial_dist:
        Data node initial distance in the ArciDispatch algorithm.
    :type initial_dist: float, int, optional

    :return:
        True if the data node is a wildcard or has been pushed to the
        fringe, otherwise False (unknown node, cutoff, or waiting inputs).
    :rtype: bool
    """
    # Namespace shortcuts for speed.
    nodes, seen, edge_weight = self.nodes, self.seen, self._edge_length
    wf_remove_edge, check_wait_in = self._wf_remove_edge, self.check_wait_in
    wf_add_edge, dsp_in = self._wf_add_edge, self._set_sub_dsp_node_input
    update_view = self._update_meeting

    # Fall back on the solution settings for the optional arguments.
    if fringe is None:
        fringe = self.fringe

    if no_call is None:
        no_call = self.no_call

    check_cutoff = check_cutoff or self.check_cutoff

    if data_id not in nodes:  # Data node is not in the dmap.
        return False

    wait_in = nodes[data_id]['wait_inputs']  # Store wait inputs flag.

    index = nodes[data_id]['index']  # Store node index.

    wf_add_edge(START, data_id, **value)  # Add edge.

    if data_id in self._wildcards:  # Check if the data node has wildcard.
        # The wildcard data node is marked as visited and its value is
        # handed directly to its successors.
        self._visited.add(data_id)  # Update visited nodes.

        self.workflow.add_node(data_id)  # Add node to workflow.

        for w, edge_data in self.dmap[data_id].items():  # See func node.
            wf_add_edge(data_id, w, **value)  # Set workflow.

            node = nodes[w]  # Node attributes.

            # Evaluate distance.
            vw_dist = initial_dist + edge_weight(edge_data, node)

            update_view(w, vw_dist)  # Update view distance.

            # Check the cutoff limit and if all inputs are satisfied.
            if check_cutoff(vw_dist):
                wf_remove_edge(data_id, w)  # Remove workflow edge.
                continue  # Pass the node.
            elif node['type'] == 'dispatcher':
                dsp_in(data_id, w, fringe, check_cutoff, no_call, vw_dist)
            elif check_wait_in(True, w):
                continue  # Pass the node.

            seen[w] = vw_dist  # Update distance.

            vd = (True, w, self.index + node['index'])  # Virtual distance.

            heapq.heappush(fringe, (vw_dist, vd, (w, self)))  # Add 2 heapq.

        return True

    update_view(data_id, initial_dist)  # Update view distance.

    if check_cutoff(initial_dist):  # Check the cutoff limit.
        wf_remove_edge(START, data_id)  # Remove workflow edge.
    elif not check_wait_in(wait_in, data_id):  # Check inputs.
        seen[data_id] = initial_dist  # Update distance.

        vd = (wait_in, data_id, self.index + index)  # Virtual distance.

        # Add node to heapq.
        heapq.heappush(fringe, (initial_dist, vd, (data_id, self)))

        return True
    return False
def _update_meeting(self, node_id, dist):
view = self._meet
if node_id in self._meet:
view[node_id] = max(dist, view[node_id])
else:
view[node_id] = dist
def _visit_nodes(self, node_id, dist, fringe, check_cutoff, no_call=False,
                 **kw):
    """
    Visits a node, updating workflow, seen, and fringe.

    :param node_id:
        Node id to visit.
    :type node_id: str

    :param dist:
        Distance from the starting node.
    :type dist: float, int

    :param fringe:
        Heapq of closest available nodes.
    :type fringe: list[(float | int, bool, (str, Dispatcher))]

    :param check_cutoff:
        Check the cutoff limit.
    :type check_cutoff: (int | float) -> bool

    :param no_call:
        If True data node estimation function is not used.
    :type no_call: bool, optional

    :param kw:
        Extra keywords forwarded to `_set_node_output`.

    :return:
        False if all dispatcher targets have been reached, otherwise True
        (also when the node output could not be set).
    :rtype: bool
    """
    # Namespace shortcuts.
    wf_rm_edge, wf_has_edge = self._wf_remove_edge, self.workflow.has_edge
    edge_weight, nodes = self._edge_length, self.nodes

    self.dist[node_id] = dist  # Set minimum dist.

    self._visited.add(node_id)  # Update visited nodes.

    if not self._set_node_output(node_id, no_call, **kw):  # Set output.
        # Some error occurs or inputs are not in the function domain.
        return True

    if self.check_targets(node_id):  # Check if the targets are satisfied.
        return False  # Stop loop.

    # See the successors reached by a workflow edge.
    for w, e_data in self.dmap[node_id].items():
        if not wf_has_edge(node_id, w):  # Check wildcard option.
            continue

        node = nodes[w]  # Get node attributes.

        vw_d = dist + edge_weight(e_data, node)  # Evaluate dist.

        if check_cutoff(vw_d):  # Check the cutoff limit.
            wf_rm_edge(node_id, w)  # Remove edge that cannot be see.
            continue

        if node['type'] == 'dispatcher':
            # The value becomes an input of the sub-dispatcher.
            self._set_sub_dsp_node_input(
                node_id, w, fringe, check_cutoff, no_call, vw_d)
        else:  # See the node.
            self._see_node(w, fringe, vw_d)

    return True
def _see_node(self, node_id, fringe, dist, w_wait_in=0):
    """
    See a node, updating seen and fringe.

    :param node_id:
        Node id to see.
    :type node_id: str

    :param fringe:
        Heapq of closest available nodes.
    :type fringe: list[(float | int, bool, (str, Dispatcher))]

    :param dist:
        Distance from the starting node.
    :type dist: float, int

    :param w_wait_in:
        Additional weight for sorting correctly the nodes in the fringe.
    :type w_wait_in: int, float

    :return:
        True if the node is visible, otherwise False.
    :rtype: bool

    :raises DispatcherError:
        If the node has already a distance greater than `dist`.
    """
    # Namespace shortcuts.
    seen, dists = self.seen, self.dist

    wait_in = self.nodes[node_id]['wait_inputs']  # Wait inputs flag.

    self._update_meeting(node_id, dist)  # Update view distance.

    # Check if inputs are satisfied.
    if self.check_wait_in(wait_in, node_id):
        pass  # Pass the node

    elif node_id in dists:  # The node w already estimated.
        if dist < dists[node_id]:  # Error for negative paths.
            raise DispatcherError('Contradictory paths found: '
                                  'negative weights?', sol=self)
    elif node_id not in seen or dist < seen[node_id]:  # Check min dist.
        seen[node_id] = dist  # Update dist.

        index = self.nodes[node_id]['index']  # Node index.

        # Virtual distance.
        vd = (w_wait_in + int(wait_in), node_id, self.index + index)

        # Add to heapq.
        heapq.heappush(fringe, (dist, vd, (node_id, self)))

        return True  # The node is visible.
    return False  # The node is not visible.
def _remove_unused_nodes(self):
"""
Removes unused function and sub-dispatcher nodes.
"""
# Namespace shortcuts.
nodes, wf_remove_node = self.nodes, self.workflow.remove_node
add_visited, succ = self._visited.add, self.workflow.succ
# Remove unused function and sub-dispatcher nodes.
for n in (set(self._wf_pred) - set(self._visited)):
node_type = nodes[n]['type'] # Node type.
if node_type == 'data':
continue # Skip data node.
if node_type == 'dispatcher' and succ[n]:
add_visited(n) # Add to visited nodes.
i = self.index + nodes[n]['index']
self.sub_sol[i]._remove_unused_nodes()
continue # Skip sub-dispatcher node with outputs.
wf_remove_node(n) # Remove unused node.
def _init_sub_dsp(self, dsp, fringe, outputs, no_call, initial_dist, index,
full_name):
"""
Initialize the dispatcher as sub-dispatcher and update the fringe.
:param fringe:
Heapq of closest available nodes.
:type fringe: list[(float | int, bool, (str, Dispatcher)]
:param outputs:
Ending data nodes.
:type outputs: list[str], iterable
:param no_call:
If True data node estimation function is not used.
:type no_call: bool
"""
# Initialize as sub-dispatcher.
sol = self.__class__(
dsp, {}, outputs, False, None, None, no_call, False,
wait_in=self._wait_in.get(dsp, None), index=self.index + index,
full_name=full_name
)
sol.sub_sol = self.sub_sol
for f in sol.fringe: # Update the fringe.
item = (initial_dist + f[0], (2,) + f[1][1:], f[-1])
heapq.heappush(fringe, item)
return sol
def _see_remote_link_node(self, node_id, fringe=None, dist=None,
                          check_dsp=lambda x: True):
    """
    See data remote links of the node (set output to remote links).

    Nothing is done for non-data nodes, for a solution without parent, or
    when `check_dsp` rejects the parent index.

    :param node_id:
        Node id.
    :type node_id: str

    :param fringe:
        Heapq of closest available nodes.
    :type fringe: list[(float | int, bool, (str, Dispatcher))]

    :param dist:
        Distance from the starting node.
    :type dist: float, int

    :param check_dsp:
        A function to check if the remote dispatcher is ok; it is called
        with the index of the parent solution.
    :type check_dsp: (tuple) -> bool
    """
    # Namespace shortcut.
    node, p_id, c_i = self.nodes[node_id], self.index[:-1], self.index[-1:]

    if node['type'] == 'data' and p_id and check_dsp(p_id):
        sol = self.sub_sol[self.index[:-1]]  # Get parent solution.

        # Look for the parent node that holds this sub-dispatcher.
        for dsp_id, n in sol.dsp.nodes.items():
            if n['index'] == c_i and node_id in n.get('outputs', {}):
                value = self[node_id]  # Get data output.

                for n_id in stlp(n['outputs'][node_id]):
                    # Node has been visited or inp do not coincide with out.
                    if not (n_id in sol._visited or
                            sol.workflow.has_edge(n_id, dsp_id)):
                        # Hand the result over to the parent solution node.
                        sol._wf_add_edge(dsp_id, n_id, value=value)

                        if fringe is not None:
                            # See node.
                            sol._see_node(n_id, fringe, dist, w_wait_in=2)

                break  # Only the first matching node is processed.
def _check_sub_dsp_domain(self, dsp_id, node, pred, kw):
if 'input_domain' in node and not (self.no_domain or self.no_call):
try:
adict = {k: v['value'] for k, v in pred.items()}
if node.get('await_domain', True):
adict = {k: await_result(v) for k, v in adict.items()}
kw['solution_domain'] = s = bool(node['input_domain'](adict))
return s
except Exception as ex: # Some error occurs.
msg = "Failed SUB-DSP DOMAIN '%s' due to:\n %r"
self._warning(msg, dsp_id, ex)
return False
def _set_sub_dsp_node_input(self, node_id, dsp_id, fringe, check_cutoff,
                            no_call, initial_dist):
    """
    Initializes the sub-dispatcher and set its inputs.

    :param node_id:
        Input node to set.
    :type node_id: str

    :param dsp_id:
        Sub-dispatcher node id.
    :type dsp_id: str

    :param fringe:
        Heapq of closest available nodes.
    :type fringe: list[(float | int, bool, (str, Dispatcher))]

    :param check_cutoff:
        Check the cutoff limit.
    :type check_cutoff: (int | float) -> bool

    :param no_call:
        If True data node estimation function is not used.
    :type no_call: bool

    :param initial_dist:
        Distance to reach the sub-dispatcher node.
    :type initial_dist: int, float

    :return:
        If the input have been set.
    :rtype: bool
    """
    # Namespace shortcuts.
    node = self.nodes[dsp_id]
    dsp, pred = node['function'], self._wf_pred[dsp_id]
    distances, sub_sol = self.dist, self.sub_sol

    iv_nodes = [node_id]  # Nodes do be added as initial values.

    self._meet[dsp_id] = initial_dist  # Set view distance.

    # Check if inputs are satisfied.
    if self.check_wait_in(node['wait_inputs'], dsp_id):
        return False  # Pass the node

    if dsp_id not in distances:
        # First input reaching the sub-dispatcher: check the domain and
        # create its solution.
        kw = {}
        dom = self._check_sub_dsp_domain(dsp_id, node, pred, kw)
        if dom is True:
            iv_nodes = pred  # Args respect the domain.
        elif dom is False:
            return False

        # Initialize the sub-dispatcher.
        sub_sol[self.index + node['index']] = sol = self._init_sub_dsp(
            dsp, fringe, node['outputs'], no_call, initial_dist,
            node['index'], self.full_name + (dsp_id,)
        )
        self.workflow.add_node(dsp_id, solution=sol, **kw)

        distances[dsp_id] = initial_dist  # Update min distance.
    else:
        # The sub-dispatcher solution already exists.
        sol = sub_sol[self.index + node['index']]

    for n_id in iv_nodes:
        # Namespace shortcuts.
        val = pred[n_id]

        # One parent value may feed several sub-dispatcher nodes.
        for n in stlp(node['inputs'][n_id]):
            # Add initial value to the sub-dispatcher.
            sol._add_initial_value(
                n, val, initial_dist, fringe, check_cutoff, no_call
            )

    return True
def _warning(self, msg, node_id, ex, *args, **kwargs):
    """
    Handles the error messages.

    .. note:: If `self.raises` is True (or, when callable, returns a true
       value for the error) the dispatcher interrupt the dispatch when an
       error occur, otherwise it logs the error.

    :param msg:
        Message template, formatted with ``(node_id, ex) + args``.
    :type msg: str
    :param node_id:
        Id of the node that failed.
    :type node_id: str
    :param ex:
        The error that occurred.
    :type ex: Exception

    :raises DispatcherError:
        When errors have to be raised; a ``DispatcherError`` received as
        `ex` is updated with the solution and re-raised as it is.
    """
    policy = self.raises
    raises = policy(ex) if callable(policy) else policy

    if raises and isinstance(ex, DispatcherError):
        # Already a dispatcher error: attach this solution and propagate.
        ex.update(self)
        raise ex

    self._errors[node_id] = msg % ((node_id, ex) + args)
    full_id = '/'.join(self.full_name + (node_id,))

    if not raises:
        kwargs.setdefault('exc_info', 1)
        log.error(msg, full_id, ex, *args, **kwargs)
        return

    raise DispatcherError(msg, full_id, ex, *args, sol=self, **kwargs)