小言_互联网的博客

对抗生成网络GAN系列——f-AnoGAN原理及缺陷检测实战

637人阅读  评论(0)

🍊作者简介:秃头小苏,致力于用最通俗的语言描述问题

🍊专栏推荐:深度学习网络原理与实战

🍊近期目标:写好专栏的每一篇文章

🍊支持小苏:点赞👍🏼、收藏⭐、留言📩

 

对抗生成网络GAN系列——f-AnoGAN原理及缺陷检测实战

写在前面

  在前面我已经介绍了好几种用于缺陷检测的GAN网络了,感兴趣的可以关注一下我的专栏:深度学习网络原理与实战 。目前专栏主要更新了GAN系列文章和Transformer系列文章,都有理论详解和代码实战,文中的讲解都比较通俗易懂,如果你希望丰富这方面的知识,建议你阅读试试,相信你会有蛮不错的收获。🍸🍸🍸

​  在阅读本篇教程之前,我觉得你有必要读读下面三篇文章:

  [1]是利用GAN网络实现缺陷检测的开山之作,也算是这篇文章的基础,所以这是你必须要读且要理解透彻的。[2]算是[1]比较经典的改进,和本篇文章也有一定的相似之处,理解它会对你看透此篇文章有很大帮助。[3]提出了一种使原始GAN训练更加稳定的方式,本篇文章在训练GA时使用了,建议你也要有所了解。🍹🍹🍹

  如果你准备好了的话,就让我们一起来看看f-AnoGAN吧!!!🚖🚖🚖

 

f-AnoGAN原理详解✨✨✨

​  我们先来看看f-AnoGAN的全称吧——f-AnoGAN: Fast unsupervised anomaly detection with generative adversarial networks。点击☞☞☞下载论文了解详情。📩📩📩

​  如果你对我上文提到的三篇文章都有所了解的话,再来看这篇文章,你会发现它是真滴简单。🌼🌼🌼这就带大家一起来看看f-AnoGAN的网路架构。首先,我们先来看看f-AnoGAN的训练过程,训练主要分两步进行,第一步是训练一个生成对抗网络,第二步利用第一步生成对抗网络的权重,训练一个encoder编码器。我们直接来看下图好了:

  在步骤①中,我们训练的是一个WGAN,关于WGAN的细节可以看[3]这篇博客,在后文的代码实战中我也会谈谈这部分的内容。如果你对WGAN不了解的话,也不用太担心,这里你完全可以训练一个原始GAN,只是效果可能没有WGAN好罢了,但是对于理解f-AnoGAN的步骤是完全没用影响的。当WGAN训练完毕后,生成器G和判别器D的权重就会冻结,步骤②的G和D的权重不会发生变化。在步骤②中,我们目的是训练一个编码器E。论文中给出了三种训练E的结构,分别为ziz结构,izi结构和izif结构,我们一个个来看一下:

  • ziz结构

    ​ 我们直接来看下图吧:

    ​ z表示的是潜在变量,i表示的是图片(image),ziz结构即表示潜在变量z通过固定的生成器生成image,然后再通过编码器编码成潜在变量z。此部分的损失函数用 L z i z L_{ziz} Lziz表示,其表达式为:

    L z i z ( z ) = 1 d ∣ ∣ z − E ( G ( z ) ) ∣ ∣ 2 L_{ziz}(z)=\frac{1}{d}||z-E(G(z))||^2 Lziz(z)=d1zE(G(z))2

    ​ 其中d表示z的维度,其实上述公式就是计算z和 E ( G ( z ) ) E(G(z)) E(G(z))的MSE损失啦。

  • izi结构

    ​ 同样的,我们直接看图:

    ​ izi结构表示real image先经过编码器E将图片映射到潜在空间,然后再通过生成器G生成图片。此部分的损失函数用 L i z i L_{izi} Lizi表示,其表达式为:

    L i z i ( x ) = 1 n ∣ ∣ x − G ( E ( x ) ) ∣ ∣ 2 L_{izi}(x)=\frac{1}{n}||x-G(E(x))||^2 Lizi(x)=n1xG(E(x))2

    ​ 其中,n表示输入image的像素点数量,这个公式同样表示x和 G ( E ( x ) ) G(E(x)) G(E(x))的MSE损失。

 

  • izif结构

    ​ izif结构相比izi结构在后面加了一个判别器D,如下图所示:【论文最后选择了这个结构训练编码器E】

    不知道大家发现没有,这个结构和GANomaly是非常像的。izif的损失函数由两部分构成,一部分为 L i z i L_{izi} Lizi,另一部分为 L D L_D LD。izif损失函数表达式如下:

    L i z i f ( x ) = 1 n ∣ ∣ x − G ( E ( x ) ) ∣ ∣ 2   +   k n d ⋅ ∣ ∣ f ( x ) − f ( G ( E ( x ) ) ) ∣ ∣ 2 L_{izif}(x)=\frac{1}{n}||x-G(E(x))||^2 \ + \ \frac{k}{n_d} \cdot ||f(x)-f(G(E(x)))||^2 Lizif(x)=n1xG(E(x))2 + ndkf(x)f(G(E(x)))2

    其中,k为两个损失函数的权重参数,代码中k=1。f(*)表示判别器中间层的输出, n d n_d nd表示判别器中间输出层的维度。



  f-AnoGAN的训练过程就为大家介绍到这里了,是不是很简单呢。【如果你觉得有难度的话建议你看看我写在前面中提到的三篇博文,或者结合我下文的代码理解理解】训练结束后,我们保存生成器G、判别器D和编码器E的权重,然后将它们用于缺陷检测中。缺陷检测就更加简单啦,异常得分函数就是我们上文所说的izif结构的损失函数,如下图所示:

 
 

f-AnoGAN代码实战✨✨✨

代码目录结构分析

  这部分我在paperswithcode上看到了一个用pytorch实现的f-AnoGAN的代码:f-AnoGAN源码地址。🍵🍵🍵这个代码的逻辑非常清晰,所以我就以这个代码来为大家介绍f-AnoGAN的实现了。🍖🍖🍖

​ 首先我们来看一下整个代码的结构,如下图所示:

​  我们需要注意一下,mnistmvtec_adyour_own_dataset是针对不同数据集进行实验的。考虑到大家对mnist数据集相对熟悉,故本文以mnist数据集为例为大家介绍。【也就是说mvtec_adyour_own_dataset文件夹下的文件都不会使用到,这里大家注意一下就好】

 

数据集加载🧨🧨🧨

  这部分定义在mnist文件夹下的tools.py中,首先我们获取MNIST数据集,通过torchvision下的datasets包直接下载即可,如下:

train = datasets.MNIST(path, train=True, download=download)
test = datasets.MNIST(path, train=False, download=download)

  我们知道,minst数据集train中有60000条数据,test中有10000条数据。这些数据的targets为0-9,首先我们获取train中targets为0的数据,代码如下:

_x_train = train.data[train.targets == training_label]      #传入的training_label为0

​  通过调试可以发现,_x_train的维度为(5923,28,28),即targets=0的数据一共有5923条。

​  接着我们将_x_train按照8:2的比列划分为训练集和测试集的一部分,代码如下:

x_train, x_test_normal = _x_train.split((int(len(_x_train) * split_rate)), dim=0)   #传入的split_rate为0.8

​  运行后x_train有4738条数据,x_test_normal有1185条数据。

  上文说到x_test_normal只是测试集的一部分,完整的测试数据集包括x_test_normal、train数据集中除去targets=0以外的其它数据和test中的所有数据,代码如下:

x_test = torch.cat([x_test_normal,
                        train.data[train.targets != training_label],
                        test.data], dim=0)

  这样最终测试集的数据共有65262条。


  上文我们获得了训练集和测试集的数据,我们还需要获取训练集和测试集的标签,代码如下:

_y_train = train.targets[train.targets == training_label]
y_train, y_test_normal = _y_train.split((int(len(_y_train) * split_rate)), dim=0)
y_test = torch.cat([y_test_normal,
                        train.targets[train.targets != training_label],
                        test.targets], dim=0)                                       

  同样,训练集的标签y_train有4738个,测试集的标签y_test有65262个。


  有了数据后,我们对数据做一些预处理,然后用DataLoader加载数据集,代码如下:

train_mnist = SimpleDataset(x_train, y_train,
                                transform=transforms.Compose(
                                    [transforms.ToPILImage(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.5], [0.5])])
                                )
train_dataloader = DataLoader(train_mnist, batch_size=opt.batch_size,
                                  shuffle=True)

 

模型搭建

class Generator(nn.Module):
    def __init__(self, opt):
        super().__init__()
        self.img_shape = (opt.channels, opt.img_size, opt.img_size)

        def block(in_feat, out_feat, normalize=True):
            layers = [nn.Linear(in_feat, out_feat)]
            if normalize:
                layers.append(nn.BatchNorm1d(out_feat, 0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        self.model = nn.Sequential(
            *block(opt.latent_dim, 128, normalize=False),
            *block(128, 256),
            *block(256, 512),
            *block(512, 1024),
            nn.Linear(1024, int(np.prod(self.img_shape))),
            nn.Tanh()
            )

    def forward(self, z):
        img = self.model(z)
        img = img.view(img.shape[0], *self.img_shape)
        return img


class Discriminator(nn.Module):
    def __init__(self, opt):
        super().__init__()
        img_shape = (opt.channels, opt.img_size, opt.img_size)

        self.features = nn.Sequential(
            nn.Linear(int(np.prod(img_shape)), 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True)
            )

        self.last_layer = nn.Sequential(
            nn.Linear(256, 1)
            )

    def forward(self, img):
        features = self.forward_features(img)
        validity = self.last_layer(features)
        return validity

    def forward_features(self, img):
        img_flat = img.view(img.shape[0], -1)
        features = self.features(img_flat)
        return features


class Encoder(nn.Module):
    def __init__(self, opt):
        super().__init__()
        img_shape = (opt.channels, opt.img_size, opt.img_size)

        self.model = nn.Sequential(
            nn.Linear(int(np.prod(img_shape)), 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, opt.latent_dim),
            nn.Tanh()
        )

    def forward(self, img):
        img_flat = img.view(img.shape[0], -1)
        validity = self.model(img_flat)
        return validity

 

  由于是教学,所以搭建的模型很简单,甚至都没有卷积,都是全连接层,大家肯定一看就能明白。🌸🌸🌸

 

训练WGAN🧨🧨🧨

  阅读这部分之前建议阅读WGAN的相关知识喔,点击☞☞☞了解详情。WGAN属于是开山之作,而f-AnoGAN用的是WGAN-GP,其是一种WGAN的改进。关于WGAN-GP我还没做相关介绍,大家自行补充知识,推荐链接:WGAN-GP。如果觉得学起来有困难的话欢迎评论区留言讨论,要是人多的话后期可能会出一起WGAN-GP的教程。【其实WGAN-GP和WGAN的思想是一样的,只是在于实现lipschitz条件的方式不同】

注:在大家理解WGAN-GP时可能会遇到直线段的另一种定义方式,是凸优化中的相关内容,不清楚的可以参考我此前一篇关于凸优化介绍的文章:凸优化理论基础1–仿射集🍋🍋🍋

​ 我们来看看训练WGAN的代码吧:

def train_wgangp(opt, generator, discriminator,
                 dataloader, device, lambda_gp=10):
    generator.to(device)
    discriminator.to(device)

    optimizer_G = torch.optim.Adam(generator.parameters(),
                                   lr=opt.lr, betas=(opt.b1, opt.b2))
    optimizer_D = torch.optim.Adam(discriminator.parameters(),
                                   lr=opt.lr, betas=(opt.b1, opt.b2))

    os.makedirs("results/images", exist_ok=True)

    padding_epoch = len(str(opt.n_epochs))
    padding_i = len(str(len(dataloader)))

    batches_done = 0
    for epoch in range(opt.n_epochs):
        for i, (imgs, _)in enumerate(dataloader):

            # Configure input
            real_imgs = imgs.to(device)

            # ---------------------
            #  Train Discriminator
            # ---------------------

            optimizer_D.zero_grad()

            # Sample noise as generator input
            z = torch.randn(imgs.shape[0], opt.latent_dim, device=device)

            # Generate a batch of images
            fake_imgs = generator(z)

            # Real images
            real_validity = discriminator(real_imgs)
            # Fake images
            fake_validity = discriminator(fake_imgs.detach())   #使用.detach()方法可以不更新generator的值
            # Gradient penalty
            gradient_penalty = compute_gradient_penalty(discriminator,
                                                        real_imgs.data,
                                                        fake_imgs.data,
                                                        device)
            # Adversarial loss
            d_loss = (-torch.mean(real_validity) + torch.mean(fake_validity)
                      + lambda_gp * gradient_penalty)

            d_loss.backward()
            optimizer_D.step()

            optimizer_G.zero_grad()

            # Train the generator and output log every n_critic steps
            if i % opt.n_critic == 0:

                # -----------------
                #  Train Generator
                # -----------------

                # Generate a batch of images
                fake_imgs = generator(z)
                # Loss measures generator's ability to fool the discriminator
                # Train on fake images
                fake_validity = discriminator(fake_imgs)
                g_loss = -torch.mean(fake_validity)

                g_loss.backward()
                optimizer_G.step()

                print(f"[Epoch {
     epoch:{
     padding_epoch}}/{
     opt.n_epochs}] "
                      f"[Batch {
     i:{
     padding_i}}/{
     len(dataloader)}] "
                      f"[D loss: {
     d_loss.item():3f}] "
                      f"[G loss: {
     g_loss.item():3f}]")

                if batches_done % opt.sample_interval == 0:
                    save_image(fake_imgs.data[:25],
                               f"results/images/{
     batches_done:06}.png",
                               nrow=5, normalize=True)

                batches_done += opt.n_critic

    torch.save(generator.state_dict(), "results/generator")
    torch.save(discriminator.state_dict(), "results/discriminator")


 

  上述代码的核心是compute_gradient_penalty函数,是用来计算梯度惩罚的,这也是WGAN-GP最核心的地方,代码如下:

def compute_gradient_penalty(D, real_samples, fake_samples, device):
    """Calculates the gradient penalty loss for WGAN GP"""
    # Random weight term for interpolation between real and fake samples
    alpha = torch.rand(*real_samples.shape[:2], 1, 1, device=device)
    # Get random interpolation between real and fake samples
    interpolates = (alpha * real_samples + (1 - alpha) * fake_samples)
    # 可以直接对变量进行操作,现在pytorch已经舍弃autograd.Variable
    interpolates.requires_grad_(requires_grad=True)
    # interpolates = autograd.Variable(interpolates, requires_grad=True)
    d_interpolates = D(interpolates)
    fake = torch.ones(*d_interpolates.shape, device=device)
    # Get gradient w.r.t. interpolates
    # https://zhuanlan.zhihu.com/p/83172023
    gradients = autograd.grad(outputs=d_interpolates, inputs=interpolates,
                              grad_outputs=fake, create_graph=True,
                              retain_graph=True, only_inputs=True)[0]
    gradients = gradients.view(gradients.shape[0], -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty

 

注:想要理解这部分代码,需要理解pytorch中的auograd包。推荐大家去阅读此篇博文:Pytorch autograd,backward详解

训练结束后,我们保存了生成器和判别器的权重,同时保存了一些生成图片结果,部分展示如下,是不是效果还不错呢。🍄🍄🍄

【因为我们训练集图片都是0,所有我们生成的图片都是0喔!!!🍀🍀🍀】

 

训练编码器E🧨🧨🧨

​ 话不多说,让我们直接上代码吧!!!🍭🍭🍭

def train_encoder_izif(opt, generator, discriminator, encoder,
                       dataloader, device, kappa=1.0):
    generator.load_state_dict(torch.load("results/generator"))
    discriminator.load_state_dict(torch.load("results/discriminator"))

    generator.to(device).eval()
    discriminator.to(device).eval()
    encoder.to(device)

    criterion = nn.MSELoss()

    optimizer_E = torch.optim.Adam(encoder.parameters(),
                                   lr=opt.lr, betas=(opt.b1, opt.b2))

    os.makedirs("results/images_e", exist_ok=True)

    padding_epoch = len(str(opt.n_epochs))
    padding_i = len(str(len(dataloader)))

    batches_done = 0
    for epoch in range(opt.n_epochs):
        for i, (imgs, _) in enumerate(dataloader):

            # Configure input
            real_imgs = imgs.to(device)

            # ----------------
            #  Train Encoder
            # ----------------

            optimizer_E.zero_grad()

            # Generate a batch of latent variables
            z = encoder(real_imgs)

            # Generate a batch of images
            fake_imgs = generator(z)

            # Real features
            real_features = discriminator.forward_features(real_imgs)
            # Fake features
            fake_features = discriminator.forward_features(fake_imgs)

            # izif architecture
            loss_imgs = criterion(fake_imgs, real_imgs)
            loss_features = criterion(fake_features, real_features)
            e_loss = loss_imgs + kappa * loss_features

            e_loss.backward()
            optimizer_E.step()

            # Output training log every n_critic steps
            if i % opt.n_critic == 0:
                print(f"[Epoch {
     epoch:{
     padding_epoch}}/{
     opt.n_epochs}] "
                      f"[Batch {
     i:{
     padding_i}}/{
     len(dataloader)}] "
                      f"[E loss: {
     e_loss.item():3f}]")

                if batches_done % opt.sample_interval == 0:
                    fake_z = encoder(fake_imgs)
                    reconfiguration_imgs = generator(fake_z)
                    save_image(reconfiguration_imgs.data[:25],
                               f"results/images_e/{
     batches_done:06}.png",
                               nrow=5, normalize=True)

                batches_done += opt.n_critic
    torch.save(encoder.state_dict(), "results/encoder")

 

你会发现这些代码真滴很简单。训练结束后我们会保存编码器E的权重和重构后的一些图片。重构后图片效果也还是蛮好的。

 
 

测试异常得分🧨🧨🧨

我们将检测的异常得分保存在score.csv文件中,保存四项参数,分别为label、img_distance、anomaly_score和z_distance。

def test_anomaly_detection(opt, generator, discriminator, encoder,
                           dataloader, device, kappa=1.0):
    generator.load_state_dict(torch.load("results/generator"))
    discriminator.load_state_dict(torch.load("results/discriminator"))
    encoder.load_state_dict(torch.load("results/encoder"))

    generator.to(device).eval()
    discriminator.to(device).eval()
    encoder.to(device).eval()

    criterion = nn.MSELoss()

    with open("results/score.csv", "w") as f:
        f.write("label,img_distance,anomaly_score,z_distance\n")

    for (img, label) in tqdm(dataloader):

        real_img = img.to(device)

        real_z = encoder(real_img)
        fake_img = generator(real_z)
        fake_z = encoder(fake_img)

        real_feature = discriminator.forward_features(real_img)
        fake_feature = discriminator.forward_features(fake_img)

        # Scores for anomaly detection
        img_distance = criterion(fake_img, real_img)
        loss_feature = criterion(fake_feature, real_feature)
        anomaly_score = img_distance + kappa * loss_feature

        z_distance = criterion(fake_z, real_z)

        with open("results/score.csv", "a") as f:
            f.write(f"{
     label.item()},{
     img_distance},"
                    f"{
     anomaly_score},{
     z_distance}\n")

 

在得到score.csv文件后,我们可以来读取文件内容绘制精度曲线。首先导入一些必要的包:

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import roc_curve, precision_recall_curve, auc

然后读取刚刚得到的score.csv文件:

df = pd.read_csv("./results/score.csv")

df的内容如下:可以看到一共有65262行数据,这和我们数据读取时测试集数据大小是一致的。🥝🥝🥝

接着我们读取各列的数据,并把标签为0的标签设置为0,其它的设置为1.

trainig_label = 0
labels = np.where(df["label"].values == trainig_label, 0, 1)
anomaly_score = df["anomaly_score"].values
img_distance = df["img_distance"].values
z_distance = df["z_distance"].values

然后可以根据上面的值得到一些画图所需值:

fpr, tpr, _ = roc_curve(labels, img_distance)
precision, recall, _ = precision_recall_curve(labels, img_distance)
roc_auc = auc(fpr, tpr)
pr_auc =  auc(recall, precision)

接下来就可以画图了:

plt.plot(fpr, tpr, label=f"AUC = {
     roc_auc:3f}")
plt.plot([0, 1], [0, 1], linestyle="--")
plt.title("ROC-AUC")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend()
plt.show()

plt.plot(recall, precision, label=f"PR = {
     pr_auc:3f}")
plt.title("PR-AUC")
plt.xlabel("Recall")
plt.ylabel("Pecision")
plt.legend()
plt.show()

plt.hist([anomaly_score[labels == 0], anomaly_score[labels == 1]],
          bins=100, density=True, stacked=True,
          label=["Normal", "Abnormal"])
plt.title("Discrete distributions of anomaly scores")
plt.xlabel("Anomaly scores A(x)")
plt.ylabel("h")
plt.legend()
plt.show()


 

保存差异图像

  代码中还定义了保存原图和生成图差异的图像,即将真实图像和生成图像做差,看看它们的差异,代码很简单,我们来看看:

def save_compared_images(opt, generator, encoder, dataloader, device):
    generator.load_state_dict(torch.load("results/generator"))
    encoder.load_state_dict(torch.load("results/encoder"))

    generator.to(device).eval()
    encoder.to(device).eval()

    os.makedirs("results/images_diff", exist_ok=True)

    for i, (img, label) in enumerate(dataloader):
        real_img = img.to(device)

        real_z = encoder(real_img)
        fake_img = generator(real_z)

        compared_images = torch.empty(real_img.shape[0] * 3,
                                      *real_img.shape[1:])
        compared_images[0::3] = real_img
        compared_images[1::3] = fake_img
        compared_images[2::3] = real_img - fake_img

        save_image(compared_images.data,
                   f"results/images_diff/{
     opt.n_grid_lines*(i+1):06}.png",
                   nrow=3, normalize=True)

        if opt.n_iters is not None and opt.n_iters == i:
            break


 

​ 我也抽取一张保存的图像来给大家看看结果:

  通过上图可以发现,无论原始输入即原图是什么,生成图都会将其生成0,原图和生成图做差后得到的图片因此也会产生不同的差异。🍊🍊🍊

 

总结

  f-AnoGAN就为大家介绍到这里了,其实你细细摸索下来会觉得非常简单。代码部分大家要勤动手,多调试,这样你会有不一样的收获。🌾🌾🌾

 
 
如若文章对你有所帮助,那就🛴🛴🛴


转载:https://blog.csdn.net/qq_47233366/article/details/127912282
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场