mmdetection: An Open-Source Object Detection Framework Based on PyTorch

mmdetection is an excellent open-source, PyTorch-based object detection framework developed by the Multimedia Laboratory at the Chinese University of Hong Kong and released under the Apache-2.0 license. Since the framework ships with little more than a README, has no full documentation, and contains only sparse source-code comments, I spent a few days reading the source in order to understand it. The notes below are a record of that reading.

0 Getting Started

open-mmlab/mmdetection

  • The mmdetection GitHub home page

mmdetection/INSTALL.md

  • Installation instructions for mmdetection
  • mmdetection only supports Linux. In my tests on Windows 10, even after working around the sh-script execution issues, the build still fails at the compilation step.

open-mmlab/mmcv

  • mmcv is the core computer-vision library that mmdetection depends on

1 Training: tools/train.py

1.1 Usage

1.1.1 Training with tools/train.py

Train a Cascade R-CNN model with a ResNet-50 backbone on GPU 3:

CUDA_VISIBLE_DEVICES=3 python tools/train.py configs/xray_cascade_rcnn_r50_fpn_1x.py

Other common options (an example combining them follows the list):

  • --work_dir is the output directory for model checkpoints; it can also be set in configs/*.py;
  • --resume_from resumes training from a given checkpoint; it can also be set in configs/*.py;
  • --validate evaluates each checkpoint as it is saved during training;
  • --gpus is the number of GPUs to use; the default is 1;
  • --launcher selects the job launcher for distributed training; the default none means no distributed training;
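
For example, a run combining these options might look like this (config path, work directory, and GPU count are illustrative):

CUDA_VISIBLE_DEVICES=0,1 python tools/train.py configs/xray_cascade_rcnn_r50_fpn_1x.py \
    --work_dir work_dirs/xray_demo \
    --gpus 2 \
    --resume_from work_dirs/xray_demo/latest.pth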

1.1.2 Distributed training with tools/dist_train.sh

Distributed training can also be used, on one machine or across several machines:

./tools/dist_train.sh <CONFIG_FILE> <GPU_NUM> [optional arguments]

tools/dist_train.sh is in fact very simple: it just invokes tools/train.py with the --launcher option set to pytorch to start distributed training. The script is implemented as follows:

#!/usr/bin/env bash

PYTHON=${PYTHON:-"python"}

$PYTHON -m torch.distributed.launch --nproc_per_node=$2 $(dirname "$0")/train.py $1 --launcher pytorch ${@:3}

1.2 Source Code

tools/train.py trains the specified model. Its source code is as follows:

from __future__ import division

import argparse
from mmcv import Config

from mmdet import __version__
from mmdet.datasets import get_dataset
from mmdet.apis import (train_detector, init_dist, get_root_logger,
set_random_seed)
from mmdet.models import build_detector
import torch


def parse_args():
parser = argparse.ArgumentParser(description='Train a detector')
parser.add_argument('config', help='train config file path')
parser.add_argument('--work_dir', help='the dir to save logs and models')
parser.add_argument(
'--resume_from', help='the checkpoint file to resume from')
parser.add_argument(
'--validate',
action='store_true',
help='whether to evaluate the checkpoint during training')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='number of gpus to use '
'(only applicable to non-distributed training)')
parser.add_argument('--seed', type=int, default=None, help='random seed')
parser.add_argument(
'--launcher',
choices=['none', 'pytorch', 'slurm', 'mpi'],
default='none',
help='job launcher')
parser.add_argument('--local_rank', type=int, default=0)
args = parser.parse_args()

return args


def main():
args = parse_args()

cfg = Config.fromfile(args.config)
# set cudnn_benchmark
if cfg.get('cudnn_benchmark', False):
torch.backends.cudnn.benchmark = True
# update configs according to CLI args
if args.work_dir is not None:
cfg.work_dir = args.work_dir
if args.resume_from is not None:
cfg.resume_from = args.resume_from
cfg.gpus = args.gpus
if cfg.checkpoint_config is not None:
# save mmdet version in checkpoints as meta data
cfg.checkpoint_config.meta = dict(
mmdet_version=__version__, config=cfg.text)

# init distributed env first, since logger depends on the dist info.
if args.launcher == 'none':
distributed = False
else:
distributed = True
init_dist(args.launcher, **cfg.dist_params)

# init logger before other steps
logger = get_root_logger(cfg.log_level)
logger.info('Distributed training: {}'.format(distributed))

# set random seeds
if args.seed is not None:
logger.info('Set random seed to {}'.format(args.seed))
set_random_seed(args.seed)

model = build_detector(
cfg.model, train_cfg=cfg.train_cfg, test_cfg=cfg.test_cfg)

train_dataset = get_dataset(cfg.data.train)
train_detector(
model,
train_dataset,
cfg,
distributed=distributed,
validate=args.validate,
logger=logger)


if __name__ == '__main__':
main()

1.3 How It Works

1.3.1 Main Flow

This section walks through the main steps that tools/train.py executes; the handling of individual config-file options and command-line arguments is skipped.

First, the config file (configs/*.py) is read and an mmcv.Config object is built for later parsing:

cfg = Config.fromfile(args.config)
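
A config file is itself a plain Python file whose top-level variables become the fields of the Config object, and nested dicts can be accessed as attributes. A small illustrative sketch of how the resulting cfg is used (the path and values are examples only):

from mmcv import Config

cfg = Config.fromfile('configs/xray_cascade_rcnn_r50_fpn_1x.py')
print(cfg.model.type)              # e.g. 'CascadeRCNN'
print(cfg.data.imgs_per_gpu)       # e.g. 2
cfg.work_dir = './work_dirs/demo'  # this is how train.py applies the --work_dir override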

Next, mmdet.models.build_detector is called with the model configuration to build the model:

model = build_detector(cfg.model, train_cfg=cfg.train_cfg, test_cfg=cfg.test_cfg)

Then, mmdet.datasets.get_dataset builds the training dataset object from the training-set section of the config:

train_dataset = get_dataset(cfg.data.train)

Finally, mmdet.apis.train_detector is called with the constructed model, the training dataset train_dataset, the config object, and a few other arguments, and it trains the model on the training set:

train_detector(
    model,
    train_dataset,
    cfg,
    distributed=distributed,
    validate=args.validate,
    logger=logger)

If distributed training is used and --validate is set, every checkpoint created during training is evaluated when it is saved. (In non-distributed training, --validate has no effect, because mmdet.apis._non_dist_train, which train_detector calls in that case, ignores the validate argument.) See Section 1.3.2 below for details.

1.3.2 mmdet.apis.train

The source of mmdet/apis/train.py:

from __future__ import division

from collections import OrderedDict

import torch
from mmcv.runner import Runner, DistSamplerSeedHook
from mmcv.parallel import MMDataParallel, MMDistributedDataParallel

from mmdet.core import (DistOptimizerHook, DistEvalmAPHook,
CocoDistEvalRecallHook, CocoDistEvalmAPHook)
from mmdet.datasets import build_dataloader
from mmdet.models import RPN
from .env import get_root_logger


def parse_losses(losses):
log_vars = OrderedDict()
for loss_name, loss_value in losses.items():
if isinstance(loss_value, torch.Tensor):
log_vars[loss_name] = loss_value.mean()
elif isinstance(loss_value, list):
log_vars[loss_name] = sum(_loss.mean() for _loss in loss_value)
else:
raise TypeError(
'{} is not a tensor or list of tensors'.format(loss_name))

loss = sum(_value for _key, _value in log_vars.items() if 'loss' in _key)

log_vars['loss'] = loss
for name in log_vars:
log_vars[name] = log_vars[name].item()

return loss, log_vars


def batch_processor(model, data, train_mode):
losses = model(**data)
loss, log_vars = parse_losses(losses)

outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img'].data))

return outputs


def train_detector(model,
dataset,
cfg,
distributed=False,
validate=False,
logger=None):
if logger is None:
logger = get_root_logger(cfg.log_level)

# start training
if distributed:
_dist_train(model, dataset, cfg, validate=validate)
else:
_non_dist_train(model, dataset, cfg, validate=validate)


def _dist_train(model, dataset, cfg, validate=False):
# prepare data loaders
data_loaders = [
build_dataloader(
dataset,
cfg.data.imgs_per_gpu,
cfg.data.workers_per_gpu,
dist=True)
]
# put model on gpus
model = MMDistributedDataParallel(model.cuda())
# build runner
runner = Runner(model, batch_processor, cfg.optimizer, cfg.work_dir,
cfg.log_level)
# register hooks
optimizer_config = DistOptimizerHook(**cfg.optimizer_config)
runner.register_training_hooks(cfg.lr_config, optimizer_config,
cfg.checkpoint_config, cfg.log_config)
runner.register_hook(DistSamplerSeedHook())
# register eval hooks
if validate:
if isinstance(model.module, RPN):
# TODO: implement recall hooks for other datasets
runner.register_hook(CocoDistEvalRecallHook(cfg.data.val))
else:
if cfg.data.val.type == 'CocoDataset':
runner.register_hook(CocoDistEvalmAPHook(cfg.data.val))
else:
runner.register_hook(DistEvalmAPHook(cfg.data.val))

if cfg.resume_from:
runner.resume(cfg.resume_from)
elif cfg.load_from:
runner.load_checkpoint(cfg.load_from)
runner.run(data_loaders, cfg.workflow, cfg.total_epochs)


def _non_dist_train(model, dataset, cfg, validate=False):
# prepare data loaders
data_loaders = [
build_dataloader(
dataset,
cfg.data.imgs_per_gpu,
cfg.data.workers_per_gpu,
cfg.gpus,
dist=False)
]
# put model on gpus
model = MMDataParallel(model, device_ids=range(cfg.gpus)).cuda()
# build runner
runner = Runner(model, batch_processor, cfg.optimizer, cfg.work_dir,
cfg.log_level)
runner.register_training_hooks(cfg.lr_config, cfg.optimizer_config,
cfg.checkpoint_config, cfg.log_config)

if cfg.resume_from:
runner.resume(cfg.resume_from)
elif cfg.load_from:
runner.load_checkpoint(cfg.load_from)
runner.run(data_loaders, cfg.workflow, cfg.total_epochs)

train_detector itself is very short: depending on whether training is distributed, it calls either

  • _dist_train, or
  • _non_dist_train,

both of which are implemented in the same file.
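
During training, the mmcv Runner repeatedly calls batch_processor, which runs the model and reduces the loss dict it returns via parse_losses. A minimal sketch of that reduction with hypothetical loss values (the keys mimic what a detector's forward pass might return):

import torch
from collections import OrderedDict

# hypothetical losses dict, as model(**data) might return it inside batch_processor
losses = {
    'loss_rpn_cls': [torch.tensor(0.20), torch.tensor(0.15)],  # list: one tensor per level
    'loss_rpn_bbox': torch.tensor(0.10),
    'loss_cls': torch.tensor(0.50),
    'acc': torch.tensor(92.0),  # logged but not summed: key does not contain 'loss'
}

log_vars = OrderedDict()
for name, value in losses.items():
    if isinstance(value, torch.Tensor):
        log_vars[name] = value.mean()
    else:  # list of tensors
        log_vars[name] = sum(v.mean() for v in value)

# the total loss sums only the entries whose key contains 'loss'
loss = sum(v for k, v in log_vars.items() if 'loss' in k)
print(float(loss))  # -> 0.95

The resulting loss is what the optimizer hook backpropagates; log_vars is what ends up in the training log.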

2 Testing: tools/test.py

tools/test.py evaluates a trained model on a test dataset.

2.1 Usage

2.1.1 Writing results to a file

Test a trained model on a dataset and save the model's outputs to a file:

python tools/test.py <CONFIG_FILE> <CHECKPOINT_FILE> --gpus <GPU_NUM> --out <OUT_FILE>

2.1.2 Evaluating bbox and other metrics

Test a trained Mask R-CNN model with 8 GPUs, save the outputs to results.pkl, and evaluate the bbox and segm results:

python tools/test.py configs/mask_rcnn_r50_fpn_1x.py <CHECKPOINT_FILE> --gpus 8 --out results.pkl --eval bbox segm

I trained a simple Cascade R-CNN model with a ResNet-50 backbone and evaluated it with the following command:

CUDA_VISIBLE_DEVICES=0 python tools/test.py configs/xray_cascade_rcnn_r50_fpn_1x.py work_dirs/xray/cascade_rcnn_r50_fpn_1x/latest.pth --out xray.pkl --eval bbox

The output is:

(mmdetection) sjy@cjx:~/mmdetection$ CUDA_VISIBLE_DEVICES=0 python tools/test.py configs/xray_cascade_rcnn_r50_fpn_1x.py work_dirs/xray/cascade_rcnn_r50_fpn_1x/latest.pth --out xray.pkl --eval bbox
loading annotations into memory...
Done (t=0.02s)
creating index...
index created!
{1: 1, 2: 2, 3: 3, 4: 4, 5: 5}
[>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>] 178/178, 6.9 task/s, elapsed: 26s, ETA: 0swriting results to xray.pkl
Starting evaluate bbox
Loading and preparing results...
DONE (t=0.06s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *bbox*
DONE (t=0.53s).
Accumulating evaluation results...
DONE (t=0.08s).
Average Precision (AP) @[ IoU=0.50:0.95 | area= all | maxDets=100 ] = 0.546
Average Precision (AP) @[ IoU=0.50 | area= all | maxDets=100 ] = 0.843
Average Precision (AP) @[ IoU=0.75 | area= all | maxDets=100 ] = 0.589
Average Precision (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.605
Average Precision (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = -1.000
Average Precision (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = -1.000
Average Recall (AR) @[ IoU=0.50:0.95 | area= all | maxDets= 1 ] = 0.350
Average Recall (AR) @[ IoU=0.50:0.95 | area= all | maxDets= 10 ] = 0.607
Average Recall (AR) @[ IoU=0.50:0.95 | area= all | maxDets=100 ] = 0.614
Average Recall (AR) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.614
Average Recall (AR) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = -1.000
Average Recall (AR) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = -1.000

The formatted output above is the standard set of COCO detection evaluation metrics.

Under the hood, this is implemented by the coco_eval function in mmdet/core/evaluation/coco_utils.py, which calls the official COCO API (the pycocotools package).

Below is an excerpt of the coco_eval implementation from mmdet/core/evaluation/coco_utils.py:

import mmcv
import numpy as np
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

from .recall import eval_recalls


def coco_eval(result_file, result_types, coco, max_dets=(100, 300, 1000)):
    for res_type in result_types:
        assert res_type in [
            'proposal', 'proposal_fast', 'bbox', 'segm', 'keypoints'
        ]

    if mmcv.is_str(coco):
        coco = COCO(coco)
    assert isinstance(coco, COCO)

    if result_types == ['proposal_fast']:
        ar = fast_eval_recall(result_file, coco, np.array(max_dets))
        for i, num in enumerate(max_dets):
            print('AR@{}\t= {:.4f}'.format(num, ar[i]))
        return

    assert result_file.endswith('.json')
    coco_dets = coco.loadRes(result_file)

    img_ids = coco.getImgIds()
    for res_type in result_types:
        iou_type = 'bbox' if res_type == 'proposal' else res_type
        cocoEval = COCOeval(coco, coco_dets, iou_type)
        cocoEval.params.imgIds = img_ids
        if res_type == 'proposal':
            cocoEval.params.useCats = 0
            cocoEval.params.maxDets = list(max_dets)
        cocoEval.evaluate()
        cocoEval.accumulate()
        cocoEval.summarize()

  • The dataset is evaluated by constructing a COCOeval object, configuring its parameters, and calling its evaluate, accumulate, and summarize methods in turn.
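
The same evaluation can be run directly with pycocotools. A minimal sketch, essentially what coco_eval does for the 'bbox' case (the file paths follow the example run above and are illustrative):

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

coco_gt = COCO('data/annotations/instances_test.json')  # ground-truth annotations
coco_dt = coco_gt.loadRes('xray.pkl.json')              # detections written by results2json
coco_eval = COCOeval(coco_gt, coco_dt, 'bbox')
coco_eval.params.imgIds = coco_gt.getImgIds()
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()   # prints the AP/AR table shown above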

The COCO documentation gives the following brief description of these metrics:

Average Precision (AP):
  AP            % AP at IoU=.50:.05:.95 (primary challenge metric)
  AP@IoU=.50    % AP at IoU=.50 (PASCAL VOC metric)
  AP@IoU=.75    % AP at IoU=.75 (strict metric)
AP Across Scales:
  AP_small      % AP for small objects: area < 32^2
  AP_medium     % AP for medium objects: 32^2 < area < 96^2
  AP_large      % AP for large objects: area > 96^2
Average Recall (AR):
  AR@max=1      % AR given 1 detection per image
  AR@max=10     % AR given 10 detections per image
  AR@max=100    % AR given 100 detections per image
AR Across Scales:
  AR_small      % AR for small objects: area < 32^2
  AR_medium     % AR for medium objects: 32^2 < area < 96^2
  AR_large      % AR for large objects: area > 96^2

The full description of these metrics is in the official documentation:

COCO Detection Evaluation

2.1.3 Visualizing predictions

If an X server is available so that a graphical display can be shown, the --show option renders the detections on the test images:

python tools/test.py <CONFIG_FILE> <CHECKPOINT_FILE> --show

On Windows, installing MobaXterm lets you SSH into the Linux server and use its built-in X server for remote graphical display.

MobaXterm

Enhanced terminal for Windows with X11 server, tabbed SSH client, network tools and much more

2.2 Source Code

The source code of tools/test.py:

import argparse

import torch
import mmcv
from mmcv.runner import load_checkpoint, parallel_test, obj_from_dict
from mmcv.parallel import scatter, collate, MMDataParallel

from mmdet import datasets
from mmdet.core import results2json, coco_eval
from mmdet.datasets import build_dataloader
from mmdet.models import build_detector, detectors


def single_test(model, data_loader, show=False):
model.eval()
results = []
dataset = data_loader.dataset
prog_bar = mmcv.ProgressBar(len(dataset))
for i, data in enumerate(data_loader):
with torch.no_grad():
result = model(return_loss=False, rescale=not show, **data)
results.append(result)

if show:
model.module.show_result(data, result, dataset.img_norm_cfg,
dataset=dataset.CLASSES)

batch_size = data['img'][0].size(0)
for _ in range(batch_size):
prog_bar.update()
return results


def _data_func(data, device_id):
data = scatter(collate([data], samples_per_gpu=1), [device_id])[0]
return dict(return_loss=False, rescale=True, **data)


def parse_args():
parser = argparse.ArgumentParser(description='MMDet test detector')
parser.add_argument('config', help='test config file path')
parser.add_argument('checkpoint', help='checkpoint file')
parser.add_argument(
'--gpus', default=1, type=int, help='GPU number used for testing')
parser.add_argument(
'--proc_per_gpu',
default=1,
type=int,
help='Number of processes per GPU')
parser.add_argument('--out', help='output result file')
parser.add_argument(
'--eval',
type=str,
nargs='+',
choices=['proposal', 'proposal_fast', 'bbox', 'segm', 'keypoints'],
help='eval types')
parser.add_argument('--show', action='store_true', help='show results')
args = parser.parse_args()
return args


def main():
args = parse_args()

if args.out is not None and not args.out.endswith(('.pkl', '.pickle')):
raise ValueError('The output file must be a pkl file.')

cfg = mmcv.Config.fromfile(args.config)
# set cudnn_benchmark
if cfg.get('cudnn_benchmark', False):
torch.backends.cudnn.benchmark = True
cfg.model.pretrained = None
cfg.data.test.test_mode = True

dataset = obj_from_dict(cfg.data.test, datasets, dict(test_mode=True))
if args.gpus == 1:
model = build_detector(
cfg.model, train_cfg=None, test_cfg=cfg.test_cfg)
load_checkpoint(model, args.checkpoint)
model = MMDataParallel(model, device_ids=[0])

data_loader = build_dataloader(
dataset,
imgs_per_gpu=1,
workers_per_gpu=cfg.data.workers_per_gpu,
num_gpus=1,
dist=False,
shuffle=False)
outputs = single_test(model, data_loader, args.show)
else:
model_args = cfg.model.copy()
model_args.update(train_cfg=None, test_cfg=cfg.test_cfg)
model_type = getattr(detectors, model_args.pop('type'))
outputs = parallel_test(
model_type,
model_args,
args.checkpoint,
dataset,
_data_func,
range(args.gpus),
workers_per_gpu=args.proc_per_gpu)

if args.out:
print('writing results to {}'.format(args.out))
mmcv.dump(outputs, args.out)
eval_types = args.eval
if eval_types:
print('Starting evaluate {}'.format(' and '.join(eval_types)))
if eval_types == ['proposal_fast']:
result_file = args.out
coco_eval(result_file, eval_types, dataset.coco)
else:
if not isinstance(outputs[0], dict):
result_file = args.out + '.json'
results2json(dataset, outputs, result_file)
coco_eval(result_file, eval_types, dataset.coco)
else:
for name in outputs[0]:
print('\nEvaluating {}'.format(name))
outputs_ = [out[name] for out in outputs]
result_file = args.out + '.{}.json'.format(name)
results2json(dataset, outputs_, result_file)
coco_eval(result_file, eval_types, dataset.coco)


if __name__ == '__main__':
main()

2.3 How It Works

2.3.1 Main Flow

First, the config file is read and a Config object cfg is constructed:

cfg = mmcv.Config.fromfile(args.config)

Next, mmcv.runner.obj_from_dict reads the test-set section of the config and builds the dataset object dataset:

dataset = obj_from_dict(cfg.data.test, datasets, dict(test_mode=True))
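
obj_from_dict essentially pops the 'type' key out of the config dict, looks that name up as a class in the given module (here the mmdet.datasets package), and instantiates it with the remaining keys plus any default arguments. A rough, simplified sketch of that behaviour (not the exact mmcv implementation):

def obj_from_dict_sketch(info, parent, default_args=None):
    """Simplified illustration of mmcv.runner.obj_from_dict."""
    args = dict(info)                 # e.g. the cfg.data.test dict
    obj_type = args.pop('type')       # e.g. 'CocoDataset'
    cls = getattr(parent, obj_type)   # look the class up in mmdet.datasets
    if default_args is not None:
        for name, value in default_args.items():
            args.setdefault(name, value)   # e.g. test_mode=True
    return cls(**args)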

Then, taking the single-GPU case (args.gpus == 1) as an example, mmdet.models.build_detector constructs the model to be tested from the model config, with the training config set to None and the test config passed in:

model = build_detector(cfg.model, train_cfg=None, test_cfg=cfg.test_cfg)

Next, mmcv.runner.load_checkpoint loads the checkpoint produced during training to populate the model's parameters:

load_checkpoint(model, args.checkpoint)

Then, mmcv.parallel.MMDataParallel wraps the model for non-distributed execution on the device:

model = MMDataParallel(model, device_ids=[0])

mmdetection implements distributed training and non-distributed training, which uses MMDistributedDataParallel and MMDataParallel respectively.

(README.md)

Next, mmdet.datasets.build_dataloader builds the data loader data_loader from the dataset object:

data_loader = build_dataloader(
    dataset,
    imgs_per_gpu=1,
    workers_per_gpu=cfg.data.workers_per_gpu,
    num_gpus=1,
    dist=False,
    shuffle=False)

Finally, single_test (defined in the same file) is called with the model, the data loader, and a few other settings to run the model on a single GPU and collect its outputs:

outputs = single_test(model, data_loader, args.show)

2.3.2 single_test: single-device testing

Inside single_test, the model is evaluated through the following main steps.

First, torch.nn.Module.eval switches the model into evaluation mode:

model.eval()

Then it iterates over data_loader and, following standard PyTorch practice, disables gradient computation, runs the model on each batch, and collects the outputs (while also handling the optional on-screen visualization of the detections and the progress bar in the shell):

for i, data in enumerate(data_loader):
    with torch.no_grad():
        result = model(return_loss=False, rescale=not show, **data)
    results.append(result)

    if show:
        model.module.show_result(data, result, dataset.img_norm_cfg,
                                 dataset=dataset.CLASSES)

    batch_size = data['img'][0].size(0)
    for _ in range(batch_size):
        prog_bar.update()

3 Model Implementations (models)

The following is excerpted from mmdetection / TECHNICAL_DETAILS.md:

Model

In mmdetection, model components are basically categorized as 4 types.

  • backbone: usually a FCN network to extract feature maps, e.g., ResNet.
  • neck: the part between backbones and heads, e.g., FPN, ASPP.
  • head: the part for specific tasks, e.g., bbox prediction and mask prediction.
  • roi extractor: the part for extracting features from feature maps, e.g., RoI Align.

We also write implement some general detection pipelines with the above components, such as SingleStageDetector and TwoStageDetector.

In mmdetection, a model is basically composed of four kinds of components (a config sketch follows the list):

  • backbone: usually a fully convolutional network (FCN) that extracts feature maps, e.g. ResNet.
  • neck: the part connecting the backbone and the heads, e.g. FPN, ASPP.
  • head: the part for a specific task, e.g. bbox prediction or mask prediction.
  • RoI extractor: the part that extracts features from the feature maps, e.g. RoI Align.
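
These components map directly onto sections of the model config. A hedged sketch of how they typically appear in a config file (the values are illustrative, not a complete working config):

model = dict(
    type='FasterRCNN',
    backbone=dict(type='ResNet', depth=50, num_stages=4,
                  out_indices=(0, 1, 2, 3), frozen_stages=1, style='pytorch'),
    neck=dict(type='FPN', in_channels=[256, 512, 1024, 2048],
              out_channels=256, num_outs=5),
    rpn_head=dict(type='RPNHead', in_channels=256, feat_channels=256),
    bbox_roi_extractor=dict(
        type='SingleRoIExtractor',
        roi_layer=dict(type='RoIAlign', out_size=7, sample_num=2),
        out_channels=256, featmap_strides=[4, 8, 16, 32]),
    bbox_head=dict(type='SharedFCBBoxHead', num_fcs=2, in_channels=256,
                   roi_feat_size=7, num_classes=81))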

As the official technical notes suggest, reading the implementations of the SingleStageDetector and TwoStageDetector classes is a good way to understand how the basic detection models in mmdetection are put together.

Both SingleStageDetector and TwoStageDetector live in mmdet.models.detectors and are implemented in single_stage.py and two_stage.py respectively.

3.1 single_stage.py

3.1.1 Source Code

mmdet/models/detectors/single_stage.py implements a generic single-stage detector base class. Its source code is as follows:

import torch.nn as nn

from .base import BaseDetector
from .. import builder
from ..registry import DETECTORS
from mmdet.core import bbox2result


@DETECTORS.register_module
class SingleStageDetector(BaseDetector):

def __init__(self,
backbone,
neck=None,
bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(SingleStageDetector, self).__init__()
self.backbone = builder.build_backbone(backbone)
if neck is not None:
self.neck = builder.build_neck(neck)
self.bbox_head = builder.build_head(bbox_head)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.init_weights(pretrained=pretrained)

def init_weights(self, pretrained=None):
super(SingleStageDetector, self).init_weights(pretrained)
self.backbone.init_weights(pretrained=pretrained)
if self.with_neck:
if isinstance(self.neck, nn.Sequential):
for m in self.neck:
m.init_weights()
else:
self.neck.init_weights()
self.bbox_head.init_weights()

def extract_feat(self, img):
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
return x

def forward_train(self,
img,
img_metas,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None):
x = self.extract_feat(img)
outs = self.bbox_head(x)
loss_inputs = outs + (gt_bboxes, gt_labels, img_metas, self.train_cfg)
losses = self.bbox_head.loss(
*loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
return losses

def simple_test(self, img, img_meta, rescale=False):
x = self.extract_feat(img)
outs = self.bbox_head(x)
bbox_inputs = outs + (img_meta, self.test_cfg, rescale)
bbox_list = self.bbox_head.get_bboxes(*bbox_inputs)
bbox_results = [
bbox2result(det_bboxes, det_labels, self.bbox_head.num_classes)
for det_bboxes, det_labels in bbox_list
]
return bbox_results[0]

def aug_test(self, imgs, img_metas, rescale=False):
raise NotImplementedError

3.1.2 How It Works

SingleStageDetector implements a single-stage detector by inheriting from the base class BaseDetector:

@DETECTORS.register_module
class SingleStageDetector(BaseDetector):

__init__ defines the model's components and their initialization:

def __init__(self,
             backbone,
             neck=None,
             bbox_head=None,
             train_cfg=None,
             test_cfg=None,
             pretrained=None):

forward_train defines the forward pass used during training; its output is the training losses losses:

def forward_train(self,
                  img,
                  img_metas,
                  gt_bboxes,
                  gt_labels,
                  gt_bboxes_ignore=None):

simple_test defines what the model does at test time; its output is the predicted bounding-box results bbox_results:

def simple_test(self, img, img_meta, rescale=False):
    x = self.extract_feat(img)
    outs = self.bbox_head(x)
    bbox_inputs = outs + (img_meta, self.test_cfg, rescale)
    bbox_list = self.bbox_head.get_bboxes(*bbox_inputs)
    bbox_results = [
        bbox2result(det_bboxes, det_labels, self.bbox_head.num_classes)
        for det_bboxes, det_labels in bbox_list
    ]
    return bbox_results[0]

As the code shows, bbox_results is built with a Python list comprehension that iterates over bbox_list and calls bbox2result for each element, collecting its return values.

bbox_results is therefore built from the following pieces of information (an illustration of its structure follows the list):

  • det_bboxes, the bounding boxes predicted by the detector;
  • det_labels, the predicted class labels associated with those boxes.
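
To make the structure concrete, here is a hypothetical bbox_results value for a detector with three foreground classes: a list with one (k, 5) array per class, where each row is [x1, y1, x2, y2, score] (the meaning of the fifth value is examined in Section 5):

import numpy as np

bbox_results = [
    np.array([[10., 20., 50., 80., 0.98]], dtype=np.float32),   # class 0: one detection
    np.zeros((0, 5), dtype=np.float32),                         # class 1: no detections
    np.array([[30., 40., 90., 120., 0.75],
              [12., 14., 40., 60., 0.55]], dtype=np.float32),   # class 2: two detections
]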

3.2 two_stage.py

3.2.1 Source Code

mmdet/models/detectors/two_stage.py implements a generic two-stage detector base class. Compared with the single-stage model, the two-stage model can additionally output instance masks, i.e. it supports instance segmentation.

Its source code is as follows:

import torch
import torch.nn as nn

from .base import BaseDetector
from .test_mixins import RPNTestMixin, BBoxTestMixin, MaskTestMixin
from .. import builder
from ..registry import DETECTORS
from mmdet.core import bbox2roi, bbox2result, build_assigner, build_sampler


@DETECTORS.register_module
class TwoStageDetector(BaseDetector, RPNTestMixin, BBoxTestMixin,
MaskTestMixin):

def __init__(self,
backbone,
neck=None,
rpn_head=None,
bbox_roi_extractor=None,
bbox_head=None,
mask_roi_extractor=None,
mask_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(TwoStageDetector, self).__init__()
self.backbone = builder.build_backbone(backbone)

if neck is not None:
self.neck = builder.build_neck(neck)
else:
raise NotImplementedError

if rpn_head is not None:
self.rpn_head = builder.build_head(rpn_head)

if bbox_head is not None:
self.bbox_roi_extractor = builder.build_roi_extractor(
bbox_roi_extractor)
self.bbox_head = builder.build_head(bbox_head)

if mask_head is not None:
self.mask_roi_extractor = builder.build_roi_extractor(
mask_roi_extractor)
self.mask_head = builder.build_head(mask_head)

self.train_cfg = train_cfg
self.test_cfg = test_cfg

self.init_weights(pretrained=pretrained)

@property
def with_rpn(self):
return hasattr(self, 'rpn_head') and self.rpn_head is not None

def init_weights(self, pretrained=None):
super(TwoStageDetector, self).init_weights(pretrained)
self.backbone.init_weights(pretrained=pretrained)
if self.with_neck:
if isinstance(self.neck, nn.Sequential):
for m in self.neck:
m.init_weights()
else:
self.neck.init_weights()
if self.with_rpn:
self.rpn_head.init_weights()
if self.with_bbox:
self.bbox_roi_extractor.init_weights()
self.bbox_head.init_weights()
if self.with_mask:
self.mask_roi_extractor.init_weights()
self.mask_head.init_weights()

def extract_feat(self, img):
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
return x

def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
proposals=None):
x = self.extract_feat(img)

losses = dict()

# RPN forward and loss
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)

proposal_inputs = rpn_outs + (img_meta, self.test_cfg.rpn)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals

# assign gts and sample proposals
if self.with_bbox or self.with_mask:
bbox_assigner = build_assigner(self.train_cfg.rcnn.assigner)
bbox_sampler = build_sampler(
self.train_cfg.rcnn.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
sampling_results = []
for i in range(num_imgs):
assign_result = bbox_assigner.assign(
proposal_list[i], gt_bboxes[i], gt_bboxes_ignore[i],
gt_labels[i])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[i],
gt_bboxes[i],
gt_labels[i],
feats=[lvl_feat[i][None] for lvl_feat in x])
sampling_results.append(sampling_result)

# bbox head forward and loss
if self.with_bbox:
rois = bbox2roi([res.bboxes for res in sampling_results])
# TODO: a more flexible way to decide which feature maps to use
bbox_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
cls_score, bbox_pred = self.bbox_head(bbox_feats)

bbox_targets = self.bbox_head.get_target(
sampling_results, gt_bboxes, gt_labels, self.train_cfg.rcnn)
loss_bbox = self.bbox_head.loss(cls_score, bbox_pred,
*bbox_targets)
losses.update(loss_bbox)

# mask head forward and loss
if self.with_mask:
pos_rois = bbox2roi([res.pos_bboxes for res in sampling_results])
mask_feats = self.mask_roi_extractor(
x[:self.mask_roi_extractor.num_inputs], pos_rois)
mask_pred = self.mask_head(mask_feats)

mask_targets = self.mask_head.get_target(
sampling_results, gt_masks, self.train_cfg.rcnn)
pos_labels = torch.cat(
[res.pos_gt_labels for res in sampling_results])
loss_mask = self.mask_head.loss(mask_pred, mask_targets,
pos_labels)
losses.update(loss_mask)

return losses

def simple_test(self, img, img_meta, proposals=None, rescale=False):
"""Test without augmentation."""
assert self.with_bbox, "Bbox head must be implemented."

x = self.extract_feat(img)

proposal_list = self.simple_test_rpn(
x, img_meta, self.test_cfg.rpn) if proposals is None else proposals

det_bboxes, det_labels = self.simple_test_bboxes(
x, img_meta, proposal_list, self.test_cfg.rcnn, rescale=rescale)
bbox_results = bbox2result(det_bboxes, det_labels,
self.bbox_head.num_classes)

if not self.with_mask:
return bbox_results
else:
segm_results = self.simple_test_mask(
x, img_meta, det_bboxes, det_labels, rescale=rescale)
return bbox_results, segm_results

def aug_test(self, imgs, img_metas, rescale=False):
"""Test with augmentations.

If rescale is False, then returned bboxes and masks will fit the scale
of imgs[0].
"""
# recompute feats to save memory
proposal_list = self.aug_test_rpn(
self.extract_feats(imgs), img_metas, self.test_cfg.rpn)
det_bboxes, det_labels = self.aug_test_bboxes(
self.extract_feats(imgs), img_metas, proposal_list,
self.test_cfg.rcnn)

if rescale:
_det_bboxes = det_bboxes
else:
_det_bboxes = det_bboxes.clone()
_det_bboxes[:, :4] *= img_metas[0][0]['scale_factor']
bbox_results = bbox2result(_det_bboxes, det_labels,
self.bbox_head.num_classes)

# det_bboxes always keep the original scale
if self.with_mask:
segm_results = self.aug_test_mask(
self.extract_feats(imgs), img_metas, det_bboxes, det_labels)
return bbox_results, segm_results
else:
return bbox_results

3.2.2 How It Works

TwoStageDetector implements a two-stage detector by inheriting from the base class BaseDetector together with the RPNTestMixin, BBoxTestMixin, and MaskTestMixin test mixins:

@DETECTORS.register_module
class TwoStageDetector(BaseDetector, RPNTestMixin, BBoxTestMixin,
                       MaskTestMixin):

__init__ defines the model's components and their initialization:

def __init__(self,
             backbone,
             neck=None,
             rpn_head=None,
             bbox_roi_extractor=None,
             bbox_head=None,
             mask_roi_extractor=None,
             mask_head=None,
             train_cfg=None,
             test_cfg=None,
             pretrained=None):

forward_train defines the forward pass used during training; its output is the training losses losses:

def forward_train(self,
                  img,
                  img_meta,
                  gt_bboxes,
                  gt_labels,
                  gt_bboxes_ignore=None,
                  gt_masks=None,
                  proposals=None):

simple_test defines what the model does at test time; its outputs are the predicted bounding-box results bbox_results and, when a mask head is present, the segmentation results segm_results:

def simple_test(self, img, img_meta, proposals=None, rescale=False):
    """Test without augmentation."""
    assert self.with_bbox, "Bbox head must be implemented."

    x = self.extract_feat(img)

    proposal_list = self.simple_test_rpn(
        x, img_meta, self.test_cfg.rpn) if proposals is None else proposals

    det_bboxes, det_labels = self.simple_test_bboxes(
        x, img_meta, proposal_list, self.test_cfg.rcnn, rescale=rescale)
    bbox_results = bbox2result(det_bboxes, det_labels,
                               self.bbox_head.num_classes)

    if not self.with_mask:
        return bbox_results
    else:
        segm_results = self.simple_test_mask(
            x, img_meta, det_bboxes, det_labels, rescale=rescale)
        return bbox_results, segm_results

Here, as the code shows, bbox_results is obtained by a direct call to bbox2result on the detections returned by simple_test_bboxes (rather than via a list comprehension as in the single-stage case).

bbox_results is built from the following pieces of information:

  • det_bboxes, the bounding boxes predicted by the detector;
  • det_labels, the predicted class labels associated with those boxes.

In addition, when a mask head is configured, the two-stage model also returns the predicted instance masks segm_results.

4 Custom Datasets

4.1 coco.py

4.1.1 Source Code

coco.py, located in mmdet.datasets, implements the CocoDataset class together with a set of member methods for reading and parsing COCO-style datasets. Its source code is as follows:

import numpy as np
from pycocotools.coco import COCO

from .custom import CustomDataset


class CocoDataset(CustomDataset):

CLASSES = ('person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
'train', 'truck', 'boat', 'traffic_light', 'fire_hydrant',
'stop_sign', 'parking_meter', 'bench', 'bird', 'cat', 'dog',
'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe',
'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
'skis', 'snowboard', 'sports_ball', 'kite', 'baseball_bat',
'baseball_glove', 'skateboard', 'surfboard', 'tennis_racket',
'bottle', 'wine_glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
'hot_dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
'potted_plant', 'bed', 'dining_table', 'toilet', 'tv', 'laptop',
'mouse', 'remote', 'keyboard', 'cell_phone', 'microwave',
'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock',
'vase', 'scissors', 'teddy_bear', 'hair_drier', 'toothbrush')

def load_annotations(self, ann_file):
self.coco = COCO(ann_file)
self.cat_ids = self.coco.getCatIds()
self.cat2label = {
cat_id: i + 1
for i, cat_id in enumerate(self.cat_ids)
}
self.img_ids = self.coco.getImgIds()
img_infos = []
for i in self.img_ids:
info = self.coco.loadImgs([i])[0]
info['filename'] = info['file_name']
img_infos.append(info)
return img_infos

def get_ann_info(self, idx):
img_id = self.img_infos[idx]['id']
ann_ids = self.coco.getAnnIds(imgIds=[img_id])
ann_info = self.coco.loadAnns(ann_ids)
return self._parse_ann_info(ann_info, self.with_mask)

def _filter_imgs(self, min_size=32):
"""Filter images too small or without ground truths."""
valid_inds = []
ids_with_ann = set(_['image_id'] for _ in self.coco.anns.values())
for i, img_info in enumerate(self.img_infos):
if self.img_ids[i] not in ids_with_ann:
continue
if min(img_info['width'], img_info['height']) >= min_size:
valid_inds.append(i)
return valid_inds

def _parse_ann_info(self, ann_info, with_mask=True):
"""Parse bbox and mask annotation.

Args:
ann_info (list[dict]): Annotation info of an image.
with_mask (bool): Whether to parse mask annotations.

Returns:
dict: A dict containing the following keys: bboxes, bboxes_ignore,
labels, masks, mask_polys, poly_lens.
"""
gt_bboxes = []
gt_labels = []
gt_bboxes_ignore = []
# Two formats are provided.
# 1. mask: a binary map of the same size of the image.
# 2. polys: each mask consists of one or several polys, each poly is a
# list of float.
if with_mask:
gt_masks = []
gt_mask_polys = []
gt_poly_lens = []
for i, ann in enumerate(ann_info):
if ann.get('ignore', False):
continue
x1, y1, w, h = ann['bbox']
if ann['area'] <= 0 or w < 1 or h < 1:
continue
bbox = [x1, y1, x1 + w - 1, y1 + h - 1]
if ann['iscrowd']:
gt_bboxes_ignore.append(bbox)
else:
gt_bboxes.append(bbox)
gt_labels.append(self.cat2label[ann['category_id']])
if with_mask:
gt_masks.append(self.coco.annToMask(ann))
mask_polys = [
p for p in ann['segmentation'] if len(p) >= 6
] # valid polygons have >= 3 points (6 coordinates)
poly_lens = [len(p) for p in mask_polys]
gt_mask_polys.append(mask_polys)
gt_poly_lens.extend(poly_lens)
if gt_bboxes:
gt_bboxes = np.array(gt_bboxes, dtype=np.float32)
gt_labels = np.array(gt_labels, dtype=np.int64)
else:
gt_bboxes = np.zeros((0, 4), dtype=np.float32)
gt_labels = np.array([], dtype=np.int64)

if gt_bboxes_ignore:
gt_bboxes_ignore = np.array(gt_bboxes_ignore, dtype=np.float32)
else:
gt_bboxes_ignore = np.zeros((0, 4), dtype=np.float32)

ann = dict(
bboxes=gt_bboxes, labels=gt_labels, bboxes_ignore=gt_bboxes_ignore)

if with_mask:
ann['masks'] = gt_masks
# poly format is not used in the current implementation
ann['mask_polys'] = gt_mask_polys
ann['poly_lens'] = gt_poly_lens
return ann

4.1.2 How It Works

The following is excerpted from mmdetection / README.md:

Train on custom datasets

We define a simple annotation format.

The annotation of a dataset is a list of dict, each dict corresponds to an image. There are 3 field filename (relative path), width, height for testing, and an additional field ann for training. ann is also a dict containing at least 2 fields: bboxes and labels, both of which are numpy arrays. Some datasets may provide annotations like crowd/difficult/ignored bboxes, we use bboxes_ignore and labels_ignore to cover them.

Here is an example.

[
    {
        'filename': 'a.jpg',
        'width': 1280,
        'height': 720,
        'ann': {
            'bboxes': <np.ndarray> (n, 4),
            'labels': <np.ndarray> (n, ),
            'bboxes_ignore': <np.ndarray> (k, 4),
            'labels_ignore': <np.ndarray> (k, ) (optional field)
        }
    },
    ...
]

There are two ways to work with custom datasets.

  • online conversion

You can write a new Dataset class inherited from CustomDataset, and overwrite two methods load_annotations(self, ann_file) and get_ann_info(self, idx), like CocoDataset and VOCDataset.

  • offline conversion

You can convert the annotation format to the expected format above and save it to a pickle or json file, like pascal_voc.py. Then you can simply use CustomDataset.

As the mmdetection README describes, the CocoDataset defined in coco.py is an example of online conversion: it subclasses the CustomDataset base class and overrides the two member functions load_annotations(self, ann_file) and get_ann_info(self, idx), which are the interface the rest of the framework calls, thereby adding a custom dataset type to mmdetection. (A sketch of a custom dataset written the same way follows the two excerpts below.)

load_annotations(self, ann_file) loads the annotation file into memory:

def load_annotations(self, ann_file):
    self.coco = COCO(ann_file)
    self.cat_ids = self.coco.getCatIds()
    self.cat2label = {
        cat_id: i + 1
        for i, cat_id in enumerate(self.cat_ids)
    }
    self.img_ids = self.coco.getImgIds()
    img_infos = []
    for i in self.img_ids:
        info = self.coco.loadImgs([i])[0]
        info['filename'] = info['file_name']
        img_infos.append(info)
    return img_infos

get_ann_info(self, idx) retrieves the annotation info of the image at index idx:

def get_ann_info(self, idx):
    img_id = self.img_infos[idx]['id']
    ann_ids = self.coco.getAnnIds(imgIds=[img_id])
    ann_info = self.coco.loadAnns(ann_ids)
    return self._parse_ann_info(ann_info, self.with_mask)
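
Putting the two methods together, here is a hedged sketch of what a new "online conversion" dataset written in the same style could look like; the class name, annotation file format, and fields are hypothetical:

import mmcv
import numpy as np

from mmdet.datasets import CustomDataset


class MyDataset(CustomDataset):

    CLASSES = ('cat', 'dog')

    def load_annotations(self, ann_file):
        # assume ann_file is a json/pickle list in the "middle format" shown above
        self.my_anns = mmcv.load(ann_file)
        return [
            dict(filename=a['filename'], width=a['width'], height=a['height'])
            for a in self.my_anns
        ]

    def get_ann_info(self, idx):
        ann = self.my_anns[idx]['ann']
        return dict(
            bboxes=np.array(ann['bboxes'], dtype=np.float32).reshape(-1, 4),
            labels=np.array(ann['labels'], dtype=np.int64),
            bboxes_ignore=np.zeros((0, 4), dtype=np.float32))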

5 The bbox Data Structure

Bounding boxes are the fundamental output of an object detection model, so the bbox data structure is essential to understand when working with model outputs.

5.1 The bbox format in the COCO dataset

In COCO annotations, the JSON structure of an object detection annotation is as follows.

The following is excerpted from the COCO Data Format documentation:

1. Object Detection

Each object instance annotation contains a series of fields, including the category id and segmentation mask of the object. The segmentation format depends on whether the instance represents a single object (iscrowd=0 in which case polygons are used) or a collection of objects (iscrowd=1 in which case RLE is used). Note that a single object (iscrowd=0) may require multiple polygons, for example if occluded. Crowd annotations (iscrowd=1) are used to label large groups of objects (e.g. a crowd of people). In addition, an enclosing bounding box is provided for each object (box coordinates are measured from the top left image corner and are 0-indexed).

annotation{
    "id"           : int,
    "image_id"     : int,
    "category_id"  : int,
    "segmentation" : RLE or [polygon],
    "area"         : float,
    "bbox"         : [x,y,width,height],
    "iscrowd"      : 0 or 1,
}

The detailed documentation is available at:

COCO DataFormat

In the annotation format the COCO dataset follows, the data structure of a bounding box (bbox) is:

"bbox" : [x,y,width,height],

Each object is annotated with a bounding box specified by its top-left corner (x, y) together with the box's width and height, all measured in pixels.

5.2 The bbox data structure in mmdetection

5.2.1 How CocoDataset parses and converts bboxes

CocoDataset is defined in coco.py, discussed in Section 4.1.

CocoDataset is responsible for reading and parsing an external COCO-style dataset and converting it into the data structures that mmdetection actually works with.

The member function _parse_ann_info(ann_info, with_mask) encapsulates the parsing of a single image's annotations, and get_ann_info(self, idx) calls it when returning, so the current image's annotations are parsed on the way out. Inside _parse_ann_info(ann_info, with_mask), the COCO "bbox" : [x,y,width,height] format is parsed and converted to a different structure:

for i, ann in enumerate(ann_info):
    if ann.get('ignore', False):
        continue
    x1, y1, w, h = ann['bbox']
    if ann['area'] <= 0 or w < 1 or h < 1:
        continue
    bbox = [x1, y1, x1 + w - 1, y1 + h - 1]

In this part of _parse_ann_info, the loop iterates over the annotation list ann_info, reads the COCO top-left corner (x1, y1), width w, and height h, and converts them into a top-left corner (x1, y1) plus a bottom-right corner (x1 + w - 1, y1 + h - 1). These four coordinate values are written into bbox for the mmdetection framework to use.

In other words, mmdetection's bbox data structure describes a box by the coordinates of its top-left and bottom-right corners.
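
A tiny worked example of this conversion (the numbers are made up):

# COCO annotation format: [x, y, width, height]
x1, y1, w, h = 100.0, 50.0, 40.0, 80.0
# mmdetection format: [x1, y1, x2, y2], i.e. top-left and bottom-right corners
bbox = [x1, y1, x1 + w - 1, y1 + h - 1]
print(bbox)   # [100.0, 50.0, 139.0, 129.0]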

5.2.2 bbox2result

As for how the models actually use bboxes, both of the generic detector base classes, SingleStageDetector and TwoStageDetector, call bbox2result when producing test output, converting the model's raw outputs into a list that is returned as the final result.

def simple_test(self, img, img_meta, rescale=False):
    x = self.extract_feat(img)
    outs = self.bbox_head(x)
    bbox_inputs = outs + (img_meta, self.test_cfg, rescale)
    bbox_list = self.bbox_head.get_bboxes(*bbox_inputs)
    bbox_results = [
        bbox2result(det_bboxes, det_labels, self.bbox_head.num_classes)
        for det_bboxes, det_labels in bbox_list
    ]
    return bbox_results[0]

SingleStageDetector为例,返回值是通过调用bbox2result取得的。

5.2.2.1 源代码参考

bbox2result函数在mmdet\core\bbox\transforms.py中实现,源代码如下:

def bbox2result(bboxes, labels, num_classes):
    """Convert detection results to a list of numpy arrays.

    Args:
        bboxes (Tensor): shape (n, 5)
        labels (Tensor): shape (n, )
        num_classes (int): class number, including background class

    Returns:
        list(ndarray): bbox results of each class
    """
    if bboxes.shape[0] == 0:
        return [
            np.zeros((0, 5), dtype=np.float32) for i in range(num_classes - 1)
        ]
    else:
        bboxes = bboxes.cpu().numpy()
        labels = labels.cpu().numpy()
        return [bboxes[labels == i, :] for i in range(num_classes - 1)]

5.2.2.2 How It Works

From the docstring we can see that the arguments bboxes and labels are both PyTorch tensors.

The docstring also tells us that bboxes (Tensor): shape (n, 5) means bboxes is an n×5 matrix, and labels (Tensor): shape (n, ) means labels holds n entries, where n is the number of boxes the detector produced for the current image.

A bbox only needs 4 coordinate values to delimit a region, so what is the fifth value?

5.2.3 AnchorHead.get_bboxes

As seen in simple_test, bbox_list is obtained by calling bbox_head.get_bboxes:

bbox_list = self.bbox_head.get_bboxes(*bbox_inputs)

5.2.3.1 get_bboxes

get_bboxes is defined in mmdet/models/anchor_heads/anchor_head.py as a member function of AnchorHead. Its source code is as follows:

def get_bboxes(self, cls_scores, bbox_preds, img_metas, cfg,
               rescale=False):
    assert len(cls_scores) == len(bbox_preds)
    num_levels = len(cls_scores)

    mlvl_anchors = [
        self.anchor_generators[i].grid_anchors(cls_scores[i].size()[-2:],
                                               self.anchor_strides[i])
        for i in range(num_levels)
    ]
    result_list = []
    for img_id in range(len(img_metas)):
        cls_score_list = [
            cls_scores[i][img_id].detach() for i in range(num_levels)
        ]
        bbox_pred_list = [
            bbox_preds[i][img_id].detach() for i in range(num_levels)
        ]
        img_shape = img_metas[img_id]['img_shape']
        scale_factor = img_metas[img_id]['scale_factor']
        proposals = self.get_bboxes_single(cls_score_list, bbox_pred_list,
                                           mlvl_anchors, img_shape,
                                           scale_factor, cfg, rescale)
        result_list.append(proposals)
    return result_list

get_bboxes processes a batch of input images, extracting region proposals for each and producing both scores and bounding boxes.

Inside get_bboxes, a for loop iterates over img_id and, for each image, calls the member function get_bboxes_single to extract that image's proposals, appending them to result_list, which is returned once the loop finishes.

5.2.3.2 get_bboxes_single

get_bboxes_single is also defined in mmdet/models/anchor_heads/anchor_head.py, likewise as a member function of AnchorHead. Its source code is as follows:

def get_bboxes_single(self,
                      cls_scores,
                      bbox_preds,
                      mlvl_anchors,
                      img_shape,
                      scale_factor,
                      cfg,
                      rescale=False):
    assert len(cls_scores) == len(bbox_preds) == len(mlvl_anchors)
    mlvl_bboxes = []
    mlvl_scores = []
    for cls_score, bbox_pred, anchors in zip(cls_scores, bbox_preds,
                                             mlvl_anchors):
        assert cls_score.size()[-2:] == bbox_pred.size()[-2:]
        cls_score = cls_score.permute(1, 2, 0).reshape(
            -1, self.cls_out_channels)
        if self.use_sigmoid_cls:
            scores = cls_score.sigmoid()
        else:
            scores = cls_score.softmax(-1)
        bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, 4)
        nms_pre = cfg.get('nms_pre', -1)
        if nms_pre > 0 and scores.shape[0] > nms_pre:
            if self.use_sigmoid_cls:
                max_scores, _ = scores.max(dim=1)
            else:
                max_scores, _ = scores[:, 1:].max(dim=1)
            _, topk_inds = max_scores.topk(nms_pre)
            anchors = anchors[topk_inds, :]
            bbox_pred = bbox_pred[topk_inds, :]
            scores = scores[topk_inds, :]
        bboxes = delta2bbox(anchors, bbox_pred, self.target_means,
                            self.target_stds, img_shape)
        mlvl_bboxes.append(bboxes)
        mlvl_scores.append(scores)
    mlvl_bboxes = torch.cat(mlvl_bboxes)
    if rescale:
        mlvl_bboxes /= mlvl_bboxes.new_tensor(scale_factor)
    mlvl_scores = torch.cat(mlvl_scores)
    if self.use_sigmoid_cls:
        padding = mlvl_scores.new_zeros(mlvl_scores.shape[0], 1)
        mlvl_scores = torch.cat([padding, mlvl_scores], dim=1)
    det_bboxes, det_labels = multiclass_nms(
        mlvl_bboxes, mlvl_scores, cfg.score_thr, cfg.nms, cfg.max_per_img)
    return det_bboxes, det_labels

This function returns det_bboxes and det_labels, which correspond to the bboxes and labels arguments later passed to bbox2result.

The return values of get_bboxes_single are in turn obtained by calling multiclass_nms:

det_bboxes, det_labels = multiclass_nms(
    mlvl_bboxes, mlvl_scores, cfg.score_thr, cfg.nms, cfg.max_per_img)
return det_bboxes, det_labels

5.2.4 multiclass_nms

5.2.4.1 Source Code

multiclass_nms is defined in mmdet/core/post_processing/bbox_nms.py, which contains only this one function. Its source code is as follows:

import torch

from mmdet.ops.nms import nms_wrapper


def multiclass_nms(multi_bboxes, multi_scores, score_thr, nms_cfg, max_num=-1):
    """NMS for multi-class bboxes.

    Args:
        multi_bboxes (Tensor): shape (n, #class*4) or (n, 4)
        multi_scores (Tensor): shape (n, #class)
        score_thr (float): bbox threshold, bboxes with scores lower than it
            will not be considered.
        nms_thr (float): NMS IoU threshold
        max_num (int): if there are more than max_num bboxes after NMS,
            only top max_num will be kept.

    Returns:
        tuple: (bboxes, labels), tensors of shape (k, 5) and (k, 1). Labels
            are 0-based.
    """
    num_classes = multi_scores.shape[1]
    bboxes, labels = [], []
    nms_cfg_ = nms_cfg.copy()
    nms_type = nms_cfg_.pop('type', 'nms')
    nms_op = getattr(nms_wrapper, nms_type)
    for i in range(1, num_classes):
        cls_inds = multi_scores[:, i] > score_thr
        if not cls_inds.any():
            continue
        # get bboxes and scores of this class
        if multi_bboxes.shape[1] == 4:
            _bboxes = multi_bboxes[cls_inds, :]
        else:
            _bboxes = multi_bboxes[cls_inds, i * 4:(i + 1) * 4]
        _scores = multi_scores[cls_inds, i]
        cls_dets = torch.cat([_bboxes, _scores[:, None]], dim=1)
        cls_dets, _ = nms_op(cls_dets, **nms_cfg_)
        cls_labels = multi_bboxes.new_full(
            (cls_dets.shape[0], ), i - 1, dtype=torch.long)
        bboxes.append(cls_dets)
        labels.append(cls_labels)
    if bboxes:
        bboxes = torch.cat(bboxes)
        labels = torch.cat(labels)
        if bboxes.shape[0] > max_num:
            _, inds = bboxes[:, -1].sort(descending=True)
            inds = inds[:max_num]
            bboxes = bboxes[inds]
            labels = labels[inds]
    else:
        bboxes = multi_bboxes.new_zeros((0, 5))
        labels = multi_bboxes.new_zeros((0, ), dtype=torch.long)

    return bboxes, labels

5.2.4.2 How It Works

As the docstring states, this code performs NMS on candidate boxes. NMS, non-maximum suppression, means that among overlapping candidate boxes the non-maximal ones are suppressed, keeping only the boxes whose score is a local maximum.

Non-maximum suppression (NMS) suppresses elements that are not local maxima and can be understood as a local maximum search. The "local" here refers to a neighbourhood, which has two adjustable parameters: its dimensionality and its size. Setting aside the general NMS algorithm (see the paper "Efficient Non-Maximum Suppression" for NMS over 1-D and 2-D data), in object detection it is used to pick the highest-scoring window. For example, in pedestrian detection each sliding window receives a classification score, and many windows contain or largely overlap one another; NMS keeps, within each neighbourhood, the window with the highest score (the highest pedestrian probability) and suppresses the lower-scoring ones. NMS has important applications across computer vision, e.g. video object tracking, data mining, 3D reconstruction, object recognition, and texture analysis.

(Source: https://www.cnblogs.com/makefile/p/nms.html)
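
To make the idea concrete, here is a minimal single-class NMS sketch in pure NumPy. It only illustrates the algorithm described in the quote; mmdetection itself uses the compiled ops in mmdet.ops.nms, wrapped by nms_wrapper in the source above:

import numpy as np


def nms_sketch(dets, iou_thr=0.5):
    """dets: (n, 5) array of [x1, y1, x2, y2, score]; returns the indices to keep."""
    x1, y1, x2, y2 = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
    scores = dets[:, 4]
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]          # highest score first
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(int(i))                 # keep the current highest-scoring box
        # IoU of this box with all remaining boxes
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])
        inter = np.maximum(0.0, xx2 - xx1 + 1) * np.maximum(0.0, yy2 - yy1 + 1)
        iou = inter / (areas[i] + areas[order[1:]] - inter)
        # suppress boxes that overlap the kept box by more than iou_thr
        order = order[np.where(iou <= iou_thr)[0] + 1]
    return keep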

Combining this with the docstring, the four main input parameters are:

  • multi_bboxes: an n×4 matrix of candidate boxes, with 4 coordinate values per box.
    • For the multi-class case it can instead be n×(#class*4), i.e. a separate box shape is predicted for each class at that location.
  • multi_scores: the per-class scores of the n candidate boxes, a tensor of shape (n, #class).
  • score_thr: the score threshold; candidate boxes scoring below it are discarded.
  • nms_cfg: the NMS configuration, including the IoU threshold; a box whose IoU with a higher-scoring box of the same class exceeds this threshold is suppressed.

In the implementation, a for loop iterates over the classes; for each class i it determines which rows (candidate boxes) have a class-i score above the score threshold:

cls_inds = multi_scores[:, i] > score_thr

Here cls_inds is an n-dimensional boolean vector.

Next, the boolean vector cls_inds is used to select the rows whose score in class i (column i) is above the threshold (i.e. cls_inds[row_id] is True), and the selected boxes and scores are stored in _bboxes and _scores:

# get bboxes and scores of this class
if multi_bboxes.shape[1] == 4:
    _bboxes = multi_bboxes[cls_inds, :]
else:
    _bboxes = multi_bboxes[cls_inds, i * 4:(i + 1) * 4]
_scores = multi_scores[cls_inds, i]

Then _bboxes and _scores are reshaped so that their dimensions line up and concatenated, yielding the detections for this class, cls_dets:

cls_dets = torch.cat([_bboxes, _scores[:, None]], dim=1)

Next, nms_op is applied to cls_dets to perform the NMS operation:

cls_dets, _ = nms_op(cls_dets, **nms_cfg_)

cls_dets is then appended to bboxes, the data structure the function finally returns:

bboxes.append(cls_dets)

At this point the answer is clear: each entry of bboxes holds a box's coordinates (_bboxes, i.e. the top-left corner x1, y1 and the bottom-right corner x1 + w - 1, y1 + h - 1, 4 values) plus the box's score (_scores, 1 value). In mmdetection's prediction output, each bbox is therefore a 5-dimensional tensor.

multiclass_nms returns bboxes in this form to get_bboxes_single described above, which passes it back through get_bboxes and on to bbox2result, where it finally becomes bbox_results, the output of the model's simple_test function.