Using v4l2sink with DeepStream

223 views Asked by At

I'm working on DeepStream code that passes RTSP streams to virtual V4L2 devices (I used v4l2loopback to create the virtual devices). The code runs without errors; however, I can't read from the V4L2 device.

Does anyone know of a working DeepStream code where v4l2sink is used? I have tried to find an example without success.

Here is my code. The part that writes to v4l2sink is in the function create_v4l2sink_branch().

import sys
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
import math
import sys
import common.utils as DS_UTILS
import pyds
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from common.is_aarch_64 import is_aarch64
from gi.repository import GLib, Gst, GstRtspServer

# Encoder-related settings (not referenced by the functions visible in this file).
CODEC = "H264"
BITRATE = 4_000_000

MAX_DISPLAY_LEN = 64

# Resolution and batching timeout applied to nvstreammux in run_pipeline().
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 400_000

# Tiler / OSD settings (not referenced by the functions visible in this file).
TILED_OUTPUT_WIDTH = 1920
TILED_OUTPUT_HEIGHT = 1080
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1

# Only mentioned in a commented-out nvstreammux "sync-inputs" line.
MUX_SYNC_INPUTS = 0

# Module-level placeholders for the GLib main loop and the FPS tracker.
ds_loop = None
perf_data = None

def terminate_pipeline(u_data):
    """Periodic timeout callback; currently a no-op.

    The stop-request handling is disabled: it used to check
    ``global_config.request_to_stop``, print "Aborting pipeline by request",
    call ``ds_loop.quit()`` and return False.

    Args:
        u_data: user data supplied when the callback was registered (unused).

    Returns:
        Always True, which keeps a GLib timeout source installed.
    """
    global ds_loop
    return True

def create_onscreen_branch(pipeline, gst_elem, index):
    """Attach an on-screen ``nveglglessink`` renderer downstream of ``gst_elem``.

    Args:
        pipeline: pipeline that receives the new elements.
        gst_elem: upstream element whose output is rendered.
        index: stream index, used to give the new elements unique names.
    """
    print("Creating EGLSink")
    renderer = DS_UTILS.create_gst_element("nveglglessink", f"nvvideo-renderer-{index}")
    for prop_name, prop_value in (("sync", 0), ("async", 1)):
        renderer.set_property(prop_name, prop_value)
    pipeline.add(renderer)

    # When is_aarch64() is true an nvegltransform is placed in front of the sink.
    upstream = gst_elem
    if is_aarch64():
        egl_transform = DS_UTILS.create_gst_element("nvegltransform", f"nvegl-transform{index}")
        pipeline.add(egl_transform)
        gst_elem.link(egl_transform)
        upstream = egl_transform
    upstream.link(renderer)

    renderer.set_property("qos", 0)

def create_v4l2sink_branch(pipeline, gst_elem, index, output_video_device,
                           raw_caps="video/x-raw, format=I420"):
    """Attach a branch that writes the output of ``gst_elem`` to a V4L2 device.

    The branch is
    ``gst_elem -> nvvideoconvert -> capsfilter -> identity -> v4l2sink``.
    The capsfilter sits *after* the converter and carries caps without the
    ``memory:NVMM`` feature, so the converter is asked for system-memory raw
    video in a fixed format instead of whatever negotiation happens to pick.

    Args:
        pipeline: pipeline that receives the new elements.
        gst_elem: upstream element feeding this branch.
        index: stream index, used to give the new elements unique names.
        output_video_device: path of the V4L2 device, e.g. ``/dev/video6``.
        raw_caps: caps string enforced in front of the sink.
            NOTE(review): confirm the program reading the device accepts
            I420; pass another format otherwise.
    """
    nvvidconv = DS_UTILS.create_gst_element("nvvideoconvert", f"convertor-{index}")

    # Pin the converter output format; an empty capsfilter constrains nothing.
    caps = DS_UTILS.create_gst_element("capsfilter", f"filter-{index}")
    caps.set_property("caps", Gst.Caps.from_string(raw_caps))

    # drop-allocation stops v4l2sink's allocation query from reaching upstream.
    identity = DS_UTILS.create_gst_element("identity", f"identity-{index}")
    identity.set_property("drop-allocation", 1)

    sink = DS_UTILS.create_gst_element("v4l2sink", f"v4l2sink-{index}")
    sink.set_property('device', output_video_device)
    sink.set_property("sync", 0)
    sink.set_property("async", 1)

    chain = (gst_elem, nvvidconv, caps, identity, sink)
    for element in chain[1:]:
        pipeline.add(element)

    # Element.link() reports failure through its return value only, so check
    # it; otherwise a broken branch looks like a pipeline that "works".
    for upstream, downstream in zip(chain, chain[1:]):
        if not upstream.link(downstream):
            sys.stderr.write(
                f"Unable to link {upstream.get_name()} -> {downstream.get_name()} \n"
            )

def run_pipeline(rtsp_v4l2_pairs):
    """Build and run a pipeline that forwards each source to a V4L2 device.

    Layout: every source bin feeds ``nvstreammux``; the batch goes through a
    queue into ``nvstreamdemux``; each demuxed stream goes through its own
    queue into a branch built by ``create_v4l2sink_branch``.

    The function blocks in a GLib main loop until the loop is quit or the
    user interrupts it, then sets the pipeline to NULL. The main loop and the
    FPS tracker are published in the module globals ``ds_loop`` and
    ``perf_data``.

    Args:
        rtsp_v4l2_pairs: sequence of ``(source_uri, v4l2_device_path)`` pairs.

    Returns:
        None. Setup problems (no pairs, pipeline or pad creation failure) are
        reported on stderr and the function returns without running.
    """
    # Without this declaration the assignments below create locals and the
    # module-level placeholders stay None.
    global ds_loop, perf_data

    if not rtsp_v4l2_pairs:
        sys.stderr.write(" No (uri, device) pairs given, nothing to run \n")
        return

    number_sources = len(rtsp_v4l2_pairs)
    perf_data = PERF_DATA(number_sources)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create the pipeline element that will hold all other elements
    print("Creating Pipeline")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
        return

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = DS_UTILS.create_gst_element("nvstreammux", "Stream-muxer")
    pipeline.add(streammux)

    is_live = False
    for i, pair in enumerate(rtsp_v4l2_pairs):
        uri_name = pair[0]
        print("  Creating source_bin {} --> {}".format(i, uri_name))
        # Accumulate: one live source is enough, whatever its position.
        if uri_name.startswith("rtsp://"):
            is_live = True
        source_bin = DS_UTILS.create_source_bin(i, uri_name)

        pipeline.add(source_bin)
        sinkpad = streammux.get_request_pad("sink_%u" % i)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
            return
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
            return
        srcpad.link(sinkpad)

    # streammux setup
    if is_live:
        print("  At least one of the sources is live")
        streammux.set_property('live-source', 1)
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
    #streammux.set_property("sync-inputs", MUX_SYNC_INPUTS)

    queue = DS_UTILS.create_gst_element("queue", "queue1")
    pipeline.add(queue)
    nvstreamdemux = DS_UTILS.create_gst_element("nvstreamdemux", "nvstreamdemux")
    pipeline.add(nvstreamdemux)

    # linking
    streammux.link(queue)
    queue.link(nvstreamdemux)

    for i, pair in enumerate(rtsp_v4l2_pairs):
        branch_queue = DS_UTILS.create_gst_element("queue", f"queue{2+i}")
        pipeline.add(branch_queue)

        demuxsrcpad = nvstreamdemux.get_request_pad(f"src_{i}")
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
            return

        queuesinkpad = branch_queue.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
            return
        demuxsrcpad.link(queuesinkpad)

        # create_onscreen_branch(pipeline=pipeline, gst_elem=branch_queue, index=i)
        # can be used here instead to render the stream on screen.
        create_v4l2sink_branch(pipeline=pipeline, gst_elem=branch_queue, index=i,
                               output_video_device=pair[1])

    # periodic hook for terminating the pipeline
    GLib.timeout_add_seconds(1, terminate_pipeline, 0)
    # display FPS
    GLib.timeout_add(5000, perf_data.perf_print_callback)

    # create an event loop and feed GStreamer bus messages to it
    loop = GLib.MainLoop()
    ds_loop = loop
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    print("Starting pipeline")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; fall through to cleanup.
        pass
    finally:
        # cleanup runs even if the loop exits with an unexpected exception
        print("Pipeline ended")
        pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    import json
    import sys

    # One (source URI, V4L2 output device) pair per stream to forward.
    pairs = [("rtsp://192.168.1.88:554/22", "/dev/video6")]

    run_pipeline(rtsp_v4l2_pairs=pairs)
0

There are 0 answers