LiNA [1] allows to locate the latent factors as well as uncover the causal structure between such latent factors of interests. Causal structure between latent factors can be ubiquitous in real-world applications, e.g., relations bewteen anxiety, depression, and coping in psychology [2] [3] , etc.
This method is based on the LiNA model as shown below,
where \({\varepsilon}^{(m)}\) and \({e}^{(m)}\) are random vectors that collect external influences, and errors, respectively, and they are independent with each other. \({f}^{(m)}\) and \({x}^{(m)}\) are random vectors that collect latent factors, and observed data, respectively. \({B}^{(m)}\) is a matrix that collects causal effects \(b_{ij}^{(m)}\) between \({f}_i^{(m)}\) and \({f}_j^{(m)}\), while \({G}^{(m)}\) collects factor loadings \(g_{ij}^{(m)}\) between \({f}_j^{(m)}\) and \({x}_i^{(m)}\). \(m\) stands for the \(m^{th}\) domain.
This method makes the following assumptions.
No causal relations between observed variables
Non-Gaussian continuous distubance variables (except at most one) for latent factors
Gaussian error variables (except at most one) for observed variables
Each latent factor has at lest 2 pure measurement variables.
Import and settings
In this example, we need to import numpy
, and random
in addition to lingam
import numpy as np
import random
import lingam
import lingam.utils as ut
print([np.__version__, lingam.__version__])
['1.20.3', '1.5.4']
Single-domain test data
First, we generate a causal structure with 10 measurement variables and 5 latent factors, where each latent variable has 2 pure measurement variables.
noise_ratios = 0.1
n_features = 10 # number of measurement vars.
n_samples, n_features_latent, n_edges, graph_type, sem_type = 1000, 5, 5, 'ER', 'laplace'
B_true = ut.simulate_dag(n_features_latent, n_edges, graph_type)
W_true = ut.simulate_parameter(B_true) # row to column
f, E, E_weight = ut.simulate_linear_sem(W_true, n_samples, sem_type)
f_nor = np.zeros([n_samples, n_features_latent])
scale = np.zeros([1, n_features_latent])
W_true_scale = np.zeros([n_features_latent, n_features_latent])
for j in range(n_features_latent):
scale[0, j] = np.std(f[:, j])
f_nor[:, j] = f[:, j] / np.std(f[:, j])
W_true_scale[:, j] = W_true[:, j] / scale[0, j] # scaled W_true
# generate noises ei of xi
e = np.random.random([n_features, n_samples])
for j in range(n_features):
e[j, :] = e[j, :] - np.mean(e[j, :])
e[j, :] = e[j, :] / np.std(e[j, :])
G = np.zeros([n_features, n_features_latent])
G[0, 0] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[1, 0] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[2, 1] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[3, 1] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[4, 2] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[5, 2] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[6, 3] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[7, 3] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[8, 4] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G[9, 4] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G_sign = np.sign(G)
# normalize G
G_nor = np.zeros([n_features, n_features_latent])
for j in range(n_features):
e[j, :] = e[j, :] / np.sqrt(np.square(np.sum(G[j, :])) + np.square(noise_ratios))
G_nor[j, :] = G[j, :] / np.sqrt(np.square(np.sum(G[j, :])) + np.square(noise_ratios))
X = G_nor @ f_nor.T + noise_ratios * e # X:n_features*n_samples "e is small or n_features are large"
X = X.T
print('The true adjacency matrix is:\n', W_true)
The true adjacency matrix is:
[[ 0. 0. 0. 0. 0. ]
[ 0. 0. 0. 0.52905044 -1.87243368]
[-1.94141783 0. 0. 0. 0. ]
[ 0. 0. 0. 0. 1.12108398]
[ 0. 0. -0.87478353 0. 0. ]]
Causal Discovery for single-domain data
To run causal discovery, we create a LiNA
object and call the fit
model = lingam.LiNA(), G_sign, scale)
<lingam.lina.LiNA at 0x2130f482970>
Using the _adjacency_matrix
properties, we can see the estimated adjacency
matrix between latent factors.
print('The estimated adjacency matrix is:\n', model._adjacency_matrix)
The estimated adjacency matrix is:
[[ 0. 0. 0. 0. 0. ]
[ 0. 0. 0. 0.51703777 -1.75584025]
[-1.75874721 0. 0. 0. 0. ]
[ 0. 0. 0. 0. 0.99860274]
[ 0. 0. -0.77518384 0. 0. ]]
Multi-domain test data
We generate a causal structure with 2 domains where in each domain there are 6 measurement variables and 3 latent factors. Each latent factor has 2 pure measurement variables.
n_features = 6 # number of measurement vars. in each domain
noise_ratios = 0.1
n_samples, n_features_latent, n_edges, graph_type, sem_type1, sem_type2 = 1000, 3, 3, 'ER', 'subGaussian', 'supGaussian'
# n_edges: number of edges btw. latent factors in a domain
# sem_type1/sem_type2: different distributions of noises from different domains
B_true = ut.simulate_dag(n_features_latent, n_edges, graph_type) # skeleton btw. latent factors
W_true = ut.simulate_parameter(B_true) # causal effects matrix btw. latent factors
# 1 domain
f, E, E_weight = ut.simulate_linear_sem(W_true, n_samples, sem_type1)
f_nor1 = np.zeros([n_samples, n_features_latent])
scale1 = np.zeros([1, n_features_latent])
W_true_scale = np.zeros([n_features_latent, n_features_latent])
for j in range(n_features_latent):
scale1[0, j] = np.std(f[:, j])
f_nor1[:, j] = f[:, j] / np.std(f[:, j])
W_true_scale[:, j] = W_true[:, j] / scale1[0, j]
e = np.random.random([n_features, n_samples])
for j in range(n_features):
e[j, :] = e[j, :] - np.mean(e[j, :])
e[j, :] = e[j, :] / np.std(e[j, :])
G1 = np.zeros([n_features, n_features_latent])
G1[0, 0] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G1[1, 0] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G1[2, 1] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G1[3, 1] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G1[4, 2] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G1[5, 2] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G_sign1 = np.sign(G1)
# normalize G
G_nor1 = np.zeros([n_features, n_features_latent])
for j in range(n_features):
e[j, :] = e[j, :] / np.sqrt(np.square(np.sum(G1[j, :])) + np.square(noise_ratios))
G_nor1[j, :] = G1[j, :] / np.sqrt(np.square(np.sum(G1[j, :])) + np.square(noise_ratios))
X1 = G_nor1 @ f_nor1.T + noise_ratios * e # "the noise ratio e is small or n_features is large"
X1 = X1.T
# 2 domain
f2, E, E_weight = ut.simulate_linear_sem(W_true, n_samples, sem_type2)
f_nor2 = np.zeros([n_samples, n_features_latent])
scale2 = np.zeros([1, n_features_latent])
W_true_scale = np.zeros([n_features_latent, n_features_latent])
for j in range(n_features_latent):
scale2[0, j] = np.std(f2[:, j])
f_nor2[:, j] = f2[:, j] / np.std(f2[:, j])
W_true_scale[:, j] = W_true[:, j] / scale2[0, j]
e = np.random.random([n_features, n_samples])
for j in range(n_features):
e[j, :] = e[j, :] - np.mean(e[j, :])
e[j, :] = e[j, :] / np.std(e[j, :])
G2 = np.zeros([n_features, n_features_latent])
G2[0, 0] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G2[1, 0] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G2[2, 1] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G2[3, 1] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G2[4, 2] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G2[5, 2] = random.choice((-1, 1)) * (0.2 + 0.5 * np.random.rand(1))
G_sign2 = np.sign(G2)
# normalize G
G_nor2 = np.zeros([n_features, n_features_latent])
for j in range(n_features):
e[j, :] = e[j, :] / np.sqrt(np.square(np.sum(G2[j, :])) + np.square(noise_ratios))
G_nor2[j, :] = G2[j, :] / np.sqrt(np.square(np.sum(G2[j, :])) + np.square(noise_ratios))
X2 = G_nor2 @ f_nor2.T + noise_ratios * e
X2 = X2.T # X:n_samples * n_features
# augment the data X
X = scipy.linalg.block_diag(X1, X2)
G_sign = scipy.linalg.block_diag(G_sign1, G_sign2)
scale = scipy.linalg.block_diag(scale1, scale2)
print('The true adjacency matrix is:\n', W_true)
The true adjacency matrix is:
[[0. 1.18580721 1.14604785]
[0. 0. 0. ]
[0. 0.63920121 0. ]]
Causal Discovery for multi-domain data
To run causal discovery, we create a MDLiNA
object and call the fit
model = lingam.MDLiNA(), G_sign, scale)
<lingam.lina.MDLiNA at 0x1812ee2fdf0>
Using the _adjacency_matrix
properties, we can see the estimated adjacency
matrix between latent factors of interest.
print('The estimated adjacency matrix is:\n', model._adjacency_matrix)
The estimated adjacency matrix is:
[[ 0. 0.34880702 -0.78706636]
[ 0. 0. 0.61577239]
[ 0. 0. 0. ]]