Source code for navis.graph.graph_utils

#    This script is part of navis (http://www.github.com/navis-org/navis).
#    Copyright (C) 2018 Philipp Schlegel
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.

import numbers
import warnings

import pandas as pd
import numpy as np
import networkx as nx

from typing import Union, Optional, List, Tuple, Sequence, Dict, Set, overload, Iterable
from typing_extensions import Literal

from pandas.api.types import CategoricalDtype
from scipy.sparse import csgraph, csr_matrix

from .. import graph, utils, config, core, morpho

# Set up logging
logger = config.get_logger(__name__)

__all__ = sorted(['classify_nodes', 'cut_skeleton', 'longest_neurite',
                  'split_into_fragments', 'reroot_skeleton', 'distal_to',
                  'dist_between', 'find_main_branchpoint',
                  'generate_list_of_childs', 'geodesic_matrix',
                  'node_label_sorting',
                  'segment_length', 'rewire_skeleton', 'insert_nodes',
                  'remove_nodes', 'dist_to_root'])


@utils.map_neuronlist(desc='Gen. segments', allow_parallel=True)
def _generate_segments(x: 'core.NeuronObject',
                       weight: Optional[str] = None,
                       return_lengths: bool = False) -> Union[list, Tuple[list, list]]:
    """Generate segments maximizing segment lengths.

    Parameters
    ----------
    x :         TreeNeuron | NeuronList
                May contain multiple neurons.
    weight :    'weight' | None, optional
                If ``"weight"`` use physical, geodesic length to determine
                segment length. If ``None`` use number of nodes (faster).
    return_lengths : bool
                If True, also return lengths of segments according to ``weight``.

    Returns
    -------
    segments :  list
                Segments as list of lists containing node IDs. List is
                sorted by segment lengths.
    lengths :   list
                Length for each segment according to ``weight``. Only provided
                if `return_lengths` is True.

    Examples
    --------
    This is for doctests mostly

    >>> import navis
    >>> n = navis.example_neurons(1)
    >>> unweighted = navis.graph_utils._generate_segments(n)
    >>> weighted = navis.graph_utils._generate_segments(n, weight='weight')

    """
    if not isinstance(x, core.TreeNeuron):
        raise ValueError(f'Expected TreeNeuron, got "{type(x)}"')

    # At this point x is TreeNeuron
    x: core.TreeNeuron

    assert weight in ('weight', None), f'Unable to use weight "{weight}"'
    d = dist_to_root(x, igraph_indices=False, weight=weight)
    endNodeIDs = x.nodes[x.nodes.type == 'end'].node_id.values
    endNodeIDs = sorted(endNodeIDs, key=lambda x: d.get(x, 0), reverse=True)

    if config.use_igraph and x.igraph:
        g: igraph.Graph = x.igraph  # noqa
        # Convert endNodeIDs to indices
        id2ix = dict(zip(x.igraph.vs['node_id'], range(len(x.igraph.vs))))
        endNodeIDs = [id2ix[n] for n in endNodeIDs]
    else:
        g: nx.DiGraph = x.graph

    seen: set = set()
    sequences = []
    for nodeID in endNodeIDs:
        sequence = [nodeID]
        parents = list(g.successors(nodeID))
        while True:
            if not parents:
                break
            parentID = parents[0]
            sequence.append(parentID)
            if parentID in seen:
                break
            seen.add(parentID)
            parents = list(g.successors(parentID))

        if len(sequence) > 1:
            sequences.append(sequence)

    # If igraph, turn indices back to node IDs
    if config.use_igraph and x.igraph:
        ix2id = {v: k for k, v in id2ix.items()}
        sequences = [[ix2id[ix] for ix in s] for s in sequences]

    # Sort sequences by length
    lengths = [d[s[0]] - d[s[-1]] for s in sequences]
    sequences = [x for _, x in sorted(zip(lengths, sequences), reverse=True)]

    if return_lengths:
        return sequences, sorted(lengths, reverse=True)
    else:
        return sequences


def _connected_components(x: Union['core.TreeNeuron', 'core.MeshNeuron']) -> List[Set[int]]:
    """Extract the connected components within a neuron.

    Parameters
    ----------
    x :         TreeNeuron | MeshNeuron

    Returns
    -------
    list
                List containing sets of node/vertex IDs for each subgraph.

    Examples
    --------
    For doctest only

    >>> import navis
    >>> n = navis.example_neurons(1, kind='skeleton')
    >>> cc = navis.graph_utils._connected_components(n)
    >>> m = navis.example_neurons(1, kind='mesh')
    >>> cc = navis.graph_utils._connected_components(m)

    """
    assert isinstance(x, (core.TreeNeuron, core.MeshNeuron))

    if config.use_igraph and x.igraph:
        G: igraph.Graph = x.igraph  # noqa
        # Get the vertex clustering
        vc = G.components(mode='WEAK')
        # Membership maps indices to connected components
        ms = np.array(vc.membership)
        if isinstance(x, core.TreeNeuron):
            # For skeletons we need node IDs
            ids = np.array(G.vs['node_id'])
        else:
            # For MeshNeurons we can use the indices directly
            ids = np.array(G.vs.indices)

        # Extract node IDs/vertex indices for each component
        cc = [ids[ms == i] for i in np.unique(ms)]
    else:
        G: nx.DiGraph = x.graph
        cc = nx.connected_components(G.to_undirected())
        cc = list(cc)

    return cc


def _break_segments(x: 'core.NeuronObject') -> list:
    """Break neuron into small segments connecting ends, branches and root.

    Parameters
    ----------
    x :         TreeNeuron | NeuronList
                May contain multiple neurons.

    Returns
    -------
    list
                Segments as list of lists containing node IDs.

    Examples
    --------
    For doctest only

    >>> import navis
    >>> n = navis.example_neurons(1)
    >>> seg = navis.graph_utils._break_segments(n)

    """
    if isinstance(x, core.NeuronList):
        return [_break_segments(x[i]) for i in range(len(x))]
    elif isinstance(x, core.TreeNeuron):
        pass
    else:
        logger.error('Unexpected datatype: %s' % str(type(x)))
        raise ValueError

    # At this point x is TreeNeuron
    x: core.TreeNeuron

    if x.igraph and config.use_igraph:
        g: Union['igraph.Graph', 'nx.DiGraph'] = x.igraph # noqa
        end = g.vs.select(_indegree=0).indices
        branch = g.vs.select(_indegree_gt=1, _outdegree=1).indices
        root = g.vs.select(_outdegree=0).indices

        # Get seeds
        seeds = branch + end
        # Remove seeds that are also roots (=disconnected single nodes)
        seeds = set(seeds) - set(root)

        # Converting to set speeds up the "parent in stops" check
        stops = set(branch + root)
        seg_list = []
        for s in seeds:
            parent = g.successors(s)[0]
            seg = [s, parent]
            while parent not in stops:
                parent = g.successors(parent)[0]
                seg.append(parent)
            seg_list.append(seg)
        # Translate indices to node IDs
        ix_id = {v: n for v, n in zip(g.vs.indices,
                                      g.vs.get_attribute_values('node_id'))}
        seg_list = [[ix_id[n] for n in s] for s in seg_list]
    else:
        seeds = x.nodes[x.nodes.type.isin(['branch', 'end'])].node_id.values
        stops = x.nodes[x.nodes.type.isin(['branch', 'root'])].node_id.values
        # Converting to set speeds up the "parent in stops" check
        stops = set(stops)
        g = x.graph
        seg_list = []
        for s in seeds:
            parent = next(g.successors(s), None)
            seg = [s, parent]
            while parent not in stops:
                parent = next(g.successors(parent), None)
                seg.append(parent)
            seg_list.append(seg)

    return seg_list


[docs] @utils.lock_neuron def dist_to_root(x: 'core.TreeNeuron', weight=None, igraph_indices: bool = False) -> dict: """Calculate distance to root for each node. Parameters ---------- x : TreeNeuron weight : str, optional Use "weight" if you want geodesic distance and ``None`` if you want node count. igraph_indices : bool Whether to return igraph node indices instead of node IDs. This is mainly used for internal functions. Returns ------- dist : dict Dictionary with root distances. Examples -------- For doctest only >>> import navis >>> n = navis.example_neurons(1) >>> seg = navis.graph.dist_to_root(n) See Also -------- :func:`navis.geodesic_matrix` For distances between all points. """ if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected TreeNeuron, got {type(x)}') dist = {} for root in x.root: dist.update(nx.shortest_path_length(x.graph, target=root, weight=weight)) # Map node ID to vertex index for igraph if igraph_indices: if not x.igraph: raise ValueError('Neuron does not have an igraph representation.') id2ix = dict(zip(x.igraph.vs['node_id'], range(len(x.igraph.vs)))) dist = {id2ix[k]: v for k, v in dist.items()} return dist
def _edge_count_to_root_old(x: 'core.TreeNeuron') -> dict: """Return a map of nodeID vs number of edges. Starts from the first node that lacks successors (aka the root). """ current_level: List[int] g: Union['igraph.Graph', 'nx.DiGraph'] # noqa if x.igraph and config.use_igraph: g = x.igraph current_level = g.vs(_outdegree=0).indices else: g = x.graph current_level = list(x.root) dist = {} count = 1 next_level: List[Union[str, int]] = [] while current_level: # Consume all elements in current_level while current_level: node = current_level.pop() dist[node] = count next_level.extend(g.predecessors(node)) # Rotate lists (current_level is now empty) current_level, next_level = next_level, current_level # type: ignore count += 1 # Map vertex index to node ID if x.igraph and config.use_igraph: # Grab graph once to avoid overhead from stale checks g = x.igraph dist = {g.vs[k]['node_id']: v for k, v in dist.items()} return dist @utils.map_neuronlist(desc='Classifying', allow_parallel=True) @utils.lock_neuron def _classify_nodes_old(x: 'core.NeuronObject', inplace: bool = True ) -> Optional['core.NeuronObject']: """Classify neuron's nodes into end nodes, branches, slabs or root. Adds ``'type'`` column to ``x.nodes``. Parameters ---------- x : TreeNeuron | NeuronList Neuron(s) whose nodes to classify nodes. inplace : bool, optional If ``False``, nodes will be classified on a copy which is then returned leaving the original neuron unchanged. Returns ------- TreeNeuron/List Examples -------- >>> import navis >>> nl = navis.example_neurons(2) >>> _ = navis.graph.classify_nodes(nl, inplace=True) """ if not inplace: x = x.copy() if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected TreeNeuron(s), got "{type(x)}"') # At this point x is TreeNeuron x: core.TreeNeuron # Make sure there are nodes to classify if not x.nodes.empty: if config.use_igraph and x.igraph: # Get graph representation of neuron vs = x.igraph.vs # Get branch/end nodes based on their degree of connectivity ends = vs.select(_indegree=0).get_attribute_values('node_id') branches = vs.select(_indegree_gt=1).get_attribute_values('node_id') else: # Get graph representation of neuron g = x.graph # Get branch/end nodes based on their degree of connectivity deg = pd.DataFrame.from_dict(dict(g.degree()), orient='index') # [ n for n in g.nodes if g.degree(n) == 1 ] ends = deg[deg.iloc[:, 0] == 1].index.values # [ n for n in g.nodes if g.degree(n) > 2 ] branches = deg[deg.iloc[:, 0] > 2].index.values # This also resets the column if it already exists. This is important # because an existing column will be categorical and if we try setting # types that didn't previously exist, it will throw exceptions. x.nodes['type'] = 'slab' x.nodes.loc[x.nodes.node_id.isin(ends), 'type'] = 'end' x.nodes.loc[x.nodes.node_id.isin(branches), 'type'] = 'branch' x.nodes.loc[x.nodes.parent_id < 0, 'type'] = 'root' else: x.nodes['type'] = None # Turn into categorical data - saves tons of memory # Note that we have to make sure all categories are set even if they # don't exist (e.g. if a neuron has no branch points) cat_types = CategoricalDtype(categories=["end", "branch", "root", "slab"], ordered=False) x.nodes['type'] = x.nodes['type'].astype(cat_types) return x @utils.map_neuronlist(desc='Classifying', allow_parallel=True) @utils.lock_neuron def classify_nodes(x: "core.NeuronObject", categorical=True, inplace: bool = True): """Classify neuron's nodes into end nodes, branches, slabs or root. Adds a ``'type'`` column to ``x.nodes`` table. Parameters ---------- x : TreeNeuron | NeuronList Neuron(s) whose nodes to classify. categorical : bool If True (default), will use categorical data type which takes up much less memory at a small run-time overhead. inplace : bool, optional If ``False``, nodes will be classified on a copy which is then returned leaving the original neuron unchanged. Returns ------- TreeNeuron/List Examples -------- >>> import navis >>> nl = navis.example_neurons(2) >>> _ = navis.graph.classify_nodes(nl, inplace=True) """ if not inplace: x = x.copy() if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected TreeNeuron(s), got "{type(x)}"') if x.nodes.empty: x.nodes["type"] = None return x # Make sure there are nodes to classify # Note: I have tried to optimized the s**t out of this, i.e. every # single line of code here has been tested for speed. Do not # change anything unless you know what you're doing! # Turns out that numpy.isin() recently started to complain if the # node_ids are uint64 and the parent_ids are int64 (but strangely # not with 32bit integers). If that's the case we have to convert # the node_ids to int64. node_ids = x.nodes.node_id.values parent_ids = x.nodes.parent_id.values if node_ids.dtype == np.uint64: node_ids = node_ids.astype(np.int64) cl = np.full(len(x.nodes), "slab", dtype="<U6") cl[~np.isin(node_ids, parent_ids)] = "end" bp = x.nodes.parent_id.value_counts() bp = bp.index.values[bp.values > 1] cl[np.isin(node_ids, bp)] = "branch" cl[parent_ids < 0] = "root" if categorical: cl = pd.Categorical(cl, categories=["end", "branch", "root", "slab"], ordered=False) x.nodes["type"] = cl return x # only this combination will return a single bool @overload def distal_to(x: 'core.TreeNeuron', a: Union[str, str], b: Union[str, int], ) -> bool: pass # if above types don't a DataFrame will be returned @overload def distal_to(x: 'core.TreeNeuron', a: Optional[List[Union[str, int]]], b: Optional[Union[str, int, List[Union[str, int]]]], ) -> pd.DataFrame: pass # if above types don't a DataFrame will be returned @overload def distal_to(x: 'core.TreeNeuron', a: Optional[Union[str, int, List[Union[str, int]]]], b: Optional[List[Union[str, int]]], ) -> pd.DataFrame: pass
[docs] @utils.lock_neuron def distal_to(x: 'core.TreeNeuron', a: Optional[Union[str, int, List[Union[str, int]]]] = None, b: Optional[Union[str, int, List[Union[str, int]]]] = None, ) -> Union[bool, pd.DataFrame]: """Check if nodes A are distal to nodes B. Important --------- Please note that if node A is not distal to node B, this does **not** automatically mean it is proximal instead: if nodes are on different branches, they are neither distal nor proximal to one another! To test for this case run a->b and b->a - if both return ``False``, nodes are on different branches. Also: if a and b are the same node, this function will return ``True``! Parameters ---------- x : TreeNeuron a,b : single node ID | list of node IDs | None, optional If no node IDs are provided, will consider all node. Note that for large sets of nodes it might be more efficient to use :func:`navis.geodesic_matrix` (see examples). Returns ------- bool If ``a`` and ``b`` are single node IDs respectively. pd.DataFrame If ``a`` and/or ``b`` are lists of node IDs. Columns and rows (index) represent node IDs. Neurons ``a`` are rows, neurons ``b`` are columns. Examples -------- >>> import navis >>> # Get a neuron >>> x = navis.example_neurons(1) >>> # Get a random node >>> n = x.nodes.iloc[100].node_id >>> # Check all nodes if they are distal or proximal to that node >>> df = navis.distal_to(x, n) >>> # Get the IDs of the nodes that are distal >>> dist = df.loc[n, df.loc[n]].index.values >>> len(dist) 101 For large neurons and/or large sets of `a`/`b` it can be much faster to use `geodesic_matrix` instead: >>> import navis >>> import numpy as np >>> x = navis.example_neurons(1) >>> # Get an all-by-all distal_to >>> df = navis.geodesic_matrix(x, weight=None, directed=True) < np.inf >>> # Get distal_to for specific nodes >>> df = navis.geodesic_matrix(x, weight=None, directed=True) < np.inf >>> # Get distal_to for specific nodes >>> a, b = x.nodes.node_id.values[:100], x.nodes.node_id.values[-100:] >>> dist = navis.geodesic_matrix(x, weight=None, directed=True, from_=a) >>> distal_to = dist[b] < np.inf See Also -------- :func:`navis.geodesic_matrix` Depending on your neuron and how many nodes you're asking for, this function can be considerably faster! See examples. """ if isinstance(x, core.NeuronList) and len(x) == 1: x = x[0] if not isinstance(x, core.TreeNeuron): raise ValueError(f'Please pass a single TreeNeuron, got {type(x)}') # At this point x is TreeNeuron x: core.TreeNeuron if not isinstance(a, type(None)): tnA = utils.make_iterable(a) # Make sure we're dealing with integers tnA = np.unique(tnA).astype(int) else: tnA = x.nodes.node_id.values if not isinstance(b, type(None)): tnB = utils.make_iterable(b) # Make sure we're dealing with integers tnB = np.unique(tnB).astype(int) else: tnB = x.nodes.node_id.values if x.igraph and config.use_igraph: # Map node ID to index id2ix = {n: v for v, n in zip(x.igraph.vs.indices, x.igraph.vs['node_id'])} # Convert node IDs to indices tnA = [id2ix[n] for n in tnA] # type: ignore tnB = [id2ix[n] for n in tnB] # type: ignore # Get path lengths le = x.igraph.shortest_paths(tnA, tnB, mode='OUT') # Converting to numpy array first is ~2X as fast le = np.asarray(le) # Convert to True/False le = le != float('inf') df = pd.DataFrame(le, index=x.igraph.vs[tnA]['node_id'], columns=x.igraph.vs[tnB]['node_id']) else: # Generate empty DataFrame df = pd.DataFrame(np.zeros((len(tnA), len(tnB)), dtype=bool), columns=tnB, index=tnA) # Iterate over all targets # Grab graph once to avoid overhead from stale checks g = x.graph for nB in config.tqdm(tnB, desc='Querying paths', disable=(len(tnB) < 1000) | config.pbar_hide, leave=config.pbar_leave): # Get all paths TO this target. This function returns a dictionary: # { source1 : path_length, source2 : path_length, ... } containing # all nodes distal to this node. paths = nx.shortest_path_length(g, source=None, target=nB) # Check if sources are among our targets df[nB] = [nA in paths for nA in tnA] if df.shape == (1, 1): return df.values[0][0] else: # Return boolean return df
[docs] def geodesic_matrix(x: 'core.NeuronObject', from_: Optional[Iterable[int]] = None, directed: bool = False, weight: Optional[str] = 'weight', limit: Union[float, int] = np.inf ) -> pd.DataFrame: """Generate geodesic ("along-the-arbor") distance matrix between nodes/vertices. Parameters ---------- x : TreeNeuron | MeshNeuron | NeuronList If list, must contain a SINGLE neuron. from_ : list | numpy.ndarray, optional Node IDs (for TreeNeurons) or vertex indices (for MeshNeurons). If provided, will compute distances only FROM this subset to all other nodes/vertices. directed : bool, optional If True, pairs without a child->parent path will be returned with ``distance = "inf"``. Only relevant for ``TreeNeurons``. weight : 'weight' | None, optional If ``weight`` distances are given as physical length. If ``None`` distances is number of nodes. limit : int | float, optional Use to limit distance calculations. Nodes that are not within ``limit`` will have distance ``np.inf``. If neuron has its `.units` set, you can also pass a string such as "10 microns". Returns ------- pd.DataFrame Geodesic distance matrix. Distances in nanometres. See Also -------- :func:`navis.distal_to` Check if a node A is distal to node B. :func:`navis.dist_between` Get point-to-point geodesic distances. :func:`navis.dist_to_root` Distances from all skeleton node to their root(s). Examples -------- Find average geodesic distance between all leaf nodes >>> import navis >>> n = navis.example_neurons(1) >>> # Generate distance matrix >>> m = navis.geodesic_matrix(n) >>> # Subset matrix to leaf nodes >>> leafs = n.nodes[n.nodes.type=='end'].node_id.values >>> l_dist = m.loc[leafs, leafs] >>> # Get mean >>> round(l_dist.mean().mean()) 12983 """ if isinstance(x, core.NeuronList): if len(x) == 1: x = x[0] else: raise ValueError('Cannot process more than a single neuron.') elif not isinstance(x, (core.TreeNeuron, core.MeshNeuron)): raise ValueError(f'Unable to process data of type "{type(x)}"') limit = x.map_units(limit, on_error='raise') # Makes no sense to use directed for MeshNeurons if isinstance(x, core.MeshNeuron): directed = False if x.igraph and config.use_igraph: if isinstance(x, core.TreeNeuron): nodeList = np.array(x.igraph.vs.get_attribute_values('node_id')) else: nodeList = np.arange(len(x.igraph.vs)) # Matrix is ordered by vertex number m = _igraph_to_sparse(x.igraph, weight_attr=weight) else: nodeList = np.array(x.graph.nodes()) if hasattr(nx, 'to_scipy_sparse_matrix'): m = nx.to_scipy_sparse_matrix(x.graph, nodeList, weight=weight) else: m = nx.to_scipy_sparse_array(x.graph, nodeList, weight=weight) if not isinstance(from_, type(None)): from_ = np.unique(utils.make_iterable(from_)) miss = from_[~np.isin(from_, nodeList)].astype(str) if any(miss): raise ValueError(f'Node/vertex IDs not present: {", ".join(miss)}') indices = np.where(np.isin(nodeList, from_))[0] ix = nodeList[indices] else: indices = None ix = nodeList # For some reason csgrpah.dijkstra expects indices/indptr as int32 # igraph seems to do that by default but networkx uses int64 for indices m.indptr = m.indptr.astype('int32', copy=False) m.indices = m.indices.astype('int32', copy=False) dmat = csgraph.dijkstra(m, directed=directed, indices=indices, limit=limit) return pd.DataFrame(dmat, columns=nodeList, index=ix) # type: ignore # no stubs
[docs] @utils.lock_neuron def segment_length(x: 'core.TreeNeuron', segment: List[int]) -> float: """Get length of a linear segment. This function is superfast but has no checks - you must provide a valid segment. Parameters ---------- x : TreeNeuron Neuron to which this segment belongs. segment : list of ints Linear segment as list of node IDs ordered child->parent. Returns ------- length : float See Also -------- :func:`navis.dist_between` If you only know start and end points of the segment. Examples -------- >>> import navis >>> n = navis.example_neurons(1) >>> l = navis.segment_length(n, n.segments[0]) >>> round(l) 56356 """ if not isinstance(x, core.TreeNeuron): raise ValueError(f'Unable to process data of type "{type(x)}"') # Get graph once to avoid overhead from validation - do NOT change this graph = x.graph dist = np.array([graph.edges[(c, p)]['weight'] for c, p in zip(segment[:-1], segment[1:])]) return sum(dist)
[docs] @utils.lock_neuron def dist_between(x: 'core.NeuronObject', a: int, b: int) -> float: """Get the geodesic distance between nodes in nanometers. Parameters ---------- x : TreeNeuron | MeshNeuron | NeuronList If NeuronList must contain only a single neuron. a,b : int Node IDs (for TreeNeurons) or vertex indices (MeshNeurons) to check the distance between. Returns ------- int distance in nm See Also -------- :func:`~navis.distal_to` Check if a node A is distal to node B. :func:`~navis.geodesic_matrix` Get all-by-all geodesic distance matrix. :func:`navis.segment_length` Much faster if you have a linear segment and know all node IDs. Examples -------- >>> import navis >>> n = navis.example_neurons(1) >>> d = navis.dist_between(n, ... n.nodes.node_id.values[0], ... n.nodes.node_id.values[1]) """ if isinstance(x, core.NeuronList): if len(x) == 1: x = x[0] else: raise ValueError(f'Need a single TreeNeuron, got {len(x)}') if isinstance(x, (core.TreeNeuron, core.MeshNeuron)): G: Union['igraph.Graph', # noqa 'nx.DiGraph'] = x.igraph if (x.igraph and config.use_igraph) else x.graph elif isinstance(x, nx.DiGraph): G = x elif 'igraph' in str(type(x.igraph)): # We can't use isinstance here because igraph library might not be installed G = x else: raise ValueError(f'Unable to process data of type {type(x)}') if ((utils.is_iterable(a) and len(a) > 1) # type: ignore # this is just a check or (utils.is_iterable(b) and len(b) > 1)): # type: ignore # this is just a check raise ValueError('Can only process single nodes/vertices. Use ' 'navis.geodesic_matrix instead.') a = utils.make_non_iterable(a) b = utils.make_non_iterable(b) try: _ = int(a) _ = int(b) except BaseException: raise ValueError('a, b need to be node IDs or vertex indices!') # If we're working with network X DiGraph if isinstance(G, nx.DiGraph): return int(nx.algorithms.shortest_path_length(G.to_undirected(as_view=True), a, b, weight='weight')) else: if isinstance(x, core.TreeNeuron): a = G.vs.find(node_id=a) b = G.vs.find(node_id=b) # If not, we're assuming g is an iGraph object return G.shortest_paths(a, b, weights='weight', mode='ALL')[0][0]
[docs] @utils.map_neuronlist(desc='Searching', allow_parallel=True) @utils.meshneuron_skeleton(method='node_to_vertex') def find_main_branchpoint(x: 'core.NeuronObject', method: Union[Literal['longest_neurite'], Literal['betweenness']] = 'betweenness', threshold: float = .95, reroot_soma: bool = False) -> Union[int, List[int]]: """Find main branch point of unipolar (e.g. insect) neurons. Note that this might produce garbage if the neuron is fragmented. Parameters ---------- x : TreeNeuron | NeuronList May contain multiple neurons. method : "longest_neurite" | "centrality" The method to use: - "longest_neurite" assumes that the main branch point is where the two largest branches converge - "betweenness" uses centrality to determine the point which most shortest paths traverse threshold : float [0-1] Sets the cutoff for method "betweenness". Decrease threshold to be more inclusive (useful if the cell body fiber has little bristles), increase to be more stringent (i.e. when the skeleton is very clean). reroot_soma : bool, optional If True, neuron will be rerooted to soma. Returns ------- branch_point : int | list of int Node ID or list of node IDs of the main branch point(s). Examples -------- >>> import navis >>> n = navis.example_neurons(1) >>> navis.find_main_branchpoint(n, reroot_soma=True) 110 >>> # Cut neuron into axon, dendrites and primary neurite tract: >>> # for this we need to cut twice - once at the main branch point >>> # and once at one of its childs >>> child = n.nodes[n.nodes.parent_id == 2066].node_id.values[0] >>> split = navis.cut_skeleton(n, [2066, child]) >>> split # doctest: +SKIP <class 'navis.core.neuronlist.NeuronList'> of 3 neurons type n_nodes n_connectors n_branches n_leafs cable_length soma 0 TreeNeuron 2572 0 170 176 475078.177926 None 1 TreeNeuron 139 0 1 3 89983.511392 [3490] 2 TreeNeuron 3656 0 63 66 648285.745750 None """ utils.eval_param(method, name='method', allowed_values=('longest_neurite', 'betweenness')) if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected TreeNeuron(s), got "{type(x)}"') # At this point x is TreeNeuron x: core.TreeNeuron # If no branches if x.nodes[x.nodes.type == 'branch'].empty: raise ValueError('Neuron has no branch points.') if reroot_soma and not isinstance(x.soma, type(None)): x = x.reroot(x.soma, inplace=False) if method == 'longest_neurite': G = x.graph # First, find longest path longest = nx.dag_longest_path(G, weight='weight') # Remove longest path # (use subgraph to avoid editing original or copying raph) keep = ~np.isin(G.nodes, longest) G = G.subgraph(np.array(G.nodes)[keep]) # Find second longst path sc_longest = nx.dag_longest_path(G, weight='weight') # Parent of the last node in sc_longest is the common branch point bp = list(x.graph.successors(sc_longest[-1]))[0] else: # Get betweenness for each node x = morpho.betweeness_centrality(x, directed=True, from_='branch_points') # Get branch points with highest centrality high_between = x.branch_points.betweenness >= x.branch_points.betweenness.max() * threshold candidates = x.branch_points[high_between] # If only one nodes just go with it if candidates.shape[0] == 1: bp = candidates.node_id.values[0] else: # If multiple points get the farthest one from the root root_dists = dist_to_root(x) bp = sorted(candidates.node_id.values, key=lambda x: root_dists[x])[-1] # This makes sure we get the same data type as in the node table # -> Network X seems to sometimes convert integers to floats return x.nodes.node_id.dtype.type(bp)
[docs] @utils.meshneuron_skeleton(method='split') def split_into_fragments(x: 'core.NeuronObject', n: int = 2, min_size: Optional[Union[float, str]] = None, reroot_soma: bool = False) -> 'core.NeuronList': """Split neuron into fragments. Cuts are based on longest neurites: the first cut is made where the second largest neurite merges onto the largest neurite, the second cut is made where the third largest neurite merges into either of the first fragments and so on. Parameters ---------- x : TreeNeuron | MeshNeuron | NeuronList Must be a single neuron. n : int, optional Number of fragments to split into. Must be >1. min_size : int | str, optional Minimum size of fragment to be cut off. If too small, will stop cutting. This takes only the longest path in each fragment into account! If the neuron(s), has its `.units` set, you can also pass this as a string such as "10 microns". reroot_soma : bool, optional If True, neuron will be rerooted to soma. Returns ------- NeuronList Examples -------- >>> import navis >>> x = navis.example_neurons(1) >>> # Cut into two fragments >>> cut1 = navis.split_into_fragments(x, n=2) >>> # Cut into fragments of >10 um size >>> cut2 = navis.split_into_fragments(x, n=float('inf'), min_size=10e3) """ if isinstance(x, core.NeuronList): if len(x) == 1: x = x[0] else: raise Exception(f'{x.shape[0]} neurons provided. Please provide ' 'only a single neuron!') if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected a single TreeNeuron, got "{type(x)}"') if n < 2: raise ValueError('Number of fragments must be at least 2.') # At this point x is TreeNeuron x: core.TreeNeuron min_size = x.map_units(min_size, on_error='raise') if reroot_soma and not isinstance(x.soma, type(None)): x.reroot(x.soma, inplace=True) # Collect nodes of the n longest neurites tn_to_preserve: List[int] = [] fragments = [] i = 0 while i < n: if tn_to_preserve: # Generate fresh graph g = graph.neuron2nx(x) # Remove nodes that we have already preserved g.remove_nodes_from(tn_to_preserve) else: g = x.graph # Get path longest_path = nx.dag_longest_path(g) # Check if fragment is still long enough if min_size: this_length = sum([v for k, v in nx.get_edge_attributes( g, 'weight').items() if k[1] in longest_path]) if this_length <= min_size: break tn_to_preserve += longest_path fragments.append(longest_path) i += 1 # Next, make some virtual cuts and get the complement of nodes for # each fragment graphs = [x.graph.copy()] # Grab graph once to avoide overhead from stale checking g = x.graph for fr in fragments[1:]: this_g = nx.bfs_tree(g, fr[-1], reverse=True) graphs.append(this_g) # Next, we need to remove nodes that are in subsequent graphs from # those graphs for i, g in enumerate(graphs): for g2 in graphs[i + 1:]: g.remove_nodes_from(g2.nodes) # Now make neurons nl = core.NeuronList([morpho.subset_neuron(x, g) for g in graphs]) return nl
[docs] @utils.map_neuronlist(desc='Pruning', allow_parallel=True) @utils.meshneuron_skeleton(method='subset') def longest_neurite(x: 'core.NeuronObject', n: int = 1, reroot_soma: bool = False, from_root: bool = True, inverse: bool = False, inplace: bool = False) -> 'core.NeuronObject': """Return a neuron consisting of only the longest neurite(s). Based on geodesic distances. Parameters ---------- x : TreeNeuron | NeuronList Neuron(s) to prune. n : int | slice Number of longest neurites to preserve. For example: - ``n=1`` keeps the longest neurites - ``n=2`` keeps the two longest neurites - ``n=slice(1, None)`` removes the longest neurite reroot_soma : bool If True, neuron will be rerooted to soma. from_root : bool If True, will look for longest neurite from root. If False, will look for the longest neurite between any two tips. inverse : bool If True, will instead *remove* the longest neurite. inplace : bool If False, copy of the neuron will be trimmed down to longest neurite and returned. Returns ------- TreeNeuron/List Pruned neuron. See Also -------- :func:`~navis.split_into_fragments` Split neuron into fragments based on longest neurites. Examples -------- >>> import navis >>> n = navis.example_neurons(1) >>> # Keep only the longest neurite >>> ln1 = navis.longest_neurite(n, n=1, reroot_soma=True) >>> # Keep the two longest neurites >>> ln2 = navis.longest_neurite(n, n=2, reroot_soma=True) >>> # Keep everything but the longest neurite >>> ln3 = navis.longest_neurite(n, n=slice(1, None), reroot_soma=True) """ if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected TreeNeuron(s), got "{type(x)}"') if isinstance(n, numbers.Number) and n < 1: raise ValueError('Number of longest neurites to preserve must be >=1') # At this point x is TreeNeuron x: core.TreeNeuron if not inplace: x = x.copy() if not from_root: # Find the two most distal points leafs = x.leafs.node_id.values dists = geodesic_matrix(x, from_=leafs)[leafs] # This might be multiple values mx = np.where(dists == np.max(dists.values)) start = dists.columns[mx[0][0]] # Reroot to one of the nodes that gives the longest distance x.reroot(start, inplace=True) elif reroot_soma and not isinstance(x.soma, type(None)): x.reroot(x.soma, inplace=True) segments = _generate_segments(x, weight='weight') if isinstance(n, (int, np.integer)): tn_to_preserve: List[int] = [tn for s in segments[:n] for tn in s] elif isinstance(n, slice): tn_to_preserve = [tn for s in segments[n] for tn in s] else: raise TypeError(f'Unable to use N of type "{type(n)}"') if not inverse: _ = morpho.subset_neuron(x, tn_to_preserve, inplace=True) else: _ = morpho.subset_neuron(x, ~np.isin(x.nodes.node_id.values, tn_to_preserve), inplace=True) return x
[docs] @utils.lock_neuron def reroot_skeleton(x: 'core.NeuronObject', new_root: Union[int, str], inplace: bool = False) -> 'core.TreeNeuron': """Reroot neuron to new root. Parameters ---------- x : TreeNeuron | NeuronList List must contain only a SINGLE neuron. new_root : int | iterable Node ID(s) of node(s) to reroot to. If multiple new roots are provided, they will be rerooted in sequence. inplace : bool, optional If True the input neuron will be rerooted in place. If False will reroot and return a copy of the original. Returns ------- TreeNeuron Rerooted neuron. See Also -------- :func:`~navis.TreeNeuron.reroot` Quick access to reroot directly from TreeNeuron/List objects. Examples -------- >>> import navis >>> n = navis.example_neurons(1, kind='skeleton') >>> # Reroot neuron to its soma >>> n2 = navis.reroot_skeleton(n, n.soma) """ if isinstance(x, core.NeuronList): if len(x) == 1: x = x[0] else: raise ValueError(f'Expected a single neuron, got {len(x)}') if not isinstance(x, core.TreeNeuron): raise ValueError(f'Unable to reroot object of type "{type(x)}"') # Make new root an iterable new_roots = utils.make_iterable(new_root) # Parse new roots for i, root in enumerate(new_roots): if root is None: raise ValueError('New root can not be <None>') # If new root is a tag, rather than a ID, try finding that node if isinstance(root, str): if x.tags is None: raise ValueError("Neuron does not have tags") if root not in x.tags: raise ValueError(f'#{x.id}: Found no nodes with tag {root}' ' - please double check!') elif len(x.tags[root]) > 1: raise ValueError(f'#{x.id}: Found multiple node with tag ' f'{root} - please double check!') else: new_roots[i] = x.tags[root][0] # At this point x is TreeNeuron x: core.TreeNeuron # At this point new_roots is list of int new_roots: Iterable[int] if not inplace: # Make a copy x = x.copy() # Run this in a separate function so that the lock is applied to copy _ = reroot_skeleton(x, new_root=new_roots, inplace=True) return x # Keep track of node ID dtype nodeid_dtype = x.nodes.node_id.dtype # Go over each new root for new_root in new_roots: # Skip if new root is old root if any(x.root == new_root): continue if x.igraph and config.use_igraph: # Grab graph once to avoid overhead from stale checks g = x.igraph # Prevent warnings in the following code - querying paths between # unreachable nodes will otherwise generate a runtime warning with warnings.catch_warnings(): warnings.simplefilter("ignore") # Find paths to all roots path = g.get_shortest_paths(g.vs.find(node_id=new_root), [g.vs.find(node_id=r) for r in x.root]) epath = g.get_shortest_paths(g.vs.find(node_id=new_root), [g.vs.find(node_id=r) for r in x.root], output='epath') # Extract paths that actually worked (i.e. within a continuous fragment) path = [p for p in path if p][0] epath = [p for p in epath if p][0] edges = [(s, t) for s, t in zip(path[:-1], path[1:])] weights = [g.es[e]['weight'] for e in epath] # Get all weights and append inversed new weights all_weights = g.es['weight'] + weights # Add inverse edges: old_root->new_root g.add_edges([(e[1], e[0]) for e in edges]) # Re-set weights g.es['weight'] = all_weights # Remove new_root->old_root g.delete_edges(edges) # Get degree of old root for later categorisation old_root_deg = len(g.es.select(_target=path[-1])) # Translate path indices to node IDs ix2id = {ix: n for ix, n in zip(g.vs.indices, g.vs.get_attribute_values('node_id'))} path = [ix2id[i] for i in path] else: # Grab graph once to avoid overhead from stale checks g = x.graph # If this NetworkX graph is just an (immutable) view, turn it into a # full, independent graph nx_main_version = '.'.join(nx.__version__.split(".")[:2]) if float(nx_main_version) < 2.2: if isinstance(g, nx.classes.graphviews.ReadOnlyGraph): x._graph_nx = g = nx.DiGraph(g) elif hasattr(g, '_NODE_OK'): x._graph_nx = g = nx.DiGraph(g) elif nx.is_frozen(g): x._graph_nx = g = nx.DiGraph(g) # Walk from new root to old root and remove edges along the way parent = next(g.successors(new_root), None) if not parent: # new_root is already the root continue path = [new_root] weights = [] while parent is not None: weights.append(g[path[-1]][parent]['weight']) g.remove_edge(path[-1], parent) path.append(parent) parent = next(g.successors(parent), None) # Invert path and add weights new_edges = [(path[i + 1], path[i], {'weight': weights[i]}) for i in range(len(path) - 1)] # Add inverted path between old and new root g.add_edges_from(new_edges) # Get degree of old root for later categorisation old_root_deg = g.in_degree(path[-1]) # Set index to node ID for later x.nodes.set_index('node_id', inplace=True) # Propagate changes in graph back to node table # Assign new node type to old root x.nodes.loc[path[1:], 'parent_id'] = path[:-1] if old_root_deg == 1: x.nodes.loc[path[-1], 'type'] = 'slab' elif old_root_deg > 1: x.nodes.loc[path[-1], 'type'] = 'branch' else: x.nodes.loc[path[-1], 'type'] = 'end' # Make new root node type "root" x.nodes.loc[path[0], 'type'] = 'root' # Set new root's parent to None x.nodes.loc[new_root, 'parent_id'] = -1 # Reset index x.nodes.reset_index(drop=False, inplace=True) # Make sure node ID has the same datatype as before if x.nodes.node_id.dtype != nodeid_dtype: x.nodes['node_id'] = x.nodes.node_id.astype(nodeid_dtype, copy=False) # Finally: only reset non-graph related attributes if x.igraph and config.use_igraph: x._clear_temp_attr(exclude=['igraph', 'classify_nodes']) else: x._clear_temp_attr(exclude=['graph', 'classify_nodes']) return x
[docs] def cut_skeleton(x: 'core.NeuronObject', where: Union[int, str, List[Union[int, str]]], ret: Union[Literal['both'], Literal['proximal'], Literal['distal']] = 'both' ) -> 'core.NeuronList': """Split skeleton at given point and returns two new neurons. Split is performed between cut node and its parent node. The cut node itself will still be present in both resulting neurons. Parameters ---------- x : TreeNeuron | NeuronList Must be a single skeleton. where : int | str | list Node ID(s) or tag(s) of the node(s) to cut. The edge that is cut is the one between this node and its parent. So cut node must not be a root node! Multiple cuts are performed in the order of ``cut_node``. Fragments are ordered distal -> proximal. ret : 'proximal' | 'distal' | 'both', optional Define which parts of the neuron to return. Use this to speed up processing when you need only parts of the neuron. Returns ------- split : NeuronList Fragments of the input neuron after cutting sorted such that distal parts come before proximal parts. For example, with a single cut you can expect to return a NeuronList containing two neurons: the first contains the part distal and the second the part proximal to the cut node. The distal->proximal order of fragments is tried to be maintained for multiple cuts but this is not guaranteed. Examples -------- Cut skeleton at a (somewhat random) branch point >>> import navis >>> n = navis.example_neurons(1) >>> bp = n.nodes[n.nodes.type=='branch'].node_id.values >>> dist, prox = navis.cut_skeleton(n, bp[0]) Make cuts at multiple branch points >>> import navis >>> n = navis.example_neurons(1) >>> bp = n.nodes[n.nodes.type=='branch'].node_id.values >>> splits = navis.cut_skeleton(n, bp[:10]) See Also -------- :func:`navis.TreeNeuron.prune_distal_to` :func:`navis.TreeNeuron.prune_proximal_to` ``TreeNeuron/List`` shorthands to this function. :func:`navis.subset_neuron` Returns a neuron consisting of a subset of its nodes. """ utils.eval_param(ret, name='ret', allowed_values=('proximal', 'distal', 'both')) if isinstance(x, core.NeuronList): if len(x) == 1: x = x[0] else: raise Exception(f'Expected a single TreeNeuron, got {len(x)}') if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected a single TreeNeuron, got "{type(x)}"') if x.n_trees != 1: raise ValueError(f'Unable to cut: neuron {x.id} consists of multiple ' 'disconnected trees. Use navis.heal_skeleton()' ' to fix.') # At this point x is TreeNeuron x: core.TreeNeuron # Turn cut node into iterable if not utils.is_iterable(where): where = [where] # Process cut nodes (i.e. if tag) cn_ids: List[int] = [] for cn in where: # If cut_node is a tag (rather than an ID), try finding that node if isinstance(cn, str): if x.tags is None: raise ValueError(f"Neuron {x.id} has no tags") if cn not in x.tags: raise ValueError(f'#{x.id}: Found no node with tag {cn}' ' - please double check!') cn_ids += x.tags[cn] elif cn not in x.nodes.node_id.values: raise ValueError(f'No node with ID "{cn}" found.') elif cn in x.root: raise ValueError(f'Unable to cut at node "{cn}" - node is root') else: cn_ids.append(cn) # Remove duplicates while retaining order - set() would mess that up seen: Set[int] = set() cn_ids = [cn for cn in cn_ids if not (cn in seen or seen.add(cn))] # Warn if not all returned if len(cn_ids) > 1 and ret != 'both': logger.warning('Multiple cuts should use `ret = "both"`.') # Go over all cut_nodes -> order matters! res = [x] for cn in cn_ids: # First, find out in which neuron the cut node is to_cut = [n for n in res if cn in n.nodes.node_id.values][0] to_cut_ix = res.index(to_cut) # Remove this neuron from results (will be cut into two) res.remove(to_cut) # Cut neuron if x.igraph and config.use_igraph: cut = _cut_igraph(to_cut, cn, ret) else: cut = _cut_networkx(to_cut, cn, ret) # If ret != 'both', we will get only a single neuron - therefore # make sure cut is iterable cut = utils.make_iterable(cut) # Add results back to results at same index, proximal first for c in cut[::-1]: res.insert(to_cut_ix, c) return core.NeuronList(res)
def _cut_igraph(x: 'core.TreeNeuron', cut_node: int, ret: str) -> Union['core.TreeNeuron', Tuple['core.TreeNeuron', 'core.TreeNeuron']]: """Use iGraph to cut a neuron.""" # Make a copy g = x.igraph.copy() # Get vertex index cut_ix = g.vs.find(node_id=cut_node).index # Get edge to parent e = g.es.find(_source=cut_ix) # Remove edge g.delete_edges(e) # Make graph undirected -> otherwise .decompose() throws an error # This issue is fixed in the up-to-date branch of igraph-python # (which is not on PyPI O_o ) g.to_undirected(combine_edges='first') # Get subgraph -> fastest way to get sets of nodes for subsetting a, b = g.decompose(mode='WEAK') # IMPORTANT: a,b are now UNDIRECTED graphs -> we must not keep using them! if x.root[0] in a.vs['node_id']: dist_graph, prox_graph = b, a else: dist_graph, prox_graph = a, b if ret == 'distal' or ret == 'both': dist = morpho.subset_neuron(x, subset=dist_graph.vs['node_id'], inplace=False) # Change new root for dist dist.nodes.loc[dist.nodes.node_id == cut_node, 'type'] = 'root' # Clear other temporary attributes dist._clear_temp_attr(exclude=['igraph', 'type', 'classify_nodes']) if ret == 'proximal' or ret == 'both': ss: Sequence[int] = prox_graph.vs['node_id'] + [cut_node] prox = morpho.subset_neuron(x, subset=ss, inplace=False) # Change new root for dist prox.nodes.loc[prox.nodes.node_id == cut_node, 'type'] = 'end' # Clear other temporary attributes prox._clear_temp_attr(exclude=['igraph', 'type', 'classify_nodes']) if ret == 'both': return dist, prox elif ret == 'distal': return dist else: # elif ret == 'proximal': return prox def _cut_networkx(x: 'core.TreeNeuron', cut_node: Union[int, str], ret: str) -> Union['core.TreeNeuron', Tuple['core.TreeNeuron', 'core.TreeNeuron']]: """Use networkX graph to cut a neuron.""" # Get subgraphs consisting of nodes distal to cut node dist_graph: nx.DiGraph = nx.bfs_tree(x.graph, cut_node, reverse=True) if ret == 'distal' or ret == 'both': # bfs_tree does not preserve 'weight' # -> need to subset original graph by those nodes dist_graph = x.graph.subgraph(dist_graph.nodes) # Generate new neurons # This is the actual bottleneck of the function: ~70% of time dist = morpho.subset_neuron(x, subset=dist_graph, inplace=False) # type: ignore # doesn't know nx.DiGraph # Change new root for dist dist.nodes.loc[dist.nodes.node_id == cut_node, 'parent_id'] = -1 dist.nodes.loc[dist.nodes.node_id == cut_node, 'type'] = 'root' # Reassign graphs dist._graph_nx = dist_graph # Clear other temporary attributes dist._clear_temp_attr(exclude=['graph', 'type', 'classify_nodes']) if ret == 'proximal' or ret == 'both': # bfs_tree does not preserve 'weight' # need to subset original graph by those nodes ss_nodes = [n for n in x.graph.nodes if n not in dist_graph.nodes] + \ [cut_node] prox_graph: nx.DiGraph = x.graph.subgraph(ss_nodes) # Generate new neurons # This is the actual bottleneck of the function: ~70% of time prox = morpho.subset_neuron(x, subset=prox_graph, inplace=False) # Change cut node to end node for prox prox.nodes.loc[prox.nodes.node_id == cut_node, 'type'] = 'end' # Reassign graphs prox._graph_nx = prox_graph # Clear other temporary attributes prox._clear_temp_attr(exclude=['graph', 'type', 'classify_nodes']) # ATTENTION: prox/dist_graph contain pointers to the original graph # -> changes to attributes will propagate back if ret == 'both': return dist, prox elif ret == 'distal': return dist else: # elif ret == 'proximal': return prox def generate_list_of_childs(x: 'core.NeuronObject') -> Dict[int, List[int]]: """Return list of childs. Parameters ---------- x : TreeNeuron | NeuronList If List, must contain a SINGLE neuron. Returns ------- dict ``{parent_id: [child_id, child_id, ...]}`` """ assert isinstance(x, core.TreeNeuron) # Grab graph once to avoid overhead from stale checks g = x.graph return {n: [e[0] for e in g.in_edges(n)] for n in g.nodes} def node_label_sorting(x: 'core.TreeNeuron', weighted: bool = False) -> List[Union[str, int]]: """Return nodes ordered by node label sorting according to Cuntz et al., PLoS Computational Biology (2010). Parameters ---------- x : TreeNeuron weighted : bool If True will use actual distances instead of just node count. Depending on how evenly spaced your points are, this might not make much difference. Returns ------- list ``[root, node_id, node_id, ...]`` """ if isinstance(x, core.NeuronList) and len(x) == 1: x = x[0] if not isinstance(x, core.TreeNeuron): raise TypeError(f'Expected a singleTreeNeuron, got "{type(x)}"') if len(x.root) > 1: raise ValueError('Unable to process multi-root neurons!') # Get relevant terminal nodes term = x.nodes[x.nodes.type == 'end'].node_id.values # Get distance from terminals to all other nodes geo = geodesic_matrix(x, from_=term, directed=True, weight='weight' if weighted else None) # Set distance between unreachable points to None # Need to reinitialise SparseMatrix to replace float('inf') with NaN # dist_mat[geo == float('inf')] = None dist_mat = pd.DataFrame(np.where(geo == float('inf'), # type: ignore # no stubs for SparseDataFrame np.nan, geo), columns=geo.columns, index=geo.index) # Get starting points (i.e. branches off the root) and sort by longest # path to a terminal (note we're operating on the simplified version # of the skeleton) curr_points = sorted(list(x.simple.graph.predecessors(x.root[0])), key=lambda n: dist_mat[n].max(), reverse=True) # Walk from root towards terminals, prioritising longer branches nodes_walked = [] while curr_points: nodes_walked.append(curr_points.pop(0)) # If the current point is a terminal point, stop here if nodes_walked[-1] in term: pass else: new_points = sorted(list(x.simple.graph.predecessors(nodes_walked[-1])), key=lambda n: dist_mat[n].max(), reverse=True) curr_points = new_points + curr_points # Translate into segments node_list = [x.root[0]] # Note that we're inverting here so that the segments are ordered # proximal -> distal (i.e. root to tips) seg_dict = {s[0]: s[::-1] for s in _break_segments(x)} for n in nodes_walked: # Note that we're skipping the first (proximal) node to avoid double # counting nodes node_list += seg_dict[n][1:] return np.array(node_list) def _igraph_to_sparse(graph, weight_attr=None): edges = graph.get_edgelist() if weight_attr is None: weights = [1] * len(edges) else: weights = graph.es[weight_attr] if not graph.is_directed(): edges.extend([(v, u) for u, v in edges]) weights.extend(weights) # Note: previously, we used a generator (weights, zip(*egdes)) as input to # csr_matrix but with Scipy 1.13.0 this has stopped working edges = np.array(edges) return csr_matrix((weights, (edges[:,0], edges[:,1])), shape=(len(graph.vs), len(graph.vs))) def connected_subgraph(x: Union['core.TreeNeuron', nx.DiGraph], ss: Sequence[Union[str, int]]) -> Tuple[np.ndarray, Union[int, str]]: """Return set of nodes necessary to connect all nodes in subset ``ss``. Parameters ---------- x : navis.TreeNeuron | nx.DiGraph Neuron (or graph thereof) to get subgraph for. ss : list | array-like Node IDs of node to subset to. Returns ------- np.ndarray Node IDs of connected subgraph. root ID ID of the node most proximal to the old root in the connected subgraph. Examples -------- >>> import navis >>> n = navis.example_neurons(1) >>> ends = n.nodes[n.nodes.type.isin(['end', 'root'])].node_id.values >>> sg, root = navis.graph.graph_utils.connected_subgraph(n, ends) >>> # Since we asked for a subgraph connecting all terminals + root, >>> # we expect to see all nodes in the subgraph >>> sg.shape[0] == n.nodes.shape[0] True """ if isinstance(x, core.NeuronList): if len(x) == 1: g = x[0].graph elif isinstance(x, core.TreeNeuron): g = x.graph elif isinstance(x, nx.DiGraph): g = x else: raise TypeError(f'Input must be a single TreeNeuron or graph, got "{type(x)}".') ss = set(ss) missing = ss - set(g.nodes) if np.any(missing): missing = np.array(list(missing)).astype(str) # do NOT remove list() here! raise ValueError(f'Nodes not found: {",".join(missing)}') # Find nodes that are leafs WITHIN the subset g_ss = g.subgraph(ss) in_degree = dict(g_ss.in_degree) leafs = ss & {n for n, d in in_degree.items() if not d} # Run this for each connected component of the neuron include = set() new_roots = [] for cc in nx.connected_components(g.to_undirected()): # Walk from each node to root and keep track of path paths = [] for n in leafs & cc: this_path = [] while n is not None: this_path.append(n) n = next(g.successors(n), None) paths.append(this_path) # If none of these cc in subset there won't be paths if not paths: continue # Find the nodes that all paths have in common common = set.intersection(*[set(p) for p in paths]) # Now find the first (most distal from root) common node longest_path = sorted(paths, key=lambda x: len(x))[-1] first_common = sorted(common, key=lambda x: longest_path.index(x))[0] # Now go back to paths and collect all nodes until this first common node for p in paths: it = iter(p) n = next(it, None) while n is not None: if n in include: break if n == first_common: include.add(n) break include.add(n) n = next(it, None) # In cases where there are even more distal common ancestors # (first common will typically be a branch point) this_ss = ss & cc if this_ss - include: # Make sure the new root is set correctly nr = sorted(this_ss - include, key=lambda x: longest_path.index(x))[-1] new_roots.append(nr) # Add those nodes to be included include = set.union(include, this_ss) else: new_roots.append(first_common) return np.array(list(include)), new_roots
[docs] def insert_nodes(x: 'core.TreeNeuron', where: List[tuple], coords: List[tuple] = None, validate: bool = True, inplace: bool = False) -> Optional['core.TreeNeuron']: """Insert new nodes between existing nodes. Parameters ---------- x : TreeNeuron Neuron to insert new nodes into. where : list of node pairs Must be a list of node ID pairs. A new node will be added between the nodes of each pair (see examples). coords : None | list of (x, y, z) coordinates | list of fractions Can be: - ``None``: new nodes will be inserted exactly between the two nodes - (N, 3) array of coordinates for the newly inserted nodes - (N, ) array of fractional distances [0-1]: e.g. 0.25 means that a new node will be inserted a quarter of the way between the two nodes (from the child's perspective) validate : bool If True, will make sure that pairs in ``where`` are always in (parent, child) order. If you know this to already be the case, set ``validate=False`` to save some time. inplace : bool If True, will rewire the neuron inplace. If False, will return a rewired copy of the neuron. Returns ------- TreeNeuron Examples -------- Insert new nodes between some random points >>> import navis >>> n = navis.example_neurons(1) >>> n.n_nodes 4465 >>> where = n.nodes[['parent_id', 'node_id']].values[100:200] >>> _ = navis.insert_nodes(n, where=where, inplace=True) >>> n.n_nodes 4565 """ utils.eval_param(x, name='x', allowed_types=(core.TreeNeuron, )) where = np.asarray(where) if where.ndim != 2 or where.shape[1] != 2: raise ValueError('Expected `where` to be a (N, 2) list of pairs. ' f'Got {where.shape}') # Validate if that's desired if validate: # Setup to get parents parent = x.nodes.set_index('node_id').parent_id # Get parents of the left and the right nodes of each pair parent_left = parent.loc[where[:, 0]].values parent_right = parent.loc[where[:, 1]].values # Check if the right node is parent of the left or the other way around correct_order = where[:, 0] == parent_right swapped = where[:, 1] == parent_left not_connected = ~(correct_order | swapped) if np.any(not_connected): raise ValueError('The following pairs are not connected: ' f'{where[not_connected]}') # Flip nodes where necessary to sure we have (parent, child) order if np.any(swapped): where[swapped, :] = where[swapped][:, [1, 0]] # If not provided, generate coordinates in the center between each node pair if isinstance(coords, type(None)): node_locs = x.nodes.set_index('node_id')[['x', 'y', 'z']] left_loc = node_locs.loc[where[:, 0]].values right_loc = node_locs.loc[where[:, 1]].values # Find center between each node coords = left_loc + (right_loc - left_loc) / 2 coords = np.asarray(coords) # Make sure we have correct coordinates if coords.shape[0] != where.shape[0]: raise ValueError(f'Expected {where.shape[0]} coordinates or distances, ' f'got {coords.shape[0]}') # If array of fractional distances translate to coordinates if coords.ndim == 1: node_locs = x.nodes.set_index('node_id')[['x', 'y', 'z']] left_loc = node_locs.loc[where[:, 0]].values right_loc = node_locs.loc[where[:, 1]].values # Find center between each node coords = left_loc + (right_loc - left_loc) * coords.reshape(-1, 1) # For the moment, we will interpolate the radius rad = x.nodes.set_index('node_id').radius new_rad = (rad.loc[where[:, 0]].values + rad.loc[where[:, 1]].values) / 2 # Generate table for new nodes new_nodes = pd.DataFrame() max_id = x.nodes.node_id.max() + 1 new_nodes['node_id'] = np.arange(max_id, max_id + where.shape[0]).astype(int) new_nodes['parent_id'] = where[:, 0] new_nodes['x'] = coords[:, 0] new_nodes['y'] = coords[:, 1] new_nodes['z'] = coords[:, 2] new_nodes['radius'] = new_rad # Merge tables nodes = pd.concat([x.nodes, new_nodes], join='outer', axis=0, sort=True, ignore_index=True) # Remap nodes new_parents = dict(zip(where[:, 1], new_nodes.node_id.values)) to_rewire = nodes.node_id.isin(new_parents) nodes.loc[to_rewire, 'parent_id'] = nodes.loc[to_rewire, 'node_id'].map(new_parents) if not inplace: x = x.copy() x._nodes = nodes return x
[docs] def remove_nodes(x: 'core.TreeNeuron', which: List[int], inplace: bool = False) -> Optional['core.TreeNeuron']: """Drop nodes from neuron without disconnecting it. Dropping node 2 from 1->2->3 will lead to connectivity 1->3. Parameters ---------- x : TreeNeuron Neuron to remove nodes from. which : list of node IDs IDs of nodes to remove. inplace : bool If True, will rewire the neuron inplace. If False, will return a rewired copy of the neuron. Returns ------- TreeNeuron Examples -------- Drop points from a neuron >>> import navis >>> n = navis.example_neurons(1) >>> n.n_nodes 4465 >>> # Drop a hundred nodes >>> n2 = navis.remove_nodes(n, n.nodes.node_id.values[100:200]) >>> n2.n_nodes 4365 """ utils.eval_param(x, name='x', allowed_types=(core.TreeNeuron, )) if not utils.is_iterable(which): which = [which] which = np.asarray(which) miss = ~np.isin(which, x.nodes.node_id.values) if np.any(miss): raise ValueError(f'{len(miss)} node IDs not found in neuron') if not inplace: x = x.copy() # Generate new list of parents lop = dict(zip(x.nodes.node_id.values, x.nodes.parent_id.values)) # Rewire to skip the to-be-removed nodes for n in which: lop.update({c: lop[n] for c, p in lop.items() if p == n}) # Rewire neuron x.nodes['parent_id'] = x.nodes.node_id.map(lop) # Drop nodes x.nodes = x.nodes[~x.nodes.node_id.isin(which)].copy() # Clear temporary attributes x._clear_temp_attr() return x
[docs] def rewire_skeleton(x: 'core.TreeNeuron', g: nx.Graph, root: Optional[id] = None, inplace: bool = False) -> Optional['core.TreeNeuron']: """Rewire neuron from graph. This function takes a graph representation of a neuron and rewires its node table accordingly. This is useful if we made changes to the graph (i.e. adding or removing edges) and want those to propagate to the node table. Parameters ---------- x : TreeNeuron Neuron to be rewired. g : networkx.Graph Graph to use for rewiring. Please note that directionality (if present) is not taken into account. Nodes not included in the graph will be disconnected (i.e. won't have a parent). Nodes in the graph but not in the table are ignored! root : int Node ID for the new root. If not given, will try to use the current root. inplace : bool If True, will rewire the neuron inplace. If False, will return a rewired copy of the neuron. Returns ------- TreeNeuron Examples -------- >>> import navis >>> n = navis.example_neurons(1) >>> n.n_trees 1 >>> # Drop one edge from graph >>> g = n.graph.copy() >>> g.remove_edge(310, 309) >>> # Rewire neuron >>> n2 = navis.rewire_skeleton(n, g, inplace=False) >>> n2.n_trees 2 """ assert isinstance(x, core.TreeNeuron), f'Expected TreeNeuron, got {type(x)}' assert isinstance(g, nx.Graph), f'Expected networkx graph, got {type(g)}' if not inplace: x = x.copy() if g.is_directed(): g = g.to_undirected() g = nx.minimum_spanning_tree(g, weight='weight') if not root: root = x.root[0] if x.root[0] in g.nodes else next(iter(g.nodes)) # Generate tree for the main component tree = nx.dfs_tree(g, source=root) # Generate list of parents lop = {e[1]: e[0] for e in tree.edges} # If the graph has more than one connected component, # the remaining components have arbitrary roots if len(tree.edges) != len(g.edges): for cc in nx.connected_components(g): if root not in cc: tree = nx.dfs_tree(g, source=cc.pop()) lop.update({e[1]: e[0] for e in tree.edges}) # Update parent IDs x.nodes['parent_id'] = x.nodes.node_id.map(lambda x: lop.get(x, -1)) x._clear_temp_attr() return x