Source code for navis.nbl.ablast_funcs


#    This script is part of navis (http://www.github.com/navis-org/navis).
#    Copyright (C) 2018 Philipp Schlegel
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.

"""Module contains functions implementing alignNBLAST."""

import os
import operator

import numpy as np
import pandas as pd
import multiprocessing as mp

from functools import partial
from typing import Callable, Dict, Union, Optional
from typing_extensions import Literal
from concurrent.futures import ProcessPoolExecutor, as_completed

from .. import config, utils, core
from ..transforms.align import align_rigid, align_deform, align_pca, _align_rigid_deform

from .base import Blaster, NestedIndices
from .nblast_funcs import (nblast_preflight,find_optimal_partition, set_omp_flag)
from .smat import Lookup2d, _nblast_v1_scoring

logger = config.logger


# This multiplier controls job size for NBLASTs (only relevant for
# multiprocessing and if progress=True).
# Larger multiplier = larger job sizes = fewer jobs = slower updates & less overhead
# Smaller multiplier = smaller job sizes = more jobs = faster updates & more overhead
JOB_SIZE_MULTIPLIER = 1


class NBlasterAlign(Blaster):
    """Implements a version of NBLAST were neurons are first aligned.

    Please note that some properties are computed on initialization and
    changing parameters (e.g. ``use_alpha``) at a later stage will mess things
    up!

    Parameters
    ----------
    use_alpha :     bool
                    Whether or not to use alpha values for the scoring.
                    If True, the dotproduct of nearest neighbor vectors will
                    be scaled by ``sqrt(alpha1 * alpha2)``.
    normalized :    bool
                    If True, will normalize scores by the best possible score
                    (i.e. self-self) of the query neuron.
    smat :          navis.nbl.smat.Lookup2d | pd.DataFrame | str | Callable
                    How to convert the point match pairs into an NBLAST score,
                    usually by a lookup table:
                     - if 'auto' (default), will use the "official" NBLAST scoring
                       matrices based on FCWB data. Same behaviour as in R's
                       nat.nblast implementation.
                     - if ``smat='v1'`` it uses the analytic formulation of the
                       NBLAST scoring from Kohl et. al (2013). You can adjust parameter
                       ``sigma_scaling`` (default to 10) using ``smat_kwargs``.
                     - DataFrames will be used to build a ``Lookup2d``
                     - if ``Callable`` given, it passes distance and dot products as
                       first and second argument respectively
                     - if ``smat=None`` the scores will be generated as the
                       product of the distances and the dotproduct of the vectors
                       of nearest-neighbor pairs
    smat_kwargs:    Dictionary with additional parameters passed to scoring
                    functions. For example: ``smat_kwargs["sigma_scoring"] = 10``.
    limit_dist :    float | "auto" | None
                    Sets the max distance for the nearest neighbor search
                    (`distance_upper_bound`). Typically this should be the
                    highest distance considered by the scoring function. If
                    "auto", will extract that value from the first axis of the
                    scoring matrix.
    progress :      bool
                    If True, will show a progress bar.

    """

    def __init__(self,
                 align_func, two_way_align=True, sample_align=None,
                 use_alpha=False, normalized=True, smat='auto',
                 limit_dist=None, approx_nn=False, dtype=np.float64,
                 progress=True,
                 smat_kwargs=dict(),
                 align_kwargs=dict(),
                 dotprop_kwargs=dict(),
                 ):
        """Initialize class."""
        super().__init__(progress=progress, dtype=dtype)
        self.align_func = align_func
        self.two_way_align = two_way_align
        self.sample_align = sample_align
        self.use_alpha = use_alpha
        self.normalized = normalized
        self.approx_nn = approx_nn
        self.dotprop_kwargs = dotprop_kwargs
        self.align_kwargs = align_kwargs
        self.desc = "NBlasting"
        self.self_hits = {}
        self.dotprops = {}
        self.neurons = []

        if smat is None:
            self.score_fn = operator.mul
        elif smat == 'auto':
            from ..nbl.smat import smat_fcwb
            self.score_fn = smat_fcwb(self.use_alpha)
        elif smat == 'v1':
            self.score_fn = partial(
                _nblast_v1_scoring, sigma_scoring = smat_kwargs.get('sigma_scoring', 10)
            )
        elif isinstance(smat, pd.DataFrame):
            self.score_fn = Lookup2d.from_dataframe(smat)
        else:
            self.score_fn = smat

        if limit_dist == "auto":
            try:
                if self.score_fn.axes[0].boundaries[-1] != np.inf:
                    self.distance_upper_bound = self.score_fn.axes[0].boundaries[-1]
                else:
                    # If the right boundary is open (i.e. infinity), we will use
                    # the second highest boundary plus a 5% offset
                    self.distance_upper_bound = self.score_fn.axes[0].boundaries[-2] * 1.05
            except AttributeError:
                logger.warning("Could not infer distance upper bound from scoring function")
                self.distance_upper_bound = None
        else:
            self.distance_upper_bound = limit_dist

    def append(self, neuron: core.BaseNeuron) -> NestedIndices:
        """Append neurons.

        Returns the numerical index appended neurons.
        If neurons is a (possibly nested) sequence of neurons,
        return a (possibly nested) list of indices.

        Note that `self_hit` is ignored (and hence calculated from scratch)
        when `neurons` is a nested list of dotprops.
        """
        if isinstance(neuron, core.BaseNeuron):
            return self._append_neuron(neuron)

        try:
            return [self.append(n) for n in neuron]
        except TypeError:  # i.e. not iterable
            raise ValueError(f"Expected Neuron or iterable thereof; got {type(neuron)}")

    def _append_neuron(self, neuron: core.BaseNeuron) -> int:
        next_id = len(self)
        self.neurons.append(neuron)
        self.ids.append(neuron.id)
        return next_id

    def get_dotprop(self, ix):
        if ix not in self.dotprops:
            if not isinstance(self.neurons[ix], core.Dotprops):
                self.dotprops[ix] = core.make_dotprops(self.neurons[ix],
                                                    **self.dotprop_kwargs)
            else:
                self.dotprops[ix] = self.neurons[ix]
        return self.dotprops[ix]

    def get_self_hit(self, ix):
        if ix not in self.self_hits:
            self.self_hits[ix] = self.calc_self_hit(self.get_dotprop(ix))
        return self.self_hits[ix]

    def calc_self_hit(self, dotprops):
        """Non-normalized value for self hit."""
        if not self.use_alpha:
            return len(dotprops.points) * self.score_fn(0, 1.0)
        else:
            dists = np.repeat(0, len(dotprops.points))
            alpha = dotprops.alpha * dotprops.alpha
            dots = np.repeat(1, len(dotprops.points)) * np.sqrt(alpha)
            return self.score_fn(dists, dots).sum()

    def score_single(self, q_dp, t_dp, q_idx):
        """Calculate score for single query/target dotprop pair."""
        # Run nearest-neighbor search for query against target
        data = q_dp.dist_dots(t_dp,
                              alpha=self.use_alpha,
                              # eps=0.1 means we accept 10% inaccuracy
                              eps=.1 if self.approx_nn else 0,
                              distance_upper_bound=self.distance_upper_bound)

        if self.use_alpha:
            dists, dots, alpha = data
            dots *= np.sqrt(alpha)
        else:
            dists, dots = data

        scr = self.score_fn(dists, dots).sum()

        # Normalize against best hit
        if self.normalized:
            scr /= self.get_self_hit(q_idx)

        return scr

    def single_query_target(self, q_idx: int, t_idx: int, scores='forward',
                            allow_rev_align=True):
        """Query single query against single target."""
        # Take a short-cut if this is a self-self comparison
        if q_idx == t_idx:
            if self.normalized:
                return 1
            return self.get_self_hit(q_idx)

        # Align the query to the target
        q_xf = self.align_func(self.neurons[q_idx],
                               target=self.neurons[t_idx],
                               sample=self.sample_align,
                               progress=False,
                               **self.align_kwargs)[0][0]

        # The query must always be made into new dotprops because it has been
        # moved around
        q_dp = core.make_dotprops(q_xf, **self.dotprop_kwargs)

        # The target dotprop has to be compute only once
        t_dp = self.get_dotprop(t_idx)

        scr = self.score_single(q_dp, t_dp, q_idx)

        # For the mean score we also have to produce the reverse score
        if scores in ('mean', 'min', 'max', 'both'):
            reverse = self.score_single(t_dp, q_dp, t_idx)
            if scores == 'mean':
                scr = (scr + reverse) / 2
            elif scores == 'min':
                scr = min(scr, reverse)
            elif scores == 'max':
                scr = max(scr, reverse)
            elif scores == 'both':
                # If both scores are requested
                scr = [scr, reverse]

        if self.two_way_align and allow_rev_align:
            rev = self.single_query_target(t_idx, q_idx, scores=scores,
                                           allow_rev_align=False)
            scr = (scr + rev) / 2

        return scr


[docs] def nblast_align(query: Union[core.BaseNeuron, core.NeuronList], target: Optional[str] = None, align_method: Union[Literal['rigid'], Literal['deform'], Literal['pca']] = 'rigid', two_way_align: bool = True, sample_align: Optional[float] = None, scores: Union[Literal['forward'], Literal['mean'], Literal['min'], Literal['max']] = 'mean', normalized: bool = True, use_alpha: bool = False, smat: Optional[Union[str, pd.DataFrame, Callable]] = 'auto', limit_dist: Optional[Union[Literal['auto'], int, float]] = None, approx_nn: bool = False, precision: Union[int, str, np.dtype] = 64, n_cores: int = os.cpu_count() // 2, progress: bool = True, dotprop_kwargs: Optional[Dict] = dict(), align_kwargs: Optional[Dict] = dict(), smat_kwargs: Optional[Dict] = dict()) -> pd.DataFrame: """Run NBLAST on pairwise-aligned neurons. Requires the `pycpd` library at least version 2.0.1 which at the time of writing is only available from Github (not PyPI): https://github.com/siavashk/pycpd Parameters ---------- query : Neuron | NeuronList Query neuron(s) to NBLAST against the targets. Neurons should be in microns as NBLAST is optimized for that and have similar sampling resolutions. target : Neuron | NeuronList, optional Target neuron(s) to NBLAST against. Neurons should be in microns as NBLAST is optimized for that and have similar sampling resolutions. If not provided, will NBLAST queries against themselves. align_method : "rigid" | "deform" | "pca" | "rigid+deform" Which method to use for alignment. Maps to the respective ``navis.align_{method}`` function. two_way_align : bool If True, will run the alignment + NBLAST in both, query->target as well as query->target direction. This is highly recommended because it reduces the chance that a single bad alignment will mess up your scores. sample_align : float [0-1], optional If provided, will calculate an initial alignment on just a fraction of the points followed by a landmark transform to transform the rest. Use this to speed things up. scores : 'forward' | 'mean' | 'min' | 'max' | 'both' Determines the final scores: - 'forward' (default) returns query->target scores - 'mean' returns the mean of query->target and target->query scores - 'min' returns the minium between query->target and target->query scores - 'max' returns the maximum between query->target and target->query scores - 'both' will return foward and reverse scores as multi-index DataFrame use_alpha : bool, optional Emphasizes neurons' straight parts (backbone) over parts that have lots of branches for the NBLAST. normalized : bool, optional Whether to return normalized NBLAST scores. smat : str | pd.DataFrame | Callable Score matrix. If 'auto' (default), will use scoring matrices from FCWB. Same behaviour as in R's nat.nblast implementation. If ``smat='v1'`` it uses the analytic formulation of the NBLAST scoring from Kohl et. al (2013). You can adjust parameter ``sigma_scaling`` (default to 10) using ``smat_kwargs``. If ``Callable`` given, it passes distance and dot products as first and second argument respectively. If ``smat=None`` the scores will be generated as the product of the distances and the dotproduct of the vectors of nearest-neighbor pairs. limit_dist : float | "auto" | None Sets the max distance for the nearest neighbor search (`distance_upper_bound`). Typically this should be the highest distance considered by the scoring function. If "auto", will extract that value from the scoring matrix. While this can give a ~2X speed up, it will introduce slight inaccuracies because we won't have a vector component for points without a nearest neighbour within the distance limits. The impact depends on the scoring function but with the default FCWB ``smat``, this is typically limited to the third decimal (0.0086 +/- 0.0027 for an all-by-all of 1k neurons). approx_nn : bool If True, will use approximate nearest neighbors. This gives a >2X speed up but also produces only approximate scores. Impact depends on the use case - testing highly recommended! n_cores : int, optional Max number of cores to use for nblasting. Default is ``os.cpu_count() // 2``. This should ideally be an even number as that allows optimally splitting queries onto individual processes. Also note that due to multiple layers of concurrency using all available cores might not be the fastest option. precision : int [16, 32, 64] | str [e.g. "float64"] | np.dtype Precision for scores. Defaults to 64 bit (double) floats. This is useful to reduce the memory footprint for very large matrices. In real-world scenarios 32 bit (single)- and depending on the purpose even 16 bit (half) - are typically sufficient. progress : bool Whether to show progress bars. This may cause some overhead, so switch off if you don't really need it. smat_kwargs : dict, optional Dictionary with additional parameters passed to scoring functions. align_kwargs : dict, optional Dictionary with additional parameters passed to alignment function. dotprop_kwargs : dict, optional Dictionary with additional parameters passed to ``navis.make_dotprops``. Only relevant if inputs aren't already dotprops. Returns ------- scores : pandas.DataFrame Matrix with NBLAST scores. Rows are query neurons, columns are targets. The order is the same as in ``query``/``target`` and the labels are based on the neurons' ``.id`` property. Important to note that even when ``q == t`` and with ``scores=mean`` the matrix will not be symmetrical because we run separate alignments for the forward and the reverse comparisons. References ---------- Costa M, Manton JD, Ostrovsky AD, Prohaska S, Jefferis GS. NBLAST: Rapid, Sensitive Comparison of Neuronal Structure and Construction of Neuron Family Databases. Neuron. 2016 Jul 20;91(2):293-311. doi: 10.1016/j.neuron.2016.06.012. Examples -------- >>> import navis >>> nl = navis.example_neurons(n=5) >>> nl.units <Quantity([8 8 8 8 8], 'nanometer')> >>> # Convert to microns >>> nl_um = nl * (8 / 1000) >>> # Run the align nblast >>> scores = navis.nblast_align(nl_um[:3], nl_um[3:], ... dotprop_kwargs=dict(k=5), ... sample_align=.2) See Also -------- :func:`navis.nblast` The vanilla version of NBLAST. :func:`navis.nblast_allbyall` A more efficient way than ``nblast(query=x, target=x)``. :func:`navis.nblast_smart` A smart(er) NBLAST suited for very large NBLAST. :func:`navis.synblast` A synapse-based variant of NBLAST. """ if isinstance(target, type(None)): target = query # Make sure we're working on NeuronLists query = core.NeuronList(query) target = core.NeuronList(target) if not callable(align_method): align_func = {'rigid': align_rigid, 'deform': align_deform, 'pca': align_pca, 'rigid+deform': _align_rigid_deform}[align_method] else: align_func = align_method # Run NBLAST preflight checks nblast_preflight(query, target, n_cores, req_dotprops=False, req_unique_ids=True, req_microns=isinstance(smat, str) and smat=='auto') # Find a partition that produces batches that each run in approximately # 10 seconds if n_cores and n_cores > 1: n_rows, n_cols = find_optimal_partition(n_cores, query, target) if progress: # If progress bar, we need to make smaller mini batches. # These mini jobs must not be too small - otherwise the overhead # from spawning and sending results between processes slows things # down dramatically. Here we hardcode such that we get updates # at most every 1% n_rows = max(n_rows, len(query) // 10) n_cols = max(n_cols, len(target) // 10) else: n_rows = n_cols = 1 # This makes sure we don't run into multiple layers of concurrency # Note that it doesn't do anything for the parent process (which is great # if we end up not actually using multiple cores) with set_omp_flag(limits=1): # Initialize a pool of workers # Note that we're forcing "spawn" instead of "fork" (default on linux)! # This is to reduce the memory footprint since "fork" appears to inherit all # variables (including all neurons) while "spawn" appears to get only # what's required to run the job? with ProcessPoolExecutor(max_workers=n_cores, mp_context=mp.get_context('spawn')) as pool: with config.tqdm(desc='Preparing', total=n_rows * n_cols, leave=False, disable=not progress) as pbar: futures = {} nblasters = [] for qix in np.array_split(np.arange(len(query)), n_rows): for tix in np.array_split(np.arange(len(target)), n_cols): # Initialize NBlaster this = NBlasterAlign(align_func=align_func, two_way_align=two_way_align, sample_align=sample_align, use_alpha=use_alpha, normalized=normalized, smat=smat, limit_dist=limit_dist, dtype=precision, approx_nn=approx_nn, progress=progress, align_kwargs=align_kwargs, dotprop_kwargs=dotprop_kwargs, smat_kwargs=smat_kwargs) # Add queries and targets for i, ix in enumerate(qix): this.append(query[ix]) for i, ix in enumerate(tix): this.append(target[ix]) # Keep track of indices of queries and targets this.queries = np.arange(len(qix)) this.targets = np.arange(len(tix)) + len(qix) this.queries_ix = qix # this facilitates filling in the big matrix later this.targets_ix = tix # this facilitates filling in the big matrix later this.pbar_position = len(nblasters) if not utils.is_jupyter() else None nblasters.append(this) pbar.update() # If multiple cores requested, submit job to the pool right away if n_cores and n_cores > 1 and (n_cols > 1 or n_rows > 1): this.progress=False # no progress bar for individual NBLASTERs futures[pool.submit(this.multi_query_target, q_idx=this.queries, t_idx=this.targets, scores=scores)] = this # Collect results if futures and len(futures) > 1: # Prepare empty score matrix scores = pd.DataFrame(np.empty((len(query), len(target)), dtype=this.dtype), index=query.id, columns=target.id) scores.index.name = 'query' scores.columns.name = 'target' # Collect results # We're dropping the "N / N_total" bit from the progress bar because # it's not helpful here fmt = ('{desc}: {percentage:3.0f}%|{bar}| [{elapsed}<{remaining}]') for f in config.tqdm(as_completed(futures), desc='NBLASTing', bar_format=fmt, total=len(futures), smoothing=0, disable=not progress, leave=False): res = f.result() this = futures[f] # Fill-in big score matrix scores.iloc[this.queries_ix, this.targets_ix] = res.values else: scores = this.multi_query_target(this.queries, this.targets, scores=scores) return scores