Title: Manipulating live streaming with GStreamer in Python - stop and restart multiple times


I'm working with a GStreamer pipeline in Python to handle live streaming. When I receive a request for live streaming, I want to start an RTMP stream. This is part of a bigger pipeline I'm designing to store audio and video in muxed segments of one minute each and to start a 30-minute live stream whenever a request is received.
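
For context, the segment-recording branch of the bigger pipeline would look roughly like the sketch below. This is only an outline of what I mean by "muxed segments of one minute each"; the choice of splitmuxsink and the output location are placeholders, and this branch is not part of the code further down.

# Sketch of the planned one-minute segment recorder (splitmuxsink and the
# location pattern are placeholders, not final choices).
recorder = Gst.ElementFactory.make("splitmuxsink", "recorder")
recorder.set_property("location", "/tmp/segment-%05d.mp4")   # placeholder output path
recorder.set_property("max-size-time", 60 * Gst.SECOND)      # rotate to a new file every minute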

Before integrating into the full system, I'm trying to solve a sub-problem: stopping and restarting the live stream multiple times with a time gap in between (e.g. time.sleep(100)). I'm having difficulty achieving this.
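
A plain time.sleep() on the main thread would block the GLib main loop, so in the code below I schedule the stop and the restart with GLib.timeout_add_seconds instead. The scheduling pattern I'm aiming for is roughly the following (the callback names here are placeholders):

def stop_streaming():
    # detach / stop the RTMP branch here ...
    GLib.timeout_add_seconds(100, restart_streaming)  # restart after the gap
    return False  # one-shot callback: do not reschedule

def restart_streaming():
    # re-attach / restart the RTMP branch here ...
    GLib.timeout_add_seconds(100, stop_streaming)  # schedule the next stop
    return False

GLib.timeout_add_seconds(100, stop_streaming)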

This is the code I have tried. The issue I am facing is that once rtmp2sink has been put into the PAUSED, NULL, or READY state, it never returns to the PLAYING state.

import os
import gi
import time
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

# Initialize GStreamer
Gst.init(None)

# Create the video elements of the pipeline

source = Gst.ElementFactory.make("v4l2src", "source")
raw_video_caps = Gst.Caps.from_string("video/x-raw, width=1280, height=720, framerate=10/1")
raw_capsfilter = Gst.ElementFactory.make("capsfilter", "raw_capsfilter")
encoder = Gst.ElementFactory.make("vpuenc_h264", "encoder")
parser = Gst.ElementFactory.make("h264parse", "parser")
video_queue = Gst.ElementFactory.make("queue", "video_queue")
flvmux = Gst.ElementFactory.make("flvmux", "flvmux")
rtmpsink = Gst.ElementFactory.make("rtmp2sink", "rtmpsink")

if not source or not encoder or not flvmux or not rtmpsink:
    print("Not all element could be created")
    exit(-1)

# Create audio elements
audio_source = Gst.ElementFactory.make("alsasrc", "audio_source")
audio_convert1 = Gst.ElementFactory.make("audioconvert", "audio_convert1")
audio_caps1 = Gst.Caps.from_string("audio/x-raw, rate=44100, channels=2, format=S16LE")
audio_capsfilter1 = Gst.ElementFactory.make("capsfilter", "audio_capsfilter1")
audio_convert2 = Gst.ElementFactory.make("audioconvert", "audio_convert2")
audio_caps2 = Gst.Caps.from_string("audio/x-raw, channels=1")
audio_capsfilter2 = Gst.ElementFactory.make("capsfilter", "audio_capsfilter2")
audio_convert3 = Gst.ElementFactory.make("audioconvert", "audio_convert3")
audio_caps3 = Gst.Caps.from_string("audio/x-raw, channels=2")
audio_capsfilter3 = Gst.ElementFactory.make("capsfilter", "audio_capsfilter3")
queue = Gst.ElementFactory.make("queue", "queue")
audio_encoder = Gst.ElementFactory.make("voaacenc", "audio_encoder")
aacparse = Gst.ElementFactory.make("aacparse", "aacparse")
audio_queue2 = Gst.ElementFactory.make("queue", "audio_queue2")

source.set_property("device", "/dev/video2")
rtmpsink.set_property("location", "rtmps://url")
rtmpsink.set_property("sync","false")

raw_capsfilter.set_property("caps", raw_video_caps)
audio_source.set_property("device", "hw:0,0")
audio_capsfilter1.set_property("caps", audio_caps1)
audio_capsfilter2.set_property("caps", audio_caps2)
audio_capsfilter3.set_property("caps", audio_caps3)
audio_encoder.set_property("bitrate", 96000)

# Create an empty pipeline
pipeline = Gst.Pipeline.new("rtmps-pipeline")

if not pipeline:
    print("Not all elements could be created")

for elem in [source, raw_capsfilter, encoder, parser, video_queue, flvmux, rtmpsink]:
    pipeline.add(elem)

for elem in [audio_source, audio_convert1, audio_capsfilter1, audio_convert2, audio_capsfilter2, audio_convert3, audio_capsfilter3, queue, audio_encoder, audio_queue2, aacparse]:
    pipeline.add(elem)

def link_element(src1, src2):
    if not src1.link(src2):
        print("Elemenets could not be linked. Exiting...")
        exit(-1)

def pause_pipeline():
    print("Pausing pipeline...")
    pipeline.set_state(Gst.State.PAUSED)
    rtmpsink.set_state(Gst.State.NULL)
    flvmux.unlink(rtmpsink)
    pipeline.remove(rtmpsink)
    # Schedule the resume after 30 seconds
    GLib.timeout_add_seconds(30, resume_pipeline)
    return False  # Return False to make sure the function is not called repeatedly

def resume_pipeline():
    global rtmpsink
    print("Resuming pipeline...")
    rtmpsink = Gst.ElementFactory.make("rtmp2sink", "rtmpsink")
    rtmpsink.set_property("location", "rtmps://url")
    rtmpsink.set_property("sync", False)
    pipeline.add(rtmpsink)
    flvmux.link(rtmpsink)
    rtmpsink.set_state(Gst.State.READY)
    pipeline.set_state(Gst.State.PLAYING)
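    # Problem: even after the two calls above, the re-added rtmp2sink never
    # returns to the PLAYING state.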
    return False

def on_message(bus, message, loop):
    mtype = message.type
    if mtype == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print("Error:", err, debug)
        loop.quit()
    elif mtype == Gst.MessageType.EOS:
        print("End of Stream")
        loop.quit()

# Link video elements
for src, dest in [(source, raw_capsfilter), (raw_capsfilter, encoder), (encoder, parser), (parser, video_queue), (video_queue, flvmux), (flvmux, rtmpsink)]:
    link_element(src, dest)

# Link audio elements
for src, dest in [(audio_source, audio_convert1), (audio_convert1, audio_capsfilter1), (audio_capsfilter1, audio_convert2), (audio_convert2, audio_capsfilter2), (audio_capsfilter2, audio_convert3), (audio_convert3, audio_capsfilter3), (audio_capsfilter3, queue), (queue, audio_encoder), (audio_encoder, aacparse), (aacparse, audio_queue2), (audio_queue2, flvmux)]:
    link_element(src, dest)

# Create an event loop and feed GStreamer bus messages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", on_message, loop)

# Start playing 
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
    print("Unable to set the pipeline to the playing state")
    exit(-1)

# Schedule the pause after 30 seconds
GLib.timeout_add_seconds(30, pause_pipeline)
try:
    loop.run()
except KeyboardInterrupt:
    pass

pipeline.set_state(Gst.State.NULL)
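
One thing I suspect, from reading the documentation, is that an element added to an already-running pipeline stays in the NULL state until its state is synced with its parent, so the re-attach step in resume_pipeline may need something along the lines of the sketch below. I have not confirmed that this alone fixes the problem:

def reattach_sink(pipeline, flvmux):
    # Re-create the sink, then add and link it while the pipeline keeps running.
    sink = Gst.ElementFactory.make("rtmp2sink", "rtmpsink")
    sink.set_property("location", "rtmps://url")
    sink.set_property("sync", False)
    pipeline.add(sink)
    flvmux.link(sink)
    # A newly added element does not follow the pipeline's state on its own;
    # sync_state_with_parent() asks it to move to the pipeline's current state.
    sink.sync_state_with_parent()
    return sink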