Source code for lingam.group_direct_lingam

"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""

import numbers

import numpy as np
from scipy.stats import laplace
from sklearn.linear_model import LinearRegression
from sklearn.utils import check_array, resample

from .utils import predict_adaptive_lasso, calculate_total_effect
from .bootstrap import BootstrapResult


[docs] class GroupDirectLiNGAM: """Implementation of GroupDirectLiNGAM Algorithm [1]_ References ---------- .. [1] D. Entner and P. O. Hoyer. Estimating a causal order among groups of variables in linear models. In Proc. 22nd International Conference on Artificial Neural Networks (ICANN2012), pp. 83--90, Lausanne, Switzerland, 2012. """
[docs] def __init__(self, prior_knowledge=None): """Construct a GroupDirectLiNGAM model. Parameters ---------- prior_knowledge : array-like, shape (n_groups, n_groups), optional (default=None) Prior knowledge used for causal discovery, where ``n_groups`` is the number of groups. The elements of prior knowledge matrix are defined as follows [1]_: * ``0`` : :math:`x_i` does not have a directed path to :math:`x_j` * ``1`` : :math:`x_i` has a directed path to :math:`x_j` * ``-1`` : No prior knowledge is available to know if either of the two cases above (0 or 1) is true. """ self._adjacency_matrix = None self._causal_order = [] self._Aknw = prior_knowledge if self._Aknw is not None: self._Aknw = check_array(self._Aknw) self._Aknw = np.where(self._Aknw < 0, np.nan, self._Aknw)
[docs] def fit(self, X, groups): """Fit the model to X. Parameters ---------- X : array-like, shape (n_samples, n_features) Training data, where ``n_samples`` is the number of samples and ``n_features`` is the number of features. groups : array-like, shape (n_groups) The list of features for each group. where ``n_groups`` is the number of groups. Returns ------- self : object Returns the instance itself. """ # Check parameters X = check_array(X) n_groups = len(groups) # Check prior knowledge if self._Aknw is not None: if (n_groups, n_groups) != self._Aknw.shape: raise ValueError( "The shape of prior knowledge must be (n_groups, n_groups)" ) else: # Extract all partial orders in prior knowledge matrix self._partial_orders = self._extract_partial_orders(self._Aknw) # Causal discovery self._causal_order = self._estimate_causal_order(X, groups) return self._estimate_adjacency_matrix(X, groups, prior_knowledge=self._Aknw)
def _extract_partial_orders(self, pk): """Extract partial orders from prior knowledge.""" path_pairs = np.array(np.where(pk == 1)).transpose() no_path_pairs = np.array(np.where(pk == 0)).transpose() # Check for inconsistencies in pairs with path check_pairs = np.concatenate([path_pairs, path_pairs[:, [1, 0]]]) if len(check_pairs) > 0: pairs, counts = np.unique(check_pairs, axis=0, return_counts=True) if len(pairs[counts > 1]) > 0: raise ValueError( f"The prior knowledge contains inconsistencies at the following indices: {pairs[counts>1].tolist()}" ) # Check for inconsistencies in pairs without path # If there are duplicate pairs without path, they cancel out and are not ordered. check_pairs = np.concatenate([no_path_pairs, no_path_pairs[:, [1, 0]]]) if len(check_pairs) > 0: pairs, counts = np.unique(check_pairs, axis=0, return_counts=True) check_pairs = np.concatenate([no_path_pairs, pairs[counts > 1]]) pairs, counts = np.unique(check_pairs, axis=0, return_counts=True) no_path_pairs = pairs[counts < 2] check_pairs = np.concatenate([path_pairs, no_path_pairs[:, [1, 0]]]) if len(check_pairs) == 0: # If no pairs are extracted from the specified prior knowledge, return check_pairs pairs = np.unique(check_pairs, axis=0) return pairs[:, [1, 0]] # [to, from] -> [from, to] def _search_candidate(self, S): """Search for candidate features""" # If no prior knowledge is specified, nothing to do. if self._Aknw is None: return S # Candidate features that are not to the left of the partial orders if len(self._partial_orders) != 0: Sc = [i for i in S if i not in self._partial_orders[:, 1]] return Sc return S def _get_resid_and_coef(self, predictors, target): """Get the residuals and coefficients of the ordinary least squares method""" lr = LinearRegression().fit(predictors, target) resid = target - lr.predict(predictors) return resid, lr.coef_ def _log_likelihood(self, x, y): """Compute the log likelihoods assuming a Laplace distribution""" residuals, _ = self._get_resid_and_coef(y, x) sigma = np.mean(np.abs(residuals)) return np.sum(laplace.logpdf(residuals, loc=0, scale=sigma)) def _log_likelihood_ratio(self, x, y): """Compute the (normalized) ratio of the log likelihoods for the two causal models""" if x.ndim == 1: x = x.reshape(-1, 1) if y.ndim == 1: y = y.reshape(-1, 1) m = len(x) LogL_xy = self._log_likelihood(x, y) LogL_yx = self._log_likelihood(y, x) R = (LogL_xy - LogL_yx) / m return R def _pairwise_measure(self, X, remaining_groups, j, groups): """Compute the pairwise measure between one group and all remaining groups""" mu = 0 total_n_i = 0 X_j = X[:, groups[j]] n_j = X_j.shape[1] for i in remaining_groups: if i == j: continue X_i = X[:, groups[i]] n_i = X_i.shape[1] total_n_i += n_i for k in range(n_j): x_jk = X_j[:, k] for l in range(n_i): x_il = X_i[:, l] r_li, coef = self._get_resid_and_coef(X_j, x_il) b_lk = coef[k] z_ikl = b_lk * x_jk + r_li R = self._log_likelihood_ratio(x_jk, z_ikl) mu += min(0, R) ** 2 return mu / (n_j * total_n_i) def _get_residual_matrix(self, X, predictors, targets): """Get the matrix of residuals""" if len(predictors) == 0: return X Y = np.zeros_like(X) for target in targets: Y[:, target], _ = self._get_resid_and_coef(X[:, list(predictors)], X[:, target]) return Y def _estimate_causal_order(self, X, groups): """Estimate the causal order of groups by repeatedly identifying exogenous groups""" n_groups = len(groups) G = np.arange(n_groups) Rj = np.copy(X) S = G K = [] for _ in range(n_groups): Sc = self._search_candidate(S) if len(Sc) == 1: k = Sc[0] else: # Find an exogenous group mu_list = [float('inf') for j in range(n_groups)] for j in G: if j in Sc: mu = self._pairwise_measure(Rj, Sc, j, groups) mu_list[j] = mu k = np.argmin(mu_list) S = S[S != k] K.append(k) # Replace the data matrix with the residuals Xj = groups[k] Xi = [k for i in S for k in groups[i]] Rj = self._get_residual_matrix(Rj, Xj, Xi) # Update partial orders if self._Aknw is not None: self._partial_orders = self._partial_orders[self._partial_orders[:, 0] != k] return K def _estimate_adjacency_matrix(self, X, groups, prior_knowledge=None): """Estimate adjacency matrix by causal order. Parameters ---------- X : array-like, shape (n_samples, n_features) Training data, where n_samples is the number of samples and n_features is the number of features. groups : array-like, shape (n_groups) The list of features for each group. prior_knowledge : array-like, shape (n_groups, n_groups), optional (default=None) Prior knowledge matrix. Returns ------- self : object Returns the instance itself. """ if prior_knowledge is not None: pk = prior_knowledge.copy() np.fill_diagonal(pk, 0) B = np.zeros([X.shape[1], X.shape[1]], dtype="float64") for i in range(1, len(self._causal_order)): target_group_idx = self._causal_order[i] predictor_groups_idxs = self._causal_order[:i] # Exclude groups specified in no_path with prior knowledge if prior_knowledge is not None: predictor_groups_idxs = [p for p in predictor_groups_idxs if pk[target_group_idx, p] != 0] # target is exogenous groups if predictors are empty if len(predictor_groups_idxs) == 0: continue # Retrieve variables from groups target_group = groups[target_group_idx] predictor_groups = [groups[idx] for idx in predictor_groups_idxs] predictors = [var for group in predictor_groups for var in group] for target in target_group: B[target, predictors] = predict_adaptive_lasso(X, predictors, target) self._adjacency_matrix = B return self @property def adjacency_matrix_(self): """Estimated adjacency matrix. Returns ------- adjacency_matrix_ : array-like, shape (n_features, n_features) The adjacency matrix B of fitted model, where n_features is the number of features. """ return self._adjacency_matrix @property def causal_order_(self): """Estimated causal ordering. Returns ------- causal_order_ : array-like, shape (n_groups) The causal order of fitted model, where n_groups is the number of groups. """ return self._causal_order
[docs] def bootstrap(self, X, groups, n_sampling): """Evaluate the statistical reliability of DAG based on the bootstrapping. Parameters ---------- X : array-like, shape (n_samples, n_features) Training data, where ``n_samples`` is the number of samples and ``n_features`` is the number of features. groups : array-like, shape (n_groups) The list of features for each group. n_sampling : int Number of bootstrapping samples. Returns ------- result : BootstrapResult Returns the result of bootstrapping. """ # Check parameters X = check_array(X) if isinstance(n_sampling, (numbers.Integral, np.integer)): if not 0 < n_sampling: raise ValueError("n_sampling must be an integer greater than 0.") else: raise ValueError("n_sampling must be an integer greater than 0.") # Bootstrapping adjacency_matrices = np.zeros([n_sampling, X.shape[1], X.shape[1]]) total_effects = np.zeros([n_sampling, X.shape[1], X.shape[1]]) index = np.arange(X.shape[0]) resampled_indices = [] for i in range(n_sampling): resampled_X, resampled_index = resample(X, index) self.fit(resampled_X, groups) adjacency_matrices[i] = self._adjacency_matrix # Calculate total effects for c, from_group in enumerate(self._causal_order): for to_group in self._causal_order[c + 1 :]: for from_ in groups[from_group]: for to in groups[to_group]: total_effects[i, to, from_] = calculate_total_effect( self._adjacency_matrix, from_, to ) resampled_indices.append(resampled_index) return BootstrapResult(adjacency_matrices, total_effects, resampled_indices=resampled_indices)