tensorflow/models

View on GitHub
official/vision/evaluation/coco_utils.py

Summary

Maintainability
D
2 days
Test Coverage
# Copyright 2024 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Util functions related to pycocotools and COCO eval."""

import copy
import json

# Import libraries

from absl import logging
import numpy as np
from PIL import Image
from pycocotools import coco
from pycocotools import mask as mask_api
import six
import tensorflow as tf, tf_keras

from official.common import dataset_fn
from official.vision.dataloaders import tf_example_decoder
from official.vision.ops import box_ops
from official.vision.ops import mask_ops


class COCOWrapper(coco.COCO):
  """COCO wrapper class.

  This class wraps COCO API object, which provides the following additional
  functionalities:
    1. Support string type image id.
    2. Support loading the ground-truth dataset using the external annotation
       dictionary.
    3. Support loading the prediction results using the external annotation
       dictionary.
  """

  def __init__(self, eval_type='box', annotation_file=None, gt_dataset=None):
    """Instantiates a COCO-style API object.

    Args:
      eval_type: either 'box' or 'mask'.
      annotation_file: a JSON file that stores annotations of the eval dataset.
        This is required if `gt_dataset` is not provided.
      gt_dataset: the ground-truth eval datatset in COCO API format.
    """
    if ((annotation_file and gt_dataset) or
        ((not annotation_file) and (not gt_dataset))):
      raise ValueError('One and only one of `annotation_file` and `gt_dataset` '
                       'needs to be specified.')

    if eval_type not in ['box', 'mask']:
      raise ValueError('The `eval_type` can only be either `box` or `mask`.')

    coco.COCO.__init__(self, annotation_file=annotation_file)
    self._eval_type = eval_type
    if gt_dataset:
      self.dataset = gt_dataset
      self.createIndex()

  def loadRes(self, predictions):
    """Loads result file and return a result api object.

    Args:
      predictions: a list of dictionary each representing an annotation in COCO
        format. The required fields are `image_id`, `category_id`, `score`,
        `bbox`, `segmentation`.

    Returns:
      res: result COCO api object.

    Raises:
      ValueError: if the set of image id from predctions is not the subset of
        the set of image id of the ground-truth dataset.
    """
    res = coco.COCO()
    res.dataset['images'] = copy.deepcopy(self.dataset['images'])
    res.dataset['categories'] = copy.deepcopy(self.dataset['categories'])

    image_ids = [ann['image_id'] for ann in predictions]
    if set(image_ids) != (set(image_ids) & set(self.getImgIds())):
      raise ValueError('Results do not correspond to the current dataset!')
    for ann in predictions:
      x1, x2, y1, y2 = [ann['bbox'][0], ann['bbox'][0] + ann['bbox'][2],
                        ann['bbox'][1], ann['bbox'][1] + ann['bbox'][3]]
      if self._eval_type == 'box':
        ann['area'] = ann['bbox'][2] * ann['bbox'][3]
        ann['segmentation'] = [
            [x1, y1, x1, y2, x2, y2, x2, y1]]
      elif self._eval_type == 'mask':
        ann['area'] = mask_api.area(ann['segmentation'])

    res.dataset['annotations'] = copy.deepcopy(predictions)
    res.createIndex()
    return res


def convert_predictions_to_coco_annotations(predictions):
  """Converts a batch of predictions to annotations in COCO format.

  Args:
    predictions: a dictionary of lists of numpy arrays including the following
      fields. 'K' below denotes the maximum number of instances per image.
      Required fields:
        - source_id: a list of numpy arrays of int or string of shape
            [batch_size].
        - detection_boxes: a list of numpy arrays of float of shape
            [batch_size, K, 4], where coordinates are in the original image
            space (not the scaled image space).
        - detection_classes: a list of numpy arrays of int of shape
            [batch_size, K].
        - detection_scores: a list of numpy arrays of float of shape
            [batch_size, K].
      Optional fields:
        - detection_masks: a list of numpy arrays of float of shape
            [batch_size, K, mask_height, mask_width].
        - detection_keypoints: a list of numpy arrays of float of shape
            [batch_size, K, num_keypoints, 2]

  Returns:
    coco_predictions: prediction in COCO annotation format.
  """
  coco_predictions = []
  num_batches = len(predictions['source_id'])
  max_num_detections = predictions['detection_classes'][0].shape[1]
  use_outer_box = 'detection_outer_boxes' in predictions
  for i in range(num_batches):
    predictions['detection_boxes'][i] = box_ops.yxyx_to_xywh(
        predictions['detection_boxes'][i])
    if use_outer_box:
      predictions['detection_outer_boxes'][i] = box_ops.yxyx_to_xywh(
          predictions['detection_outer_boxes'][i])
      mask_boxes = predictions['detection_outer_boxes']
    else:
      mask_boxes = predictions['detection_boxes']

    batch_size = predictions['source_id'][i].shape[0]
    if 'detection_keypoints' in predictions:
      # Adds extra ones to indicate the visibility for each keypoint as is
      # recommended by MSCOCO. Also, convert keypoint from [y, x] to [x, y]
      # as mandated by COCO.
      num_keypoints = predictions['detection_keypoints'][i].shape[2]
      coco_keypoints = np.concatenate(
          [
              predictions['detection_keypoints'][i][..., 1:],
              predictions['detection_keypoints'][i][..., :1],
              np.ones([batch_size, max_num_detections, num_keypoints, 1]),
          ],
          axis=-1,
      ).astype(int)
    for j in range(batch_size):
      if 'detection_masks' in predictions:
        image_masks = mask_ops.paste_instance_masks(
            predictions['detection_masks'][i][j],
            mask_boxes[i][j],
            int(predictions['image_info'][i][j, 0, 0]),
            int(predictions['image_info'][i][j, 0, 1]),
        )
        binary_masks = (image_masks > 0.0).astype(np.uint8)
        encoded_masks = [
            mask_api.encode(np.asfortranarray(binary_mask))
            for binary_mask in list(binary_masks)
        ]
      for k in range(max_num_detections):
        ann = {}
        ann['image_id'] = predictions['source_id'][i][j]
        ann['category_id'] = predictions['detection_classes'][i][j, k]
        ann['bbox'] = predictions['detection_boxes'][i][j, k]
        ann['score'] = predictions['detection_scores'][i][j, k]
        if 'detection_masks' in predictions:
          ann['segmentation'] = encoded_masks[k]
        if 'detection_keypoints' in predictions:
          ann['keypoints'] = coco_keypoints[j, k].flatten().tolist()
        coco_predictions.append(ann)

  for i, ann in enumerate(coco_predictions):
    ann['id'] = i + 1

  return coco_predictions


def convert_groundtruths_to_coco_dataset(groundtruths, label_map=None):
  """Converts ground-truths to the dataset in COCO format.

  Args:
    groundtruths: a dictionary of numpy arrays including the fields below.
      Note that each element in the list represent the number for a single
      example without batch dimension. 'K' below denotes the actual number of
      instances for each image.
      Required fields:
        - source_id: a list of numpy arrays of int or string of shape
          [batch_size].
        - height: a list of numpy arrays of int of shape [batch_size].
        - width: a list of numpy arrays of int of shape [batch_size].
        - num_detections: a list of numpy arrays of int of shape [batch_size].
        - boxes: a list of numpy arrays of float of shape [batch_size, K, 4],
            where coordinates are in the original image space (not the
            normalized coordinates).
        - classes: a list of numpy arrays of int of shape [batch_size, K].
      Optional fields:
        - is_crowds: a list of numpy arrays of int of shape [batch_size, K]. If
            th field is absent, it is assumed that this instance is not crowd.
        - areas: a list of numy arrays of float of shape [batch_size, K]. If the
            field is absent, the area is calculated using either boxes or
            masks depending on which one is available.
        - masks: a list of numpy arrays of string of shape [batch_size, K],
    label_map: (optional) a dictionary that defines items from the category id
      to the category name. If `None`, collect the category mapping from the
      `groundtruths`.

  Returns:
    coco_groundtruths: the ground-truth dataset in COCO format.
  """
  source_ids = np.concatenate(groundtruths['source_id'], axis=0)
  heights = np.concatenate(groundtruths['height'], axis=0)
  widths = np.concatenate(groundtruths['width'], axis=0)
  gt_images = [{'id': int(i), 'height': int(h), 'width': int(w)} for i, h, w
               in zip(source_ids, heights, widths)]

  gt_annotations = []
  num_batches = len(groundtruths['source_id'])
  for i in range(num_batches):
    logging.log_every_n(
        logging.INFO,
        'convert_groundtruths_to_coco_dataset: Processing annotation %d', 100,
        i)
    max_num_instances = groundtruths['classes'][i].shape[1]
    batch_size = groundtruths['source_id'][i].shape[0]
    for j in range(batch_size):
      num_instances = groundtruths['num_detections'][i][j]
      if num_instances > max_num_instances:
        logging.warning(
            'num_groundtruths is larger than max_num_instances, %d v.s. %d',
            num_instances, max_num_instances)
        num_instances = max_num_instances
      for k in range(int(num_instances)):
        ann = {}
        ann['image_id'] = int(groundtruths['source_id'][i][j])
        if 'is_crowds' in groundtruths:
          ann['iscrowd'] = int(groundtruths['is_crowds'][i][j, k])
        else:
          ann['iscrowd'] = 0
        ann['category_id'] = int(groundtruths['classes'][i][j, k])
        boxes = groundtruths['boxes'][i]
        ann['bbox'] = [
            float(boxes[j, k, 1]),
            float(boxes[j, k, 0]),
            float(boxes[j, k, 3] - boxes[j, k, 1]),
            float(boxes[j, k, 2] - boxes[j, k, 0])]
        if 'areas' in groundtruths:
          ann['area'] = float(groundtruths['areas'][i][j, k])
        else:
          ann['area'] = float(
              (boxes[j, k, 3] - boxes[j, k, 1]) *
              (boxes[j, k, 2] - boxes[j, k, 0]))
        if 'masks' in groundtruths:
          if isinstance(groundtruths['masks'][i][j, k], tf.Tensor):
            mask = Image.open(
                six.BytesIO(groundtruths['masks'][i][j, k].numpy()))
          else:
            mask = Image.open(
                six.BytesIO(groundtruths['masks'][i][j, k]))
          np_mask = np.array(mask, dtype=np.uint8)
          np_mask[np_mask > 0] = 255
          encoded_mask = mask_api.encode(np.asfortranarray(np_mask))
          ann['segmentation'] = encoded_mask
          # Ensure the content of `counts` is JSON serializable string.
          if 'counts' in ann['segmentation']:
            ann['segmentation']['counts'] = six.ensure_str(
                ann['segmentation']['counts'])
          if 'areas' not in groundtruths:
            ann['area'] = mask_api.area(encoded_mask)
        if 'keypoints' in groundtruths:
          keypoints = groundtruths['keypoints'][i]
          coco_keypoints = []
          num_valid_keypoints = 0
          for z in range(len(keypoints[j, k, :, 1])):
            # Convert from [y, x] to [x, y] as mandated by COCO.
            x = float(keypoints[j, k, z, 1])
            y = float(keypoints[j, k, z, 0])
            coco_keypoints.append(x)
            coco_keypoints.append(y)
            if tf.math.is_nan(x) or tf.math.is_nan(y) or (
                x == 0 and y == 0):
              visibility = 0
            else:
              visibility = 2
              num_valid_keypoints = num_valid_keypoints + 1
            coco_keypoints.append(visibility)
          ann['keypoints'] = coco_keypoints
          ann['num_keypoints'] = num_valid_keypoints
        gt_annotations.append(ann)

  for i, ann in enumerate(gt_annotations):
    ann['id'] = i + 1

  if label_map:
    gt_categories = [{'id': i, 'name': label_map[i]} for i in label_map]
  else:
    category_ids = [gt['category_id'] for gt in gt_annotations]
    gt_categories = [{'id': i} for i in set(category_ids)]

  gt_dataset = {
      'images': gt_images,
      'categories': gt_categories,
      'annotations': copy.deepcopy(gt_annotations),
  }
  return gt_dataset


class COCOGroundtruthGenerator:
  """Generates the ground-truth annotations from a single example."""

  def __init__(self, file_pattern, file_type, num_examples, include_mask,
               regenerate_source_id=False):
    self._file_pattern = file_pattern
    self._num_examples = num_examples
    self._include_mask = include_mask
    self._dataset_fn = dataset_fn.pick_dataset_fn(file_type)
    self._regenerate_source_id = regenerate_source_id

  def _parse_single_example(self, example):
    """Parses a single serialized tf.Example proto.

    Args:
      example: a serialized tf.Example proto string.

    Returns:
      A dictionary of ground-truth with the following fields:
        source_id: a scalar tensor of int64 representing the image source_id.
        height: a scalar tensor of int64 representing the image height.
        width: a scalar tensor of int64 representing the image width.
        boxes: a float tensor of shape [K, 4], representing the ground-truth
          boxes in absolute coordinates with respect to the original image size.
        classes: a int64 tensor of shape [K], representing the class labels of
          each instances.
        is_crowds: a bool tensor of shape [K], indicating whether the instance
          is crowd.
        areas: a float tensor of shape [K], indicating the area of each
          instance.
        masks: a string tensor of shape [K], containing the bytes of the png
          mask of each instance.
    """
    decoder = tf_example_decoder.TfExampleDecoder(
        include_mask=self._include_mask,
        regenerate_source_id=self._regenerate_source_id)
    decoded_tensors = decoder.decode(example)

    image = decoded_tensors['image']
    image_size = tf.shape(image)[0:2]
    boxes = box_ops.denormalize_boxes(
        decoded_tensors['groundtruth_boxes'], image_size)

    source_id = decoded_tensors['source_id']
    if source_id.dtype is tf.string:
      source_id = tf.strings.to_number(source_id, out_type=tf.int64)

    groundtruths = {
        'source_id': source_id,
        'height': decoded_tensors['height'],
        'width': decoded_tensors['width'],
        'num_detections': tf.shape(decoded_tensors['groundtruth_classes'])[0],
        'boxes': boxes,
        'classes': decoded_tensors['groundtruth_classes'],
        'is_crowds': decoded_tensors['groundtruth_is_crowd'],
        'areas': decoded_tensors['groundtruth_area'],
    }
    if self._include_mask:
      groundtruths.update({
          'masks': decoded_tensors['groundtruth_instance_masks_png'],
      })
    return groundtruths

  def _build_pipeline(self):
    """Builds data pipeline to generate ground-truth annotations."""
    dataset = tf.data.Dataset.list_files(self._file_pattern, shuffle=False)
    dataset = dataset.interleave(
        map_func=lambda filename: self._dataset_fn(filename).prefetch(1),
        cycle_length=None,
        num_parallel_calls=tf.data.experimental.AUTOTUNE)

    dataset = dataset.take(self._num_examples)
    dataset = dataset.map(self._parse_single_example,
                          num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(1, drop_remainder=False)
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    return dataset

  def __call__(self):
    return self._build_pipeline()


def scan_and_generator_annotation_file(file_pattern: str,
                                       file_type: str,
                                       num_samples: int,
                                       include_mask: bool,
                                       annotation_file: str,
                                       regenerate_source_id: bool = False):
  """Scans and generate the COCO-style annotation JSON file given a dataset."""
  groundtruth_generator = COCOGroundtruthGenerator(
      file_pattern, file_type, num_samples, include_mask, regenerate_source_id)
  generate_annotation_file(groundtruth_generator, annotation_file)


def generate_annotation_file(groundtruth_generator,
                             annotation_file):
  """Generates COCO-style annotation JSON file given a ground-truth generator."""
  groundtruths = {}
  logging.info('Loading groundtruth annotations from dataset to memory...')
  for i, groundtruth in enumerate(groundtruth_generator()):
    logging.log_every_n(logging.INFO,
                        'generate_annotation_file: Processing annotation %d',
                        100, i)
    for k, v in six.iteritems(groundtruth):
      if k not in groundtruths:
        groundtruths[k] = [v]
      else:
        groundtruths[k].append(v)
  gt_dataset = convert_groundtruths_to_coco_dataset(groundtruths)

  logging.info('Saving groundtruth annotations to the JSON file...')
  with tf.io.gfile.GFile(annotation_file, 'w') as f:
    f.write(json.dumps(gt_dataset))
  logging.info('Done saving the JSON file...')