"""Construct and handle Mapper pipelines."""
# License: GNU AGPLv3
from sklearn.pipeline import Pipeline
from .cluster import ParallelClustering
from .nerve import Nerve
from .utils._list_feature_union import ListFeatureUnion
from .utils.pipeline import transformer_from_callable_on_rows, identity
global_pipeline_params = ('memory', 'verbose')
nodes_params = ('scaler', 'filter_func', 'cover')
clust_prepr_params = ('clustering_preprocessing',)
clust_params = ('clusterer', 'n_jobs',
'parallel_backend_prefer')
nerve_params = ('min_intersection',)
clust_prepr_params_prefix = 'pullback_cover__'
nodes_params_prefix = 'pullback_cover__map_and_cover__'
clust_params_prefix = 'clustering__'
nerve_params_prefix = 'nerve__'
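# The prefix constants above mirror the step names used by
# `make_mapper_pipeline` ('pullback_cover', 'map_and_cover', 'clustering',
# 'nerve'). `MapperPipeline.get_mapper_params` and `MapperPipeline.set_params`
# use them to translate between the flat Mapper parameter names exposed to the
# user (e.g. 'clusterer') and the nested scikit-learn parameter names of the
# underlying Pipeline (e.g. 'clustering__clusterer').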
class MapperPipeline(Pipeline):
"""Subclass of :class:`sklearn.pipeline.Pipeline` to deal with
pipelines generated by :func:`~gtda.mapper.pipeline.make_mapper_pipeline`.
The :meth:`set_params` method is modified from the corresponding method in
:class:`sklearn.pipeline.Pipeline` to allow for simple access to the
parameters involved in the definition of the Mapper algorithm, without the
need to interface with the nested structure of the Pipeline objects
generated by :func:`~gtda.mapper.pipeline.make_mapper_pipeline`. The
convenience method :meth:`get_mapper_params` shows which parameters can
be set. See the Examples below.
Examples
--------
>>> from sklearn.cluster import DBSCAN
>>> from sklearn.decomposition import PCA
>>> from gtda.mapper import make_mapper_pipeline, CubicalCover
>>> filter_func = PCA(n_components=2)
>>> cover = CubicalCover()
>>> clusterer = DBSCAN()
>>> pipe = make_mapper_pipeline(filter_func=filter_func,
... cover=cover,
... clusterer=clusterer)
>>> print(pipe.get_mapper_params()['clusterer__eps'])
0.5
>>> pipe.set_params(clusterer__eps=0.1)
>>> print(pipe.get_mapper_params()['clusterer__eps'])
0.1
See also
--------
make_mapper_pipeline
"""
# TODO: Abstract away common logic into a more generalisable implementation
def get_mapper_params(self, deep=True):
"""Get all Mapper parameters for this estimator.
Parameters
----------
deep : boolean, optional, default: ``True``
If ``True``, will return the parameters for this estimator and
contained subobjects that are estimators.
Returns
-------
params : mapping of string to any
Parameter names mapped to their values.
"""
pipeline_params = super().get_params(deep=deep)
return {**{param: pipeline_params[param]
for param in global_pipeline_params},
**self._clean_dict_keys(pipeline_params, nodes_params_prefix),
**self._clean_dict_keys(
pipeline_params, clust_prepr_params_prefix),
**self._clean_dict_keys(pipeline_params, clust_params_prefix),
**self._clean_dict_keys(pipeline_params, nerve_params_prefix)}
def set_params(self, **kwargs):
"""Set the Mapper parameters.
Valid parameter keys can be listed with :meth:`get_mapper_params()`.
Returns
-------
self
"""
mapper_nodes_kwargs = self._subset_kwargs(kwargs, nodes_params)
mapper_clust_prepr_kwargs = \
self._subset_kwargs(kwargs, clust_prepr_params)
mapper_clust_kwargs = self._subset_kwargs(kwargs, clust_params)
mapper_nerve_kwargs = self._subset_kwargs(kwargs, nerve_params)
if mapper_nodes_kwargs:
super().set_params(
**{nodes_params_prefix + key: mapper_nodes_kwargs[key]
for key in mapper_nodes_kwargs})
[kwargs.pop(key) for key in mapper_nodes_kwargs]
if mapper_clust_prepr_kwargs:
super().set_params(
**{clust_prepr_params_prefix + key:
mapper_clust_prepr_kwargs[key] for key in
mapper_clust_prepr_kwargs})
[kwargs.pop(key) for key in mapper_clust_prepr_kwargs]
if mapper_clust_kwargs:
super().set_params(
**{clust_params_prefix + key: mapper_clust_kwargs[key]
for key in mapper_clust_kwargs})
[kwargs.pop(key) for key in mapper_clust_kwargs]
if mapper_nerve_kwargs:
super().set_params(
**{nerve_params_prefix + key: mapper_nerve_kwargs[key]
for key in mapper_nerve_kwargs})
[kwargs.pop(key) for key in mapper_nerve_kwargs]
super().set_params(**kwargs)
return self
@staticmethod
def _subset_kwargs(kwargs, param_strings):
return {key: value for key, value in kwargs.items()
if key.startswith(param_strings)}
@staticmethod
def _clean_dict_keys(kwargs, prefix):
return {
key[len(prefix):]: kwargs[key]
for key in kwargs
if (key.startswith(prefix)
and not key.startswith(prefix + 'steps')
and not key.startswith(prefix + 'memory')
and not key.startswith(prefix + 'verbose')
and not key.startswith(prefix + 'transformer_list')
and not key.startswith(prefix + 'n_jobs')
and not key.startswith(prefix + 'transformer_weights')
and not key.startswith(prefix + 'map_and_cover'))
}
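# Illustrative sketch (not executed by the library): how the flat parameter
# names returned by `get_mapper_params` relate to the nested parameter names
# of the underlying scikit-learn Pipeline, assuming a pipeline built with
# `make_mapper_pipeline()` and default arguments:
#
#     pipe = make_mapper_pipeline()
#     flat = pipe.get_mapper_params()
#     nested = pipe.get_params(deep=True)
#     assert flat['cover'] is nested['pullback_cover__map_and_cover__cover']
#     assert flat['clusterer'] is nested['clustering__clusterer']
#     assert flat['min_intersection'] == nested['nerve__min_intersection']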
def make_mapper_pipeline(scaler=None,
filter_func=None,
cover=None,
clustering_preprocessing=None,
clusterer=None,
n_jobs=None,
parallel_backend_prefer='threads',
graph_step=True,
min_intersection=1,
memory=None,
verbose=False):
"""Construct a MapperPipeline object according to the specified Mapper
steps. [1]_
The role of this function's main parameters is illustrated in `this diagram
<../../../../_images/mapper_pipeline.svg>`_. All computational steps may
be scikit-learn estimators, including Pipeline objects.
Parameters
----------
scaler : object or None, optional, default: ``None``
If ``None``, no scaling is performed. Otherwise, it must be an
object with a ``fit_transform`` method.
filter_func : object, callable or None, optional, default: ``None``
If ``None``, PCA (:class:`sklearn.decomposition.PCA`) with 2
components and default parameters is used as a default filter
function. Otherwise, it may be an object with a ``fit_transform``
method, or a callable acting on one-dimensional arrays -- in which
case the callable is applied independently to each row of the
(scaled) data.
cover : object or None, optional, default: ``None``
Covering transformer, e.g. an instance of
:class:`~gtda.mapper.OneDimensionalCover` or of
:class:`~gtda.mapper.CubicalCover`. ``None`` is equivalent to passing
an instance of :class:`~gtda.mapper.CubicalCover` with its default
parameters.
clustering_preprocessing : object or None, optional, default: ``None``
If not ``None``, a transformer which is applied to the data
independently of the `scaler` -> `filter_func` -> `cover` pipeline.
Clustering is then performed on portions of the transformed data, with
the portions determined by the `scaler` -> `filter_func` -> `cover`
pipeline.
clusterer : object or None, optional, default: ``None``
Clustering object with a ``fit`` method which stores cluster labels.
``None`` is equivalent to passing an instance of
:class:`sklearn.cluster.DBSCAN` with its default parameters.
n_jobs : int or None, optional, default: ``None``
The number of jobs to use in a joblib-parallel application of the
clustering step across pullback cover sets. To be used in
conjunction with `parallel_backend_prefer`. ``None`` means 1 unless
in a :obj:`joblib.parallel_backend` context. ``-1`` means using all
processors.
parallel_backend_prefer : ``'processes'`` | ``'threads'``, optional, \
default: ``'threads'``
Soft hint for the default joblib backend to use in a joblib-parallel
application of the clustering step across pullback cover sets. To be
used in conjunction with `n_jobs`. The default process-based backend is
'loky' and the default thread-based backend is 'threading'. See [2]_.
graph_step : bool, optional, default: ``True``
Whether the resulting pipeline should stop at the calculation of the
Mapper cover, or include the construction of the Mapper graph.
min_intersection : int, optional, default: ``1``
Minimum size of the intersection between clusters required for creating
an edge in the Mapper graph. Ignored if `graph_step` is set to
``False``.
memory : None, str or object with the joblib.Memory interface, \
optional, default: ``None``
Used to cache the fitted transformers of the pipeline. By default, no
caching is performed. If a string is given, it is the path to the
caching directory. Enabling caching triggers a clone of the
transformers before fitting. Therefore, the transformer instance
given to the pipeline cannot be inspected directly. Use the attribute
``named_steps`` or ``steps`` to inspect estimators within the
pipeline. Caching the transformers is advantageous when fitting is
time consuming.
verbose : bool, optional, default: ``False``
If ``True``, the time elapsed while fitting each step will be printed as
it is completed.
Returns
-------
mapper_pipeline : :class:`~gtda.mapper.pipeline.MapperPipeline` object
Output Mapper pipeline.
Examples
--------
>>> # Example of basic usage with default parameters
>>> import numpy as np
>>> from gtda.mapper import make_mapper_pipeline
>>> mapper = make_mapper_pipeline()
>>> print(mapper.__class__)
<class 'gtda.mapper.pipeline.MapperPipeline'>
>>> mapper_params = mapper.get_mapper_params()
>>> print(mapper_params['filter_func'].__class__)
<class 'sklearn.decomposition._pca.PCA'>
>>> print(mapper_params['cover'].__class__)
<class 'gtda.mapper.cover.CubicalCover'>
>>> print(mapper_params['clusterer'].__class__)
<class 'sklearn.cluster._dbscan.DBSCAN'>
>>> X = np.random.random((10000, 4)) # 10000 points in 4-dimensional space
>>> mapper_graph = mapper.fit_transform(X) # Create the mapper graph
>>> print(type(mapper_graph))
<class 'igraph.Graph'>
>>> # Node metadata stored as dict in graph object
>>> print(mapper_graph['node_metadata'].keys())
dict_keys(['node_id', 'pullback_set_label', 'partial_cluster_label',
'node_elements'])
>>> # Find which points belong to first node of graph
>>> node_id, node_elements = mapper_graph['node_metadata']['node_id'], \
...     mapper_graph['node_metadata']['node_elements']
>>> print(f'Node Id: {node_id[0]}, Node elements: {node_elements[0]}, '
...       f'Data points: {X[node_elements[0]]}')
Node Id: 0,
Node elements: [8768],
Data points: [[0.01838998 0.76928754 0.98199244 0.0074299 ]]
>>> #######################################################################
>>> # Example using a scaler from scikit-learn, a filter function from
>>> # gtda.mapper.filter, and a clusterer from gtda.mapper.cluster
>>> from sklearn.preprocessing import MinMaxScaler
>>> from gtda.mapper import Projection, FirstHistogramGap
>>> scaler = MinMaxScaler()
>>> filter_func = Projection(columns=[0, 1])
>>> clusterer = FirstHistogramGap()
>>> mapper = make_mapper_pipeline(scaler=scaler,
... filter_func=filter_func,
... clusterer=clusterer)
>>> #######################################################################
>>> # Example using a callable acting on each row of X separately
>>> import numpy as np
>>> from gtda.mapper import OneDimensionalCover
>>> cover = OneDimensionalCover()
>>> mapper.set_params(scaler=None, filter_func=np.sum, cover=cover)
>>> #######################################################################
>>> # Example setting the memory parameter to cache each step and avoid
>>> # recomputation of early steps
>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> cachedir = mkdtemp()
>>> mapper.set_params(memory=cachedir, verbose=True)
>>> mapper_graph = mapper.fit_transform(X)
[Pipeline] ............ (step 1 of 3) Processing scaler, total= 0.0s
[Pipeline] ....... (step 2 of 3) Processing filter_func, total= 0.0s
[Pipeline] ............. (step 3 of 3) Processing cover, total= 0.0s
[Pipeline] .... (step 1 of 3) Processing pullback_cover, total= 0.0s
[Pipeline] ........ (step 2 of 3) Processing clustering, total= 0.3s
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.0s
>>> mapper.set_params(min_intersection=3)
>>> mapper_graph = mapper.fit_transform(X)
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.0s
>>> # Clear the cache directory when you don't need it anymore
>>> rmtree(cachedir)
>>> #######################################################################
>>> # Example using a large dataset for which parallelism in
>>> # clustering across the pullback cover sets can be beneficial
>>> from sklearn.cluster import DBSCAN
>>> mapper = make_mapper_pipeline(clusterer=DBSCAN(),
... n_jobs=6,
... memory=mkdtemp(),
... verbose=True)
>>> X = np.random.random((100000, 4))
>>> mapper.fit_transform(X)
[Pipeline] ............ (step 1 of 3) Processing scaler, total= 0.0s
[Pipeline] ....... (step 2 of 3) Processing filter_func, total= 0.1s
[Pipeline] ............. (step 3 of 3) Processing cover, total= 0.6s
[Pipeline] .... (step 1 of 3) Processing pullback_cover, total= 0.7s
[Pipeline] ........ (step 2 of 3) Processing clustering, total= 1.9s
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.3s
>>> mapper.set_params(n_jobs=1)
>>> mapper.fit_transform(X)
[Pipeline] ........ (step 2 of 3) Processing clustering, total= 5.3s
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.3s
See also
--------
:class:`MapperPipeline`,
:meth:`~gtda.mapper.utils.decorators.method_to_transform`
References
----------
.. [1] G. Singh, F. Mémoli, and G. Carlsson, "Topological methods for the
analysis of high dimensional data sets and 3D object recognition";
in *SPBG*, pp. 91--100, 2007.
.. [2] "Thread-based parallelism vs process-based parallelism", in
`joblib documentation
<https://joblib.readthedocs.io/en/latest/parallel.html>`_.
"""
# TODO: Implement parameter validation
if scaler is None:
_scaler = identity(validate=False)
else:
_scaler = scaler
# If filter_func is not a scikit-learn transformer, assume it is a
# callable to be applied to each row separately. Then attempt to create a
# FunctionTransformer object to implement this behaviour.
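# For instance (illustrative): passing ``filter_func=np.sum`` yields a
# transformer which, when fitted and applied to an array of shape
# (n_samples, n_features), presumably produces one filter value per row,
# namely the sum of that row's entries.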
if filter_func is None:
from sklearn.decomposition import PCA
_filter_func = PCA(n_components=2)
elif not hasattr(filter_func, 'fit_transform'):
_filter_func = transformer_from_callable_on_rows(filter_func)
else:
_filter_func = filter_func
if cover is None:
from .cover import CubicalCover
_cover = CubicalCover()
else:
_cover = cover
if clustering_preprocessing is None:
_clustering_preprocessing = identity(validate=True)
else:
_clustering_preprocessing = clustering_preprocessing
if clusterer is None:
from sklearn.cluster import DBSCAN
_clusterer = DBSCAN()
else:
_clusterer = clusterer
map_and_cover = Pipeline(
steps=[('scaler', _scaler),
('filter_func', _filter_func),
('cover', _cover)],
verbose=verbose)
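# The assembled pipeline has steps 'pullback_cover', 'clustering' and, if
# graph_step is True, 'nerve': 'pullback_cover' combines the output of the
# clustering-preprocessing transformer with that of the
# scaler -> filter_func -> cover sub-pipeline, 'clustering' applies the
# clusterer (optionally in parallel) across the pullback cover sets, and
# 'nerve' constructs the Mapper graph from the resulting clusters.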
all_steps = [
('pullback_cover', ListFeatureUnion(
[('clustering_preprocessing', _clustering_preprocessing),
('map_and_cover', map_and_cover)])),
('clustering', ParallelClustering(
clusterer=_clusterer,
n_jobs=n_jobs,
parallel_backend_prefer=parallel_backend_prefer))
]
if graph_step:
all_steps.append(('nerve', Nerve(min_intersection=min_intersection)))
mapper_pipeline = MapperPipeline(
steps=all_steps, memory=memory, verbose=verbose)
return mapper_pipeline
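# Minimal usage sketch (illustrative only, mirroring the docstring examples
# above); assumes numpy and python-igraph are installed.
if __name__ == "__main__":  # pragma: no cover
    import numpy as np

    pipe = make_mapper_pipeline()
    X = np.random.random((1000, 3))
    graph = pipe.fit_transform(X)  # an igraph.Graph instance
    print(graph.vcount(), graph.ecount())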