Source code for navis.core.dotprop

#    This script is part of navis (http://www.github.com/navis-org/navis).
#    Copyright (C) 2018 Philipp Schlegel
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.

import copy
import numbers
import pint
import types
import uuid
import warnings

import numpy as np
import pandas as pd

from typing import Union, Callable, List, Optional, Tuple
from typing_extensions import Literal

from .. import utils, config, core, sampling, graph

from .base import BaseNeuron

try:
    import xxhash
except ImportError:
    xxhash = None

try:
    from pykdtree.kdtree import KDTree
except ImportError:
    from scipy.spatial import cKDTree as KDTree

__all__ = ['Dotprops']

# Set up logging
logger = config.get_logger(__name__)

# This is to prevent pint to throw a warning about numpy integration
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    pint.Quantity([])


[docs] class Dotprops(BaseNeuron): """Neuron represented as points + local vectors. Dotprops consist of points with x/y/z coordinates, a tangent vector and an alpha value describing the immediate neighbourhood (see also references). Typically constructed using :func:`navis.make_dotprops`. References ---------- Masse N.Y., Cachero S., Ostrovsky A., and Jefferis G.S.X.E. (2012). A mutual information approach to automate identification of neuronal clusters in Drosophila brain images. Frontiers in Neuroinformatics 6 (00021). doi: 10.3389/fninf.2012.00021 Parameters ---------- points : numpy array (N, 3) array of x/y/z coordinates. k : int, optional Number of nearest neighbors for tangent vector calculation. This can be ``None`` or ``0`` but then vectors must be provided on initialization and can subsequently not be re-calculated. Typical values here are ``k=20`` for dense (e.g. from light level data) and ``k=5`` for sparse (e.g. from skeletons) point clouds. vect : numpy array, optional (N, 3) array of vectors. If not provided will recalculate both ``vect`` and ``alpha`` using ``k``. alpha : numpy array, optional (N, ) array of alpha values. If not provided will recalculate both ``alpha`` and ``vect`` using ``k``. units : str | pint.Units | pint.Quantity Units for coordinates. Defaults to ``None`` (dimensionless). Strings must be parsable by pint: e.g. "nm", "um", "micrometer" or "8 nanometers". **metadata Any additional data to attach to neuron. """ connectors: Optional[pd.DataFrame] points: np.ndarray alpha: np.ndarray vect: np.ndarray k: Optional[int] soma: Optional[Union[list, np.ndarray]] #: Attributes used for neuron summary SUMMARY_PROPS = ['type', 'name', 'k', 'units', 'n_points'] #: Attributes to be used when comparing two neurons. EQ_ATTRIBUTES = ['name', 'n_points', 'k'] #: Temporary attributes that need clearing when neuron data changes TEMP_ATTR = ['_memory_usage'] #: Core data table(s) used to calculate hash _CORE_DATA = ['points', 'vect']
[docs] def __init__(self, points: np.ndarray, k: int, vect: Optional[np.ndarray] = None, alpha: Optional[np.ndarray] = None, units: Union[pint.Unit, str] = None, **metadata ): """Initialize Dotprops Neuron.""" super().__init__() self.k = k self.points = points self.alpha = alpha self.vect = vect self.soma = None for k, v in metadata.items(): try: setattr(self, k, v) except AttributeError: raise AttributeError(f"Unable to set neuron's `{k}` attribute.") self.units = units
def __truediv__(self, other, copy=True): """Implement division for coordinates.""" if isinstance(other, numbers.Number) or utils.is_iterable(other): # If a number, consider this an offset for coordinates n = self.copy() if copy else self _ = np.divide(n.points, other, out=n.points, casting='unsafe') if n.has_connectors: n.connectors.loc[:, ['x', 'y', 'z']] /= other # Force recomputing of KDTree if hasattr(n, '_tree'): delattr(n, '_tree') # Convert units # Note: .to_compact() throws a RuntimeWarning and returns unchanged # values when `units` is a iterable with warnings.catch_warnings(): warnings.simplefilter("ignore") n.units = (n.units * other).to_compact() return n return NotImplemented def __mul__(self, other, copy=True): """Implement multiplication for coordinates.""" if isinstance(other, numbers.Number) or utils.is_iterable(other): # If a number, consider this an offset for coordinates n = self.copy() if copy else self _ = np.multiply(n.points, other, out=n.points, casting='unsafe') if n.has_connectors: n.connectors.loc[:, ['x', 'y', 'z']] *= other # Force recomputing of KDTree if hasattr(n, '_tree'): delattr(n, '_tree') # Convert units # Note: .to_compact() throws a RuntimeWarning and returns unchanged # values when `units` is a iterable with warnings.catch_warnings(): warnings.simplefilter("ignore") n.units = (n.units / other).to_compact() return n return NotImplemented def __getstate__(self): """Get state (used e.g. for pickling).""" state = {k: v for k, v in self.__dict__.items() if not callable(v)} # The KDTree from pykdtree does not like being pickled # We will have to remove it which will force it to be regenerated # after unpickling if '_tree' in state: if 'pykdtree' in str(type(state['_tree'])): _ = state.pop('_tree') return state @property def alpha(self): """Alpha value for tangent vectors (optional).""" if isinstance(self._alpha, type(None)): if isinstance(self.k, type(None)) or (self.k <= 0): raise ValueError('Unable to calculate `alpha` for Dotprops not ' 'generated using k-nearest-neighbors.') self.recalculate_tangents(self.k, inplace=True) return self._alpha @alpha.setter def alpha(self, value): if not isinstance(value, type(None)): value = np.asarray(value) if value.ndim != 1: raise ValueError(f'alpha must be (N, ) array, got {value.shape}') self._alpha = value @property def bbox(self) -> np.ndarray: """Bounding box (includes connectors).""" mn = np.min(self.points, axis=0) mx = np.max(self.points, axis=0) if self.has_connectors: cn_mn = np.min(self.connectors[['x', 'y', 'z']].values, axis=0) cn_mx = np.max(self.connectors[['x', 'y', 'z']].values, axis=0) mn = np.min(np.vstack((mn, cn_mn)), axis=0) mx = np.max(np.vstack((mx, cn_mx)), axis=0) return np.vstack((mn, mx)).T @property def datatables(self) -> List[str]: """Names of all DataFrames attached to this neuron.""" return [k for k, v in self.__dict__.items() if isinstance(v, pd.DataFrame, np.ndarray)] @property def kdtree(self): """KDTree for points.""" if not getattr(self, '_tree', None): self._tree = KDTree(self.points) return self._tree @property def points(self): """Center of tangent vectors.""" return self._points @points.setter def points(self, value): if isinstance(value, type(None)): value = np.zeros((0, 3)) value = np.asarray(value) if value.ndim != 2 or value.shape[1] != 3: raise ValueError(f'points must be (N, 3) array, got {value.shape}') self._points = value # Also reset KDtree self._tree = None @property def vect(self): """Tangent vectors.""" if isinstance(self._vect, type(None)): self.recalculate_tangents(self.k, inplace=True) return self._vect @vect.setter def vect(self, value): if not isinstance(value, type(None)): value = np.asarray(value) if value.ndim != 2 or value.shape[1] != 3: raise ValueError(f'vectors must be (N, 3) array, got {value.shape}') self._vect = value @property def sampling_resolution(self): """Mean distance between points.""" dist, _ = self.kdtree.query(self.points, k=2) return np.mean(dist[:, 1]) @property def soma(self) -> Optional[int]: """Index of soma point. ``None`` if no soma. You can assign either a function that accepts a Dotprops as input or a fix value. Default is None. """ if callable(self._soma): soma = self._soma.__call__() # type: ignore # say int not callable else: soma = self._soma # Sanity check to make sure that the soma node actually exists if isinstance(soma, type(None)): # Return immmediately without expensive checks return soma elif utils.is_iterable(soma): if not any(soma): soma = None elif any(np.array(soma) < 0) or any(np.array(soma) > self.points.shape[0]): logger.warning(f'Soma(s) {soma} not found in points.') soma = None else: if 0 < soma < self.points.shape[0]: logger.warning(f'Soma {soma} not found in node table.') soma = None return soma @soma.setter def soma(self, value: Union[Callable, int, None]) -> None: """Set soma.""" if hasattr(value, '__call__'): self._soma = types.MethodType(value, self) elif isinstance(value, type(None)): self._soma = None elif isinstance(value, bool) and not value: self._soma = None else: if 0 < value < self.points.shape[0]: self._soma = value else: raise ValueError('Soma must be function, None or a valid node index.') @property def type(self) -> str: """Neuron type.""" return 'navis.Dotprops' def dist_dots(self, other: 'Dotprops', alpha: bool = False, distance_upper_bound: Optional[float] = None, **kwargs) -> Union[ Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray] ]: """Query this Dotprops against another. This function is mainly for ``navis.nblast``. Parameters ---------- other : Dotprops alpha : bool If True, will also return the product of the product of the alpha values of matched points. distance_upper_bound : non-negative float, optional If provided, we will stop the nearest neighbor search at this distance which can vastly speed up the query. For points with no hit within this distance, `dist` will be set to `distance_upper_bound`, and `dotprods` and `alpha_prod` will be set to 0. kwargs Keyword arguments are passed to the KDTree's ``query()`` method. Note that we are using ``pykdtree.kdtree.KDTree`` if available and fall back to ``scipy.spatial.cKDTree`` if pykdtree is not installed. Returns ------- dist : np.ndarray For each point in ``self``, the distance to the closest point in ``other``. dotprods : np.ndarray Dotproduct of each pair of closest points between ``self`` and ``other``. alpha_prod : np.ndarray Dotproduct of each pair of closest points between ``self`` and ``other``. Only returned if ``alpha=True``. """ if not isinstance(other, Dotprops): raise TypeError(f'Expected Dotprops, got "{type(other)}"') # If we are using pykdtree we need to make sure that self.points is # of the same dtype as other.points - not a problem with scipy but # the overhead is typically only a few micro seconds anyway points = self.points.astype(other.points.dtype, copy=False) # Scipy's KDTree does not like the distance to be None diub = distance_upper_bound if distance_upper_bound else np.inf fast_dists, fast_idxs = other.kdtree.query(points, distance_upper_bound=diub, **kwargs) # If upper distance we have to worry about infinite distances if distance_upper_bound: no_nn = fast_dists == np.inf fast_dists[no_nn] = distance_upper_bound # Temporarily give those nodes a match fast_idxs[no_nn] = 0 fast_dotprods = np.abs((self.vect * other.vect[fast_idxs]).sum(axis=1)) if distance_upper_bound: fast_dotprods[no_nn] = 0 if not alpha: return fast_dists, fast_dotprods fast_alpha = self.alpha * other.alpha[fast_idxs] if distance_upper_bound: fast_alpha[no_nn] = 0 return fast_dists, fast_dotprods, fast_alpha def downsample(self, factor=5, inplace=False, **kwargs): """Downsample the neuron by given factor. Parameters ---------- factor : int, optional Factor by which to downsample the neurons. Default = 5. inplace : bool, optional If True, operation will be performed on itself. If False, operation is performed on copy which is then returned. **kwargs Additional arguments passed to :func:`~navis.downsample_neuron`. See Also -------- :func:`~navis.downsample_neuron` Base function. See for details and examples. """ if inplace: x = self else: x = self.copy() sampling.downsample_neuron(x, factor, inplace=True, **kwargs) if not inplace: return x return None def copy(self) -> 'Dotprops': """Return a copy of the dotprops. Returns ------- Dotprops """ # Don't copy the KDtree - when using pykdtree, copy.copy throws an # error and the construction is super fast anyway no_copy = ['_lock', '_tree'] # Generate new empty neuron - note we pass vect and alpha to # prevent calculation on initialization x = self.__class__(points=np.zeros((0, 3)), k=1, vect=np.zeros((0, 3)), alpha=np.zeros(0)) # Populate with this neuron's data x.__dict__.update({k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy}) return x def recalculate_tangents(self, k: int, inplace=False) -> None: """Recalculate tangent vectors and alpha with a new ``k``. Parameters ---------- k : int Number of nearest neighbours to use for tangent vector calculation. inplace : bool If False, will return a copy and leave the original data unmodified. Returns ------- Dotprops Only if ``inplace=False``. """ if not inplace: x = self.copy() else: x = self if isinstance(k, type(None)) or k < 1: raise ValueError(f'`k` must be integer >= 1, got "{k}"') # Checks and balances n_points = x.points.shape[0] if n_points < k: raise ValueError(f"Too few points ({n_points}) to calculate {k} " "nearest-neighbors") # Create the KDTree and get the k-nearest neighbors for each point dist, ix = self.kdtree.query(x.points, k=k) # Get points: array of (N, k, 3) pt = x.points[ix] # Generate centers for each cloud of k nearest neighbors centers = np.mean(pt, axis=1) # Generate vector from center cpt = pt - centers.reshape((pt.shape[0], 1, 3)) # Get inertia (N, 3, 3) inertia = cpt.transpose((0, 2, 1)) @ cpt # Extract vector and alpha u, s, vh = np.linalg.svd(inertia) x.vect = vh[:, 0, :] x.alpha = (s[:, 0] - s[:, 1]) / np.sum(s, axis=1) # Keep track of k x.k = k if not inplace: return x
[docs] def snap(self, locs, to='points'): """Snap xyz location(s) to closest point or synapse. Parameters ---------- locs : (N, 3) array | (3, ) array Either single or multiple XYZ locations. to : "points" | "connectors" Whether to snap to points or connectors. Returns ------- ix : int | list of int Index/Indices of the closest point/connector. dist : float | list of float Distance(s) to the closest point/connector. Examples -------- >>> import navis >>> n = navis.example_neurons(1) >>> dp = navis.make_dotprops(n, k=5) >>> ix, dist = dp.snap([0, 0, 0]) >>> ix 1123 """ locs = np.asarray(locs).astype(np.float64) is_single = (locs.ndim == 1 and len(locs) == 3) is_multi = (locs.ndim == 2 and locs.shape[1] == 3) if not is_single and not is_multi: raise ValueError('Expected a single (x, y, z) location or a ' '(N, 3) array of multiple locations') if to not in ['points', 'connectors']: raise ValueError('`to` must be "points" or "connectors", ' f'got {to}') # Generate tree tree = graph.neuron2KDTree(self, data=to) # Find the closest node dist, ix = tree.query(locs) return ix, dist
[docs] def to_skeleton(self, scale_vec: Union[float, Literal['auto']] = 'auto' ) -> core.TreeNeuron: """Turn dotprops into a skeleton. This is mainly for things like plotting as it does not produce meaningful edges. Also note that only minimal meta data is carried over. Parameters ---------- scale_vec : "auto" | float Factor by which to scale each tangent vector when generating the line segments. If "auto" (default for plotting) will use the sampling resolution (median distance between points) to determine a suitable values. Returns ------- TreeNeuron """ if not isinstance(scale_vec, numbers.Number) and scale_vec != 'auto': raise ValueError('`scale_vect` must be "auto" or a number, ' f'got {scale_vec}') if scale_vec == 'auto': scale_vec = self.sampling_resolution * .8 # Prepare segments - this is based on nat:::plot3d.dotprops halfvect = self.vect / 2 * scale_vec starts = self.points - halfvect ends = self.points + halfvect # Interweave starts and ends segs = np.zeros((starts.shape[0] * 2, 3)) segs[::2] = starts segs[1::2] = ends # Generate node table nodes = pd.DataFrame(segs, columns=['x', 'y', 'z']) nodes['node_id'] = nodes.index nodes['parent_id'] = -1 nodes.loc[1::2, 'parent_id'] = nodes.index.values[::2] # Produce a minimal TreeNeuron tn = core.TreeNeuron(nodes, units=self.units, id=self.id) # Carry over the label if getattr(self, '_label', None): tn._label = self._label # Add some other relevant attributes directly if self.has_connectors: tn._connectors = self._connectors tn._soma = self._soma return tn
def __len__(self): return len(self.points)