Source code for lingam.multi_group_camuv

"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""

import copy
import itertools

import numpy as np
from pygam import LinearGAM
from scipy.stats import chi2
from sklearn.utils import check_array

from .hsic import hsic_test_gamma
from .utils import f_correlation


[docs] class MultiGroupCAMUV: """Implementation of CAM-UV Algorithm with multiple groups [1]_ [2]_ [3]_ References ---------- .. [1] T.N.Maeda and S.Shimizu. Causal additive models with unobserved variables. In Proc. Thirty-Seventh Conference on Uncertainty in Artificial Intelligence (UAI). PMLR 161:97-106, 2021. .. [2] T. N. Maeda and S. Shimizu. Use of prior knowledge to discover causal additive models with unobserved variables and its application to time series data. Behaviormetrika, xx(xx): 1-19, 2024. .. [3] S. Shimizu. Joint estimation of linear non-Gaussian acyclic models. Neurocomputing, 81: 104-107, 2012. """
[docs] def __init__( self, alpha=0.01, num_explanatory_vals=2, independence="hsic", ind_corr=0.5, prior_knowledge=None, ): """Construct a CAM-UV model. Parameters ---------- alpha : float, optional (default=0.01) Alpha level. num_explanatory_vals : int, optional (default=2) Maximum number of explanatory variables. independence : {'hsic', 'fcorr'}, optional (default='hsic') Methods to determine independence. If 'hsic' is set, test for independence by HSIC. If 'fcorr' is set, independence is determined by F-correlation. ind_corr : float, optional (default=0.5) The threshold value for determining independence by F-correlation; independence is determined when the value of F-correlation is below this threshold value. prior_knowledge : array-like, shape ((index, index), ...), optional (default=None) List of variable pairs indicating prior knowledge. If (0, 3) is included , it means that X0 cannot be a cause of X3. """ # Check parameters if num_explanatory_vals <= 0: raise ValueError("num_explanatory_vals must be > 0.") if alpha < 0: raise ValueError("alpha must be >= 0.") if independence not in ("hsic", "fcorr"): raise ValueError("independence must be 'hsic' or 'fcorr'.") if ind_corr < 0.0: raise ValueError("ind_corr must be an float greater than 0.") self._num_explanatory_vals = num_explanatory_vals self._alpha = alpha self._independence = independence self._ind_corr = ind_corr self._pk_dict = self._make_pk_dict(prior_knowledge)
[docs] def fit(self, X_list): """Fit the model to multiple datasets. Parameters ---------- X_list : list, shape [X, ...] Multiple datasets for training, where ``X`` is an dataset. The shape of ''X'' is (n_samples, n_features), where ``n_samples`` is the number of samples and ``n_features`` is the number of features. Returns ------- self : object Returns the instance itself. """ X_list = self._check_X_list(X_list) d = X_list[0].shape[1] N = self._get_neighborhoods(X_list) P = self._find_parents(X_list, self._num_explanatory_vals, N) U = [] for i in range(d): for j in range(d)[i + 1 :]: if (i in P[j]) or (j in P[i]): continue if (i not in N[j]) or (j not in N[i]): continue in_X_list = [] in_Y_list = [] for X in X_list: n = X.shape[0] i_residual = self._get_residual(X, i, P[i]) j_residual = self._get_residual(X, j, P[j]) in_X_list.append(np.reshape(i_residual, [n, 1])) in_Y_list.append(np.reshape(j_residual, [n, 1])) if not self._is_independent(in_X_list, in_Y_list): if not set([i, j]) in U: U.append(set([i, j])) self._U = U self._P = P return self._estimate_adjacency_matrix(X_list[0], P, U)
def _make_pk_dict(self, prior_knowledge): if prior_knowledge is None: return None pk_dict = dict() for pair in prior_knowledge: if not pair[1] in pk_dict: pk_dict[pair[1]] = [pair[0]] else: pk_dict[pair[1]].append(pair[0]) return pk_dict def _check_X_list(self, X_list): """Check input X list.""" if not isinstance(X_list, list): raise ValueError("X_list must be a list.") if len(X_list) < 2: raise ValueError("X_list must be a list containing at least two items") self._k = len(X_list) self._n_features = check_array(X_list[0]).shape[1] X_list_ = [] for X in X_list: X_ = check_array(X) if X_.shape[1] != self._n_features: raise ValueError( "X_list must be a list with the same number of features" ) X_list_.append(X_) return X_list_ def _get_residual(self, X, explained_i, explanatory_ids): explanatory_ids = list(explanatory_ids) if len(explanatory_ids) == 0: residual = X[:, explained_i] else: gam = LinearGAM().fit(X[:, explanatory_ids], X[:, explained_i]) residual = X[:, explained_i] - gam.predict(X[:, explanatory_ids]) return residual def _is_independent(self, X_list, Y_list): if self._independence == "hsic": threshold = self._alpha elif self._independence == "fcorr": threshold = self._ind_corr is_independent, _ = self._is_independent_by(X_list, Y_list, threshold) return is_independent def _is_independent_by(self, X_list, Y_list, threshold): is_independent = False value = 0.0 if self._independence == "hsic": fisher_stat = 0 for X, Y in zip(X_list, Y_list): _, hsic_p = hsic_test_gamma(X, Y) fisher_stat += np.inf if hsic_p == 0 else -2 * np.log(hsic_p) fisher_p = chi2.sf(fisher_stat, df=2 * self._k) is_independent = fisher_p > threshold value = fisher_p elif self._independence == "fcorr": max_f_corr = 0.0 for X, Y in zip(X_list, Y_list): f_corr = f_correlation(X, Y) if f_corr > max_f_corr: max_f_corr = f_corr is_independent = max_f_corr < threshold value = max_f_corr return is_independent, value def _get_neighborhoods(self, X_list): d = X_list[0].shape[1] N = [set() for i in range(d)] for i in range(d): for j in range(d)[i + 1 :]: in_X_list = [] in_Y_list = [] for X in X_list: n = X.shape[0] in_X_list.append(np.reshape(X[:, i], [n, 1])) in_Y_list.append(np.reshape(X[:, j], [n, 1])) if not self._is_independent(in_X_list, in_Y_list): N[i].add(j) N[j].add(i) return N def _find_parents(self, X_list, maxnum_vals, N): d = X_list[0].shape[1] P = [set() for i in range(d)] # Parents t = 2 Y_list = [copy.deepcopy(X) for X in X_list] while True: changed = False variables_set_list = list(itertools.combinations(set(range(d)), t)) for variables_set in variables_set_list: variables_set = set(variables_set) if not self._check_identified_causality(variables_set, P): continue child, is_independence_with_K = self._get_child( X_list, variables_set, P, N, Y_list ) if child is None: continue if not is_independence_with_K: continue parents = variables_set - {child} if not self._check_independence_withou_K(parents, child, P, N, Y_list): continue for parent in parents: P[child].add(parent) changed = True Y_list_ = [] for X, Y in zip(X_list, Y_list): Y_list_.append(self._get_residuals_matrix(X, Y, P, child)) Y_list = Y_list_ if changed: t = 2 else: t += 1 if t > maxnum_vals: break for i in range(d): non_parents = set() for j in P[i]: in_X_list = [] in_Y_list = [] for X in X_list: n = X.shape[0] residual_i = self._get_residual(X, i, P[i] - {j}) residual_j = self._get_residual(X, j, P[j]) in_X_list.append(np.reshape(residual_i, [n, 1])) in_Y_list.append(np.reshape(residual_j, [n, 1])) if self._is_independent(in_X_list, in_Y_list): non_parents.add(j) P[i] = P[i] - non_parents return P def _check_prior_knowledge(self, xj_list, xi): if self._pk_dict is not None: for xj in xj_list: if (xi in self._pk_dict) and (xj in self._pk_dict[xi]): return True return False def _get_residuals_matrix(self, X, Y_old, P, child): Y = copy.deepcopy(Y_old) Y[:, child] = self._get_residual(X, child, P[child]) return Y def _get_child(self, X_list, variables_set, P, N, Y_list): prev_independence = 0.0 if self._independence == "hsic" else 1.0 max_independence_child = None for child in variables_set: parents = variables_set - {child} if self._check_prior_knowledge(parents, child): continue if not self._check_correlation(child, parents, N): continue in_X_list = [] in_Y_list = [] for X, Y in zip(X_list, Y_list): n = X.shape[0] residual = self._get_residual(X, child, parents | P[child]) in_X_list.append(np.reshape(residual, [n, 1])) in_Y_list.append(np.reshape(Y[:, list(parents)], [n, len(parents)])) is_ind, value = self._is_independent_by( in_X_list, in_Y_list, prev_independence ) if is_ind: prev_independence = value max_independence_child = child if self._independence == "hsic": is_independent = prev_independence > self._alpha elif self._independence == "fcorr": is_independent = prev_independence < self._ind_corr return max_independence_child, is_independent def _check_independence_withou_K(self, parents, child, P, N, Y_list): for parent in parents: in_X_list = [] in_Y_list = [] for Y in Y_list: n = Y.shape[0] in_X_list.append(np.reshape(Y[:, child], [n, 1])) in_Y_list.append(np.reshape(Y[:, parent], [n, 1])) if self._is_independent(in_X_list, in_Y_list): return False return True def _check_identified_causality(self, variables_set, P): variables_list = list(variables_set) for i in variables_list: for j in variables_list[variables_list.index(i) + 1 :]: if (j in P[i]) or (i in P[j]): return False return True def _check_correlation(self, child, parents, N): for parent in parents: if parent not in N[child]: return False return True def _estimate_adjacency_matrix(self, X, P, U): B = np.zeros([X.shape[1], X.shape[1]], dtype="float64") for i, parents in enumerate(P): for parent in parents: B[i, parent] = 1 for confounded_pair in U: confounded_pair = list(confounded_pair) B[confounded_pair[0], confounded_pair[1]] = np.nan B[confounded_pair[1], confounded_pair[0]] = np.nan self._adjacency_matrix = B return self @property def adjacency_matrix_(self): """Estimated adjacency matrix. Returns ------- adjacency_matrix_ : array-like, shape (n_features, n_features) The adjacency matrix B of fitted model, where n_features is the number of features. """ return self._adjacency_matrix