Finetuning Faster R-CNN from TorchVision on Your Own Data

1. Faster R-CNN Overview

Faster R-CNN[1] is a classic two-stage object detector. The two-stage family evolved from R-CNN to Fast R-CNN and then to Faster R-CNN, and the main thread running through that evolution is how to speed up detection:

  • R-CNN: candidate regions are generated on the original image by heuristic methods such as Selective Search. Each candidate box is passed through a CNN to obtain a feature vector, an SVM classifier then decides whether the box contains an object of interest, and a separately trained bounding-box regression model refines the final box. Notice that the whole pipeline is split into several independent stages, as illustrated in [2].

  • Fast R-CNN: in R-CNN, every candidate box goes through the CNN separately, and the final decision still requires the SVM and the box regressor — all of which can be done inside a single network. Fast R-CNN therefore runs the original image through the CNN only once and projects the candidate boxes onto the feature map to obtain their feature vectors, which greatly speeds up detection. Doing classification and box regression in one network also removes the disjoint stages, as illustrated in [2].

  • Faster R-CNN: a further optimization of Fast R-CNN, this time targeting proposal generation. Fast R-CNN and its predecessors pre-select candidate boxes with heuristic methods; Faster R-CNN replaces that step with a CNN, the Region Proposal Network, as illustrated in [2].

2. Finetuning Faster R-CNN

PyTorch's TorchVision package provides Faster R-CNN implementations with several backbones. For a new object detection task we finetune one of these models to adapt it to the task, which roughly takes the following steps:

  1. Build the dataset
  2. Build the model
  3. Define the loss
  4. Train the model
  5. Run prediction

2.1. Building the Dataset

Here we use a face-mask detection dataset from Kaggle[3]. The dataset contains two folders:

  • images: the original pictures
  • annotations: the annotation files, one xml file per picture

Taking one of the images as an example, here are the original picture and its annotations:

Green boxes mark faces wearing a mask, red boxes faces without one. To simplify the later dataset code, we first extract all annotations from the xml files:

import os
import xml.etree.ElementTree as ET

annotation_file_list = os.listdir("archive/annotations")
os.makedirs("archive/labels", exist_ok=True)  # output folder for the txt labels

def parse_xml(xml_file):
    tree = ET.parse(xml_file)
    root = tree.getroot()

    # one [xmin, ymin, xmax, ymax] box per <object> element
    bbox = [[int(box.find(f"bndbox/{i}").text) for i in ["xmin", "ymin", "xmax", "ymax"]] for box in root.iter("object")]
    # label 2 = with mask, 1 = without mask (0 is reserved for the background)
    labels = [2 if box.find("name").text == "with_mask" else 1 for box in root.iter("object")]

    return bbox, labels

for anno in annotation_file_list:
    bbox, labels = parse_xml(os.path.join("archive/annotations", anno))

    new_file_name = os.path.splitext(anno)[0] + ".txt"
    with open(os.path.join("archive/labels", new_file_name), "w") as fw:
        for (x1, y1, x2, y2), label in zip(bbox, labels):
            fw.write(f"{label} {x1} {y1} {x2} {y2}\n")

This produces one txt file per image, with one line per box in the format label x1 y1 x2 y2; note that label 1 means no mask and 2 means mask. We also split the data into a training set and a test set. Next we build the dataset with a Dataset subclass:

class FacemaskDataset(Dataset):
    def __init__(self, root_path):
        self.root_path = root_path
        self.images = sorted([root_path + "/train/" + i for i in os.listdir(root_path + "/train/")])
        self.labels = sorted([root_path + "/train_labels/" + i for i in os.listdir(root_path + "/train_labels/")])

        self.transform_img = transforms.Compose([transforms.ToTensor()])

    def __getitem__(self, index):
        img = Image.open(self.images[index]).convert("RGB")
        bbox, labels = self._parse_label(self.labels[index])

        target = {
            "boxes": bbox,
            "labels": labels,
            "image_id": torch.tensor([index]),
            "area": (bbox[:, 2] - bbox[:, 0]) * (bbox[:, 3] - bbox[:, 1]),
            "iscrowd": torch.zeros(len(bbox), dtype=torch.int64)
        }

        return self.transform_img(img), target
    
    def _parse_label(self, label_file):
        bbox = []
        labels = []
        with open(label_file) as f:
            for line in f:
                # each line: label x1 y1 x2 y2
                parts = line.strip().split(" ")
                labels.append(int(parts[0]))
                bbox.append([int(p) for p in parts[1:]])

        bbox = torch.tensor(bbox, dtype=torch.float32)
        labels = torch.tensor(labels, dtype=torch.int64)
        return bbox, labels
        
    def __len__(self):
        return len(self.images)

Two things deserve attention here. First, the images need no resize of our own, because Faster R-CNN resizes them internally; the exact operation lives in the GeneralizedRCNNTransform class.

Second, the target is assembled as a dict, because the subsequent resize and other transforms look up the values they need by these field names; the expected format is also documented in [4].
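Among those fields, `area` is simply box width times height. A plain-Python sketch of the same computation (the class above does it with tensor slicing; the box values here are hypothetical):

```python
# Hypothetical [x1, y1, x2, y2] boxes
boxes = [[79, 105, 109, 142], [185, 100, 226, 144]]

# area = (x2 - x1) * (y2 - y1), mirroring (bbox[:, 2] - bbox[:, 0]) * (bbox[:, 3] - bbox[:, 1])
areas = [(x2 - x1) * (y2 - y1) for x1, y1, x2, y2 in boxes]
print(areas)  # [1110, 1804]
```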

2.2. Building the Model

Since we use the TorchVision implementation, we pick fasterrcnn_resnet50_fpn. The full model-building code is:

model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes=3)
model = model.to(device)

Because the number of classes in the pretrained model differs from that of our dataset, we must replace the box predictor with one of the right size. As the dataset section showed, there are really only 2 classes, with mask and without mask, yet num_classes is 3: there is an additional implicit background class. Viewing the network structure with summary() gives:

==============================================================================================================
Layer (type:depth-idx)                                       Output Shape              Param #
==============================================================================================================
FasterRCNNModel                                              --                        --
├─FasterRCNN: 1-1                                            [100, 4]                  --
│    └─GeneralizedRCNNTransform: 2-1                         [1, 3, 800, 800]          --
│    └─BackboneWithFPN: 2-2                                  [1, 256, 13, 13]          --
│    │    └─IntermediateLayerGetter: 3-1                     [1, 2048, 25, 25]         23,454,912
│    │    └─FeaturePyramidNetwork: 3-2                       [1, 256, 13, 13]          3,344,384
│    └─RegionProposalNetwork: 2-3                            [1000, 4]                 --
│    │    └─RPNHead: 3-3                                     [1, 3, 200, 200]          593,935
│    │    └─AnchorGenerator: 3-4                             [159882, 4]               --
│    └─RoIHeads: 2-4                                         [100, 4]                  --
│    │    └─MultiScaleRoIAlign: 3-5                          [1000, 256, 7, 7]         --
│    │    └─TwoMLPHead: 3-6                                  [1000, 1024]              13,895,680
│    │    └─FastRCNNPredictor: 3-7                           [1000, 3]                 15,375
==============================================================================================================
Total params: 41,304,286
Trainable params: 41,081,886
Non-trainable params: 222,400
Total mult-adds (G): 133.97
==============================================================================================================
Input size (MB): 0.60
Forward/backward pass size (MB): 1483.73
Params size (MB): 165.22
Estimated Total Size (MB): 1649.54
==============================================================================================================

2.3. Loss Function

fasterrcnn_resnet50_fpn produces four losses. In training mode (model.train()), directly printing the model output outputs = model(images, targets) shows all of them:

{
    'loss_classifier': tensor(1.4224, device='cuda:0', grad_fn=<NllLossBackward0>), 
    'loss_box_reg': tensor(0.1402, device='cuda:0', grad_fn=<DivBackward0>), 
    'loss_objectness': tensor(0.1346, device='cuda:0', grad_fn=<BinaryCrossEntropyWithLogitsBackward0>), 
    'loss_rpn_box_reg': tensor(0.0372, device='cuda:0', grad_fn=<DivBackward0>)
}

The four losses are:

  • loss_classifier: the detection head's classification loss over the proposals;
  • loss_box_reg: the detection head's bounding-box regression loss;
  • loss_objectness: the RPN's classification loss for whether a proposal is background;
  • loss_rpn_box_reg: the RPN's bounding-box regression loss.

The four losses are combined linearly:

$$loss = \lambda_1 \cdot loss\_classifier + \lambda_2 \cdot loss\_box\_reg + \lambda_3 \cdot loss\_objectness + \lambda_4 \cdot loss\_rpn\_box\_reg$$

Here we typically set $\lambda_1 = \lambda_2 = \lambda_3 = \lambda_4 = 1$, so the code reduces to:

outputs = model(images, targets)
optimizer.zero_grad()

print(f"train->outputs: {outputs}")
loss = sum(loss for loss in outputs.values())
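With all weights equal to 1, the combination is just a plain sum over the loss dict. A pure-Python sketch using the loss values printed above (plain floats stand in for tensors):

```python
# Loss values copied from the dict printed above
losses = {
    "loss_classifier": 1.4224,
    "loss_box_reg": 0.1402,
    "loss_objectness": 0.1346,
    "loss_rpn_box_reg": 0.0372,
}
weights = {k: 1.0 for k in losses}  # lambda_1 = lambda_2 = lambda_3 = lambda_4 = 1
total = sum(weights[k] * v for k, v in losses.items())
print(round(total, 4))  # 1.7344
```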

2.4. Model Training

The training code is as follows:

def train(self):
    # 1. Load the data and split it into training and validation sets
    train_dataset = FacemaskDataset(self.WORKING_DIR)
    generator = torch.Generator().manual_seed(25)
    train_dataset, val_dataset = random_split(train_dataset, [0.8, 0.2], generator=generator)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    num_workers = torch.cuda.device_count() * 4 if device == "cuda" else 0

    BATCH_SIZE = 8

    train_dataloader = DataLoader(dataset=train_dataset,
                                  num_workers=num_workers,
                                  pin_memory=False,
                                  batch_size=BATCH_SIZE,
                                  shuffle=True,
                                  collate_fn=self.collate_fn)
    val_dataloader = DataLoader(dataset=val_dataset,
                                num_workers=num_workers,
                                pin_memory=False,
                                batch_size=BATCH_SIZE,
                                shuffle=False,
                                collate_fn=self.collate_fn)

    # 2. Define the model
    model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes=3)
    model = model.to(device)
    # INFO: optimizer hyperparameters
    optimizer = optim.SGD([p for p in model.parameters() if p.requires_grad], lr=0.001, momentum=0.95, weight_decay=0.05)

    EPOCHS = 40

    train_losses = []
    val_scores = []
    best_val = float('-inf')

    for epoch in tqdm(range(EPOCHS), desc="EPOCHS", leave=True):
        model.train()
        train_running_loss = 0

        for idx, images_targets in enumerate(tqdm(train_dataloader, desc="Training", leave=True)):
            images = [image.to(device) for image in images_targets[0]]
            targets = [{k: v.to(device) for k, v in target.items()} for target in images_targets[1]]

            outputs = model(images, targets)  # dict of the four losses
            optimizer.zero_grad()

            # print(f"train->outputs: {outputs}")
            loss = sum(loss for loss in outputs.values())

            train_running_loss += loss.item()

            loss.backward()
            optimizer.step()

        train_loss = train_running_loss / (idx + 1)
        train_losses.append(train_loss)

        val_score = self.custom_evaluate(model, val_dataloader, device)
        print(f"val_score: {val_score}")
        val_scores.append(val_score["mAP"])  # track validation mAP (not a loss)
        if val_score["mAP"] > best_val:
            best_val = val_score["mAP"]
            torch.save(model.state_dict(), 'best_fasterrcnn.pth')
    # INFO: log and plot every fifth epoch
    EPOCHS_plot = []
    train_losses_plot = []
    val_scores_plot = []
    for i in range(0, EPOCHS, 5):
        EPOCHS_plot.append(i)
        train_losses_plot.append(train_losses[i])
        val_scores_plot.append(val_scores[i])
    print(f"EPOCHS_plot: {EPOCHS_plot}")
    print(f"train_losses_plot: {train_losses_plot}")
    plot_losses(EPOCHS_plot, train_losses_plot, val_scores_plot, "fasterrcnn_train_val_losses.jpg")

Note that the DataLoader is given a custom collate_fn, defined as:

def collate_fn(self, batch):
    return tuple(zip(*batch))

The reason is that the images have not been resized: the default collate_fn tries to stack a batch into a single tensor, which fails when the samples have different sizes, so the custom version simply returns the training data as tuples.
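`tuple(zip(*batch))` transposes a list of (image, target) pairs into one tuple of images and one tuple of targets. A stand-alone demonstration with stand-in values:

```python
def collate_fn(batch):
    # transpose a list of (image, target) pairs into (images, targets)
    return tuple(zip(*batch))

# stand-in samples: each element is an (image, target) pair
batch = [("img0", {"labels": [1]}), ("img1", {"labels": [2, 2]})]
images, targets = collate_fn(batch)
print(images)   # ('img0', 'img1')
print(targets)  # ({'labels': [1]}, {'labels': [2, 2]})
```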

On the validation set we implement a custom mAP computation:

def custom_evaluate(self, model, dataloader, device, num_classes=3, iou_threshold=0.5):
    model.eval()
    all_preds = []
    all_targets = []

    # 1. Collect predictions and ground truth
    with torch.no_grad():
        for images, targets in dataloader:
            images = [img.to(device) for img in images]
            batch_preds = model(images)  # model output: List[Dict{boxes, labels, scores}]

            # flatten predictions into [image_id, class, conf, x1, y1, x2, y2]
            for img_idx, pred in enumerate(batch_preds):
                img_id = targets[img_idx]['image_id'].item()
                for box, label, score in zip(pred['boxes'], pred['labels'], pred['scores']):
                    all_preds.append([img_id, label.item(), score.item(), *box.cpu().tolist()])

            # flatten ground truth into [image_id, class, x1, y1, x2, y2]
            for target in targets:
                img_id = target['image_id'].item()
                for box, label in zip(target['boxes'], target['labels']):
                    all_targets.append([img_id, label.item(), *box.cpu().tolist()])

    # 2. Group by class
    class_preds = defaultdict(list)
    class_targets = defaultdict(list)
    for pred in all_preds:
        class_preds[pred[1]].append(pred)
    for target in all_targets:
        class_targets[target[1]].append(target)

    # 3. Compute AP per class (class 0 is the background and has no boxes, so skip it)
    aps = []
    total_tp = 0
    for class_id in range(1, num_classes):
        preds = sorted(class_preds.get(class_id, []), key=lambda x: x[2], reverse=True)  # sort by confidence
        num_gt = len(class_targets.get(class_id, []))

        tp = np.zeros(len(preds))
        fp = np.zeros(len(preds))
        gt_matched = defaultdict(set)  # ground-truth boxes already matched, per image

        # 4. IoU matching
        for pred_idx, pred in enumerate(preds):
            img_id = pred[0]
            pred_box = pred[3:]
            max_iou = 0.0
            best_gt_idx = -1

            # ground-truth boxes of the same image and class
            img_targets = [t for t in class_targets[class_id] if t[0] == img_id]
            for gt_idx, gt_box in enumerate(img_targets):
                if gt_idx in gt_matched[img_id]:
                    continue  # skip boxes that are already matched
                iou = calculate_iou(pred_box, gt_box[2:])
                if iou > max_iou and iou >= iou_threshold:
                    max_iou = iou
                    best_gt_idx = gt_idx

            # mark TP/FP
            if best_gt_idx != -1:
                tp[pred_idx] = 1
                gt_matched[img_id].add(best_gt_idx)
            else:
                fp[pred_idx] = 1

        # 5. Precision-recall curve
        tp_cum = np.cumsum(tp)
        fp_cum = np.cumsum(fp)
        recalls = tp_cum / (num_gt + 1e-6)
        precisions = tp_cum / (tp_cum + fp_cum + 1e-6)
        if len(preds) > 0:
            total_tp += int(tp_cum[-1])

        # 6. AP via the VOC 11-point interpolation
        ap = 0.0
        for t in np.arange(0, 1.1, 0.1):
            mask = recalls >= t
            if np.sum(mask) == 0:
                p = 0
            else:
                p = np.max(precisions[mask])
            ap += p / 11

        aps.append(ap)

    # 7. Global metrics: mean AP over classes, recall = matched GT boxes / all GT boxes
    mAP = np.mean(aps) if aps else 0.0
    final_recall = total_tp / (len(all_targets) + 1e-6)

    return {"mAP": mAP, "Recall": final_recall}
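custom_evaluate calls a calculate_iou helper that is not shown above; a minimal version for boxes in [x1, y1, x2, y2] format could be:

```python
def calculate_iou(box_a, box_b):
    """IoU of two boxes in [x1, y1, x2, y2] format."""
    ix1 = max(box_a[0], box_b[0])
    iy1 = max(box_a[1], box_b[1])
    ix2 = min(box_a[2], box_b[2])
    iy2 = min(box_a[3], box_b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)  # intersection area (0 if disjoint)
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0

print(calculate_iou([0, 0, 10, 10], [5, 0, 15, 10]))  # 50 / 150 ≈ 0.3333
```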

2.5. Model Prediction

Inference is just loading the saved model and predicting on new data:

def predict(self, img, model_path, score=0.5):
    transform_img = transforms.Compose([transforms.ToTensor()])

    device = "cuda" if torch.cuda.is_available() else "cpu"

    model = fasterrcnn_resnet50_fpn()
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes=3)
    model = model.to(device)
    model.load_state_dict(torch.load(model_path, 
                                     map_location=torch.device(device), 
                                     weights_only=True))

    # run inference
    model.eval()
    with torch.inference_mode():
        tensor_image = transform_img(img)
        tensor_image = tensor_image.float().to(device)

        pred = model(tensor_image.unsqueeze(0))
        final_ans = []
        # parse the results
        for i in range(len(pred)):
            box_res = pred[i]["boxes"].long().tolist()
            print(f"box_res: {box_res}")
            label_res = pred[i]["labels"].tolist()
            print(f"label_res: {label_res}")
            score_res = pred[i]["scores"].tolist()
            print(f"score_res: {score_res}")
            for j, label in enumerate(label_res):
                if score_res[j] >= score:
                    ans = {}
                    ans["box"] = box_res[j]
                    ans["label"] = label
                    ans["score"] = score_res[j]
                    final_ans.append(ans)
        return final_ans

The final results on the test data are shown below:

3. Summary

This article finetuned TorchVision's fasterrcnn_resnet50_fpn model on a new dataset, completing a basic hands-on run of Faster R-CNN on a mask-detection task. The principles of Faster R-CNN were only touched on briefly here; understanding the full pipeline in detail still requires a careful reading of the source code.

References

[1] Ren S, He K, Girshick R, et al. Faster R-CNN: Towards real-time object detection with region proposal networks[J]. Advances in Neural Information Processing Systems, 2015, 28.

[2] https://courses.grainger.illinois.edu/ece420/sp2017/iccv2015_tutorial_convolutional_feature_maps_kaiminghe.pdf

[3] https://www.kaggle.com/datasets/andrewmvd/face-mask-detection/data

[4] https://docs.pytorch.org/tutorials/intermediate/torchvision_tutorial.html