"""CollectionTransformer meta-estimator."""
# License: GNU AGPLv3

from functools import reduce
from operator import and_
from warnings import warn

import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils.metaestimators import if_delegate_has_method

from gtda.utils import check_collection


class CollectionTransformer(BaseEstimator, TransformerMixin):
"""Meta-transformer for applying a fit-transformer to each input in a
collection.
If `transformer` possesses a ``fit_transform`` method,
``CollectionTransformer(transformer)`` also possesses a
:meth:`fit_transform` method which, on each entry in its input ``X``,
fit-transforms a clone of `transformer`. A collection (list or ndarray) of
outputs is returned.
Note: to have compatibility with scikit-learn and giotto-tda pipelines, a
:meth:`transform` method is also present but it is simply an alias for
:meth:`fit_transform`.
Parameters
----------
transformer : object
The fit-transformer instance from which the transformer acting on
collections is built. Should implement ``fit_transform``.
n_jobs : int or None, optional, default: ``None``
The number of jobs to use in a joblib-parallel application of
`transformer`'s ``fit_transform`` to each input. ``None`` means 1
unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using
all processors.
parallel_backend_prefer : ``"processes"`` | ``"threads"`` | ``None``, \
optional, default: ``None``
Soft hint for the default joblib backend to use in a joblib-parallel
application of `transformer`'s ``fit_transform`` to each input. See
[1]_.
parallel_backend_require : ``"sharedmem"`` or None, optional, default: \
``None``
Hard constraint to select the backend. If set to ``'sharedmem'``, the
selected backend will be single-host and thread-based even if the user
asked for a non-thread based backend with parallel_backend.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.decomposition import PCA
    >>> from gtda.metaestimators import CollectionTransformer
    >>> rng = np.random.default_rng()

    Create a collection of 1000 2D inputs for PCA, as a single 3D ndarray (we
    could also create a list of 2D inputs instead).

    >>> X = rng.random((1000, 100, 50))

    In the case of PCA, joblib parallelism can be very beneficial!

    >>> multi_pca = CollectionTransformer(PCA(n_components=3), n_jobs=-1)
    >>> Xt = multi_pca.fit_transform(X)

    Since all PCA outputs have the same shape, ``Xt`` is an ndarray.

    >>> print(Xt.shape)
    (1000, 100, 3)
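
    If the inputs (and hence the PCA outputs) had different shapes, for
    instance a list of 2D arrays with varying numbers of rows, ``Xt`` would
    instead be a plain list. A sketch (``X_ragged`` is illustrative, not part
    of the original example):

    >>> X_ragged = [rng.random((100, 50)), rng.random((120, 50))]
    >>> Xt_ragged = multi_pca.fit_transform(X_ragged)
    >>> print(type(Xt_ragged), Xt_ragged[0].shape, Xt_ragged[1].shape)
    <class 'list'> (100, 3) (120, 3)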

    See also
    --------
    gtda.mapper.utils.pipeline.transformer_from_callable_on_rows, \
    gtda.mapper.utils.decorators.method_to_transform

    References
    ----------
    .. [1] "Thread-based parallelism vs process-based parallelism", in
           `joblib documentation
           <https://joblib.readthedocs.io/en/latest/parallel.html>`_.

    """

    def __init__(self, transformer, n_jobs=None, parallel_backend_prefer=None,
                 parallel_backend_require=None):
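        # Per scikit-learn convention, hyperparameters are only stored here;
        # validation is deferred to fit / fit_transform.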
self.transformer = transformer
self.n_jobs = n_jobs
self.parallel_backend_prefer = parallel_backend_prefer
self.parallel_backend_require = parallel_backend_require

    def _validate_transformer(self):
if not hasattr(self.transformer, "fit_transform"):
raise TypeError("`transformer` must possess a fit_transform "
"method.")
if not isinstance(self.transformer, BaseEstimator):
warn("`transformer` is not an instance of "
"sklearn.base.BaseEstimator. This will lead to limited "
"functionality in a scikit-learn context.", UserWarning)

    def fit(self, X, y=None):
"""Do nothing and return the estimator unchanged.
This method is here to implement the usual scikit-learn API and hence
work in pipelines.
Parameters
----------
X : list of length n_samples, or ndarray of shape (n_samples, ...)
Collection of inputs to be fit-transformed by `transformer`.
y : None
There is no need for a target in a transformer, yet the pipeline
API requires this parameter.
Returns
-------
self : object
"""
check_collection(X, accept_sparse=True, accept_large_sparse=True,
force_all_finite=False)
self._validate_transformer()
self._is_fitted = True
return self

    @if_delegate_has_method(delegate="transformer")
def fit_transform(self, X, y=None):
"""Fit-transform a clone of `transformer` to each element in the
collection `X`.
Parameters
----------
X : list of length n_samples, or ndarray of shape (n_samples, ...)
Collection of inputs to be fit-transformed by `transformer`.
y : None
There is no need for a target in a transformer, yet the pipeline
API requires this parameter.
Returns
-------
Xt : list of length n_samples, or ndarray of shape (n_samples, ...)
Collection of outputs. It is a list unless all outputs have the
same shape, in which case it is converted to an ndarray.
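
        Examples
        --------
        A minimal sketch (``StandardScaler`` is illustrative and not part of
        the original docstring): each entry in ``X`` is fit-transformed by an
        independent clone of `transformer`, so fitted statistics do not leak
        across entries.

        >>> import numpy as np
        >>> from sklearn.preprocessing import StandardScaler
        >>> from gtda.metaestimators import CollectionTransformer
        >>> X = np.arange(24, dtype=float).reshape(2, 4, 3)
        >>> Xt = CollectionTransformer(StandardScaler()).fit_transform(X)
        >>> Xt.shape
        (2, 4, 3)
        >>> np.allclose(Xt[0].mean(axis=0), 0.)  # each entry scaled separately
        True
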
"""
Xt = check_collection(X, accept_sparse=True, accept_large_sparse=True,
force_all_finite=False)
self._validate_transformer()
Xt = Parallel(n_jobs=self.n_jobs, prefer=self.parallel_backend_prefer,
require=self.parallel_backend_require)(
delayed(clone(self.transformer).fit_transform)(x) for x in Xt
)
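        # If all outputs share a common shape, stack them into a single
        # ndarray; otherwise, return the heterogeneous outputs as a list.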
x0_shape = Xt[0].shape
if reduce(and_, (x.shape == x0_shape for x in Xt), True):
Xt = np.asarray(Xt)
return Xt

    def transform(self, X, y=None):
"""Alias for :meth:`fit_transform`.
Allows for this class to be used as an intermediate step in a
scikit-learn pipeline.
Parameters
----------
X : list of length n_samples, or ndarray of shape (n_samples, ...)
Collection of inputs to be fit-transformed by `transformer`.
y : None
There is no need for a target in a transformer, yet the pipeline
API requires this parameter.
Returns
-------
Xt : list of length n_samples, or ndarray of shape (n_samples, ...)
Collection of outputs. It is a list unless all outputs have the
same shape, in which case it is converted to an ndarray.
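
        Examples
        --------
        A sketch of why the alias matters (the pipeline below is illustrative,
        not part of the original docstring): since :meth:`transform` delegates
        to :meth:`fit_transform`, the wrapped transformer can serve as an
        intermediate step of a scikit-learn pipeline.

        >>> import numpy as np
        >>> from sklearn.decomposition import PCA
        >>> from sklearn.pipeline import make_pipeline
        >>> from gtda.metaestimators import CollectionTransformer
        >>> X = np.random.random((10, 20, 5))
        >>> pipe = make_pipeline(CollectionTransformer(PCA(n_components=2)))
        >>> pipe.fit_transform(X).shape
        (10, 20, 2)
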
"""
return self.fit_transform(X, y)