Optimizing Python Blob Detection and Tracking for Performance on NXP's IMX8M Plus Board

I've developed a system for blob detection and tracking in Python using OpenCV and Numpy, intended for processing video input. The system is structured around custom classes for detecting and tracking objects (blobs) through video frames. However, I'm encountering significant performance issues: the system runs too slowly for real-time use. My setup is deployed on an NXP IMX8M Plus board, which offers some hardware acceleration capabilities, but I'm uncertain how best to leverage these or otherwise optimize my system for improved performance.

The system's architecture includes:

  • Blob: Represents a detected object, holding properties like ID, contour, and a method for updating its path with new contours.
  • BlobTracker: Manages multiple Blob instances, updates their states with new detections, and calculates metrics like Intersection over Union (IoU) and centroid distances for tracking.
  • MOG2ObjectDetector: Uses OpenCV's MOG2 background subtractor for initial blob detection, processes masks to generate bounding boxes, and feeds these into the BlobTracker.

Main Issues:

  1. The system's performance does not meet the requirements for real-time processing.
  2. I'm unsure how to effectively utilize the hardware acceleration features of the NXP IMX8M Plus board.
  3. I'm considering various optimizations, including potentially using spatial hashing for detected blobs, and I'm open to suggestions on code or algorithmic improvements.

Questions:

  1. What specific code optimizations can be made, especially with numpy or OpenCV operations, to enhance performance?
  2. How can I take advantage of the hardware acceleration capabilities available on the NXP IMX8M Plus board in this context?
  3. Would implementing a spatial hash for blob detection significantly improve tracking performance? If so, how should I approach it? (Sketches of what I mean for questions 1 and 3 follow after this list.)
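To make question 1 concrete: calculate_iou is currently called once per blob/match pair from plain Python loops. One optimization I am considering is computing all pairwise IoUs in a single numpy broadcast (an untested sketch, using the same +1 box convention as my code below):

import numpy as np

def iou_matrix(boxes_a, boxes_b):
    """Pairwise IoU between two sets of [x_min, y_min, x_max, y_max] boxes.

    boxes_a: (N, 4), boxes_b: (M, 4) -> (N, M) matrix of IoU values.
    """
    a = np.asarray(boxes_a, dtype=np.float32)
    b = np.asarray(boxes_b, dtype=np.float32)
    # Broadcast (N, 1) against (1, M) to get all intersection rectangles at once.
    xA = np.maximum(a[:, None, 0], b[None, :, 0])
    yA = np.maximum(a[:, None, 1], b[None, :, 1])
    xB = np.minimum(a[:, None, 2], b[None, :, 2])
    yB = np.minimum(a[:, None, 3], b[None, :, 3])
    inter = np.clip(xB - xA + 1, 0, None) * np.clip(yB - yA + 1, 0, None)
    area_a = (a[:, 2] - a[:, 0] + 1) * (a[:, 3] - a[:, 1] + 1)
    area_b = (b[:, 2] - b[:, 0] + 1) * (b[:, 3] - b[:, 1] + 1)
    return inter / (area_a[:, None] + area_b[None, :] - inter)

And for question 3, this is roughly the kind of spatial hash I have in mind: bucket each detection by the grid cell its centroid falls into, then only compare a blob against detections in neighbouring cells. The cell size here is a guess and would need tuning to typical blob sizes and frame-to-frame motion:

class SpatialHash:
    """Bucket bounding boxes by the grid cell of their centroid."""

    def __init__(self, cell_size: int = 128):
        self.cell_size = cell_size
        self.cells = {}

    def insert(self, idx: int, box):
        # box is [x_min, y_min, x_max, y_max]; key by the centroid's cell.
        key = ((box[0] + box[2]) // (2 * self.cell_size),
               (box[1] + box[3]) // (2 * self.cell_size))
        self.cells.setdefault(key, []).append(idx)

    def query(self, box):
        """Indices of inserted boxes in the 3x3 cell neighbourhood of box."""
        cx = (box[0] + box[2]) // (2 * self.cell_size)
        cy = (box[1] + box[3]) // (2 * self.cell_size)
        candidates = []
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                candidates.extend(self.cells.get((cx + dx, cy + dy), []))
        return candidates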

Code:

For each frame, we call MOG2ObjectDetector.get_matches(...) (a minimal driver loop is sketched after the code):


import math
from typing import List, Tuple, Optional
import cv2
import numpy as np

class Blob:
    """Class to represent a detected object (blob) in a video."""

    def __init__(self, id: int, contour: List[int]):
        """Initialize a Blob with an ID, a bounding-box contour
        [x_min, y_min, x_max, y_max], a path and a time_to_live."""
        self.id = id
        self.seen = False
        self.contour = contour
        self.path = [contour]
        self.time_to_live = 10

    def update_path(self, new_contour):
        """Update the path of the Blob with a new contour and recalculate its centroid."""
        self.path.append(new_contour)
        self.contour = new_contour
        self.time_to_live += 1

    def get_detection(self):
        """Decrease the time_to_live by one and return the contour of the Blob."""
        self.time_to_live -= 1
        return self.contour

class BlobTracker:
    """Class to track multiple Blobs in a video."""

    def __init__(self, width, height, fps):
        """Initialize a BlobTracker with an ID counter, a list of Blobs, and frame parameters."""
        self.next_id = 0
        self.blobs: List[Blob] = []
        self.frame_width = width
        self.frame_height = height
        # Assuming the path through the frame is about 5 meters long and the
        # maximum object velocity is 30 km/h (about 10 m/s), an object takes
        # roughly half a second to cross the entire frame; this bounds the
        # squared per-frame centroid displacement.
        self.flow_threshold = max(0, width // fps) ** 2
        self.iou_threshold = 0.6

    def update_blobs(self, new_matches: List[List[int]]):
        """Update the Blobs based on new matches."""
        for blob in self.blobs:
            self.update_blob(blob, new_matches)

        for match in new_matches:
            print("Created new matched blob:", self.next_id)
            new_blob = Blob(self.next_id, match[:4])
            self.next_id += 1
            self.blobs.append(new_blob)

        return self.get_detections()

    def get_detections(self):
        """Get the detections from the Blobs, and remove any Blobs that are no longer alive."""
        detections = []
        for blob in self.blobs:
            if not blob.seen:
                blob.seen = True
                detections.append(blob.get_detection())
        # Prune dead blobs in a second pass; calling self.blobs.remove()
        # while iterating over self.blobs would skip elements.
        self.blobs = [blob for blob in self.blobs if blob.time_to_live > 0]
        return detections

    def update_blob(self, blob: Blob, matches: List[List[int]]):
        """Update a Blob based on new matches."""
        # Iterate over a copy: matches is mutated inside the loop, and
        # removing from a list while iterating over it skips elements.
        for match in list(matches):
            if self.iou_constraint_is_passed(blob, match[:4]):
                if self.distance_squared_constraint_is_passed(blob, match[:4]):
                    blob.update_path(match[:4])
                    matches.remove(match)

    def iou_constraint_is_passed(self, blob: Blob, match: List[int]) -> bool:
        """Check if the Intersection over Union (IoU) of a Blob and a match is above a threshold."""
        iou = self.calculate_iou(blob.contour, match)
        return iou > self.iou_threshold

    @staticmethod
    def calculate_iou(blob_contour, match_contour):
        xA = max(blob_contour[0], match_contour[0])
        yA = max(blob_contour[1], match_contour[1])
        xB = min(blob_contour[2], match_contour[2])
        yB = min(blob_contour[3], match_contour[3])
        interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
        boxAArea = (blob_contour[2] - blob_contour[0] + 1) * (blob_contour[3] - blob_contour[1] + 1)
        boxBArea = (match_contour[2] - match_contour[0] + 1) * (match_contour[3] - match_contour[1] + 1)
        iou = interArea / float(boxAArea + boxBArea - interArea)
        return iou

    def distance_squared_constraint_is_passed(self, blob: Blob, match: List[int]):
        """Check if the squared distance between the centroid of a Blob and a match is below a threshold."""
        print(f"Calculate dS2: {blob.contour} vs. {match}")
        blob_centroid = self.calculate_centroid(*blob.contour)
        match_centroid = self.calculate_centroid(*match)
        dS2 = (blob_centroid[0] - match_centroid[0]) ** 2 + (blob_centroid[1] - match_centroid[1]) ** 2
        return dS2 < self.flow_threshold

    @staticmethod
    def calculate_centroid(x_min: int, y_min: int, x_max: int, y_max: int):
        """Calculate the centroid of a bounding box."""
        return ((x_min + x_max) // 2, (y_min + y_max) // 2)

class MOG2ObjectDetector:
    def __init__(
        self,
        inputShape: tuple,
        doTPU: bool = False,
        fps: int = 30,
        backgroundSubtractorHistory: int = 500,
        backgroundSubtractorvarThreshold: int = 64,
        backgroundSubtractordetectShadows: bool = False,
        bboxDetectionKernelSize: int = 64,
        bboxDetectionThreshold: int = 64,
        bboxMaxIncrease: int = 2,
        returnMask: bool = False,
    ):

        self.inputShape = inputShape
        self.doTPU = doTPU
        
        self.blob_tracker = BlobTracker(inputShape[0], inputShape[1], fps)

        self.backgroundSubtractor = cv2.createBackgroundSubtractorMOG2(
            history=backgroundSubtractorHistory,
            varThreshold=backgroundSubtractorvarThreshold,
            detectShadows=backgroundSubtractordetectShadows,
        )

        # float32 kernel: cv2.filter2D expects a floating-point kernel, and
        # 32-bit is the safely supported precision (float16 gained nothing here anyway).
        self.bboxDetectionKernel = np.ones(
            (bboxDetectionKernelSize, bboxDetectionKernelSize), dtype=np.float32
        ) / (bboxDetectionKernelSize**2)
        self.bboxDetectionThreshold = bboxDetectionThreshold
        self.bboxMaxIncrease = bboxMaxIncrease
        self.bboxLastFrame = []

        self.returnMask = returnMask

    def get_matches(self, processed, frame):
        # blurred = cv2.GaussianBlur(processed, (5, 5), 0)
        blurred = processed  # a Gaussian blur before MOG2 does not make much sense imo, but others seemed to do it as well
        mask = self.frame_to_mask(blurred)
        processed = self.process_mask(mask)
        bbox = self.mask_to_bbox(processed)
        blobs = self.blob_tracker.update_blobs(bbox)
        print(blobs)
        return blobs

    def frame_to_mask(self, frame):
        """
        Convert a frame to a mask. If TPU (in this case, GPU) is available, the frame is first converted to grayscale, then to UMat
        before applying the background subtractor. If TPU is not available, the background subtractor is applied
        directly to the frame.

        Args:
            frame: The frame to be converted to a mask.

        Returns:
            mask: The frame converted to a mask.
        """
        if self.doTPU:
            # convert image to grayscale
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # convert numpy.ndarray to UMat
            umat_gray = cv2.UMat(gray)
            umat_mask = self.backgroundSubtractor.apply(umat_gray)

            mask = umat_mask.get()

            return mask
        else:
            return self.backgroundSubtractor.apply(frame)

    def process_mask(self, mask):
        """
        Process a mask. If TPU (in this case, GPU) is available, the mask is first converted to UMat, then convolved with a
        bboxDetectionKernel and thresholded. If TPU is not available, the mask is directly convolved and thresholded.

        Args:
            mask: The mask to be processed.

        Returns:
            processed: The processed mask.
        """
        if self.doTPU:
            umat_mask = cv2.UMat(mask)

            umat_convolved = cv2.filter2D(umat_mask, -1, self.bboxDetectionKernel)
            _, umat_processed = cv2.threshold(
                umat_convolved, self.bboxDetectionThreshold, 255, cv2.THRESH_BINARY
            )

            processed = umat_processed.get()

            return processed
        else:
            convolved = cv2.filter2D(mask, -1, self.bboxDetectionKernel)
            _, processed = cv2.threshold(
                convolved, self.bboxDetectionThreshold, 255, cv2.THRESH_BINARY
            )
            return processed

    @staticmethod
    def get_centroid(x: int, y: int, w: int, h: int) -> Tuple[int, int]:
        x1 = int(w / 2)
        y1 = int(h / 2)

        cx = x + x1
        cy = y + y1

        return (cx, cy)

    def mask_to_bbox(self, thresholded):
        """
        Convert a thresholded mask into bounding boxes. Each contour found in the mask is converted into a bounding box.
        The bounding box's parameters are calculated and a centroid is also calculated for each bounding box.

        Args:
            thresholded: The thresholded mask to be converted into bounding boxes.

        Returns:
            detections: A list of bounding boxes and their respective centroids.
        """
        thresholded = thresholded.astype("uint8")
        if self.doTPU:
            # convert numpy.ndarray to UMat so findContours goes through the T-API
            thresholded = cv2.UMat(thresholded)

        contours, _ = cv2.findContours(
            thresholded, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1
        )

        detections = []
        for contour in contours:
            (x, y, w, h) = cv2.boundingRect(contour)
            centroid = self.get_centroid(x, y, w, h)
            # [x_min, y_min, x_max, y_max, area, centroid] — these are already
            # plain Python values, so no UMat-to-ndarray conversion is needed.
            detections.append([x, y, x + w, y + h, h * w, centroid])

        return detections

    def limit_bbox(self, bbox):
        # Filter out small boxes first, then refuse implausible jumps in the
        # detection count; remember the accepted result for the next frame
        # (the original never updated bboxLastFrame, so the guard was always
        # comparing against the empty initial list).
        bbox = [b for b in bbox if b[4] > 1000]
        if len(bbox) - len(self.bboxLastFrame) > self.bboxMaxIncrease:
            return self.bboxLastFrame
        self.bboxLastFrame = bbox
        return bbox
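
For context, the driving loop looks roughly like this (a simplified sketch, not the deployed code: the capture source is a placeholder for the board's camera input, and in my pipeline processed is currently just the raw frame):

cap = cv2.VideoCapture(0)  # placeholder: the real source is the board's camera
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
detector = MOG2ObjectDetector(inputShape=(width, height), doTPU=False)

while True:
    ok, frame = cap.read()
    if not ok:
        break
    # 'processed' and 'frame' are the same image at the moment.
    detections = detector.get_matches(frame, frame)

cap.release()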

I have tried converting to UMat and to lower precision, but neither seems to make it any faster. The OpenCV documentation on OpenCL (the transparent API behind UMat) states that converting to UMat should make operations run on the GPU, but this does not seem to be the case. I am really new to developing on NXP devices and am struggling with the provided tools, even though I have combed through their documentation multiple times.
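
For what it's worth, this is the runtime check I can use to see whether OpenCV's OpenCL path (the machinery behind UMat) is actually available and enabled on the board:

import cv2

# Query and enable OpenCV's transparent OpenCL path (the T-API behind UMat).
print("OpenCL available:", cv2.ocl.haveOpenCL())
cv2.ocl.setUseOpenCL(True)
print("OpenCL enabled:  ", cv2.ocl.useOpenCL())

If haveOpenCL() comes back False, UMat operations silently fall back to the CPU, which would explain why I see no speedup.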

Thanks!!!
