Source code for lingam.abic_lingam

"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""

import math
import numpy as np
import warnings

import autograd.numpy as anp
from autograd import grad
import scipy.optimize as sopt
import functools
from sklearn.utils import check_array, resample
from sklearn.linear_model import LinearRegression

from .base import _BaseLiNGAM
from .bootstrap import BootstrapResult


[docs] class ABICLiNGAM(_BaseLiNGAM): """Implementation of ABIC-LiNGAM Algorithm. [1]_ Original code: https://github.com/Yoshimitsu-try/ABIC_LiNGAM References ---------- .. [1] Y. Morinishi and S. Shimizu. Differentiable causal discovery of linear non-Gaussian acyclic models under unmeasured confounding. Transactions on Machine Learning Research (TMLR), 2025. """
[docs] def __init__( self, beta=1.0, lam=0.05, acyc_order=None, seed=0, max_outer=100, tol_h=1e-8, min_causal_effect=0.05, min_error_covariance=0.05, rho_max=1e16, inner_start=1, inner_growth=1, inner_tol=1e-4, ): """Construct a ABICLiNGAM model. Parameters ---------- beta : float, optional (default=1.0) Power in residual loss, i.e., ||r||^(2*beta) lam : float, optional (default=0.05) The weight of the regularization term. acyc_order : int or None, optional (default=None) Order of the truncated series for acyclicity penalty. If None, defaults to the number of variables. min_causal_effect : float, optional (default=0.05) Threshold for detecting causal edge. Causal edges with absolute values of causal effects less than ``min_causal_effect`` are excluded. min_error_covariance : float, optional (default=0.05) Threshold for detecting error covariances. Error covariances with absolute values less than ``min_error_covariance`` are excluded. seed : int, optional (default=0) Seed for the random number generator. max_outer : int, optional (default=100) Maximum number of outer iterations. tol_h : float, optional (default=1e-8) Tolerance for acyclicity penalty to stop. rho_max : float, optional (default=1e16) Maximum value for Augmented Lagrangian penalty parameter rho. inner_start : int, optional (default=1) Initial number of inner refinement steps. inner_growth : int, optional (default=1) Growth of inner refinement steps per outer iteration. inner_tol : float, optional (default=1e-4) Tolerance for inner loop convergence. """ # Check parameters if beta <= 0.0: raise ValueError("beta must be positive.") if lam < 0.0: raise ValueError("lam must be non-negative.") if acyc_order is not None: if not isinstance(acyc_order, int): raise TypeError("acyc_order must be an integer or None.") if acyc_order < 1: raise ValueError("acyc_order must be >= 1.") if min_causal_effect < 0.0: raise ValueError("min_causal_effect must be non-negative.") if min_error_covariance < 0.0: raise ValueError("min_error_covariance must be non-negative.") if max_outer < 1: raise ValueError("max_outer must be at least 1.") if tol_h <= 0.0: raise ValueError("tol_h must be positive.") if rho_max <= 0.0: raise ValueError("rho_max must be positive.") if inner_start < 1: raise ValueError("inner_start must be at least 1.") if inner_growth < 0: raise ValueError("inner_growth must be non-negative.") if inner_tol <= 0.0: raise ValueError("inner_tol must be positive.") self._beta = float(beta) self._lam = float(lam) self._acyc_order = acyc_order self._min_causal_effect = float(min_causal_effect) self._min_error_covariance = float(min_error_covariance) self._seed = int(seed) self._max_outer = int(max_outer) self._tol_h = float(tol_h) self._rho_max = float(rho_max) self._inner_start = int(inner_start) self._inner_growth = int(inner_growth) self._inner_tol = float(inner_tol) super().__init__()
[docs] def fit(self, X): """Fit the model to X. Parameters ---------- X : array-like, shape (n_samples, n_features) Observed data matrix. Returns ------- self : object Returns the instance itself. """ self._X = anp.asarray(check_array(X)) d = self._X.shape[1] # Random number generator self._rng = np.random.default_rng(self._seed) # Check parameters if d < 2: raise ValueError("Data must have at least two variables (features).") # Initialize parameters B = anp.array(self._rng.uniform(-0.5, 0.5, size=(d, d))) L = anp.array(self._rng.uniform(-0.05, 0.05, size=(d, d))) lower_mask = anp.array(np.tril(np.ones((d, d)), k=-1)) L = L * lower_mask L = L + L.T L = L - anp.diag(anp.diag(L)) D = anp.diag(anp.diag(anp.cov(self._X.T))) rho, alpha, h_prev = 1.0, 0.0, np.inf inner_cap = self._inner_start penalty_fn = self._bow_penalty bounds = self._build_bounds() # no prior knowledge objective = functools.partial(self._objective) gradient = grad(objective) for _ in range(self._max_outer): B_new, L_new, D_new = None, None, None h_new = None while rho < self._rho_max: B_new = B.copy() L_new = L.copy() D_new = D.copy() # inner refinement for _ in range(inner_cap): B_old, L_old, D_old = B_new, L_new, D_new Z = self._pseudo(B_new, L_new + D_new) theta0 = anp.concatenate([anp.ravel(B_new), anp.ravel(L_new)]) res = sopt.minimize( self._objective, theta0, args=(rho, alpha, Z, penalty_fn), method="L-BFGS-B", jac=gradient, bounds=bounds, options={"disp": False}, ) B_new = anp.reshape(res.x[: d * d], (d, d)) L_new = anp.reshape(res.x[d * d :], (d, d)) L_new = L_new + L_new.T L_new = L_new - anp.diag(anp.diag(L_new)) # refresh diagonal noise from residuals (different expression) diag_vals = [ anp.var(self._X[:, j] - anp.dot(self._X, B_new[:, j])) for j in range(d) ] D_new = anp.diag(anp.array(diag_vals)) # convergence of inner loop delta = anp.sum(anp.abs(B_old - B_new)) + anp.sum( anp.abs((L_old + D_old) - (L_new + D_new)) ) if float(delta) < self._inner_tol: break h_new = self._acyclicity_penalty(B_new) + penalty_fn(B_new, L_new) # penalty schedule if float(h_new) < 0.25 * float(h_prev): break else: rho *= 10.0 # AL update B, L, D = B_new.copy(), L_new.copy(), D_new.copy() h_prev = h_new alpha = alpha + rho * h_prev inner_cap += self._inner_growth if float(h_prev) <= self._tol_h or rho >= self._rho_max: break self._B = anp.where(anp.abs(B) < self._min_causal_effect, 0.0, B) self._omega = anp.where( anp.abs(L + D) < self._min_error_covariance, 0.0, (L + D) ) # Merge coefficient matrix and error covariance matrix omega_copied = self._omega.copy() anp.fill_diagonal(omega_copied, 0.0) omega_copied[anp.abs(omega_copied) > 0] = anp.nan self._adjacency_matrix = self._B.T.copy() self._adjacency_matrix[anp.isnan(omega_copied)] = anp.nan # Estimate causal order from coefficient matrix self._causal_order = self._causal_order_from_adjacency_matrix(self._B.T) return self
[docs] def bootstrap(self, X, n_sampling=100): """Bootstrap sampling to assess variability of estimates. Parameters ---------- X : array-like, shape (n_samples, n_features) Observed data matrix. n_sampling : int, optional (default=100) Number of bootstrap samples. Returns ------- Bs : array-like, shape (n_sampling, n_features, n_features) Bootstrap samples of estimated adjacency matrices. Omegas : array-like, shape (n_sampling, n_features, n_features) Bootstrap samples of estimated error covariance matrices. """ X = anp.asarray(X) d = X.shape[1] adjacency_matrices = anp.zeros((n_sampling, d, d)) Bs = anp.zeros((n_sampling, d, d)) Omegas = anp.zeros((n_sampling, d, d)) total_effects = anp.zeros((n_sampling, d, d)) index = anp.arange(X.shape[0]) resampled_indices = [] for i in range(n_sampling): resampled_X, resampled_index = resample(X, index) resampled_indices.append(resampled_index) self.fit(resampled_X) adjacency_matrices[i] = self._adjacency_matrix Bs[i] = self._B Omegas[i] = self._omega # Calculate total effects for c, from_ in enumerate(self._causal_order): for to in self._causal_order[c + 1 :]: if True in np.isnan(self._adjacency_matrix[from_]): total_effects[i, to, from_] = np.nan else: total_effects[i, to, from_] = self.estimate_total_effect( resampled_X, from_, to ) return ABICBootstrapResult( adjacency_matrices, Bs, Omegas, total_effects, resampled_indices=resampled_indices, )
def _acyclicity_penalty(self, W, K=None): """Smooth acyclicity surrogate written as a truncated series: h(W) = sum_{k=1..K} trace((W∘W)^k) / k! where ∘ is Hadamard product. K defaults to d. This differs in form/implementation from common "M=I+..." variants and avoids any custom VJP; autograd handles the gradient. Parameters ---------- W : array-like, shape (d, d) Directed adjacency matrix. K : int or None Order of the truncated series. If None, defaults to d. Returns ------- penalty : float Value of the acyclicity penalty. """ d = W.shape[0] if K is None: K = self._acyc_order or d A = W * W Ak = anp.eye(d) acc = 0.0 for k in range(1, K + 1): Ak = anp.dot(Ak, A) acc = acc + anp.trace(Ak) / float(math.factorial(k)) return acc @staticmethod def _bow_penalty(W1, W2): """Bow-freeness surrogate in an alternative form: || W1 ∘ W2 ||_F^2 / |W1| Parameters ---------- W1 : array-like, shape (d, d) Directed adjacency matrix. W2 : array-like, shape (d, d) Bidirected adjacency matrix. Returns ------- penalty : float Value of the bow-freeness penalty. """ A = W1 * W2 return anp.sum(A * A) / A.size def _objective(self, theta, rho, alpha, Z, penalty_fn): """Augmented Lagrangian objective. All pieces are auto-diff compatible. Parameters ---------- theta : array-like, shape (d*d + d*d,) Concatenated parameter vector [vec(B), vec(L)], where L is strictly lower triangular part to be mirrored to form a symmetric matrix. rho : float The weight of the penalty term alpha : float The Lagrange multiplier Z : list of array-like, shape (n, d) Pseudo-variables for bidirected part. penalty_fn : callable Structural penalty function on (B, L_sym). Returns ------- obj : float Value of the objective function. """ n, d = self._X.shape # unpack and enforce symmetry for the bidirected part B = anp.reshape(theta[: d * d], (d, d)) L = anp.reshape(theta[d * d :], (d, d)) L = L + L.T L = L - anp.diag(anp.diag(L)) # zero diagonal # LS(theta) term (with generalized power 2*beta) LS = 0.0 for j in range(d): r = self._X[:, j] - anp.dot(self._X, B[:, j]) - anp.dot(Z[j], L[:, j]) LS = LS + 0.5 / n * (anp.linalg.norm(r) ** (2 * self._beta)) # structural constraints h = self._acyclicity_penalty(B) + penalty_fn(B, L) aug = 0.5 * rho * (h**2) + alpha * h # smooth L0-ish (tanh-like) regularization on theta s = anp.log(n) * anp.abs(theta) t = (anp.exp(s) - 1) / (anp.exp(s) + 1) return LS + aug + self._lam * anp.sum(t) def _build_bounds(self, levels=None, exogenous=(), w_range=4.0): """Create L-BFGS-B bounds for theta Parameters ---------- levels : list[list[index]] or None Prior knowledge about variable ordering. Each sublist represents a level, where variables in earlier levels cannot have incoming edges from later levels. exogenous : array-like, shape (index, ...) Indices of exogenous variables (no incoming edges). w_range : float Range for weights (default: 4.0). Bounds are set to [-w_range, w_range]. Returns ------- bounds : list[tuple[float, float]] Bounds for each parameter in theta = [vec(B), vec(L)]. """ d = self._X.shape[1] if levels is None: levels = [[i for i in range(d)]] tier = {v: t for t, group in enumerate(levels) for v in group} exo = set(exogenous) # Directed bounds B: start wide, zero diag, then forbid backward edges B_lo = -w_range * np.ones((d, d)) B_hi = +w_range * np.ones((d, d)) np.fill_diagonal(B_lo, 0.0) np.fill_diagonal(B_hi, 0.0) for i in range(d): for j in range(d): if i == j: continue if tier[i] > tier[j]: B_lo[i, j] = 0.0 B_hi[i, j] = 0.0 # Bidirected bounds L (we optimize lower-tri; upper/diag fixed zero) L_lo = -w_range * np.ones((d, d)) L_hi = +w_range * np.ones((d, d)) for i in range(d): for j in range(d): if i <= j or (i in exo) or (j in exo): L_lo[i, j] = 0.0 L_hi[i, j] = 0.0 bounds_B = np.c_[B_lo.reshape(-1), B_hi.reshape(-1)].tolist() bounds_L = np.c_[L_lo.reshape(-1), L_hi.reshape(-1)].tolist() return bounds_B + bounds_L def _pseudo(self, B, omega): """ Build pseudo-variables Z using solves instead of explicit inversion. Parameters ---------- B : array-like, shape (d, d) Current estimate of directed adjacency matrix. omega : array-like, shape (d, d) Current estimate of error covariance matrix. Returns ------- Z : list of array-like, shape (n, d) Returns a list Z such that Z[j] has a zero column at j (shape (n, d)). """ d = B.shape[0] eps = self._X - anp.dot(self._X, B) Z = [None] * d for j in range(d): idx = [k for k in range(d) if k != j] omega_ = omega[anp.ix_(idx, idx)] Zij = anp.linalg.solve(omega_, eps[:, idx].T).T # (n, d-1) Zj = anp.insert(Zij, j, 0.0, axis=1) # (n, d) Z[j] = Zj return Z def _causal_order_from_adjacency_matrix(self, matrix, threshold=1e-8): """Estimate causal order from adjacency matrix using Kahn's algorithm.""" d = matrix.shape[0] # 有向グラフの隣接リストと入次数を作成 adj = {i: set() for i in range(d)} indegree = [0] * d for i in range(d): for j in range(d): if i != j and abs(matrix[i, j]) > threshold: adj[i].add(j) indegree[j] += 1 # 入次数0のノードから順に並べる order = [] queue = [i for i in range(d) if indegree[i] == 0] while queue: n = queue.pop(0) order.append(n) for m in adj[n]: indegree[m] -= 1 if indegree[m] == 0: queue.append(m) if len(order) != d: raise ValueError("The adjacency matrix contains a cycle (not a DAG).") return order[::-1]
[docs] def estimate_total_effect(self, X, from_index, to_index): """Estimate total effect using causal model. Parameters ---------- X : array-like, shape (n_samples, n_features) Original data, where n_samples is the number of samples and n_features is the number of features. from_index : Index of source variable to estimate total effect. to_index : Index of destination variable to estimate total effect. Returns ------- total_effect : float Estimated total effect. """ # Check parameters X = check_array(X) # Check confounders if True in np.isnan(self._adjacency_matrix[from_index]): warnings.warn( f"The estimated causal effect may be incorrect because " f"the source variable (from_index={from_index}) is influenced by confounders." ) return np.nan # from_index + parents indices parents = np.where(np.abs(self._adjacency_matrix[from_index]) > 0)[0] predictors = [from_index] predictors.extend(parents) # Estimate total effect lr = LinearRegression() lr.fit(X[:, predictors], X[:, to_index]) return lr.coef_[0]
@property def coefficient_matrix_(self): """Estimated coefficient matrix. Returns ------- coefficient_matrix_ : array-like, shape (n_features, n_features) The coefficient matrix B of fitted model, where n_features is the number of features. """ return self._B @property def error_covariance_matrix_(self): """Estimated error covariance matrix. Returns ------- error_covariance_matrix_ : array-like, shape (n_features, n_features) The error covariance matrix Omega of fitted model, where n_features is the number of features. """ return self._omega
class ABICBootstrapResult(BootstrapResult): """The result of bootstrapping for Time series algorithm.""" def __init__( self, adjacency_matrices, coefficient_matrices, error_covariance_matrices, total_effects, resampled_indices=None, ): """Construct a BootstrapResult. Parameters ---------- adjacency_matrices : array-like, shape (n_sampling) The adjacency matrix list by bootstrapping. coefficient_matrices : array-like, shape (n_sampling) The coefficient matrix list by bootstrapping. error_covariance_matrices : array-like, shape (n_sampling) The error covariance matrix list by bootstrapping. total_effects : array-like, shape (n_sampling) The total effects list by bootstrapping. resampled_indices : list of array-like, shape (n_sampling), optional (default=None) The list of resampled indices used in bootstrapping. """ super().__init__( adjacency_matrices, total_effects, resampled_indices=resampled_indices ) self._coefficient_matrices = coefficient_matrices self._error_covariance_matrices = error_covariance_matrices @property def coefficient_matrices_(self): """The coefficient matrix list by bootstrapping. Returns ------- coefficient_matrices_ : array-like, shape (n_sampling) The coefficient matrix list, where ``n_sampling`` is the number of bootstrap sampling. """ return self._coefficient_matrices @property def error_covariance_matrices_(self): """The error covariance matrix list by bootstrapping. Returns ------- error_covariance_matrices_ : array-like, shape (n_sampling) The error covariance matrix list, where ``n_sampling`` is the number of bootstrap sampling. """ return self._error_covariance_matrices