Source code for indra.explanation.pathfinding.pathfinding

__all__ = ['shortest_simple_paths', 'bfs_search', 'find_sources',
           'get_path_iter', 'bfs_search_multiple_nodes',
           '_bidirectional_shortest_path', '_bidirectional_pred_succ',
           'open_dijkstra_search']
import sys
import logging
from collections import deque, OrderedDict
from copy import deepcopy
from typing import Callable, List, Tuple, Set, Optional, Generator

import networkx as nx
import networkx.algorithms.simple_paths as simple_paths

from numpy import log as ln

from .util import get_sorted_neighbors, Node, Edge, EdgeFilter, SendType


logger = logging.getLogger(__name__)


# Copy from networkx.algorithms.simple_paths
# Added ignore_nodes and ignore_edges arguments
[docs]def shortest_simple_paths(G, source, target, weight=None, ignore_nodes=None, ignore_edges=None, hashes=None, ref_counts_function=None, strict_mesh_id_filtering=False, const_c=1, const_tk=10): """Generate all simple paths in the graph G from source to target, starting from shortest ones. A simple path is a path with no repeated nodes. If a weighted shortest path search is to be used, no negative weights are allowed. Parameters ---------- G : NetworkX graph source : node Starting node for path target : node Ending node for path weight : string Name of the edge attribute to be used as a weight. If None all edges are considered to have unit weight. Default value None. ignore_nodes : container of nodes nodes to ignore, optional ignore_edges : container of edges edges to ignore, optional hashes : list hashes specifying (if not empty) allowed edges ref_counts_function : function function counting references and PMIDs of an edge from its statement hashes strict_mesh_id_filtering : bool if true, exclude all edges not relevant to provided hashes const_c : int Constant used in MeSH IDs-based weight calculation const_tk : int Constant used in MeSH IDs-based weight calculation Returns ------- path_generator: generator A generator that produces lists of simple paths, in order from shortest to longest. Raises ------ NetworkXNoPath If no path exists between source and target. NetworkXError If source or target nodes are not in the input graph. NetworkXNotImplemented If the input graph is a Multi[Di]Graph. Examples -------- >>> G = nx.cycle_graph(7) >>> paths = list(nx.shortest_simple_paths(G, 0, 3)) >>> print(paths) [[0, 1, 2, 3], [0, 6, 5, 4, 3]] You can use this function to efficiently compute the k shortest/best paths between two nodes. >>> from itertools import islice >>> def k_shortest_paths(G, source, target, k, weight=None): ... return list(islice(nx.shortest_simple_paths(G, source, target, ... weight=weight), k)) >>> for path in k_shortest_paths(G, 0, 3, 2): ... print(path) [0, 1, 2, 3] [0, 6, 5, 4, 3] Notes ----- This procedure is based on algorithm by Jin Y. Yen [1]_. Finding the first $K$ paths requires $O(KN^3)$ operations. See Also -------- all_shortest_paths shortest_path all_simple_paths References ---------- .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a Network", Management Science, Vol. 17, No. 11, Theory Series (Jul., 1971), pp. 712-716. """ if source not in G: s = source[0] if isinstance(source, tuple) else source raise nx.NodeNotFound('source node %s not in graph' % s) if target not in G: t = target[0] if isinstance(target, tuple) else target raise nx.NodeNotFound('target node %s not in graph' % t) allowed_edges = [] if hashes: if strict_mesh_id_filtering: length_func = len shortest_path_func = _bidirectional_shortest_path for u, v in G.edges(): if ref_counts_function(G, u, v)[0]: allowed_edges.append((u, v)) else: weight = 'context_weight' def length_func(path): return sum(G.adj[u][v][weight] for (u, v) in zip(path, path[1:])) def shortest_path_func(G, source, target, weight, ignore_nodes, ignore_edges, force_edges): return simple_paths._bidirectional_dijkstra(G, source, target, weight, ignore_nodes, ignore_edges) for u, v, data, in G.edges(data=True): ref_counts, total = \ ref_counts_function(G, u, v) if not ref_counts: ref_counts = 1e-15 data['context_weight'] = \ -const_c * ln(ref_counts / (total + const_tk)) else: if strict_mesh_id_filtering: return [] if weight is None: length_func = len shortest_path_func = _bidirectional_shortest_path else: def length_func(path): return sum(G.adj[u][v][weight] for (u, v) in zip(path, path[1:])) def shortest_path_func(G, source, target, weight, ignore_nodes, ignore_edges, force_edges): return simple_paths._bidirectional_dijkstra(G, source, target, weight, ignore_nodes, ignore_edges) culled_ignored_nodes = set() \ if ignore_nodes is None else set(ignore_nodes) culled_ignored_edges = set() \ if ignore_edges is None else set(ignore_edges) listA = list() listB = simple_paths.PathBuffer() prev_path = None while True: cur_ignore_nodes = culled_ignored_nodes.copy() cur_ignore_edges = culled_ignored_edges.copy() if not prev_path: length, path = shortest_path_func(G, source, target, weight=weight, ignore_nodes=cur_ignore_nodes, ignore_edges=cur_ignore_edges, force_edges=allowed_edges) listB.push(length, path) else: for i in range(1, len(prev_path)): root = prev_path[:i] root_length = length_func(root) for path in listA: if path[:i] == root: cur_ignore_edges.add((path[i - 1], path[i])) try: length, spur = shortest_path_func( G, root[-1], target, ignore_nodes=cur_ignore_nodes, ignore_edges=cur_ignore_edges, weight=weight, force_edges=allowed_edges) path = root[:-1] + spur listB.push(root_length + length, path) except nx.NetworkXNoPath: pass cur_ignore_nodes.add(root[-1]) if listB: path = listB.pop() rcvd_ignore_values = yield path if rcvd_ignore_values is not None: culled_ignored_nodes = culled_ignored_nodes.union( rcvd_ignore_values[0]) culled_ignored_edges = culled_ignored_edges.union( rcvd_ignore_values[1]) listA.append(path) prev_path = path else: break
# Implementation inspired by networkx's # networkx.algorithms.traversal.breadth_first_search::generic_bfs_edges
[docs]def bfs_search_multiple_nodes(g, source_nodes, path_limit=None, **kwargs): """Do breadth first search from each of given nodes and yield paths until path limit is met. Parameters ---------- g : nx.Digraph An nx.DiGraph to search in. Can also be a signed node graph. It is required that node data contains 'ns' (namespace) and edge data contains 'belief'. source_nodes : list[node] List of nodes in the graph to start from. path_limit : int The maximum number of paths to return. Default: no limit. **kwargs : keyword arguments Any kwargs to pass to bfs_search. Yields ------ path : tuple(node) Paths in the bfs search starting from `source`. """ yielded_paths = 0 for n in source_nodes: paths = bfs_search(g, n, path_limit=path_limit, **kwargs) for p in paths: yield p yielded_paths += 1 if path_limit and yielded_paths >= path_limit: break if path_limit and yielded_paths >= path_limit: break
[docs]def get_path_iter(graph, source, target, path_length, loop, dummy_target, filter_func): """Return a generator of paths with path_length cutoff from source to target. Parameters ---------- graph : nx.Digraph An nx.DiGraph to search in. source : node Starting node for path. target : node Ending node for path. path_length : int Maximum depth of the paths. loop : bool Whether the path should be a loop. If True, source is appended to path. dummy_target : bool Whether provided target is a dummy node and should be removed from path filter_func : function or None A function to constrain the search. A function should take a node as a parameter and return True if the node is allowed to be in a path and False otherwise. If None, then no filtering is done. Returns ------- path_generator: generator A generator of the paths between source and target. """ path_iter = simple_paths_with_constraints( graph, source, target, path_length, False, filter_func) try: for p in path_iter: path = deepcopy(p) # Remove common target from a path. if dummy_target: path.remove(target) if loop: path.append(path[0]) # A path should contain at least one edge if len(path) < 2: continue yield path except nx.NetworkXNoPath: pass
[docs]def find_sources(graph, target, sources, filter_func=None): """Get the set of source nodes with paths to the target. Given a common target and a list of sources (or None if test statement subject is None), perform a breadth-first search upstream from the target to determine whether there are any sources that have paths to the target. For efficiency, does not return the full path, but identifies the upstream sources and the length of the path. Parameters ---------- graph : nx.DiGraph A DiGraph with signed nodes to find paths in. target : node The signed node (usually common target node) in the graph to start looking upstream for matching sources. sources : list[node] Signed nodes corresponding to the subject or upstream influence being checked. filter_func : Optional[function] A function to constrain the intermediate nodes in the path. A function should take a node as a parameter and return True if the node is allowed to be in a path and False otherwise. Returns ------- generator of (source, path_length) Yields tuples of source node and path length (int). If there are no paths to any of the given source nodes, the generator is empty. """ # Update filter function to not filter the sources if sources is not None: filter_func = filter_except(filter_func, sources) # First, create a list of visited nodes # Adapted from # networkx.algorithms.traversal.breadth_first_search.bfs_edges visited = set([target]) # Generate list of predecessor nodes with a sign updated according to # the sign of the target node # The queue holds tuples of "parents" (in this case downstream nodes) # and their "children" (in this case their upstream influencers) pred = graph.predecessors(target) if filter_func: pred = filter(filter_func, pred) queue = deque([(target, pred, 0)]) while queue: parent, children, path_length = queue[0] try: # Get the next child in the list child = next(children) # Is this child one of the source nodes we're looking for? If # so, yield it along with path length. # Also make sure that found source is positive if (sources is None or child in sources) and child[1] == 0: logger.debug("Found path to %s from %s with length %d" % (target, child, path_length+1)) yield (child, path_length+1) # Check this child against the visited list. If we haven't # visited it already (accounting for the path to the node), # then add it to the queue. if child not in visited: visited.add(child) pred = graph.predecessors(child) if filter_func: pred = filter(filter_func, pred) queue.append( (child, pred, path_length + 1)) # Once we've finished iterating over the children of the current # node, pop the node off and go to the next one in the queue except StopIteration: queue.popleft() # There was no path; this will produce an empty generator return
def _bidirectional_shortest_path(G, source, target, ignore_nodes=None, ignore_edges=None, weight=None, hashes=None, force_edges=None): """Returns the shortest path between source and target ignoring nodes and edges in the containers ignore_nodes and ignore_edges. This is a custom modification of the standard bidirectional shortest path implementation at networkx.algorithms.unweighted Parameters ---------- G : NetworkX graph source : node starting node for path target : node ending node for path ignore_nodes : container of nodes nodes to ignore, optional ignore_edges : container of edges edges to ignore, optional weight : None This function accepts a weight argument for convenience of shortest_simple_paths function. It will be ignored. force_edges : list list specifying (if not empty) allowed edges Returns ------- path: list List of nodes in a path from source to target. Raises ------ NetworkXNoPath If no path exists between source and target. See Also -------- shortest_path """ # call helper to do the real work results = _bidirectional_pred_succ(G, source, target, ignore_nodes, ignore_edges, force_edges=force_edges) pred, succ, w = results # build path from pred+w+succ path = [] # from w to target while w is not None: path.append(w) w = succ[w] # from source to w w = pred[path[0]] while w is not None: path.insert(0, w) w = pred[w] return len(path), path def _bidirectional_pred_succ(G, source, target, ignore_nodes=None, ignore_edges=None, force_edges=None): """Bidirectional shortest path helper. Returns (pred,succ,w) where pred is a dictionary of predecessors from w to the source, and succ is a dictionary of successors from w to the target. """ # does BFS from both source and target and meets in the middle if ignore_nodes and (source in ignore_nodes or target in ignore_nodes): raise nx.NetworkXNoPath("No path between %s and %s." % (source, target)) if target == source: return ({target: None}, {source: None}, source) # handle either directed or undirected if G.is_directed(): Gpred = G.predecessors Gsucc = G.successors else: Gpred = G.neighbors Gsucc = G.neighbors # support optional nodes filter if ignore_nodes: def filter_iter(nodes): def iterate(v): for w in nodes(v): if w not in ignore_nodes: yield w return iterate Gpred = filter_iter(Gpred) Gsucc = filter_iter(Gsucc) # support optional edges filter if ignore_edges or force_edges: if G.is_directed(): def filter_pred_iter(pred_iter): def iterate(v): if force_edges: for w in pred_iter(v): if (w, v) not in ignore_edges and (w, v)\ in force_edges: yield w else: for w in pred_iter(v): if (w, v) not in ignore_edges: yield w return iterate def filter_succ_iter(succ_iter): def iterate(v): if force_edges: for w in succ_iter(v): if (v, w) not in ignore_edges and (v, w)\ in force_edges: yield w else: for w in succ_iter(v): if (v, w) not in ignore_edges: yield w return iterate Gpred = filter_pred_iter(Gpred) Gsucc = filter_succ_iter(Gsucc) else: def filter_iter(nodes): def iterate(v): if force_edges: for w in nodes(v): if (v, w) not in ignore_edges \ and (w, v) not in ignore_edges \ and (v, w) in force_edges and (w, v)\ in force_edges: yield w else: for w in nodes(v): if (v, w) not in ignore_edges \ and (w, v) not in ignore_edges: yield w return iterate Gpred = filter_iter(Gpred) Gsucc = filter_iter(Gsucc) # predecesssor and successors in search pred = {source: None} succ = {target: None} # initialize fringes, start with forward forward_fringe = [source] reverse_fringe = [target] while forward_fringe and reverse_fringe: if len(forward_fringe) <= len(reverse_fringe): this_level = forward_fringe forward_fringe = [] for v in this_level: for w in Gsucc(v): if w not in pred: forward_fringe.append(w) pred[w] = v if w in succ: # found path return pred, succ, w else: this_level = reverse_fringe reverse_fringe = [] for v in this_level: for w in Gpred(v): if w not in succ: succ[w] = v reverse_fringe.append(w) if w in pred: # found path return pred, succ, w raise nx.NetworkXNoPath("No path between %s and %s." % (source, target)) # This code is adapted from nx.algorithms.simple_paths._all_simple_paths_graph def simple_paths_with_constraints(G, source, target, cutoff=None, allow_shorter=False, filter_func=None): """Find all simple paths between source and target with given constraints. Parameters ---------- G : nx.Digraph An nx.DiGraph to search in. source : node Starting node for path. target : node Ending node for path. cutoff : Optional[int] Maximum (if allow_shorter=False) or desired (if allow_shorter=True) depth of the paths. allow_shorter : Optional[bool] If cutoff is provided, whether to allow paths shorter than cutoff. NOTE: If cutoff is not provided, this is always reset to True. filter_func : Optional[function] A function to constrain the intermediate nodes in the path. A function should take a node as a parameter and return True if the node is allowed to be in a path and False otherwise. Returns ------- path_generator: generator A generator of the paths between source and target. """ if cutoff is None: cutoff = len(G) - 1 # If we didn't have a set cutoff, allow any paths allow_shorter = True # Update filter function to not filter target filter_func = filter_except(filter_func, {target}) visited = OrderedDict.fromkeys([source]) new_nodes = iter(G[source]) if filter_func: new_nodes = filter(filter_func, new_nodes) stack = [new_nodes] while stack: children = stack[-1] child = next(children, None) if child is None: stack.pop() visited.popitem() elif len(visited) < cutoff: # Get paths shorter than cutoff if allowed if allow_shorter and child == target: yield list(visited) + [target] elif child not in visited: visited[child] = None new_nodes = iter(G[child]) if filter_func: new_nodes = filter(filter_func, new_nodes) stack.append(new_nodes) else: # len(visited) == cutoff: # This path will be exactly of cutoff length if child == target or target in children: yield list(visited) + [target] stack.pop() visited.popitem() def filter_except(filter_func, nodes_to_keep): """Update the filter function to keep some nodes. Parameters ---------- filter_func : function A function to constrain the intermediate nodes in the path. A function should take a node as a parameter and return True if the node is allowed to be in a path and False otherwise. nodes_to_keep : iterable A collection of nodes to keep regardless of filter function. Returns ------- new_filter : function Updated filter function that filters out everything according to original filter_func except nodes_to_keep. """ if filter_func is None: return None def new_filter(n): if n in nodes_to_keep: return True return filter_func(n) return new_filter