Detecting hands or body using running_mode=VIDEO / LIVE_STREAM (Mediapipe)


I'm trying to implement hand detection using the Mediapipe libraries. For now, I'm detecting hands frame by frame on my RealSense stream using the code below.

But now I want to use running_mode=VIDEO or running_mode=LIVE_STREAM for better detection and tracking. I haven't found any example or documentation for these modes. How can I implement this?


import cv2
import numpy as np
import os
import pyrealsense2 as rs

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2


def hand_detection_realsense_video(model_path_full):

  # Create an HandLandmarker object.
  VisionRunningMode = mp.tasks.vision.RunningMode
  options = vision.HandLandmarkerOptions(base_options=python.BaseOptions(model_asset_path=model_path_full),
                                      running_mode=VisionRunningMode.IMAGE,
                                      num_hands=2,
                                      min_hand_detection_confidence=0.5,
                                      min_hand_presence_confidence=0.5,
                                      min_tracking_confidence=0.5)
  detector = vision.HandLandmarker.create_from_options(options)

  # Configure depth and color streams
  pipeline = rs.pipeline()
  config = rs.config()

  # Get device product line for setting a supporting resolution
  pipeline_wrapper = rs.pipeline_wrapper(pipeline)
  pipeline_profile = config.resolve(pipeline_wrapper)
  device = pipeline_profile.get_device()
  device_product_line = str(device.get_info(rs.camera_info.product_line))  

  config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
  config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

  # Start streaming
  pipeline.start(config)

  try:
      while True:

        # Wait for a coherent pair of frames: depth and color
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        color_frame = frames.get_color_frame()
        if not depth_frame or not color_frame:
            continue

        # Convert images to numpy arrays
        color_image = np.asanyarray(color_frame.get_data())

        # Hand detection
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=color_image)
        detection_result = detector.detect(mp_image)
        annotated_image = draw_landmarks_on_image(mp_image.numpy_view(), detection_result)  # visualization helper (not shown in this snippet)

        # Show images
        images = np.hstack((color_image, annotated_image))
        cv2.namedWindow('RealSense', cv2.WINDOW_AUTOSIZE)
        cv2.imshow('RealSense', images)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
      
  finally:

      # Stop streaming
      pipeline.stop()

1 Answer

Accepted answer by Rémi.T:

I've finally got it working: you need to pass a result_callback pointing to your display method when you create the detector. The display method is slightly different, since it now receives a Mediapipe Image object rather than a numpy array.
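
In short, the wiring looks like this (a minimal sketch; the callback name and the model file name are placeholders, the full working code is below):

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

def on_result(result, output_image: mp.Image, timestamp_ms: int):
    # Called asynchronously for every frame submitted with detect_async();
    # draw or store the landmarks here
    ...

options = vision.HandLandmarkerOptions(
    base_options=python.BaseOptions(model_asset_path='hand_landmarker.task'),  # placeholder path
    running_mode=mp.tasks.vision.RunningMode.LIVE_STREAM,
    result_callback=on_result)
detector = vision.HandLandmarker.create_from_options(options)

# In the capture loop, detect_async() returns immediately; results arrive through the callback:
# detector.detect_async(mp_image, timestamp_ms)

If you want running_mode=VIDEO instead, the call is synchronous: detector.detect_for_video(mp_image, timestamp_ms) returns the result directly and no result_callback is needed.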

Here is my code, hoping it will help someone else:

import numpy as np
import cv2
import pyrealsense2 as rs
import time

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

#

MARGIN = 10  # pixels
FONT_SIZE = 1
FONT_THICKNESS = 1
HANDEDNESS_TEXT_COLOR = (88, 205, 54) # vibrant green

class Hands_detection():
   
  def __init__(self, model_path) -> None:
    # Initialization of the image
    self.annotated_image = np.zeros((480, 640, 3), np.uint8)  # height x width x channels, matching the 640x480 stream

    # Create an HandLandmarker object
    VisionRunningMode = mp.tasks.vision.RunningMode
    options = vision.HandLandmarkerOptions(base_options=python.BaseOptions(model_asset_path=model_path),
                                        running_mode=VisionRunningMode.LIVE_STREAM,
                                        result_callback=self.draw_hand_landmarks_on_live,
                                        num_hands=2,
                                        min_hand_detection_confidence=0.5,
                                        min_hand_presence_confidence=0.5,
                                        min_tracking_confidence=0.5)
    self.detector = vision.HandLandmarker.create_from_options(options)

    # Configure depth and color streams
    self.pipeline = rs.pipeline()
    config = rs.config() 

    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
    config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

    # Start streaming
    self.pipeline.start(config)

  #

  def run(self):
    try:
      self.detect_body()
    finally:
      # Stop streaming
      self.pipeline.stop()

  #

  def draw_hand_landmarks_on_live(self, detection_result, rgb_image, _):
    hand_landmarks_list = detection_result.hand_landmarks
    handedness_list = detection_result.handedness
    rgb_image = rgb_image.numpy_view()
    annotated_image = np.copy(rgb_image)

    # Loop through the detected hands to visualize.
    for idx in range(len(hand_landmarks_list)):
      hand_landmarks = hand_landmarks_list[idx]
      handedness = handedness_list[idx]

      # Draw the hand landmarks.
      hand_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
      hand_landmarks_proto.landmark.extend([
        landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in hand_landmarks
      ])
      solutions.drawing_utils.draw_landmarks(
        annotated_image,
        hand_landmarks_proto,
        solutions.hands.HAND_CONNECTIONS,
        solutions.drawing_styles.get_default_hand_landmarks_style(),
        solutions.drawing_styles.get_default_hand_connections_style())

      # Get the top left corner of the detected hand's bounding box.
      height, width, _ = annotated_image.shape
      x_coordinates = [landmark.x for landmark in hand_landmarks]
      y_coordinates = [landmark.y for landmark in hand_landmarks]
      text_x = int(min(x_coordinates) * width)
      text_y = int(min(y_coordinates) * height) - MARGIN

      # Draw handedness (left or right hand) on the image.
      cv2.putText(annotated_image, f"{handedness[0].category_name}",
                  (text_x, text_y), cv2.FONT_HERSHEY_DUPLEX,
                  FONT_SIZE, HANDEDNESS_TEXT_COLOR, FONT_THICKNESS, cv2.LINE_AA)

    self.annotated_image = annotated_image
    return

  #
  
  def detect_body(self):
    while True:
      # Wait for a coherent pair of frames: depth and color
      frames = self.pipeline.wait_for_frames()
      depth_frame = frames.get_depth_frame()
      color_frame = frames.get_color_frame()
      if not depth_frame or not color_frame:
          continue     

      color_image = np.array(color_frame.get_data())
      timestamp = int(round(time.time()*1000))

      mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=color_image)
      # detect_async() returns immediately; the result is delivered to result_callback
      self.detector.detect_async(mp_image, timestamp)

      cv2.imshow('Hand detection', self.annotated_image)
      if cv2.waitKey(1) & 0xFF == ord('q'):
          break
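
To use the class, something like this should work (the model path here is an assumption, point it at your downloaded hand landmarker model):

if __name__ == '__main__':
    detection = Hands_detection('hand_landmarker.task')  # assumed model location
    detection.run()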

The same approach also works for body (pose) detection by swapping in the corresponding detector and adapting the display method.
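
For reference, a minimal sketch of what the pose variant could look like (the callback name and the model file name pose_landmarker.task are assumptions):

import numpy as np
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

def on_pose_result(result, output_image: mp.Image, timestamp_ms: int):
    # Same pattern as the hand callback: convert the Mediapipe image and draw the pose landmarks
    annotated = np.copy(output_image.numpy_view())
    for pose_landmarks in result.pose_landmarks:
        proto = landmark_pb2.NormalizedLandmarkList()
        proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z) for lm in pose_landmarks])
        solutions.drawing_utils.draw_landmarks(
            annotated,
            proto,
            solutions.pose.POSE_CONNECTIONS,
            landmark_drawing_spec=solutions.drawing_styles.get_default_pose_landmarks_style())
    # Store or display 'annotated' exactly as in the hand example

options = vision.PoseLandmarkerOptions(
    base_options=python.BaseOptions(model_asset_path='pose_landmarker.task'),  # assumed model file
    running_mode=mp.tasks.vision.RunningMode.LIVE_STREAM,
    result_callback=on_pose_result)
pose_detector = vision.PoseLandmarker.create_from_options(options)
# Then call pose_detector.detect_async(mp_image, timestamp) from the capture loop, exactly as above.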