# This script is part of navis (http://www.github.com/navis-org/navis).
# Copyright (C) 2018 Philipp Schlegel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
"""Module containing utility functions for BLASTING."""
import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform, pdist
from .. import config
logger = config.logger
def _extract_matches_n(scores, N=None, distances=False):
"""Return top N matches."""
if not distances:
if N > 1:
# This partitions of the largest N values (faster than argsort)
# Their correct order, however, is not guaranteed
top_n = np.argpartition(scores.values, -N, axis=-1)[:, -N:]
else:
# For N=1 this is still faster
top_n = np.argmax(scores.values, axis=-1).reshape(-1, 1)
else:
if N > 1:
top_n = np.argpartition(scores.values, N, axis=-1)[:, :N]
else:
top_n = np.argmin(scores.values, axis=-1).reshape(-1, 1)
# This make sure we order them properly
top_scores = scores.values[np.arange(len(scores)).reshape(-1, 1), top_n]
ind_ordered = np.argsort(top_scores, axis=1)
if distances:
ind_ordered = ind_ordered[:, ::-1]
top_n = top_n[np.arange(len(top_n)).reshape(-1, 1), ind_ordered]
top_scores = top_scores[np.arange(len(top_scores)).reshape(-1, 1), ind_ordered]
# Now collate matches
matches = pd.DataFrame()
matches['id'] = scores.index.values
for i in range(N):
matches[f'match_{i + 1}'] = scores.columns[top_n[:, -(i + 1)]]
matches[f'score_{i + 1}'] = top_scores[:, -(i + 1)]
return matches
def _extract_matches_threshold(scores, threshold=.3, distances=False):
"""Extract all matches above a given threshold from score matrix."""
if not distances:
ind, cols = np.where(scores.values >= threshold)
else:
ind, cols = np.where(scores.values <= threshold)
matches = pd.DataFrame()
matches['query'] = scores.index[ind]
matches['match'] = scores.columns[cols]
matches['score'] = scores.values[ind, cols]
matches = matches.sort_values(['query', 'match', 'score']).set_index(['query', 'match'])
return matches
def _extract_matches_perc(scores, perc=.05, distances=False):
"""Extract all matches within a given percentage of the top match."""
if not distances:
thresh = np.max(scores.values, axis=1)
thresh = thresh - np.abs(thresh * perc)
ind, cols = np.where(scores.values >= thresh.reshape(-1, 1))
else:
thresh = np.min(scores.values, axis=1)
thresh = thresh + np.abs(thresh * perc)
ind, cols = np.where(scores.values <= thresh.reshape(-1, 1))
matches = pd.DataFrame()
matches.index = scores.index
match_str = []
scores_str = []
for i in range(len(matches)):
this = cols[ind == i]
sc = scores.values[i, this]
srt = np.argsort(sc)[::-1]
m = scores.columns[this][srt]
sc = sc[srt]
match_str.append(','.join(m.astype(str)))
scores_str.append(','.join(sc.round(3).astype(str)))
matches['matches'] = match_str
matches['scores'] = scores_str
return matches
[docs]
def update_scores(queries, targets, scores_ex, nblast_func, **kwargs):
"""Update score matrix by running only new query->target pairs.
Parameters
----------
queries : Dotprops
targets : Dotprops
scores_ex : pandas.DataFrame
DataFrame with existing scores.
nblast_func : callable
The NBLAST to use. For example: ``navis.nblast``.
**kwargs
Argument passed to ``nblast_func``.
Returns
-------
pandas.DataFrame
Updated scores.
Examples
--------
Mostly for testing but also illustrates the principle:
>>> import navis
>>> import numpy as np
>>> nl = navis.example_neurons(n=5)
>>> dp = navis.make_dotprops(nl, k=5) / 125
>>> # Full NBLAST
>>> scores = navis.nblast(dp, dp, n_cores=1)
>>> # Subset and fill in
>>> scores2 = navis.nbl.update_scores(dp, dp,
... scores_ex=scores.iloc[:3, 2:],
... nblast_func=navis.nblast,
... n_cores=1)
>>> np.all(scores == scores2)
True
"""
if not callable(nblast_func):
raise TypeError('`nblast_func` must be callable.')
# The np.isin query is much faster if we force any strings to <U18 by
# converting to arrays
is_new_q = ~np.isin(queries.id, np.array(scores_ex.index))
is_new_t = ~np.isin(targets.id, np.array(scores_ex.columns))
logger.info(f'Found {is_new_q.sum()} new queries and '
f'{is_new_t.sum()} new targets.')
# Reindex old scores
scores = scores_ex.reindex(index=queries.id, columns=targets.id).copy()
# NBLAST new queries against all targets
if 'precision' not in kwargs:
kwargs['precision'] = scores.values.dtype
if any(is_new_q):
logger.info(f'Updating new queries -> targets scores')
qt = nblast_func(queries[is_new_q], targets, **kwargs)
scores.loc[qt.index, qt.columns] = qt.values
# NBLAST all old queries against new targets
if any(is_new_t):
logger.info(f'Updating old queries -> new targets scores')
tq = nblast_func(queries[~is_new_q], targets[is_new_t], **kwargs)
scores.loc[tq.index, tq.columns] = tq.values
return scores
[docs]
def compress_scores(scores, threshold=None, digits=None):
"""Compress scores.
This will not necessarily reduce the in-memory footprint but will lead to
much smaller file sizes when saved to disk.
Parameters
----------
scores : pandas.DataFrame
threshold : float, optional
Scores lower than this will be capped at `threshold`.
digits : int, optional
Round scores to the Nth digit.
Returns
-------
scores_comp : pandas.DataFrame
Copy of the original dataframe with the data cast to 32bit
floats and the optional filters (see `threshold` and
`digits`) applied.
"""
scores = scores.astype(np.float32)
if digits is not None:
scores = scores.round(digits)
if threshold is not None:
scores.clip(lower=threshold, inplace=True)
return scores
def make_linkage(x, method='single', optimal_ordering=False):
"""Make linkage from input. If input looks like linkage it is passed through."""
if isinstance(x, pd.DataFrame):
# Make sure it is symmetric
if x.shape[0] != x.shape[1]:
raise ValueError(f'Scores must be symmetric, got shape {x.shape}')
# A cheap check for whether these are mean scores
if any(x.values[0].round(5) != x.values[:, 0].round(5)):
logger.warning(f'Symmetrizing scores because they do not look like mean scores!')
x = (x + x.values.T) / 2
dists = squareform(1 - x.values, checks=False)
Z = sch.linkage(dists, method=method, optimal_ordering=optimal_ordering)
elif isinstance(x, np.ndarray):
Z = x
else:
raise TypeError(f'Expected scores) (DataFrame) or linkage (array), got {type(x)}')
return Z
def dendrogram(x, method='ward', **kwargs):
"""Plot dendrogram.
This is just a convenient thin wrapper around scipy's dendrogram function
that lets you feed NBLAST scores directly. Note that this causes some
overhead for very large NBLASTs.
Parameters
----------
x : DataFrame | array
Pandas DataFrame is assumed to be NBLAST scores. Array is
assumed to be a linkage.
method : str
Method for ``linkage``. Ignored if ``x`` is already a linkage.
**kwargs
Keyword argument passed to scipy's ``dendrogram``.
Returns
-------
dendrogram
"""
# Some sensible defaults that help with large dendrograms
DEFAULTS = dict(no_labels=True,
labels=x.index.values.astype(str) if isinstance(x, pd.DataFrame) else None)
DEFAULTS.update(kwargs)
# Make linkage
Z = make_linkage(x, method=method)
return sch.dendrogram(Z, **DEFAULTS)
[docs]
def make_clusters(x, t, criterion='n_clusters', method='ward', **kwargs):
"""Form flat clusters.
This is a thin wrapper around ``scipy.cluster.hierarchy.cut_tree`` and
``scipy.cluster.hierarchy.fcluster`` functions.
Parameters
----------
x : DataFrame | array
Pandas DataFrame is assumed to be NBLAST scores. Array is
assumed to be a linkage.
t : scalar
See ``method``.
criterion : str
Method to use for creating clusters:
- `n_clusters` uses ``cut_tree`` to create ``t`` clusters
- `height` uses ``cut_tree`` to cut the dendrogram at
height ``t``
- `inconsistent`, `distance`, `maxclust`, etc are passed
through to ``fcluster``
method : str
Method for ``linkage``. Ignored if ``x`` is already a linkage.
**kwargs
Additional keyword arguments are passed through to the
cluster functions ``cut_tree`` and ``fcluster``.
Returns
-------
clusters : np.ndarray
"""
# Make linkage
Z = make_linkage(x, method=method)
if criterion == 'n_clusters':
cl = sch.cut_tree(Z, n_clusters=t, **kwargs).flatten()
elif criterion == 'height':
cl = sch.cut_tree(Z, height=t, **kwargs).flatten()
else:
cl = sch.fcluster(Z, t=t, criterion=criterion, **kwargs)
return cl
def nblast_prime(scores, n_dim=.2, metric='euclidean'):
"""Generate a smoothed version of the NBLAST scores.
In brief:
1. Run PCA on the NBLAST scores and extract the first N components.
2. From that calulate a new similarity matrix.
Requires scikit-learn.
Parameters
----------
scores : pandas.DataFrame
The all-by-all NBLAST scores.
n_dim : float | int
The number of dimensions to use. If float (0 < n_dim < 1) will
use `scores.shape[0] * n_dim`.
metric : str
Which distance metric to use. Directly passed through to the
`scipy.spatial.distance.pdist` function.
Returns
-------
scores_new
"""
try:
from sklearn.decomposition import PCA
except ImportError:
raise ImportError('Please install scikit-learn to use `nblast_prime`:\n'
' pip3 install scikit-learn -U')
if not isinstance(scores, pd.DataFrame):
raise TypeError(f'`scores` must be pandas DataFrame, got "{type(scores)}"')
if (scores.shape[0] != scores.shape[1]) or ~np.all(scores.columns == scores.index):
logger.warning('NBLAST matrix is not symmetric - are you sure this is '
'an all-by-all matrix?')
if n_dim < 1:
n_dim = int(scores.shape[1] * n_dim)
pca = PCA(n_components=n_dim)
X_new = pca.fit_transform(scores.values)
dist = pdist(X_new, metric=metric)
return pd.DataFrame(1 - squareform(dist), index=scores.index, columns=scores.columns)
def most(x, f=.9):
"""Check if most (as opposed to all) entries are True."""
if x.sum() >= (x.shape[0] * f):
return True
return False