
Dimensionality Reduction and Clustering of Single-Cell Sequencing Data with Machine Learning Algorithms (Part 1)



The main code is adapted from here (thanks, Bilibili University)
and from here (thanks to the GitHub author).
This post records several commonly used dimensionality-reduction algorithms.
The data is the darmanis dataset: 466 cells and 2000 highly expressed genes, grouped into nine cell-type clusters. A partial screenshot of the data:

Rows are genes and columns are cells; each value is that gene's expression level in the corresponding cell.
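
For orientation, a quick peek at the raw file (assuming the dataset/darmanis.csv path used by the code below):

import pandas as pd

df = pd.read_csv('dataset/darmanis.csv')
print(df.shape)         # raw dimensions, annotation rows/columns included
print(df.iloc[:3, :3])  # top-left corner of the table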

1. PCA

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler


def pca(data, n_dim):
    num_cells, num_genes = data.shape
    # Center the data (subtract the per-gene mean)
    data = data - np.mean(data, axis=0)
    # Sample covariance matrix, Conv_data shape: (num_genes, num_genes)
    Conv_data = np.dot(data.T, data) / (num_cells - 1)
    # Eigenvalues and eigenvectors
    eig_values, eig_vectors = np.linalg.eig(Conv_data)
    # Sort eigenvalues in descending order and keep the top n_dim
    indexs = np.argsort(-eig_values)[:n_dim]
    # Stack the corresponding eigenvectors into the projection matrix, shape (num_genes, n_dim)
    pick_eig_vector = eig_vectors[:, indexs]
    # Project the data: New_data shape (num_cells, n_dim)
    New_data = np.dot(data, pick_eig_vector)
    return New_data


def draw_pic(datas, labs):
    plt.cla()
    unique_labs = np.unique(labs)
    colors = [plt.cm.Spectral(each)
              for each in np.linspace(0, 1, len(unique_labs))]
    p = []
    legends = []
    for i in range(len(unique_labs)):
        index = np.where(labs == unique_labs[i])
        pi = plt.scatter(datas[index, 0], datas[index, 1], c=[colors[i]])
        p.append(pi)
        legends.append(unique_labs[i])
    plt.legend(p, legends)
    plt.show()


if __name__ == "__main__":
    data = pd.read_csv('dataset/darmanis.csv')
    data = data.to_numpy().T
    matrix = data[1:, 2:].astype(float)
    print(matrix.shape)
    labels = data[1:, 1:2].astype(int).flatten()
    print(labels.shape)
    new_data = pca(matrix, 2)
    draw_pic(new_data, labels)
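
As a quick cross-check, the hand-rolled pca() can be compared against scikit-learn's PCA; a minimal sketch reusing matrix and new_data from the main block above (component signs may be flipped, since eigenvector directions are arbitrary):

from sklearn.decomposition import PCA

sk_data = PCA(n_components=2).fit_transform(matrix)
print(np.allclose(np.abs(new_data.real), np.abs(sk_data)))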

The dimensionality-reduction result is shown below:

2. t-SNE

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from PCA import pca         # the pca() from section 1, saved locally as PCA.py
from K_Means import KMeans  # the author's local K-means module (a compatible sketch follows the main block)

# Compute pairwise squared distances ||x_i - x_j||^2
# via the identity ||x_i - x_j||^2 = ||x_i||^2 + ||x_j||^2 - 2 * x_i . x_j
# X has shape [N, D]
def cal_pairwise_dist(X):
    sum_X = np.sum(np.square(X), 1)
    D = np.add(np.add(-2 * np.dot(X, X.T), sum_X).T, sum_X)
    # D[i, j] is the squared distance between points i and j
    return D


# Compute the conditional probabilities P_j|i and the (log) entropy
def calc_P_and_entropy(D, beta=1.0):
    P = np.exp(-D.copy() * beta)
    sumP = np.sum(P)
    # Shannon entropy H = log(sumP) + beta * sum(D * P) / sumP
    log_entropy = np.log(sumP) + beta * np.sum(D * P) / sumP

    P = P / sumP
    return P, log_entropy
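
# Perplexity is defined as Perp(P_i) = 2^{H(P_i)} (H in bits); the code works in nats,
# so binary_search below matches the natural-log entropy to logU = log(target_perplexity).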


# Binary search for the beta (i.e. sigma) that hits the target entropy
def binary_search(D, init_beta, logU, tol=1e-5, max_iter=50):
    beta_max = np.inf
    beta_min = -np.inf
    beta = init_beta

    P, log_entropy = calc_P_and_entropy(D, beta)
    diff_log_entropy = log_entropy - logU

    m_iter = 0
    while np.abs(diff_log_entropy) > tol and m_iter < max_iter:
        # Entropy above the target: increase beta (shrink sigma)
        if diff_log_entropy > 0:
            beta_min = beta
            if beta_max == np.inf or beta_max == -np.inf:
                beta = beta * 2
            else:
                beta = (beta + beta_max) / 2.
        # Entropy below the target: decrease beta (grow sigma)
        else:
            beta_max = beta
            if beta_min == np.inf or beta_min == -np.inf:
                beta = beta / 2
            else:
                beta = (beta + beta_min) / 2.

        # Recompute with the new beta
        P, log_entropy = calc_P_and_entropy(D, beta)

        diff_log_entropy = log_entropy - logU

        m_iter = m_iter + 1

    # Return the P for the best beta found, along with that beta
    return P, beta


# Given data datas: [N, D],
# compute the joint probabilities P_ij: [N, N]
def p_joint(datas, target_perplexity):
    N, D = np.shape(datas)
    # Pairwise squared distances
    distances = cal_pairwise_dist(datas)

    beta = np.ones([N, 1])  # beta = 1/(2*sigma^2)
    logU = np.log(target_perplexity)
    p_conditional = np.zeros([N, N])
    # For each point, search for the best sigma (beta) and compute its conditional P
    for i in range(N):
        if i % 500 == 0:
            print("Computing conditional probabilities, point %d" % (i))
        # Drop the i-th entry (the point's distance to itself)
        Di = np.delete(distances[i, :], i)
        # Binary search for the beta that makes
        # log_entropy closest to logU
        P, beta[i] = binary_search(Di, beta[i], logU)

        # Re-insert a 0 at position i
        p_conditional[i] = np.insert(P, i, 0)

    # Symmetrize to get the joint probabilities
    P_join = p_conditional + p_conditional.T
    P_join = P_join / np.sum(P_join)

    print("Mean sigma: %f" % np.mean(np.sqrt(1 / beta)))
    return P_join


# Y: low-dimensional data [N, d]
# Compute the low-dimensional joint probabilities q_ij from Y (Student-t kernel)
def q_tsne(Y):
    N = np.shape(Y)[0]
    sum_Y = np.sum(np.square(Y), 1)
    num = -2. * np.dot(Y, Y.T)
    num = 1. / (1. + np.add(np.add(num, sum_Y).T, sum_Y))
    num[range(N), range(N)] = 0.
    Q = num / np.sum(num)
    Q = np.maximum(Q, 1e-12)
    return Q, num


# datas: high-dimensional input [N, D]
# labs: labels of the high-dimensional data [N, 1]
# dim: target dimensionality d
# plot: draw intermediate embeddings

def estimate_tsen(datas, labs, dim, target_perplexity, plot=False):
    N, D = np.shape(datas)

    # Randomly initialize the low-dimensional embedding Y
    Y = np.random.randn(N, dim)

    # Joint probabilities of the high-dimensional data
    print("Computing joint probabilities of the high-dimensional data")
    P = p_joint(datas, target_perplexity)

    # Early exaggeration: scale P up for the first iterations
    P = P * 4.
    P = np.maximum(P, 1e-12)

    # Iterative training
    # Hyperparameters
    max_iter = 1500
    initial_momentum = 0.5  # momentum for the first iterations
    final_momentum = 0.8
    eta = 500  # learning rate
    min_gain = 0.01
    dY = np.zeros([N, dim])  # gradient
    iY = np.zeros([N, dim])  # update (velocity) of Y
    gains = np.ones([N, dim])

    for m_iter in range(max_iter):

        # Compute the low-dimensional affinities Q
        Q, num = q_tsne(Y)

        # Compute the gradient
        PQ = P - Q
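        # dC/dy_i = 4 * sum_j (p_ij - q_ij) * (y_i - y_j) / (1 + ||y_i - y_j||^2);
        # the constant factor 4 is dropped here and absorbed into the learning rate eta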
        for i in range(N):
            dY[i, :] = np.sum(np.tile(PQ[:, i] * num[:, i], (dim, 1)).T * (Y[i, :] - Y), 0)

        # Gradient descent with momentum
        if m_iter < 20:
            momentum = initial_momentum
        else:
            momentum = final_momentum
        # Increase the gain where the gradient direction flips, decay it where it persists
        gains = (gains + 0.2) * ((dY > 0.) != (iY > 0.)) + \
                (gains * 0.8) * ((dY > 0.) == (iY > 0.))
        gains[gains < min_gain] = min_gain
        # The velocity iY retains the previous update direction (momentum term)
        iY = momentum * iY - eta * (gains * dY)
        Y = Y + iY

        # Re-center Y by subtracting the mean
        Y = Y - np.tile(np.mean(Y, 0), (N, 1))

        # Compute current value of cost function
        if (m_iter + 1) % 10 == 0:
            C = np.sum(P * np.log(P / Q))
            print("Iteration %d: loss is %f" % (m_iter + 1, C))

        # Stop the early exaggeration of P
        if m_iter == 100:
            P = P / 4.

        if plot and m_iter % 100 == 0:
            print("Draw Map")
            # draw_pic(Y, labs, name="%d.jpg" % (m_iter))

    return Y

# Plot the embedding, saving one image per logged iteration
def draw_pic(datas, labs, name='1.jpg'):
    plt.cla()
    unique_labs = np.unique(labs)
    colors = [plt.cm.Spectral(each)
              for each in np.linspace(0, 1, len(unique_labs))]
    p = []
    legends = []
    for i in range(len(unique_labs)):
        index = np.where(labs == unique_labs[i])
        pi = plt.scatter(datas[index, 0], datas[index, 1], c=[colors[i]])
        p.append(pi)
        legends.append(unique_labs[i])

    plt.legend(p, legends)
    plt.savefig(name)
    plt.show()


if __name__ == "__main__":
    data = pd.read_csv('dataset/darmanis.csv')
    data = data.to_numpy().T
    matrix = data[1:, 2:].astype(float)
    print(matrix.shape)
    labels = data[1:, 1:2].astype(int).flatten()
    print(labels.shape)
    print(labels)
    pca_matrix = pca(matrix, 30)
    X = pca_matrix.real
    Y = estimate_tsen(X, labels, 2, 30, plot=True)
    # Cluster the embedded data with K-means
    centroids, clusterAssment = KMeans(Y, 9)
    default_color = ['#FF0000', '#FF1493', '#9400D3', '#7B68EE', '#FFD700',
                     '#00BFFF', '#00FF00', '#FF8C00', '#FF4500', '#8B4513',
                     '#1F77B4', '#FF7F0E', '#2CA02C', '#D62728']
    # Color each point by its cluster
    for i in range(Y.shape[0]):
        colorIndex = int(clusterAssment[i, 0])
        plt.scatter(Y[i, 0], Y[i, 1], color=default_color[colorIndex])
    # Plot the centroids, one marker style per cluster
    mark = ['Dr', 'Db', 'Dg', 'Dk', '^b', '+b', 'sb', 'db', '<b']
    for i in range(centroids.shape[0]):
        plt.plot(centroids[i, 0], centroids[i, 1], mark[i], markersize=12)
    plt.show()
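
The K_Means module imported at the top is the author's own and is not listed in this post. A minimal sketch of a compatible implementation, assuming only what the calls above rely on (KMeans(data, k) returns the centroids and an [N, 2] clusterAssment whose first column holds the assigned cluster index):

import numpy as np

def KMeans(data, k, max_iter=100):
    N = data.shape[0]
    # Initialize centroids by sampling k distinct points
    centroids = data[np.random.choice(N, k, replace=False)]
    clusterAssment = np.zeros((N, 2))  # [cluster index, squared distance to centroid]
    for _ in range(max_iter):
        # Assign each point to its nearest centroid
        dists = np.linalg.norm(data[:, None, :] - centroids[None, :, :], axis=2)
        idx = np.argmin(dists, axis=1)
        clusterAssment[:, 0] = idx
        clusterAssment[:, 1] = dists[np.arange(N), idx] ** 2
        # Recompute centroids, keeping the old one if a cluster goes empty
        new_centroids = np.array([data[idx == j].mean(axis=0) if np.any(idx == j)
                                  else centroids[j] for j in range(k)])
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
    return centroids, clusterAssment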

Combined with PCA: the data is first reduced to 30 dimensions with PCA, then to 2 dimensions with t-SNE for visualization.
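
The same pipeline can also be reproduced with scikit-learn as a cross-check on the hand-written code; a sketch under the same settings (30 PCA dimensions, perplexity 30), reusing the matrix loaded in the main block:

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

X30 = PCA(n_components=30).fit_transform(matrix)
Y2 = TSNE(n_components=2, perplexity=30).fit_transform(X30)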

Clustering result:

3. MDS

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from K_Means import KMeans
from sklearn.manifold import TSNE

'''
MDS algorithm
Basic idea: project the points from the high-dimensional space into a low-dimensional
space while keeping the pairwise similarities between points as unchanged as possible.
Steps:
1. Compute the distance matrix from the given data.
2. Compute the inner-product (Gram) matrix B of the low-dimensional vectors Z.
3. Eigendecompose B and keep the largest eigenvalues and their eigenvectors.
'''
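
# Classical MDS double-centering (used in mds() below):
#   B = -1/2 * J * D^2 * J, where J = I - (1/n) * 1*1^T is the centering matrix;
# the embedding is Z = V_k * Lambda_k^(1/2) from the top-k eigenpairs of B.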

# loading dataset
def loadData(filename):
    data = pd.read_csv(filename).to_numpy()
    data = data.T
    return data


# normalization
def normalization(data):
    maxcols = data.max(axis=0)
    mincols = data.min(axis=0)
    data_shape = data.shape
    data_rows = data_shape[0]
    data_cols = data_shape[1]
    nor_data = np.empty((data_rows, data_cols))
    for i in range(data_cols):
        nor_data[:, i] = (data[:, i] - mincols[i]) / (maxcols[i] - mincols[i])
    return nor_data


# Calculate the pairwise distances between sample points
def calDistance(data):
    n_cells, n_genes = data.shape
    distance = np.zeros((n_cells, n_cells))
    for i in range(n_cells):
        for j in range(n_cells):
            distance[i, j] = np.sqrt(np.dot((data[i] - data[j]), (data[i] - data[j]).T))
    return distance
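
# Note: this O(n_cells^2) Python double loop is slow; the vectorized trick used in
# cal_pairwise_dist in the t-SNE section computes the same squared distances in one shot,
# so np.sqrt of that matrix would give an equivalent, much faster calDistance.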


'''
parameters:
distance: the distance matrix computed by calDistance
dim: the number of dimensions to reduce to
return: the low-dimensional embedding derived from the Gram matrix B
'''


def mds(distance, dim):
    n_cells = distance.shape[0]
    distance[distance < 0] = 0
    distance = distance ** 2
    T1 = np.ones((n_cells, n_cells)) * np.sum(distance) / n_cells ** 2
    T2 = np.sum(distance, axis=1, keepdims=True) / n_cells
    T3 = np.sum(distance, axis=0, keepdims=True) / n_cells
    # Double-centering: B = -1/2 * (D^2 - row means - column means + grand mean)
    B = -(T1 - T2 - T3 + distance) / 2
    eigval, eigvec = np.linalg.eig(B)
    index = np.argsort(-eigval.real)[:dim]
    # Clip tiny negative eigenvalues caused by numerical noise
    sort_eigval = np.maximum(eigval[index].real, 0)
    sort_eigvec = eigvec[:, index].real
    # Embedding Z = V_k * Lambda_k^(1/2)
    return sort_eigvec * sort_eigval ** 0.5


# Dimensionality reduction visualization
def showDimension(data, labels):
    plt.cla()
    unique_labels = np.unique(labels)
    colors = [plt.cm.Spectral(each) for each in np.linspace(0, 1, len(unique_labels))]
    p = []
    legends = []
    for i in range(len(unique_labels)):
        index = np.where(labels == unique_labels[i])
        pi = plt.scatter(data[index, 0], data[index, 1], c=[colors[i]])
        p.append(pi)
        legends.append(unique_labels[i])
    plt.legend(p, legends)
    plt.show()


# Cluster visualization
def showCluster(dataSet, k, clusterAssment):
    m, n = dataSet.shape
    if n != 2:
        print("Data is not two-dimensional")
        return 1
    default_color = ['#95d0fc', '#96f97b', '#c79fef',
                     '#ff81c0', '#00035b', '#06c2ac',
                     '#ffff14', '#033500', '#c20078']
    if k > len(default_color):
        print("K is too big")
        return 1
    # Draw all samples
    for i in range(m):
        colorIndex = int(clusterAssment[i, 0])
        plt.scatter(dataSet[i, 0], dataSet[i, 1], color=default_color[colorIndex])


if __name__ == '__main__':
    dataset = loadData('dataset/darmanis.csv')
    data = dataset[1:, 2:].astype(np.float32)
    data = normalization(data)
    labels = dataset[1:, 1].astype(int).flatten()
    # Using MDS
    distance = calDistance(data)
    mds_data = mds(distance, 2)
    _, clusterAssment1 = KMeans(mds_data, 9)
    plt.subplot(1, 2, 1)
    plt.title('MDS+K-means')
    showCluster(mds_data, 9, clusterAssment1)
    # showDimension(mds_data, labels)
    # Using TSNE
    tsne_data = TSNE(n_components=2).fit_transform(data)
    _, clusterAssment2 = KMeans(tsne_data, 9)
    plt.subplot(1, 2, 2)
    plt.title('TSNE+K-means')
    showCluster(tsne_data, 9, clusterAssment2)
    plt.show()
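
For comparison, scikit-learn also ships an MDS implementation (SMACOF-based metric MDS rather than the classical eigendecomposition above, so the embedding differs in detail); a sketch reusing the precomputed distance matrix:

from sklearn.manifold import MDS

sk_mds = MDS(n_components=2, dissimilarity='precomputed', random_state=0)
sk_data = sk_mds.fit_transform(distance)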

Clustering result on the darmanis dataset:

Clustering result on the zeisel dataset (zeisel has 3005 cells and 2000 highly expressed genes; with a sample that large, my poor little laptop took a good five minutes to produce a result):
