#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2020, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It contains basic algorithms, numerical tricks, and data processing tasks.
"""
from .gen import counter
from .cst import EMPTY, NONE
from .dsp import SubDispatch, bypass, selector, stlp, parent_func
import collections
__author__ = 'Vincenzo Arcidiacono <vinci1it2000@gmail.com>'
# modified from NetworkX library
def add_edge_fun(graph):
    """
    Returns a function that adds an edge to the `graph` checking only the out
    node.

    :param graph:
        A directed graph.
    :type graph: networkx.classes.digraph.DiGraph

    :return:
        A function that adds an edge to the `graph`.
    :rtype: callable
    """
    # Namespace shortcut for speed.
    succ, pred, node = graph._succ, graph._pred, graph._node

    def add_edge(u, v, **attr):
        # Only the out node `v` is created on demand; the in node `u` is not
        # checked, so `succ[u]` raises a KeyError when `u` is missing.
        if v not in succ:  # Add nodes.
            succ[v], pred[v], node[v] = {}, {}, {}

        # The same attribute dict is stored in both adjacency mappings.
        succ[u][v] = pred[v][u] = attr  # Add the edge.

    return add_edge  # Returns the function.
def remove_edge_fun(graph):
    """
    Returns a function that removes an edge from the `graph`.

    .. note:: The out node is removed if it is isolated.

    :param graph:
        A directed graph.
    :type graph: networkx.classes.digraph.DiGraph

    :return:
        A function that removes an edge from the `graph`.
    :rtype: callable
    """
    # Namespace shortcut for speed.
    rm_edge, rm_node = graph.remove_edge, graph.remove_node

    # Imported lazily so networkx is only required when this is used.
    from networkx import is_isolate

    def remove_edge(u, v):
        rm_edge(u, v)  # Remove the edge.

        if is_isolate(graph, v):  # Check if v is isolate.
            rm_node(v)  # Remove the isolate out node.

    return remove_edge  # Returns the function.
def get_unused_node_id(graph, initial_guess='unknown', _format='{}<%d>'):
    """
    Finds an unused node id in `graph`.

    :param graph:
        A directed graph.
    :type graph: networkx.classes.digraph.DiGraph

    :param initial_guess:
        Initial node id guess.
    :type initial_guess: str, optional

    :param _format:
        Format to generate the new node id if the given is already used.
        It is first expanded with `str.format` (receiving the initial guess)
        and then with the `%` operator (receiving the counter value).
    :type _format: str, optional

    :return:
        An unused node id.
    :rtype: str
    """
    has_node = graph.has_node  # Namespace shortcut for speed.

    # NOTE(review): `counter()` presumably returns a callable yielding
    # consecutive integers -- confirm against `.gen.counter`.
    n = counter()  # Counter.

    # '%' in the guess is escaped so that the later %-formatting only
    # substitutes the counter placeholder of `_format`.
    id_fmt = _format.format(initial_guess.replace('%', '%%'))  # Id format.

    node_id = initial_guess  # Initial guess.
    while has_node(node_id):  # Check if node id is used.
        node_id = id_fmt % n()  # Guess.

    return node_id  # Returns an unused node id.
def add_func_edges(dsp, fun_id, nodes_bunch, edge_weights=None, input=True,
                   data_nodes=None):
    """
    Adds function node edges.

    :param dsp:
        A dispatcher that identifies the model adopted.
    :type dsp: schedula.Dispatcher

    :param fun_id:
        Function node id.
    :type fun_id: str

    :param nodes_bunch:
        A container of nodes which will be iterated through once.
    :type nodes_bunch: iterable

    :param edge_weights:
        Edge weights.
    :type edge_weights: dict, optional

    :param input:
        If True the nodes_bunch are input nodes, otherwise are output nodes.
    :type input: bool, optional

    :param data_nodes:
        Data nodes to be deleted if something fail.
    :type data_nodes: list, optional

    :return:
        List of new data nodes.
    :rtype: list

    :raises ValueError:
        If a node of `nodes_bunch` exists but is not a data node. Before
        raising, the function node and the collected data nodes are removed
        from the dispatcher map.
    """
    # Namespace shortcut for speed.
    add_edge = _add_edge_dmap_fun(dsp.dmap, edge_weights)
    node, add_data = dsp.dmap.nodes, dsp.add_data
    remove_nodes = dsp.dmap.remove_nodes_from

    # Define an error message.
    msg = 'Invalid %sput id: {} is not a data node' % ['out', 'in'][input]

    # Edge direction: data -> function for inputs, function -> data otherwise.
    i, j = ('i', 'o') if input else ('o', 'i')

    data_nodes = data_nodes or []  # Update data nodes.

    for u in nodes_bunch:  # Iterate nodes.
        try:
            if node[u]['type'] != 'data':  # The node is not a data node.
                data_nodes.append(fun_id)  # Add function id to be removed.

                remove_nodes(data_nodes)  # Remove function and new data nodes.

                raise ValueError(msg.format(u))  # Raise error.
        except KeyError:
            # The node (or its 'type') is missing: create the data node.
            data_nodes.append(add_data(data_id=u))  # Add new data node.

        add_edge(**{i: u, j: fun_id, 'w': u})  # Add edge.

    return data_nodes  # Return new data nodes.
def _add_edge_dmap_fun(graph, edges_weights=None):
"""
Adds edge to the dispatcher map.
:param graph:
A directed graph.
:type graph: networkx.classes.digraph.DiGraph
:param edges_weights:
Edge weights.
:type edges_weights: dict, optional
:return:
A function that adds an edge to the `graph`.
:rtype: callable
"""
add = graph.add_edge # Namespace shortcut for speed.
if edges_weights is not None:
def add_edge(i, o, w):
if w in edges_weights:
add(i, o, weight=edges_weights[w]) # Weighted edge.
else:
add(i, o) # Normal edge.
else:
# noinspection PyUnusedLocal
def add_edge(i, o, w):
add(i, o) # Normal edge.
return add_edge # Returns the function.
def _get_node(nodes, node_id, fuzzy=True):
"""
Returns a dispatcher node that match the given node id.
:param nodes:
Dispatcher nodes.
:type nodes: dict
:param node_id:
Node id.
:type node_id: str
:return:
The dispatcher node and its id.
:rtype: (str, dict)
"""
try:
return node_id, nodes[node_id] # Return dispatcher node and its id.
except KeyError as ex:
if fuzzy:
it = sorted(nodes.items())
n = next(((k, v) for k, v in it if node_id in k), EMPTY)
if n is not EMPTY:
return n
raise ex
def _nodes(alist):
    """
    Returns the set of all node ids contained in the items of `alist`.

    Each item is passed through `stlp` and its elements are collected.
    A comprehension is used instead of ``sum(..., ())`` to avoid building a
    new tuple at every step.

    :param alist:
        Items to be flattened.
    :type alist: iterable

    :return:
        Set of node ids.
    :rtype: set
    """
    # NOTE(review): `stlp` presumably wraps a single string id into a tuple
    # and returns other values unchanged -- confirm against `.dsp.stlp`.
    return {node for item in alist for node in stlp(item)}
def _get_sub_inp(attr, pred):
    """
    Returns the node ids mapped by the `attr['inputs']` entries whose key is
    in `pred`.

    A comprehension is used instead of ``sum(..., ())`` to avoid building a
    new tuple at every step.

    :param attr:
        Node attributes with an 'inputs' mapping.
    :type attr: dict

    :param pred:
        Container of keys to be selected.
    :type pred: collections.abc.Container

    :return:
        Set of node ids.
    :rtype: set
    """
    inp = attr['inputs']
    return {n for k, v in inp.items() if k in pred for n in stlp(v)}
def _get_sub_out(attr, succ):
    """
    Returns the keys of `attr['outputs']` that have at least one mapped node
    contained in `succ`.

    :param attr:
        Node attributes with an 'outputs' mapping.
    :type attr: dict

    :param succ:
        Container of node ids.
    :type succ: collections.abc.Container

    :return:
        Set of selected output keys.
    :rtype: set
    """
    selected = set()
    for key, targets in attr['outputs'].items():
        for target in stlp(targets):
            if target in succ:
                selected.add(key)
                break  # One hit is enough for this key.
    return selected
def _update_io(a, pred, succ, parent=True):
    """
    Restricts, in place, the 'inputs' and 'outputs' mappings of the node
    attributes `a` to the given `pred` and `succ` nodes.

    When `parent` is True the function recurses once with `parent=False`,
    using the `data_nodes` of `a['function']` as both `pred` and `succ`; in
    that second pass the roles of 'inputs' and 'outputs' are swapped.

    :param a:
        Node attributes (modified in place).
    :type a: dict

    :param pred:
        Nodes used to filter the keys of the first mapping.
    :type pred: iterable

    :param succ:
        Nodes used to filter the values of the second mapping.
    :type succ: iterable

    :param parent:
        If True filter from the parent side, otherwise from the child side.
    :type parent: bool, optional

    :return:
        The `pred` nodes and the `succ` nodes that are not kept.
    :rtype: (set, set)
    """
    if parent:
        inp_k, out_k = 'inputs', 'outputs'
    else:
        inp_k, out_k = 'outputs', 'inputs'

    kept_keys = set(a[inp_k]).intersection(pred)
    a[inp_k] = selector(kept_keys, a[inp_k])

    filtered = {}
    for key, value in a[out_k].items():
        targets = tuple(t for t in stlp(value) if t in succ)
        if targets:  # Entries without surviving targets are dropped.
            filtered[key] = bypass(*targets)
    a[out_k] = filtered

    if parent:
        inner_nodes = set(a['function'].data_nodes)
        _update_io(a, inner_nodes, inner_nodes, parent=False)

    unused_pred = set(pred) - set(a[inp_k])
    unused_succ = set(succ) - _nodes(a[out_k].values())
    return unused_pred, unused_succ
def _search_node_description(dsp, node_id, what='description'):
    """
    Searches the `what` information of the node `node_id`.

    :param dsp:
        A dispatcher, or an object exposing it through a `dsp` attribute.
    :type dsp: object

    :param node_id:
        Node id.
    :type node_id: str

    :param what:
        Kind of information to search.
    :type what: str, optional

    :return:
        The result of `.des.search_node_description`.
    """
    # Unwrap objects that carry the dispatcher in a `dsp` attribute.
    dispatcher = getattr(dsp, 'dsp', dsp)

    from .des import search_node_description

    node = dispatcher.nodes[node_id]
    return search_node_description(node_id, node, dispatcher, what)
def get_sub_node(dsp, path, node_attr='auto', solution=NONE, _level=0,
                 _dsp_name=NONE):
    """
    Returns a sub node of a dispatcher.

    :param dsp:
         A dispatcher object or a sub dispatch function.
    :type dsp: schedula.Dispatcher | SubDispatch

    :param path:
        A sequence of node ids or a single node id. Each id identifies a
        sub-level node.
    :type path: tuple, str

    :param node_attr:
        Output node attr.

        If the searched node does not have this attribute, all its attributes
        are returned.

        When 'auto', returns the "default" attributes of the searched node,
        which are:

          - for data node: its output, and if not exists, all its attributes.
          - for function and sub-dispatcher nodes: the 'function' attribute.
    :type node_attr: str | None

    :param solution:
        Parent Solution.
    :type solution: schedula.utils.Solution

    :param _level:
        Path level.
    :type _level: int

    :param _dsp_name:
        dsp name to show when the function raise a value error.
    :type _dsp_name: str

    :return:
        A sub node of a dispatcher and its path.
    :rtype: dict | object, tuple[str]

    :raises ValueError:
        If the path does not exist, or if an intermediate node of the path
        is not a function or sub-dispatcher node.

    **Example**:

    .. dispatcher:: o
       :opt: graph_attr={'ratio': '1'}, depth=-1
       :code:

        >>> from schedula import Dispatcher
        >>> s_dsp = Dispatcher(name='Sub-dispatcher')
        >>> def fun(a, b):
        ...     return a + b
        ...
        >>> s_dsp.add_function('a + b', fun, ['a', 'b'], ['c'])
        'a + b'
        >>> dispatch = SubDispatch(s_dsp, ['c'], output_type='dict')
        >>> dsp = Dispatcher(name='Dispatcher')
        >>> dsp.add_function('Sub-dispatcher', dispatch, ['a'], ['b'])
        'Sub-dispatcher'

        >>> o = dsp.dispatch(inputs={'a': {'a': 3, 'b': 1}})
        ...

    Get the sub node 'c' output or type::

        >>> get_sub_node(dsp, ('Sub-dispatcher', 'c'))
        (4, ('Sub-dispatcher', 'c'))
        >>> get_sub_node(dsp, ('Sub-dispatcher', 'c'), node_attr='type')
        ('data', ('Sub-dispatcher', 'c'))

    Get the sub-dispatcher output:

    .. dispatcher:: sol
       :opt: graph_attr={'ratio': '1'}, depth=-1
       :code:

        >>> sol, p = get_sub_node(dsp, ('Sub-dispatcher',), node_attr='output')
        >>> sol, p
        (Solution([('a', 3), ('b', 1), ('c', 4)]), ('Sub-dispatcher',))
    """
    # A single node id given as a string is one path element: `list('abc')`
    # would wrongly split it into characters.
    path = [path] if isinstance(path, str) else list(path)

    if isinstance(dsp, SubDispatch):  # Take the dispatcher obj.
        dsp = dsp.dsp

    if _dsp_name is NONE:  # Set origin dispatcher name for warning purpose.
        _dsp_name = dsp.name

    if solution is NONE:  # Default to the solution of the dispatcher.
        solution = dsp.solution

    node_id = path[_level]  # Node id at given level.

    try:
        node_id, node = _get_node(dsp.nodes, node_id)  # Get dispatcher node.
        path[_level] = node_id
    except KeyError:
        # For a path leaf, the id may exist only as a key of the solution.
        if _level == len(path) - 1 and node_attr in ('auto', 'output') \
                and solution is not EMPTY:
            try:
                # Get dispatcher node.
                node_id, node = _get_node(solution, node_id, False)
                path[_level] = node_id
                return node, tuple(path)
            except KeyError:
                pass
        msg = 'Path %s does not exist in %s dispatcher.' % (path, _dsp_name)
        raise ValueError(msg)

    _level += 1  # Next level.

    if _level < len(path):  # Is not path leaf?.
        try:
            if node['type'] in ('function', 'dispatcher'):
                try:
                    solution = solution.workflow.nodes[node_id]['solution']
                except (KeyError, AttributeError):
                    solution = EMPTY  # No solution for the sub-level.
                dsp = parent_func(node['function'])  # Get parent function.
            else:
                raise KeyError
        except KeyError:
            msg = 'Node of path %s at level %i is not a function or ' \
                  'sub-dispatcher node of %s ' \
                  'dispatcher.' % (path, _level, _dsp_name)
            raise ValueError(msg)

        # Continue the node search.
        return get_sub_node(dsp, path, node_attr, solution, _level, _dsp_name)
    else:
        data, sol = EMPTY, solution
        # Return the sub node.
        if node_attr == 'auto' and node['type'] != 'data':  # Auto: function.
            node_attr = 'function'
        elif node_attr == 'auto' and sol is not EMPTY and node_id in sol:
            data = sol[node_id]  # Auto: data output.
        elif node_attr == 'output' and node['type'] != 'data':
            data = sol.workflow.nodes[node_id]['solution']
        elif node_attr == 'output' and node['type'] == 'data':
            data = sol[node_id]
        elif node_attr == 'description':  # Search and return node description.
            data = _search_node_description(dsp, node_id)[0]
        elif node_attr == 'value_type' and node['type'] == 'data':
            # Search and return data node value's type.
            data = _search_node_description(dsp, node_id, node_attr)[0]
        elif node_attr == 'default_value':
            data = dsp.default_values[node_id]
        elif node_attr == 'dsp':
            data = dsp
        elif node_attr == 'sol':
            data = sol

        if data is EMPTY:
            # Fall back to the requested attribute, or to all attributes.
            data = node.get(node_attr, node)

        return data, tuple(path)  # Return the data
class DspPipe(collections.OrderedDict):
    """
    Ordered mapping used to store the pipe of a dispatch run.

    Its representation is a short identity tag instead of the full content.
    """

    def __repr__(self):
        return "<%s instance at %s>" % (self.__class__.__name__, id(self))
def get_full_pipe(sol, base=()):
    """
    Returns the full pipe of a dispatch run.

    :param sol:
        A Solution object.
    :type sol: schedula.utils.Solution

    :param base:
        Base node id.
    :type base: tuple[str]

    :return:
        Full pipe of a dispatch run.
    :rtype: DspPipe
    """
    pipe, i = DspPipe(), len(base)

    for task in sol._pipe:
        n_id, s = task[-1]
        d = s.dsp
        item = {'task': task}

        if n_id in s._errors:
            item['error'] = s._errors[n_id]

        node_id = s.full_name + (n_id,)

        # Every task is expected to live under the `base` node.
        assert base == node_id[:i], '%s != %s' % (node_id[:i], base)

        rel_id = node_id[i:]  # Node id relative to `base`.

        node, path = d.get_node(n_id, node_attr=None)
        if node['type'] == 'function' and 'function' in node:
            try:
                sub_sol = s.workflow.nodes[path[-1]]['solution']
                sp = get_full_pipe(sub_sol, base=node_id)
                if sp:
                    item['sub_pipe'] = sp
            except KeyError:
                # Best effort: nodes without a sub-solution have no sub pipe.
                pass

        pipe[bypass(*rel_id)] = item

    return pipe
def _sort_sk_wait_in(sol):
    """
    Collects, recursively over the sub-solutions of `sol`, the nodes whose
    `_wait_in` flag is True and that are still pending, and returns them
    sorted.

    :param sol:
        A Solution object.
    :type sol: schedula.utils.Solution

    :return:
        Sorted list of tuples ``(meet, node_id, counter, wait_in)`` where
        `meet` is ``sol._meet[node_id]`` (infinity when missing) and
        `wait_in` is the `_wait_in` mapping that owns the node.
    :rtype: list[tuple]
    """
    tie_breaker = counter()

    def _collect(s):
        blocked = set()
        entries = []
        for attrs in s.dsp.sub_dsp_nodes.values():
            if 'function' not in attrs:
                continue
            sub_key = s.index + attrs['index']
            if sub_key not in s.sub_sol:
                continue

            sub_sol = s.sub_sol[sub_key]
            pending, sub_entries = _collect(sub_sol)
            entries += sub_entries

            sub_waiting = {k for k, v in sub_sol._wait_in.items() if v is True}
            pending = pending.union(sub_waiting)

            outs = attrs['outputs']
            blocked = blocked.union(
                [outs[k] for k in set(outs).intersection(pending)]
            )

        # Nodes to be visited.
        waiting = {k for k, v in s._wait_in.items() if v is True}
        pending = (set(s.workflow.nodes.keys()) - s._visited) - blocked
        pending = pending.union(s._visited.intersection(waiting))
        waiting = pending.intersection(waiting)

        entries += [
            (s._meet.get(k, float('inf')), k, tie_breaker(), s._wait_in)
            for k in waiting
        ]
        return set(pending), entries

    return sorted(_collect(sol)[1])
def _union_workflow(sol, node_id=None, bfs=None):
    """
    Merges the workflow edges of `sol` and of its sub-solutions into a
    nested dict.

    The edges of each level are stored as a set under the `NONE` key; the
    sub-solutions are stored under their sub-dispatcher node id.

    :param sol:
        A Solution object.
    :type sol: schedula.utils.Solution

    :param node_id:
        Id of the sub-dispatcher node that owns `sol` (None at top level).
    :type node_id: str, optional

    :param bfs:
        Nested dict to be updated.
    :type bfs: dict, optional

    :return:
        The nested dict of the level of `sol`.
    :rtype: dict
    """
    if node_id is None:
        level = bfs or {NONE: set()}
    else:
        level = bfs.setdefault(node_id, {NONE: set()})

    level[NONE].update(sol.workflow.edges())

    for sub_id, attrs in sol.dsp.sub_dsp_nodes.items():
        if 'function' not in attrs:
            continue
        sub_sol = sol.sub_sol.get(sol.index + attrs['index'], None)
        if sub_sol:
            _union_workflow(sub_sol, node_id=sub_id, bfs=level)

    return level
def _convert_bfs(bfs):
    """
    Converts, in place and recursively, the edges stored under the `NONE`
    key of `bfs` into a `networkx.DiGraph`.

    :param bfs:
        Nested dict whose `NONE` entry is an iterable of edges and whose
        other entries are nested dicts with the same layout.
    :type bfs: dict

    :return:
        The same `bfs` object.
    :rtype: dict
    """
    from networkx import DiGraph

    graph = DiGraph()
    graph.add_edges_from(bfs[NONE])
    bfs[NONE] = graph

    for key, sub_bfs in bfs.items():
        if key is NONE:
            continue
        _convert_bfs(sub_bfs)

    return bfs