小言_互联网的博客

【深度学习】SSD-tensorflow代码剖析

333人阅读  评论(0)
  • SSD的整体结构流程图:

  • 具体代码部分:

  1. ssd_net 网络结构:
# SSD net definition
def ssd_net(inputs,
            num_classes=SSDNet.default_params.num_classes,
            feat_layers=SSDNet.default_params.feat_layers,
            anchor_sizes=SSDNet.default_params.anchor_sizes,
            anchor_ratios=SSDNet.default_params.anchor_ratios,
            normalizations=SSDNet.default_params.normalizations,
            is_training=True,
            dropout_keep_prob=0.5,
            prediction_fn=slim.softmax,
            reuse=None,
            scope='ssd_300_vgg'):
    """Build the SSD network: VGG-16 style base blocks plus extra SSD blocks.

    Every block output is stored in ``end_points`` under ``'block<N>'``; the
    layers named in ``feat_layers`` are then fed to ``ssd_multibox_layer`` to
    produce class logits and box localisations.

    :param inputs: input tensor fed to the first convolution.
    :param num_classes: number of classes passed to each multibox layer.
    :param feat_layers: names of the ``end_points`` entries used for prediction.
    :param anchor_sizes: per-feature-layer anchor sizes (indexed like feat_layers).
    :param anchor_ratios: per-feature-layer anchor ratios (indexed like feat_layers).
    :param normalizations: per-feature-layer normalization setting
        (indexed like feat_layers).
    :param is_training: forwarded to the dropout layers as ``training``.
    :param dropout_keep_prob: probability of KEEPING a unit in the two dropout
        layers; converted to a drop rate before calling ``tf.layers.dropout``.
    :param prediction_fn: function applied to the logits to get predictions.
    :param reuse: forwarded to ``tf.variable_scope``.
    :param scope: name of the enclosing variable scope.
    :return: ``(predictions, localisations, logits, end_points)`` where the
        first three are lists with one entry per feature layer.
    """
    end_points = {}
    # tf.layers.dropout expects the fraction of units to DROP, not to keep.
    drop_rate = 1.0 - dropout_keep_prob
    with tf.variable_scope(scope, 'ssd_300_vgg', [inputs], reuse=reuse):
        # Base network blocks; swap these out to change the backbone.
        # block1
        net = slim.repeat(inputs, 2, slim.conv2d, 64, [3, 3], scope='conv1')
        end_points['block1'] = net
        net = slim.max_pool2d(net, [2, 2], scope='pool1')
        # block2
        net = slim.repeat(net, 2, slim.conv2d, 128, [3, 3], scope='conv2')
        end_points['block2'] = net
        net = slim.max_pool2d(net, [2, 2], scope='pool2')
        # block3
        net = slim.repeat(net, 3, slim.conv2d, 256, [3, 3], scope='conv3')
        end_points['block3'] = net
        net = slim.max_pool2d(net, [2, 2], scope='pool3')
        # block4
        net = slim.repeat(net, 3, slim.conv2d, 512, [3, 3], scope='conv4')
        end_points['block4'] = net
        net = slim.max_pool2d(net, [2, 2], scope='pool4')
        # block5 (3x3 pooling with stride 1 instead of the 2x2 pooling above)
        net = slim.repeat(net, 3, slim.conv2d, 512, [3, 3], scope='conv5')
        end_points['block5'] = net
        net = slim.max_pool2d(net, [3, 3], stride=1, scope='pool5')

        # Additional SSD blocks.
        # block6: 3x3 convolution with rate=6
        net = slim.conv2d(net, 1024, [3, 3], rate=6, scope='conv6')
        end_points['block6'] = net
        net = tf.layers.dropout(net, rate=drop_rate, training=is_training)
        # block7: 1x1 convolution
        net = slim.conv2d(net, 1024, [1, 1], scope='conv7')
        end_points['block7'] = net
        net = tf.layers.dropout(net, rate=drop_rate, training=is_training)

        # block8 / block9: 1x1 conv, explicit padding, then a stride-2 3x3 conv.
        end_point = 'block8'
        with tf.variable_scope(end_point):
            net = slim.conv2d(net, 256, [1, 1], scope='conv1x1')
            net = custom_layers.pad2d(net, pad=(1, 1))
            net = slim.conv2d(net, 512, [3, 3], stride=2, scope='conv3x3',
                              padding='VALID')
        end_points[end_point] = net

        end_point = 'block9'
        with tf.variable_scope(end_point):
            net = slim.conv2d(net, 128, [1, 1], scope='conv1x1')
            net = custom_layers.pad2d(net, pad=(1, 1))
            net = slim.conv2d(net, 256, [3, 3], stride=2, scope='conv3x3',
                              padding='VALID')
        end_points[end_point] = net

        # block10 / block11: 1x1 conv followed by an unpadded 3x3 conv.
        # All extra blocks share the 'conv1x1' / 'conv3x3' scope names.
        end_point = 'block10'
        with tf.variable_scope(end_point):
            net = slim.conv2d(net, 128, [1, 1], scope='conv1x1')
            net = slim.conv2d(net, 256, [3, 3], scope='conv3x3', padding='VALID')
        end_points[end_point] = net

        end_point = 'block11'
        with tf.variable_scope(end_point):
            net = slim.conv2d(net, 128, [1, 1], scope='conv1x1')
            net = slim.conv2d(net, 256, [3, 3], scope='conv3x3', padding='VALID')
        end_points[end_point] = net

        # Prediction and localisation layers.
        predictions = []
        logits = []
        localisations = []
        for i, layer in enumerate(feat_layers):
            # One multibox head per feature layer: returns the class logits p
            # and the box localisations l for that layer.
            p, l = ssd_multibox_layer(end_points[layer],
                                      num_classes,
                                      anchor_sizes[i],
                                      anchor_ratios[i],
                                      normalizations[i])
            predictions.append(prediction_fn(p))
            logits.append(p)
            localisations.append(l)

        return predictions, localisations, logits, end_points
  1. 先验框生成:
def ssd_anchors_all_layers(img_shape,
                           layers_shape,
                           anchor_sizes,
                           anchor_ratios,
                           anchor_steps,
                           offset=0.5,
                           dtype=np.float32):
    """
    Compute the default (prior) boxes for every feature layer.

    :param img_shape: shape of the input image
    :param layers_shape: iterable of feature-map shapes, one per layer
    :param anchor_sizes: per-layer anchor sizes, indexed like layers_shape
    :param anchor_ratios: per-layer anchor ratios, indexed like layers_shape
    :param anchor_steps: per-layer step passed to ssd_anchor_one_layer,
        indexed like layers_shape
    :param offset: grid offset forwarded to ssd_anchor_one_layer
    :param dtype: numpy dtype forwarded to ssd_anchor_one_layer
    :return: list with the result of ssd_anchor_one_layer for each layer
    """
    # Index the per-layer settings explicitly so a too-short sequence still
    # raises instead of being silently truncated.
    return [
        ssd_anchor_one_layer(img_shape,
                             feat_shape,
                             anchor_sizes[layer_idx],
                             anchor_ratios[layer_idx],
                             anchor_steps[layer_idx],
                             offset=offset,
                             dtype=dtype)
        for layer_idx, feat_shape in enumerate(layers_shape)
    ]

def ssd_anchor_one_layer(img_shape,
                         feat_shape,
                         sizes,
                         ratios,
                         step,
                         offset=0.5,
                         dtype=np.float32):
    """
    Compute SSD default anchor boxes for one feature layer.

    Builds the grid of relative anchor centers and the relative height and
    width of every anchor attached to a grid cell.

    :param img_shape: image shape; index 0 scales y/heights, index 1 x/widths
    :param feat_shape: feature-map shape; its first two entries size the grid
    :param sizes: anchor sizes; sizes[0] is the base size and, when present,
        sizes[1] is combined with it through a geometric mean
    :param ratios: aspect ratios applied to the base size
    :param step: multiplier mapping grid coordinates to image coordinates
    :param offset: grid offset added to every cell index
    :param dtype: numpy dtype of the returned arrays
    :return: y, x, h, w: relative center grids (with a trailing axis of
        length 1) and 1-D arrays of relative heights and widths
    """
    rows, cols = np.mgrid[0:feat_shape[0], 0:feat_shape[1]]
    center_y = (rows.astype(dtype) + offset) * step / img_shape[0]
    center_x = (cols.astype(dtype) + offset) * step / img_shape[1]

    # Trailing axis of length 1 so the grids broadcast against h and w.
    center_y = center_y[..., np.newaxis]
    center_x = center_x[..., np.newaxis]

    # Collect the relative sizes in order: base size, optional geometric
    # mean of the first two sizes, then one entry per aspect ratio.
    heights = [sizes[0] / img_shape[0]]
    widths = [sizes[0] / img_shape[1]]
    if len(sizes) > 1:
        heights.append(math.sqrt(sizes[0] * sizes[1]) / img_shape[0])
        widths.append(math.sqrt(sizes[0] * sizes[1]) / img_shape[1])
    for ratio in ratios:
        heights.append(sizes[0] / img_shape[0] / math.sqrt(ratio))
        widths.append(sizes[0] / img_shape[1] * math.sqrt(ratio))

    # The arrays are sized len(sizes) + len(ratios); slots not filled above
    # stay at zero.
    anchor_count = len(sizes) + len(ratios)
    h = np.zeros((anchor_count,), dtype=dtype)
    w = np.zeros((anchor_count,), dtype=dtype)
    h[:len(heights)] = heights
    w[:len(widths)] = widths
    return center_y, center_x, h, w
  1. 默认框(先验框)匹配策略,寻找与默认框IOU最大的GTbox:
    def bbbox_encode(self, labels, bboxes, anchors, scope=None):
        """
        Encode ground-truth labels and boxes against the default boxes.

        Delegates to ssd_common.tf_ssd_bboxes_encode using the class count,
        no-annotation label and prior scaling stored in self.params, with
        ignore_threshold fixed at 0.5.

        As described by the original author, this is the matching step: each
        default box is paired with the ground-truth box that has the highest
        IOU with it and the offsets to that box (center x/y shift, height and
        width scaling) are computed. Unlike the paper's matching strategy, no
        0.5 threshold is applied here to select positive samples.

        :param labels: labels of the ground-truth boxes
        :param bboxes: coordinates of the ground-truth boxes
        :param anchors: the generated default boxes
        :param scope: name scope forwarded to the encoder
        :return: whatever ssd_common.tf_ssd_bboxes_encode returns
        """
        encoded_targets = ssd_common.tf_ssd_bboxes_encode(
            labels, bboxes, anchors,
            self.params.num_classes,
            self.params.no_annotation_label,
            ignore_threshold=0.5,
            prior_scaling=self.params.prior_scaling,
            scope=scope)
        return encoded_targets

def tf_ssd_bboxes_encode(labels,
                         bboxes,
                         anchors,
                         num_classes,
                         no_annotation_label,
                         ignore_threshold=0.5,
                         prior_scaling=[0.1,0.1,0.2,0.2],
                         dtype=tf.float32,
                         scope='ssd_bboxes_encode'):
    """
    Encode groundtruth labels and bounding boxes using SSD net anchors,
    one feature layer at a time.

    :param labels: 1D tensor (int64) containing the groundtruth labels
    :param bboxes: Nx4 tensor (float) with bboxes in relative coordinates
    :param anchors: list with the default boxes of each prediction layer
    :param num_classes: forwarded to tf_ssd_bboxes_encode_layer
    :param no_annotation_label: forwarded to tf_ssd_bboxes_encode_layer
    :param ignore_threshold: forwarded to tf_ssd_bboxes_encode_layer
    :param prior_scaling: scaling of the encoded coordinates
    :param dtype: dtype forwarded to tf_ssd_bboxes_encode_layer
    :param scope: name scope wrapping the whole encoding
    :return: (target_labels, target_localizations, target_scores);
        each element is a list holding one target tensor per layer
    """
    with tf.name_scope(scope):
        per_layer_targets = []
        for layer_idx, layer_anchors in enumerate(anchors):
            with tf.name_scope('bboxes_encode_block_%i' % layer_idx):
                # Match this layer's default boxes to ground truth and
                # compute the encoded offsets.
                layer_labels, layer_loc, layer_scores = tf_ssd_bboxes_encode_layer(
                    labels, bboxes,
                    layer_anchors,
                    num_classes,
                    no_annotation_label,
                    ignore_threshold,
                    prior_scaling, dtype)
                per_layer_targets.append((layer_labels, layer_loc, layer_scores))
        # Split the per-layer triples into three parallel lists.
        target_labels = [entry[0] for entry in per_layer_targets]
        target_localizations = [entry[1] for entry in per_layer_targets]
        target_scores = [entry[2] for entry in per_layer_targets]
        return target_labels, target_localizations, target_scores

def tf_ssd_bboxes_encode_layer(labels, # ground-truth box classes
                               bboxes, # ground-truth box coordinates
                               anchors_layer, # default boxes: center coordinates plus height/width
                               num_classes,
                               no_annotation_label,
                               ignore_threshold=0.5,
                               prior_scaling=[0.1,0.1,0.2,0.2],
                               dtype=tf.float32):
    """
    Encode groundtruth labels and bounding boxes using SSD anchors from one layer.

    This function only finds, for every default box, the ground-truth box with
    the highest jaccard (IOU) score; it does not split anchors into positive
    and negative samples (no threshold is applied to the score here).

    :param labels: 1D Tensor(int 64) containing groundtruth labels
    :param bboxes: Nx4 tensor (float) with bboxes relative coordinate
    :param anchors_layer: (yref, xref, href, wref) anchors of this layer
    :param num_classes: ground-truth boxes whose label is not below this value
        never update an anchor
    :param no_annotation_label: NOTE(review): not referenced in this function
    :param ignore_threshold: NOTE(review): not referenced in this function
    :param prior_scaling: scaling of encoded coordinate, applied in the order
        (cy, cx, h, w)
    :param dtype: dtype of the score and coordinate tensors
    :return:
    (target_labels,target_localizations,target_scores): target tensors;
    target_localizations stacks (cx, cy, w, h) on the last axis
    """
    # Anchor coordinates and area.
    # Convert each anchor from (center y, center x, height, width) to its
    # top-left / bottom-right corners, i.e. the same (y1, x1, y2, x2) layout
    # that is used below to index the ground-truth bbox.

    yref, xref, href, wref = anchors_layer
    ymin = yref - href / 2.
    xmin = xref - wref / 2.
    ymax = yref + href / 2.
    xmax = xref + wref / 2.
    vol_anchors = (xmax - xmin) * (ymax - ymin)  # area of each default box

    # Initialize the per-anchor result tensors.
    # shape = (yref rows, yref cols, number of anchors per cell)
    shape = (yref.shape[0], yref.shape[1], href.size)
    feat_labels = tf.zeros(shape, dtype=tf.int64)  # label of the matched ground-truth box
    feat_scores = tf.zeros(shape, dtype=dtype)  # jaccard score with the matched ground-truth box

    # Coordinates of the matched ground-truth box; initialized to the box
    # (ymin, xmin, ymax, xmax) = (0, 0, 1, 1).
    feat_ymin = tf.zeros(shape, dtype=dtype)
    feat_xmin = tf.zeros(shape, dtype=dtype)
    feat_ymax = tf.ones(shape, dtype=dtype)
    feat_xmax = tf.ones(shape, dtype=dtype)

    def jaccard_with_anchors(bbox):  # overlap (IOU) helper
        """
        Compute jaccard score between a box and the anchors,
        i.e. the IOU of every anchor with one ground-truth bbox.
        :param bbox: one box indexed as (ymin, xmin, ymax, xmax)
        :return: intersection area divided by union area, per anchor
        """
        int_ymin = tf.maximum(ymin, bbox[0])
        int_xmin = tf.maximum(xmin, bbox[1])
        int_ymax = tf.minimum(ymax, bbox[2])
        int_xmax = tf.minimum(xmax, bbox[3])
        h = tf.maximum(int_ymax - int_ymin, 0.)
        w = tf.maximum(int_xmax - int_xmin, 0.)
        # volumes
        inter_vol = h * w
        union_vol = vol_anchors - inter_vol + (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        jaccard = tf.div(inter_vol, union_vol)
        return jaccard
    def intersection_with_anchors(bbox):
        """
        Compute intersection score between a box and the anchors:
        intersection area divided by the anchor area.
        NOTE(review): this helper is not called anywhere in this function.
        :param bbox: one box indexed as (ymin, xmin, ymax, xmax)
        :return: intersection area divided by anchor area, per anchor
        """
        int_ymin = tf.maximum(ymin, bbox[0])
        int_xmin = tf.maximum(xmin, bbox[1])
        int_ymax = tf.minimum(ymax, bbox[2])
        int_xmax = tf.minimum(xmax, bbox[3])
        h = tf.maximum(int_ymax - int_ymin, 0.)
        w = tf.maximum(int_xmax - int_xmin, 0.)
        inter_vol = h * w
        scores = tf.div(inter_vol, vol_anchors)
        return scores

    def condition(i, feat_labels, feat_scores, feat_ymin, feat_xmin, feat_ymax, feat_xmax):
        """
        Loop condition: check label index.
        :return: boolean tensor, true while i is strictly below the first
            entry of tf.shape(labels)
        """
        r = tf.less(i, tf.shape(labels))  # strict i < tf.shape(labels); labels is documented as 1-D, so this is the ground-truth count
        return r[0]

    def body(i,feat_labels,feat_scores,feat_ymin,feat_xmin,feat_ymax,feat_xmax):
        """
        Loop body: update feature labels, scores and bboxes for
        ground-truth box i.
        - only update an anchor if this box beats the score already stored
        NOTE(review): the original docstring also listed "assign values when
        jaccard > 0.5", but no 0.5 threshold on jaccard appears in this body.
        """
        # jaccard score
        label = labels[i]
        bbox = bboxes[i]
        jaccard = jaccard_with_anchors(bbox)  # IOU of every anchor of this layer with ground-truth box i
        # mask : check threshold + scores + no annotations +num_classes
        mask = tf.greater(jaccard, feat_scores)  # is this IOU larger than the best one stored so far?
        mask = tf.logical_and(mask, feat_scores > -0.5)  # NOTE(review): purpose unclear from this block (feat_scores starts at zero here); this is not a positive-sample threshold
        mask = tf.logical_and(mask, label < num_classes)  # skip ground-truth labels that are not below num_classes
        imask = tf.cast(mask, tf.int64)  # integer version of the mask
        fmask = tf.cast(mask, dtype)  # float version of the mask
        # Update labels and scores where the mask is set.
        feat_labels = imask * label + (1 - imask) * feat_labels  # take the new label where imask is 1
        feat_scores = tf.where(mask, jaccard, feat_scores)  # take jaccard where mask is true, else keep feat_scores

        feat_ymin = fmask * bbox[0] + (1 - fmask) * feat_ymin  # take the new box coordinates where fmask is 1.0
        feat_xmin = fmask * bbox[1] + (1 - fmask) * feat_xmin
        feat_ymax = fmask * bbox[2] + (1 - fmask) * feat_ymax
        feat_xmax = fmask * bbox[3] + (1 - fmask) * feat_xmax

        return [i+1, feat_labels, feat_scores,
                feat_ymin, feat_xmin, feat_ymax, feat_xmax]

    # main loop definition
    i = 0
    [i, feat_labels,feat_scores,
     feat_ymin,feat_xmin,feat_ymax,feat_xmax] = tf.while_loop(condition,body, # condition = loop test, body = loop body, third argument = loop variables
                                                              [i,feat_labels,feat_scores,
                                                               feat_ymin,feat_xmin,
                                                               feat_ymax,feat_xmax])

    # Transform the matched boxes back to center / size form.
    feat_cy = (feat_ymax + feat_ymin) / 2.
    feat_cx = (feat_xmax + feat_xmin) / 2.
    feat_h = feat_ymax - feat_ymin
    feat_w = feat_xmax - feat_xmin
    # Encode features: center offsets relative to the anchor size and
    # log-ratios of height/width, each divided by its prior_scaling entry.
    feat_cy = (feat_cy - yref) / href / prior_scaling[0]
    feat_cx = (feat_cx - xref) / wref / prior_scaling[1]
    feat_h = tf.log(feat_h / href) / prior_scaling[2]
    feat_w = tf.log(feat_w / wref) / prior_scaling[3]
    # use SSD ordering: x / y/w/h instead of ours
    feat_localizations = tf.stack([feat_cx,feat_cy,feat_w,feat_h],axis=-1)
    return feat_labels,feat_localizations,feat_scores
  1. 预测框:
    def detected_bboxes(self, predictions, localizations,
                        select_threshold=None, nms_threshold=0.5,
                        clipping_bbox=None, top_k=400, keep_top_k=200):
        """
        Turn the raw SSD network outputs into final detection boxes.

        Pipeline: select scores/boxes, sort with top_k, run batched NMS with
        keep_top_k, then optionally clip to clipping_bbox.

        :param predictions: class predictions produced by the network
        :param localizations: box localisations produced by the network
        :param select_threshold: forwarded to ssd_common.tf_ssd_bboxes_select
        :param nms_threshold: forwarded to tfe.bboxes_nms_batch
        :param clipping_bbox: when not None, boxes are passed through
            tfe.bboxes_clip with this reference box
        :param top_k: forwarded to tfe.bboxes_sort
        :param keep_top_k: forwarded to tfe.bboxes_nms_batch
        :return: (scores, bboxes) after selection, sorting, NMS and clipping
        """
        # Per-class scores and boxes selected from the raw predictions.
        scores, boxes = ssd_common.tf_ssd_bboxes_select(
            predictions, localizations,
            select_threshold=select_threshold,
            num_classes=self.params.num_classes)
        # Sort by score (the helper receives top_k as its cut-off).
        scores, boxes = tfe.bboxes_sort(scores, boxes, top_k=top_k)
        # Non-maximum suppression (the helper receives keep_top_k as its limit).
        scores, boxes = tfe.bboxes_nms_batch(
            scores, boxes,
            nms_threshold=nms_threshold,
            keep_top_k=keep_top_k)
        if clipping_bbox is None:
            return scores, boxes
        return scores, tfe.bboxes_clip(clipping_bbox, boxes)

  1. 损失函数:
def ssd_losses(logits, localisations, gclasses, glocalisations, gscores,
               match_threshold=0.5,
               negative_ratio=3.,
               alpha=1.,
               label_smoothing=0,
               device='/cpu:0',
               scope=None):
    """
    Build the SSD losses and register them with tf.losses.add_loss.

    The per-layer inputs are flattened and concatenated, anchors are split
    into positives (score above match_threshold) and mined hard negatives,
    and three terms are added: positive cross-entropy, negative
    cross-entropy and the localisation loss. Nothing is returned.

    :param logits: list of predicted class logits, one tensor per layer
    :param localisations: list of predicted box offsets, one tensor per layer
    :param gclasses: list of target classes, one tensor per layer
    :param glocalisations: list of target box offsets, one tensor per layer
    :param gscores: list of anchor/ground-truth overlap scores, one per layer
    :param match_threshold: score above which an anchor counts as positive
    :param negative_ratio: negatives kept per positive during hard mining
    :param alpha: weight applied to the localisation loss
    :param label_smoothing: not referenced in this function
    :param device: not referenced in this function
    :param scope: name scope for the loss ops (defaults to 'ssd_losses')
    :return: None
    """
    with tf.name_scope(scope, 'ssd_losses'):
        lshape = tfe.get_shape(logits[0], 5)
        num_classes = lshape[-1]
        batch_size = lshape[0]

        # Flatten every per-layer tensor so all anchors share one axis.
        flogits = []
        fgclasses = []
        fgscores = []
        flocalisations = []
        fglocalisations = []
        for i, layer_logits in enumerate(logits):
            flogits.append(tf.reshape(layer_logits, [-1, num_classes]))
            fgclasses.append(tf.reshape(gclasses[i], [-1]))
            fgscores.append(tf.reshape(gscores[i], [-1]))
            # Each layer contributes its own localisation tensor, so it must
            # be indexed with the same i as the matching targets.
            flocalisations.append(tf.reshape(localisations[i], [-1, 4]))
            fglocalisations.append(tf.reshape(glocalisations[i], [-1, 4]))
        # Concatenate the layers into single tensors.
        logits = tf.concat(flogits, axis=0)
        gclasses = tf.concat(fgclasses, axis=0)
        gscores = tf.concat(fgscores, axis=0)
        localisations = tf.concat(flocalisations, axis=0)
        glocalisations = tf.concat(fglocalisations, axis=0)
        dtype = logits.dtype

        # Positive matching mask: anchors whose score exceeds the threshold.
        pmask = gscores > match_threshold
        fpmask = tf.cast(pmask, dtype)
        n_positives = tf.reduce_sum(fpmask)  # number of positive anchors

        # Hard negative mining.
        # Labels for the negative term: 0 wherever pmask is false.
        no_classes = tf.cast(pmask, tf.int32)
        predictions = slim.softmax(logits)
        # Candidate negatives: not positive and with a score above -0.5.
        nmask = tf.logical_and(tf.logical_not(pmask), gscores > -0.5)
        fnmask = tf.cast(nmask, dtype)
        # For candidates take the class-0 probability (background, per the
        # original comments); everything else gets 1.0 so it sorts last.
        nvalues = tf.where(nmask, predictions[:, 0], 1. - fnmask)
        nvalues_flat = tf.reshape(nvalues, [-1])
        # Number of negative entries to select.
        max_neg_entries = tf.cast(tf.reduce_sum(fnmask), tf.int32)
        n_neg = tf.cast(negative_ratio * n_positives, tf.int32) + batch_size
        n_neg = tf.minimum(n_neg, max_neg_entries)

        # The n_neg smallest class-0 probabilities are the hardest negatives;
        # the largest of them becomes the selection threshold.
        val, idxes = tf.nn.top_k(-nvalues_flat, k=n_neg)
        max_hard_pred = -val[-1]
        # Final negative mask: candidates strictly below the threshold.
        nmask = tf.logical_and(nmask, nvalues < max_hard_pred)
        fnmask = tf.cast(nmask, dtype)

        # Cross-entropy over the positive anchors.
        with tf.name_scope('cross_entropy_pos'):
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits,
                                                                  labels=gclasses)
            loss = tf.div(tf.reduce_sum(loss * fpmask), batch_size, name='value')
            tf.losses.add_loss(loss)

        # Cross-entropy over the mined negative anchors.
        with tf.name_scope('cross_entropy_neg'):
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits,
                                                                  labels=no_classes)
            loss = tf.div(tf.reduce_sum(loss * fnmask), batch_size, name='value')
            tf.losses.add_loss(loss)

        # Localisation loss, weighted so only positive anchors contribute.
        with tf.name_scope('localization'):
            weights = tf.expand_dims(alpha * fpmask, axis=-1)
            loss = custom_layers.abs_smooth(localisations - glocalisations)
            loss = tf.div(tf.reduce_sum(loss * weights), batch_size, name='value')
            tf.losses.add_loss(loss)

参考大神代码来理解:大神666~


转载:https://blog.csdn.net/qq_43348528/article/details/105681368
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场