I'm working on deepstream code to pass rtsp streams to virtual V4L2 devices (I used v4l2loopback to create the virtual devices). I have a code that works without errors, however, I can't read the V4L2 device.
Does anyone know of a working DeepStream code where v4l2sink is used? I have tried to find an example without success.
Here is my code. The writing part to v4l2sink is in the function: create_v4l2sink_branch()
import sys
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
import math
import sys
import common.utils as DS_UTILS
import pyds
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from common.is_aarch_64 import is_aarch64
from gi.repository import GLib, Gst, GstRtspServer
CODEC="H264"
BITRATE=4000000
MAX_DISPLAY_LEN = 64
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 400000
TILED_OUTPUT_WIDTH = 1920
TILED_OUTPUT_HEIGHT = 1080
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1
MUX_SYNC_INPUTS = 0
ds_loop=None
perf_data = None
def terminate_pipeline(u_data):
global ds_loop
pass
# if global_config.request_to_stop == True:
# print("Aborting pipeline by request")
# ds_loop.quit()
# return False
return True
def create_onscreen_branch(pipeline, gst_elem, index):
print("Creating EGLSink")
sink = DS_UTILS.create_gst_element("nveglglessink", f"nvvideo-renderer-{index}")
sink.set_property('sync', 0)
sink.set_property('async', 1)
pipeline.add(sink)
if is_aarch64():
transform = DS_UTILS.create_gst_element("nvegltransform", f"nvegl-transform{index}")
pipeline.add(transform)
gst_elem.link(transform)
transform.link(sink)
else:
gst_elem.link(sink)
sink.set_property("qos", 0)
def create_v4l2sink_branch(pipeline, gst_elem, index, output_video_device):
# Create a caps filter
caps = DS_UTILS.create_gst_element("capsfilter", f"filter-{index}")
#caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
#caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=NV12"))
identity = DS_UTILS.create_gst_element("identity", f"identity-{index}")
identity.set_property("drop-allocation", 1)
nvvidconv = DS_UTILS.create_gst_element("nvvideoconvert", f"convertor-{index}")
sink = DS_UTILS.create_gst_element("v4l2sink", f"v4l2sink-{index}")
sink.set_property('device', output_video_device)
sink.set_property("sync", 0)
sink.set_property("async", 1)
pipeline.add(caps)
pipeline.add(nvvidconv)
pipeline.add(identity)
pipeline.add(sink)
gst_elem.link(caps)
caps.link(nvvidconv)
nvvidconv.link(identity)
identity.link(sink)
def run_pipeline(rtsp_v4l2_pairs):
# Check input arguments
number_sources = len(rtsp_v4l2_pairs)
perf_data = PERF_DATA(number_sources)
# Standard GStreamer initialization
Gst.init(None)
# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline")
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
return
# Create nvstreammux instance to form batches from one or more sources.
streammux = DS_UTILS.create_gst_element("nvstreammux", "Stream-muxer")
pipeline.add(streammux)
for i in range(number_sources):
uri_name = rtsp_v4l2_pairs[i][0]
print(" Creating source_bin {} --> {}".format(i, uri_name))
is_live = uri_name.find("rtsp://") == 0
source_bin = DS_UTILS.create_source_bin(i, uri_name)
pipeline.add(source_bin)
padname = "sink_%u" % i
sinkpad = streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")
srcpad = source_bin.get_static_pad("src")
if not srcpad:
sys.stderr.write("Unable to create src pad bin \n")
srcpad.link(sinkpad)
# streammux setup
if is_live:
print(" At least one of the sources is live")
streammux.set_property('live-source', 1)
streammux.set_property('width', MUXER_OUTPUT_WIDTH)
streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
streammux.set_property('batch-size', number_sources)
streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
#streammux.set_property("sync-inputs", MUX_SYNC_INPUTS)
queue = DS_UTILS.create_gst_element("queue", "queue1")
pipeline.add(queue)
nvstreamdemux = DS_UTILS.create_gst_element("nvstreamdemux", "nvstreamdemux")
pipeline.add(nvstreamdemux)
# linking
streammux.link(queue)
queue.link(nvstreamdemux)
for i in range(number_sources):
queue = DS_UTILS.create_gst_element("queue", f"queue{2+i}")
pipeline.add(queue)
demuxsrcpad = nvstreamdemux.get_request_pad(f"src_{i}")
if not demuxsrcpad:
sys.stderr.write("Unable to create demux src pad \n")
queuesinkpad = queue.get_static_pad("sink")
if not queuesinkpad:
sys.stderr.write("Unable to create queue sink pad \n")
demuxsrcpad.link(queuesinkpad)
#create_onscreen_branch(pipeline=pipeline, gst_elem=queue, index=i)
create_v4l2sink_branch(pipeline=pipeline, gst_elem=queue, index=i, output_video_device=rtsp_v4l2_pairs[i][1])
# for termate the pipeline
GLib.timeout_add_seconds(1, terminate_pipeline, 0)
# display FPS
GLib.timeout_add(5000, perf_data.perf_print_callback)
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
ds_loop = loop
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
print("Starting pipeline")
# start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# cleanup
print("Pipeline ended")
pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
import json
import sys
pairs = [
("rtsp://192.168.1.88:554/22", "/dev/video6")
]
run_pipeline(rtsp_v4l2_pairs=pairs)