# This script is part of navis (http://www.github.com/navis-org/navis).
# Copyright (C) 2018 Philipp Schlegel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import io
import json
import os
import struct
import tempfile
import requests
import numpy as np
import pandas as pd
from pathlib import Path
from functools import lru_cache
from typing import Union, Dict, Optional, Any, IO, List
from typing_extensions import Literal
from zipfile import ZipFile, ZipInfo
from .. import config, utils, core
from . import base
# `zipfile` itself is always available, but DEFLATE compression requires
# `zlib` which may be missing from some Python builds - in that case we
# fall back to storing files uncompressed
import zipfile

try:
    import zlib  # noqa: F401
    compression = zipfile.ZIP_DEFLATED
except ImportError:
    compression = zipfile.ZIP_STORED
DEFAULT_FMT = "{name}"
class PrecomputedReader(base.BaseReader):
def is_valid_file(self, file):
"""Return True if file should be considered for reading."""
        if isinstance(file, ZipInfo):
file = str(file.filename)
elif isinstance(file, Path):
if not file.is_file():
return False
file = str(file.name)
else:
file = str(file)
        # Drop anything with a file extension as well as hidden files
        # (e.g. ".DS_Store")
if '.' in file:
return False
# Ignore the info file
if file == 'info':
return False
# Ignore manifests
if file.endswith(':0'):
return False
return True
class PrecomputedMeshReader(PrecomputedReader):
def __init__(
self,
fmt: str = DEFAULT_FMT,
attrs: Optional[Dict[str, Any]] = None
):
super().__init__(fmt=fmt,
attrs=attrs,
file_ext='',
name_fallback='mesh',
read_binary=True)
def read_buffer(
self, f: IO, attrs: Optional[Dict[str, Any]] = None
) -> 'core.MeshNeuron':
"""Read buffer into a MeshNeuron.
Parameters
----------
f : IO
Readable buffer - must be bytes.
attrs : dict | None
Arbitrary attributes to include in the MeshNeuron.
Returns
-------
core.MeshNeuron
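
        Examples
        --------
        A minimal sketch of the legacy precomputed mesh layout: a
        little-endian uint32 vertex count, followed by float32 vertex
        coordinates and uint32 triangle indices. The buffer below is made
        up for illustration:

        >>> import io
        >>> import numpy as np
        >>> buf = io.BytesIO()
        >>> _ = buf.write(np.uint32(3).tobytes())                           # 3 vertices
        >>> _ = buf.write(np.zeros((3, 3), dtype='float32').tobytes())      # xyz coordinates
        >>> _ = buf.write(np.array([[0, 1, 2]], dtype='uint32').tobytes())  # a single triangle
        >>> _ = buf.seek(0)
        >>> n = PrecomputedMeshReader().read_buffer(buf)  # doctest: +SKIP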
"""
if not isinstance(f.read(0), bytes):
raise ValueError(f'Expected bytes, got {type(f.read(0))}')
num_vertices = np.frombuffer(f.read(4), np.uint32)[0]
vertices = np.frombuffer(f.read(int(3 * 4 * num_vertices)),
np.float32).reshape(-1, 3)
faces = np.frombuffer(f.read(),
np.uint32).reshape(-1, 3)
return core.MeshNeuron({'vertices': vertices, 'faces': faces},
**(self._make_attributes({'name': self.name_fallback,
'origin': 'DataFrame'}, attrs)))
class PrecomputedSkeletonReader(PrecomputedReader):
    def __init__(
        self,
        fmt: str = DEFAULT_FMT,
        attrs: Optional[Dict[str, Any]] = None,
        info: Optional[Dict[str, Any]] = None
    ):
        super().__init__(fmt=fmt,
                         attrs=attrs,
                         file_ext='',
                         name_fallback='skeleton',
                         read_binary=True)
        # Avoid a mutable default argument: `read_buffer` may modify
        # `self.info` in place, which would otherwise leak across instances
        self.info = info if info is not None else {}
def read_buffer(
self,
f: IO, attrs: Optional[Dict[str, Any]] = None
) -> 'core.TreeNeuron':
"""Read buffer into a TreeNeuron.
Parameters
----------
f : IO
Readable buffer - must be bytes.
attrs : dict | None
Arbitrary attributes to include in the TreeNeuron.
Returns
-------
core.TreeNeuron
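
        Examples
        --------
        A minimal sketch of the precomputed skeleton layout: little-endian
        uint32 node and edge counts, followed by float32 node coordinates
        and uint32 (parent, child) edge pairs. The buffer below is made up
        for illustration:

        >>> import io
        >>> import numpy as np
        >>> buf = io.BytesIO()
        >>> _ = buf.write(np.uint32(2).tobytes())                        # 2 nodes
        >>> _ = buf.write(np.uint32(1).tobytes())                        # 1 edge
        >>> _ = buf.write(np.zeros((2, 3), dtype='float32').tobytes())   # xyz coordinates
        >>> _ = buf.write(np.array([[0, 1]], dtype='uint32').tobytes())  # node 0 is parent of node 1
        >>> _ = buf.seek(0)
        >>> n = PrecomputedSkeletonReader().read_buffer(buf)  # doctest: +SKIP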
"""
if not isinstance(f.read(0), bytes):
raise ValueError(f'Expected bytes, got {type(f.read(0))}')
num_nodes = np.frombuffer(f.read(4), np.uint32)[0]
num_edges = np.frombuffer(f.read(4), np.uint32)[0]
nodes = np.frombuffer(f.read(int(3 * 4 * num_nodes)),
np.float32).reshape(-1, 3)
edges = np.frombuffer(f.read(int(2 * 4 * num_edges)),
np.uint32).reshape(-1, 2)
swc = self.make_swc(nodes, edges)
# Check for malformed vertex attributes (should be list of dicts)
if isinstance(self.info.get('vertex_attributes', None), dict):
self.info['vertex_attributes'] = [self.info['vertex_attributes']]
# Parse additional vertex attributes if specified as per the info file
for attr in self.info.get('vertex_attributes', []):
dtype = np.dtype(attr['data_type'])
n_comp = attr['num_components']
values = np.frombuffer(f.read(int(n_comp * dtype.itemsize * num_nodes)),
dtype).reshape(-1, n_comp)
if n_comp == 1:
swc[attr['id']] = values.flatten()
else:
for i in range(n_comp):
swc[f"{attr['id']}_{i}"] = values[:, i]
return core.TreeNeuron(swc,
**(self._make_attributes({'name': self.name_fallback,
'origin': 'DataFrame'}, attrs)))
def make_swc(
self, nodes: np.ndarray, edges: np.ndarray
) -> pd.DataFrame:
"""Make SWC table from nodes and edges.
Parameters
----------
        nodes : (N, 3) array
        edges : (M, 2) array
            Pairs of (parent, child) node indices.
Returns
-------
pandas.DataFrame
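
        Examples
        --------
        Two nodes connected by a single edge - node 0 is the root (no
        incoming edge), node 1 has node 0 as its parent:

        >>> nodes = np.array([[0, 0, 0], [1, 0, 0]], dtype='float32')
        >>> edges = np.array([[0, 1]], dtype='uint32')
        >>> PrecomputedSkeletonReader().make_swc(nodes, edges)  # doctest: +SKIP
           node_id    x    y    z  parent_id
        0        0  0.0  0.0  0.0         -1
        1        1  1.0  0.0  0.0          0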
"""
swc = pd.DataFrame()
swc['node_id'] = np.arange(len(nodes))
swc['x'], swc['y'], swc['z'] = nodes[:, 0], nodes[:, 1], nodes[:, 2]
        # Edges are (parent, child) pairs; nodes without an incoming edge
        # are roots and get parent_id -1
        edge_dict = dict(zip(edges[:, 1], edges[:, 0]))
        swc['parent_id'] = swc.node_id.map(lambda x: edge_dict.get(x, -1)).astype(np.int32)
return swc
def read_precomputed(f: Union[str, bytes, io.BytesIO],
                     datatype: Literal['auto', 'mesh', 'skeleton'] = 'auto',
include_subdirs: bool = False,
fmt: str = '{id}',
info: Union[bool, str, dict] = True,
limit: Optional[int] = None,
parallel: Union[bool, int] = 'auto',
**kwargs) -> 'core.NeuronObject':
"""Read skeletons and meshes from neuroglancer's precomputed format.
Follows the formats specified
`here <https://github.com/google/neuroglancer/tree/master/src/neuroglancer/datasource/precomputed>`_.
Parameters
----------
    f : filepath | folder | zip file | bytes
        Filename, folder or bytes. If a folder, will import all files. If a
        ``.zip``, ``.tar`` or ``.tar.gz`` archive, will read all files it
        contains. See also the ``limit`` parameter.
datatype : "auto" | "skeleton" | "mesh"
Which data type we expect to read from the files. If
"auto", we require a "info" file in the same directory
as ``f``.
include_subdirs : bool, optional
If True and ``f`` is a folder, will also search
subdirectories for binary files.
fmt : str
Formatter to specify what files to look for (when `f` is
directory) and how they are parsed into neuron
attributes. Some illustrative examples:
        - ``{name}`` uses the filename (minus the suffix) as the neuron's
          name property
        - ``{id}`` (default) uses the filename as the neuron's ID property
- ``{name,id}`` uses the filename as the neuron's
name and ID properties
- ``{name}.{id}`` splits the filename at a "."
and uses the first part as name and the second as ID
- ``{name,id:int}`` same as above but converts
into integer for the ID
- ``{name}_{myproperty}`` splits the filename at
"_" and uses the first part as name and as a
generic "myproperty" property
- ``{name}_{}_{id}`` splits the filename at
"_" and uses the first part as name and the last as
ID. The middle part is ignored.
Throws a ValueError if pattern can't be found in
filename. Ignored for DataFrames.
info : bool | str | dict
An info file describing the data:
- ``True`` = will look for `info` file in base folder
- ``False`` = do not use/look for `info` file
- ``str`` = filepath to `info` file
- ``dict`` = already parsed info file
limit : int, optional
If reading from a folder you can use this parameter to
read only the first ``limit`` files. Useful if
wanting to get a sample from a large library of
skeletons/meshes.
parallel : "auto" | bool | int
Defaults to ``auto`` which means only use parallel
processing if more than 200 files are imported. Spawning
and joining processes causes overhead and is
considerably slower for imports of small numbers of
neurons. Integer will be interpreted as the
number of cores (otherwise defaults to
``os.cpu_count() // 2``).
**kwargs
Keyword arguments passed to the construction of the
neurons. You can use this to e.g. set meta data such
as ``units``.
Returns
-------
    navis.TreeNeuron
    navis.MeshNeuron
    navis.NeuronList
See Also
--------
:func:`navis.write_precomputed`
Export neurons/volumes to precomputed format.
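
    Examples
    --------
    Read back precomputed skeletons (here we first write the example
    neurons so there is something to read; ``tmp_dir`` is assumed to be an
    existing, writable directory):

    >>> import navis
    >>> n = navis.example_neurons(3, kind='skeleton')
    >>> navis.write_precomputed(n, tmp_dir)
    >>> nl = navis.read_precomputed(tmp_dir)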
"""
utils.eval_param(datatype, name='datatype', allowed_values=('skeleton',
'mesh',
'auto'))
# See if we can get the info file from somewhere
if info is True and not isinstance(f, bytes):
# Find info in zip archive
        if str(f).endswith('.zip'):
            with ZipFile(Path(f).expanduser(), 'r') as zf:
                if 'info' in [zi.filename for zi in zf.filelist]:
                    info = json.loads(zf.read('info').decode())
elif datatype == 'auto':
raise ValueError('No `info` file found in zip file. Please '
'specify data type using the `datatype` '
'parameter.')
# Try loading info from URL
elif utils.is_url(str(f)):
base_url = '/'.join(str(f).split('/')[:-1])
info = _fetch_info_file(base_url, raise_missing=False)
# Try loading info from parent path
else:
fp = Path(str(f))
# Find first existing root
while not fp.is_dir():
fp = fp.parent
fp = fp / 'info'
if fp.is_file():
with open(fp, 'r') as info_file:
info = json.load(info_file)
# At this point we should have a dictionary - even if it's empty
if not isinstance(info, dict):
info = {}
# Parse data type from info file (if required)
if datatype == 'auto':
if '@type' not in info:
raise ValueError('Either no `info` file found or it does not specify '
'a data type. Please provide data type using the '
'`datatype` parameter.')
if info.get('@type', None) == 'neuroglancer_legacy_mesh':
datatype = 'mesh'
elif info.get('@type', None) == 'neuroglancer_skeletons':
datatype = 'skeleton'
else:
raise ValueError('Data type specified in `info` file unknown: '
f'{info.get("@type", None)}. Please provide data '
'type using the `datatype` parameter.')
if isinstance(f, bytes):
f = io.BytesIO(f)
    if datatype == 'skeleton':
reader = PrecomputedSkeletonReader(fmt=fmt, attrs=kwargs, info=info)
else:
reader = PrecomputedMeshReader(fmt=fmt, attrs=kwargs)
return reader.read_any(f, include_subdirs, parallel, limit=limit)
class PrecomputedWriter(base.Writer):
"""Writer class that also takes care of `info` files."""
def write_any(self, x, filepath, write_info=True, **kwargs):
"""Write any to file. Default entry point."""
# First write the actual neurons
kwargs['write_info'] = False
super().write_any(x, filepath=filepath, **kwargs)
# Write info file to the correct directory/zipfile
if write_info:
add_props = {}
if kwargs.get('radius', False):
add_props['vertex_attributes'] = [{'id': 'radius',
'data_type': 'float32',
'num_components': 1}]
if str(self.path).endswith('.zip'):
with ZipFile(self.path, mode='a') as zf:
                    # Context-manager will remove the temporary directory
                    # and its contents when we are done
                    with tempfile.TemporaryDirectory() as tempdir:
                        # Write `info` to a temporary file first ...
                        f = os.path.join(tempdir, 'info')
                        write_info_file(x, f, add_props=add_props)
                        # ... then add that file to the archive
                        zf.write(f, arcname='info', compress_type=compression)
else:
fp = self.path
# Find the first existing root directory
while not fp.is_dir():
fp = fp.parent
write_info_file(x, fp, add_props=add_props)
def write_precomputed(x: Union['core.NeuronList', 'core.TreeNeuron', 'core.MeshNeuron', 'core.Volume'],
filepath: Optional[str] = None,
write_info: bool = True,
write_manifest: bool = False,
                      radius: bool = False) -> Union[None, bytes, List[bytes]]:
"""Export skeletons or meshes to neuroglancer's (legacy) precomputed format.
Note that you should not mix meshes and skeletons in the same folder!
Follows the formats specified
`here <https://github.com/google/neuroglancer/tree/master/src/neuroglancer/datasource/precomputed>`_.
Parameters
----------
x : TreeNeuron | MeshNeuron | Volume | Trimesh | NeuronList
If multiple neurons, will generate a file for each
neuron (see also ``filepath``). For use in neuroglancer
coordinates should generally be in nanometers.
    filepath : None | str | list, optional
        If ``None``, will return a byte string (or a list thereof). If a
        filename, will save to that file. If a folder, will save neuron(s)
        in that folder using ``{x.id}`` as filename(s). If a list, input
        must be a NeuronList and a filepath must be provided for each
        neuron.
write_info : bool
Whether to also write a JSON-formatted ``info`` file that
can be parsed by e.g. neuroglancer. This only works if
inputs are either only skeletons or only meshes!
write_manifest : bool
For meshes only: whether to also write manifests. For
each mesh we will create a JSON-encoded ``{id}:0`` file
that contains a "fragments" entry that maps to the
actual filename. Note that this will not work on Windows
because colons aren't allowed in file names and on OSX
the colon will show up as a ``/`` in the Finder.
radius : bool
For TreeNeurons only: whether to write radius as
additional vertex property.
Returns
-------
None
If filepath is not ``None``.
bytes
If filepath is ``None``.
See Also
--------
:func:`navis.read_precomputed`
Import neurons from neuroglancer's precomputed format.
:func:`navis.write_mesh`
Write meshes to generic mesh formats (obj, stl, etc).
Examples
--------
Write skeletons:
>>> import navis
>>> n = navis.example_neurons(3, kind='skeleton')
>>> navis.write_precomputed(n, tmp_dir)
Write meshes:
>>> import navis
>>> n = navis.example_neurons(3, kind='mesh')
>>> navis.write_precomputed(n, tmp_dir)
Write directly to zip archive:
>>> import navis
>>> n = navis.example_neurons(3, kind='skeleton')
>>> navis.write_precomputed(n, tmp_dir / 'precomputed.zip')
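
    Write skeletons including the radius as an additional vertex attribute:

    >>> import navis
    >>> n = navis.example_neurons(3, kind='skeleton')
    >>> navis.write_precomputed(n, tmp_dir, radius=True)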
"""
writer = PrecomputedWriter(_write_precomputed, ext=None)
return writer.write_any(x,
filepath=filepath,
write_info=write_info,
write_manifest=write_manifest,
radius=radius)
def _write_precomputed(x: Union['core.TreeNeuron', 'core.MeshNeuron', 'core.Volume'],
filepath: Optional[str] = None,
write_info: bool = True,
write_manifest: bool = False,
radius: bool = False) -> None:
"""Write single neuron to neuroglancer's precomputed format."""
if filepath and os.path.isdir(filepath):
if isinstance(x, core.BaseNeuron):
if not x.id:
raise ValueError('Neuron(s) must have an ID when destination '
'is a folder')
filepath = os.path.join(filepath, f'{x.id}')
elif isinstance(x, core.Volume):
filepath = os.path.join(filepath, f'{x.name}')
else:
raise ValueError(f'Unable to generate filename for {type(x)}')
if isinstance(x, core.TreeNeuron):
return _write_skeleton(x, filepath, radius=radius)
elif utils.is_mesh(x):
return _write_mesh(x.vertices, x.faces, filepath,
write_manifest=write_manifest)
else:
raise TypeError(f'Unable to write data of type "{type(x)}"')
def write_info_file(data, filepath, add_props={}):
"""Write neuroglancer 'info' file for given neurons.
Parameters
----------
    data : navis.NeuronList | navis.Volume | trimesh.Trimesh
filepath : str | Path
Path to write the file to.
add_props : dict
Additional properties to write to the file.
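
    Examples
    --------
    A sketch: write an ``info`` file for skeletons into an existing
    directory (``tmp_dir`` is assumed to exist and be writable):

    >>> import navis
    >>> nl = navis.example_neurons(2, kind='skeleton')
    >>> write_info_file(nl, tmp_dir)  # doctest: +SKIP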
"""
info = {}
if utils.is_iterable(data):
        types = list(set([type(d) for d in data]))
        if len(types) > 1:
            raise ValueError('Unable to write info file for mixed data: '
                             f'{types}')
        data = data[0]
if utils.is_mesh(data):
info['@type'] = 'neuroglancer_legacy_mesh'
elif isinstance(data, core.TreeNeuron):
info['@type'] = 'neuroglancer_skeletons'
        # If we know the units, add a transform from "stored model" to
        # "model space", which neuroglancer expects to be in nanometers
        if not data.units.dimensionless:
            u = data.units.to('1 nm').magnitude
        else:
            u = 1
        # The `transform` is a homogeneous 3x4 matrix flattened into 12
        # row-major values; use a float array so that fractional scale
        # factors are not truncated
        tr = np.zeros((4, 3))
        tr[:3, :3] = np.diag([u, u, u])
        info['transform'] = tr.T.flatten().tolist()
else:
raise TypeError(f'Unable to write info file for data of type "{type(data)}"')
info.update(add_props)
if not str(filepath).endswith('/info'):
filepath = os.path.join(filepath, 'info')
with open(filepath, 'w') as f:
json.dump(info, f)
def _write_mesh(vertices, faces, filename, write_manifest=False):
"""Write mesh to precomputed binary format."""
# Make sure we are working with the correct data types
vertices = np.asarray(vertices, dtype='float32')
faces = np.asarray(faces, dtype='uint32')
n_vertices = np.uint32(vertices.shape[0])
vertex_index_format = [n_vertices, vertices, faces]
results = b''.join([array.tobytes('C') for array in vertex_index_format])
if filename:
filename = Path(filename)
with open(filename, 'wb') as f:
f.write(results)
if write_manifest:
with open(filename.parent / f'{filename.name}:0', 'w') as f:
json.dump({'fragments': [filename.name]}, f)
else:
return results
def _write_skeleton(x, filename, radius=False):
"""Write skeleton to neuroglancers binary format."""
# Below code modified from:
# https://github.com/google/neuroglancer/blob/master/python/neuroglancer/skeleton.py#L34
result = io.BytesIO()
vertex_positions = x.nodes[['x', 'y', 'z']].values.astype('float32', order='C')
# Map edges node IDs to node indices
node_ix = pd.Series(x.nodes.reset_index(drop=True).index, index=x.nodes.node_id)
edges = x.edges.copy().astype('uint32', order='C')
edges[:, 0] = node_ix.loc[edges[:, 0]].values
edges[:, 1] = node_ix.loc[edges[:, 1]].values
    # navis edges are (child, parent) pairs but the precomputed format
    # stores them as (parent, child) - hence we swap the columns
    edges = edges[:, [1, 0]]
result.write(struct.pack('<II', vertex_positions.shape[0], edges.shape[0]))
result.write(vertex_positions.tobytes())
result.write(edges.tobytes())
if radius and 'radius' in x.nodes.columns:
if any(pd.isnull(x.nodes['radius'])):
raise ValueError('Unable to write radii with missing values.')
result.write(x.nodes.radius.values.astype('float32').tobytes())
if filename:
with open(filename, 'wb') as f:
f.write(result.getvalue())
else:
return result.getvalue()
@lru_cache()
def _fetch_info_file(base_url, raise_missing=True):
"""Try and fetch `info` file for given base url."""
if not base_url.endswith('/'):
base_url += '/'
r = requests.get(f'{base_url}info')
    try:
        r.raise_for_status()
    except requests.HTTPError:
        if raise_missing:
            raise
        return {}
    return r.json()