简 介:下面是我在学习时候的记录并加上自己的理解。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。
关键词:Python、机器学习、DBSCAN
自己实现DBSCAN算法,需要对两个参数ξ和Minpt的选取选取进行说明,语言不限。要能支持多维数组,距离用欧式距离。
"""
* Created with PyCharm
* 作者: 阿光
* 日期: 2021/7/18
* 时间: 21:53
* 描述: 使用numpy实现DBSCAN
"""
import warnings
import numpy as np
import pandas as pd
from Draw import draw
warnings.filterwarnings('ignore')
epsilon = 4 # 定义邻域半径
M = 5 # 定义阈值
k = 0 # 类别标签
# 加载数据
data = pd.read_csv("../../data/cluster_500-10_7.csv", encoding="gbk")
X = data.iloc[:, 1:-2].values
y = data.iloc[:, -1].values.flatten()
label_ = np.zeros(len(X))
def dbscan(X):
"""
* 描述: DBSCAN算法的主入口
* 参数:
X:样本数据
* 返回值:
"""
# 获取核心对象
coreObjectList = get_coreobject(X)
# 开始迭代每个核心对象,进行划圈
iter_coreobject(coreObjectList, X)
def iter_coreobject(coreObjectList, X):
"""
* 描述: 迭代每个核心对象,然后将其可达的对象划为一簇
* 参数:
coreObjectList:核心列表对象
X:样本数据
* 返回值:
"""
def get_epsilonobject(row):
"""
* 描述: 获取某个样本邻域内的样本
* 参数:
row:核心对象
* 返回值: 邻域内的样本的索引
"""
return np.where(np.sqrt((((X - row) ** 2).sum(axis=1))) <= epsilon)[0]
def distance_epsilon(coreobject, X):
"""
* 描述: 判断某一核心对象邻域内是否有核心对象,如果有重复该步骤,如果不是将其划为一簇
* 参数:
coreobject:核心对象
X:样本数据
* 返回值:
"""
# 保存某一簇内的样本
queue = [coreobject]
# 判断队列中是否还存在数据
while len(queue) != 0:
object = X[queue.pop(0), :]
# 判断其是否是核心对象,如果是将它的邻域内的样本也加入队列
if judge_coreobject(X, object):
nonlocal noAccess # 获取上一层作用域的变量
# 获取未访问和该核心对象邻域内样本的交集
delta = set(noAccess).intersection(set(get_epsilonobject(object)))
# 将邻域内的样本添加到队列
queue.extend(delta)
# 将已经访问的数据从noAccess中去掉
noAccess = list(set(noAccess).difference(set(delta)))
# 记录未被访问过的数据
noAccess = list(range(X.shape[0]))
# 判断是否还有核心对象可以进行迭代
while len(coreObjectList) != 0:
# 为了获取本次迭代的数据
AccessOld = noAccess.copy()
# 取出一个核心对象
coreobject = coreObjectList.pop(0)
# 扩充领域,添加邻域内的样本
distance_epsilon(coreobject, X)
# 取得本轮划分簇的数据,就是本次迭代到的数据
cluster_k = list(set(AccessOld).difference(set(noAccess)))
global k
# 将本次迭代到的数据划分成一簇
label_[[cluster_k]] = k
# 将所有已经访问过的数据从noAccess中移除
coreObjectList = list(set(coreObjectList).difference(set(cluster_k)))
# 将簇+1
k += 1
def judge_coreobject(X, row):
"""
* 描述: 判断某一样本是否为核心对象
* 参数:
X:样本数据
row:待判断的样本数据
* 返回值: 返回true和false
"""
# 判断在邻域距离内的样本数是否达到阈值
if (np.sqrt(((X - row) ** 2).sum(axis=1)) <= epsilon).sum() - 1 >= M:
return True
return False
def get_coreobject(x):
"""
* 描述: 迭代数据集,获取核心对象列表
* 参数:
x:样本数据
* 返回值: 核心对象列表
"""
# 初始化列表
coreObjectList = []
# 判断是否是核心对象
for i, row in enumerate(x):
if judge_coreobject(X, row):
coreObjectList.append(i)
return coreObjectList
if __name__ == "__main__":
dbscan(X)
draw(X, y, label_, 7)