Simultaneously map video and data streams to one subprocess pipeline in real-time

1.2k views Asked by At

I need to process the video stream and the klvdata streams simultaneously in real-time in OpenCV/Python. I'm using FFMPEG to read the file or stream as OpenCV does not retain the klvdata. I pass the data to OpenCV with the subprocess module.

My problem is I cannot figure out how to map both the video and klvdata to the same subprocess pipe simultaneously?

My code:

#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy

command = ['ffmpeg',
    '-i', 'DayFlight.mpg',
    '-map', '0:0',
    '-map', '0:d',        
    '-pix_fmt', 'bgr24',
    '-c:v', 'rawvideo',      
    '-an','-sn',              
    '-f', 'image2pipe', '-',
    '-c:d', 'copy',
    '-f','data',
    ]

pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)

while True:
   raw_image = pipe.stdout.read(1280*720*3)
   image =  numpy.fromstring(raw_image, dtype='uint8')
   image = image.reshape((720,1280,3))          
   if image is not None:
      cv2.imshow('Video', image)
   if cv2.waitKey(1) & 0xFF == ord('q'):
      break
   for packet in klvdata.StreamParser(pipe.stdout): 
      metadata = packet.MetadataList()
      print(metadata)
pipe.stdout.flush()
cv2.destroyAllWindows()

Produces the below error:

Traceback (most recent call last):
  File "test_cv.py", line 32, in <module>
    metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'

Any help is greatly appreciated.

1

There are 1 answers

1
Rotem On BEST ANSWER

For splitting the video and data, we may map the video stream to stderr pipe and map the KLV data stream to to stdout pipe.

The same technique is used for separating video and audio in my following answer.

Accurate synchronization between the the video frame and the corresponding data is relatively simple when each video frame has private KLV data (synchronize by sequential order).

The Day Flight.mpg sample file has much fewer data packets than frames, and accurate synchronization is not possible using the suggested solution (I don't think it is possible using the pipes approach).
We may still apply some coarse synchronization - assume the data and the frame are read in time proximity.

Suggested way for splitting the video and the data:

                                            -----------
                                       --->| Raw Video | ---> stderr (pipe)
 -----------        -------------     |     -----------    
| Input     |      | FFmpeg      |    |
| Video with| ---> | sub-process | ---      
| Data      |      |             |    |    
 -----------        -------------     |     -----------
                                       --->| KLV data  | ---> stdout (pipe)
                                            -----------

The video and the data are read in two separate threads:

  • Video reader thread - read raw video frames (in BGR) format.
  • Data reader thread - read and parse KLV data.

According to Wikipedia, the KLV format is not well defined:

Keys can be 1, 2, 4, or 16 bytes in length.
Presumably in a separate specification document you would agree on a key length for a given application.

In the sample video, the key length is 16 bytes, but it is not guaranteed...


Reading the KLV data from stdout pipe:
When reading data from a pipe (in real-time like manner), we need to know the expected number of bytes to read.
That forces us to do partial parsing of the KLV data:

  • Read the "key" (assume 16 bytes length).
  • Read the "length" - there is some challenge with "BER length" standard.
  • Read the the "data" (size to read is defined by the length).

After reading the key, length and data, we have one "KLV data packet", we can send to the KLV data parser.


Here is a code sample that woks with Day Flight.mpg sample input file:

#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO

# Video reader thread.
def video_reader(pipe):
    cols, rows = 1280, 720  # Assume we know frame size is 1280x720

    counter = 0
    while True:
        raw_image = pipe.read(cols*rows*3)  # Read raw video frame

        # Break the loop when length is too small
        if len(raw_image) < cols*rows*3:
            break

        if (counter % 60) == 0:
            # Show video frame evey 60 frames
            image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
            cv2.imshow('Video', image) # Show video image for testing
            cv2.waitKey(1)
        counter += 1



# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
    """Return integer given bytes."""
    return int.from_bytes(bytes(value), byteorder='big', signed=signed)


# Data reader thread (read KLV data).
def data_reader(pipe):
    key_length = 16  # Assume key length is 16 bytes.

    f = open('data.bin', 'wb')  # For testing - store the KLV data to data.bin (binary file)

    while True:
        # https://en.wikipedia.org/wiki/KLV
        # The first few bytes are the Key, much like a key in a standard hash table data structure.
        # Keys can be 1, 2, 4, or 16 bytes in length.
        # Presumably in a separate specification document you would agree on a key length for a given application.
        key = pipe.read(key_length)  # Read the key
        
        if len(key) < key_length:
            break  # Break the loop when length is too small
        f.write(key)  # Write data to binary file for testing

        # https://github.com/paretech/klvdata/tree/master/klvdata
        # Length field
        len_byte = pipe.read(1)

        if len(len_byte) < 1:
            break  # Break the loop when length is too small
        f.write(len_byte)  # Write data to binary file for testing

        byte_length = bytes_to_int(len_byte)

        # https://github.com/paretech/klvdata/tree/master/klvdata                                                
        if byte_length < 128:
            # BER Short Form
            length = byte_length
            ber_len_bytes = b''
        else:
            # BER Long Form
            ber_len = byte_length - 128
            ber_len_bytes = pipe.read(ber_len)

            if len(ber_len_bytes) < ber_len:
                break  # Break the loop when length is too small
            f.write(ber_len_bytes)  # Write ber_len_bytes to binary file for testing

            length = bytes_to_int(ber_len_bytes)

        # Read the value (length bytes)
        value = pipe.read(length)
        if len(value) < length:
            break  # Break the loop when length is too small
        f.write(value)  # Write data to binary file for testing

        klv_data = key + len_byte + ber_len_bytes + value  # Concatenate key length and data
        klv_data_as_bytes_io = BytesIO(klv_data)  # Wrap klv_data with BytesIO (before parsing)

        # Parse the KLV data
        for packet in klvdata.StreamParser(klv_data_as_bytes_io): 
            metadata = packet.MetadataList()
            print(metadata)
            print() # New line

# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet '                        # Set loglevel to quiet for disabling the prints ot stderr
                               '-i "Day Flight.mpg" '                                        # Input video "Day Flight.mpg"
                               '-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
                               '-map 0:d -c copy -copy_unknown -f:d data pipe:1 '            # Copy the data without ddecoding.
                               '-report'),                                                   # Create a log file (because we can't the statuses that are usually printed to stderr).
                                stdout=sp.PIPE, stderr=sp.PIPE)


# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()

# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()


# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()

The above code saves the data to data.bin (for testing).
data.bin may be used for consistency check.
Execute FFmpeg CLI for extracting the data stream:

ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin

Verify that raw.bin and data.bin files are equal.