"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import numpy as np
import scipy.optimize as sopt
from scipy.special import expit as sigmoid
# from factor_analyzer import (ConfirmatoryFactorAnalyzer, ModelSpecificationParser)
from psy import cfa
from statsmodels.stats.outliers_influence import variance_inflation_factor
import pandas as pd
class LiNA:
"""Implementation of LiNA Algorithm [1]_
References
----------
.. [1] Y. Zeng, S. Shimizu, R. Cai, F. Xie, M. Yamamoto and Z. Hao.
Causal Discovery with Multi-Domain LiNGAM for Latent Factors.
In Proc. of the Thirtieth International Joint Conference on
Artificial Intelligence (IJCAI-21), 2021: 2097--2103.
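
    Examples
    --------
    A minimal usage sketch (the data ``X``, the sign pattern ``G_sign`` of
    the measurement structure, and the factor scales ``scale`` are assumed
    to be prepared by the user):

    >>> import lingam
    >>> model = lingam.LiNA()
    >>> # X: (n_samples, n_features), G_sign: (n_features, n_latent), scale: (1, n_latent)
    >>> # model.fit(X, G_sign, scale)
    >>> # model.adjacency_matrix_  # weighted DAG over the latent factors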
"""
def __init__(
self,
w_threshold=0.3,
lambda1=0.1,
lambda2=0.1,
loss_type='laplace',
max_iter=100,
h_tol=1e-8,
rho_max=1e+16
):
"""Construct a LiNA model.
Parameters
----------
        w_threshold : float, optional (default=0.3)
            Drop an edge if the absolute weight between latent factors is
            below ``w_threshold``.
        lambda1 : float, optional (default=0.1)
            L1 penalty parameter.
        lambda2 : float, optional (default=0.1)
            L2 penalty parameter.
        loss_type : str, optional (default='laplace')
            Type of the distribution of the noise.
        max_iter : int, optional (default=100)
            Maximum number of dual ascent steps.
        h_tol : float, optional (default=1e-8)
            Tolerance parameter of the acyclicity constraint.
        rho_max : float, optional (default=1e+16)
            Maximum value of the regularization parameter rho.
"""
self._w_threshold = w_threshold
self._lambda1 = lambda1
self._lambda2 = lambda2
self._loss_type = loss_type
self._max_iter = max_iter
self._h_tol = h_tol
self._rho_max = rho_max
self._adjacency_matrix = None
self._measurement_matrix = None
def fit(self, X, G_sign, scale):
"""Fit the model to X with measurement structure and latent factors' scales.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where ``n_samples`` is the number of samples
and ``n_features`` is the number of measurement features.
        G_sign : array-like, shape (n_features, n_features_latent)
            Measurement structure matrix, where ``n_features_latent`` is
            the number of latent factors and ``n_features`` is the
            number of measurement features.
        scale : array-like, shape (1, n_features_latent)
            Scales of the latent factors.
        Returns
        -------
        adjacency_matrix_ : array-like, shape (n_features_latent, n_features_latent)
            Estimated adjacency matrix between the latent factors.
"""
n_samples = X.shape[0]
n_features_latent = G_sign.shape[1]
lam, phi, var_e = cfa(X, G_sign)
G_est_nor = lam
f_est_nor = np.linalg.inv(G_est_nor.T @ G_est_nor) @ G_est_nor.T @ (X.T)
f_est_nor = f_est_nor.T
f_est = np.zeros([n_samples, n_features_latent])
for j in range(n_features_latent):
f_est[:, j] = f_est_nor[:, j] * scale[0, j]
        W_est = self._estimate_LiNA(f_est)  # f_est: (n_samples, n_features_latent)
self._adjacency_matrix = W_est
self._measurement_matrix = G_est_nor
return self._adjacency_matrix
def _estimate_LiNA(self, f):
"""Estimate the adjacency matrix btw. estimated latent factors for single-domain data"""
def _loss(W):
"""Evaluate value and gradient of loss."""
M = f @ W
if self._loss_type == 'l2':
R = f - M
loss = 0.5 / f.shape[0] * (R ** 2).sum()
D = - 1.0 / f.shape[0] * f.T @ R
elif self._loss_type == 'logistic':
loss = 1.0 / f.shape[0] * (np.logaddexp(0, M) - f * M).sum()
D = 1.0 / f.shape[0] * f.T @ (sigmoid(M) - f)
elif self._loss_type == 'poisson':
S = np.exp(M)
loss = 1.0 / f.shape[0] * (S - f * M).sum()
D = 1.0 / f.shape[0] * f.T @ (S - f)
            elif self._loss_type == 'laplace':
                R = f - M
                # negative log-likelihood under Laplacian-like noise (log-cosh)
                loss = np.sum(np.log(np.cosh(R)))
                D = - f.T @ np.tanh(R)
else:
raise ValueError('unknown loss type')
return loss, D
def _h(W):
"""Evaluate value and gradient of acyclicity constraint."""
# E = slin.expm(W * W) # (Zheng et al. 2018)
# h = np.trace(E) - d
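            # h(W) = tr[(I + W∘W/d)^d] - d is zero iff the graph of W is
            # acyclic; E = M^(d-1) serves both the value and the gradient,
            # since dh/dW = 2 * E^T ∘ W.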
M = np.eye(d) + W * W / d # (Yu et al. 2019)
E = np.linalg.matrix_power(M, d - 1)
h = (E.T * M).sum() - d
return h, E
def _adj(w):
"""Convert doubled variables ([2 d^2] array) back to original variables ([d, d] matrix)."""
return (w[:d * d] - w[d * d:]).reshape([d, d])
def _func(w):
"""Evaluate value and gradient of augmented Lagrangian for doubled variables ([2 d^2] array)."""
W = _adj(w)
loss, D = _loss(W)
h, E = _h(W)
obj = loss + 0.5 * rho * h * h + alpha * h + self._lambda1 * w.sum()
# obj = loss + 0.5 * rho * h * h + alpha * h + self._lambda1 * w.sum()
# + self._lambda2*(np.linalg.norm(W, ord = 'fro')**2) # add l2 penalty constraint
G = D + (rho * h + alpha) * E.T * W * 2
# G = D + (rho * h + alpha) * E.T * W * 2 + 2 * self._lambda2 * W #gradient of loss + h + penalty
grad_cat = np.concatenate((G + self._lambda1, - G + self._lambda1), axis=None)
return obj, grad_cat
n, d = f.shape
w_est, rho, alpha, h = np.zeros(2 * d * d), 1.0, 0.0, np.inf # double w_est into (w_pos, w_neg)
bnds = [(0, 0) if i == j else (0, None) for _ in range(2) for i in range(d) for j in range(d)]
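        # w is split into nonnegative halves (w_pos, w_neg) with W = w_pos - w_neg,
        # which makes the l1 term linear; the (0, 0) bounds pin diag(W) to zero.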
for _ in range(self._max_iter):
w_new, h_new = None, None
while rho < self._rho_max:
sol = sopt.minimize(_func, w_est, method='L-BFGS-B', jac=True, bounds=bnds)
w_new = sol.x
h_new, _ = _h(_adj(w_new))
if h_new > 0.25 * h:
rho *= 10
else:
break
w_est, h = w_new, h_new
alpha += rho * h
if h <= self._h_tol or rho >= self._rho_max:
break
W_est = _adj(w_est)
W_est[np.abs(W_est) < self._w_threshold] = 0
return W_est
@property
def adjacency_matrix_(self):
"""Estimated adjacency matrix between latent factors.
Returns
-------
adjacency_matrix_ : array-like, shape (n_features_latent, n_features_latent)
The adjacency matrix of latent factors, where
``n_features_latent`` is the number of latent
factors.
"""
return self._adjacency_matrix
@property
def measurement_matrix_(self):
"""Estimated measurement matrix between measurement variables and
latent factors.
Returns
-------
measurement_matrix_ : array-like, shape (n_features, n_features_latent)
The measurement matrix between measurement variables and
latent factors, where ``n_features_latent`` is the
number of latent factors and ``n_features`` is the
number of measurement variables.
"""
return self._measurement_matrix
class MDLiNA:
"""Implementation of MD-LiNA Algorithm [2]_
References
----------
.. [2] Y. Zeng, S. Shimizu, R. Cai, F. Xie, M. Yamamoto and Z. Hao.
Causal Discovery with Multi-Domain LiNGAM for Latent Factors.
In Proc. of the Thirtieth International Joint Conference on
Artificial Intelligence (IJCAI-21), 2021: 2097--2103.
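
    Examples
    --------
    A minimal usage sketch for two domains (the pooled data ``X``, the sign
    pattern ``G_sign``, and the per-domain factor scales ``scale`` are
    assumed to be prepared by the user):

    >>> import lingam
    >>> model = lingam.MDLiNA(no_of_domain=2, no_of_latent_1domain=3)
    >>> # X: samples pooled over both domains, G_sign: (n_features, 6)
    >>> # model.fit(X, G_sign, scale)
    >>> # model.adjacency_matrix_  # DAG over the latent factors of interest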
"""
def __init__(
self,
w_threshold=0.3,
lambda1=0.1,
lambda2=0.1,
loss_type='laplace',
max_iter=100,
h_tol=1e-8,
rho_max=1e+16,
no_of_domain=2,
no_of_latent_1domain=3,
):
"""Construct an MD-LiNA model.
Parameters
----------
        w_threshold : float, optional (default=0.3)
            Drop an edge if the absolute weight between latent factors is
            below ``w_threshold``.
        lambda1 : float, optional (default=0.1)
            L1 penalty parameter.
        lambda2 : float, optional (default=0.1)
            L2 penalty parameter.
        loss_type : str, optional (default='laplace')
            Type of the distribution of the noise.
        max_iter : int, optional (default=100)
            Maximum number of dual ascent steps.
        h_tol : float, optional (default=1e-8)
            Tolerance parameter of the acyclicity constraint.
        rho_max : float, optional (default=1e+16)
            Maximum value of the regularization parameter rho.
        no_of_domain : int, optional (default=2)
            Number of domains.
        no_of_latent_1domain : int, optional (default=3)
            Number of latent factors in each domain.
"""
self._w_threshold = w_threshold
self._lambda1 = lambda1
self._lambda2 = lambda2
self._loss_type = loss_type
self._max_iter = max_iter
self._h_tol = h_tol
self._rho_max = rho_max
self._no_of_domain = no_of_domain
self._no_of_latent_1domain = no_of_latent_1domain
self._adjacency_matrix = None
self._measurement_matrix = None
def fit(self, X, G_sign, scale):
"""Fit the model to X with measurement structure and latent factors' scales.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where ``n_samples`` is the number of samples
of all domains and ``n_features`` is the number of features
of all domains.
        G_sign : array-like, shape (n_features, n_features_latent)
            Measurement structure matrix, where ``n_features_latent`` is
            the number of latent factors of all domains and ``n_features``
            is the number of measurement variables of all domains.
        scale : array-like, shape (no_of_domain, n_features_latent)
            Scales of the latent factors, one row per domain.
        Returns
        -------
        adjacency_matrix_ : array-like, shape (no_of_latent_1domain, no_of_latent_1domain)
            Estimated adjacency matrix between the latent factors of interest.
"""
        # PHASE I: estimate the measurement matrix \bar{G} via CFA
lam, phi, var_e = cfa(X, G_sign)
G_est = lam
f_est = np.linalg.inv(G_est.T @ G_est) @ G_est.T @ (X.T)
f_est = f_est.T
        # rescale the estimated factors domain by domain (this indexing
        # assumes no_of_domain == 2: scale row 0 for the first domain's
        # factors, row 1 for the second domain's)
        F = np.zeros([f_est.shape[0], f_est.shape[1]])
        for j in range(f_est.shape[1]):
            if j < self._no_of_latent_1domain:
                F[:, j] = f_est[:, j] * scale[0, j]
            else:
                F[:, j] = f_est[:, j] * scale[1, j]
        # PHASE II: alternately optimize H (loadings onto the latent factors
        # of interest) and the causal structure among those factors
W0 = np.zeros([self._no_of_latent_1domain, self._no_of_latent_1domain])
H0 = np.zeros([self._no_of_latent_1domain * self._no_of_domain, self._no_of_latent_1domain])
times = 0
min_like = np.inf
Likelihood_all = np.zeros([1, 10000])
        while True:
# update H
H_est = self._estimate_MDLiNA_H(F, W0)
F_final2 = np.linalg.inv(H_est.T @ H_est) @ H_est.T @ F.T
# update B
W_est = self._estimate_MDLiNA_B(F_final2.T)
times = times + 1
print('W_est is:\n', W_est)
print('W0 is:\n', W0)
            # check convergence: stop once the penalized likelihood stabilizes
            # or repeatedly fails to improve
inv_HH = np.linalg.inv(H_est.T @ H_est) @ H_est.T
P_H = H_est @ inv_HH @ F.T
LL = np.sum(np.log(np.cosh(F @ H_est - F @ inv_HH.T @ W_est))) + (np.abs(W_est)).sum() + 10000 * self._conh(
W_est)
recon_error = np.trace((F.T - P_H).T @ (F.T - P_H))
H1 = (np.abs(H_est)).sum()
likelihood_all = LL + recon_error + H1
Likelihood_all[0, times] = likelihood_all
if likelihood_all <= min_like:
min_like = likelihood_all
if np.abs(Likelihood_all[0, times] - Likelihood_all[0, times - 1]) < 1e-1:
break
elif times >= 30 and np.abs(likelihood_all - min_like) < 1e-1:
break
elif times >= 30 and likelihood_all <= min_like:
break
elif times >= 50 and np.abs(likelihood_all - np.min(Likelihood_all[0, times - 20:times - 1])) < 1e-1:
break
elif times >= 50 and likelihood_all <= np.min(Likelihood_all[0, times - 20:times - 1]):
break
else:
W0 = W_est
H0 = H_est
if np.where(H_est != 0)[0].shape[0] == 0:
H_est = H0
if np.where(W_est != 0)[0].shape[0] == 0:
W_est = W0
        # match each estimated factor to the concept it represents, using the
        # magnitudes of its loadings in H
        Weight = np.abs(H_est)
        W = W_est
        # BB[j, i]: total absolute loading of shared factor i on concept j,
        # aggregated over the two domains
        BB = np.zeros([self._no_of_latent_1domain, self._no_of_latent_1domain])
        for i in range(self._no_of_latent_1domain):
            for j in range(self._no_of_latent_1domain):
                BB[j, i] = np.abs(Weight[j, i]) + np.abs(Weight[j + self._no_of_latent_1domain, i])
order = np.zeros([1, self._no_of_latent_1domain])
        if np.where(BB == 0)[0].size == 0:  # BB has no zero entries
for i in range(self._no_of_latent_1domain):
indice = np.where(BB == np.amax(BB))
order[0, indice[1]] = indice[0]
BB[indice[0], :] = 0
BB[:, indice[1]] = 0
else:
indice = np.where(BB == 0)
BB[:, indice[1]] = BB[:, indice[1]] + 0.01
for i in range(self._no_of_latent_1domain):
indice = np.where(BB == np.amax(BB))
if len(indice[0]) > 1:
order[0, indice[1][0]] = indice[0][0]
BB[indice[0][0], :] = 0
BB[:, indice[1][0]] = 0
else:
order[0, indice[1]] = indice[0]
BB[indice[0], :] = 0
BB[:, indice[1]] = 0
W_est_new0 = np.zeros([self._no_of_latent_1domain, self._no_of_latent_1domain])
W_est_new = np.zeros([self._no_of_latent_1domain, self._no_of_latent_1domain])
for i in range(self._no_of_latent_1domain):
indice = np.where(order == i)
W_est_new0[[i], :] = W[indice[1], :] # row
for i in range(self._no_of_latent_1domain):
indice = np.where(order == i)
W_est_new[:, [i]] = W_est_new0[:, indice[1]] # column
self._adjacency_matrix = W_est_new
self._measurement_matrix = G_est
return self._adjacency_matrix
    @staticmethod
    def calc_vif(X):
        """Compute the variance inflation factor (VIF) for each column of X."""
        X = pd.DataFrame(X)  # convert ndarray to DataFrame
        vif = pd.DataFrame()
        vif["variables"] = X.columns
        vif["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
        return vif
    def _estimate_MDLiNA_H(self, f, W):
        """Optimize the H matrix between the multi-domain latent factors and
        the latent factors of interest.
        Returns:
            H_est (np.ndarray): [q, hq] estimated H, where q is the total
            number of latent factors over all domains and hq is the number
            of shared latent factors of interest.
        """
def _loss(H):
"""Evaluate value and gradient of loss."""
inv_HH = np.linalg.inv(H.T @ H)
fH = f @ H
M = fH @ inv_HH @ W
if self._loss_type == 'l2':
R = f - M
loss = 0.5 / f.shape[0] * (R ** 2).sum()
D = - 1.0 / f.shape[0] * f.T @ R
elif self._loss_type == 'laplace':
R = fH @ inv_HH - M
# standardize
for j in range(hq):
R[:, j] = R[:, j] - np.mean(R[:, j])
R[:, j] = R[:, j] / np.std(R[:, j])
if np.trace(H.T @ H) < 10000:
aa = np.trace(H.T @ H)
D = - f.T @ np.tanh(R) @ (np.eye(np.size(H, 1)) - W) @ inv_HH \
+ H @ inv_HH @ inv_HH @ fH.T @ np.tanh(R) @ (np.eye(np.size(H, 1)) - W) \
+ H @ inv_HH @ inv_HH @ (np.eye(np.size(H, 1)) - W).T @ np.tanh(R).T @ fH \
- 2 * f.T @ fH @ inv_HH + 2 * H @ inv_HH @ fH.T @ fH @ inv_HH
else:
aa = 0
D = - f.T @ np.tanh(R) @ (np.eye(np.size(H, 1)) - W) @ inv_HH \
+ H @ inv_HH @ inv_HH @ fH.T @ np.tanh(R) @ (np.eye(np.size(H, 1)) - W) \
+ H @ inv_HH @ inv_HH @ (np.eye(np.size(H, 1)) - W).T @ np.tanh(R).T @ fH \
- 2 * f.T @ fH @ inv_HH + 2 * H @ inv_HH @ fH.T @ fH @ inv_HH + 2 * H
loss = np.sum(np.log(np.cosh(R))) \
+ np.trace((f - fH @ inv_HH @ H.T).T @ (f - fH @ inv_HH @ H.T)) \
+ (np.trace(H.T @ H) - aa)
else:
raise ValueError('unknown loss type')
return loss, D
        def _adj(h):
            """Convert doubled variables ([2 q hq] array) back to the original [q, hq] matrix."""
            return (h[:hq * q] - h[hq * q:]).reshape([q, hq])
def _func(h):
"""Evaluate value and gradient of augmented Lagrangian for doubled variables ([2 d^2] array)."""
H = _adj(h)
loss, D = _loss(H)
obj = loss + self._lambda1 * h.sum()
G = D # gradient of loss
grad_cat = np.concatenate((G + self._lambda1, - G + self._lambda1), axis=None)
return obj, grad_cat
n, q = f.shape
hq = np.size(W, 0) # hq is \hat{q}: the number of shared latent factors
# ut.set_random_seed(0)
h_est, score_0 = np.random.rand(2 * hq * q), np.inf
for _ in range(self._max_iter):
sol = sopt.minimize(_func, h_est, method='BFGS', jac=True)
h_new = sol.x
score_1 = sol.fun
            if np.abs(score_0 - score_1) < self._h_tol:
                break
            if 0 < score_1 < score_0:
                h_est = h_new
                score_0 = score_1
                # guard against a (near-)singular H^T H after thresholding
                bb = _adj(h_est)
                bb[np.abs(bb) < self._w_threshold] = 0
                if np.linalg.det(bb.T @ bb) == 0:
                    # perturb a few entries to escape the singular solution
                    h_est[0:8] = h_est[0:8] + 0.1
                if np.max(np.abs(h_est)) > 100:
                    # the iterate blew up: restart from a fresh random point
                    h_est = np.random.rand(2 * hq * q)
                    score_0 = np.inf
            else:
                # no improvement: continue from the latest solution
                h_est = h_new
H_est = _adj(h_est)
H_est[np.abs(H_est) < self._w_threshold] = 0
self._H_est = H_est
return self._H_est
def _estimate_MDLiNA_B(self, f):
"""Optimize the causal effect matrix among shared latent factors
Returns:
W_est (np.ndarray): [d, d]
"""
def _loss(W):
"""Evaluate value and gradient of loss."""
M = f @ W
if self._loss_type == 'l2':
R = f - M
loss = 0.5 / f.shape[0] * (R ** 2).sum()
D = - 1.0 / f.shape[0] * f.T @ R
elif self._loss_type == 'logistic':
loss = 1.0 / f.shape[0] * (np.logaddexp(0, M) - f * M).sum()
D = 1.0 / f.shape[0] * f.T @ (sigmoid(M) - f)
elif self._loss_type == 'poisson':
S = np.exp(M)
loss = 1.0 / f.shape[0] * (S - f * M).sum()
D = 1.0 / f.shape[0] * f.T @ (S - f)
            elif self._loss_type == 'laplace':
                R = f - M
                loss = np.sum(np.log(np.cosh(R)))
                D = - f.T @ np.tanh(R)
else:
raise ValueError('unknown loss type')
return loss, D
def _h(W):
"""Evaluate value and gradient of acyclicity constraint."""
M = np.eye(d) + W * W / d # (Yu et al. 2019)
E = np.linalg.matrix_power(M, d - 1)
h = (E.T * M).sum() - d
return h, E
def _adj(w):
"""Convert doubled variables ([2 d^2] array) back to original variables ([d, d] matrix)."""
return (w[:d * d] - w[d * d:]).reshape([d, d])
def _func(w):
"""Evaluate value and gradient of augmented Lagrangian for doubled variables ([2 d^2] array)."""
W = _adj(w)
loss, D = _loss(W)
h, E = _h(W)
obj = loss + 0.5 * rho * h * h + alpha * h + self._lambda1 * w.sum()
G = D + (rho * h + alpha) * E.T * W * 2
grad_cat = np.concatenate((G + self._lambda1, - G + self._lambda1), axis=None)
return obj, grad_cat
n, d = f.shape
w_est, rho, alpha, h = np.zeros(2 * d * d), 1.0, 0.0, np.inf
bnds = [(0, 0) if i == j else (0, None) for _ in range(2) for i in range(d) for j in range(d)]
for _ in range(self._max_iter):
w_new, h_new = None, None
while rho < self._rho_max:
sol = sopt.minimize(_func, w_est, method='L-BFGS-B', jac=True, bounds=bnds)
w_new = sol.x
h_new, _ = _h(_adj(w_new))
if h_new > 0.25 * h:
rho *= 10
else:
break
w_est, h = w_new, h_new
alpha += rho * h
if h <= self._h_tol or rho >= self._rho_max:
break
W_est = _adj(w_est)
W_est[np.abs(W_est) < self._w_threshold] = 0
self._W_est = W_est
return self._W_est
def _conh(self, W):
"""Evaluate value of acyclicity constraint."""
d = self._no_of_latent_1domain
M = np.eye(d) + W * W / d # (Yu et al. 2019)
E = np.linalg.matrix_power(M, d - 1)
h = (E.T * M).sum() - d
return h
@property
def adjacency_matrix_(self):
"""Estimated adjacency matrix between latent factors of interest, which
is shared by all domains.
Returns
-------
adjacency_matrix_ : array-like, shape (n_features_latent_1domain, n_features_latent_1domain)
The adjacency matrix of latent factors of interest, where
``n_features_latent_1domain`` is the number of latent
factors of interest.
"""
return self._adjacency_matrix
@property
def measurement_matrix_(self):
"""Estimated measurement matrix between measurement variables and
latent factors from all domains.
Returns
-------
measurement_matrix_ : array-like, shape (n_features, n_features_latent)
The measurement matrix between measurement variables and
latent factors, where ``n_features_latent`` is the
number of latent factors and ``n_features`` is the
number of measurement variables from all domains.
"""
return self._measurement_matrix