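fasterrcnn_resnet50_fpn - Understanding how Faster R-CNN works from the torchvision source code

PyTorch's torchvision package ships an implementation of Faster R-CNN. This article reads through the torchvision source code to understand how Faster R-CNN works internally, as a basis for building on it.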

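1 Interface layer

External usage

According to the torchvision documentation, a Faster R-CNN model object can be constructed directly with the fasterrcnn_resnet50_fpn function.

Specifically, the official documentation gives sample calls for training and for inference: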

torchvision.models.detection.fasterrcnn_resnet50_fpn

>>> model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
>>> # For training
>>> images, boxes = torch.rand(4, 3, 600, 1200), torch.rand(4, 11, 4)
>>> labels = torch.randint(1, 91, (4, 11))
>>> images = list(image for image in images)
>>> targets = []
>>> for i in range(len(images)):
>>>     d = {}
>>>     d['boxes'] = boxes[i]
>>>     d['labels'] = labels[i]
>>>     targets.append(d)
>>> output = model(images, targets)
>>> # For inference
>>> model.eval()
>>> x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
>>> predictions = model(x)
>>>
>>> # optionally, if you want to export the model to ONNX:
>>> torch.onnx.export(model, x, "faster_rcnn.onnx", opset_version = 11)
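  • Note that in both training and inference the model input is a list: a list of image tensors plus, during training, a parallel list of target dicts holding the ground-truth boxes and labels.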

fasterrcnn_resnet50_fpn

The fasterrcnn_resnet50_fpn function is implemented in the torchvision.models.detection.faster_rcnn module; see the documentation for torchvision.models.detection.fasterrcnn_resnet50_fpn.

def fasterrcnn_resnet50_fpn(pretrained=False, progress=True,
                            num_classes=91, pretrained_backbone=True, trainable_backbone_layers=3, **kwargs):
    """
    Constructs a Faster R-CNN model with a ResNet-50-FPN backbone.

    The input to the model is expected to be a list of tensors, each of shape ``[C, H, W]``, one for each
    image, and should be in ``0-1`` range. Different images can have different sizes.

    The behavior of the model changes depending if it is in training or evaluation mode.

    During training, the model expects both the input tensors, as well as a targets (list of dictionary),
    containing:
        - boxes (``FloatTensor[N, 4]``): the ground-truth boxes in ``[x1, y1, x2, y2]`` format, with values of ``x``
          between ``0`` and ``W`` and values of ``y`` between ``0`` and ``H``
        - labels (``Int64Tensor[N]``): the class label for each ground-truth box

    The model returns a ``Dict[Tensor]`` during training, containing the classification and regression
    losses for both the RPN and the R-CNN.

    During inference, the model requires only the input tensors, and returns the post-processed
    predictions as a ``List[Dict[Tensor]]``, one for each input image. The fields of the ``Dict`` are as
    follows:
        - boxes (``FloatTensor[N, 4]``): the predicted boxes in ``[x1, y1, x2, y2]`` format, with values of ``x``
          between ``0`` and ``W`` and values of ``y`` between ``0`` and ``H``
        - labels (``Int64Tensor[N]``): the predicted labels for each image
        - scores (``Tensor[N]``): the scores or each prediction

    Faster R-CNN is exportable to ONNX for a fixed batch size with inputs images of fixed size.

    Example::

        >>> model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
        >>> # For training
        >>> images, boxes = torch.rand(4, 3, 600, 1200), torch.rand(4, 11, 4)
        >>> labels = torch.randint(1, 91, (4, 11))
        >>> images = list(image for image in images)
        >>> targets = []
        >>> for i in range(len(images)):
        >>>     d = {}
        >>>     d['boxes'] = boxes[i]
        >>>     d['labels'] = labels[i]
        >>>     targets.append(d)
        >>> output = model(images, targets)
        >>> # For inference
        >>> model.eval()
        >>> x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
        >>> predictions = model(x)
        >>>
        >>> # optionally, if you want to export the model to ONNX:
        >>> torch.onnx.export(model, x, "faster_rcnn.onnx", opset_version = 11)

    Arguments:
        pretrained (bool): If True, returns a model pre-trained on COCO train2017
        progress (bool): If True, displays a progress bar of the download to stderr
        pretrained_backbone (bool): If True, returns a model with backbone pre-trained on Imagenet
        num_classes (int): number of output classes of the model (including the background)
        trainable_backbone_layers (int): number of trainable (not frozen) resnet layers starting from final block.
            Valid values are between 0 and 5, with 5 meaning all backbone layers are trainable.
    """
    assert trainable_backbone_layers <= 5 and trainable_backbone_layers >= 0
    # dont freeze any layers if pretrained model or backbone is not used
    if not (pretrained or pretrained_backbone):
        trainable_backbone_layers = 5
    if pretrained:
        # no need to download the backbone if pretrained is set
        pretrained_backbone = False
    backbone = resnet_fpn_backbone('resnet50', pretrained_backbone, trainable_layers=trainable_backbone_layers)
    model = FasterRCNN(backbone, num_classes, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls['fasterrcnn_resnet50_fpn_coco'],
                                              progress=progress)
        model.load_state_dict(state_dict)
    return model

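The function first checks its arguments:

  1. trainable_backbone_layers must lie between 0 and 5; counting from the last layer, it says how many backbone layers remain trainable (the rest are frozen). If neither pretrained nor pretrained_backbone is set, it is forced to 5, since there are no pretrained weights worth freezing;
  2. pretrained and pretrained_backbone are checked together: if the whole model is going to be loaded from pretrained weights, there is no need to download a pretrained backbone separately, so pretrained_backbone is set to False and the pretrained weights are loaded into the complete Faster R-CNN model.

The Faster R-CNN model is an instance of the FasterRCNN class. At instantiation, the backbone built by resnet_fpn_backbone is passed in as the backbone of FasterRCNN.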
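The flag handling described in the two items above can be isolated into a few lines. This is only a sketch in plain Python: resolve_backbone_flags is a hypothetical helper name, no model is built and nothing is downloaded; it just mirrors the checks at the top of the quoted function.

```python
def resolve_backbone_flags(pretrained, pretrained_backbone, trainable_backbone_layers=3):
    """Hypothetical helper mirroring the flag handling of fasterrcnn_resnet50_fpn."""
    assert 0 <= trainable_backbone_layers <= 5
    # Nothing is pretrained: there is nothing worth freezing, so train all layers.
    if not (pretrained or pretrained_backbone):
        trainable_backbone_layers = 5
    # Full-model weights already include the backbone, so skip its separate download.
    if pretrained:
        pretrained_backbone = False
    return pretrained_backbone, trainable_backbone_layers


print(resolve_backbone_flags(True, True))    # (False, 3)
print(resolve_backbone_flags(False, False))  # (False, 5)
print(resolve_backbone_flags(False, True))   # (True, 3)
```

The first call corresponds to the running example of this article (pretrained=True): the backbone download is skipped and three backbone layers stay trainable.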

resnet_fpn_backbone

The backbone is built by the public resnet_fpn_backbone function.

resnet_fpn_backbone is implemented in the torchvision.models.detection.backbone_utils module.

def resnet_fpn_backbone(
    backbone_name,
    pretrained,
    norm_layer=misc_nn_ops.FrozenBatchNorm2d,
    trainable_layers=3,
    returned_layers=None,
    extra_blocks=None
):
    """
    Constructs a specified ResNet backbone with FPN on top. Freezes the specified number of layers in the backbone.

    Examples::

        >>> from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
        >>> backbone = resnet_fpn_backbone('resnet50', pretrained=True, trainable_layers=3)
        >>> # get some dummy image
        >>> x = torch.rand(1,3,64,64)
        >>> # compute the output
        >>> output = backbone(x)
        >>> print([(k, v.shape) for k, v in output.items()])
        >>> # returns
        >>>   [('0', torch.Size([1, 256, 16, 16])),
        >>>    ('1', torch.Size([1, 256, 8, 8])),
        >>>    ('2', torch.Size([1, 256, 4, 4])),
        >>>    ('3', torch.Size([1, 256, 2, 2])),
        >>>    ('pool', torch.Size([1, 256, 1, 1]))]

    Arguments:
        backbone_name (string): resnet architecture. Possible values are 'ResNet', 'resnet18', 'resnet34', 'resnet50',
             'resnet101', 'resnet152', 'resnext50_32x4d', 'resnext101_32x8d', 'wide_resnet50_2', 'wide_resnet101_2'
        norm_layer (torchvision.ops): it is recommended to use the default value. For details visit:
            (https://github.com/facebookresearch/maskrcnn-benchmark/issues/267)
        pretrained (bool): If True, returns a model with backbone pre-trained on Imagenet
        trainable_layers (int): number of trainable (not frozen) resnet layers starting from final block.
            Valid values are between 0 and 5, with 5 meaning all backbone layers are trainable.
    """
    backbone = resnet.__dict__[backbone_name](
        pretrained=pretrained,
        norm_layer=norm_layer)

    # select layers that wont be frozen
    assert trainable_layers <= 5 and trainable_layers >= 0
    layers_to_train = ['layer4', 'layer3', 'layer2', 'layer1', 'conv1'][:trainable_layers]
    # freeze layers only if pretrained backbone is used
    for name, parameter in backbone.named_parameters():
        if all([not name.startswith(layer) for layer in layers_to_train]):
            parameter.requires_grad_(False)

    if extra_blocks is None:
        extra_blocks = LastLevelMaxPool()

    if returned_layers is None:
        returned_layers = [1, 2, 3, 4]
    assert min(returned_layers) > 0 and max(returned_layers) < 5
    return_layers = {f'layer{k}': str(v) for v, k in enumerate(returned_layers)}

    in_channels_stage2 = backbone.inplanes // 8
    in_channels_list = [in_channels_stage2 * 2 ** (i - 1) for i in returned_layers]
    out_channels = 256
    return BackboneWithFPN(backbone, return_layers, in_channels_list, out_channels, extra_blocks=extra_blocks)

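First, the ResNet variant named by backbone_name is looked up and instantiated.

Next, the valid range of trainable_layers is checked, and every parameter that does not belong to one of the selected trainable layers is frozen with parameter.requires_grad_(False).

When extra_blocks is not specified, a max_pool2d step is appended after the last feature map. The LastLevelMaxPool class that implements it is short: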
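To see which parameters the prefix test freezes, the selection logic can be run on a handful of parameter names without building a network. This is a sketch: frozen_parameter_names is a hypothetical helper, and example_names is a small hand-picked sample of ResNet parameter names rather than the full list.

```python
def frozen_parameter_names(parameter_names, trainable_layers):
    """Return the names that the prefix test in resnet_fpn_backbone would freeze."""
    assert 0 <= trainable_layers <= 5
    layers_to_train = ['layer4', 'layer3', 'layer2', 'layer1', 'conv1'][:trainable_layers]
    return [name for name in parameter_names
            if all(not name.startswith(layer) for layer in layers_to_train)]


# A small illustrative sample of parameter names (not the complete list).
example_names = [
    'conv1.weight',
    'layer1.0.conv1.weight',
    'layer2.0.conv1.weight',
    'layer3.0.conv1.weight',
    'layer4.0.conv1.weight',
    'fc.weight',
]

print(frozen_parameter_names(example_names, 3))
# ['conv1.weight', 'layer1.0.conv1.weight', 'fc.weight']
print(frozen_parameter_names(example_names, 5))
# ['fc.weight']
```

Because the test works on name prefixes, anything outside conv1 and layer1 to layer4 (here fc.weight) is frozen even when trainable_layers is 5.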

# defined in torchvision.ops.feature_pyramid_network
class LastLevelMaxPool(ExtraFPNBlock):
    """
    Applies a max_pool2d on top of the last feature map
    """
    def forward(
        self,
        x: List[Tensor],
        y: List[Tensor],
        names: List[str],
    ) -> Tuple[List[Tensor], List[str]]:
        names.append("pool")
        x.append(F.max_pool2d(x[-1], 1, 2, 0))
        return x, names

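The official documentation for torch.nn.functional.max_pool2d points to torch.nn.MaxPool2d for the details. Concretely, F.max_pool2d(x[-1], 1, 2, 0) means:

  1. the input is x[-1], the last feature map;
  2. the pooling window size kernel_size is 1;
  3. the stride is 2;
  4. the padding is 0.

Convolution-style operations like this one are easier to follow with a visualization: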
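With a 1x1 window the "max" of each window is just the single element it covers, so this call amounts to keeping every second row and column. The sketch below shows this on nested lists in plain Python; pool_output_size uses the standard pooling size formula with dilation 1 and ceil_mode off, and both function names are hypothetical.

```python
def pool_output_size(size, kernel_size=1, stride=2, padding=0):
    """Standard pooling output size (dilation 1, ceil_mode=False)."""
    return (size + 2 * padding - kernel_size) // stride + 1


def max_pool_k1_s2(feature_map):
    """kernel_size=1, stride=2, padding=0 on a 2D list: plain subsampling."""
    return [row[::2] for row in feature_map[::2]]


feature_map = [
    [1, 2, 3, 4],
    [5, 6, 7, 8],
    [9, 10, 11, 12],
    [13, 14, 15, 16],
]

print(max_pool_k1_s2(feature_map))  # [[1, 3], [9, 11]]
print(pool_output_size(4))          # 2
print(pool_output_size(2))          # 1
```

The last line matches the docstring example of resnet_fpn_backbone, where the 2x2 map '3' yields a 1x1 'pool' map.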

Convolution arithmetic

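Then the remaining arguments for BackboneWithFPN are prepared:

  1. return_layers: a dict that works together with the backbone passed in; each key is a module name in the backbone and each value is the user-defined name under which that module's output is returned;
  2. in_channels_list: a list that works together with the backbone and return_layers; it holds the number of channels of each feature map the backbone returns;
  3. out_channels: an integer, the number of channels used inside the FPN.

2 Implementation layer

One example is used throughout this section: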
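For concreteness, the two comprehensions can be evaluated by hand. The sketch below assumes backbone.inplanes is 2048 for resnet50 (and 512 for resnet18) after construction; fpn_arguments is a hypothetical helper that only repeats the arithmetic of the quoted function.

```python
def fpn_arguments(inplanes, returned_layers=(1, 2, 3, 4)):
    """Repeat the return_layers / in_channels_list arithmetic of resnet_fpn_backbone."""
    assert min(returned_layers) > 0 and max(returned_layers) < 5
    return_layers = {f'layer{k}': str(v) for v, k in enumerate(returned_layers)}
    in_channels_stage2 = inplanes // 8
    in_channels_list = [in_channels_stage2 * 2 ** (i - 1) for i in returned_layers]
    return return_layers, in_channels_list


# Assumption: inplanes is 2048 for resnet50 once all four stages are built.
return_layers, in_channels_list = fpn_arguments(2048)
print(return_layers)     # {'layer1': '0', 'layer2': '1', 'layer3': '2', 'layer4': '3'}
print(in_channels_list)  # [256, 512, 1024, 2048]
```

The keys '0' to '3' are the names that later appear in the FPN output and in the featmap_names of the default MultiScaleRoIAlign.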

import torch
import torchvision

model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()
x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]  # simulate two input images of different sizes
predictions = model(x)

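We use the pretrained model and simulate two input images. Both have 3 channels; one has an $H \times W$ resolution of $300 \times 400$, the other $500 \times 400$.

FasterRCNN

The FasterRCNN class is implemented in the torchvision.models.detection.faster_rcnn module (faster_rcnn.py).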

class FasterRCNN(GeneralizedRCNN):
    """
    Implements Faster R-CNN.

    The input to the model is expected to be a list of tensors, each of shape [C, H, W], one for each
    image, and should be in 0-1 range. Different images can have different sizes.

    The behavior of the model changes depending if it is in training or evaluation mode.

    During training, the model expects both the input tensors, as well as a targets (list of dictionary),
    containing:
        - boxes (FloatTensor[N, 4]): the ground-truth boxes in [x1, y1, x2, y2] format, with values of x
          between 0 and W and values of y between 0 and H
        - labels (Int64Tensor[N]): the class label for each ground-truth box

    The model returns a Dict[Tensor] during training, containing the classification and regression
    losses for both the RPN and the R-CNN.

    During inference, the model requires only the input tensors, and returns the post-processed
    predictions as a List[Dict[Tensor]], one for each input image. The fields of the Dict are as
    follows:
        - boxes (FloatTensor[N, 4]): the predicted boxes in [x1, y1, x2, y2] format, with values of x
          between 0 and W and values of y between 0 and H
        - labels (Int64Tensor[N]): the predicted labels for each image
        - scores (Tensor[N]): the scores or each prediction

    Arguments:
        backbone (nn.Module): the network used to compute the features for the model.
            It should contain a out_channels attribute, which indicates the number of output
            channels that each feature map has (and it should be the same for all feature maps).
            The backbone should return a single Tensor or and OrderedDict[Tensor].
        num_classes (int): number of output classes of the model (including the background).
            If box_predictor is specified, num_classes should be None.
        min_size (int): minimum size of the image to be rescaled before feeding it to the backbone
        max_size (int): maximum size of the image to be rescaled before feeding it to the backbone
        image_mean (Tuple[float, float, float]): mean values used for input normalization.
            They are generally the mean values of the dataset on which the backbone has been trained
            on
        image_std (Tuple[float, float, float]): std values used for input normalization.
            They are generally the std values of the dataset on which the backbone has been trained on
        rpn_anchor_generator (AnchorGenerator): module that generates the anchors for a set of feature
            maps.
        rpn_head (nn.Module): module that computes the objectness and regression deltas from the RPN
        rpn_pre_nms_top_n_train (int): number of proposals to keep before applying NMS during training
        rpn_pre_nms_top_n_test (int): number of proposals to keep before applying NMS during testing
        rpn_post_nms_top_n_train (int): number of proposals to keep after applying NMS during training
        rpn_post_nms_top_n_test (int): number of proposals to keep after applying NMS during testing
        rpn_nms_thresh (float): NMS threshold used for postprocessing the RPN proposals
        rpn_fg_iou_thresh (float): minimum IoU between the anchor and the GT box so that they can be
            considered as positive during training of the RPN.
        rpn_bg_iou_thresh (float): maximum IoU between the anchor and the GT box so that they can be
            considered as negative during training of the RPN.
        rpn_batch_size_per_image (int): number of anchors that are sampled during training of the RPN
            for computing the loss
        rpn_positive_fraction (float): proportion of positive anchors in a mini-batch during training
            of the RPN
        box_roi_pool (MultiScaleRoIAlign): the module which crops and resizes the feature maps in
            the locations indicated by the bounding boxes
        box_head (nn.Module): module that takes the cropped feature maps as input
        box_predictor (nn.Module): module that takes the output of box_head and returns the
            classification logits and box regression deltas.
        box_score_thresh (float): during inference, only return proposals with a classification score
            greater than box_score_thresh
        box_nms_thresh (float): NMS threshold for the prediction head. Used during inference
        box_detections_per_img (int): maximum number of detections per image, for all classes.
        box_fg_iou_thresh (float): minimum IoU between the proposals and the GT box so that they can be
            considered as positive during training of the classification head
        box_bg_iou_thresh (float): maximum IoU between the proposals and the GT box so that they can be
            considered as negative during training of the classification head
        box_batch_size_per_image (int): number of proposals that are sampled during training of the
            classification head
        box_positive_fraction (float): proportion of positive proposals in a mini-batch during training
            of the classification head
        bbox_reg_weights (Tuple[float, float, float, float]): weights for the encoding/decoding of the
            bounding boxes

    Example::

        >>> import torch
        >>> import torchvision
        >>> from torchvision.models.detection import FasterRCNN
        >>> from torchvision.models.detection.rpn import AnchorGenerator
        >>> # load a pre-trained model for classification and return
        >>> # only the features
        >>> backbone = torchvision.models.mobilenet_v2(pretrained=True).features
        >>> # FasterRCNN needs to know the number of
        >>> # output channels in a backbone. For mobilenet_v2, it's 1280
        >>> # so we need to add it here
        >>> backbone.out_channels = 1280
        >>>
        >>> # let's make the RPN generate 5 x 3 anchors per spatial
        >>> # location, with 5 different sizes and 3 different aspect
        >>> # ratios. We have a Tuple[Tuple[int]] because each feature
        >>> # map could potentially have different sizes and
        >>> # aspect ratios
        >>> anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256, 512),),
        >>>                                    aspect_ratios=((0.5, 1.0, 2.0),))
        >>>
        >>> # let's define what are the feature maps that we will
        >>> # use to perform the region of interest cropping, as well as
        >>> # the size of the crop after rescaling.
        >>> # if your backbone returns a Tensor, featmap_names is expected to
        >>> # be ['0']. More generally, the backbone should return an
        >>> # OrderedDict[Tensor], and in featmap_names you can choose which
        >>> # feature maps to use.
        >>> roi_pooler = torchvision.ops.MultiScaleRoIAlign(featmap_names=['0'],
        >>>                                                 output_size=7,
        >>>                                                 sampling_ratio=2)
        >>>
        >>> # put the pieces together inside a FasterRCNN model
        >>> model = FasterRCNN(backbone,
        >>>                    num_classes=2,
        >>>                    rpn_anchor_generator=anchor_generator,
        >>>                    box_roi_pool=roi_pooler)
        >>> model.eval()
        >>> x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
        >>> predictions = model(x)
    """

    def __init__(self, backbone, num_classes=None,
                 # transform parameters
                 min_size=800, max_size=1333,
                 image_mean=None, image_std=None,
                 # RPN parameters
                 rpn_anchor_generator=None, rpn_head=None,
                 rpn_pre_nms_top_n_train=2000, rpn_pre_nms_top_n_test=1000,
                 rpn_post_nms_top_n_train=2000, rpn_post_nms_top_n_test=1000,
                 rpn_nms_thresh=0.7,
                 rpn_fg_iou_thresh=0.7, rpn_bg_iou_thresh=0.3,
                 rpn_batch_size_per_image=256, rpn_positive_fraction=0.5,
                 # Box parameters
                 box_roi_pool=None, box_head=None, box_predictor=None,
                 box_score_thresh=0.05, box_nms_thresh=0.5, box_detections_per_img=100,
                 box_fg_iou_thresh=0.5, box_bg_iou_thresh=0.5,
                 box_batch_size_per_image=512, box_positive_fraction=0.25,
                 bbox_reg_weights=None):

        if not hasattr(backbone, "out_channels"):
            raise ValueError(
                "backbone should contain an attribute out_channels "
                "specifying the number of output channels (assumed to be the "
                "same for all the levels)")

        assert isinstance(rpn_anchor_generator, (AnchorGenerator, type(None)))
        assert isinstance(box_roi_pool, (MultiScaleRoIAlign, type(None)))

        if num_classes is not None:
            if box_predictor is not None:
                raise ValueError("num_classes should be None when box_predictor is specified")
        else:
            if box_predictor is None:
                raise ValueError("num_classes should not be None when box_predictor "
                                 "is not specified")

        out_channels = backbone.out_channels

        if rpn_anchor_generator is None:
            anchor_sizes = ((32,), (64,), (128,), (256,), (512,))
            aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)
            rpn_anchor_generator = AnchorGenerator(
                anchor_sizes, aspect_ratios
            )
        if rpn_head is None:
            rpn_head = RPNHead(
                out_channels, rpn_anchor_generator.num_anchors_per_location()[0]
            )

        rpn_pre_nms_top_n = dict(training=rpn_pre_nms_top_n_train, testing=rpn_pre_nms_top_n_test)
        rpn_post_nms_top_n = dict(training=rpn_post_nms_top_n_train, testing=rpn_post_nms_top_n_test)

        rpn = RegionProposalNetwork(
            rpn_anchor_generator, rpn_head,
            rpn_fg_iou_thresh, rpn_bg_iou_thresh,
            rpn_batch_size_per_image, rpn_positive_fraction,
            rpn_pre_nms_top_n, rpn_post_nms_top_n, rpn_nms_thresh)

        if box_roi_pool is None:
            box_roi_pool = MultiScaleRoIAlign(
                featmap_names=['0', '1', '2', '3'],
                output_size=7,
                sampling_ratio=2)

        if box_head is None:
            resolution = box_roi_pool.output_size[0]
            representation_size = 1024
            box_head = TwoMLPHead(
                out_channels * resolution ** 2,
                representation_size)

        if box_predictor is None:
            representation_size = 1024
            box_predictor = FastRCNNPredictor(
                representation_size,
                num_classes)

        roi_heads = RoIHeads(
            # Box
            box_roi_pool, box_head, box_predictor,
            box_fg_iou_thresh, box_bg_iou_thresh,
            box_batch_size_per_image, box_positive_fraction,
            bbox_reg_weights,
            box_score_thresh, box_nms_thresh, box_detections_per_img)

        if image_mean is None:
            image_mean = [0.485, 0.456, 0.406]
        if image_std is None:
            image_std = [0.229, 0.224, 0.225]
        transform = GeneralizedRCNNTransform(min_size, max_size, image_mean, image_std)

        super(FasterRCNN, self).__init__(backbone, rpn, roi_heads, transform)

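The FasterRCNN code looks long, but most of it is docstring.

FasterRCNN implements only __init__. Because it inherits from GeneralizedRCNN, the overall structure and the forward computation live in the parent class; the subclass only has to check its arguments and instantiate its concrete sub-modules.

Concretely, __init__ prepares the backbone, rpn, roi_heads and transform objects (filling in defaults for any component that was not passed in) and hands them to the initializer of the parent class GeneralizedRCNN, which assembles the FasterRCNN instance.

GeneralizedRCNN

GeneralizedRCNN is implemented in the torchvision.models.detection.generalized_rcnn module (generalized_rcnn.py). As the parent class, it defines the overall computation of the R-CNN architecture.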
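One of the defaults prepared in __init__ is the anchor configuration. The sketch below evaluates it in plain Python, assuming that the number of anchors per spatial location on each feature level is the number of sizes times the number of aspect ratios for that level; the variable names follow the quoted source.

```python
# Default anchor configuration from FasterRCNN.__init__: one size per feature level.
anchor_sizes = ((32,), (64,), (128,), (256,), (512,))
aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)

# Assumed rule: anchors per location = (#sizes) * (#aspect ratios) on each level.
anchors_per_location = [len(sizes) * len(ratios)
                        for sizes, ratios in zip(anchor_sizes, aspect_ratios)]

print(len(anchor_sizes))      # 5 levels: FPN maps '0'..'3' plus 'pool'
print(anchors_per_location)   # [3, 3, 3, 3, 3]
```

Since the count is the same on every level, taking element [0] when constructing the default RPNHead is enough.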

class GeneralizedRCNN(nn.Module):
    """
    Main class for Generalized R-CNN.

    Arguments:
        backbone (nn.Module):
        rpn (nn.Module):
        roi_heads (nn.Module): takes the features + the proposals from the RPN and computes
            detections / masks from it.
        transform (nn.Module): performs the data transformation from the inputs to feed into
            the model
    """

    def __init__(self, backbone, rpn, roi_heads, transform):
        super(GeneralizedRCNN, self).__init__()
        self.transform = transform
        self.backbone = backbone
        self.rpn = rpn
        self.roi_heads = roi_heads
        # used only on torchscript mode
        self._has_warned = False

    @torch.jit.unused
    def eager_outputs(self, losses, detections):
        # type: (Dict[str, Tensor], List[Dict[str, Tensor]]) -> Union[Dict[str, Tensor], List[Dict[str, Tensor]]]
        if self.training:
            return losses

        return detections

    def forward(self, images, targets=None):
        # type: (List[Tensor], Optional[List[Dict[str, Tensor]]]) -> Tuple[Dict[str, Tensor], List[Dict[str, Tensor]]]
        """
        Arguments:
            images (list[Tensor]): images to be processed
            targets (list[Dict[Tensor]]): ground-truth boxes present in the image (optional)

        Returns:
            result (list[BoxList] or dict[Tensor]): the output from the model.
                During training, it returns a dict[Tensor] which contains the losses.
                During testing, it returns list[BoxList] contains additional fields
                like `scores`, `labels` and `mask` (for Mask R-CNN models).

        """
        if self.training and targets is None:
            raise ValueError("In training mode, targets should be passed")
        if self.training:
            assert targets is not None
            for target in targets:
                boxes = target["boxes"]
                if isinstance(boxes, torch.Tensor):
                    if len(boxes.shape) != 2 or boxes.shape[-1] != 4:
                        raise ValueError("Expected target boxes to be a tensor"
                                         "of shape [N, 4], got {:}.".format(
                                             boxes.shape))
                else:
                    raise ValueError("Expected target boxes to be of type "
                                     "Tensor, got {:}.".format(type(boxes)))

        original_image_sizes = torch.jit.annotate(List[Tuple[int, int]], [])
        for img in images:
            val = img.shape[-2:]
            assert len(val) == 2
            original_image_sizes.append((val[0], val[1]))

        images, targets = self.transform(images, targets)

        # Check for degenerate boxes
        # TODO: Move this to a function
        if targets is not None:
            for target_idx, target in enumerate(targets):
                boxes = target["boxes"]
                degenerate_boxes = boxes[:, 2:] <= boxes[:, :2]
                if degenerate_boxes.any():
                    # print the first degenerate box
                    bb_idx = torch.where(degenerate_boxes.any(dim=1))[0][0]
                    degen_bb: List[float] = boxes[bb_idx].tolist()
                    raise ValueError("All bounding boxes should have positive height and width."
                                     " Found invalid box {} for target at index {}."
                                     .format(degen_bb, target_idx))

        features = self.backbone(images.tensors)
        if isinstance(features, torch.Tensor):
            features = OrderedDict([('0', features)])
        proposals, proposal_losses = self.rpn(images, features, targets)
        detections, detector_losses = self.roi_heads(features, proposals, images.image_sizes, targets)
        detections = self.transform.postprocess(detections, images.image_sizes, original_image_sizes)

        losses = {}
        losses.update(detector_losses)
        losses.update(proposal_losses)

        if torch.jit.is_scripting():
            if not self._has_warned:
                warnings.warn("RCNN always returns a (Losses, Detections) tuple in scripting")
                self._has_warned = True
            return (losses, detections)
        else:
            return self.eager_outputs(losses, detections)

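In __init__, GeneralizedRCNN defines the R-CNN architecture as four components:

  1. transform: a transform module that preprocesses the images and the other inputs (the targets);
  2. backbone: a feature extractor; its input is the transformed image tensor and its output is the extracted image features;
  3. rpn: an RPN module; its inputs are the images, the features extracted by the backbone and, during training, the targets containing the ground-truth boxes; its outputs are the predicted region proposals and the corresponding losses;
  4. roi_heads: a RoIHeads module; its inputs are the features output by the backbone, the proposals output by the RPN, the image sizes and, during training, the targets; its outputs are the detections and the detector losses.

The forward method of this class essentially runs these four components in order. Leaving aside the argument checks and the differences between training-time and inference-time inputs, the main logic can be summarized as: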

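def forward(self, images, targets=None):
    # 0. remember the original (H, W) of every image for the postprocess step
    original_image_sizes = [tuple(img.shape[-2:]) for img in images]
    # 1. transform
    images, targets = self.transform(images, targets)
    # 2. backbone
    features = self.backbone(images.tensors)
    # 3. rpn
    proposals, proposal_losses = self.rpn(images, features, targets)
    # 4. roi_heads
    detections, detector_losses = self.roi_heads(features, proposals, images.image_sizes, targets)
    # 5. postprocess
    detections = self.transform.postprocess(detections, images.image_sizes, original_image_sizes)
    # losses are returned in training mode, detections in evaluation mode
    losses = {**detector_losses, **proposal_losses}
    return losses if self.training else detections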
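One of the checks left out of this summary is the degenerate-box test that runs after the transform: a ground-truth box is rejected when x2 <= x1 or y2 <= y1. The sketch below restates the test on plain Python lists; first_degenerate_box is a hypothetical helper, whereas the library works on tensors and raises a ValueError.

```python
def first_degenerate_box(boxes):
    """Return the index of the first box with non-positive width or height, else None.

    boxes is a list of [x1, y1, x2, y2] lists, the format the targets use.
    """
    for index, (x1, y1, x2, y2) in enumerate(boxes):
        if x2 <= x1 or y2 <= y1:
            return index
    return None


valid_boxes = [[10.0, 20.0, 110.0, 220.0], [0.0, 0.0, 5.0, 5.0]]
broken_boxes = [[10.0, 20.0, 110.0, 220.0], [50.0, 50.0, 50.0, 90.0]]  # zero width

print(first_degenerate_box(valid_boxes))   # None
print(first_degenerate_box(broken_boxes))  # 1
```

Boxes drawn independently at random, such as torch.rand(4, 11, 4), will usually contain at least one box that fails this test, so hand-built training targets should enforce x2 > x1 and y2 > y1.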

GeneralizedRCNNTransform

The preprocessing that the Faster R-CNN model applies to its input images is implemented by the GeneralizedRCNNTransform class in the torchvision.models.detection.transform module.

class GeneralizedRCNNTransform(nn.Module):
    """
    Performs input / target transformation before feeding the data to a GeneralizedRCNN
    model.

    The transformations it perform are:
        - input normalization (mean subtraction and std division)
        - input / target resizing to match min_size / max_size

    It returns a ImageList for the inputs, and a List[Dict[Tensor]] for the targets
    """

    def __init__(self, min_size, max_size, image_mean, image_std):
        super(GeneralizedRCNNTransform, self).__init__()
        if not isinstance(min_size, (list, tuple)):
            min_size = (min_size,)
        self.min_size = min_size
        self.max_size = max_size
        self.image_mean = image_mean
        self.image_std = image_std

    def forward(self,
                images,       # type: List[Tensor]
                targets=None  # type: Optional[List[Dict[str, Tensor]]]
                ):
        # type: (...) -> Tuple[ImageList, Optional[List[Dict[str, Tensor]]]]
        images = [img for img in images]
        if targets is not None:
            # make a copy of targets to avoid modifying it in-place
            # once torchscript supports dict comprehension
            # this can be simplified as as follows
            # targets = [{k: v for k,v in t.items()} for t in targets]
            targets_copy: List[Dict[str, Tensor]] = []
            for t in targets:
                data: Dict[str, Tensor] = {}
                for k, v in t.items():
                    data[k] = v
                targets_copy.append(data)
            targets = targets_copy
        for i in range(len(images)):
            image = images[i]
            target_index = targets[i] if targets is not None else None

            if image.dim() != 3:
                raise ValueError("images is expected to be a list of 3d tensors "
                                 "of shape [C, H, W], got {}".format(image.shape))
            image = self.normalize(image)
            image, target_index = self.resize(image, target_index)
            images[i] = image
            if targets is not None and target_index is not None:
                targets[i] = target_index

        image_sizes = [img.shape[-2:] for img in images]
        images = self.batch_images(images)
        image_sizes_list = torch.jit.annotate(List[Tuple[int, int]], [])
        for image_size in image_sizes:
            assert len(image_size) == 2
            image_sizes_list.append((image_size[0], image_size[1]))

        image_list = ImageList(images, image_sizes_list)
        return image_list, targets

    def normalize(self, image):
        dtype, device = image.dtype, image.device
        mean = torch.as_tensor(self.image_mean, dtype=dtype, device=device)
        std = torch.as_tensor(self.image_std, dtype=dtype, device=device)
        return (image - mean[:, None, None]) / std[:, None, None]

    def torch_choice(self, k):
        # type: (List[int]) -> int
        """
        Implements `random.choice` via torch ops so it can be compiled with
        TorchScript. Remove if https://github.com/pytorch/pytorch/issues/25803
        is fixed.
        """
        index = int(torch.empty(1).uniform_(0., float(len(k))).item())
        return k[index]

    def resize(self, image, target):
        # type: (Tensor, Optional[Dict[str, Tensor]]) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]
        h, w = image.shape[-2:]
        if self.training:
            size = float(self.torch_choice(self.min_size))
        else:
            # FIXME assume for now that testing uses the largest scale
            size = float(self.min_size[-1])
        if torchvision._is_tracing():
            image, target = _resize_image_and_masks_onnx(image, size, float(self.max_size), target)
        else:
            image, target = _resize_image_and_masks(image, size, float(self.max_size), target)

        if target is None:
            return image, target

        bbox = target["boxes"]
        bbox = resize_boxes(bbox, (h, w), image.shape[-2:])
        target["boxes"] = bbox

        if "keypoints" in target:
            keypoints = target["keypoints"]
            keypoints = resize_keypoints(keypoints, (h, w), image.shape[-2:])
            target["keypoints"] = keypoints
        return image, target

    # _onnx_batch_images() is an implementation of
    # batch_images() that is supported by ONNX tracing.
    @torch.jit.unused
    def _onnx_batch_images(self, images, size_divisible=32):
        # type: (List[Tensor], int) -> Tensor
        max_size = []
        for i in range(images[0].dim()):
            max_size_i = torch.max(torch.stack([img.shape[i] for img in images]).to(torch.float32)).to(torch.int64)
            max_size.append(max_size_i)
        stride = size_divisible
        max_size[1] = (torch.ceil((max_size[1].to(torch.float32)) / stride) * stride).to(torch.int64)
        max_size[2] = (torch.ceil((max_size[2].to(torch.float32)) / stride) * stride).to(torch.int64)
        max_size = tuple(max_size)

        # work around for
        # pad_img[: img.shape[0], : img.shape[1], : img.shape[2]].copy_(img)
        # which is not yet supported in onnx
        padded_imgs = []
        for img in images:
            padding = [(s1 - s2) for s1, s2 in zip(max_size, tuple(img.shape))]
            padded_img = torch.nn.functional.pad(img, (0, padding[2], 0, padding[1], 0, padding[0]))
            padded_imgs.append(padded_img)

        return torch.stack(padded_imgs)

    def max_by_axis(self, the_list):
        # type: (List[List[int]]) -> List[int]
        maxes = the_list[0]
        for sublist in the_list[1:]:
            for index, item in enumerate(sublist):
                maxes[index] = max(maxes[index], item)
        return maxes

    def batch_images(self, images, size_divisible=32):
        # type: (List[Tensor], int) -> Tensor
        if torchvision._is_tracing():
            # batch_images() does not export well to ONNX
            # call _onnx_batch_images() instead
            return self._onnx_batch_images(images, size_divisible)

        max_size = self.max_by_axis([list(img.shape) for img in images])
        stride = float(size_divisible)
        max_size = list(max_size)
        max_size[1] = int(math.ceil(float(max_size[1]) / stride) * stride)
        max_size[2] = int(math.ceil(float(max_size[2]) / stride) * stride)

        batch_shape = [len(images)] + max_size
        batched_imgs = images[0].new_full(batch_shape, 0)
        for img, pad_img in zip(images, batched_imgs):
            pad_img[: img.shape[0], : img.shape[1], : img.shape[2]].copy_(img)

        return batched_imgs

    def postprocess(self,
                    result,               # type: List[Dict[str, Tensor]]
                    image_shapes,         # type: List[Tuple[int, int]]
                    original_image_sizes  # type: List[Tuple[int, int]]
                    ):
        # type: (...) -> List[Dict[str, Tensor]]
        if self.training:
            return result
        for i, (pred, im_s, o_im_s) in enumerate(zip(result, image_shapes, original_image_sizes)):
            boxes = pred["boxes"]
            boxes = resize_boxes(boxes, im_s, o_im_s)
            result[i]["boxes"] = boxes
            if "masks" in pred:
                masks = pred["masks"]
                masks = paste_masks_in_image(masks, boxes, o_im_s)
                result[i]["masks"] = masks
            if "keypoints" in pred:
                keypoints = pred["keypoints"]
                keypoints = resize_keypoints(keypoints, im_s, o_im_s)
                result[i]["keypoints"] = keypoints
        return result

    def __repr__(self):
        format_string = self.__class__.__name__ + '('
        _indent = '\n    '
        format_string += "{0}Normalize(mean={1}, std={2})".format(_indent, self.image_mean, self.image_std)
        format_string += "{0}Resize(min_size={1}, max_size={2}, mode='bilinear')".format(_indent, self.min_size,
                                                                                         self.max_size)
        format_string += '\n)'
        return format_string

对输入图像的初步转换处理在forward前向传播函数中实现,主要实现normalize和resize操作:

  1. self.normalize:初始参数在FasterRCNN的初始化中被设为image_mean = [0.485, 0.456, 0.406]image_std = [0.229, 0.224, 0.225]
  2. self.resize:初始参数在FasterRCNN的初始化中被设为min_size=800, max_size=1333
  3. self.batch_images,对一个batch的图像做了Padding,使其输出的张量尺寸一致。
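The normalize step is a per-channel affine map, which is easy to check on a single RGB pixel. The sketch below uses plain Python lists; normalize_pixel is a hypothetical helper, while the real method broadcasts the same arithmetic over a [C, H, W] tensor.

```python
IMAGE_MEAN = [0.485, 0.456, 0.406]  # defaults set in FasterRCNN.__init__
IMAGE_STD = [0.229, 0.224, 0.225]


def normalize_pixel(rgb):
    """Apply (value - mean) / std channel by channel to one RGB pixel in the 0-1 range."""
    return [(value - mean) / std
            for value, mean, std in zip(rgb, IMAGE_MEAN, IMAGE_STD)]


print(normalize_pixel(IMAGE_MEAN))       # [0.0, 0.0, 0.0]
print(normalize_pixel([1.0, 1.0, 1.0]))  # a white pixel; the red channel is about 2.25
```

A pixel equal to the mean maps to zero in every channel, which is why inputs are expected in the 0-1 range rather than 0-255.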

根据该转换模块的默认值,结合本节开头的例子:

  1. 经过resize处理后,因为最小尺寸必须为800,因此$300 \times 400$的图片1转换为了$800 \times 1066$,$400 \times 500$的图片2转换为了$1000 \times 800$;
  2. 因为batch处理转tensors时加padding的缘故,两个图片的张量尺寸被统一为$1024 \times 1088$。
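The arithmetic behind these numbers can be reproduced with a small sketch. It is written in plain Python so it runs without PyTorch, and it uses integer arithmetic as a simplification: torchvision computes a floating-point scale factor and interpolates, so real outputs may differ by a pixel in edge cases. The helper names resized_size and padded_batch_shape are hypothetical.

```python
import math


def resized_size(h, w, min_size=800, max_size=1333):
    """Approximate (h, w) after the resize step, using integer arithmetic."""
    short, long = min(h, w), max(h, w)
    # Scale so that the shorter side becomes min_size ...
    num, den = min_size, short
    # ... unless that would push the longer side beyond max_size.
    if long * min_size > max_size * short:
        num, den = max_size, long
    return (h * num // den, w * num // den)


def padded_batch_shape(sizes, size_divisible=32):
    """Common (h, w) after padding every image up to a multiple of size_divisible."""
    max_h = max(h for h, _ in sizes)
    max_w = max(w for _, w in sizes)

    def round_up(value):
        return int(math.ceil(value / size_divisible) * size_divisible)

    return (round_up(max_h), round_up(max_w))


# The two inference images from the example: 300x400 and 500x400 (H x W).
sizes = [resized_size(300, 400), resized_size(500, 400)]
batch_hw = padded_batch_shape(sizes)
print(sizes, batch_hw)
```

The padded shape is taken over the whole batch, so a single large image enlarges the tensor for every other image in that batch.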

BackboneWithFPN

BackboneWithFPN is implemented in torchvision.models.detection.backbone_utils. It takes some intermediate layers extracted from a ResNet model as the backbone and attaches an FPN after it.

class BackboneWithFPN(nn.Module):
    """
    Adds a FPN on top of a model.
    Internally, it uses torchvision.models._utils.IntermediateLayerGetter to
    extract a submodel that returns the feature maps specified in return_layers.
    The same limitations of IntermediatLayerGetter apply here.
    Arguments:
        backbone (nn.Module)
        return_layers (Dict[name, new_name]): a dict containing the names
            of the modules for which the activations will be returned as
            the key of the dict, and the value of the dict is the name
            of the returned activation (which the user can specify).
        in_channels_list (List[int]): number of channels for each feature map
            that is returned, in the order they are present in the OrderedDict
        out_channels (int): number of channels in the FPN.
    Attributes:
        out_channels (int): the number of channels in the FPN
    """
    def __init__(self, backbone, return_layers, in_channels_list, out_channels, extra_blocks=None):
        super(BackboneWithFPN, self).__init__()

        if extra_blocks is None:
            extra_blocks = LastLevelMaxPool()

        self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        self.fpn = FeaturePyramidNetwork(
            in_channels_list=in_channels_list,
            out_channels=out_channels,
            extra_blocks=extra_blocks,
        )
        self.out_channels = out_channels

    def forward(self, x):
        x = self.body(x)
        x = self.fpn(x)
        return x

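The class is simple. It is essentially a composition that wraps the backbone and the FPN together:

  1. the intermediate layers taken from the backbone (which provide the feature maps) become the model's body;
  2. an FPN (FeaturePyramidNetwork) is constructed as the model's fpn.

The data flow is equally concise: the input x passes through body first and then through fpn.

With the backbone+FPN model in place, the Faster R-CNN model can be constructed on top of it.

In torchvision's Faster R-CNN, the backbone is responsible for extracting image features. Concretely, it consists of ResNet intermediate layers followed by an FPN.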

ResNet

class ResNet(nn.Module)

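ResNet is implemented in the torchvision.models.resnet package. It is a standard convolutional neural network, so this article does not go into its details.

With resnet as the backbone, resnet_fpn_backbone builds a model that attaches an FPN after resnet. Specifically, it constructs a BackboneWithFPN object.

In the torchvision implementation:

  1. by default the last three layers, i.e. ResNet's layer4, layer3 and layer2, are trainable, and the rest are frozen;
  2. by default the feature maps of the last four layers are returned, i.e. layer1, layer2, layer3 and layer4, named with indices 0, 1, 2, 3. Their output feature maps have 256, 512, 1024 and 2048 channels respectively.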
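As a rough illustration of the freezing rule, the sketch below selects frozen parameters by name prefix. It assumes that resnet_fpn_backbone picks the trainable layers from a list ordered from the output side towards the input side, which is my reading of the torchvision source of this period; the helper frozen_parameter_names and the sample parameter names are hypothetical.

```python
def frozen_parameter_names(param_names, trainable_layers=3):
    """Names of the parameters that stay frozen for a given number of trainable layers."""
    # Candidate layers, ordered from the output side towards the input side (assumed).
    layers_to_train = ["layer4", "layer3", "layer2", "layer1", "conv1"][:trainable_layers]
    return [
        name
        for name in param_names
        if not any(name.startswith(layer) for layer in layers_to_train)
    ]


# Hypothetical parameter names, one per ResNet stage, for illustration only.
names = [
    "conv1.weight",
    "layer1.0.conv1.weight",
    "layer2.0.conv1.weight",
    "layer3.0.conv1.weight",
    "layer4.0.conv1.weight",
]
frozen = frozen_parameter_names(names)
print(frozen)
```

With the default of 3 trainable layers, only the stem and layer1 stay frozen in this sketch; in the real model, freezing means calling requires_grad_(False) on those parameters.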

In this section's example, the ResNet part turns the $2 \times 3 \times 1024 \times 1088$ input tensor into an OrderedDict:

  1. '0': shape[2, 256, 256, 272], from ResNet's layer1;
  2. '1': shape[2, 512, 128, 136], from ResNet's layer2;
  3. '2': shape[2, 1024, 64, 68], from ResNet's layer3;
  4. '3': shape[2, 2048, 32, 34], from ResNet's layer4.

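These shapes follow directly from the downsampling stride of each ResNet stage. The sketch below recomputes them, assuming the usual ResNet-50 strides of 4, 8, 16 and 32 for layer1 to layer4 and an input whose sides are multiples of 32 (which the padding step guarantees). The helper name resnet_feature_shapes is hypothetical.

```python
def resnet_feature_shapes(batch, height, width):
    """Shapes of the layer1..layer4 outputs for a ResNet-50 style backbone."""
    # (output channels, total stride) of layer1..layer4.
    stages = [(256, 4), (512, 8), (1024, 16), (2048, 32)]
    return {
        str(index): (batch, channels, height // stride, width // stride)
        for index, (channels, stride) in enumerate(stages)
    }


shapes = resnet_feature_shapes(2, 1024, 1088)
for name, shape in shapes.items():
    print(name, shape)
```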
FPN(FeaturePyramidNetwork)

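FeaturePyramidNetwork is implemented in the torchvision.ops.feature_pyramid_network package. FPN extracts features in a pyramid structure: convolutions at lower levels have small receptive fields, so their features represent small objects, while convolutions at higher levels have large receptive fields, so their features are suited to large objects. Using FPN in object detection, with small anchors at the lower levels and large anchors at the higher levels, helps detect both small and large objects effectively.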

class FeaturePyramidNetwork(nn.Module):
"""
Module that adds a FPN from on top of a set of feature maps. This is based on
`"Feature Pyramid Network for Object Detection" <https://arxiv.org/abs/1612.03144>`_.

The feature maps are currently supposed to be in increasing depth
order.

The input to the model is expected to be an OrderedDict[Tensor], containing
the feature maps on top of which the FPN will be added.

Arguments:
in_channels_list (list[int]): number of channels for each feature map that
is passed to the module
out_channels (int): number of channels of the FPN representation
extra_blocks (ExtraFPNBlock or None): if provided, extra operations will
be performed. It is expected to take the fpn features, the original
features and the names of the original features as input, and returns
a new list of feature maps and their corresponding names

Examples::

>>> m = torchvision.ops.FeaturePyramidNetwork([10, 20, 30], 5)
>>> # get some dummy data
>>> x = OrderedDict()
>>> x['feat0'] = torch.rand(1, 10, 64, 64)
>>> x['feat2'] = torch.rand(1, 20, 16, 16)
>>> x['feat3'] = torch.rand(1, 30, 8, 8)
>>> # compute the FPN on top of x
>>> output = m(x)
>>> print([(k, v.shape) for k, v in output.items()])
>>> # returns
>>> [('feat0', torch.Size([1, 5, 64, 64])),
>>> ('feat2', torch.Size([1, 5, 16, 16])),
>>> ('feat3', torch.Size([1, 5, 8, 8]))]

"""
def __init__(
self,
in_channels_list: List[int],
out_channels: int,
extra_blocks: Optional[ExtraFPNBlock] = None,
):
super(FeaturePyramidNetwork, self).__init__()
self.inner_blocks = nn.ModuleList()
self.layer_blocks = nn.ModuleList()
for in_channels in in_channels_list:
if in_channels == 0:
raise ValueError("in_channels=0 is currently not supported")
inner_block_module = nn.Conv2d(in_channels, out_channels, 1)
layer_block_module = nn.Conv2d(out_channels, out_channels, 3, padding=1)
self.inner_blocks.append(inner_block_module)
self.layer_blocks.append(layer_block_module)

# initialize parameters now to avoid modifying the initialization of top_blocks
for m in self.children():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_uniform_(m.weight, a=1)
nn.init.constant_(m.bias, 0)

if extra_blocks is not None:
assert isinstance(extra_blocks, ExtraFPNBlock)
self.extra_blocks = extra_blocks

def get_result_from_inner_blocks(self, x: Tensor, idx: int) -> Tensor:
"""
This is equivalent to self.inner_blocks[idx](x),
but torchscript doesn't support this yet
"""
num_blocks = 0
for m in self.inner_blocks:
num_blocks += 1
if idx < 0:
idx += num_blocks
i = 0
out = x
for module in self.inner_blocks:
if i == idx:
out = module(x)
i += 1
return out

def get_result_from_layer_blocks(self, x: Tensor, idx: int) -> Tensor:
"""
This is equivalent to self.layer_blocks[idx](x),
but torchscript doesn't support this yet
"""
num_blocks = 0
for m in self.layer_blocks:
num_blocks += 1
if idx < 0:
idx += num_blocks
i = 0
out = x
for module in self.layer_blocks:
if i == idx:
out = module(x)
i += 1
return out

def forward(self, x: Dict[str, Tensor]) -> Dict[str, Tensor]:
"""
Computes the FPN for a set of feature maps.

Arguments:
x (OrderedDict[Tensor]): feature maps for each feature level.

Returns:
results (OrderedDict[Tensor]): feature maps after FPN layers.
They are ordered from highest resolution first.
"""
# unpack OrderedDict into two lists for easier handling
names = list(x.keys())
x = list(x.values())

last_inner = self.get_result_from_inner_blocks(x[-1], -1)
results = []
results.append(self.get_result_from_layer_blocks(last_inner, -1))

for idx in range(len(x) - 2, -1, -1):
inner_lateral = self.get_result_from_inner_blocks(x[idx], idx)
feat_shape = inner_lateral.shape[-2:]
inner_top_down = F.interpolate(last_inner, size=feat_shape, mode="nearest")
last_inner = inner_lateral + inner_top_down
results.insert(0, self.get_result_from_layer_blocks(last_inner, idx))

if self.extra_blocks is not None:
results, names = self.extra_blocks(results, x, names)

# make it back an OrderedDict
out = OrderedDict([(k, v) for k, v in zip(names, results)])

return out

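Following the original paper, the FPN output feature map $P_n$ at level n is obtained by merging two inputs:

  1. lateral: the CNN's level-n feature map $C_n$, passed through a 1×1 convolution;
  2. top-down upsampling: the FPN's level-(n+1) feature map $P_{n+1}$, upsampled by 2× (twice the height and width) to the level-n size.

A 3×3 convolution is then applied to the merged feature map to reduce the aliasing effect caused by upsampling.

The result is the final feature map $P_n$ of each level. For example, $\{P_2, P_3, P_4, P_5\}$ are obtained through the FPN from ResNet's stage 2~5 feature maps $\{C_2, C_3, C_4, C_5\}$, and each $P_n$ has the same spatial size as the corresponding $C_n$.

In torchvision's implementation:

  1. self.inner_blocks holds all of the FPN's 1×1 convolutions;
  2. self.layer_blocks holds the 3×3 convolutions applied after merging.

Both are nn.ModuleList() objects. In __init__, a for loop of n iterations (one per feature map) fills them with nn.Conv2d objects that all share the same out_channels.

The core logic of the computation defined in forward can be summarized as: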

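def forward(self, x: Dict[str, Tensor]):
    # unpack the OrderedDict into two lists
    names = list(x.keys())
    x = list(x.values())

    # the last (coarsest) level has no top-down input
    last_inner = self.get_result_from_inner_blocks(x[-1], -1)
    results = [self.get_result_from_layer_blocks(last_inner, -1)]

    # walk from the second-to-last level back to the first
    for idx in range(len(x) - 2, -1, -1):
        inner_lateral = self.get_result_from_inner_blocks(x[idx], idx)
        feat_shape = inner_lateral.shape[-2:]
        inner_top_down = F.interpolate(last_inner, size=feat_shape, mode="nearest")
        last_inner = inner_lateral + inner_top_down
        results.insert(0, self.get_result_from_layer_blocks(last_inner, idx))

    if self.extra_blocks is not None:
        results, names = self.extra_blocks(results, x, names)

    out = OrderedDict([(k, v) for k, v in zip(names, results)])
    return out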

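The results, i.e. the paper's $P_n$, are computed level by level from the last level to the first:

  1. inner_lateral is the CNN feature map after the 1×1 convolution, computed by self.get_result_from_inner_blocks(x[idx], idx);
  2. inner_top_down is the result upsampled from the level above. The upsampling is done by interpolation, F.interpolate(last_inner, size=feat_shape, mode="nearest"). Note that the code upsamples last_inner, the merged map of level n+1 before its 3×3 convolution;
  3. last_inner is the merge of the two, implemented as element-wise addition;
  4. before being added to results, it goes through a 3×3 convolution, i.e. self.get_result_from_layer_blocks(last_inner, idx).

Finally, if there is an extra block, it is applied as well, and its output is appended to the results.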
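The top-down pathway can be traced on tiny single-channel maps. The sketch below is a toy version in plain Python: it treats the 1×1 and 3×3 convolutions as identities, so only the nearest-neighbour upsampling and the element-wise addition remain. The function names are hypothetical and the input values are made up for illustration.

```python
def upsample_nearest(fmap, out_h, out_w):
    """Nearest-neighbour resize of a 2-D list to (out_h, out_w)."""
    in_h, in_w = len(fmap), len(fmap[0])
    return [
        [fmap[r * in_h // out_h][c * in_w // out_w] for c in range(out_w)]
        for r in range(out_h)
    ]


def add_maps(a, b):
    """Element-wise addition of two equally sized 2-D lists."""
    return [[x + y for x, y in zip(row_a, row_b)] for row_a, row_b in zip(a, b)]


def top_down_merge(laterals):
    """laterals: fine-to-coarse list of 2-D maps, standing in for the 1x1 conv outputs."""
    last_inner = laterals[-1]          # coarsest level: no top-down input
    results = [last_inner]
    for idx in range(len(laterals) - 2, -1, -1):
        lateral = laterals[idx]
        top_down = upsample_nearest(last_inner, len(lateral), len(lateral[0]))
        last_inner = add_maps(lateral, top_down)
        results.insert(0, last_inner)  # keep results ordered fine-to-coarse
    return results


c_fine = [[1] * 4 for _ in range(4)]   # 4x4 map
c_mid = [[10, 20], [30, 40]]           # 2x2 map
c_coarse = [[100]]                     # 1x1 map
merged = top_down_merge([c_fine, c_mid, c_coarse])
for level in merged:
    print(level)
```

Each finer level accumulates everything above it, which is how high-level semantics reach the high-resolution maps.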

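In the concrete implementation, a LastLevelMaxPool is appended to the end of the FPN, and its output is added to names under the name pool.

In this section's example, the FPN turns the 4 feature maps of different channel counts output by ResNet into an OrderedDict whose levels all have the same number of channels:

  1. '0': shape[2, 256, 256, 272], from ResNet's layer1;
  2. '1': shape[2, 256, 128, 136], from ResNet's layer2;
  3. '2': shape[2, 256, 64, 68], from ResNet's layer3;
  4. '3': shape[2, 256, 32, 34], from ResNet's layer4;
  5. 'pool': shape[2, 256, 16, 17], from the LastLevelMaxPool that the FPN uses as extra_blocks.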
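The 'pool' shape can be checked with the standard pooling size formula. The sketch assumes that LastLevelMaxPool applies a max pool with kernel size 1, stride 2 and no padding to the last FPN level, which is how I read the torchvision source; the helper names are hypothetical.

```python
def pool_out(size, kernel=1, stride=2, padding=0):
    """Output length of a pooling window along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1


def fpn_output_shapes(in_shapes, out_channels=256):
    """Map backbone feature shapes to FPN output shapes, plus the extra 'pool' level."""
    out = {
        name: (n, out_channels, h, w)
        for name, (n, _, h, w) in in_shapes.items()
    }
    n, _, h, w = list(in_shapes.values())[-1]
    out["pool"] = (n, out_channels, pool_out(h), pool_out(w))
    return out


backbone_shapes = {
    "0": (2, 256, 256, 272),
    "1": (2, 512, 128, 136),
    "2": (2, 1024, 64, 68),
    "3": (2, 2048, 32, 34),
}
fpn_shapes = fpn_output_shapes(backbone_shapes)
print(fpn_shapes)
```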

RegionProposalNetwork

RegionProposalNetwork is implemented in the torchvision.models.detection.rpn package.

The implementation of RegionProposalNetwork is fairly long; reading __init__ and forward is enough to follow it.

class RegionProposalNetwork(torch.nn.Module):
"""
Implements Region Proposal Network (RPN).

Arguments:
anchor_generator (AnchorGenerator): module that generates the anchors for a set of feature
maps.
head (nn.Module): module that computes the objectness and regression deltas
fg_iou_thresh (float): minimum IoU between the anchor and the GT box so that they can be
considered as positive during training of the RPN.
bg_iou_thresh (float): maximum IoU between the anchor and the GT box so that they can be
considered as negative during training of the RPN.
batch_size_per_image (int): number of anchors that are sampled during training of the RPN
for computing the loss
positive_fraction (float): proportion of positive anchors in a mini-batch during training
of the RPN
pre_nms_top_n (Dict[int]): number of proposals to keep before applying NMS. It should
contain two fields: training and testing, to allow for different values depending
on training or evaluation
post_nms_top_n (Dict[int]): number of proposals to keep after applying NMS. It should
contain two fields: training and testing, to allow for different values depending
on training or evaluation
nms_thresh (float): NMS threshold used for postprocessing the RPN proposals

"""
__annotations__ = {
'box_coder': det_utils.BoxCoder,
'proposal_matcher': det_utils.Matcher,
'fg_bg_sampler': det_utils.BalancedPositiveNegativeSampler,
'pre_nms_top_n': Dict[str, int],
'post_nms_top_n': Dict[str, int],
}

def __init__(self,
anchor_generator,
head,
#
fg_iou_thresh, bg_iou_thresh,
batch_size_per_image, positive_fraction,
#
pre_nms_top_n, post_nms_top_n, nms_thresh):
super(RegionProposalNetwork, self).__init__()
self.anchor_generator = anchor_generator
self.head = head
self.box_coder = det_utils.BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))

# used during training
self.box_similarity = box_ops.box_iou

self.proposal_matcher = det_utils.Matcher(
fg_iou_thresh,
bg_iou_thresh,
allow_low_quality_matches=True,
)

self.fg_bg_sampler = det_utils.BalancedPositiveNegativeSampler(
batch_size_per_image, positive_fraction
)
# used during testing
self._pre_nms_top_n = pre_nms_top_n
self._post_nms_top_n = post_nms_top_n
self.nms_thresh = nms_thresh
self.min_size = 1e-3

def pre_nms_top_n(self):
if self.training:
return self._pre_nms_top_n['training']
return self._pre_nms_top_n['testing']

def post_nms_top_n(self):
if self.training:
return self._post_nms_top_n['training']
return self._post_nms_top_n['testing']

def assign_targets_to_anchors(self, anchors, targets):
# type: (List[Tensor], List[Dict[str, Tensor]]) -> Tuple[List[Tensor], List[Tensor]]
labels = []
matched_gt_boxes = []
for anchors_per_image, targets_per_image in zip(anchors, targets):
gt_boxes = targets_per_image["boxes"]

if gt_boxes.numel() == 0:
# Background image (negative example)
device = anchors_per_image.device
matched_gt_boxes_per_image = torch.zeros(anchors_per_image.shape, dtype=torch.float32, device=device)
labels_per_image = torch.zeros((anchors_per_image.shape[0],), dtype=torch.float32, device=device)
else:
match_quality_matrix = self.box_similarity(gt_boxes, anchors_per_image)
matched_idxs = self.proposal_matcher(match_quality_matrix)
# get the targets corresponding GT for each proposal
# NB: need to clamp the indices because we can have a single
# GT in the image, and matched_idxs can be -2, which goes
# out of bounds
matched_gt_boxes_per_image = gt_boxes[matched_idxs.clamp(min=0)]

labels_per_image = matched_idxs >= 0
labels_per_image = labels_per_image.to(dtype=torch.float32)

# Background (negative examples)
bg_indices = matched_idxs == self.proposal_matcher.BELOW_LOW_THRESHOLD
labels_per_image[bg_indices] = 0.0

# discard indices that are between thresholds
inds_to_discard = matched_idxs == self.proposal_matcher.BETWEEN_THRESHOLDS
labels_per_image[inds_to_discard] = -1.0

labels.append(labels_per_image)
matched_gt_boxes.append(matched_gt_boxes_per_image)
return labels, matched_gt_boxes

def _get_top_n_idx(self, objectness, num_anchors_per_level):
# type: (Tensor, List[int]) -> Tensor
r = []
offset = 0
for ob in objectness.split(num_anchors_per_level, 1):
if torchvision._is_tracing():
num_anchors, pre_nms_top_n = _onnx_get_num_anchors_and_pre_nms_top_n(ob, self.pre_nms_top_n())
else:
num_anchors = ob.shape[1]
pre_nms_top_n = min(self.pre_nms_top_n(), num_anchors)
_, top_n_idx = ob.topk(pre_nms_top_n, dim=1)
r.append(top_n_idx + offset)
offset += num_anchors
return torch.cat(r, dim=1)

def filter_proposals(self, proposals, objectness, image_shapes, num_anchors_per_level):
# type: (Tensor, Tensor, List[Tuple[int, int]], List[int]) -> Tuple[List[Tensor], List[Tensor]]
num_images = proposals.shape[0]
device = proposals.device
# do not backprop throught objectness
objectness = objectness.detach()
objectness = objectness.reshape(num_images, -1)

levels = [
torch.full((n,), idx, dtype=torch.int64, device=device)
for idx, n in enumerate(num_anchors_per_level)
]
levels = torch.cat(levels, 0)
levels = levels.reshape(1, -1).expand_as(objectness)

# select top_n boxes independently per level before applying nms
top_n_idx = self._get_top_n_idx(objectness, num_anchors_per_level)

image_range = torch.arange(num_images, device=device)
batch_idx = image_range[:, None]

objectness = objectness[batch_idx, top_n_idx]
levels = levels[batch_idx, top_n_idx]
proposals = proposals[batch_idx, top_n_idx]

final_boxes = []
final_scores = []
for boxes, scores, lvl, img_shape in zip(proposals, objectness, levels, image_shapes):
boxes = box_ops.clip_boxes_to_image(boxes, img_shape)
keep = box_ops.remove_small_boxes(boxes, self.min_size)
boxes, scores, lvl = boxes[keep], scores[keep], lvl[keep]
# non-maximum suppression, independently done per level
keep = box_ops.batched_nms(boxes, scores, lvl, self.nms_thresh)
# keep only topk scoring predictions
keep = keep[:self.post_nms_top_n()]
boxes, scores = boxes[keep], scores[keep]
final_boxes.append(boxes)
final_scores.append(scores)
return final_boxes, final_scores

def compute_loss(self, objectness, pred_bbox_deltas, labels, regression_targets):
# type: (Tensor, Tensor, List[Tensor], List[Tensor]) -> Tuple[Tensor, Tensor]
"""
Arguments:
objectness (Tensor)
pred_bbox_deltas (Tensor)
labels (List[Tensor])
regression_targets (List[Tensor])

Returns:
objectness_loss (Tensor)
box_loss (Tensor)
"""

sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
sampled_pos_inds = torch.where(torch.cat(sampled_pos_inds, dim=0))[0]
sampled_neg_inds = torch.where(torch.cat(sampled_neg_inds, dim=0))[0]

sampled_inds = torch.cat([sampled_pos_inds, sampled_neg_inds], dim=0)

objectness = objectness.flatten()

labels = torch.cat(labels, dim=0)
regression_targets = torch.cat(regression_targets, dim=0)

box_loss = det_utils.smooth_l1_loss(
pred_bbox_deltas[sampled_pos_inds],
regression_targets[sampled_pos_inds],
beta=1 / 9,
size_average=False,
) / (sampled_inds.numel())

objectness_loss = F.binary_cross_entropy_with_logits(
objectness[sampled_inds], labels[sampled_inds]
)

return objectness_loss, box_loss

def forward(self,
images, # type: ImageList
features, # type: Dict[str, Tensor]
targets=None # type: Optional[List[Dict[str, Tensor]]]
):
# type: (...) -> Tuple[List[Tensor], Dict[str, Tensor]]
"""
Arguments:
images (ImageList): images for which we want to compute the predictions
features (OrderedDict[Tensor]): features computed from the images that are
used for computing the predictions. Each tensor in the list
correspond to different feature levels
targets (List[Dict[Tensor]]): ground-truth boxes present in the image (optional).
If provided, each element in the dict should contain a field `boxes`,
with the locations of the ground-truth boxes.

Returns:
boxes (List[Tensor]): the predicted boxes from the RPN, one Tensor per
image.
losses (Dict[Tensor]): the losses for the model during training. During
testing, it is an empty dict.
"""
# RPN uses all feature maps that are available
features = list(features.values())
objectness, pred_bbox_deltas = self.head(features)
anchors = self.anchor_generator(images, features)

num_images = len(anchors)
num_anchors_per_level_shape_tensors = [o[0].shape for o in objectness]
num_anchors_per_level = [s[0] * s[1] * s[2] for s in num_anchors_per_level_shape_tensors]
objectness, pred_bbox_deltas = \
concat_box_prediction_layers(objectness, pred_bbox_deltas)
# apply pred_bbox_deltas to anchors to obtain the decoded proposals
# note that we detach the deltas because Faster R-CNN do not backprop through
# the proposals
proposals = self.box_coder.decode(pred_bbox_deltas.detach(), anchors)
proposals = proposals.view(num_images, -1, 4)
boxes, scores = self.filter_proposals(proposals, objectness, images.image_sizes, num_anchors_per_level)

losses = {}
if self.training:
assert targets is not None
labels, matched_gt_boxes = self.assign_targets_to_anchors(anchors, targets)
regression_targets = self.box_coder.encode(matched_gt_boxes, anchors)
loss_objectness, loss_rpn_box_reg = self.compute_loss(
objectness, pred_bbox_deltas, labels, regression_targets)
losses = {
"loss_objectness": loss_objectness,
"loss_rpn_box_reg": loss_rpn_box_reg,
}
return boxes, losses

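The focus is the computation in forward, which covers the complete RPN process:

  1. the input features are fed into the RPNHead (self.head), which outputs the object/non-object classification scores (objectness) and the bbox regression values (pred_bbox_deltas);
  2. self.anchor_generator generates anchors for the current input images and feature maps;
  3. self.box_coder.decode applies the bbox regression values pred_bbox_deltas to the anchors, producing the predicted proposals;
  4. the proposals can be numerous and heavily overlapping, so self.filter_proposals filters them and outputs the remaining proposals (boxes) together with their scores;
  5. in training mode, the RPN also computes its losses: assign_targets_to_anchors matches the anchors to the ground-truth boxes, box_coder.encode turns the matched boxes into regression targets, and compute_loss compares objectness and pred_bbox_deltas against them.
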
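To make the training-time labelling in step 5 concrete, here is a minimal plain-Python sketch of how anchors end up as foreground (1.0), background (0.0) or ignored (-1.0). It assumes the thresholds fg_iou_thresh=0.7 and bg_iou_thresh=0.3, and it leaves out the allow_low_quality_matches rule and the empty-ground-truth case that the real code handles. The function names and the sample boxes are hypothetical.

```python
def box_iou(a, b):
    """IoU of two boxes given as (x1, y1, x2, y2)."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / (area_a + area_b - inter)


def label_anchors(anchors, gt_boxes, fg_iou_thresh=0.7, bg_iou_thresh=0.3):
    """One label per anchor: 1.0 foreground, 0.0 background, -1.0 ignored."""
    labels = []
    for anchor in anchors:
        best = max(box_iou(gt, anchor) for gt in gt_boxes)
        if best >= fg_iou_thresh:
            labels.append(1.0)
        elif best < bg_iou_thresh:
            labels.append(0.0)
        else:
            labels.append(-1.0)  # between the thresholds: excluded from the loss
    return labels


gt = [(0, 0, 100, 100)]
anchors = [(0, 0, 100, 90), (0, 0, 100, 50), (200, 200, 300, 300)]
labels = label_anchors(anchors, gt)
print(labels)
```

Only anchors labelled 1.0 or 0.0 are candidates for the sampled mini-batch that feeds the objectness loss, and only the positive ones contribute to the box regression loss.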
AnchorGenerator

AnchorGenerator is implemented in torchvision.models.detection.anchor_utils. Given the predefined anchor sizes and aspect_ratios, and the ratio between the image size and each feature map's size, it computes the anchors for each feature map.

class AnchorGenerator(nn.Module):
"""
Module that generates anchors for a set of feature maps and
image sizes.

The module support computing anchors at multiple sizes and aspect ratios
per feature map. This module assumes aspect ratio = height / width for
each anchor.

sizes and aspect_ratios should have the same number of elements, and it should
correspond to the number of feature maps.

sizes[i] and aspect_ratios[i] can have an arbitrary number of elements,
and AnchorGenerator will output a set of sizes[i] * aspect_ratios[i] anchors
per spatial location for feature map i.

Arguments:
sizes (Tuple[Tuple[int]]):
aspect_ratios (Tuple[Tuple[float]]):
"""

__annotations__ = {
"cell_anchors": Optional[List[torch.Tensor]],
"_cache": Dict[str, List[torch.Tensor]]
}

def __init__(
self,
sizes=((128, 256, 512),),
aspect_ratios=((0.5, 1.0, 2.0),),
):
super(AnchorGenerator, self).__init__()

if not isinstance(sizes[0], (list, tuple)):
# TODO change this
sizes = tuple((s,) for s in sizes)
if not isinstance(aspect_ratios[0], (list, tuple)):
aspect_ratios = (aspect_ratios,) * len(sizes)

assert len(sizes) == len(aspect_ratios)

self.sizes = sizes
self.aspect_ratios = aspect_ratios
self.cell_anchors = None
self._cache = {}

# TODO: https://github.com/pytorch/pytorch/issues/26792
# For every (aspect_ratios, scales) combination, output a zero-centered anchor with those values.
# (scales, aspect_ratios) are usually an element of zip(self.scales, self.aspect_ratios)
# This method assumes aspect ratio = height / width for an anchor.
def generate_anchors(self, scales, aspect_ratios, dtype=torch.float32, device="cpu"):
# type: (List[int], List[float], int, Device) -> Tensor # noqa: F821
scales = torch.as_tensor(scales, dtype=dtype, device=device)
aspect_ratios = torch.as_tensor(aspect_ratios, dtype=dtype, device=device)
h_ratios = torch.sqrt(aspect_ratios)
w_ratios = 1 / h_ratios

ws = (w_ratios[:, None] * scales[None, :]).view(-1)
hs = (h_ratios[:, None] * scales[None, :]).view(-1)

base_anchors = torch.stack([-ws, -hs, ws, hs], dim=1) / 2
return base_anchors.round()

def set_cell_anchors(self, dtype, device):
# type: (int, Device) -> None # noqa: F821
if self.cell_anchors is not None:
cell_anchors = self.cell_anchors
assert cell_anchors is not None
# suppose that all anchors have the same device
# which is a valid assumption in the current state of the codebase
if cell_anchors[0].device == device:
return

cell_anchors = [
self.generate_anchors(
sizes,
aspect_ratios,
dtype,
device
)
for sizes, aspect_ratios in zip(self.sizes, self.aspect_ratios)
]
self.cell_anchors = cell_anchors

def num_anchors_per_location(self):
return [len(s) * len(a) for s, a in zip(self.sizes, self.aspect_ratios)]

# For every combination of (a, (g, s), i) in (self.cell_anchors, zip(grid_sizes, strides), 0:2),
# output g[i] anchors that are s[i] distance apart in direction i, with the same dimensions as a.
def grid_anchors(self, grid_sizes, strides):
# type: (List[List[int]], List[List[Tensor]]) -> List[Tensor]
anchors = []
cell_anchors = self.cell_anchors
assert cell_anchors is not None
assert len(grid_sizes) == len(strides) == len(cell_anchors)

for size, stride, base_anchors in zip(
grid_sizes, strides, cell_anchors
):
grid_height, grid_width = size
stride_height, stride_width = stride
device = base_anchors.device

# For output anchor, compute [x_center, y_center, x_center, y_center]
shifts_x = torch.arange(
0, grid_width, dtype=torch.float32, device=device
) * stride_width
shifts_y = torch.arange(
0, grid_height, dtype=torch.float32, device=device
) * stride_height
shift_y, shift_x = torch.meshgrid(shifts_y, shifts_x)
shift_x = shift_x.reshape(-1)
shift_y = shift_y.reshape(-1)
shifts = torch.stack((shift_x, shift_y, shift_x, shift_y), dim=1)

# For every (base anchor, output anchor) pair,
# offset each zero-centered base anchor by the center of the output anchor.
anchors.append(
(shifts.view(-1, 1, 4) + base_anchors.view(1, -1, 4)).reshape(-1, 4)
)

return anchors

def cached_grid_anchors(self, grid_sizes, strides):
# type: (List[List[int]], List[List[Tensor]]) -> List[Tensor]
key = str(grid_sizes) + str(strides)
if key in self._cache:
return self._cache[key]
anchors = self.grid_anchors(grid_sizes, strides)
self._cache[key] = anchors
return anchors

def forward(self, image_list, feature_maps):
# type: (ImageList, List[Tensor]) -> List[Tensor]
grid_sizes = list([feature_map.shape[-2:] for feature_map in feature_maps])
image_size = image_list.tensors.shape[-2:]
dtype, device = feature_maps[0].dtype, feature_maps[0].device
strides = [[torch.tensor(image_size[0] // g[0], dtype=torch.int64, device=device),
torch.tensor(image_size[1] // g[1], dtype=torch.int64, device=device)] for g in grid_sizes]
self.set_cell_anchors(dtype, device)
anchors_over_all_feature_maps = self.cached_grid_anchors(grid_sizes, strides)
anchors = torch.jit.annotate(List[List[torch.Tensor]], [])
for i, (image_height, image_width) in enumerate(image_list.image_sizes):
anchors_in_image = []
for anchors_per_feature_map in anchors_over_all_feature_maps:
anchors_in_image.append(anchors_per_feature_map)
anchors.append(anchors_in_image)
anchors = [torch.cat(anchors_per_image) for anchors_per_image in anchors]
# Clear the cache in case that memory leaks.
self._cache.clear()
return anchors

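This code is also rather long; again, following the computation in forward is enough. After a series of preparatory computations there is a two-level for loop, which expresses the following: each image comes with $n$ feature maps (of different sizes), the $i$-th feature map carries $K_i$ anchors, so each image has $K = \sum_{i=1}^{n} K_i$ anchors. Here, each sliding-window position has $A_i$ anchors and the $i$-th feature map has $H_i \times W_i$ sliding-window positions, so that feature map has $K_i = A_i \times H_i \times W_i$ anchors.

The outer loop iterates over the images and the inner loop over the feature maps, which yields the anchors on all feature maps of every image.

In the concrete implementation of AnchorGenerator:

  1. self.set_cell_anchors generates self.cell_anchors for each feature map level. These cell anchors are zero-centered, and their sizes are expressed in pixels of the input image tensor;
  2. self.cached_grid_anchors in turn calls self.grid_anchors, which uses each feature map's grid size and its stride relative to the input image tensor to compute anchors_over_all_feature_maps. These coordinates are likewise expressed in pixels of the input image tensor;
  3. the double for loop copies the anchors once per image, i.e. $N$ copies for $N$ input images;
  4. finally, torch.cat flattens all $K$ anchors of an image across its feature maps, giving a list of length $N$ whose elements are $K \times 4$ anchor tensors.

For the example, torchvision's defaults are:

  1. 3 aspect ratios: 0.5, 1.0 and 2.0;
  2. 1 scale per feature map level; for the 5 levels these are 32, 64, 128, 256 and 512.

Therefore cell_anchors contains 3 cell anchors for every feature map level:

  1. '0': shape[3, 4]
  2. '1': shape[3, 4]
  3. '2': shape[3, 4]
  4. '3': shape[3, 4]
  5. '4': shape[3, 4]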
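The cell anchors of one level can be recomputed by hand. The sketch below follows the arithmetic of generate_anchors shown above (aspect ratio = height / width, zero-centered boxes, rounded coordinates) in plain Python; the function name generate_cell_anchors is hypothetical, and the scale of 32 is the assumed default for the first level.

```python
import math


def generate_cell_anchors(scales, aspect_ratios):
    """Zero-centered anchors (x1, y1, x2, y2), one per (aspect ratio, scale) pair."""
    anchors = []
    for ratio in aspect_ratios:          # ratio-major order, as in the source
        h_ratio = math.sqrt(ratio)
        w_ratio = 1.0 / h_ratio
        for scale in scales:
            w, h = w_ratio * scale, h_ratio * scale
            anchors.append([round(-w / 2), round(-h / 2), round(w / 2), round(h / 2)])
    return anchors


cell_anchors = generate_cell_anchors(scales=(32,), aspect_ratios=(0.5, 1.0, 2.0))
for anchor in cell_anchors:
    print(anchor)
```

All three anchors cover roughly the same area (about 32 × 32 pixels); only their shape changes with the aspect ratio.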

Continuing the example, placing the cell_anchors at every sliding-window position of the input image tensor gives anchors_over_all_feature_maps for all positions:

  1. '0': shape[208896, 4],
  2. '1': shape[52224, 4],
  3. '2': shape[13056, 4],
  4. '3': shape[3264, 4],
  5. '4': shape[816, 4].

The anchors finally returned are copied once for each input image and flattened by torch.cat:

  1. shape[278256, 4],
  2. shape[278256, 4].

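These counts, and the strides used to place the anchors, can be verified with a few lines of plain Python. The grid sizes are the FPN output sizes of the example; the helper names are hypothetical.

```python
def anchors_per_level(grid_sizes, anchors_per_location=3):
    """Number of anchors on each feature map: A * H * W."""
    return [anchors_per_location * h * w for h, w in grid_sizes]


def level_strides(image_size, grid_sizes):
    """Stride of each feature map relative to the padded image tensor."""
    return [(image_size[0] // h, image_size[1] // w) for h, w in grid_sizes]


grid_sizes = [(256, 272), (128, 136), (64, 68), (32, 34), (16, 17)]
per_level = anchors_per_level(grid_sizes)
total = sum(per_level)
strides = level_strides((1024, 1088), grid_sizes)
print(per_level, total, strides)
```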
RPNHead

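RPNHead is implemented in the torchvision.models.detection.rpn package. It slides over the extracted feature maps in sliding-window fashion and computes, for every anchor, the bbox regression values and the object/non-object binary classification.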

class RPNHead(nn.Module):
    """
    Adds a simple RPN Head with classification and regression heads

    Arguments:
        in_channels (int): number of channels of the input feature
        num_anchors (int): number of anchors to be predicted
    """

    def __init__(self, in_channels, num_anchors):
        super(RPNHead, self).__init__()
        self.conv = nn.Conv2d(
            in_channels, in_channels, kernel_size=3, stride=1, padding=1
        )
        self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)
        self.bbox_pred = nn.Conv2d(
            in_channels, num_anchors * 4, kernel_size=1, stride=1
        )

        for layer in self.children():
            torch.nn.init.normal_(layer.weight, std=0.01)
            torch.nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        # type: (List[Tensor]) -> Tuple[List[Tensor], List[Tensor]]
        logits = []
        bbox_reg = []
        for feature in x:
            t = F.relu(self.conv(feature))
            logits.append(self.cls_logits(t))
            bbox_reg.append(self.bbox_pred(t))
        return logits, bbox_reg

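As shown, RPNHead has a simple structure of just three convolutions:

  1. self.conv: a 3×3 convolution applied to the input feature map;
  2. self.cls_logits: a 1×1 convolution applied to the processed feature map $t$, producing the object/non-object classification values;
  3. self.bbox_pred: a 1×1 convolution applied to the processed feature map $t$, producing the regression values for the bbox coordinates.

In the forward pass, the input x is a List[Tensor], i.e. the output of the FPN. Note that the for loop iterates not over the images but over the feature levels output by the FPN.

In this example, the two convolution branches of RPNHead output two List[Tensor]:

logits (objectness):

  1. '0': shape[2, 3, 256, 272];
  2. '1': shape[2, 3, 128, 136];
  3. '2': shape[2, 3, 64, 68];
  4. '3': shape[2, 3, 32, 34];
  5. '4': shape[2, 3, 16, 17];

bbox_reg (pred_bbox_deltas):

  1. '0': shape[2, 12, 256, 272];
  2. '1': shape[2, 12, 128, 136];
  3. '2': shape[2, 12, 64, 68];
  4. '3': shape[2, 12, 32, 34];
  5. '4': shape[2, 12, 16, 17];

Each sliding-window position corresponds to three aspect ratios, i.e. 3 anchors, so logits has 3 values per position, while bbox_reg has 4 coordinates per anchor and therefore 12 values.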
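The output shapes follow from the convolution size formula: a 3×3 convolution with padding 1 and a 1×1 convolution both preserve the spatial size, so only the channel count changes. A small plain-Python sketch, with hypothetical helper names:

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output length of a convolution along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1


def rpn_head_shapes(feature_shape, num_anchors=3):
    """Shapes of (logits, bbox_reg) for one feature level of shape (N, C, H, W)."""
    n, _, h, w = feature_shape
    # self.conv: 3x3, stride 1, padding 1
    h, w = conv_out(h, 3, padding=1), conv_out(w, 3, padding=1)
    # self.cls_logits / self.bbox_pred: 1x1, stride 1
    h, w = conv_out(h, 1), conv_out(w, 1)
    return (n, num_anchors, h, w), (n, num_anchors * 4, h, w)


logits_shape, bbox_shape = rpn_head_shapes((2, 256, 256, 272))
print(logits_shape, bbox_shape)
```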

BoxCoder

BoxCoder is implemented in torchvision.models.detection._utils.

class BoxCoder(object):
"""
This class encodes and decodes a set of bounding boxes into
the representation used for training the regressors.
"""

def __init__(self, weights, bbox_xform_clip=math.log(1000. / 16)):
# type: (Tuple[float, float, float, float], float) -> None
"""
Arguments:
weights (4-element tuple)
bbox_xform_clip (float)
"""
self.weights = weights
self.bbox_xform_clip = bbox_xform_clip

def encode(self, reference_boxes, proposals):
# type: (List[Tensor], List[Tensor]) -> List[Tensor]
boxes_per_image = [len(b) for b in reference_boxes]
reference_boxes = torch.cat(reference_boxes, dim=0)
proposals = torch.cat(proposals, dim=0)
targets = self.encode_single(reference_boxes, proposals)
return targets.split(boxes_per_image, 0)

def encode_single(self, reference_boxes, proposals):
"""
Encode a set of proposals with respect to some
reference boxes

Arguments:
reference_boxes (Tensor): reference boxes
proposals (Tensor): boxes to be encoded
"""
dtype = reference_boxes.dtype
device = reference_boxes.device
weights = torch.as_tensor(self.weights, dtype=dtype, device=device)
targets = encode_boxes(reference_boxes, proposals, weights)

return targets

def decode(self, rel_codes, boxes):
# type: (Tensor, List[Tensor]) -> Tensor
assert isinstance(boxes, (list, tuple))
assert isinstance(rel_codes, torch.Tensor)
boxes_per_image = [b.size(0) for b in boxes]
concat_boxes = torch.cat(boxes, dim=0)
box_sum = 0
for val in boxes_per_image:
box_sum += val
pred_boxes = self.decode_single(
rel_codes.reshape(box_sum, -1), concat_boxes
)
return pred_boxes.reshape(box_sum, -1, 4)

def decode_single(self, rel_codes, boxes):
"""
From a set of original boxes and encoded relative box offsets,
get the decoded boxes.

Arguments:
rel_codes (Tensor): encoded boxes
boxes (Tensor): reference boxes.
"""

boxes = boxes.to(rel_codes.dtype)

widths = boxes[:, 2] - boxes[:, 0]
heights = boxes[:, 3] - boxes[:, 1]
ctr_x = boxes[:, 0] + 0.5 * widths
ctr_y = boxes[:, 1] + 0.5 * heights

wx, wy, ww, wh = self.weights
dx = rel_codes[:, 0::4] / wx
dy = rel_codes[:, 1::4] / wy
dw = rel_codes[:, 2::4] / ww
dh = rel_codes[:, 3::4] / wh

# Prevent sending too large values into torch.exp()
dw = torch.clamp(dw, max=self.bbox_xform_clip)
dh = torch.clamp(dh, max=self.bbox_xform_clip)

pred_ctr_x = dx * widths[:, None] + ctr_x[:, None]
pred_ctr_y = dy * heights[:, None] + ctr_y[:, None]
pred_w = torch.exp(dw) * widths[:, None]
pred_h = torch.exp(dh) * heights[:, None]

pred_boxes1 = pred_ctr_x - torch.tensor(0.5, dtype=pred_ctr_x.dtype, device=pred_w.device) * pred_w
pred_boxes2 = pred_ctr_y - torch.tensor(0.5, dtype=pred_ctr_y.dtype, device=pred_h.device) * pred_h
pred_boxes3 = pred_ctr_x + torch.tensor(0.5, dtype=pred_ctr_x.dtype, device=pred_w.device) * pred_w
pred_boxes4 = pred_ctr_y + torch.tensor(0.5, dtype=pred_ctr_y.dtype, device=pred_h.device) * pred_h
pred_boxes = torch.stack((pred_boxes1, pred_boxes2, pred_boxes3, pred_boxes4), dim=2).flatten(1)
return pred_boxes

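In the RPN, self.box_coder is a BoxCoder instance that serves as the encoder/decoder for bounding boxes:

proposals = self.box_coder.decode(pred_bbox_deltas.detach(), anchors)

The BoxCoder instance decodes the pred_bbox_deltas regressed by RPNHead against the RPN anchors: the regressed offsets are applied to the reference anchor positions, which yields the candidate boxes proposals.

In this example, the RPN's forward reshapes the raw decoded proposals with proposals = proposals.view(num_images, -1, 4), so the resulting proposals tensor has:

  • shape [2, 278256, 4]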
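
The decoding arithmetic in decode_single can be traced on a single box. The following is a minimal plain-Python sketch, not the torchvision code itself: the function name decode_box is hypothetical, and the default weights (1, 1, 1, 1) and the clip value log(1000/16) are assumptions meant to mirror what the RPN's BoxCoder is believed to use.

```python
import math

def decode_box(anchor, deltas, weights=(1.0, 1.0, 1.0, 1.0),
               clip=math.log(1000.0 / 16)):
    """Apply (dx, dy, dw, dh) offsets to one (x1, y1, x2, y2) reference box."""
    x1, y1, x2, y2 = anchor
    width, height = x2 - x1, y2 - y1
    ctr_x, ctr_y = x1 + 0.5 * width, y1 + 0.5 * height

    wx, wy, ww, wh = weights
    dx, dy = deltas[0] / wx, deltas[1] / wy
    # Clamp the log-scale terms so exp() cannot produce huge boxes.
    dw = min(deltas[2] / ww, clip)
    dh = min(deltas[3] / wh, clip)

    # Centre offsets are relative to the anchor size; sizes are log-scaled.
    pred_ctr_x = dx * width + ctr_x
    pred_ctr_y = dy * height + ctr_y
    pred_w = math.exp(dw) * width
    pred_h = math.exp(dh) * height
    return (pred_ctr_x - 0.5 * pred_w, pred_ctr_y - 0.5 * pred_h,
            pred_ctr_x + 0.5 * pred_w, pred_ctr_y + 0.5 * pred_h)

anchor = (10.0, 20.0, 50.0, 100.0)
unchanged = decode_box(anchor, (0.0, 0.0, 0.0, 0.0))  # zero deltas keep the anchor
shifted = decode_box(anchor, (0.5, 0.0, 0.0, 0.0))    # dx = 0.5 moves it by half a width
print(unchanged, shifted)
```

With all-zero deltas the anchor comes back unchanged, which is a quick sanity check that the encoding is relative to the anchor.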

filter_proposals

filter_proposals filters the candidate boxes (proposals) generated by the RPN head. It is implemented as a member function of the RPN class RegionProposalNetwork.

class RegionProposalNetwork(torch.nn.Module):

    # ...

    def filter_proposals(self, proposals, objectness, image_shapes, num_anchors_per_level):
        # type: (Tensor, Tensor, List[Tuple[int, int]], List[int]) -> Tuple[List[Tensor], List[Tensor]]
        num_images = proposals.shape[0]
        device = proposals.device
        # do not backprop through objectness
        objectness = objectness.detach()
        objectness = objectness.reshape(num_images, -1)

        levels = [
            torch.full((n,), idx, dtype=torch.int64, device=device)
            for idx, n in enumerate(num_anchors_per_level)
        ]
        levels = torch.cat(levels, 0)
        levels = levels.reshape(1, -1).expand_as(objectness)

        # select top_n boxes independently per level before applying nms
        top_n_idx = self._get_top_n_idx(objectness, num_anchors_per_level)

        image_range = torch.arange(num_images, device=device)
        batch_idx = image_range[:, None]

        objectness = objectness[batch_idx, top_n_idx]
        levels = levels[batch_idx, top_n_idx]
        proposals = proposals[batch_idx, top_n_idx]

        final_boxes = []
        final_scores = []
        for boxes, scores, lvl, img_shape in zip(proposals, objectness, levels, image_shapes):
            boxes = box_ops.clip_boxes_to_image(boxes, img_shape)
            keep = box_ops.remove_small_boxes(boxes, self.min_size)
            boxes, scores, lvl = boxes[keep], scores[keep], lvl[keep]
            # non-maximum suppression, independently done per level
            keep = box_ops.batched_nms(boxes, scores, lvl, self.nms_thresh)
            # keep only topk scoring predictions
            keep = keep[:self.post_nms_top_n()]
            boxes, scores = boxes[keep], scores[keep]
            final_boxes.append(boxes)
            final_scores.append(scores)
        return final_boxes, final_scores

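Proposals are filtered in several stages:

  1. First, based on the objectness scores and num_anchors_per_level, the indices top_n_idx of the highest-scoring boxes (up to pre_nms_top_n per level) are selected on each feature level, pre-filtering the proposals before NMS.
  2. Next, a for loop iterates over every image in the batch:
    1. clip the boxes to the image boundary and remove small boxes;
    2. run NMS independently per feature level (batched_nms is called with lvl as the group index, so boxes from different levels do not suppress one another);
    3. keep the top post_nms_top_n boxes of the image as the returned result.

Finally, the final_boxes and final_scores that survive all of these stages are returned (boxes are the filtered proposals, and scores are the filtered objectness values).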
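
The stages that run before NMS can be sketched in plain Python on a toy input. This is only an illustration under stated assumptions: the helper names top_n_per_level, clip_box and is_large_enough are hypothetical, the per-level NMS step is left out, and min_size=1e-3 is assumed to match the RPN default.

```python
def top_n_per_level(scores, num_anchors_per_level, pre_nms_top_n):
    """Return indices of the best-scoring boxes, chosen separately per level."""
    keep, offset = [], 0
    for n in num_anchors_per_level:
        level_idx = range(offset, offset + n)
        best = sorted(level_idx, key=lambda i: scores[i], reverse=True)
        keep.extend(best[:min(pre_nms_top_n, n)])
        offset += n
    return keep

def clip_box(box, image_shape):
    """Clamp an (x1, y1, x2, y2) box to an image of shape (height, width)."""
    height, width = image_shape
    x1, y1, x2, y2 = box
    return (min(max(x1, 0), width), min(max(y1, 0), height),
            min(max(x2, 0), width), min(max(y2, 0), height))

def is_large_enough(box, min_size):
    """True when both sides of the box are at least min_size long."""
    x1, y1, x2, y2 = box
    return (x2 - x1) >= min_size and (y2 - y1) >= min_size

# Toy input: 3 anchors on level 0 followed by 2 anchors on level 1.
scores = [0.9, 0.1, 0.5, 0.8, 0.2]
boxes = [(-5, 0, 20, 20), (0, 0, 1, 1), (5, 5, 30, 30),
         (90, 90, 120, 130), (10, 10, 10.0005, 40)]
num_anchors_per_level = [3, 2]

idx = top_n_per_level(scores, num_anchors_per_level, pre_nms_top_n=2)
clipped = [clip_box(boxes[i], (100, 100)) for i in idx]
kept = [b for b in clipped if is_large_enough(b, min_size=1e-3)]
print(idx, kept)
```

Selecting the top boxes per level rather than globally keeps every pyramid level represented, so large objects found on coarse levels are not crowded out by the much more numerous fine-level anchors.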

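In this example there are two images, each with 278256 anchors, and therefore 278256 proposals and objectness scores per image. After filtering:

  1. final_boxes:
    1. shape [1000, 4]
    2. shape [1000, 4]
  2. final_scores:
    1. shape [1000]
    2. shape [1000]

Because FasterRCNN defaults to rpn_post_nms_top_n_test=1000, in eval mode (i.e. testing/inference) each of the two images in the example ends up with its top 1000 boxes.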

RoIHeads

RoIHeads is implemented in the torchvision.models.detection.roi_heads module.

class RoIHeads(torch.nn.Module):
__annotations__ = {
'box_coder': det_utils.BoxCoder,
'proposal_matcher': det_utils.Matcher,
'fg_bg_sampler': det_utils.BalancedPositiveNegativeSampler,
}

def __init__(self,
box_roi_pool,
box_head,
box_predictor,
# Faster R-CNN training
fg_iou_thresh, bg_iou_thresh,
batch_size_per_image, positive_fraction,
bbox_reg_weights,
# Faster R-CNN inference
score_thresh,
nms_thresh,
detections_per_img,
# Mask
mask_roi_pool=None,
mask_head=None,
mask_predictor=None,
keypoint_roi_pool=None,
keypoint_head=None,
keypoint_predictor=None,
):
super(RoIHeads, self).__init__()

self.box_similarity = box_ops.box_iou
# assign ground-truth boxes for each proposal
self.proposal_matcher = det_utils.Matcher(
fg_iou_thresh,
bg_iou_thresh,
allow_low_quality_matches=False)

self.fg_bg_sampler = det_utils.BalancedPositiveNegativeSampler(
batch_size_per_image,
positive_fraction)

if bbox_reg_weights is None:
bbox_reg_weights = (10., 10., 5., 5.)
self.box_coder = det_utils.BoxCoder(bbox_reg_weights)

self.box_roi_pool = box_roi_pool
self.box_head = box_head
self.box_predictor = box_predictor

self.score_thresh = score_thresh
self.nms_thresh = nms_thresh
self.detections_per_img = detections_per_img

self.mask_roi_pool = mask_roi_pool
self.mask_head = mask_head
self.mask_predictor = mask_predictor

self.keypoint_roi_pool = keypoint_roi_pool
self.keypoint_head = keypoint_head
self.keypoint_predictor = keypoint_predictor

def has_mask(self):
if self.mask_roi_pool is None:
return False
if self.mask_head is None:
return False
if self.mask_predictor is None:
return False
return True

def has_keypoint(self):
if self.keypoint_roi_pool is None:
return False
if self.keypoint_head is None:
return False
if self.keypoint_predictor is None:
return False
return True

def assign_targets_to_proposals(self, proposals, gt_boxes, gt_labels):
# type: (List[Tensor], List[Tensor], List[Tensor]) -> Tuple[List[Tensor], List[Tensor]]
matched_idxs = []
labels = []
for proposals_in_image, gt_boxes_in_image, gt_labels_in_image in zip(proposals, gt_boxes, gt_labels):

if gt_boxes_in_image.numel() == 0:
# Background image
device = proposals_in_image.device
clamped_matched_idxs_in_image = torch.zeros(
(proposals_in_image.shape[0],), dtype=torch.int64, device=device
)
labels_in_image = torch.zeros(
(proposals_in_image.shape[0],), dtype=torch.int64, device=device
)
else:
# set to self.box_similarity when https://github.com/pytorch/pytorch/issues/27495 lands
match_quality_matrix = box_ops.box_iou(gt_boxes_in_image, proposals_in_image)
matched_idxs_in_image = self.proposal_matcher(match_quality_matrix)

clamped_matched_idxs_in_image = matched_idxs_in_image.clamp(min=0)

labels_in_image = gt_labels_in_image[clamped_matched_idxs_in_image]
labels_in_image = labels_in_image.to(dtype=torch.int64)

# Label background (below the low threshold)
bg_inds = matched_idxs_in_image == self.proposal_matcher.BELOW_LOW_THRESHOLD
labels_in_image[bg_inds] = 0

# Label ignore proposals (between low and high thresholds)
ignore_inds = matched_idxs_in_image == self.proposal_matcher.BETWEEN_THRESHOLDS
labels_in_image[ignore_inds] = -1 # -1 is ignored by sampler

matched_idxs.append(clamped_matched_idxs_in_image)
labels.append(labels_in_image)
return matched_idxs, labels

def subsample(self, labels):
# type: (List[Tensor]) -> List[Tensor]
sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
sampled_inds = []
for img_idx, (pos_inds_img, neg_inds_img) in enumerate(
zip(sampled_pos_inds, sampled_neg_inds)
):
img_sampled_inds = torch.where(pos_inds_img | neg_inds_img)[0]
sampled_inds.append(img_sampled_inds)
return sampled_inds

def add_gt_proposals(self, proposals, gt_boxes):
# type: (List[Tensor], List[Tensor]) -> List[Tensor]
proposals = [
torch.cat((proposal, gt_box))
for proposal, gt_box in zip(proposals, gt_boxes)
]

return proposals

def check_targets(self, targets):
# type: (Optional[List[Dict[str, Tensor]]]) -> None
assert targets is not None
assert all(["boxes" in t for t in targets])
assert all(["labels" in t for t in targets])
if self.has_mask():
assert all(["masks" in t for t in targets])

def select_training_samples(self,
proposals, # type: List[Tensor]
targets # type: Optional[List[Dict[str, Tensor]]]
):
# type: (...) -> Tuple[List[Tensor], List[Tensor], List[Tensor], List[Tensor]]
self.check_targets(targets)
assert targets is not None
dtype = proposals[0].dtype
device = proposals[0].device

gt_boxes = [t["boxes"].to(dtype) for t in targets]
gt_labels = [t["labels"] for t in targets]

# append ground-truth bboxes to proposals
proposals = self.add_gt_proposals(proposals, gt_boxes)

# get matching gt indices for each proposal
matched_idxs, labels = self.assign_targets_to_proposals(proposals, gt_boxes, gt_labels)
# sample a fixed proportion of positive-negative proposals
sampled_inds = self.subsample(labels)
matched_gt_boxes = []
num_images = len(proposals)
for img_id in range(num_images):
img_sampled_inds = sampled_inds[img_id]
proposals[img_id] = proposals[img_id][img_sampled_inds]
labels[img_id] = labels[img_id][img_sampled_inds]
matched_idxs[img_id] = matched_idxs[img_id][img_sampled_inds]

gt_boxes_in_image = gt_boxes[img_id]
if gt_boxes_in_image.numel() == 0:
gt_boxes_in_image = torch.zeros((1, 4), dtype=dtype, device=device)
matched_gt_boxes.append(gt_boxes_in_image[matched_idxs[img_id]])

regression_targets = self.box_coder.encode(matched_gt_boxes, proposals)
return proposals, matched_idxs, labels, regression_targets

def postprocess_detections(self,
class_logits, # type: Tensor
box_regression, # type: Tensor
proposals, # type: List[Tensor]
image_shapes # type: List[Tuple[int, int]]
):
# type: (...) -> Tuple[List[Tensor], List[Tensor], List[Tensor]]
device = class_logits.device
num_classes = class_logits.shape[-1]

boxes_per_image = [boxes_in_image.shape[0] for boxes_in_image in proposals]
pred_boxes = self.box_coder.decode(box_regression, proposals)

pred_scores = F.softmax(class_logits, -1)

pred_boxes_list = pred_boxes.split(boxes_per_image, 0)
pred_scores_list = pred_scores.split(boxes_per_image, 0)

all_boxes = []
all_scores = []
all_labels = []
for boxes, scores, image_shape in zip(pred_boxes_list, pred_scores_list, image_shapes):
boxes = box_ops.clip_boxes_to_image(boxes, image_shape)

# create labels for each prediction
labels = torch.arange(num_classes, device=device)
labels = labels.view(1, -1).expand_as(scores)

# remove predictions with the background label
boxes = boxes[:, 1:]
scores = scores[:, 1:]
labels = labels[:, 1:]

# batch everything, by making every class prediction be a separate instance
boxes = boxes.reshape(-1, 4)
scores = scores.reshape(-1)
labels = labels.reshape(-1)

# remove low scoring boxes
inds = torch.where(scores > self.score_thresh)[0]
boxes, scores, labels = boxes[inds], scores[inds], labels[inds]

# remove empty boxes
keep = box_ops.remove_small_boxes(boxes, min_size=1e-2)
boxes, scores, labels = boxes[keep], scores[keep], labels[keep]

# non-maximum suppression, independently done per class
keep = box_ops.batched_nms(boxes, scores, labels, self.nms_thresh)
# keep only topk scoring predictions
keep = keep[:self.detections_per_img]
boxes, scores, labels = boxes[keep], scores[keep], labels[keep]

all_boxes.append(boxes)
all_scores.append(scores)
all_labels.append(labels)

return all_boxes, all_scores, all_labels

def forward(self,
features, # type: Dict[str, Tensor]
proposals, # type: List[Tensor]
image_shapes, # type: List[Tuple[int, int]]
targets=None # type: Optional[List[Dict[str, Tensor]]]
):
# type: (...) -> Tuple[List[Dict[str, Tensor]], Dict[str, Tensor]]
"""
Arguments:
features (List[Tensor])
proposals (List[Tensor[N, 4]])
image_shapes (List[Tuple[H, W]])
targets (List[Dict])
"""
if targets is not None:
for t in targets:
# TODO: https://github.com/pytorch/pytorch/issues/26731
floating_point_types = (torch.float, torch.double, torch.half)
assert t["boxes"].dtype in floating_point_types, 'target boxes must of float type'
assert t["labels"].dtype == torch.int64, 'target labels must of int64 type'
if self.has_keypoint():
assert t["keypoints"].dtype == torch.float32, 'target keypoints must of float type'

if self.training:
proposals, matched_idxs, labels, regression_targets = self.select_training_samples(proposals, targets)
else:
labels = None
regression_targets = None
matched_idxs = None

box_features = self.box_roi_pool(features, proposals, image_shapes)
box_features = self.box_head(box_features)
class_logits, box_regression = self.box_predictor(box_features)

result = torch.jit.annotate(List[Dict[str, torch.Tensor]], [])
losses = {}
if self.training:
assert labels is not None and regression_targets is not None
loss_classifier, loss_box_reg = fastrcnn_loss(
class_logits, box_regression, labels, regression_targets)
losses = {
"loss_classifier": loss_classifier,
"loss_box_reg": loss_box_reg
}
else:
boxes, scores, labels = self.postprocess_detections(class_logits, box_regression, proposals, image_shapes)
num_images = len(boxes)
for i in range(num_images):
result.append(
{
"boxes": boxes[i],
"labels": labels[i],
"scores": scores[i],
}
)

if self.has_mask():
mask_proposals = [p["boxes"] for p in result]
if self.training:
assert matched_idxs is not None
# during training, only focus on positive boxes
num_images = len(proposals)
mask_proposals = []
pos_matched_idxs = []
for img_id in range(num_images):
pos = torch.where(labels[img_id] > 0)[0]
mask_proposals.append(proposals[img_id][pos])
pos_matched_idxs.append(matched_idxs[img_id][pos])
else:
pos_matched_idxs = None

if self.mask_roi_pool is not None:
mask_features = self.mask_roi_pool(features, mask_proposals, image_shapes)
mask_features = self.mask_head(mask_features)
mask_logits = self.mask_predictor(mask_features)
else:
mask_logits = torch.tensor(0)
raise Exception("Expected mask_roi_pool to be not None")

loss_mask = {}
if self.training:
assert targets is not None
assert pos_matched_idxs is not None
assert mask_logits is not None

gt_masks = [t["masks"] for t in targets]
gt_labels = [t["labels"] for t in targets]
rcnn_loss_mask = maskrcnn_loss(
mask_logits, mask_proposals,
gt_masks, gt_labels, pos_matched_idxs)
loss_mask = {
"loss_mask": rcnn_loss_mask
}
else:
labels = [r["labels"] for r in result]
masks_probs = maskrcnn_inference(mask_logits, labels)
for mask_prob, r in zip(masks_probs, result):
r["masks"] = mask_prob

losses.update(loss_mask)

# keep none checks in if conditional so torchscript will conditionally
# compile each branch
if self.keypoint_roi_pool is not None and self.keypoint_head is not None \
and self.keypoint_predictor is not None:
keypoint_proposals = [p["boxes"] for p in result]
if self.training:
# during training, only focus on positive boxes
num_images = len(proposals)
keypoint_proposals = []
pos_matched_idxs = []
assert matched_idxs is not None
for img_id in range(num_images):
pos = torch.where(labels[img_id] > 0)[0]
keypoint_proposals.append(proposals[img_id][pos])
pos_matched_idxs.append(matched_idxs[img_id][pos])
else:
pos_matched_idxs = None

keypoint_features = self.keypoint_roi_pool(features, keypoint_proposals, image_shapes)
keypoint_features = self.keypoint_head(keypoint_features)
keypoint_logits = self.keypoint_predictor(keypoint_features)

loss_keypoint = {}
if self.training:
assert targets is not None
assert pos_matched_idxs is not None

gt_keypoints = [t["keypoints"] for t in targets]
rcnn_loss_keypoint = keypointrcnn_loss(
keypoint_logits, keypoint_proposals,
gt_keypoints, pos_matched_idxs)
loss_keypoint = {
"loss_keypoint": rcnn_loss_keypoint
}
else:
assert keypoint_logits is not None
assert keypoint_proposals is not None

keypoints_probs, kp_scores = keypointrcnn_inference(keypoint_logits, keypoint_proposals)
for keypoint_prob, kps, r in zip(keypoints_probs, kp_scores, result):
r["keypoints"] = keypoint_prob
r["keypoints_scores"] = kps

losses.update(loss_keypoint)

return result, losses

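The forward function is the part to focus on. It is long, but if we only consider what Faster R-CNN needs (ignoring the mask branch used by Mask R-CNN and the keypoint branch), it can be summarized as: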

def forward(self,
            features,      # type: Dict[str, Tensor]
            proposals,     # type: List[Tensor]
            image_shapes,  # type: List[Tuple[int, int]]
            targets=None   # type: Optional[List[Dict[str, Tensor]]]
            ):
    # 0. (training only) match proposals to ground truth and sample them
    if self.training:
        proposals, matched_idxs, labels, regression_targets = self.select_training_samples(proposals, targets)
    else:
        labels = None
        regression_targets = None

    # 1. RoI Pool (or RoI Align)
    box_features = self.box_roi_pool(features, proposals, image_shapes)
    # 2. MLP Head
    box_features = self.box_head(box_features)
    # 3. Predictor
    class_logits, box_regression = self.box_predictor(box_features)

    result = torch.jit.annotate(List[Dict[str, torch.Tensor]], [])
    losses = {}
    if self.training:
        assert labels is not None and regression_targets is not None
        loss_classifier, loss_box_reg = fastrcnn_loss(
            class_logits, box_regression, labels, regression_targets)
        losses = {
            "loss_classifier": loss_classifier,
            "loss_box_reg": loss_box_reg
        }
    else:
        # 4. Postprocess Detections
        boxes, scores, labels = self.postprocess_detections(class_logits, box_regression, proposals, image_shapes)
        num_images = len(boxes)
        for i in range(num_images):
            result.append(
                {
                    "boxes": boxes[i],
                    "labels": labels[i],
                    "scores": scores[i],
                }
            )

    return result, losses

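The RoIHeads of Faster R-CNN mainly consists of the following steps:

  1. RoI Pool: performed by box_features = self.box_roi_pool(features, proposals, image_shapes). In Faster R-CNN the RoI Pool is implemented by the MultiScaleRoIAlign class in the torchvision.ops.poolers module. Objects differ in shape, so the feature windows they cover differ too. RoI Pool divides each variable-size RoI window into a fixed grid and pools within each cell, turning variable-size RoI features into fixed-size outputs that are convenient for later processing.
  2. MLP Head: performed by box_features = self.box_head(box_features). It is implemented by the TwoMLPHead class in torchvision.models.detection.faster_rcnn. The MLP head takes the fixed-size features pooled by RoI Pool, applies non-linear transformations through an MLP, and outputs the final features used by the downstream tasks (classification, regression, etc.).
  3. Predictor: performed by class_logits, box_regression = self.box_predictor(box_features). It is implemented by the FastRCNNPredictor class in torchvision.models.detection.faster_rcnn. The features output by the MLP head are passed to the predictor for task-specific prediction, e.g. object classification and regression of the bbox position and size.
  4. Postprocess Detections: performed by boxes, scores, labels = self.postprocess_detections(class_logits, box_regression, proposals, image_shapes), which is a member function of the RoIHeads class.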
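
To make step 1 concrete, here is a minimal sketch of classic RoI max pooling on a single 2D feature map in plain Python. It illustrates the "fixed grid over a variable window" idea only and is not what MultiScaleRoIAlign does internally; the function name roi_max_pool and the inclusive integer RoI convention are assumptions for this example.

```python
import math

def roi_max_pool(feature, roi, out_h, out_w):
    """Max-pool the window roi = (x1, y1, x2, y2), given in inclusive integer
    cell coordinates, of a 2D feature map into an out_h x out_w grid."""
    x1, y1, x2, y2 = roi
    win_h, win_w = y2 - y1 + 1, x2 - x1 + 1
    pooled = []
    for i in range(out_h):
        # Quantized bin boundaries: floor for the start, ceil for the end.
        r0 = y1 + math.floor(i * win_h / out_h)
        r1 = y1 + math.ceil((i + 1) * win_h / out_h)
        row = []
        for j in range(out_w):
            c0 = x1 + math.floor(j * win_w / out_w)
            c1 = x1 + math.ceil((j + 1) * win_w / out_w)
            row.append(max(feature[r][c]
                           for r in range(r0, r1)
                           for c in range(c0, c1)))
        pooled.append(row)
    return pooled

# An 8x8 feature map whose value encodes its own position (row * 10 + col).
feature = [[r * 10 + c for c in range(8)] for r in range(8)]
small = roi_max_pool(feature, (0, 0, 3, 3), 2, 2)  # 4x4 window
large = roi_max_pool(feature, (1, 2, 7, 6), 2, 2)  # 5x7 window
print(small, large)
```

Both windows come out as 2x2 grids even though their sizes differ, which is exactly the property the later fully connected layers rely on.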

MultiScaleRoIAlign

torchvision uses MultiScaleRoIAlign from the torchvision.ops.poolers module as the RoI Pool implementation for Faster R-CNN.

class MultiScaleRoIAlign(nn.Module):
"""
Multi-scale RoIAlign pooling, which is useful for detection with or without FPN.

It infers the scale of the pooling via the heuristics present in the FPN paper.

Arguments:
featmap_names (List[str]): the names of the feature maps that will be used
for the pooling.
output_size (List[Tuple[int, int]] or List[int]): output size for the pooled region
sampling_ratio (int): sampling ratio for ROIAlign

Examples::

>>> m = torchvision.ops.MultiScaleRoIAlign(['feat1', 'feat3'], 3, 2)
>>> i = OrderedDict()
>>> i['feat1'] = torch.rand(1, 5, 64, 64)
>>> i['feat2'] = torch.rand(1, 5, 32, 32) # this feature won't be used in the pooling
>>> i['feat3'] = torch.rand(1, 5, 16, 16)
>>> # create some random bounding boxes
>>> boxes = torch.rand(6, 4) * 256; boxes[:, 2:] += boxes[:, :2]
>>> # original image size, before computing the feature maps
>>> image_sizes = [(512, 512)]
>>> output = m(i, [boxes], image_sizes)
>>> print(output.shape)
>>> torch.Size([6, 5, 3, 3])

"""

__annotations__ = {
'scales': Optional[List[float]],
'map_levels': Optional[LevelMapper]
}

def __init__(
self,
featmap_names: List[str],
output_size: Union[int, Tuple[int], List[int]],
sampling_ratio: int,
):
super(MultiScaleRoIAlign, self).__init__()
if isinstance(output_size, int):
output_size = (output_size, output_size)
self.featmap_names = featmap_names
self.sampling_ratio = sampling_ratio
self.output_size = tuple(output_size)
self.scales = None
self.map_levels = None

def convert_to_roi_format(self, boxes: List[Tensor]) -> Tensor:
concat_boxes = torch.cat(boxes, dim=0)
device, dtype = concat_boxes.device, concat_boxes.dtype
ids = torch.cat(
[
torch.full_like(b[:, :1], i, dtype=dtype, layout=torch.strided, device=device)
for i, b in enumerate(boxes)
],
dim=0,
)
rois = torch.cat([ids, concat_boxes], dim=1)
return rois

def infer_scale(self, feature: Tensor, original_size: List[int]) -> float:
# assumption: the scale is of the form 2 ** (-k), with k integer
size = feature.shape[-2:]
possible_scales = torch.jit.annotate(List[float], [])
for s1, s2 in zip(size, original_size):
approx_scale = float(s1) / float(s2)
scale = 2 ** float(torch.tensor(approx_scale).log2().round())
possible_scales.append(scale)
assert possible_scales[0] == possible_scales[1]
return possible_scales[0]

def setup_scales(
self,
features: List[Tensor],
image_shapes: List[Tuple[int, int]],
) -> None:
assert len(image_shapes) != 0
max_x = 0
max_y = 0
for shape in image_shapes:
max_x = max(shape[0], max_x)
max_y = max(shape[1], max_y)
original_input_shape = (max_x, max_y)

scales = [self.infer_scale(feat, original_input_shape) for feat in features]
# get the levels in the feature map by leveraging the fact that the network always
# downsamples by a factor of 2 at each level.
lvl_min = -torch.log2(torch.tensor(scales[0], dtype=torch.float32)).item()
lvl_max = -torch.log2(torch.tensor(scales[-1], dtype=torch.float32)).item()
self.scales = scales
self.map_levels = initLevelMapper(int(lvl_min), int(lvl_max))

def forward(
self,
x: Dict[str, Tensor],
boxes: List[Tensor],
image_shapes: List[Tuple[int, int]],
) -> Tensor:
"""
Arguments:
x (OrderedDict[Tensor]): feature maps for each level. They are assumed to have
all the same number of channels, but they can have different sizes.
boxes (List[Tensor[N, 4]]): boxes to be used to perform the pooling operation, in
(x1, y1, x2, y2) format and in the image reference size, not the feature map
reference.
image_shapes (List[Tuple[height, width]]): the sizes of each image before they
have been fed to a CNN to obtain feature maps. This allows us to infer the
scale factor for each one of the levels to be pooled.
Returns:
result (Tensor)
"""
x_filtered = []
for k, v in x.items():
if k in self.featmap_names:
x_filtered.append(v)
num_levels = len(x_filtered)
rois = self.convert_to_roi_format(boxes)
if self.scales is None:
self.setup_scales(x_filtered, image_shapes)

scales = self.scales
assert scales is not None

if num_levels == 1:
return roi_align(
x_filtered[0], rois,
output_size=self.output_size,
spatial_scale=scales[0],
sampling_ratio=self.sampling_ratio
)

mapper = self.map_levels
assert mapper is not None

levels = mapper(boxes)

num_rois = len(rois)
num_channels = x_filtered[0].shape[1]

dtype, device = x_filtered[0].dtype, x_filtered[0].device
result = torch.zeros(
(num_rois, num_channels,) + self.output_size,
dtype=dtype,
device=device,
)

tracing_results = []
for level, (per_level_feature, scale) in enumerate(zip(x_filtered, scales)):
idx_in_level = torch.where(levels == level)[0]
rois_per_level = rois[idx_in_level]

result_idx_in_level = roi_align(
per_level_feature, rois_per_level,
output_size=self.output_size,
spatial_scale=scale, sampling_ratio=self.sampling_ratio)

if torchvision._is_tracing():
tracing_results.append(result_idx_in_level.to(dtype))
else:
# result and result_idx_in_level's dtypes are based on dtypes of different
# elements in x_filtered. x_filtered contains tensors output by different
# layers. When autocast is active, it may choose different dtypes for
# different layers' outputs. Therefore, we defensively match result's dtype
# before copying elements from result_idx_in_level in the following op.
# We need to cast manually (can't rely on autocast to cast for us) because
# the op acts on result in-place, and autocast only affects out-of-place ops.
result[idx_in_level] = result_idx_in_level.to(result.dtype)

if torchvision._is_tracing():
result = _onnx_merge_levels(levels, tracing_results)

return result

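In fact, RoI Align did not exist when the Faster R-CNN paper was published; the paper still used RoI Pool from Fast R-CNN. RoI Pool pools the features inside an RoI into a small feature map, i.e. it pools the features inside an RoI window of variable shape $h \times w$ ($h, w$ are variables) into a small feature map of uniform shape $H \times W$ ($H, W$ are constants).

RoI Align was introduced in the Mask R-CNN paper. Its motivation is that RoI Pool is too coarse because of quantization: coordinates are rounded to integers once when the window is mapped onto the feature map, and again when the window is divided into bins, which makes the result imprecise. This quantization has little impact on classification, but it becomes a problem for tasks that need pixel-level accuracy, such as image segmentation.

For RoI Align's improvements over RoI Pool and the math of its bilinear interpolation, the following article is worth a careful read:

Understanding Region of Interest — (RoI Align and RoI Warp) | by Kemal Erdem (burnpiro) | Towards Data Science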
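
The bilinear interpolation that lets RoI Align avoid rounding can be sketched in a few lines of plain Python. This is a simplified illustration, assuming a 2D list as the feature map and simple clamping at the borders; the function name bilinear_sample is hypothetical, and the real roi_align operator handles sampling grids and boundaries in more detail.

```python
import math

def bilinear_sample(feature, y, x):
    """Sample a 2D feature map at a fractional (y, x) location."""
    h, w = len(feature), len(feature[0])
    # Clamp to the valid range instead of rounding to an integer cell.
    y = min(max(y, 0.0), h - 1.0)
    x = min(max(x, 0.0), w - 1.0)
    y0, x0 = int(math.floor(y)), int(math.floor(x))
    y1, x1 = min(y0 + 1, h - 1), min(x0 + 1, w - 1)
    ly, lx = y - y0, x - x0
    # Interpolate along x on the two neighbouring rows, then along y.
    top = feature[y0][x0] * (1 - lx) + feature[y0][x1] * lx
    bottom = feature[y1][x0] * (1 - lx) + feature[y1][x1] * lx
    return top * (1 - ly) + bottom * ly

feature = [[0.0, 10.0],
           [20.0, 30.0]]
center = bilinear_sample(feature, 0.5, 0.5)  # average of the four cells
corner = bilinear_sample(feature, 1.0, 0.0)  # exactly on a grid point
print(center, corner)
```

Because the sample location stays fractional, a small shift of the RoI changes the pooled value smoothly, whereas the rounded bins of RoI Pool jump by whole cells.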

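The core RoI Align operation in MultiScaleRoIAlign is performed by calling the roi_align function from the torchvision.ops.roi_align module, which in turn simply calls the underlying torch.ops.torchvision.roi_align operator.

In this example, the RPN yields 1000 boxes for each of the two input images, i.e. 2000 boxes in total. After RoIPool / RoIAlign the output is:

  • box_features (result): shape [2000, 256, 7, 7]

meaning that each of the 2000 boxes has been pooled into a feature of size $C, H, W = 256, 7, 7$.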
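
The "multi-scale" part of MultiScaleRoIAlign, i.e. inferring each level's scale and assigning every box to one pyramid level, can be sketched as follows. This is a plain-Python approximation of infer_scale and of the FPN-paper heuristic: the function map_level is hypothetical, and the canonical scale 224, canonical level 4 and eps 1e-6 are assumed defaults.

```python
import math

def infer_scale(feature_size, image_size):
    """Snap feature_size / image_size to the nearest power of two."""
    return 2.0 ** round(math.log2(feature_size / image_size))

def map_level(box, k_min, k_max, canonical_scale=224, canonical_level=4, eps=1e-6):
    """Pick the pyramid level for an (x1, y1, x2, y2) box and return it as an
    index relative to k_min."""
    x1, y1, x2, y2 = box
    s = math.sqrt((x2 - x1) * (y2 - y1))  # side length of an equal-area square
    level = math.floor(canonical_level + math.log2(s / canonical_scale) + eps)
    level = min(max(level, k_min), k_max)  # clamp to the available levels
    return level - k_min

# Four feature maps of an 800-pixel-high image, downsampled by 4, 8, 16, 32.
scales = [infer_scale(f, 800) for f in (200, 100, 50, 25)]
k_min = int(-math.log2(scales[0]))
k_max = int(-math.log2(scales[-1]))
levels = [map_level(b, k_min, k_max)
          for b in [(0, 0, 32, 32), (0, 0, 224, 224), (0, 0, 700, 700)]]
print(scales, k_min, k_max, levels)
```

Small boxes land on the finest level and large boxes on the coarsest, so each RoI is pooled from the feature map whose resolution best matches its size.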

TwoMLPHead

torchvision uses TwoMLPHead from the torchvision.models.detection.faster_rcnn module as the MLP Head implementation.

class TwoMLPHead(nn.Module):
    """
    Standard heads for FPN-based models

    Arguments:
        in_channels (int): number of input channels
        representation_size (int): size of the intermediate representation
    """

    def __init__(self, in_channels, representation_size):
        super(TwoMLPHead, self).__init__()

        self.fc6 = nn.Linear(in_channels, representation_size)
        self.fc7 = nn.Linear(representation_size, representation_size)

    def forward(self, x):
        x = x.flatten(start_dim=1)

        x = F.relu(self.fc6(x))
        x = F.relu(self.fc7(x))

        return x

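This part is not complicated: it is simply a two-layer MLP whose layers are named fc6 and fc7.

In this example, the box_features output by RoIAlign initially has shape [2000, 256, 7, 7]. Inside TwoMLPHead:

  1. flatten first turns it into shape [2000, 12544];
  2. the two-layer MLP then turns it into shape [2000, 1024].

The return value is the box_features after this non-linear transformation, with shape [2000, 1024].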
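
The shape numbers above follow from simple arithmetic, which the short sketch below reproduces. It is plain Python with a hypothetical helper linear_params, and it assumes the layer sizes quoted in this example (256 channels, 7x7 pooling, representation size 1024).

```python
def linear_params(in_features, out_features):
    """Weights plus biases of one fully connected layer."""
    return in_features * out_features + out_features

channels, pooled_h, pooled_w = 256, 7, 7
representation_size = 1024

# flatten(start_dim=1) merges C, H and W into a single feature axis.
flattened = channels * pooled_h * pooled_w
fc6_params = linear_params(flattened, representation_size)
fc7_params = linear_params(representation_size, representation_size)

print(flattened)   # 12544
print(fc6_params)  # 12846080
print(fc7_params)  # 1049600
```

Under these assumptions fc6 alone holds roughly 12.8 million parameters, far more than fc7.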

FastRCNNPredictor

torchvision uses FastRCNNPredictor from the torchvision.models.detection.faster_rcnn module as the Predictor implementation.

class FastRCNNPredictor(nn.Module):
    """
    Standard classification + bounding box regression layers
    for Fast R-CNN.

    Arguments:
        in_channels (int): number of input channels
        num_classes (int): number of output classes (including background)
    """

    def __init__(self, in_channels, num_classes):
        super(FastRCNNPredictor, self).__init__()
        self.cls_score = nn.Linear(in_channels, num_classes)
        self.bbox_pred = nn.Linear(in_channels, num_classes * 4)

    def forward(self, x):
        if x.dim() == 4:
            assert list(x.shape[2:]) == [1, 1]
        x = x.flatten(start_dim=1)
        scores = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)

        return scores, bbox_deltas

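This part is not complicated either. As described in the paper, it implements two prediction branches, each a single linear (fully connected) layer:

  1. the self.cls_score branch predicts the classification scores (scores) of the object;
  2. the self.bbox_pred branch regresses the box refinement values for each class.

In torchvision's pretrained model, num_classes of FastRCNNPredictor is 91, i.e. it recognizes 91 classes including the background.

In this example, based on the features extracted by RoIAlign and TwoMLPHead, the two branches output:

  1. class_logits (scores): shape [2000, 91];
  2. box_regression (bbox_deltas): shape [2000, 364].

That is, the 2 input images contain 2000 boxes in total (1000 per image); each of the 2000 boxes gets a class prediction, plus a separate set of box regression refinements for every class.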
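
How one row of these two outputs is read can be sketched in plain Python. The helper names softmax and split_per_class are hypothetical, and the toy values are made up; the layout assumption (class c owns columns 4c to 4c+3 of the 364-wide row) follows from the 0::4, 1::4, 2::4, 3::4 slicing in decode_single.

```python
import math

def softmax(logits):
    """Numerically stable softmax over one row of class logits."""
    m = max(logits)
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

def split_per_class(bbox_deltas_row, num_classes):
    """Turn a flat row of num_classes * 4 values into num_classes 4-tuples."""
    assert len(bbox_deltas_row) == num_classes * 4
    return [tuple(bbox_deltas_row[4 * c:4 * c + 4]) for c in range(num_classes)]

num_classes = 91
# Toy row: class 17 has the largest logit; deltas are just running numbers.
class_logits_row = [0.0] * num_classes
class_logits_row[17] = 5.0
bbox_deltas_row = [float(i) for i in range(num_classes * 4)]

probs = softmax(class_logits_row)
per_class = split_per_class(bbox_deltas_row, num_classes)
# Index 0 is the background class, so it is skipped when picking a label.
best = max(range(1, num_classes), key=lambda c: probs[c])
print(best, per_class[best])
```

Each candidate box therefore carries 91 alternative refinements, and the one that matters is the 4-tuple belonging to the class that is finally kept.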

postprocess_detections

After the model's Predictor has made its predictions, some further processing is needed. This is implemented in the postprocess_detections function of torchvision.models.detection.roi_heads.RoIHeads.

class RoIHeads(torch.nn.Module):

    # ...

    def postprocess_detections(self,
                               class_logits,    # type: Tensor
                               box_regression,  # type: Tensor
                               proposals,       # type: List[Tensor]
                               image_shapes     # type: List[Tuple[int, int]]
                               ):
        # type: (...) -> Tuple[List[Tensor], List[Tensor], List[Tensor]]
        device = class_logits.device
        num_classes = class_logits.shape[-1]

        boxes_per_image = [boxes_in_image.shape[0] for boxes_in_image in proposals]
        pred_boxes = self.box_coder.decode(box_regression, proposals)

        pred_scores = F.softmax(class_logits, -1)

        pred_boxes_list = pred_boxes.split(boxes_per_image, 0)
        pred_scores_list = pred_scores.split(boxes_per_image, 0)

        all_boxes = []
        all_scores = []
        all_labels = []
        for boxes, scores, image_shape in zip(pred_boxes_list, pred_scores_list, image_shapes):
            boxes = box_ops.clip_boxes_to_image(boxes, image_shape)

            # create labels for each prediction
            labels = torch.arange(num_classes, device=device)
            labels = labels.view(1, -1).expand_as(scores)

            # remove predictions with the background label
            boxes = boxes[:, 1:]
            scores = scores[:, 1:]
            labels = labels[:, 1:]

            # batch everything, by making every class prediction be a separate instance
            boxes = boxes.reshape(-1, 4)
            scores = scores.reshape(-1)
            labels = labels.reshape(-1)

            # remove low scoring boxes
            inds = torch.where(scores > self.score_thresh)[0]
            boxes, scores, labels = boxes[inds], scores[inds], labels[inds]

            # remove empty boxes
            keep = box_ops.remove_small_boxes(boxes, min_size=1e-2)
            boxes, scores, labels = boxes[keep], scores[keep], labels[keep]

            # non-maximum suppression, independently done per class
            keep = box_ops.batched_nms(boxes, scores, labels, self.nms_thresh)
            # keep only topk scoring predictions
            keep = keep[:self.detections_per_img]
            boxes, scores, labels = boxes[keep], scores[keep], labels[keep]

            all_boxes.append(boxes)
            all_scores.append(scores)
            all_labels.append(labels)

        return all_boxes, all_scores, all_labels

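Before the loop, the box deltas are decoded into boxes and the class logits are turned into probabilities with softmax. The for loop then iterates over every image and, after clipping the boxes to the image:

  1. removes the boxes of the background class;
  2. removes low-scoring boxes;
  3. removes empty boxes (tiny, meaningless boxes);
  4. runs NMS on the boxes and scores independently per class;
  5. keeps the top self.detections_per_img detections.

Because the inputs in this example are simulated images filled with random values, all 2000 candidate boxes are filtered out at the low-score removal stage, since the images contain no real objects.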
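
The score filtering and per-class NMS steps can be sketched in plain Python. This is a toy illustration rather than torchvision's implementation: the per-class behaviour is obtained with a coordinate-offset trick (shifting each class into its own region so that a single NMS pass never compares boxes of different classes), and the thresholds 0.05, 0.5 and 100 are assumed to match the FasterRCNN defaults.

```python
def iou(a, b):
    """Intersection over union of two (x1, y1, x2, y2) boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(ix2 - ix1, 0.0) * max(iy2 - iy1, 0.0)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / (area_a + area_b - inter)

def nms(boxes, scores, thresh):
    """Greedy NMS; returns kept indices sorted by decreasing score."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    for i in order:
        if all(iou(boxes[i], boxes[j]) <= thresh for j in keep):
            keep.append(i)
    return keep

def batched_nms(boxes, scores, labels, thresh):
    """Per-class NMS: shift each class into its own coordinate range first."""
    if not boxes:
        return []
    max_coord = max(max(b) for b in boxes)
    shifted = [tuple(v + l * (max_coord + 1) for v in b)
               for b, l in zip(boxes, labels)]
    return nms(shifted, scores, thresh)

def postprocess(boxes, scores, labels, score_thresh=0.05,
                nms_thresh=0.5, detections_per_img=100):
    """Return the indices of the detections that survive all filters."""
    idx = [i for i, s in enumerate(scores) if s > score_thresh]
    idx = [i for i in idx
           if boxes[i][2] - boxes[i][0] >= 1e-2 and boxes[i][3] - boxes[i][1] >= 1e-2]
    keep = batched_nms([boxes[i] for i in idx], [scores[i] for i in idx],
                       [labels[i] for i in idx], nms_thresh)
    return [idx[k] for k in keep[:detections_per_img]]

boxes = [(0.0, 0.0, 10.0, 10.0), (1.0, 1.0, 11.0, 11.0),
         (0.0, 0.0, 10.0, 10.0), (50.0, 50.0, 60.0, 60.0)]
scores = [0.9, 0.8, 0.7, 0.01]
labels = [1, 1, 2, 1]
kept = postprocess(boxes, scores, labels)
print(kept)
```

In the toy input the second box overlaps the first box of the same class and is suppressed, the third box is identical in position but belongs to another class and survives, and the last box falls below the score threshold.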

GeneralizedRCNNTransform.postprocess

The final post-processing of the Faster R-CNN model is implemented by the GeneralizedRCNNTransform class in the torchvision.models.detection.transform module.

class GeneralizedRCNNTransform(nn.Module):

    # ...

    def postprocess(self,
                    result,               # type: List[Dict[str, Tensor]]
                    image_shapes,         # type: List[Tuple[int, int]]
                    original_image_sizes  # type: List[Tuple[int, int]]
                    ):
        # type: (...) -> List[Dict[str, Tensor]]
        if self.training:
            return result
        for i, (pred, im_s, o_im_s) in enumerate(zip(result, image_shapes, original_image_sizes)):
            boxes = pred["boxes"]
            boxes = resize_boxes(boxes, im_s, o_im_s)
            result[i]["boxes"] = boxes
            if "masks" in pred:
                masks = pred["masks"]
                masks = paste_masks_in_image(masks, boxes, o_im_s)
                result[i]["masks"] = masks
            if "keypoints" in pred:
                keypoints = pred["keypoints"]
                keypoints = resize_keypoints(keypoints, im_s, o_im_s)
                result[i]["keypoints"] = keypoints
        return result

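For object detection, this step only rescales the box coordinates. When GeneralizedRCNNTransform preprocesses the input images it resizes them, and when batching them into a tensor it also adds padding so that all image tensors in the same batch have the same size. The coordinates predicted on the resized images therefore have to be converted back to the scale of the original images before being output.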
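
The coordinate conversion amounts to multiplying by the ratio between the original and the resized image size along each axis. Below is a minimal plain-Python sketch of that idea; the function name resize_box is hypothetical and the sizes are made-up example values, with both sizes given as (height, width).

```python
def resize_box(box, new_size, original_size):
    """Map an (x1, y1, x2, y2) box from the resized image back to the
    original image. Both sizes are (height, width)."""
    ratio_h = original_size[0] / new_size[0]
    ratio_w = original_size[1] / new_size[1]
    x1, y1, x2, y2 = box
    # x coordinates scale with the width ratio, y with the height ratio.
    return (x1 * ratio_w, y1 * ratio_h, x2 * ratio_w, y2 * ratio_h)

# A 400x500 image that was resized to 800x1000 before entering the network.
restored = resize_box((100.0, 200.0, 400.0, 600.0), (800, 1000), (400, 500))
print(restored)
```

The size used here is the resized image size before padding (image_shapes in the code above), so the padding added during batching does not enter the ratio.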

3 Summary

Finally, for an overview of the whole model structure, simply print the model:

print(model)

which produces the following output:

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256)
          (relu): ReLU(inplace=True)
          (downsample): Sequential(
            (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
            (1): FrozenBatchNorm2d(256)
          )
        )
        (1): Bottleneck(
          (conv1): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256)
          (relu): ReLU(inplace=True)
        )
        (2): Bottleneck(
          (conv1): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256)
          (relu): ReLU(inplace=True)
        )
      )
      (layer2): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(128)
          (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(128)
          (conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(512)
          (relu): ReLU(inplace=True)
          (downsample): Sequential(
            (0): Conv2d(256, 512, kernel_size=(1, 1), stride=(2, 2), bias=False)
            (1): FrozenBatchNorm2d(512)
          )
        )
        (1): Bottleneck(
          (conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(128)
          (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(128)
          (conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(512)
          (relu): ReLU(inplace=True)
        )
        (2): Bottleneck(
          (conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(128)
          (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(128)
          (conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(512)
          (relu): ReLU(inplace=True)
        )
        (3): Bottleneck(
          (conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(128)
          (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(128)
          (conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(512)
          (relu): ReLU(inplace=True)
        )
      )
      (layer3): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(256)
          (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(256)
          (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(1024)
          (relu): ReLU(inplace=True)
          (downsample): Sequential(
            (0): Conv2d(512, 1024, kernel_size=(1, 1), stride=(2, 2), bias=False)
            (1): FrozenBatchNorm2d(1024)
          )
        )
        (1): Bottleneck(
          (conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(256)
          (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(256)
          (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(1024)
          (relu): ReLU(inplace=True)
        )
        (2): Bottleneck(
          (conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(256)
          (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(256)
          (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(1024)
          (relu): ReLU(inplace=True)
        )
        (3): Bottleneck(
          (conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(256)
          (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(256)
          (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(1024)
          (relu): ReLU(inplace=True)
        )
        (4): Bottleneck(
          (conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(256)
          (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(256)
          (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(1024)
          (relu): ReLU(inplace=True)
        )
        (5): Bottleneck(
          (conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(256)
          (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(256)
          (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(1024)
          (relu): ReLU(inplace=True)
        )
      )
      (layer4): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(1024, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(512)
          (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(512)
          (conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(2048)
          (relu): ReLU(inplace=True)
          (downsample): Sequential(
            (0): Conv2d(1024, 2048, kernel_size=(1, 1), stride=(2, 2), bias=False)
            (1): FrozenBatchNorm2d(2048)
          )
        )
        (1): Bottleneck(
          (conv1): Conv2d(2048, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(512)
          (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(512)
          (conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(2048)
          (relu): ReLU(inplace=True)
        )
        (2): Bottleneck(
          (conv1): Conv2d(2048, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(512)
          (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(512)
          (conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(2048)
          (relu): ReLU(inplace=True)
        )
      )
    )
    (fpn): FeaturePyramidNetwork(
      (inner_blocks): ModuleList(
        (0): Conv2d(256, 256, kernel_size=(1, 1), stride=(1, 1))
        (1): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1))
        (2): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1))
        (3): Conv2d(2048, 256, kernel_size=(1, 1), stride=(1, 1))
      )
      (layer_blocks): ModuleList(
        (0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (3): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      )
      (extra_blocks): LastLevelMaxPool()
    )
  )
  (rpn): RegionProposalNetwork(
    (anchor_generator): AnchorGenerator()
    (head): RPNHead(
      (conv): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (cls_logits): Conv2d(256, 3, kernel_size=(1, 1), stride=(1, 1))
      (bbox_pred): Conv2d(256, 12, kernel_size=(1, 1), stride=(1, 1))
    )
  )
  (roi_heads): RoIHeads(
    (box_roi_pool): MultiScaleRoIAlign()
    (box_head): TwoMLPHead(
      (fc6): Linear(in_features=12544, out_features=1024, bias=True)
      (fc7): Linear(in_features=1024, out_features=1024, bias=True)
    )
    (box_predictor): FastRCNNPredictor(
      (cls_score): Linear(in_features=1024, out_features=91, bias=True)
      (bbox_pred): Linear(in_features=1024, out_features=364, bias=True)
    )
  )
)

The model structure can be summarized as the following hierarchy:

  • Faster R-CNN
    • (transform): GeneralizedRCNNTransform
    • (backbone): BackboneWithFPN
      • (body): IntermediateLayerGetter
      • (fpn): FeaturePyramidNetwork
    • (rpn): RegionProposalNetwork
      • (anchor_generator): AnchorGenerator
      • (head): RPNHead
    • (roi_heads): RoIHeads
      • (box_roi_pool): MultiScaleRoIAlign
      • (box_head): TwoMLPHead
      • (box_predictor): FastRCNNPredictor
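Several of the channel and feature counts in the printed structure follow from a few basic quantities. The short calculation below is a sanity check rather than library code: the variable names are hypothetical, and the 7x7 RoI pooling output size is an assumption that the printout does not show directly (MultiScaleRoIAlign() prints without its parameters).

```python
num_classes = 91           # out_features of cls_score in the printout
anchors_per_location = 3   # output channels of the RPN cls_logits conv
fpn_channels = 256         # output channels of every FPN layer block
roi_pool_size = 7          # assumption: RoIAlign produces 7x7 feature maps

# The RPN regresses 4 box deltas for each anchor at each location.
rpn_bbox_channels = anchors_per_location * 4

# The box head flattens a (256, 7, 7) RoI feature before the fc6 layer.
box_head_in_features = fpn_channels * roi_pool_size * roi_pool_size

# The final predictor regresses 4 box deltas per class.
bbox_pred_out_features = num_classes * 4

print(rpn_bbox_channels, box_head_in_features, bbox_pred_out_features)
# prints: 12 12544 364
```

These values match the bbox_pred convolution of RPNHead (12 channels), the in_features of fc6 (12544), and the out_features of the bbox_pred layer in FastRCNNPredictor (364).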