飞道的博客

基于自编码器实现无监督异常检测系统

409人阅读  评论(0)

作为自编码器的入门项目,我实现了一个无监督的异常检测系统,传统的异常检测手段有很多,在有监督时可以单纯用多分类问题来判别异常,也可以用高斯聚类来帮助判别异常出现的概率。这里如果要实现无监督学习的异常检测,一种方法是借助数据降维和聚类来帮助我们实现无监督数据聚类,然后在此之上进行异常检测。
这里用MNIST的1-9的数字作为正常样本,0的数字作为异常样本。使用Auto-encoder降维和一些聚类算法。最后评估降维后的正常样本和异常样本在特征空间上的某些区别,找到一个合适的超参数阈值区分正常样本和异常样本。

准备工作

首先导入Pytorch等工具包

import torch
import torchvision
import torch.nn as nn
from torchvision import transforms
from torchvision.utils import save_image
from torch.autograd import Variable
import torch.utils.data as Data
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf

在tensorflow里把MNIST直接导入内存并预处理以备使用

(x_train_origin,t_train_origin),(x_test_origin,t_test_origin) = tf.keras.datasets.mnist.load_data()
X_train = x_train_origin/255.
X_test = x_test_origin/255.

X_normal = [X_train[i] for i in range(len(X_train)) if t_train_origin[i]!=0]
y_normal = np.ones(len(X_normal))
X_anomaly = [X_train[i] for i in range(len(X_train)) if t_train_origin[i]==0]

X_normal = torch.tensor(X_normal)
X_anomaly = torch.tensor(X_anomaly)
y_normal = torch.tensor(y_normal)
X_normal = X_normal.float()
X_anomaly = X_anomaly.float()
y_normal = y_normal.float()

train_data = Data.TensorDataset(X_normal,y_normal)

定义一些需要用到的超参数

# 超参数
EPOCH = 10
BATCH_SIZE = 64
LR = 0.005

train_loader = torch.utils.data.DataLoader(dataset=train_data,
                                          batch_size=BATCH_SIZE, 
                                          shuffle=True)

定义Auto-encoder,这个模型实现的是自动进行数据的压缩和重建,本质上是逼近一个数据分布;如果我们用正常样本训练模型,模型就会在异常样本处出现较低的概率。

模型搭建与训练

class AutoEncoder(nn.Module):
    def __init__(self):
        super(AutoEncoder, self).__init__()

        # 压缩
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 500),
            nn.Tanh(),
            nn.Linear(500, 125),
            nn.Tanh(),
            nn.Linear(125, 40),
            nn.Tanh(),
            nn.Linear(40, 5),
        )
        # 解压
        self.decoder = nn.Sequential(
            nn.Linear(5, 40),
            nn.Tanh(),
            nn.Linear(40, 125),
            nn.Tanh(),
            nn.Linear(125, 500),
            nn.Tanh(),
            nn.Linear(500, 28*28),
            nn.Sigmoid(),       # 激励函数让输出值在 (0, 1)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded

autoencoder = AutoEncoder()

训练,模型的输入和输出都是同样的样本

optimizer = torch.optim.Adam(autoencoder.parameters(), lr=LR)
loss_func = nn.MSELoss()

for epoch in range(EPOCH):
    for step, (x,y) in enumerate(train_loader):
        b_x = Variable(x.view(-1, 28*28))   # batch x, shape (batch, 28*28)
        b_y = Variable(x.view(-1, 28*28))   # batch y, shape (batch, 28*28)
 
        encoded, decoded = autoencoder(b_x)
 
        loss = loss_func(decoded, b_y)      # mean square error
        optimizer.zero_grad()               # clear gradients for this training step
        loss.backward()                     # backpropagation, compute gradients
        optimizer.step()                    # apply gradients

数据将在AE的帮助下降低到一个更容易处理的维度。

问题分析

我们先尝试用降维后的数据分布直接训练另一个模型,可以直接训练一个高斯混合模型,但这里我使用的是一个K-means的聚类模型。

origin_data = Variable(train_loader.dataset.tensors[0].view(-1, 28*28).type(torch.FloatTensor))
encoded_data, _ = autoencoder(origin_data)    # 提取压缩的特征值
X = encoded_data.detach().numpy()
from sklearn.cluster import KMeans

clustering_model = KMeans(n_clusters=9, random_state=9)
clustering_model.fit(X)
centers = clustering_model.cluster_centers_

没有比较好的评估方式,我们直接用样本点和距离最近的簇中心的距离作为评判标准,并观察异常样本是不是拥有比较特殊的分布,以至于它离各簇都比较远。

centers = clustering_model.cluster_centers_

#计算样本与簇中心的距离
Dist2cluster = []

for center in centers:
    Dist2cluster.append(np.sum((X-center)**2,axis = 1))
Dist2cluster = np.array(Dist2cluster)
Dist2cluster = np.min(Dist2cluster, axis = 0)

import seaborn as sns
#把异常的数据拿出来看一看簇中心距
origin_data = Variable(X_anomaly.view(-1, 28*28).type(torch.FloatTensor))
encoded_data, recon_data = autoencoder(origin_data)
X_anomaly = encoded_data.detach().numpy()

Dist2cluster_anomaly = []
for center in centers:
    Dist2cluster_anomaly.append(np.sum((X_anomaly-center)**2,axis = 1))
Dist2cluster_anomaly = np.array(Dist2cluster_anomaly)
Dist2cluster_anomaly = np.min(Dist2cluster_anomaly, axis = 0)

plt.figure(figsize = (10,4))
plt.subplot(121)
plt.title('Normal')
sns.kdeplot(Dist2cluster,shade=True)
plt.subplot(122)
plt.title('Anomaly')
sns.kdeplot(Dist2cluster_anomaly,shade=True)


大概是因为簇与簇直接本就存在差异,异常样本和正常样本的差异并不容易从K-means中看出。我们可以借助另一种判别标准,我们认为贴合数据分布的图片通过AE重构后,更不容易失真。因此,我们可以考虑从重构误差下手,计算原图片与重构图片的误差值(MSE)。

def Reconstruct_error(model, data):
    input_data = Variable(data.view(-1, 28*28).type(torch.FloatTensor))
    _, output_data = autoencoder(input_data)
    input_data, output_data = input_data.detach().numpy(), output_data.detach().numpy()
    #二值化
    
    MSE = np.sum((output_data-input_data)**2,axis = 1)
    return MSE

err_normal = Reconstruct_error(autoencoder, train_loader.dataset.tensors[0])
err_anomaly = Reconstruct_error(autoencoder, origin_data)

plt.figure(figsize = (10,4))
plt.subplot(121)
plt.title('Normal')
sns.kdeplot(err_normal,shade=True)
plt.subplot(122)
plt.title('Anomaly')
sns.kdeplot(err_anomaly,shade=True)


看起来好像比较理想,正常样本一般在0-30的区间,而异常样本一般是40或更大。
如果我们希望异常更容易被发现,就设置更高的阈值,如果不希望系统误报,则设置低阈值。下面用测试集测试准确率。

y_test = t_test_origin==0

X_test = torch.tensor(X_test).float()
err = Reconstruct_error(autoencoder, X_test)
yhat = err>35

anom_test = yhat[y_test]
anomaly_accuracy = np.sum(anom_test)/len(anom_test)
nom_test = ~yhat[~(y_test)]
normal_accuracy = np.sum(nom_test)/len(nom_test)
print("Normal accuracy:",normal_accuracy)
print("Anomaly accuracy:",anomaly_accuracy)

Normal accuracy: 0.8682926829268293
Anomaly accuracy: 0.9622448979591837

看起来取得了不错的效果。

总结

无监督的机器学习任务自由度很高,我们可以尝试各种各样的方法去辅助我们开展工作。并没有最好的算法,最贴合任务的算法才是最好的。


转载:https://blog.csdn.net/qq_41858347/article/details/105320827
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场