0
点赞
收藏
分享

微信扫一扫

【机器学习】利用numpy实现K-Means聚类算法(Python代码)

 
简 介:下面是我在学习时候的记录并加上自己的理解。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。
 
关键词:Python、机器学习、K-Means聚类

"""
* Created with PyCharm
* 作者: 阿光
* 日期: 2021/7/18
* 时间: 14:19
* 描述: 利用numpy自己实现K-Means算法
"""

import random
import warnings

import numpy as np
import pandas as pd

from Draw import draw

warnings.filterwarnings('ignore')

# 加载数据
data = pd.read_csv("../../data/cluster_500-10_7.csv", encoding="gbk")
X = data.iloc[:, 1:-2].values
y = data.iloc[:, -1].values.flatten()

cluster = 7 # 定义簇的个数
iter_ = 500 # 每轮的k-means的迭代次数
epochs = 20 # 模型的迭代轮数
label_ = np.zeros(len(X)) # 定义样本分类集合,代表每个样本所属的簇类别,[0,1,4,2]
best_label = [] # 保存最好的聚类效果
best_score = 99999 # 保存模型最优效果下的DB指数


def kmeans(x):
"""
* 描述: K-Means算法的入口
* 参数: x即样本数据
* 返回值:
"""
# 随机选择k个样本作为初始向量
# test = random.sample(range(0, x.shape[0]), cluster)
# print(test)
x_center = x[random.sample(range(0, x.shape[0]), cluster), :]
# x_center = init_cluster(X, cluster)
# 新的簇的向量中心
new_center = x_center.copy()

k = 0 # 记录迭代轮数

# 进行迭代更新,只要上一次和本次的簇中心不一致就继续迭代
while True:
if ((x_center == new_center).sum() == x_center.shape[0] * x_center.shape[1] and k != 0) or k > iter_:
break
x_center = new_center
k += 1
# 计算样本到簇中心的距离
matrix = distance(x_center, x)
# 更新每个样本的所属簇分类
update_cluster(matrix)
# 根据新的样本划分情况,更新每个簇的中心
new_center = update_center(x)

# print("聚类结束,共迭代{}轮".format(k))


def run(X):
"""
* 描述: k-means算法的主调函数
* 参数: X为样本数据
* 返回值:
"""
# 迭代多个epoch,根据DBI指数选取最好的结果
for epoch in range(epochs):
kmeans(X) # 调用kmeans
# 计算DBI指数
DBI = optimizer_score(cluster, X)
global best_score, best_label
# 如果当前轮数的效果好于上一次的,记录最优的DBI指数,以及最好的聚类结果
if DBI < best_score:
best_score = DBI
best_label = label_


def init_cluster(X, cluster):
x_center = np.zeros((cluster, X.shape[1]))
i = 0
res = set()
while True:
if i == 0:
index = random.sample(range(0, X.shape[0]), 1)
res.add(index[0])
x_center[i, :] = X[index]
i += 1
else:
distance_ = np.zeros((X.shape[0], i))
for j in range(i):
row = x_center[j, :]
distance_[:, j] = np.sqrt(((X - row) ** 2).sum(axis=1))

distance_ = (distance_ - distance_.mean(axis=0)) / distance_.std(axis=0)
rank = np.argsort(distance_, axis=0)
rank = (rank - rank.mean(axis=0)) / rank.std(axis=0)
loc = np.sum(rank, axis=1)
# distance_ = np.sum(distance_, axis=1)
# location = np.where(distance_ == distance_.max())
index = np.argmax(loc)
print(res)
if index not in res:
x_center[i, :] = X[index]
res.add(index)
i += 1
else:
while index < X.shape[0] - 2:
index = index + 1
if index not in res:
x_center[i, :] = X[index]
res.add(index)
i += 1
break

if i == cluster:
break
return x_center


def optimizer_score(cluster, X):
"""
* 描述: 计算DBI优化指标
* 参数:
cluster:聚类簇数
X:样本数据
* 返回值: 模型的DBI指数
"""

# 用于计算簇内平均值
def avg_C(label):
distance_ = np.zeros((X[label_ == label, :].shape[0], X[label_ == label, :].shape[0]))
for i, row in enumerate(X[label_ == label, :]):
distance_[i, :] = np.sqrt(np.sum((X[label_ == label, :] - row) ** 2, axis=1))
distance_ = distance_.sum() / (X.shape[0] * (X.shape[0] - 1))
return distance_

# 用于计算簇间中心的距离
def d_cen(cluster_i, cluster_j):
X_i = X[label_ == cluster_i, :]
X_j = X[label_ == cluster_j, :]

X_i_mean = X_i.mean(axis=0)
x_j_mean = X_j.mean(axis=0)

return np.sqrt(((X_i_mean - x_j_mean) ** 2).sum())

DBI = 0
# 迭代每个簇,计算每个簇的DB指数,然后求平均值
for i in range(cluster):
dis_list = []
for j in range(cluster):
if i != j:
dis_list.append((avg_C(i) + avg_C(j)) / d_cen(i, j))
DBI += max(dis_list)

return DBI / cluster


def distance(center, x):
"""
* 描述: 计算每个样本到簇中心的距离
* 参数:
center:簇中心,形状为(10,5)10个簇,5个特征属性
x:数据样本,形状(100,5),100个样本,5个特征
* 返回值: 返回每个样本到各个簇中心的距离,形状为(100,10),每一行代表每个样本到各个簇的距离
"""
matrix = np.random.rand(x.shape[0], center.shape[0])
# 迭代每一行,进行计算到各个簇中心的距离
for i, row in enumerate(x):
matrix[i, :] = np.sum((row - center) ** 2, axis=1) # 平方差,然后按照每一行进行求和
return matrix


def update_cluster(matrix):
"""
* 描述: 更新每个样本所属的簇分类
* 参数: matrix就是样本到簇中心的距离矩阵
* 返回值:
"""
global label_
# 返回每行的最小值的下标,即返回距离最近的簇中心所属的类别
label_ = np.argmin(matrix, axis=1)


def update_center(x):
"""
* 描述: 更新每个簇的中心
* 参数: x,样本数据
* 返回值:
"""
new_center = np.zeros((cluster, x.shape[1]))
# 依次迭代每个类别下的数据,计算该类数据的均值
for category in range(cluster):
new_center[category, :] = x[label_ == category, :].mean(axis=0)
return new_center


if __name__ == "__main__":
run(X) # 调用算法
draw(X, y, best_label, cluster) # 将聚类结果可视化

举报

相关推荐

0 条评论