"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
from itertools import product
import networkx as nx
import numpy as np
import pandas as pd
import scipy.optimize as sopt
from scipy.special import expit as sigmoid
from sklearn.linear_model import (LinearRegression, LogisticRegression,
PoissonRegressor)
from sklearn.metrics import log_loss
import lingam.utils as ut
class LiM:
"""Implementation of LiM Algorithm [1]_
References
----------
    .. [1] Zeng Y, Shimizu S, Matsui H, et al. Causal discovery for
       linear mixed data. In Proceedings of the Conference on Causal
       Learning and Reasoning (CLeaR). PMLR, 2022: 994-1009.
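    Examples
    --------
    A minimal usage sketch on synthetic continuous data, assuming the class
    is exported as ``lingam.LiM`` (the toy data and the all-continuous
    ``dis_con`` indicator below are illustrative only):

    >>> import numpy as np
    >>> import lingam
    >>> X = np.random.laplace(size=(500, 3))   # toy non-Gaussian data
    >>> dis_con = np.ones((1, 3))              # mark every column continuous
    >>> model = lingam.LiM()
    >>> model.fit(X, dis_con)                  # doctest: +SKIP
    >>> model.adjacency_matrix_                # doctest: +SKIP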
"""
def __init__(
self,
lambda1=0.1,
loss_type="mixed",
max_iter=150,
h_tol=1e-8,
rho_max=1e16,
w_threshold=0.1,
):
"""Construct a LiM model.
Parameters
----------
        lambda1 : float, optional (default=0.1)
            L1 penalty parameter.
        loss_type : str, optional (default='mixed')
            Type of distribution assumed for the noise; one of 'mixed',
            'logistic', 'poisson' or 'laplace' ('mixed_dag' is set
            internally during the local search phase).
        max_iter : int, optional (default=150)
            Maximum number of dual ascent steps.
        h_tol : float, optional (default=1e-8)
            Tolerance parameter of the acyclicity constraint.
        rho_max : float, optional (default=1e+16)
            Maximum value of the regularization parameter rho.
        w_threshold : float, optional (default=0.1)
            Drop an edge if the absolute weight between variables is below
            ``w_threshold``.
"""
self._lambda1 = lambda1
self._loss_type = loss_type
self._max_iter = max_iter
self._h_tol = h_tol
self._rho_max = rho_max
self._w_threshold = w_threshold
self._adjacency_matrix = None
def fit(self, X, dis_con, only_global=False, is_poisson=False):
"""Fit the model to X with mixed data.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where ``n_samples`` is the number of samples
and ``n_features`` is the number of observed variables.
        dis_con : array-like, shape (1, n_features)
            Indicators of discrete or continuous variables, where "1"
            indicates a continuous variable and "0" a discrete variable.
        only_global : boolean, optional (default=False)
            If True, only the global optimization is performed to estimate
            the causal structure, and the local search phase is skipped.
        is_poisson : boolean, optional (default=False)
            If True, a Poisson regression model is used to compute the
            log-likelihood in the local search phase.
Returns
-------
self : object
Returns the instance of self.
"""
        W_min_lss = self._estimate_LiM(X, dis_con, only_global, is_poisson)
        self._adjacency_matrix = W_min_lss
        return self
def _estimate_LiM(self, X, dis_con, only_global, is_poisson):
"""Estimate the adjacency matrix btw. mixed variables"""
def _loss(W):
"""Evaluate value and gradient of loss."""
M = X @ W
if self._loss_type == "logistic":
loss = 1.0 / X.shape[0] * (np.logaddexp(0, M) - X * M).sum()
G_loss = 1.0 / X.shape[0] * X.T @ (sigmoid(M) - X)
elif self._loss_type == "poisson":
                loss = -np.sum(X * M) + np.exp(M).sum()  # elementwise product, consistent with G_loss below
for j in range(X.shape[0]):
for k in range(X.shape[1]):
loss += np.log(_factorial(X[j][k]))
loss = 1.0 / X.shape[0] * loss
G_loss = 1.0 / X.shape[0] * (-X.T @ X + X.T @ np.exp(M))
elif self._loss_type == "laplace":
R = X - M
                loss = 1.0 / X.shape[0] * np.sum(np.log(np.cosh(R)))
                G_loss = -1.0 / X.shape[0] * X.T @ np.tanh(R)  # dR/dW = -X, hence the sign
elif self._loss_type == "mixed":
R = X - M
a1, a2 = 1, 1
                loss_dis = (
                    (np.logaddexp(0, M) - X * M) * np.absolute(dis_con - 1)
                ).sum()
                loss_con = np.sum(np.log(np.cosh(R)) * dis_con)
                # joint negative log-likelihood; the signs match the analytic
                # gradient G_loss assembled below
                loss = 1.0 / X.shape[0] * (a1 * loss_dis + a2 * loss_con)
                W_dis = np.zeros([d, d])
                W_con = np.zeros([d, d])
                dis_idx = np.where(dis_con[0, :] == 0)[0]  # discrete columns
                W_dis[dis_idx, :] = 1
                W_dis[:, dis_idx] = 1
                con_idx = np.where(dis_con[0, :] == 1)[0]  # continuous columns
                W_con[con_idx, :] = 1
                W_con[:, con_idx] = 1
G_dis = X.T @ (sigmoid(M) - X) * W_dis
G_con = -X.T @ np.tanh(R) * W_con
G_loss = 1.0 / X.shape[0] * (G_dis + G_con)
elif self._loss_type == "mixed_dag":
dag = nx.DiGraph(W)
lingam_data = np.transpose(X)
df = pd.DataFrame(X)
is_continuous = dis_con[0, :].astype(bool)
is_discrete = np.invert(is_continuous)
loss = -_bic_scoring(
dag, is_discrete, is_poisson, df, lingam_data
) # lingam_data:dims*samples
R = X - M
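                # The BIC score has no simple closed-form gradient, so a
                # smooth surrogate mixing the logistic term and the log-cosh
                # residual term is used for the continuous optimizer.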
G_loss = 1.0 / X.shape[0] * (X.T @ (sigmoid(M) - X) - X.T @ np.tanh(R))
else:
raise ValueError("unknown loss type")
return loss, G_loss
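        # Local-phase score: BIC(G) = log-likelihood(G) - (log n / 2) * K,
        # where K counts free parameters (edges plus nodes).  Each node
        # contributes the likelihood of its own family: a frequency-table,
        # logistic, or Poisson model for discrete variables, and a linear
        # non-Gaussian (LiNGAM) term for continuous ones.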
def _bic_scoring(dag: nx.DiGraph, is_discrete, is_poisson, df, lingam_data):
"""Evaluate value of loss given the DAG."""
sample_size = df.shape[0]
K = dag.number_of_edges() + dag.number_of_nodes()
# K = dag.number_of_edges() + np.sum(is_discrete)
penalty = np.log(sample_size) / 2 * K
total_score = 0.0 - penalty
for i in dag.nodes:
parents_i = [j for j in dag.predecessors(i)]
                # discrete variable, logistic model
                if is_discrete[i] and not is_poisson:
if not parents_i:
# Bernoulli binary variable, likelihood using Bernoulli model with MLE parameters
frequency_table = df[i].value_counts()
                        likelihood_bernoulli = 0.0
                        for count_k in frequency_table:
                            likelihood_bernoulli += count_k * (
                                np.log(count_k) - np.log(frequency_table.sum())
                            )
                        total_score += likelihood_bernoulli
                    else:
                        # Logistic binary variable, likelihood using a
                        # logistic regression model.
                        X = df[parents_i]
                        y = df[i]
                        logistic = LogisticRegression(
                            solver="lbfgs"
                        )  # or solver='liblinear'
                        logistic.fit(X, y)
                        predict_prob = logistic.predict_proba(X)
                        # negative cross-entropy loss, i.e. the log-likelihood
                        likelihood_logistic = -log_loss(
                            y_true=y, y_pred=predict_prob, normalize=False
                        )
                        total_score += likelihood_logistic
                # discrete variable, Poisson model
                elif is_discrete[i] and is_poisson:
                    if not parents_i:
                        # No parents: marginal likelihood from the empirical
                        # frequency table with MLE parameters
frequency_table = df[i].value_counts()
                        likelihood_bernoulli = 0.0
                        for count_k in frequency_table:
                            likelihood_bernoulli += count_k * (
                                np.log(count_k) - np.log(frequency_table.sum())
                            )
                        total_score += likelihood_bernoulli
                    else:
                        # Count variable, likelihood using a Poisson regression
                        # model fitted per parent.  The log-likelihood itself is
                        # added, matching the sign convention of the branches above.
                        for parent_j in parents_i:
                            X = lingam_data[parent_j].reshape(-1, 1)
                            y = lingam_data[i]
                            poisson = PoissonRegressor()
                            poisson.fit(X, y)
                            beta = poisson.coef_
                            likelihood_poisson = np.sum(
                                y * lingam_data[parent_j] * beta
                            ) - np.sum(np.exp(lingam_data[parent_j] * beta))
                            for j in range(len(y)):
                                likelihood_poisson -= np.log(_factorial(y[j]))
                            total_score += likelihood_poisson
                            # Alternatively, the likelihood could be computed
                            # with torch.nn.PoissonNLLLoss:
                            # pnllloss = nn.PoissonNLLLoss()
                            # log_input = np.log(lingam_data[parents_i] * beta)
                            # target = y
                            # output = pnllloss(log_input, target)
                            # likelihood_poisson = output.item()
                            # total_score += likelihood_poisson
                # continuous variable, LiNGAM model
elif not is_discrete[i]:
b_i = np.zeros(dag.number_of_nodes())
if parents_i: # estimate b_i
X = df[parents_i]
y = df[i]
                        lr = LinearRegression(fit_intercept=True)  # data are not assumed to be zero-mean
lr.fit(X, y)
for index, j in enumerate(parents_i):
b_i[j] = lr.coef_[index]
bi_0 = lr.intercept_
else:
bi_0 = np.mean(df[i])
                    likelihood_lingam = ut.likelihood_i(lingam_data, i, b_i, bi_0)
                    total_score += likelihood_lingam
return total_score
def _h(W):
"""Evaluate value and gradient of acyclicity constraint."""
# E = slin.expm(W * W) # (Zheng et al. 2018)
# h = np.trace(E) - d
# A different formulation, slightly faster at the cost of numerical stability
M = np.eye(d) + W * W / d # (Yu et al. 2019)
G_h = np.linalg.matrix_power(M, d - 1)
h = (G_h.T * M).sum() - d
return h, G_h
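        # h(W) = tr[(I + W∘W/d)^d] - d vanishes exactly when the weighted
        # graph is acyclic; the matrix power (I + W∘W/d)^(d-1) returned as
        # G_h is reused in _func to assemble the gradient of h.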
def _adj(w):
"""Convert doubled variables ([2 d^2] array) back to original variables ([d, d] matrix)."""
return (w[: d * d] - w[d * d :]).reshape([d, d])
def _func(w):
"""Evaluate value and gradient of augmented Lagrangian for doubled variables ([2 d^2] array)."""
W = _adj(w)
loss, G_loss = _loss(W)
h, G_h = _h(W)
obj = loss + 0.5 * rho * h * h + alpha * h + self._lambda1 * w.sum()
# G_smooth = G_loss + (rho * h + alpha) * G_h # Zheng Xun Dag 2018
G_smooth = G_loss + (rho * h + alpha) * G_h.T * W * 2 # 2019
g_obj = np.concatenate(
(G_smooth + self._lambda1, -G_smooth + self._lambda1), axis=None
)
return obj, g_obj
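        # _func evaluates the augmented Lagrangian
        # loss + (rho/2) h^2 + alpha h + lambda1 ||W||_1.  With w split into
        # nonnegative parts (w_pos, w_neg) and W = w_pos - w_neg, the plain
        # w.sum() coincides with ||W||_1 at the optimum, which keeps the L1
        # term smooth for L-BFGS-B.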
        def _factorial(y):
            """Round y to the nearest integer and return its factorial (1 if y < 2)."""
            y = int(round(y))
            if y <= 1:
                return 1
            return y * _factorial(y - 1)
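        # Note: the recursive factorial above can overflow for large counts;
        # scipy.special.gammaln(y + 1), which returns log(y!), could be
        # substituted in the np.log(_factorial(...)) calls if that occurs.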
n, d = X.shape
w_est, rho, alpha, h = (
np.random.random(2 * d * d),
1.0,
0.0,
np.inf,
) # double w_est into (w_pos, w_neg)
bnds = [
(0, 0) if i == j else (0, None)
for _ in range(2)
for i in range(d)
for j in range(d)
]
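        # Box constraints: every entry of the doubled vector is nonnegative,
        # and the i == j entries are pinned to zero so that W has a zero
        # diagonal (no self-loops).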
# if self._loss_type == 'l2':
# X = X - np.mean(X, axis=0, keepdims=True)
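        # Dual ascent: repeatedly minimize the augmented Lagrangian with
        # L-BFGS-B; if the acyclicity violation h does not drop to a quarter
        # of its previous value, escalate rho tenfold before updating the
        # multiplier alpha.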
for _ in range(self._max_iter):
w_new, h_new = None, None
while rho < self._rho_max:
sol = sopt.minimize(
_func, w_est, method="L-BFGS-B", jac=True, bounds=bnds
)
# print('--- One iteration passed.....', sol.fun)
w_new = sol.x
h_new, _ = _h(_adj(w_new))
if h_new >= 0.25 * h:
rho *= 10
else:
break
w_est, h = w_new, h_new
alpha += rho * h
# print('------- rho is:', rho)
if h <= self._h_tol and h != 0:
break
            if rho >= self._rho_max * 1e-6 and h > 1e05:
                # restart from a fresh random point to avoid converging to a
                # dense (full) graph
                w_est = np.random.random(2 * d * d)
                rho = 1.0
elif rho >= self._rho_max:
break
if np.sum(np.absolute(_adj(w_est))) < 0.09: # avoid the zero matrix
w_est = np.random.random(2 * d * d)
W_est = _adj(w_est)
W_est[np.abs(W_est) < self._w_threshold] = 0
print("W_est (without the 2nd phase) is: \n", W_est)
if not only_global:
self._loss_type = "mixed_dag"
            best_loss, _ = _loss(W_est)  # score of the global-phase estimate
I = np.where(W_est != 0)
# check directions
W_min_lss = np.copy(W_est)
candi_setting = list(
product(range(2), repeat=len(I[0]))
) # 1:reverse the direction; 0:unchanged
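            # Orientation search: enumerate all 2^|E| keep/reverse patterns
            # over the edges found by the global phase and retain the acyclic
            # candidate with the lowest mixed_dag loss.  This exhaustive scan
            # is only tractable for small edge sets.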
for candi_setting_i in range(1, len(candi_setting)):
W_tmp = np.copy(W_est)
for iii in range(len(I[0])): # transform to W_tmp
if candi_setting[candi_setting_i][iii] == 1:
W_tmp[I[0][iii], I[1][iii]] = 0
W_tmp[I[1][iii], I[0][iii]] = 1
                lss, _ = _loss(W_tmp)
                if lss < best_loss and _h(W_tmp)[0] < self._h_tol:
                    W_min_lss = np.copy(W_tmp)
                    best_loss = lss
            # prune process: greedily try deleting single edges
if d > 2 and len(I[0]) > (d - 1): # > min_edge
W0 = np.copy(W_min_lss)
I_delete = np.where(W_min_lss != 0)
for delete_i in range(len(I_delete[0])):
W_tmp = np.copy(W0)
W_tmp[I_delete[0][delete_i], I_delete[1][delete_i]] = 0
                    lss, _ = _loss(W_tmp)
                    if lss < best_loss and _h(W_tmp)[0] < self._h_tol:
                        W_min_lss = np.copy(W_tmp)
                        best_loss = lss
                # if the search above changed the support, also try deletions
                # starting from the global estimate W_est
if not np.all(
W_est.astype(bool) == W_min_lss.astype(bool)
): # if they are different
W0 = np.copy(W_est)
for delete_i in range(len(I[0])):
W_tmp = np.copy(W0)
W_tmp[I[0][delete_i], I[1][delete_i]] = 0
                        lss, _ = _loss(W_tmp)
                        if lss < best_loss and _h(W_tmp)[0] < self._h_tol:
                            W_min_lss = np.copy(W_tmp)
                            best_loss = lss
            # add process: greedily try adding single edges that are absent
            # in both directions
if d > 2 and len(I[0]) < (d * (d - 1) / 2):
W0 = np.copy(W_min_lss)
W_edges = W0 + W0.T + np.eye(d)
I_add = np.where(W_edges == 0) # add undirected edges' indices
for add_i in range(len(I_add[0])):
W_tmp = np.copy(W0)
W_tmp[I_add[0][add_i], I_add[1][add_i]] = 1
                    lss, _ = _loss(W_tmp)
                    if lss < best_loss and _h(W_tmp)[0] < self._h_tol:
                        W_min_lss = np.copy(W_tmp)
                        best_loss = lss
                # if the search above changed the support, also try additions
                # starting from the global estimate W_est
if not np.all(
W_est.astype(bool) == W_min_lss.astype(bool)
): # if they are different
W0 = np.copy(W_est)
W_edges = W0 + W0.T + np.eye(d)
I_add = np.where(W_edges == 0) # add undirected edges' indices
for add_i in range(len(I_add[0])):
W_tmp = np.copy(W0)
W_tmp[I_add[0][add_i], I_add[1][add_i]] = 1
                        lss, _ = _loss(W_tmp)
                        if lss < best_loss and _h(W_tmp)[0] < self._h_tol:
                            W_min_lss = np.copy(W_tmp)
                            best_loss = lss
else:
W_min_lss = np.copy(W_est)
return W_min_lss
@property
def adjacency_matrix_(self):
"""Estimated adjacency matrix between mixed variables.
Returns
-------
adjacency_matrix_ : array-like, shape (n_features, n_features)
The adjacency matrix of variables, where
``n_features`` is the number of observed variables.
"""
return self._adjacency_matrix