The main code below is adapted from a Bilibili tutorial and a GitHub repository; thanks to both authors.
This post records several commonly used dimensionality-reduction algorithms.
The data is the darmanis dataset: 466 cells by 2,000 highly expressed genes, grouped into nine cell-type clusters. (Figure: a partial screenshot of the raw data.)
In the raw file, rows are genes and columns are cells, and each entry is the expression level of that gene in that cell.
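To get oriented before running any of the scripts below, the raw table can be inspected with pandas (a throwaway snippet; the path dataset/darmanis.csv is the one used throughout this post):

import pandas as pd

df = pd.read_csv('dataset/darmanis.csv')
print(df.shape)         # dimensions of the raw table
print(df.iloc[:5, :5])  # top-left corner: identifiers, labels, and expression values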
1.PCA
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

def pca(data, n_dim):
    num_cells, num_genes = data.shape
    # Center the data (subtract the per-gene mean)
    data = data - np.mean(data, axis=0)
    # Covariance matrix, shape (num_genes, num_genes).
    # Note the parentheses: the sample covariance divides by (num_cells - 1)
    cov_data = np.dot(data.T, data) / (num_cells - 1)
    # Eigendecomposition; eigh is used because the covariance matrix is
    # symmetric, so the eigenvalues and eigenvectors come back real
    eig_values, eig_vectors = np.linalg.eigh(cov_data)
    # Sort the eigenvalues in descending order and keep the top n_dim
    indexs = np.argsort(-eig_values)[:n_dim]
    # Stack the matching eigenvectors into the projection matrix, shape (num_genes, n_dim)
    pick_eig_vector = eig_vectors[:, indexs]
    # Project the data, shape (num_cells, n_dim)
    new_data = np.dot(data, pick_eig_vector)
    return new_data
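As a quick sanity check (a minimal smoke test on random data, not part of the original post), the projection can be compared against scikit-learn's PCA; each principal axis is only defined up to sign, so the comparison uses absolute values:

import numpy as np
from sklearn.decomposition import PCA as SkPCA

X = np.random.randn(50, 20)   # stand-in for the expression matrix
ours = pca(X, 2)
ref = SkPCA(n_components=2).fit_transform(X)
print(np.allclose(np.abs(ours), np.abs(ref), atol=1e-6))  # expect True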
def draw_pic(datas, labs):
    # Scatter-plot a 2-D embedding, one color and one legend entry per label
    plt.cla()
    unique_labs = np.unique(labs)
    colors = [plt.cm.Spectral(each)
              for each in np.linspace(0, 1, len(unique_labs))]
    p = []
    legends = []
    for i in range(len(unique_labs)):
        index = np.where(labs == unique_labs[i])
        pi = plt.scatter(datas[index, 0], datas[index, 1], c=[colors[i]])
        p.append(pi)
        legends.append(unique_labs[i])
    plt.legend(p, legends)
    plt.show()
if __name__ == "__main__":
    data = pd.read_csv('dataset/darmanis.csv')
    # Transpose so that rows are cells and columns are genes
    data = data.to_numpy().T
    # The first row and the first two columns hold identifiers and labels
    matrix = data[1:, 2:].astype(float)
    print(matrix.shape)
    # The second column holds the cluster label of each cell
    labels = data[1:, 1:2].astype(int).flatten()
    print(labels.shape)
    new_data = pca(matrix, 2)
    draw_pic(new_data, labels)
The result of the reduction is shown below. (Figure: 2-D PCA projection of the darmanis cells, one color per cluster.)
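It is also worth checking how much variance those first two components actually capture. A small helper in the same style as pca above (my addition, not in the referenced code):

def explained_variance_ratio(data, n_dim):
    # Fraction of total variance carried by the top n_dim principal components
    centered = data - np.mean(data, axis=0)
    cov = np.dot(centered.T, centered) / (data.shape[0] - 1)
    eig_values = np.linalg.eigh(cov)[0]
    top = np.sort(eig_values)[::-1][:n_dim]
    return np.sum(top) / np.sum(eig_values)

# e.g. print(explained_variance_ratio(matrix, 2))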
2.T-SNE
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from PCA import pca
from K_Means import KMeans

# Compute the squared distance between every pair of points: ||x_i-x_j||^2
# X has shape [N, D]
def cal_pairwise_dist(X):
    sum_X = np.sum(np.square(X), 1)
    D = np.add(np.add(-2 * np.dot(X, X.T), sum_X).T, sum_X)
    # Return the matrix of pairwise squared distances
    return D
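The broadcasting trick relies on the expansion ||x_i-x_j||^2 = ||x_i||^2 - 2*x_i.x_j + ||x_j||^2. A throwaway check against an explicit double loop (not in the original code):

X = np.random.randn(5, 3)
D_fast = cal_pairwise_dist(X)
D_slow = np.array([[np.sum((xi - xj) ** 2) for xj in X] for xi in X])
print(np.allclose(D_fast, D_slow))  # expect True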
# Compute the conditional probabilities P_j|i and the Shannon entropy (in nats)
def calc_P_and_entropy(D, beta=1.0):
    P = np.exp(-D.copy() * beta)
    sumP = np.sum(P)
    # Entropy: H = log(sum P) + beta * sum(D * P) / sum(P)
    log_entropy = np.log(sumP) + beta * np.sum(D * P) / sumP
    P = P / sumP
    return P, log_entropy
# Binary search for the beta (i.e. sigma) whose entropy matches the target perplexity
def binary_search(D, init_beta, logU, tol=1e-5, max_iter=50):
    beta_max = np.inf
    beta_min = -np.inf
    beta = init_beta
    P, log_entropy = calc_P_and_entropy(D, beta)
    diff_log_entropy = log_entropy - logU
    m_iter = 0
    while np.abs(diff_log_entropy) > tol and m_iter < max_iter:
        # Entropy larger than the target: increase beta (narrow the Gaussian)
        if diff_log_entropy > 0:
            beta_min = beta
            if beta_max == np.inf or beta_max == -np.inf:
                beta = beta * 2
            else:
                beta = (beta + beta_max) / 2.
        # Entropy smaller than the target: decrease beta (widen the Gaussian)
        else:
            beta_max = beta
            if beta_min == np.inf or beta_min == -np.inf:
                beta = beta / 2
            else:
                beta = (beta + beta_min) / 2.
        # Recompute with the new beta
        P, log_entropy = calc_P_and_entropy(D, beta)
        diff_log_entropy = log_entropy - logU
        m_iter = m_iter + 1
    # Return the P for the best beta found
    return P, beta
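Because logU = np.log(perplexity) uses the natural logarithm, the perplexity being matched is exp(H) rather than the 2^H of the original paper; since exp(nats) equals 2^(bits), both conventions select the same beta. A small illustrative check (my addition) on one row of distances:

D_row = np.random.rand(100) * 10  # squared distances from one point to the rest
P_row, beta = binary_search(D_row, 1.0, np.log(30))
_, H = calc_P_and_entropy(D_row, beta)
print(np.exp(H))  # should come out close to the target perplexity of 30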
# Given a data matrix datas: [N, D],
# compute the joint probabilities P_ij: [N, N]
def p_joint(datas, target_perplexity):
    N, D = np.shape(datas)
    # Pairwise squared distances
    distances = cal_pairwise_dist(datas)
    beta = np.ones([N, 1])  # beta = 1/(2*sigma^2)
    logU = np.log(target_perplexity)
    p_conditional = np.zeros([N, N])
    # For each sample point, search for the optimal sigma (beta) and compute its P
    for i in range(N):
        if i % 500 == 0:
            print("Computing conditional probabilities, point %d" % (i))
        # Remove the i-th entry (the distance from point i to itself)
        Di = np.delete(distances[i, :], i)
        # Binary-search for the beta that brings log_entropy closest to logU
        P, beta[i] = binary_search(Di, beta[i], logU)
        # Re-insert a zero at position i
        p_conditional[i] = np.insert(P, i, 0)
    # Symmetrize to get the joint probabilities
    P_join = p_conditional + p_conditional.T
    P_join = P_join / np.sum(P_join)
    print("Mean sigma: %f" % np.mean(np.sqrt(1 / beta)))
    return P_join
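Two properties of the output are worth asserting: the joint matrix is symmetric by construction, and it sums to one. A quick check on small random data (illustrative only):

P = p_joint(np.random.randn(50, 10), 10)
print(np.allclose(P, P.T))        # symmetric
print(np.isclose(np.sum(P), 1.))  # normalized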
# Y: low-dimensional data [N, d]
# Compute the low-dimensional joint probabilities q_ij from Y
def q_tsne(Y):
    N = np.shape(Y)[0]
    sum_Y = np.sum(np.square(Y), 1)
    num = -2. * np.dot(Y, Y.T)
    # Student-t kernel with one degree of freedom: 1 / (1 + ||y_i - y_j||^2)
    num = 1. / (1. + np.add(np.add(num, sum_Y).T, sum_Y))
    num[range(N), range(N)] = 0.
    Q = num / np.sum(num)
    Q = np.maximum(Q, 1e-12)
    return Q, num
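In formula form, what q_tsne computes is the normalized Student-t kernel:

q_ij = (1 + ||y_i - y_j||^2)^(-1) / sum_{k != l} (1 + ||y_k - y_l||^2)^(-1),  with q_ii = 0

The heavy tail of the t-distribution lets moderately dissimilar points sit far apart in the map, which mitigates the crowding problem.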
# datas: high-dimensional input [N, D]
# labs: labels of the data [N, 1]
# dim: target dimension d
# plot: whether to draw intermediate maps
def estimate_tsne(datas, labs, dim, target_perplexity, plot=False):
    N, D = np.shape(datas)
    # Randomly initialize the low-dimensional data Y
    Y = np.random.randn(N, dim)
    # Joint probabilities of the high-dimensional data
    print("Computing the joint probabilities of the high-dimensional data")
    P = p_joint(datas, target_perplexity)
    # Early exaggeration: scale P up for the first iterations
    P = P * 4.
    P = np.maximum(P, 1e-12)
    # Training hyperparameters
    max_iter = 1500
    initial_momentum = 0.5
    final_momentum = 0.8
    eta = 500  # learning rate
    min_gain = 0.01
    dY = np.zeros([N, dim])  # gradient
    iY = np.zeros([N, dim])  # update (change in Y)
    gains = np.ones([N, dim])
    for m_iter in range(max_iter):
        # Compute Q
        Q, num = q_tsne(Y)
        # Compute the gradient
        PQ = P - Q
        for i in range(N):
            dY[i, :] = np.sum(np.tile(PQ[:, i] * num[:, i], (dim, 1)).T * (Y[i, :] - Y), 0)
        # Gradient descent with momentum
        if m_iter < 20:
            momentum = initial_momentum
        else:
            momentum = final_momentum
        # Adaptive gains: increase the gain where the gradient flips sign
        # relative to the previous update, decrease it where the sign agrees
        gains = (gains + 0.2) * ((dY > 0.) != (iY > 0.)) + \
                (gains * 0.8) * ((dY > 0.) == (iY > 0.))
        gains[gains < min_gain] = min_gain
        # The update keeps a memory of the previous step via the momentum term
        iY = momentum * iY - eta * (gains * dY)
        Y = Y + iY
        # Re-center Y by subtracting the mean
        Y = Y - np.tile(np.mean(Y, 0), (N, 1))
        # Report the current value of the cost function
        if (m_iter + 1) % 10 == 0:
            C = np.sum(P * np.log(P / Q))
            print("Iteration %d: loss is %f" % (m_iter + 1, C))
        # Stop the early exaggeration
        if m_iter == 100:
            P = P / 4.
        if plot and m_iter % 100 == 0:
            print("Draw Map")
            # draw_pic(Y, labs, name="%d.jpg" % (m_iter))
    return Y
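The inner loop over i implements the standard t-SNE gradient,

dC/dy_i = 4 * sum_j (p_ij - q_ij) * (1 + ||y_i - y_j||^2)^(-1) * (y_i - y_j)

except that the constant factor 4 is effectively absorbed into the learning rate eta.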
# Draw the embedding at a given iteration
def draw_pic(datas, labs, name='1.jpg'):
    plt.cla()
    unique_labs = np.unique(labs)
    colors = [plt.cm.Spectral(each)
              for each in np.linspace(0, 1, len(unique_labs))]
    p = []
    legends = []
    for i in range(len(unique_labs)):
        index = np.where(labs == unique_labs[i])
        pi = plt.scatter(datas[index, 0], datas[index, 1], c=[colors[i]])
        p.append(pi)
        legends.append(unique_labs[i])
    plt.legend(p, legends)
    plt.show()
if __name__ == "__main__":
    data = pd.read_csv('dataset/darmanis.csv')
    data = data.to_numpy().T
    matrix = data[1:, 2:].astype(float)
    print(matrix.shape)
    labels = data[1:, 1:2]
    print(labels.shape)
    print(labels)
    # First reduce to 30 dimensions with PCA
    pca_matrix = pca(matrix, 30)
    X = pca_matrix.real
    # Then reduce to 2 dimensions with t-SNE
    Y = estimate_tsne(X, labels, 2, 30, plot=True)
    # Cluster the embedding with k-means; the K_Means module returns the
    # centroids and a per-sample assignment matrix
    centroids, clusterAssment = KMeans(Y, 9)
    default_color = ['#FF0000', '#FF1493', '#9400D3', '#7B68EE', '#FFD700',
                     '#00BFFF', '#00FF00', '#FF8C00', '#FF4500', '#8B4513',
                     '#1F77B4', '#FF7F0E', '#2CA02C', '#D62728']
    # Color each sample by its assigned cluster
    for i in range(Y.shape[0]):
        colorIndex = int(clusterAssment[i, 0])
        plt.scatter(Y[i, 0], Y[i, 1], color=default_color[colorIndex])
    # Draw the cluster centroids (a single diamond marker for all centroids)
    centroids = np.asarray(centroids)
    plt.scatter(centroids[:, 0], centroids[:, 1], marker='D', color='k')
    plt.show()
Here t-SNE is combined with PCA: the data is first reduced to 30 dimensions with PCA, then to 2 dimensions with t-SNE for visualization.
Clustering result: (Figure: t-SNE embedding colored by k-means cluster.)
3.MDS
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from K_Means import KMeans
from sklearn.manifold import TSNE

'''
MDS algorithm
Basic idea: project points from the high-dimensional space into a low-dimensional
space while preserving the pairwise similarities between points as well as possible.
Steps:
1. Compute the distance matrix of the given data.
2. Compute the inner-product (Gram) matrix B of the low-dimensional vectors Z.
3. Eigendecompose B and keep the largest eigenvalues and their eigenvectors.
'''
# Load the dataset
def loadData(filename):
    data = pd.read_csv(filename).to_numpy()
    data = data.T
    return data
# Min-max normalization, column by column
def normalization(data):
    maxcols = data.max(axis=0)
    mincols = data.min(axis=0)
    data_rows, data_cols = data.shape
    nor_data = np.empty((data_rows, data_cols))
    for i in range(data_cols):
        nor_data[:, i] = (data[:, i] - mincols[i]) / (maxcols[i] - mincols[i])
    return nor_data
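One caveat: a gene with constant expression across all cells makes maxcols[i] - mincols[i] zero and fills that column with NaN. A vectorized variant with a guard (a sketch of my own, not in the original script):

def normalization_safe(data):
    # Column-wise min-max scaling; constant columns are left at 0 instead of NaN
    mins = data.min(axis=0)
    ranges = data.max(axis=0) - mins
    ranges[ranges == 0] = 1.0
    return (data - mins) / ranges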
# Euclidean distance between every pair of sample points
def calDistance(data):
    n_cells, n_genes = data.shape
    distance = np.zeros((n_cells, n_cells))
    for i in range(n_cells):
        for j in range(n_cells):
            distance[i, j] = np.sqrt(np.dot(data[i] - data[j], (data[i] - data[j]).T))
    return distance
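The double loop runs n_cells^2 Python-level iterations and is the slowest part of this script on larger data such as zeisel. A vectorized drop-in replacement (my sketch, using the same expansion as cal_pairwise_dist in the t-SNE section):

def calDistance_fast(data):
    # ||x_i - x_j||^2 = ||x_i||^2 - 2 x_i.x_j + ||x_j||^2
    sq = np.sum(data ** 2, axis=1)
    D2 = sq[:, None] - 2 * data @ data.T + sq[None, :]
    # Clip tiny negative values caused by floating-point rounding
    return np.sqrt(np.maximum(D2, 0))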
'''
parameters:
    distance: the distance matrix produced by calDistance
    dim: the number of dimensions to reduce to
returns: the low-dimensional embedding derived from the Gram matrix B
'''
def mds(distance, dim):
    n_cells = distance.shape[0]
    distance[distance < 0] = 0
    distance = distance ** 2
    # Double centering: B = -0.5 * (D^2 - row means - column means + grand mean)
    T1 = np.ones((n_cells, n_cells)) * np.sum(distance) / n_cells ** 2
    T2 = np.sum(distance, axis=1, keepdims=True) / n_cells
    T3 = np.sum(distance, axis=0, keepdims=True) / n_cells
    B = -0.5 * (T1 - T2 - T3 + distance)
    # B is symmetric, so eigh returns real eigenvalues and eigenvectors
    eigval, eigvec = np.linalg.eigh(B)
    index = np.argsort(-eigval)[:dim]
    sort_eigval = eigval[index]
    sort_eigvec = eigvec[:, index]
    # Clamp small negative eigenvalues (numerical noise) before the square root
    return sort_eigvec * np.sqrt(np.maximum(sort_eigval, 0))
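When the target dimension is at least the intrinsic dimension of the data, classical MDS reproduces the input distances exactly (up to rotation and translation). A toy verification (illustrative; note the copy, since mds modifies its argument in place):

pts = np.random.randn(10, 4)
D = calDistance(pts)
emb = mds(D.copy(), 4)
print(np.allclose(calDistance(emb), D, atol=1e-6))  # expect True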
# Dimensionality-reduction visualization, colored by true label
def showDimension(data, labels):
    plt.cla()
    unique_labels = np.unique(labels)
    colors = [plt.cm.Spectral(each) for each in np.linspace(0, 1, len(unique_labels))]
    p = []
    legends = []
    for i in range(len(unique_labels)):
        index = np.where(labels == unique_labels[i])
        pi = plt.scatter(data[index, 0], data[index, 1], c=[colors[i]])
        p.append(pi)
        legends.append(unique_labels[i])
    plt.legend(p, legends)
    plt.show()
# Cluster visualization
def showCluster(dataSet, k, clusterAssment):
    m, n = dataSet.shape
    if n != 2:
        print("Data is not two-dimensional")
        return 1
    default_color = ['#95d0fc', '#96f97b', '#c79fef',
                     '#ff81c0', '#00035b', '#06c2ac',
                     '#ffff14', '#033500', '#c20078']
    if k > len(default_color):
        print("K is too big")
        return 1
    # Draw every sample, colored by its assigned cluster
    for i in range(m):
        colorIndex = int(clusterAssment[i, 0])
        plt.scatter(dataSet[i, 0], dataSet[i, 1], color=default_color[colorIndex])
if __name__ == '__main__':
    dataset = loadData('dataset/darmanis.csv')
    data = dataset[1:, 2:].astype(np.float32)
    data = normalization(data)
    labels = dataset[1:, 1].astype(int).flatten()
    # Using MDS
    distance = calDistance(data)
    mds_data = mds(distance, 2)
    _, clusterAssment1 = KMeans(mds_data, 9)
    plt.subplot(1, 2, 1)
    plt.title('MDS+K-means')
    showCluster(mds_data, 9, clusterAssment1)
    # showDimension(mds_data, labels)
    # Using scikit-learn's TSNE for comparison
    tsne_data = TSNE(n_components=2).fit_transform(data)
    _, clusterAssment2 = KMeans(tsne_data, 9)
    plt.subplot(1, 2, 2)
    plt.title('TSNE+K-means')
    showCluster(tsne_data, 9, clusterAssment2)
    plt.show()
Clustering result on the darmanis dataset. (Figure.)
Clustering result on the zeisel dataset. (Figure.) The zeisel dataset has 3,005 cells by 2,000 highly expressed genes; with a sample that large, my poor little laptop took a good five minutes to produce a result.



