Source code for schedula.utils.alg

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2020, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It contains basic algorithms, numerical tricks, and data processing tasks.
"""

import collections
from .gen import counter
from .cst import EMPTY, NONE
from .dsp import SubDispatch, bypass, selector, stlp, parent_func, NoSub

__author__ = 'Vincenzo Arcidiacono <vinci1it2000@gmail.com>'


# modified from NetworkX library
[docs]def add_edge_fun(graph): """ Returns a function that adds an edge to the `graph` checking only the out node. :param graph: A directed graph. :type graph: schedula.utils.graph.DiGraph :return: A function that adds an edge to the `graph`. :rtype: callable """ # Namespace shortcut for speed. succ, pred, node = graph.succ, graph.pred, graph.nodes def add_edge(u, v, **attr): if v not in succ: # Add nodes. succ[v], pred[v], node[v] = {}, {}, {} succ[u][v] = pred[v][u] = attr # Add the edge. return add_edge # Returns the function.
[docs]def remove_edge_fun(graph): """ Returns a function that removes an edge from the `graph`. ..note:: The out node is removed if this is isolate. :param graph: A directed graph. :type graph: schedula.utils.graph.DiGraph :return: A function that remove an edge from the `graph`. :rtype: callable """ # Namespace shortcut for speed. rm_edge, rm_node = graph.remove_edge, graph.remove_node succ, pred = graph.succ, graph.pred def remove_edge(u, v): rm_edge(u, v) # Remove the edge. if not (succ[v] or pred[v]): # Check if v is isolate. rm_node(v) # Remove the isolate out node. return remove_edge # Returns the function.
[docs]def get_unused_node_id(graph, initial_guess='unknown', _format='{}<%d>'): """ Finds an unused node id in `graph`. :param graph: A directed graph. :type graph: schedula.utils.graph.DiGraph :param initial_guess: Initial node id guess. :type initial_guess: str, optional :param _format: Format to generate the new node id if the given is already used. :type _format: str, optional :return: An unused node id. :rtype: str """ nodes = graph.nodes # Namespace shortcut for speed. n = counter() # Counter. id_fmt = _format.format(initial_guess.replace('%', '%%')) # Node id format. node_id = initial_guess # Initial guess. while node_id in nodes: # Check if node id is used. node_id = id_fmt % n() # Guess. return node_id # Returns an unused node id.
[docs]def add_func_edges(dsp, fun_id, nodes_bunch, edge_weights=None, input=True, data_nodes=None): """ Adds function node edges. :param dsp: A dispatcher that identifies the model adopted. :type dsp: schedula.Dispatcher :param fun_id: Function node id. :type fun_id: str :param nodes_bunch: A container of nodes which will be iterated through once. :type nodes_bunch: iterable :param edge_weights: Edge weights. :type edge_weights: dict, optional :param input: If True the nodes_bunch are input nodes, otherwise are output nodes. :type input: bool, optional :param data_nodes: Data nodes to be deleted if something fail. :type data_nodes: list :return: List of new data nodes. :rtype: list """ # Namespace shortcut for speed. add_edge = _add_edge_dmap_fun(dsp.dmap, edge_weights) node, add_data = dsp.dmap.nodes, dsp.add_data remove_nodes = dsp.dmap.remove_nodes_from # Define an error message. msg = 'Invalid %sput id: {} is not a data node' % ['out', 'in'][input] i, j = ('i', 'o') if input else ('o', 'i') data_nodes = data_nodes or [] # Update data nodes. for u in nodes_bunch: # Iterate nodes. try: if node[u]['type'] != 'data': # The node is not a data node. data_nodes.append(fun_id) # Add function id to be removed. remove_nodes(data_nodes) # Remove function and new data nodes. raise ValueError(msg.format(u)) # Raise error. except KeyError: data_nodes.append(add_data(data_id=u)) # Add new data node. add_edge(**{i: u, j: fun_id, 'w': u}) # Add edge. return data_nodes # Return new data nodes.
def _add_edge_dmap_fun(graph, edges_weights=None): """ Adds edge to the dispatcher map. :param graph: A directed graph. :type graph: schedula.utils.graph.DiGraph :param edges_weights: Edge weights. :type edges_weights: dict, optional :return: A function that adds an edge to the `graph`. :rtype: callable """ add = graph.add_edge # Namespace shortcut for speed. if edges_weights is not None: def add_edge(i, o, w): if w in edges_weights: add(i, o, weight=edges_weights[w]) # Weighted edge. else: add(i, o) # Normal edge. else: # noinspection PyUnusedLocal def add_edge(i, o, w): add(i, o) # Normal edge. return add_edge # Returns the function. def _get_node(nodes, node_id, fuzzy=True): """ Returns a dispatcher node that match the given node id. :param nodes: Dispatcher nodes. :type nodes: dict :param node_id: Node id. :type node_id: str :return: The dispatcher node and its id. :rtype: (str, dict) """ try: return node_id, nodes[node_id] # Return dispatcher node and its id. except KeyError as ex: if fuzzy: it = sorted(nodes.items()) n = next(((k, v) for k, v in it if node_id in k), EMPTY) if n is not EMPTY: return n raise ex def _nodes(alist): return set(sum(map(stlp, alist), ())) def _get_sub_inp(attr, pred): inp = attr['inputs'] return set(sum(map(stlp, (v for k, v in inp.items() if k in pred)), ())) def _get_sub_out(attr, succ): out = attr['outputs'] return {k for k, v in out.items() if any(i in succ for i in stlp(v))} def _update_io(a, pred, succ, parent=True): inp_k, out_k = ['inputs', 'outputs'][::int(parent) * 2 - 1] a[inp_k] = selector(set(a[inp_k]).intersection(pred), a[inp_k]) o = {k: tuple(j for j in stlp(v) if j in succ) for k, v in a[out_k].items()} a[out_k] = {k: bypass(*v) for k, v in o.items() if v} if parent: nds = set(a['function'].data_nodes) _update_io(a, nds, nds, parent=False) return set(pred) - set(a[inp_k]), set(succ) - _nodes(a[out_k].values()) def _search_node_description(dsp, node_id, what='description'): dsp = getattr(dsp, 'dsp', dsp) from .des import search_node_description return search_node_description(node_id, dsp.nodes[node_id], dsp, what)
[docs]def get_sub_node(dsp, path, node_attr='auto', solution=NONE, _level=0, _dsp_name=NONE): """ Returns a sub node of a dispatcher. :param dsp: A dispatcher object or a sub dispatch function. :type dsp: schedula.Dispatcher | SubDispatch :param path: A sequence of node ids or a single node id. Each id identifies a sub-level node. :type path: tuple, str :param node_attr: Output node attr. If the searched node does not have this attribute, all its attributes are returned. When 'auto', returns the "default" attributes of the searched node, which are: - for data node: its output, and if not exists, all its attributes. - for function and sub-dispatcher nodes: the 'function' attribute. :type node_attr: str | None :param solution: Parent Solution. :type solution: schedula.utils.Solution :param _level: Path level. :type _level: int :param _dsp_name: dsp name to show when the function raise a value error. :type _dsp_name: str :return: A sub node of a dispatcher and its path. :rtype: dict | object, tuple[str] **Example**: .. dispatcher:: o :opt: graph_attr={'ratio': '1'}, depth=-1 :code: >>> from schedula import Dispatcher >>> s_dsp = Dispatcher(name='Sub-dispatcher') >>> def fun(a, b): ... return a + b ... >>> s_dsp.add_function('a + b', fun, ['a', 'b'], ['c']) 'a + b' >>> dispatch = SubDispatch(s_dsp, ['c'], output_type='dict') >>> dsp = Dispatcher(name='Dispatcher') >>> dsp.add_function('Sub-dispatcher', dispatch, ['a'], ['b']) 'Sub-dispatcher' >>> o = dsp.dispatch(inputs={'a': {'a': 3, 'b': 1}}) ... Get the sub node 'c' output or type:: >>> get_sub_node(dsp, ('Sub-dispatcher', 'c')) (4, ('Sub-dispatcher', 'c')) >>> get_sub_node(dsp, ('Sub-dispatcher', 'c'), node_attr='type') ('data', ('Sub-dispatcher', 'c')) Get the sub-dispatcher output: .. dispatcher:: sol :opt: graph_attr={'ratio': '1'}, depth=-1 :code: >>> sol, p = get_sub_node(dsp, ('Sub-dispatcher',), node_attr='output') >>> sol, p (Solution([('a', 3), ('b', 1), ('c', 4)]), ('Sub-dispatcher',)) """ path = list(path) if isinstance(dsp, SubDispatch): # Take the dispatcher obj. dsp = dsp.dsp if _dsp_name is NONE: # Set origin dispatcher name for warning purpose. _dsp_name = dsp.name if solution is NONE: # Set origin dispatcher name for warning purpose. solution = dsp.solution node_id = path[_level] # Node id at given level. try: node_id, node = _get_node(dsp.nodes, node_id) # Get dispatcher node. path[_level] = node_id except KeyError: if _level == len(path) - 1 and node_attr in ('auto', 'output') \ and solution is not EMPTY: try: # Get dispatcher node. node_id, node = _get_node(solution, node_id, False) path[_level] = node_id return node, tuple(path) except KeyError: pass msg = 'Path %s does not exist in %s dispatcher.' % (path, _dsp_name) raise ValueError(msg) _level += 1 # Next level. if _level < len(path): # Is not path leaf?. try: if node['type'] in ('function', 'dispatcher'): try: solution = solution.workflow.nodes[node_id]['solution'] except (KeyError, AttributeError): solution = EMPTY dsp = parent_func(node['function']) # Get parent function. else: raise KeyError except KeyError: msg = 'Node of path %s at level %i is not a function or ' \ 'sub-dispatcher node of %s ' \ 'dispatcher.' % (path, _level, _dsp_name) raise ValueError(msg) # Continue the node search. return get_sub_node(dsp, path, node_attr, solution, _level, _dsp_name) else: data, sol = EMPTY, solution # Return the sub node. if node_attr == 'auto' and node['type'] != 'data': # Auto: function. node_attr = 'function' elif node_attr == 'auto' and sol is not EMPTY and node_id in sol: data = sol[node_id] # Auto: data output. elif node_attr == 'output' and node['type'] != 'data': data = sol.workflow.nodes[node_id]['solution'] elif node_attr == 'output' and node['type'] == 'data': data = sol[node_id] elif node_attr == 'description': # Search and return node description. data = _search_node_description(dsp, node_id)[0] elif node_attr == 'value_type' and node['type'] == 'data': # Search and return data node value's type. data = _search_node_description(dsp, node_id, node_attr)[0] elif node_attr == 'default_value': data = dsp.default_values[node_id] elif node_attr == 'dsp': data = dsp elif node_attr == 'sol': data = sol if data is EMPTY: data = node.get(node_attr, node) return data, tuple(path) # Return the data
[docs]class DspPipe(collections.OrderedDict): def __repr__(self): return "<%s instance at %s>" % (self.__class__.__name__, id(self))
[docs]def get_full_pipe(sol, base=()): """ Returns the full pipe of a dispatch run. :param sol: A Solution object. :type sol: schedula.utils.Solution :param base: Base node id. :type base: tuple[str] :return: Full pipe of a dispatch run. :rtype: DspPipe """ pipe, i = DspPipe(), len(base) for p in sol._pipe: n, s = p[-1] d = s.dsp p = {'task': p} if n in s._errors: p['error'] = s._errors[n] node_id = s.full_name + (n,) assert base == node_id[:i], '%s != %s' % (node_id[:i], base) n_id = node_id[i:] n, path = d.get_node(n, node_attr=None) if n['type'] == 'function' and 'function' in n: try: sub_sol = s.workflow.nodes[path[-1]]['solution'] if isinstance(parent_func(n['function']), NoSub): node_id = () sp = get_full_pipe(sub_sol, base=node_id) if sp: p['sub_pipe'] = sp except KeyError: pass pipe[bypass(*n_id)] = p return pipe
def _sort_sk_wait_in(sol): c = counter() def _get_sk_wait_in(s): w = set() _l = [] for n, a in s.dsp.sub_dsp_nodes.items(): if 'function' in a and s.index + a['index'] in s.sub_sol: sub_sol = s.sub_sol[s.index + a['index']] n_d, ll = _get_sk_wait_in(sub_sol) _l += ll wi = {k for k, v in sub_sol._wait_in.items() if v is True} n_d = n_d.union(wi) o = a['outputs'] w = w.union([o[k] for k in set(o).intersection(n_d)]) # Nodes to be visited. wi = {k for k, v in s._wait_in.items() if v is True} n_d = (set(s.workflow.nodes.keys()) - s._visited) - w n_d = n_d.union(s._visited.intersection(wi)) wi = n_d.intersection(wi) _l += [(s._meet.get(k, float('inf')), k, c(), s._wait_in) for k in wi] return set(n_d), _l return sorted(_get_sk_wait_in(sol)[1]) def _union_workflow(sol, node_id=None, bfs=None): if node_id is not None: j = bfs[node_id] = bfs.get(node_id, {NONE: set()}) else: j = bfs or {NONE: set()} j[NONE].update(sol.workflow.edges) for n, a in sol.dsp.sub_dsp_nodes.items(): if 'function' in a: s = sol.sub_sol.get(sol.index + a['index'], None) if s: _union_workflow(s, node_id=n, bfs=j) return j def _convert_bfs(bfs): from .graph import DiGraph g = DiGraph() g.add_edges_from(bfs[NONE]) bfs[NONE] = g for k, v in bfs.items(): if k is not NONE: _convert_bfs(v) return bfs