"""Clustering methods and classes for parallelised clustering."""
# License: GNU AGPLv3

from inspect import signature
from numbers import Real

import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, ClusterMixin, clone
from sklearn.cluster import DBSCAN

try:  # scikit-learn >= 0.22.1
    from sklearn.cluster._agglomerative import _TREE_BUILDERS, _hc_cut
except ImportError:
    from sklearn.cluster._hierarchical import _TREE_BUILDERS, _hc_cut
from sklearn.utils import check_array
from sklearn.utils.validation import check_memory

from ..utils.intervals import Interval
from ..utils.validation import validate_params
from .utils._cluster import _num_clusters_histogram, _num_clusters_simple


class ParallelClustering(BaseEstimator):
    """Employ joblib parallelism to cluster different portions of a dataset.

    An arbitrary clustering class which stores a ``labels_`` attribute in
    ``fit`` can be passed to the constructor. Examples are most classes in
    ``sklearn.cluster``. The input of :meth:`fit` is of the form ``[X_tot,
    masks]`` where ``X_tot`` is the full dataset, and ``masks`` is a
    two-dimensional boolean array, each column of which indicates the
    location of a portion of ``X_tot`` to cluster separately. Parallelism is
    achieved over the columns of ``masks``.

    Parameters
    ----------
    clusterer : object, optional, default: ``None``
        Clustering object such as derived from
        :class:`sklearn.base.ClusterMixin`. ``None`` means that the default
        :class:`sklearn.cluster.DBSCAN` is used.

    n_jobs : int or None, optional, default: ``None``
        The number of jobs to use for the computation. ``None`` means 1
        unless in a :obj:`joblib.parallel_backend` context. ``-1`` means
        using all processors.

    parallel_backend_prefer : ``'processes'`` | ``'threads'``, optional, \
        default: ``'threads'``
        Selects the default joblib backend. The default process-based backend
        is 'loky' and the default thread-based backend is 'threading'.

    Attributes
    ----------
    clusterers_ : tuple of object
        If `clusterer` is not ``None``, clones of `clusterer` fitted
        to the portions of the full data array specified in :meth:`fit`.
        Otherwise, clones of a default instance of
        :class:`sklearn.cluster.DBSCAN`, fitted in the same way.

    clusters_ : list of list of tuple
        Labels and indices of each cluster found in :meth:`fit`. The i-th
        entry corresponds to the i-th portion of the data; it is a list of
        triples of the form ``(i, label, indices)``, where ``label`` is a
        cluster label and ``indices`` is the array of indices of points
        belonging to cluster ``(i, label)``.

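    Examples
    --------
    A minimal sketch of the intended calling convention (toy data and
    hand-written masks, not produced by a real Mapper cover):

    >>> import numpy as np
    >>> from sklearn.cluster import DBSCAN
    >>> from gtda.mapper.cluster import ParallelClustering
    >>> X_tot = np.array([[0.], [0.1], [5.], [5.1]])
    >>> masks = np.array([[True, False],
    ...                   [True, False],
    ...                   [True, True],
    ...                   [False, True]])
    >>> pc = ParallelClustering(clusterer=DBSCAN(eps=0.5, min_samples=1))
    >>> clusters = pc.fit_predict([X_tot, masks])
    >>> [len(portion) for portion in clusters]  # clusters found per portion
    [2, 1]
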
    """

    def __init__(self, clusterer=None,
                 n_jobs=None,
                 parallel_backend_prefer='threads'):
        self.clusterer = clusterer
        self.n_jobs = n_jobs
        self.parallel_backend_prefer = parallel_backend_prefer

    def _validate_clusterer(self, default=DBSCAN()):
        """Set :attr:`clusterer_` depending on the value of  `clusterer`.

        Also verify whether calculations are to be based on precomputed
        metric/affinity information or not.

        """
        if self.clusterer is not None:
            self._clusterer = self.clusterer
        else:
            self._clusterer = default
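        # Determine whether the wrapped clusterer expects precomputed
        # metric/affinity information, by inspecting its constructor
        # signature and the corresponding parameter values.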
        params = [param for param in ['metric', 'affinity']
                  if param in signature(self._clusterer.__init__).parameters]
        precomputed = [(getattr(self._clusterer, param) == 'precomputed')
                       for param in params]
        if not precomputed:
            self._precomputed = False
        elif len(precomputed) == 1:
            self._precomputed = precomputed[0]
        else:
            raise NotImplementedError("Behaviour when metric and affinity "
                                      "are both set to 'precomputed' not yet"
                                      "implemented by ParallelClustering.")

    def fit(self, X, y=None, sample_weight=None):
        """Fit the clusterer on each portion of the data.

        :attr:`clusterers_` and :attr:`clusters_` are computed and stored.

        Parameters
        ----------
        X : list-like of form ``[X_tot, masks]``
            Input data as a list of length 2. ``X_tot`` is an ndarray of shape
            (n_samples, n_features) or (n_samples, n_samples) specifying the
            full data. ``masks`` is a boolean ndarray of shape
            (n_samples, n_portions) whose columns are boolean masks
            on ``X_tot``, specifying the portions of ``X_tot`` to be
            independently clustered.

        y : None
            There is no need for a target in a transformer, yet the pipeline
            API requires this parameter.

        sample_weight : array-like or None, optional, default: ``None``
            The weights for each observation in the full data. If ``None``,
            all observations are assigned equal weight. Otherwise, it has
            shape (n_samples,).

        Returns
        -------
        self : object

        """
        self._validate_clusterer()
        X_tot, masks = X
        if sample_weight is not None:
            sample_weights = [sample_weight[masks[:, i]]
                              for i in range(masks.shape[1])]
        else:
            sample_weights = [None] * masks.shape[1]

        if self._precomputed:
            single_fitter = self._fit_single_abs_labels_precomputed
        else:
            single_fitter = self._fit_single_abs_labels

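        # Fit one clusterer per column of ``masks``; the portions are
        # independent, so fitting is embarrassingly parallel.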
        self.clusterers_ = Parallel(n_jobs=self.n_jobs,
                                    prefer=self.parallel_backend_prefer)(
            delayed(single_fitter)(
                X_tot, np.flatnonzero(mask),
                mask_num, sample_weight=sample_weights[mask_num])
            for mask_num, mask in enumerate(masks.T))
        self.clusters_ = [clusterer.abs_labels_ for clusterer in
                          self.clusterers_]
        return self

    def _fit_single_abs_labels(self, X, relative_indices, mask_num,
                               sample_weight=None):
        cloned_clusterer, unique_labels, unique_labels_inverse = \
            self._fit_single(X, relative_indices, sample_weight)
        self._create_abs_labels(cloned_clusterer, relative_indices, mask_num,
                                unique_labels, unique_labels_inverse)
        return cloned_clusterer

    def _fit_single_abs_labels_precomputed(self, X, relative_indices, mask_num,
                                           sample_weight=None):
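        # With a precomputed metric/affinity, a portion of the data is a
        # submatrix of the full square matrix: np.ix_ selects the relevant
        # rows and columns simultaneously.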
        relative_2d_indices = np.ix_(relative_indices, relative_indices)
        cloned_clusterer, unique_labels, unique_labels_inverse = \
            self._fit_single(X, relative_2d_indices, sample_weight)
        self._create_abs_labels(cloned_clusterer, relative_indices, mask_num,
                                unique_labels, unique_labels_inverse)
        return cloned_clusterer

    def _fit_single(self, X, relative_indices, sample_weight):
        cloned_clusterer = clone(self._clusterer)
        X_sub = X[relative_indices]

        fit_params = signature(cloned_clusterer.fit).parameters
        if 'sample_weight' in fit_params:
            cloned_clusterer.fit(X_sub, sample_weight=sample_weight)
        else:
            cloned_clusterer.fit(X_sub)

        unique_labels, unique_labels_inverse = np.unique(
            cloned_clusterer.labels_, return_inverse=True)
        return cloned_clusterer, unique_labels, unique_labels_inverse

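    # ``abs_labels_`` re-expresses each cluster found in a portion in terms
    # of absolute indices into the full dataset, as triples
    # ``(mask_num, label, indices)``.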
    @staticmethod
    def _create_abs_labels(cloned_clusterer, relative_indices, mask_num,
                           unique_labels, inv):
        cloned_clusterer.abs_labels_ = [
            (mask_num, label, relative_indices[inv == i])
            for i, label in enumerate(unique_labels)]

    def fit_predict(self, X, y=None, sample_weight=None):
        """Fit to the data, and return the found clusters.

        Parameters
        ----------
        X : list-like of form ``[X_tot, masks]``
            Input data as a list of length 2. ``X_tot`` is an ndarray of shape
            (n_samples, n_features) or (n_samples, n_samples) specifying the
            full data. ``masks`` is a boolean ndarray of shape
            (n_samples, n_portions) whose columns are boolean masks
            on ``X_tot``, specifying the portions of ``X_tot`` to be
            independently clustered.

        y : None
            There is no need for a target in a transformer, yet the pipeline
            API requires this parameter.

        sample_weight : array-like or None, optional, default: ``None``
            The weights for each observation in the full data. If ``None``,
            all observations are assigned equal weight. Otherwise, it has
            shape (n_samples,).

        Returns
        -------
        clusters : list of list of tuple
            See :attr:`clusters_`.

        """
        self.fit(X, sample_weight=sample_weight)
        return self.clusters_

    def transform(self, X, y=None):
        """Not implemented.

        Only present so that the class is a valid step in a scikit-learn
        pipeline.

        Parameters
        ----------
        X : Ignored
            Ignored.

        y : None
            There is no need for a target in a transformer, yet the pipeline
            API requires this parameter.

        """
        raise NotImplementedError(
            "Transforming new data with a fitted ParallelClustering object "
            "not yet implemented, use fit_transform instead.")

    def fit_transform(self, X, y=None, **fit_params):
        """Alias for :meth:`fit_predict`.

        Allows this class to be used as a step in a scikit-learn pipeline.

        Parameters
        ----------
        X : list-like of form ``[X_tot, masks]``
            Input data as a list of length 2. ``X_tot`` is an ndarray of shape
            (n_samples, n_features) or (n_samples, n_samples) specifying the
            full data. ``masks`` is a boolean ndarray of shape
            (n_samples, n_portions) whose columns are boolean masks
            on ``X_tot``, specifying the portions of ``X_tot`` to be
            independently clustered.

        y : None
            There is no need for a target in a transformer, yet the pipeline
            API requires this parameter.

        Returns
        -------
        Xt : list of list of tuple
            See :attr:`clusters_`.

        """
        Xt = self.fit_predict(X, y, **fit_params)
        return Xt


class Agglomerative:
    """Base class for agglomerative clustering.

    Implements scikit-learn's tree building algorithms for linkage-based
    clustering. Inheriting classes may implement stopping rules for determining
    the number of clusters.

    Attributes
    ----------
    children_ : ndarray of shape (n_nodes - 1, 2)
        The children of each non-leaf node. Values less than ``n_samples``
        correspond to leaves of the tree which are the original samples.
        A node ``i`` greater than or equal to ``n_samples`` is a non-leaf
        node and has children ``children_[i - n_samples]``. Alternatively
        at the ``i``-th iteration, ``children[i][0]`` and ``children[i][1]``
        are merged to form node ``n_samples + i``.

    n_leaves_ : int
        Number of leaves in the hierarchical tree.

    distances_ : ndarray of shape (n_nodes - 1,)
        Distances between nodes in the corresponding place in
        :attr:`children_`.

    """

    def _build_tree(self, X):
        memory = check_memory(self.memory)

        if self.linkage == "ward" and self.affinity != "euclidean":
            raise ValueError(f"{self.affinity} was provided as affinity. "
                             f"Ward can only work with Euclidean distances.")
        if self.linkage not in _TREE_BUILDERS:
            raise ValueError(f"Unknown linkage type {self.linkage}. Valid "
                             f"options are {_TREE_BUILDERS.keys()}")
        tree_builder = _TREE_BUILDERS[self.linkage]

        # Construct the tree
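        # ward_tree has a fixed criterion and metric, so only the other tree
        # builders accept 'linkage' and 'affinity' keyword arguments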
        kwargs = {}
        if self.linkage != 'ward':
            kwargs['linkage'] = self.linkage
            kwargs['affinity'] = self.affinity

        out = memory.cache(tree_builder)(
            X, n_clusters=None, return_distance=True, **kwargs)

        # Scikit-learn's tree_builder returns a tuple (children,
        # n_connected_components, n_leaves, parent, distances)
        self.children_, _, self.n_leaves_, _, self.distances_ = out


class FirstSimpleGap(ClusterMixin, BaseEstimator, Agglomerative):
    """Agglomerative clustering cutting the dendrogram at the first instance
    of a sufficiently large gap.

    A simple threshold is determined as a fraction of the largest linkage
    value in the full dendrogram. If possible, the dendrogram is cut at the
    first occurrence of a gap, between the linkage values of successive
    merges, which exceeds this threshold. Otherwise, a single cluster is
    returned. The algorithm can be partially overridden to ensure that the
    final number of clusters does not exceed a certain threshold, by passing
    the parameter `max_fraction`.

    Parameters
    ----------
    relative_gap_size : float, optional, default: ``0.3``
        The fraction of the largest linkage in the dendrogram to be used as
        a threshold for determining a large enough gap.

    max_fraction : float or None, optional, default: ``None``
        When not ``None``, the algorithm is constrained to produce no more
        than ``max_fraction * (n_samples - 1)`` clusters, even if a candidate
        gap is observed in the iterative process which would produce a
        greater number of clusters.

    affinity : str, optional, default: ``'euclidean'``
        Metric used to compute the linkage. Can be ``'euclidean'``, ``'l1'``,
        ``'l2'``, ``'manhattan'``, ``'cosine'``, or ``'precomputed'``. If
        linkage is ``'ward'``, only ``'euclidean'`` is accepted. If
        ``'precomputed'``, a distance matrix (instead of a similarity matrix)
        is needed as input for :meth:`fit`.

    memory : None, str or object with the joblib.Memory interface, \
        optional, default: ``None``
        Used to cache the output of the computation of the tree. By default,
        no caching is done. If a string is given, it is the path to the
        caching directory.

    linkage : ``'ward'`` | ``'complete'`` | ``'average'`` | ``'single'``, \
        optional, default: ``'single'``
        Which linkage criterion to use. The linkage criterion determines
        which distance to use between sets of observations. The algorithm
        will merge the pairs of clusters that minimize this criterion.

        - ``'ward'`` minimizes the variance of the clusters being merged.
        - ``'average'`` uses the average of the distances of each
          observation of the two sets.
        - ``'complete'`` linkage uses the maximum of the distances between
          all observations of the two sets.
        - ``'single'`` uses the minimum of the distances between all
          observations of the two sets.

    Attributes
    ----------
    n_clusters_ : int
        The number of clusters found by the algorithm.

    labels_ : ndarray of shape (n_samples,)
        Cluster labels for each sample.

    children_ : ndarray of shape (n_nodes - 1, 2)
        The children of each non-leaf node. Values less than ``n_samples``
        correspond to leaves of the tree which are the original samples.
        A node ``i`` greater than or equal to ``n_samples`` is a non-leaf
        node and has children ``children_[i - n_samples]``. Alternatively
        at the ``i``-th iteration, ``children[i][0]`` and ``children[i][1]``
        are merged to form node ``n_samples + i``.

    n_leaves_ : int
        Number of leaves in the hierarchical tree.

    distances_ : ndarray of shape (n_nodes - 1,)
        Distances between nodes in the corresponding place in
        :attr:`children_`.

    See also
    --------
    FirstHistogramGap

    """

    _hyperparameters = {
        'relative_gap_size': {
            'type': Real, 'in': Interval(0, 1, closed='right')},
        'max_fraction': {
            'type': (Real, type(None)),
            'in': Interval(0, 1, closed='right')},
        'affinity': {'type': str},
        'linkage': {'type': str}
        }

    def __init__(self, relative_gap_size=0.3, max_fraction=None,
                 affinity='euclidean', memory=None, linkage='single'):
        self.relative_gap_size = relative_gap_size
        self.max_fraction = max_fraction
        self.affinity = affinity
        self.memory = memory
        self.linkage = linkage

    def fit(self, X, y=None):
        """Fit the agglomerative clustering from features or distance matrix.

        The stopping rule is used to determine :attr:`n_clusters_`, and the
        full dendrogram is cut there to compute :attr:`labels_`.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features) or \
            (n_samples, n_samples)
            Training instances to cluster, or distances between instances if
            ``affinity='precomputed'``.

        y : ignored
            Not used, present here for API consistency by convention.

        Returns
        -------
        self

        """
        X = check_array(X)

        validate_params(
            self.get_params(), self._hyperparameters, exclude=['memory'])

        _max_fraction = 1. if self.max_fraction is None else self.max_fraction

        if X.shape[0] == 1:
            self.labels_ = np.array([0])
            self.n_clusters_ = 1
            return self

        self._build_tree(X)

        min_gap_size = self.relative_gap_size * self.distances_[-1]
        self.n_clusters_ = _num_clusters_simple(
            self.distances_, min_gap_size, _max_fraction)

        # Cut the tree to find labels
        # TODO: Verify whether Daniel Mullner's implementation of this step
        #  offers any advantage
        self.labels_ = _hc_cut(self.n_clusters_, self.children_,
                               self.n_leaves_)
        return self


class FirstHistogramGap(ClusterMixin, BaseEstimator, Agglomerative):
    """Agglomerative clustering with stopping rule given by a histogram-based
    version of the first gap method, introduced in [1]_.

    Given a frequency threshold f and an initial integer k:

    1. create a histogram of k equally spaced bins of the number of merges
       in the dendrogram, as a function of the linkage parameter;
    2. the value of linkage at which the tree is to be cut is the first one
       after which a bin of height no greater than f (i.e. a "gap") is
       observed;
    3. if no gap is observed, increase k and repeat 1. and 2. until
       termination.

    The algorithm can be partially overridden to ensure that the final
    number of clusters does not exceed a certain threshold, by passing the
    parameter `max_fraction`.

    Parameters
    ----------
    freq_threshold : int, optional, default: ``0``
        The frequency threshold for declaring that a gap in the histogram of
        merges is present.

    max_fraction : float or None, optional, default: ``None``
        When not ``None``, the algorithm is constrained to produce no more
        than ``max_fraction * (n_samples - 1)`` clusters, even if a candidate
        gap is observed in the iterative process which would produce a
        greater number of clusters.

    n_bins_start : int, optional, default: ``5``
        The initial number of bins in the iterative process for finding a
        gap in the histogram of merges.

    affinity : str, optional, default: ``'euclidean'``
        Metric used to compute the linkage. Can be ``'euclidean'``, ``'l1'``,
        ``'l2'``, ``'manhattan'``, ``'cosine'``, or ``'precomputed'``. If
        linkage is ``'ward'``, only ``'euclidean'`` is accepted. If
        ``'precomputed'``, a distance matrix (instead of a similarity matrix)
        is needed as input for :meth:`fit`.

    memory : None, str or object with the joblib.Memory interface, \
        optional, default: ``None``
        Used to cache the output of the computation of the tree. By default,
        no caching is done. If a string is given, it is the path to the
        caching directory.

    linkage : ``'ward'`` | ``'complete'`` | ``'average'`` | ``'single'``, \
        optional, default: ``'single'``
        Which linkage criterion to use. The linkage criterion determines
        which distance to use between sets of observations. The algorithm
        will merge the pairs of clusters that minimize this criterion.

        - ``'ward'`` minimizes the variance of the clusters being merged.
        - ``'average'`` uses the average of the distances of each
          observation of the two sets.
        - ``'complete'`` linkage uses the maximum of the distances between
          all observations of the two sets.
        - ``'single'`` uses the minimum of the distances between all
          observations of the two sets.

    Attributes
    ----------
    n_clusters_ : int
        The number of clusters found by the algorithm.

    labels_ : ndarray of shape (n_samples,)
        Cluster labels for each sample.

    children_ : ndarray of shape (n_nodes - 1, 2)
        The children of each non-leaf node. Values less than ``n_samples``
        correspond to leaves of the tree which are the original samples.
        A node ``i`` greater than or equal to ``n_samples`` is a non-leaf
        node and has children ``children_[i - n_samples]``. Alternatively
        at the ``i``-th iteration, ``children[i][0]`` and ``children[i][1]``
        are merged to form node ``n_samples + i``.

    n_leaves_ : int
        Number of leaves in the hierarchical tree.

    distances_ : ndarray of shape (n_nodes - 1,)
        Distances between nodes in the corresponding place in
        :attr:`children_`.

    See also
    --------
    FirstSimpleGap

    References
    ----------
    .. [1] G. Singh, F. Mémoli, and G. Carlsson, "Topological methods for
           the analysis of high dimensional data sets and 3D object
           recognition"; in *SPBG*, pp. 91--100, 2007.

    """

    _hyperparameters = {
        'freq_threshold': {
            'type': int, 'in': Interval(0, np.inf, closed='left')},
        'max_fraction': {
            'type': (Real, type(None)),
            'in': Interval(0, 1, closed='right')},
        'n_bins_start': {
            'type': int, 'in': Interval(1, np.inf, closed='left')},
        'affinity': {'type': str},
        'linkage': {'type': str}
        }

    def __init__(self, freq_threshold=0, max_fraction=None, n_bins_start=5,
                 affinity='euclidean', memory=None, linkage='single'):
        self.freq_threshold = freq_threshold
        self.max_fraction = max_fraction
        self.n_bins_start = n_bins_start
        self.affinity = affinity
        self.memory = memory
        self.linkage = linkage

    def fit(self, X, y=None):
        """Fit the agglomerative clustering from features or distance matrix.

        The stopping rule is used to determine :attr:`n_clusters_`, and the
        full dendrogram is cut there to compute :attr:`labels_`.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features) or \
            (n_samples, n_samples)
            Training instances to cluster, or distances between instances if
            ``affinity='precomputed'``.

        y : ignored
            Not used, present here for API consistency by convention.

        Returns
        -------
        self

        """
        X = check_array(X)

        validate_params(
            self.get_params(), self._hyperparameters, exclude=['memory'])

        _max_fraction = 1. if self.max_fraction is None else self.max_fraction

        if X.shape[0] == 1:
            self.labels_ = np.array([0])
            self.n_clusters_ = 1
            return self

        self._build_tree(X)

        self.n_clusters_ = _num_clusters_histogram(
            self.distances_, self.freq_threshold, self.n_bins_start,
            _max_fraction)

        # Cut the tree to find labels
        # TODO: Verify whether Daniel Mullner's implementation of this step
        #  offers any advantage
        self.labels_ = _hc_cut(self.n_clusters_, self.children_,
                               self.n_leaves_)
        return self
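

if __name__ == "__main__":
    # Illustrative smoke test only, not part of the giotto-tda library: two
    # well-separated point clouds should be recovered as two clusters by
    # both stopping rules.
    rng = np.random.default_rng(0)
    X_demo = np.vstack([rng.normal(0., 0.05, size=(20, 2)),
                        rng.normal(5., 0.05, size=(20, 2))])
    for Clusterer in (FirstSimpleGap, FirstHistogramGap):
        # Expected: both stopping rules report 2 clusters on this toy data
        print(Clusterer.__name__, Clusterer().fit(X_demo).n_clusters_)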