# License: MIT
import numpy as np
from sklearn.naive_bayes import GaussianNB
from .base import BaseCoTrainEstimator
from ..utils.utils import check_Xs, check_Xs_y_nan_allowed
class CTClassifier(BaseCoTrainEstimator):
    r"""
    This class implements the co-training classifier for supervised and
    semi-supervised learning with the framework as described in [#1CTC]_.
    The best use case is when the 2 views of input data are sufficiently
    distinct and independent as detailed in [#1CTC]_. However, this can
    also be successful when a single matrix of input data is given as
    both views and two estimators are chosen which are quite different
    [#2CTC]_. See the examples below.

    In the semi-supervised case, performance can vary greatly, so using
    a separate validation set or cross validation procedure is
    recommended to ensure the classifier has fit well.

    Parameters
    ----------
    estimator1 : classifier object, (default=sklearn GaussianNB)
        The classifier object which will be trained on view 1 of the data.
        This classifier should support the predict_proba() function so that
        classification probabilities can be computed and co-training can be
        performed effectively.

    estimator2 : classifier object, (default=sklearn GaussianNB)
        The classifier object which will be trained on view 2 of the data.
        Does not need to be of the same type as ``estimator1``, but should
        support predict_proba().

    p : int, optional (default=None)
        The number of positive classifications from the unlabeled_pool
        training set which will be given a positive "label". If only one
        of ``p`` or ``n`` is not None, the other will be set to be the
        same. If both are None, they are derived in fit() from the labeled
        training data: when the floor of the ratio of positive to negative
        examples is greater than 1, ``p`` is that ratio and ``n`` is 1;
        otherwise ``p`` is 1 and ``n`` is the floor of the ratio of
        negative to positive examples (at least 1). When the labels are
        0 or 1, positive is defined as 1, and in general, positive is the
        larger label.

    n : int, optional (default=None)
        The number of negative classifications from the unlabeled_pool
        training set which will be given a negative "label". See ``p`` for
        how the default is chosen. When the labels are 0 or 1, negative is
        defined as 0, and in general, negative is the smaller label.

    unlabeled_pool_size : int, optional (default=75)
        The number of unlabeled_pool samples which will be kept in a
        separate pool for classification and selection by the updated
        classifier at each training iteration.

    num_iter : int, optional (default=50)
        The maximum number of training iterations to run.

    random_state : int (default=None)
        The starting random seed for fit() and class operations, passed to
        numpy.random.seed().

    Attributes
    ----------
    estimator1_ : classifier object
        The classifier used on view 1.

    estimator2_ : classifier object
        The classifier used on view 2.

    class_name_: string
        The name of the class.

    p_ : int, optional (default=None)
        The number of positive classifications from the unlabeled_pool
        training set which will be given a positive "label" each round.

    n_ : int, optional (default=None)
        The number of negative classifications from the unlabeled_pool
        training set which will be given a negative "label" each round.

    classes_ : array-like of shape (n_classes,)
        Unique class labels, sorted in ascending order.

    Examples
    --------
    >>> # Supervised learning of single-view data with 2 distinct estimators
    >>> from mvlearn.semi_supervised import CTClassifier
    >>> from mvlearn.datasets import load_UCImultifeature
    >>> import numpy as np
    >>> from sklearn.ensemble import RandomForestClassifier
    >>> from sklearn.naive_bayes import GaussianNB
    >>> from sklearn.model_selection import train_test_split
    >>> data, labels = load_UCImultifeature(select_labeled=[0,1])
    >>> X1 = data[0]  # Only using the first view
    >>> X1_train, X1_test, l_train, l_test = train_test_split(X1, labels)
    >>> # Supervised learning with a single view of data and 2 estimator types
    >>> estimator1 = GaussianNB()
    >>> estimator2 = RandomForestClassifier()
    >>> ctc = CTClassifier(estimator1, estimator2, random_state=1)
    >>> # Use the same matrix for each view
    >>> ctc = ctc.fit([X1_train, X1_train], l_train)
    >>> preds = ctc.predict([X1_test, X1_test])
    >>> print("Accuracy: ", sum(preds==l_test) / len(preds))
    Accuracy:  0.97

    Notes
    -----
    Multi-view co-training is most helpful for tasks in semi-supervised
    learning where each view offers unique information not seen in the
    other. As is shown in the example notebooks for using this algorithm,
    multi-view co-training can provide good classification results even
    when number of unlabeled samples far exceeds the number of labeled
    samples. This classifier uses 2 classifiers which work individually
    on each view but which share information and thus result in improved
    performance over looking at the views completely separately or even
    when concatenating the views to get more features in a single-view
    setting. The classifier can be initialized with or without the
    classifiers desired for each view being specified, but if the
    classifier for a certain view is specified, then it must support a
    predict_proba() method in order to give a sense of the most likely labels
    for different examples. This is because the algorithm must be able to
    determine which of the training samples it is most confident about during
    training epochs. The algorithm, as first proposed by Blum and Mitchell,
    is described in detail below.

    *Algorithm*

    Given:

        * a set *L* of labeled training samples (with 2 views)
        * a set *U* of unlabeled samples (with 2 views)

    Create a pool *U'* of examples by choosing *u* examples at random
    from *U*

    Loop for *k* iterations

        * Use *L* to train a classifier *h1* (``estimator1``) that considers
          only the view 1 portion of the data (i.e. Xs[0])
        * Use *L* to train a classifier *h2* (``estimator2``) that considers
          only the view 2 portion of the data (i.e. Xs[1])
        * Allow *h1* to label *p* (``self.p_``) positive and *n* (``self.n_``)
          negative samples from view 1 of *U'*
        * Allow *h2* to label *p* positive and *n* negative samples
          from view 2 of *U'*
        * Add these self-labeled samples to *L*
        * Randomly take 2*p* + 2*n* samples from *U* to replenish *U'*

    References
    ----------
    .. [#1CTC] Blum, A., and Mitchell, T. "Combining labeled and unlabeled
            data with co-training." In Proceedings of the Eleventh Annual
            Conference on Computational Learning Theory, pages 92–100, 1998.

    .. [#2CTC] Goldman, Sally, and Yan Zhou. "Enhancing supervised
            learning with unlabeled data." In Proceedings of the Eleventh
            Annual Conference on Computational Learning Theory, 2000.
    """

    def __init__(self,
                 estimator1=None,
                 estimator2=None,
                 p=None,
                 n=None,
                 unlabeled_pool_size=75,
                 num_iter=50,
                 random_state=None
                 ):
        # initialize a BaseCTEstimator object
        super().__init__(estimator1, estimator2, random_state)

        # if not given, set classifiers as gaussian naive bayes estimators
        if self.estimator1_ is None:
            self.estimator1_ = GaussianNB()
        if self.estimator2_ is None:
            self.estimator2_ = GaussianNB()

        # If only one of p or n is given, mirror it onto the other. When
        # both are None they stay None and are derived from the data in fit.
        if p is None:
            p = n
        elif n is None:
            n = p
        self.p_, self.n_ = p, n

        self.n_views = 2  # only 2 view learning supported currently
        self.class_name_ = "CTClassifier"
        self.unlabeled_pool_size = unlabeled_pool_size
        self.num_iter = num_iter

        self._check_params()

    def _check_params(self):
        r"""
        Checks that cotraining parameters are valid. Throws AttributeError
        if estimators are invalid. Throws ValueError if any other parameters
        are not valid. The checks performed are:
            - estimator1 and estimator2 have predict_proba methods
            - p and n are both positive
            - unlabeled_pool_size is positive
            - num_iter is positive
        """
        # verify that estimator1 and estimator2 have predict_proba
        if (not hasattr(self.estimator1_, 'predict_proba') or
                not hasattr(self.estimator2_, 'predict_proba')):
            raise AttributeError("Co-training classifier must be initialized "
                                 "with classifiers supporting "
                                 "predict_proba().")

        # p and n may legitimately still be None here (derived later in fit)
        if (self.p_ is not None and self.p_ <= 0) or (self.n_ is not None and
                                                      self.n_ <= 0):
            raise ValueError("Both p and n must be positive.")

        if self.unlabeled_pool_size <= 0:
            raise ValueError("unlabeled_pool_size must be positive.")

        if self.num_iter <= 0:
            raise ValueError("num_iter must be positive.")

    @staticmethod
    def _most_confident(log_prob, k, threshold):
        r"""
        Return the pool positions of the (at most) ``k`` highest entries of
        ``log_prob`` that are strictly greater than ``threshold``, in
        ascending order of score.
        """
        top_k = log_prob.argsort()[-k:]
        return [i for i in top_k if log_prob[i] > threshold]

    def fit(self, Xs, y):
        r"""
        Fit the classifier object to the data in Xs, y.

        Parameters
        ----------
        Xs : list of array-likes or numpy.ndarray
            - Xs length: n_views
            - Xs[i] shape: (n_samples, n_features_i)
            A list of the different views of data to train on.

        y : array, shape (n_samples,)
            The labels of the training data. Unlabeled examples should
            have label np.nan.

        Returns
        -------
        self : returns an instance of self
        """
        # verify Xs and y
        Xs, y = check_Xs_y_nan_allowed(Xs,
                                       y,
                                       multiview=True,
                                       enforce_views=self.n_views,
                                       max_classes=2, min_classes=1)

        # work on a copy: self-labeled samples are written into y below
        y = np.array(y)

        if self.random_state is not None:
            np.random.seed(self.random_state)

        labeled_mask = ~np.isnan(y)

        # Sorted ascending so that classes_[0] is the negative (smaller)
        # label and classes_[1] the positive (larger) one. A plain set has
        # no ordering guarantee (e.g. labels -1/1 may come out reversed),
        # which would mismatch the column indices used on predict_proba.
        self.classes_ = list(np.unique(y[labeled_mask]))
        self.n_classes = len(self.classes_)

        # extract the multiple views given
        X1 = Xs[0]
        X2 = Xs[1]

        # indices of the labeled samples; grows as samples are self-labeled
        L = np.flatnonzero(labeled_mask).tolist()

        # if don't have 2 classes of labeled data, then just fit and return,
        # since can't do any iterations of cotraining
        if self.n_classes > 1:

            # if both p & n are none, set as ratio of one class to the other
            if self.p_ is None and self.n_ is None:
                num_class_n = int(np.sum(y == self.classes_[0]))
                num_class_p = int(np.sum(y == self.classes_[1]))
                p_over_n_ratio = num_class_p // num_class_n
                if p_over_n_ratio > 1:
                    self.p_, self.n_ = p_over_n_ratio, 1
                else:
                    # Clamp to at least 1: when positives outnumber
                    # negatives by less than 2:1 the floor division is 0,
                    # and a count of 0 would make the [-0:] slice select
                    # the entire pool instead of nothing.
                    self.p_ = 1
                    self.n_ = max(1, num_class_n // num_class_p)

            # the full set of unlabeled samples
            U = np.flatnonzero(~labeled_mask).tolist()

            # shuffle unlabeled data for easy random access
            np.random.shuffle(U)

            # split off the small pool of unlabeled samples to draw from in
            # training; the remainder of U is used to replenish the pool
            pool_size = min(len(U), self.unlabeled_pool_size)
            split = len(U) - pool_size
            unlabeled_pool = U[split:]
            U = U[:split]

            # number of rounds of co-training
            it = 0

            # machine epsilon, avoids log(0)
            eps = np.finfo(float).eps

            # a sample is only self-labeled if its probability exceeds 0.5
            threshold = np.log(0.5)

            # NOTE: the loop stops as soon as U is exhausted, so if every
            # unlabeled sample fits in the initial pool no co-training
            # round is run and only the labeled data is used.
            while it < self.num_iter and U:
                it += 1

                # fit each model to its respective view
                self.estimator1_.fit(X1[L], y[L])
                self.estimator2_.fit(X2[L], y[L])

                # predict log probability for greater spread in confidence
                y1_prob = np.log(self.estimator1_.
                                 predict_proba(X1[unlabeled_pool]) + eps)
                y2_prob = np.log(self.estimator2_.
                                 predict_proba(X2[unlabeled_pool]) + eps)

                # take the most confident labeled examples from the
                # unlabeled pool in each category (positions in the pool)
                n, p = [], []
                for view_prob in (y1_prob, y2_prob):
                    n.extend(self._most_confident(view_prob[:, 0],
                                                  self.n_, threshold))
                    p.extend(self._most_confident(view_prob[:, 1],
                                                  self.p_, threshold))

                # translate pool positions into sample indices
                n_idx = [unlabeled_pool[i] for i in n]
                p_idx = [unlabeled_pool[i] for i in p]

                # create new labels for new additions to the labeled group;
                # if the two views disagree on a sample, positive wins
                # because it is assigned last
                y[n_idx] = self.classes_[0]
                y[p_idx] = self.classes_[1]

                # Both views may pick the same sample; add each sample to L
                # only once so it is not silently double-weighted.
                newly_labeled = list(dict.fromkeys(p_idx + n_idx))
                L.extend(newly_labeled)

                # remove newly labeled samples from unlabeled_pool
                labeled_now = set(newly_labeled)
                unlabeled_pool = [elem for elem in unlabeled_pool
                                  if elem not in labeled_now]

                # replenish the pool with as many samples as were removed
                for _ in range(min(len(newly_labeled), len(U))):
                    unlabeled_pool.append(U.pop())

        # fit the overall model on fully "labeled" data
        self.estimator1_.fit(X1[L], y[L])
        self.estimator2_.fit(X2[L], y[L])

        return self

    def predict(self, Xs):
        r"""
        Predict the classes of the examples in the two input views.

        Parameters
        ----------
        Xs : list of array-likes or numpy.ndarray
            - Xs length: n_views
            - Xs[i] shape: (n_samples, n_features_i)
            A list of the different views of data to predict.

        Returns
        -------
        y_pred : array-like (n_samples,)
            The predicted class of each input example. If the two classifiers
            don't agree, pick the class with the highest summed predicted
            probability from predict_proba() of both classifiers.
        """
        Xs = check_Xs(Xs,
                      multiview=True,
                      enforce_views=self.n_views)

        X1 = Xs[0]
        X2 = Xs[1]

        # predict each view independently
        y1 = np.asarray(self.estimator1_.predict(X1))
        y2 = np.asarray(self.estimator2_.predict(X2))

        # where the classifiers agree, use their prediction
        y_pred = np.array(y1, dtype=float)

        # where they don't agree, take the class with the largest summed
        # probability (first class on ties, as np.argmax returns the first
        # maximum)
        disagree = y1 != y2
        if disagree.any():
            sum_y_probs = (self.estimator1_.predict_proba(X1[disagree]) +
                           self.estimator2_.predict_proba(X2[disagree]))
            best = np.argmax(sum_y_probs, axis=1)
            y_pred[disagree] = np.asarray(self.classes_)[best]

        return y_pred

    def predict_proba(self, Xs):
        r"""
        Predict the probability of each example belonging to each class.

        Parameters
        ----------
        Xs : list of array-likes or numpy.ndarray
            - Xs length: n_views
            - Xs[i] shape: (n_samples, n_features_i)
            A list of the different views of data to predict.

        Returns
        -------
        y_proba : array-like (n_samples, n_classes)
            The probability of each sample being in each class, computed as
            the average of the two estimators' predict_proba() outputs.
        """
        Xs = check_Xs(Xs,
                      multiview=True,
                      enforce_views=self.n_views)

        X1 = Xs[0]
        X2 = Xs[1]

        # predict each probability independently
        y1_proba = self.estimator1_.predict_proba(X1)
        y2_proba = self.estimator2_.predict_proba(X2)

        # return the average probability for the sample
        return (y1_proba + y2_proba) * .5