作为自编码器的入门项目,我实现了一个无监督的异常检测系统,传统的异常检测手段有很多,在有监督时可以单纯用多分类问题来判别异常,也可以用高斯聚类来帮助判别异常出现的概率。这里如果要实现无监督学习的异常检测,一种方法是借助数据降维和聚类来帮助我们实现无监督数据聚类,然后在此之上进行异常检测。
这里用MNIST的1-9的数字作为正常样本,0的数字作为异常样本。使用Auto-encoder降维和一些聚类算法。最后评估降维后的正常样本和异常样本在特征空间上的某些区别,找到一个合适的超参数阈值区分正常样本和异常样本。
准备工作
首先导入Pytorch等工具包
import torch
import torchvision
import torch.nn as nn
from torchvision import transforms
from torchvision.utils import save_image
from torch.autograd import Variable
import torch.utils.data as Data
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf
在tensorflow里把MNIST直接导入内存并预处理以备使用
(x_train_origin,t_train_origin),(x_test_origin,t_test_origin) = tf.keras.datasets.mnist.load_data()
X_train = x_train_origin/255.
X_test = x_test_origin/255.
X_normal = [X_train[i] for i in range(len(X_train)) if t_train_origin[i]!=0]
y_normal = np.ones(len(X_normal))
X_anomaly = [X_train[i] for i in range(len(X_train)) if t_train_origin[i]==0]
X_normal = torch.tensor(X_normal)
X_anomaly = torch.tensor(X_anomaly)
y_normal = torch.tensor(y_normal)
X_normal = X_normal.float()
X_anomaly = X_anomaly.float()
y_normal = y_normal.float()
train_data = Data.TensorDataset(X_normal,y_normal)
定义一些需要用到的超参数
# 超参数
EPOCH = 10
BATCH_SIZE = 64
LR = 0.005
train_loader = torch.utils.data.DataLoader(dataset=train_data,
batch_size=BATCH_SIZE,
shuffle=True)
定义Auto-encoder,这个模型实现的是自动进行数据的压缩和重建,本质上是逼近一个数据分布;如果我们用正常样本训练模型,模型就会在异常样本处出现较低的概率。
模型搭建与训练
class AutoEncoder(nn.Module):
def __init__(self):
super(AutoEncoder, self).__init__()
# 压缩
self.encoder = nn.Sequential(
nn.Linear(28*28, 500),
nn.Tanh(),
nn.Linear(500, 125),
nn.Tanh(),
nn.Linear(125, 40),
nn.Tanh(),
nn.Linear(40, 5),
)
# 解压
self.decoder = nn.Sequential(
nn.Linear(5, 40),
nn.Tanh(),
nn.Linear(40, 125),
nn.Tanh(),
nn.Linear(125, 500),
nn.Tanh(),
nn.Linear(500, 28*28),
nn.Sigmoid(), # 激励函数让输出值在 (0, 1)
)
def forward(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return encoded, decoded
autoencoder = AutoEncoder()
训练,模型的输入和输出都是同样的样本
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=LR)
loss_func = nn.MSELoss()
for epoch in range(EPOCH):
for step, (x,y) in enumerate(train_loader):
b_x = Variable(x.view(-1, 28*28)) # batch x, shape (batch, 28*28)
b_y = Variable(x.view(-1, 28*28)) # batch y, shape (batch, 28*28)
encoded, decoded = autoencoder(b_x)
loss = loss_func(decoded, b_y) # mean square error
optimizer.zero_grad() # clear gradients for this training step
loss.backward() # backpropagation, compute gradients
optimizer.step() # apply gradients
数据将在AE的帮助下降低到一个更容易处理的维度。
问题分析
我们先尝试用降维后的数据分布直接训练另一个模型,可以直接训练一个高斯混合模型,但这里我使用的是一个K-means的聚类模型。
origin_data = Variable(train_loader.dataset.tensors[0].view(-1, 28*28).type(torch.FloatTensor))
encoded_data, _ = autoencoder(origin_data) # 提取压缩的特征值
X = encoded_data.detach().numpy()
from sklearn.cluster import KMeans
clustering_model = KMeans(n_clusters=9, random_state=9)
clustering_model.fit(X)
centers = clustering_model.cluster_centers_
没有比较好的评估方式,我们直接用样本点和距离最近的簇中心的距离作为评判标准,并观察异常样本是不是拥有比较特殊的分布,以至于它离各簇都比较远。
centers = clustering_model.cluster_centers_
#计算样本与簇中心的距离
Dist2cluster = []
for center in centers:
Dist2cluster.append(np.sum((X-center)**2,axis = 1))
Dist2cluster = np.array(Dist2cluster)
Dist2cluster = np.min(Dist2cluster, axis = 0)
import seaborn as sns
#把异常的数据拿出来看一看簇中心距
origin_data = Variable(X_anomaly.view(-1, 28*28).type(torch.FloatTensor))
encoded_data, recon_data = autoencoder(origin_data)
X_anomaly = encoded_data.detach().numpy()
Dist2cluster_anomaly = []
for center in centers:
Dist2cluster_anomaly.append(np.sum((X_anomaly-center)**2,axis = 1))
Dist2cluster_anomaly = np.array(Dist2cluster_anomaly)
Dist2cluster_anomaly = np.min(Dist2cluster_anomaly, axis = 0)
plt.figure(figsize = (10,4))
plt.subplot(121)
plt.title('Normal')
sns.kdeplot(Dist2cluster,shade=True)
plt.subplot(122)
plt.title('Anomaly')
sns.kdeplot(Dist2cluster_anomaly,shade=True)
大概是因为簇与簇直接本就存在差异,异常样本和正常样本的差异并不容易从K-means中看出。我们可以借助另一种判别标准,我们认为贴合数据分布的图片通过AE重构后,更不容易失真。因此,我们可以考虑从重构误差下手,计算原图片与重构图片的误差值(MSE)。
def Reconstruct_error(model, data):
input_data = Variable(data.view(-1, 28*28).type(torch.FloatTensor))
_, output_data = autoencoder(input_data)
input_data, output_data = input_data.detach().numpy(), output_data.detach().numpy()
#二值化
MSE = np.sum((output_data-input_data)**2,axis = 1)
return MSE
err_normal = Reconstruct_error(autoencoder, train_loader.dataset.tensors[0])
err_anomaly = Reconstruct_error(autoencoder, origin_data)
plt.figure(figsize = (10,4))
plt.subplot(121)
plt.title('Normal')
sns.kdeplot(err_normal,shade=True)
plt.subplot(122)
plt.title('Anomaly')
sns.kdeplot(err_anomaly,shade=True)
看起来好像比较理想,正常样本一般在0-30的区间,而异常样本一般是40或更大。
如果我们希望异常更容易被发现,就设置更高的阈值,如果不希望系统误报,则设置低阈值。下面用测试集测试准确率。
y_test = t_test_origin==0
X_test = torch.tensor(X_test).float()
err = Reconstruct_error(autoencoder, X_test)
yhat = err>35
anom_test = yhat[y_test]
anomaly_accuracy = np.sum(anom_test)/len(anom_test)
nom_test = ~yhat[~(y_test)]
normal_accuracy = np.sum(nom_test)/len(nom_test)
print("Normal accuracy:",normal_accuracy)
print("Anomaly accuracy:",anomaly_accuracy)
Normal accuracy: 0.8682926829268293
Anomaly accuracy: 0.9622448979591837
看起来取得了不错的效果。
总结
无监督的机器学习任务自由度很高,我们可以尝试各种各样的方法去辅助我们开展工作。并没有最好的算法,最贴合任务的算法才是最好的。
转载:https://blog.csdn.net/qq_41858347/article/details/105320827