Source code for navis.transforms.xfm_funcs

#    This script is part of navis (http://www.github.com/navis-org/navis).
#    Copyright (C) 2018 Philipp Schlegel
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU General Public License for more details.

import math
import numbers

import numpy as np
import pandas as pd
import trimesh as tm

from functools import lru_cache
from scipy import ndimage
from scipy.spatial.distance import pdist
from typing import Union, Optional

from .. import utils, core, config
from .base import BaseTransform, TransformSequence, TransOptimizer
from .affine import AffineTransform

logger = config.get_logger(__name__)


def xform(x: Union['core.NeuronObject', 'pd.DataFrame', 'np.ndarray'],
          transform: Union[BaseTransform, TransformSequence],
          affine_fallback: bool = True,
          caching: bool = True) -> Union['core.NeuronObject',
                                         'pd.DataFrame',
                                         'np.ndarray']:
    """Apply transform(s) to data.

    Notes
    -----
    For Neurons only: whether there is a change in units during transformation
    (e.g. nm -> um) is inferred by comparing distances between x/y/z
    coordinates before and after the transform. This guesstimate is then used
    to convert ``.units`` and node/soma radii. This works reasonably well with
    base-10 increments (e.g. nm -> um) but is off with odd changes in units.

    Parameters
    ----------
    x :                 Neuron/List | Volume/Trimesh | numpy.ndarray | pandas.DataFrame
                        Data to transform. DataFrame must contain
                        ``['x', 'y', 'z']`` columns. Numpy array must be of
                        shape ``(N, 3)``.
    transform :         Transform/Sequence or list thereof
                        Either a single transform or a transform sequence.
    affine_fallback :   bool
                        In some cases the non-rigid transformation of points
                        can fail - for example if points are outside the
                        deformation field. Such points are returned as ``NaN``
                        unless ``affine_fallback`` is ``True``, in which case
                        we apply only the rigid affine part of the
                        transformation to at least get close to the correct
                        coordinates.
    caching :           bool
                        If True, will (pre-)cache data for transforms whenever
                        possible. Depending on the data and the type of
                        transforms this can speed things up tremendously at
                        the cost of increased memory usage:
                          - ``False`` = no upfront cost, lower memory footprint
                          - ``True`` = higher upfront cost, typically much faster
                        Only applies if the input is a NeuronList and the
                        transforms include an H5 transform.

    Returns
    -------
    same type as ``x``
                        Copy of input with transformed coordinates.

    Examples
    --------
    >>> import navis
    >>> # Example neurons are in 8nm voxel space
    >>> nl = navis.example_neurons()
    >>> # Make a simple affine transform to go from voxels to nanometers
    >>> import numpy as np
    >>> M = np.diag([8, 8, 8, 1])
    >>> tr = navis.transforms.AffineTransform(M)
    >>> # Apply the transform
    >>> xf = navis.xform(nl, tr)

    See Also
    --------
    :func:`navis.xform_brain`
                        Higher level function that finds and applies a
                        sequence of transforms to go from one template brain
                        to another.
""" # We need to work with TransformSequence if isinstance(transform, (list, np.ndarray)): transform = TransformSequence(*transform) elif isinstance(transform, BaseTransform): transform = TransformSequence(transform) elif not isinstance(transform, TransformSequence): raise TypeError(f'Expected Transform or TransformSequence, got "{type(transform)}"') if isinstance(x, core.NeuronList): if len(x) == 1: x = x[0] else: xf = [] # Get the transformation sequence with TransOptimizer(transform, bbox=x.bbox, caching=caching): try: for i, n in enumerate(config.tqdm(x, desc='Xforming', disable=config.pbar_hide, leave=config.pbar_leave)): xf.append(xform(n, transform=transform, caching=caching, affine_fallback=affine_fallback)) # If not caching we will clear the map cache after # each neuron to free memory if not caching: _get_coordinates_map.cache_clear() except BaseException: raise finally: # Make sure we clear the coordinate map cache when done _get_coordinates_map.cache_clear() return x.__class__(xf) if isinstance(x, core.BaseNeuron): # VoxelNeurons are a special case and have hence their own function if isinstance(x, core.VoxelNeuron): return _xform_image(x, transform=transform) xf = x.copy() # We will collate spatial data to reduce overhead from calling # R's xform_brain if isinstance(xf, core.TreeNeuron): xyz = xf.nodes[['x', 'y', 'z']].values elif isinstance(xf, core.MeshNeuron): xyz = xf.vertices elif isinstance(xf, core.Dotprops): xyz = xf.points # If this dotprops has a `k`, we only need to transform points and # can regenerate the rest. If not, we need to make helper points # to carry over vectors if isinstance(xf.k, type(None)) or xf.k <= 0: # To avoid problems with these helpers we need to make sure # they aren't too close to their cognate points (otherwise we'll # get NaNs later). We can fix this by scaling the vector by the # sampling resolution which should also help make things less # noisy. 
                hp = xf.points + xf.vect * xf.sampling_resolution
                xyz = np.append(xyz, hp, axis=0)
        else:
            raise TypeError(f"Don't know how to transform neuron of type '{type(xf)}'")

        # Add connectors if they exist
        if xf.has_connectors:
            xyz = np.vstack([xyz, xf.connectors[['x', 'y', 'z']].values])

        # Do the xform of all spatial data
        xyz_xf = xform(xyz, transform=transform, affine_fallback=affine_fallback)

        # Guess the change in spatial units
        if xyz.shape[0] > 1:
            change, magnitude = _guess_change(xyz, xyz_xf, sample=1000)
        else:
            change, magnitude = 1, 0
            logger.warning(f'Unable to assess change of units for neuron {x.id}: '
                           'must have at least two nodes/points.')

        # Round change -> this rounds to the first non-zero digit
        # change = np.around(change, decimals=-magnitude)

        # Map xformed coordinates back
        if isinstance(xf, core.TreeNeuron):
            xf.nodes[['x', 'y', 'z']] = xyz_xf[:xf.n_nodes]
            # Fix radii based on our best estimate of the change in units
            if 'radius' in xf.nodes.columns:
                xf.nodes['radius'] *= 10**magnitude
        elif isinstance(xf, core.Dotprops):
            xf.points = xyz_xf[:xf.points.shape[0]]
            # If this dotprops has a `k`, set tangent vectors and alpha to
            # None so they will be regenerated
            if not isinstance(xf.k, type(None)) and xf.k > 0:
                xf._vect = xf._alpha = None
            else:
                # Re-generate vectors from the transformed helper points
                hp = xyz_xf[xf.points.shape[0]: xf.points.shape[0] * 2]
                vect = xf.points - hp
                vect = vect / np.linalg.norm(vect, axis=1).reshape(-1, 1)
                xf._vect = vect
        elif isinstance(xf, core.MeshNeuron):
            xf.vertices = xyz_xf[:xf.vertices.shape[0]]

        if xf.has_connectors:
            xf.connectors[['x', 'y', 'z']] = xyz_xf[-xf.connectors.shape[0]:]

        # Make an educated guess as to whether the units have changed
        if hasattr(xf, 'units') and magnitude != 0:
            if isinstance(xf.units, (config.ureg.Unit, config.ureg.Quantity)):
                xf.units = (xf.units / 10**magnitude).to_compact()

        # Fix soma radius if applicable
        if hasattr(xf, 'soma_radius') and isinstance(xf.soma_radius, numbers.Number):
            xf.soma_radius *= 10**magnitude

        return xf
    elif isinstance(x, pd.DataFrame):
        if any(c not in x.columns for c in ['x', 'y', 'z']):
            raise ValueError('DataFrame must have x, y and z columns.')
        x = x.copy()
        x[['x', 'y', 'z']] = xform(x[['x', 'y', 'z']].values,
                                   transform=transform,
                                   affine_fallback=affine_fallback)
        return x
    elif isinstance(x, tm.Trimesh):
        x = x.copy()
        x.vertices = xform(x.vertices,
                           transform=transform,
                           affine_fallback=affine_fallback)
        return x
    else:
        try:
            # At this point we expect numpy arrays
            x = np.asarray(x)
        except BaseException:
            raise TypeError(f'Unable to transform data of type "{type(x)}"')

        if x.ndim != 2 or x.shape[1] != 3:
            raise ValueError('Array must be of shape (N, 3).')

    # Apply the transform and return the xformed points
    return transform.xform(x, affine_fallback=affine_fallback)
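
# A minimal sketch of the DataFrame/array code paths above, assuming only the
# public API already used in this module (variable names are illustrative):
#
# >>> import navis
# >>> import numpy as np
# >>> import pandas as pd
# >>> # Same 8nm-voxels-to-nanometers scaling as in the docstring example
# >>> tr = navis.transforms.AffineTransform(np.diag([8, 8, 8, 1]))
# >>> pts = np.array([[1., 2., 3.], [4., 5., 6.]])
# >>> navis.xform(pts, tr)                   # (N, 3) array, scaled by 8
# >>> df = pd.DataFrame(pts, columns=['x', 'y', 'z'])
# >>> navis.xform(df, tr)                    # copy with scaled x/y/z columns
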
def _xform_image(x: 'core.VoxelNeuron',
                 transform: Union[BaseTransform, TransformSequence]
                 ) -> 'core.VoxelNeuron':
    """Apply transform(s) to image (voxel) data.

    Parameters
    ----------
    x :         VoxelNeuron
                Data to transform.
    transform : Transform/Sequence or list thereof
                Either a single transform or a transform sequence.

    Returns
    -------
    VoxelNeuron
                Copy of neuron with transformed coordinates.

    """
    if not isinstance(x, core.VoxelNeuron):
        raise TypeError(f'Unable to transform image of type "{type(x)}"')

    # Get a target->source mapping.
    # This lives in a separate function because we are caching it; the cache
    # is cleared by `xform` depending on whether caching is active.
    # Note the conversion to tuples to make the parameters hashable.
    (ix_array_source,
     ix_array_target,
     target_voxel_size,
     target_offset) = _get_coordinates_map(transform,
                                           tuple(x.bbox.flatten()),
                                           x.shape,
                                           tuple(x.units_xyz.magnitude))

    # Use the target->source index mapping to interpolate the image
    # order=1 means linear interpolation (much faster)
    mapped = ndimage.map_coordinates(x.grid, ix_array_source.T, order=1)
    grid_xf = np.zeros(x.shape)
    grid_xf[ix_array_target[:, 0],
            ix_array_target[:, 1],
            ix_array_target[:, 2]] = mapped

    # Generate the transformed neuron
    xf = x.copy()
    # Grid should be the same data type as the original
    # (also: vispy doesn't like float64)
    xf.grid = grid_xf.astype(xf.grid.dtype)
    # New offset based on the transformed bounding box
    xf.offset = target_offset
    # Set voxel size
    xf.units = target_voxel_size

    return xf


# Note that the output of this function can be fairly large in memory
# (on the order of ~4GB for a VFB image with shape (1210, 566, 174))
@lru_cache(maxsize=2)
def _get_coordinates_map(transform, bbox, shape, spacing):
    """Get target->source coordinate map for scipy's map_coordinates.

    Parameters
    ----------
    transform : Transform/Sequence
                The source->target transform.
    bbox :      (6, ) tuple
                Bounding box of the image (in model space).
    shape :     (3, ) tuple
                Shape of the image.
    spacing :   (3, ) tuple
                The voxel dimensions.

    """
    # Parameters must be hashable for the cache - hence no arrays.
    # Here, we convert bbox back to an array.
    bbox = np.array(bbox).reshape(3, 2)

    # We could just use the two corners of the source's bounding box to
    # calculate the target's bounding box. However, this can yield incorrect
    # results.
    # It's better to sample a couple more points on the surface of the
    # source's bounding box.
    b = tm.primitives.Box(extents=bbox[:, 1] - bbox[:, 0]).to_mesh()
    b.vertices += bbox.mean(axis=1)
    b = b.subdivide().vertices  # subdivide to get more points on the surface

    # Temporarily ignore warnings
    current_level = int(logger.level)
    try:
        logger.setLevel('ERROR')
        # Transform points individually to avoid caching the entire volume
        b_xf = np.vstack([transform.xform(p.reshape(-1, 3), affine_fallback=True)
                          for p in b])
        bbox_xf = np.vstack([np.min(b_xf, axis=0), np.max(b_xf, axis=0)]).T
    finally:
        logger.setLevel(current_level)

    # Next: generate a voxel grid in the target space with the same shape as
    # our input grid
    target_voxel_size = np.abs((bbox_xf[:, 1] - bbox_xf[:, 0]) / shape)

    # Generate a grid of xyz coordinates
    XX, YY, ZZ = np.meshgrid(range(shape[0]),
                             range(shape[1]),
                             range(shape[2]),
                             indexing='ij')

    # The coordinate grid has, for each voxel in the (M, N, K) voxel grid, an
    # xyz index coordinate -> hence its shape is (3, M, N, K)
    ix_grid = np.array([XX, YY, ZZ])

    # Potential ideas:
    # - generate a downsampled grid and upsample by interpolation later
    #   -> tried that, but the interpolation during upsampling is just as
    #      expensive as transforming all points from the get-go
    # - process in blocks, which would allow skipping blocks that are
    #   entirely empty

    # Convert grid into a (M * N * K, 3) voxel array
    ix_array_target = ix_grid.T.reshape(-1, 3)

    # Convert indices to actual coordinates
    coo_array_target = (ix_array_target * target_voxel_size) + bbox_xf[:, 0]

    # Transform these coordinates from target back to source space.
    # This is the VERY slow step since we are (potentially) xforming
    # millions of coordinates.
    try:
        logger.setLevel('ERROR')
        coo_array_source = (-transform).xform(coo_array_target,
                                              affine_fallback=True)
    finally:
        logger.setLevel(current_level)

    # Convert coordinates back into voxel indices
    ix_array_source = (coo_array_source - bbox[:, 0]) / spacing

    # New offset
    target_offset = bbox_xf[:, 0]

    return ix_array_source, ix_array_target, target_voxel_size, target_offset


def _guess_change(xyz_before: np.ndarray,
                  xyz_after: np.ndarray,
                  sample: float = .1) -> tuple:
    """Guess the change in units during xforming."""
    if isinstance(xyz_before, pd.DataFrame):
        xyz_before = xyz_before[['x', 'y', 'z']].values
    if isinstance(xyz_after, pd.DataFrame):
        xyz_after = xyz_after[['x', 'y', 'z']].values

    # Select the same random sample of points in both spaces
    if sample <= 1:
        sample = int(xyz_before.shape[0] * sample)
    # Make sure we don't sample more points than we have
    sample = min(xyz_before.shape[0], sample)
    rnd_ix = np.random.choice(xyz_before.shape[0], sample, replace=False)
    sample_bef = xyz_before[rnd_ix, :]
    sample_aft = xyz_after[rnd_ix, :]

    # Get the pairwise distances between those points
    dist_pre = pdist(sample_bef)
    dist_post = pdist(sample_aft)

    # Calculate how the distances between nodes changed and get the average.
    # Note that we are ignoring NaNs - these happen e.g. when points did not
    # transform.
    with np.errstate(divide='ignore', invalid='ignore'):
        change = dist_post / dist_pre
    # Drop infinite values for rare cases where nodes end up on top of each
    # other
    mean_change = np.nanmean(change[change < np.inf])

    # Find the order of magnitude
    magnitude = round(math.log10(mean_change))

    return mean_change, magnitude
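
# A minimal sketch of what `_guess_change` estimates, assuming a pure scaling
# transform (values are illustrative):
#
# >>> import numpy as np
# >>> from scipy.spatial.distance import pdist
# >>> before = np.random.rand(100, 3)
# >>> after = before * 1000                  # e.g. microns -> nanometers
# >>> change = pdist(after) / pdist(before)  # ratios of pairwise distances
# >>> float(np.nanmean(change))              # mean change, ~1000.0
# >>> round(np.log10(np.nanmean(change)))    # order of magnitude: 3
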
def mirror(points: np.ndarray, mirror_axis_size: float,
           mirror_axis: str = 'x',
           warp: Optional['BaseTransform'] = None) -> np.ndarray:
    """Mirror 3D coordinates about the given axis.

    This is a lower level version of `navis.mirror_brain` that:
     1. Flips the object along the midpoint of the axis using an affine
        transformation.
     2. (Optionally) Applies a warp transform that corrects asymmetries.

    Parameters
    ----------
    points :            (N, 3) numpy array
                        3D coordinates to mirror.
    mirror_axis_size :  int | float
                        A single number specifying the size of the mirror
                        axis. This is used to find the midpoint to mirror
                        about.
    mirror_axis :       'x' | 'y' | 'z', optional
                        Axis to mirror. Defaults to `x`.
    warp :              Transform, optional
                        If provided, will apply this warp transform after the
                        affine flipping. Typically this will be a mirror
                        registration to compensate for left/right asymmetries.

    Returns
    -------
    points_mirrored
                        Mirrored coordinates.

    See Also
    --------
    :func:`navis.mirror_brain`
                        Higher level function that uses meta data from
                        registered template brains to transform data for you.

    """
    utils.eval_param(mirror_axis, name='mirror_axis',
                     allowed_values=('x', 'y', 'z'), on_error='raise')

    # At this point we expect numpy arrays
    points = np.asarray(points)
    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError('Array must be of shape (N, 3).')

    # Translate the mirror axis to an index
    mirror_ix = {'x': 0, 'y': 1, 'z': 2}[mirror_axis]

    # Construct a homogeneous affine mirroring transform
    mirrormat = np.eye(4, 4)
    mirrormat[mirror_ix, 3] = mirror_axis_size
    mirrormat[mirror_ix, mirror_ix] = -1

    # Turn into an affine transform
    flip_transform = AffineTransform(mirrormat)

    # Flip about the mirror axis
    points_mirrored = flip_transform.xform(points)

    if isinstance(warp, (BaseTransform, TransformSequence)):
        points_mirrored = warp.xform(points_mirrored)

    # Note that we are enforcing the same data type as the input data here.
    # This is unlike `xform` or `xform_brain` where the data might genuinely
    # end up in a space that requires higher precision (e.g. going from
    # nm to microns).
    return points_mirrored.astype(points.dtype)
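
# A minimal sketch of flipping points about the midpoint of a hypothetical
# 100-unit-wide x axis (x' = -x + mirror_axis_size; values are illustrative):
#
# >>> import numpy as np
# >>> from navis.transforms.xfm_funcs import mirror
# >>> pts = np.array([[10., 5., 5.],
# ...                 [90., 5., 5.]])
# >>> mirror(pts, mirror_axis_size=100, mirror_axis='x')
# array([[90.,  5.,  5.],
#        [10.,  5.,  5.]])
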
def _surface_voxels(bounds, spacing):
    """Get surface voxels for a given bounding box."""
    assert bounds.shape == (3, 2)

    # Create the three face planes
    xy_plane = np.mgrid[bounds[0][0]:bounds[0][1]:spacing[0],
                        bounds[1][0]:bounds[1][1]:spacing[1]].reshape(2, -1).T
    xz_plane = np.mgrid[bounds[0][0]:bounds[0][1]:spacing[0],
                        bounds[2][0]:bounds[2][1]:spacing[2]].reshape(2, -1).T
    yz_plane = np.mgrid[bounds[1][0]:bounds[1][1]:spacing[1],
                        bounds[2][0]:bounds[2][1]:spacing[2]].reshape(2, -1).T

    top = np.zeros((len(xy_plane), 3))
    top[:, :2] = xy_plane
    top[:, 2] = bounds[2][1]

    bot = np.zeros((len(xy_plane), 3))
    bot[:, :2] = xy_plane
    bot[:, 2] = bounds[2][0]

    front = np.zeros((len(xz_plane), 3))
    front[:, [0, 2]] = xz_plane
    front[:, 1] = bounds[1][0]

    back = np.zeros((len(xz_plane), 3))
    back[:, [0, 2]] = xz_plane
    back[:, 1] = bounds[1][1]

    left = np.zeros((len(yz_plane), 3))
    left[:, [1, 2]] = yz_plane
    left[:, 0] = bounds[0][0]

    right = np.zeros((len(yz_plane), 3))
    right[:, [1, 2]] = yz_plane
    right[:, 0] = bounds[0][1]

    return np.vstack((top, bot, front, back, left, right))
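
# A minimal sketch of `_surface_voxels` (as defined above), sampling the six
# faces of a unit cube at 0.5 spacing (values are illustrative):
#
# >>> import numpy as np
# >>> bounds = np.array([[0, 1], [0, 1], [0, 1]])
# >>> _surface_voxels(bounds, spacing=(.5, .5, .5)).shape  # 4 points per face
# (24, 3)
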