Source code for lingam.multi_group_resit

"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""

import numbers
import copy

import numpy as np
from sklearn.utils import check_array, resample

from .bootstrap import BootstrapResult
from .hsic import hsic_test_gamma


[docs] class MultiGroupRESIT: """Implementation of RESIT(regression with subsequent independence test) Algorithm [1]_ with multiple groups References ---------- .. [1] Jonas Peters, Joris M Mooij, Dominik Janzing, and Bernhard Sch ̈olkopf. Causal discovery with continuous additive noise models. Journal of Machine Learning Research, 15:2009-2053, 2014. Notes ----- RESIT algorithm returns an **adjacency matrix consisting of zeros or ones**, rather than an adjacency matrix consisting of causal coefficients, in order to estimate nonlinear causality. """
[docs] def __init__(self, regressor, random_state=None, prior_knowledge=None, alpha=0.01): """Construct a RESIT model. Parameters ---------- regressor : regressor object implementing 'fit' and 'predict' function (default=None) Regressor to compute residuals. This regressor object must have ``fit`` method and ``predict`` function like scikit-learn's model. random_state : int, optional (default=None) ``random_state`` is the seed used by the random number generator. prior_knowledge : array-like, shape (n_features, n_features), optional (default=None) Prior knowledge used for causal discovery, where ``n_features`` is the number of features. The elements of prior knowledge matrix are defined as follows [1]_: * ``0`` : :math:`x_i` does not have a directed path to :math:`x_j` * ``1`` : :math:`x_i` has a directed path to :math:`x_j` * ``-1`` : No prior knowledge is available to know if either of the two cases above (0 or 1) is true. alpha : float, optional (default=0.01) Alpha level for HSIC independence test when removing superfluous edges. """ # Check parameters if regressor is None: raise ValueError("Specify regression model in 'regressor'.") else: if not (hasattr(regressor, "fit") and hasattr(regressor, "predict")): raise ValueError("'regressor' has no fit or predict method.") if alpha < 0.0: raise ValueError("alpha must be an float greater than 0.") self._Aknw = prior_knowledge self._alpha = alpha self._reg = regressor if self._Aknw is not None: self._Aknw = check_array(self._Aknw) self._Aknw = np.where(self._Aknw < 0, np.nan, self._Aknw)
[docs] def fit(self, X_list): """Fit the model to multiple datasets. Parameters ---------- X_list : list, shape [X, ...] Multiple datasets for training, where ``X`` is an dataset. The shape of ''X'' is (n_samples, n_features), where ``n_samples`` is the number of samples and ``n_features`` is the number of features. Returns ------- self : object Returns the instance itself. """ # Check parameters X_list = self._check_X_list(X_list) n_features = X_list[0].shape[1] # Check prior knowledge if self._Aknw is not None: if (n_features, n_features) != self._Aknw.shape: raise ValueError( "The shape of prior knowledge must be (n_features, n_features)" ) else: # Extract all partial orders in prior knowledge matrix self._partial_orders = self._extract_partial_orders(self._Aknw) # Determine topological order pa, pi = self._estimate_order(X_list) # Remove superfluous edges pa_list = [self._remove_edges(X, copy.deepcopy(pa), pi) for X in X_list] # Create adjacency matrix from parent-child relationship am_list = [] for pa in pa_list: adjacency_matrix = np.zeros([n_features, n_features]) for i, parents in pa.items(): for p in parents: adjacency_matrix[i, p] = 1 am_list.append(adjacency_matrix) self._causal_order = pi self._adjacency_matrices = am_list return self
[docs] def bootstrap(self, X_list, n_sampling): """Evaluate the statistical reliability of DAG based on the bootstrapping. Parameters ---------- X_list : array-like, shape (X, ...) Multiple datasets for training, where ``X`` is an dataset. The shape of ''X'' is (n_samples, n_features), where ``n_samples`` is the number of samples and ``n_features`` is the number of features. n_sampling : int Number of bootstrapping samples. Returns ------- results : array-like, shape (BootstrapResult, ...) Returns the results of bootstrapping for multiple datasets. """ # Check parameters X_list = self._check_X_list(X_list) if isinstance(n_sampling, (numbers.Integral, np.integer)): if not 0 < n_sampling: raise ValueError("n_sampling must be an integer greater than 0.") else: raise ValueError("n_sampling must be an integer greater than 0.") # Bootstrapping adjacency_matrices_list = np.zeros( [len(X_list), n_sampling, self._n_features, self._n_features] ) total_effects_list = np.zeros( [len(X_list), n_sampling, self._n_features, self._n_features] ) for n in range(n_sampling): resampled_X_list = [resample(X) for X in X_list] self.fit(resampled_X_list) for i, am in enumerate(self._adjacency_matrices): adjacency_matrices_list[i][n] = am result_list = [] for am, te in zip(adjacency_matrices_list, total_effects_list): result_list.append(BootstrapResult(am, te)) return result_list
def _check_X_list(self, X_list): """Check input X list.""" if not isinstance(X_list, list): raise ValueError("X_list must be a list.") if len(X_list) < 2: raise ValueError("X_list must be a list containing at least two items") self._k = len(X_list) self._n_features = check_array(X_list[0]).shape[1] X_list_ = [] for X in X_list: X_ = check_array(X) if X_.shape[1] != self._n_features: raise ValueError( "X_list must be a list with the same number of features" ) X_list_.append(X_) return X_list_ def _estimate_order(self, X_list): """Determine topological order""" n_features = X_list[0].shape[1] S = np.arange(n_features) pa = {} pi = [] for _ in range(n_features): Sc = self._search_candidate(S) if len(Sc) == 1: k = Sc[0] else: fisher_stats = [] for k in Sc: fisher_stat = 0 for X in X_list: # Regress Xk on {Xi} predictors = [i for i in S if i != k] self._reg.fit(X[:, predictors], X[:, k]) residual = X[:, k] - self._reg.predict(X[:, predictors]) # Measure dependence between residuals and {Xi} _, hsic_p = hsic_test_gamma(residual, X[:, predictors]) fisher_stat += np.inf if hsic_p == 0 else -2 * np.log(hsic_p) fisher_stats.append(fisher_stat) k = Sc[np.argmin(fisher_stats)] S = S[S != k] pa[k] = S.tolist() pi.insert(0, k) # Update partial orders if self._Aknw is not None: self._partial_orders = self._partial_orders[ self._partial_orders[:, 1] != k ] # The topological order after exploration is revised based on prior knowledge. if self._Aknw is not None: _pk = self._Aknw.copy() np.fill_diagonal(_pk, 0) pk_pa = {} for i, parents in pa.items(): new_parents = [p for p in parents if _pk[i, p] != 0] pk_pa[i] = new_parents pa = pk_pa return pa, pi def _remove_edges(self, X, pa, pi): """Remove superfluous edges""" for k in range(1, X.shape[1]): parents = pa[pi[k]].copy() for l in parents: # Regress Xk on {Xi} predictors = [i for i in pa[pi[k]] if i != l] if len(predictors) >= 1: self._reg.fit(X[:, predictors], X[:, pi[k]]) residual = X[:, pi[k]] - self._reg.predict(X[:, predictors]) else: residual = X[:, pi[k]] # Measure dependence between residuals and {Xi} _, hsic_p = hsic_test_gamma(residual, X[:, parents]) # eliminate edge if hsic_p > self._alpha: pa[pi[k]].remove(l) return pa def _extract_partial_orders(self, pk): """Extract partial orders from prior knowledge.""" path_pairs = np.array(np.where(pk == 1)).transpose() no_path_pairs = np.array(np.where(pk == 0)).transpose() # Check for inconsistencies in pairs with path check_pairs = np.concatenate([path_pairs, path_pairs[:, [1, 0]]]) if len(check_pairs) > 0: pairs, counts = np.unique(check_pairs, axis=0, return_counts=True) if len(pairs[counts > 1]) > 0: raise ValueError( f"The prior knowledge contains inconsistencies at the following indices: {pairs[counts>1].tolist()}" ) # Check for inconsistencies in pairs without path # If there are duplicate pairs without path, they cancel out and are not ordered. check_pairs = np.concatenate([no_path_pairs, no_path_pairs[:, [1, 0]]]) if len(check_pairs) > 0: pairs, counts = np.unique(check_pairs, axis=0, return_counts=True) check_pairs = np.concatenate([no_path_pairs, pairs[counts > 1]]) pairs, counts = np.unique(check_pairs, axis=0, return_counts=True) no_path_pairs = pairs[counts < 2] check_pairs = np.concatenate([path_pairs, no_path_pairs[:, [1, 0]]]) if len(check_pairs) == 0: # If no pairs are extracted from the specified prior knowledge, return check_pairs pairs = np.unique(check_pairs, axis=0) return pairs[:, [1, 0]] # [to, from] -> [from, to] def _search_candidate(self, S): """Search for candidate features""" # If no prior knowledge is specified, nothing to do. if self._Aknw is None: return S # Candidate features that are not to the left of the partial orders if len(self._partial_orders) != 0: Sc = [i for i in S if i not in self._partial_orders[:, 0]] return Sc return S
[docs] def estimate_total_effect(self, X, from_index, to_index): """Estimate total effect using causal model. Parameters ---------- X : array-like, shape (n_samples, n_features) Original data, where n_samples is the number of samples and n_features is the number of features. from_index : Index of source variable to estimate total effect. to_index : Index of destination variable to estimate total effect. Returns ------- total_effect : float **Because RESIT is a nonlinear algorithm, it cannot estimate the total effect and always returns a value of zero** """ return 0
[docs] def get_error_independence_p_values(self, X): """Calculate the p-value matrix of independence between error variables. Parameters ---------- X : array-like, shape (n_samples, n_features) Original data, where n_samples is the number of samples and n_features is the number of features. Returns ------- independence_p_values : array-like, shape (n_features, n_features) **RESIT always returns a zero matrix** """ n_features = X.shape[1] p_values = np.zeros([n_features, n_features]) return p_values
@property def causal_order_(self): """Estimated causal ordering. Returns ------- causal_order_ : array-like, shape (n_features) The causal order of fitted model, where n_features is the number of features. """ return self._causal_order @property def adjacency_matrices_(self): """Estimated adjacency matrices. Returns ------- adjacency_matrices_ : array-like, shape (B, ...) The list of adjacency matrix B for multiple datasets. The shape of B is (n_features, n_features), where n_features is the number of features. """ return self._adjacency_matrices