#    This script is part of navis (http://www.github.com/navis-org/navis).
#    Copyright (C) 2018 Philipp Schlegel
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.

"""Module contains functions implementing SyNBLAST."""

import os
import operator
import time

import numpy as np
import pandas as pd
import multiprocessing as mp

from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Union, Optional
from typing_extensions import Literal

from .. import config, utils
from ..core import NeuronList, BaseNeuron

from .base import Blaster, NestedIndices
from .smat import Lookup2d

from .nblast_funcs import (check_microns, find_optimal_partition,
                           nblast_preflight, smat_fcwb)

try:
    from pykdtree.kdtree import KDTree
except ImportError:
    from scipy.spatial import cKDTree as KDTree
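
# Both KDTree implementations expose the same subset of the API that this
# module relies on: build from an (N, 3) array, then call ``.query(points)``,
# which returns the nearest-neighbor distances as the first element of the
# result tuple. A minimal sketch (illustration only, not part of navis):
#
#   tree = KDTree(np.random.rand(100, 3))          # build from (N, 3) points
#   dists, ix = tree.query(np.random.rand(10, 3))  # 1-NN distances + indices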

__all__ = ['synblast']

fp = os.path.dirname(__file__)
smat_path = os.path.join(fp, 'score_mats')

logger = config.get_logger(__name__)


# This multiplier controls job size for syNBLASTs (only relevant for
# multiprocessing and if progress=True).
# Larger multiplier = larger job sizes = fewer jobs = slower updates & less overhead
# Smaller multiplier = smaller job sizes = more jobs = faster updates & more overhead
JOB_SIZE_MULTIPLIER = 1
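
# A worked example with hypothetical numbers: if a single median-sized query
# takes ~0.025 s, the default budget of T = 10 * JOB_SIZE_MULTIPLIER = 10 s
# per job allows ~400 pairwise comparisons per job, i.e. batches of roughly
# 20 x 20 neurons (see `find_batch_partition` at the bottom of this module).
# Doubling the multiplier doubles the per-job runtime and roughly halves the
# total number of jobs.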


class SynBlaster(Blaster):
    """Implements a synapsed-based NBLAST algorithm.

    Please note that some properties are computed on initialization and
    changing parameters at a later stage will mess things up!

    TODOs
    -----
    - implement `use_alpha` as average synapse density (i.e. emphasize areas
      where a neuron has lots of synapses)

    Parameters
    ----------
    normalized :    bool
                    If True, will normalize scores by the best possible score
                    (i.e. self-self) of the query neuron.
    by_type :       bool
                    If True, will only compare synapses with the same value
                    in the "type" column.
    smat :          navis.nbl.smat.Lookup2d | pd.DataFrame | str
                    How to convert the point match pairs into an NBLAST score,
                    usually by a lookup table.
                    If 'auto' (default), will use scoring matrices
                    from FCWB. Same behaviour as in R's nat.nblast
                    implementation.
                    Dataframes will be used to build a ``Lookup2d``.
                    If ``smat=None`` the scores will be
                    generated as the product of the distances and the dotproduct
                    of the vectors of nearest-neighbor pairs.
    progress :      bool
                    If True, will show a progress bar.

    """

    def __init__(self, normalized=True, by_type=True,
                 smat='auto', progress=True):
        """Initialize class."""
        super().__init__(progress=progress)
        self.normalized = normalized
        self.by_type = by_type

        if smat is None:
            self.score_fn = operator.mul
        elif smat == 'auto':
            self.score_fn = smat_fcwb()
        elif isinstance(smat, pd.DataFrame):
            self.score_fn = Lookup2d.from_dataframe(smat)
        else:
            self.score_fn = smat
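
        # Whatever we end up with, the rest of this class treats `score_fn`
        # as a callable with the signature ``score_fn(dist, dot) -> score``
        # accepting scalars or numpy arrays. `single_query_target` also
        # expects a `max_dist` attribute on it (present on `Lookup2d`
        # lookups, absent on `operator.mul`) for connector types that are
        # missing from the target.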

        self.ids = []

    def append(self, neuron, id=None, self_hit=None) -> NestedIndices:
        """Append neurons/connector tables, returning numerical indices of added objects.

        Note that `self_hit` is ignored (and recalculated from scratch)
        when `neuron` is a (nested) list of neurons.
        """
        if isinstance(neuron, pd.DataFrame):
            return self._append_connectors(neuron, id, self_hit=self_hit)

        if isinstance(neuron, BaseNeuron):
            if not neuron.has_connectors:
                raise ValueError('Neuron must have synapses')
            return self._append_connectors(neuron.connectors, neuron.id, self_hit=self_hit)

        try:
            return [self.append(n) for n in neuron]
        except TypeError:
            raise ValueError(
                "Expected a dataframe, or a Neuron or sequence thereof; got "
                f"{type(neuron)}"
            )

    def _append_connectors(self, connectors: pd.DataFrame, id, self_hit=None) -> int:
        if id is None:
            raise ValueError("Explicit non-None id required for appending connectors")

        next_idx = len(self)
        self.ids.append(id)
        self.neurons.append({})
        if not self.by_type:
            data = connectors[['x', 'y', 'z']].values
            # Generate the KDTree
            self.neurons[-1]['all'] = KDTree(data)
        else:
            if 'type' not in connectors.columns:
                raise ValueError('Connector tables must have a "type" column '
                                 'if `by_type=True`')
            for ty in connectors['type'].unique():
                data = connectors.loc[connectors['type'] == ty, ['x', 'y', 'z']].values
                # Generate the KDTree
                self.neurons[-1][ty] = KDTree(data)
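        # At this point, the newest entry of `self.neurons` maps connector
        # type to a KDTree over that type's x/y/z coordinates -- e.g.
        # {'pre': KDTree, 'post': KDTree}, or just {'all': KDTree} when
        # `by_type=False`.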

        # Calculate score for self hit if required
        if not self_hit:
            self_hit = self.calc_self_hit(connectors)
        self.self_hits.append(self_hit)

        return next_idx

    def calc_self_hit(self, cn):
        """Non-normalized value for self hit."""
        return cn.shape[0] * self.score_fn(0, 1)
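
    # Worked example with hypothetical numbers: with the default FCWB lookup,
    # `score_fn(0, 1)` is the score for the best possible bin (zero distance,
    # perfectly aligned vectors). A neuron with, say, 250 connectors therefore
    # has a self hit of ``250 * score_fn(0, 1)`` -- the score it gets when
    # every one of its synapses finds a zero-distance match.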

    def single_query_target(self, q_idx: int, t_idx: int, scores='forward'):
        """Query single target against single target."""
        # Take a short-cut if this is a self-self comparison
        if q_idx == t_idx:
            if self.normalized:
                return 1
            return self.self_hits[q_idx]

        # Run nearest-neighbor search for query against target
        t_trees = self.neurons[t_idx]
        q_trees = self.neurons[q_idx]

        dists = []
        # Go over all connector types (e.g. "pre" and "post") present in the
        # query neuron. If `by_type=False`, there will be a single tree simply
        # called "all"
        for ty, qt in q_trees.items():
            # If the target does not have this type of connector, penalize
            # each query synapse with the maximum distance. Note that this
            # maximum distance will simply get the worst possible score from
            # the scoring function (this requires a lookup-table score
            # function; `operator.mul` has no `max_dist`).
            if ty not in t_trees:
                # Use `qt.n` instead of `qt.data.shape[0]` because pykdtree
                # tracks its data as a flat array
                dists = np.append(dists, [self.score_fn.max_dist] * qt.n)
            else:
                tt = t_trees[ty]
                data = qt.data
                # pykdtree tracks data as flat array
                if data.ndim == 1:
                    data = data.reshape((qt.n, qt.ndim))
                dists = np.append(dists, tt.query(data)[0])

        # We use the same scoring function as for normal NBLAST but ignore the
        # vector dotproduct component
        scr = self.score_fn(dists, 1).sum()

        # Normalize against best possible hit (self hit)
        if self.normalized:
            scr /= self.self_hits[q_idx]

        # For the mean score we also have to produce the reverse score
        if scores in ('mean', 'min', 'max'):
            reverse = self.single_query_target(t_idx, q_idx, scores='forward')
            if scores == 'mean':
                scr = (scr + reverse) / 2
            elif scores == 'min':
                scr = min(scr, reverse)
            elif scores == 'max':
                scr = max(scr, reverse)

        return scr
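

# A minimal sketch (not part of the navis API) of driving `SynBlaster`
# directly -- this mirrors what `synblast()` below does internally. Assumes
# neurons in microns with non-empty connector tables; the helper name is
# made up for illustration.
def _example_synblaster_usage():
    """Illustration only."""
    import navis
    nl = navis.example_neurons(n=2) * (8 / 1000)  # 8nm voxels -> microns

    blaster = SynBlaster(normalized=True, by_type=True, smat='auto')
    q_ix = blaster.append(nl[0])  # returns numerical index of added neuron
    t_ix = blaster.append(nl[1])

    # Forward (query -> target) score; 'mean'/'min'/'max' also run the reverse
    return blaster.single_query_target(q_ix, t_ix, scores='forward')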


def synblast(query: Union['BaseNeuron', 'NeuronList'],
             target: Union['BaseNeuron', 'NeuronList'],
             by_type: bool = False,
             cn_types: Optional[list] = None,
             scores: Union[Literal['forward'],
                           Literal['mean'],
                           Literal['min'],
                           Literal['max']] = 'forward',
             normalized: bool = True,
             smat: Optional[Union[str, pd.DataFrame]] = 'auto',
             n_cores: int = os.cpu_count() // 2,
             progress: bool = True) -> pd.DataFrame:
    """Synapse-based variant of NBLAST.

    The gist is this: for each synapse in the query neuron, we find the
    closest synapse in the target neuron (can be restricted by synapse
    type). Those distances are then scored similarly to nearest-neighbor
    pairs in NBLAST but without the vector component.

    Parameters
    ----------
    query,target :  Neuron/List
                    Query neuron(s) to SynBLAST against the targets. Units
                    should be in microns as NBLAST is optimized for that, and
                    neurons should have similar sampling resolutions. Neurons
                    must have (non-empty) connector tables.
    by_type :       bool
                    If True, will use the "type" column in the connector
                    tables to only compare e.g. pre- with pre- and post- with
                    postsynapses.
    cn_types :      str | list, optional
                    Use this to restrict SynBLAST to specific types of
                    connectors (e.g. "pre"synapses only).
    scores :        'forward' | 'mean' | 'min' | 'max'
                    Determines the final scores:

                      - 'forward' (default) returns query->target scores
                      - 'mean' returns the mean of query->target and
                        target->query scores
                      - 'min' returns the minimum between query->target and
                        target->query scores
                      - 'max' returns the maximum between query->target and
                        target->query scores

    normalized :    bool, optional
                    Whether to return normalized SynBLAST scores.
    smat :          str | pd.DataFrame, optional
                    Score matrix. If 'auto' (default), will use scoring
                    matrices from FCWB. Same behaviour as in R's nat.nblast
                    implementation. If ``smat=None`` the scores will be
                    generated as the product of the distances and the
                    dotproduct of the vectors of nearest-neighbor pairs.
    n_cores :       int, optional
                    Max number of cores to use for SynBLASTing. Default is
                    ``os.cpu_count() // 2``. This should ideally be an even
                    number as that allows optimally splitting queries onto
                    individual processes.
    progress :      bool
                    Whether to show progress bars. This may cause some
                    overhead, so switch off if you don't really need it.

    Returns
    -------
    scores :        pandas.DataFrame
                    Matrix with SynBLAST scores. Rows are query neurons,
                    columns are targets.

    Examples
    --------
    >>> import navis
    >>> nl = navis.example_neurons(n=5)
    >>> nl.units
    <Quantity([8 8 8 8 8], 'nanometer')>
    >>> # Convert to microns
    >>> nl_um = nl * (8 / 1000)
    >>> # Run type-agnostic SyNBLAST
    >>> scores = navis.synblast(nl_um[:3], nl_um[3:], progress=False)
    >>> # Run type-sensitive (i.e. pre vs pre and post vs post) SyNBLAST
    >>> scores = navis.synblast(nl_um[:3], nl_um[3:], by_type=True, progress=False)

    See Also
    --------
    :func:`navis.nblast`
                    The original morphology-based NBLAST.

    """
    # Make sure we're working on NeuronLists
    query = NeuronList(query)
    target = NeuronList(target)

    # Run pre-flight checks
    nblast_preflight(query, target, n_cores,
                     req_unique_ids=True, req_dotprops=False,
                     req_microns=isinstance(smat, str) and smat == 'auto')

    # Make sure all neurons have connectors
    if not all(query.has_connectors):
        raise ValueError('Some query neurons appear to not have a connector table.')

    if not all(target.has_connectors):
        raise ValueError('Some target neurons appear to not have a connector table.')

    if not isinstance(cn_types, type(None)):
        cn_types = utils.make_iterable(cn_types)

    if not isinstance(cn_types, type(None)) or by_type:
        if any(['type' not in n.connectors.columns for n in query]):
            raise ValueError('Connector tables must have a "type" column if '
                             '`by_type=True` or `cn_types` is not `None`.')

    # Find a partition that produces batches that each run in approximately
    # 10 seconds
    if n_cores and n_cores > 1:
        if progress:
            # If we need a progress bar, we have to make smaller mini batches.
            # These mini jobs must not be too small though - otherwise the
            # overhead from spawning processes and sending results between
            # them slows things down dramatically. Hence we want to make sure
            # that each job runs for >10s. The run time depends on the system
            # and on how big the neurons are. Here, we run a quick test and
            # extrapolate from there.
            n_rows, n_cols = find_batch_partition(query, target,
                                                  T=10 * JOB_SIZE_MULTIPLIER)
        else:
            # If no progress bar is needed, we can just split neurons evenly
            # across all available cores
            n_rows, n_cols = find_optimal_partition(n_cores, query, target)
    else:
        n_rows = n_cols = 1

    # Calculate self-hits once for all neurons
    nb = SynBlaster(normalized=normalized, by_type=by_type,
                    smat=smat, progress=progress)

    def get_connectors(n):
        """Get the required connectors from a neuron."""
        if not isinstance(cn_types, type(None)):
            return n.connectors[n.connectors['type'].isin(cn_types)]
        else:
            return n.connectors

    query_self_hits = np.array([nb.calc_self_hit(get_connectors(n))
                                for n in query])
    target_self_hits = np.array([nb.calc_self_hit(get_connectors(n))
                                 for n in target])

    # Initialize a pool of workers.
    # Note that we're forcing "spawn" instead of "fork" (the default on
    # Linux)! This is to reduce the memory footprint since "fork" appears to
    # inherit all variables (including all neurons) while "spawn" appears to
    # get only what's required to run the job.
    with ProcessPoolExecutor(max_workers=n_cores,
                             mp_context=mp.get_context('spawn')) as pool:
        with config.tqdm(desc='Preparing',
                         total=n_rows * n_cols,
                         leave=False,
                         disable=not progress) as pbar:
            futures = {}
            blasters = []
            for qix in np.array_split(np.arange(len(query)), n_rows):
                for tix in np.array_split(np.arange(len(target)), n_cols):
                    # Initialize SynBlaster
                    this = SynBlaster(normalized=normalized,
                                      by_type=by_type,
                                      smat=smat,
                                      progress=progress)

                    # Add queries and targets
                    for ix in qix:
                        n = query[ix]
                        this.append(get_connectors(n), id=n.id,
                                    self_hit=query_self_hits[ix])
                    for ix in tix:
                        n = target[ix]
                        this.append(get_connectors(n), id=n.id,
                                    self_hit=target_self_hits[ix])

                    # Keep track of indices of queries and targets
                    this.queries = np.arange(len(qix))
                    this.targets = np.arange(len(tix)) + len(qix)
                    # These facilitate filling in the big matrix later
                    this.queries_ix = qix
                    this.targets_ix = tix
                    this.pbar_position = (len(blasters)
                                          if not utils.is_jupyter() else None)

                    blasters.append(this)
                    pbar.update()

                    # If multiple cores were requested, submit the job to the
                    # pool right away
                    if n_cores and n_cores > 1 and (n_cols > 1 or n_rows > 1):
                        # No progress bars for individual SynBlasters
                        this.progress = False
                        futures[pool.submit(this.multi_query_target,
                                            q_idx=this.queries,
                                            t_idx=this.targets,
                                            scores=scores)] = this

        # Collect results
        if futures and len(futures) > 1:
            # Prepare empty score matrix
            scores = pd.DataFrame(np.empty((len(query), len(target)),
                                           dtype=this.dtype),
                                  index=query.id, columns=target.id)
            scores.index.name = 'query'
            scores.columns.name = 'target'

            # Collect results.
            # We're dropping the "N / N_total" bit from the progress bar
            # because it's not helpful here
            fmt = '{desc}: {percentage:3.0f}%|{bar}| [{elapsed}<{remaining}]'
            for f in config.tqdm(as_completed(futures), desc='NBLASTing',
                                 bar_format=fmt, total=len(futures),
                                 smoothing=0, disable=not progress,
                                 leave=False):
                res = f.result()
                this = futures[f]
                # Fill in the big score matrix
                scores.iloc[this.queries_ix, this.targets_ix] = res.values
        else:
            scores = this.multi_query_target(this.queries,
                                             this.targets,
                                             scores=scores)

    return scores
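

# A hypothetical sketch (not part of the navis API): for distinct query and
# target sets, `scores='mean'` is equivalent to averaging two forward runs,
# which can also be done manually on the returned DataFrames:
def _example_mean_scores():
    """Illustration only; uses navis' bundled example neurons."""
    import navis
    nl = navis.example_neurons(n=4) * (8 / 1000)  # 8nm voxels -> microns
    fwd = synblast(nl[:2], nl[2:], scores='forward', progress=False)
    rev = synblast(nl[2:], nl[:2], scores='forward', progress=False)
    # `rev.T` has the same index/columns as `fwd`, so pandas aligns the
    # labels and this matches what `scores='mean'` computes per neuron pair
    return (fwd + rev.T) / 2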


def find_batch_partition(q, t, T=10):
    """Find a partition such that each batch takes about `T` seconds."""
    # Get a median-sized query and target
    q_ix = np.argsort(q.n_connectors)[len(q) // 2]
    t_ix = np.argsort(t.n_connectors)[len(t) // 2]

    # Generate the KDTree
    tree = KDTree(q[q_ix].connectors[['x', 'y', 'z']].values)

    # Run a quick single-query benchmark (ignoring scoring / normalizing)
    timings = []
    for _ in range(10):  # run 10 tests
        s = time.time()
        tree.query(t[t_ix].connectors[['x', 'y', 'z']].values)
        timings.append(time.time() - s)
    time_per_query = min(timings)  # seconds per median-sized query

    # Number of queries per job such that each job runs in `T` seconds
    queries_per_batch = T / time_per_query

    # Number of neurons per batch
    neurons_per_batch = max(1, int(np.sqrt(queries_per_batch)))

    n_rows = len(q) // neurons_per_batch
    n_cols = len(t) // neurons_per_batch

    return max(1, n_rows), max(1, n_cols)
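
# Worked example with hypothetical numbers: if one median-sized query takes
# time_per_query = 0.025 s, then queries_per_batch = 10 / 0.025 = 400 and
# neurons_per_batch = int(sqrt(400)) = 20. A batch of 20 queries x 20 targets
# hence runs ~400 nearest-neighbor queries, i.e. ~10 s. For 100 queries and
# 60 targets this yields n_rows = 100 // 20 = 5 and n_cols = 60 // 20 = 3,
# i.e. 15 jobs in total.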