I am using gstreamer-rs to pull a stream from an rtsp camera stream. I would like to add some custom metadata to the ts segment like the camera source, and some other information everytime a new ts segment is created. The custom metadata can change so I cannot do it static.
I use the get-fragment-stream callback to get when the ts segment is written.
- I added a callback to get the stream convert it into a gst::Stream
- Then add the custom tags and merge them and the push it back to call get-fragment-stream
But I am running into issues where the gst::Stream does not support io.write.
What is the right way to add a custom tag to every ts segment generated by hlssink?
Here is a simple example program that runs if the hlssink.connect is removed.
use gst::prelude::*;
use gst::{DebugGraphDetails};
use anyhow::{Error, format_err};
use curl::easy::Easy;
use std::process::exit;
#[cfg(not(target_os = "macos"))]
pub fn run<T,F: FnOnce() -> T + Send + 'static>(main: F) -> T
where
T: Send + 'static, { main() }
fn k_main () {
gst::init().unwrap();
let main_loop = glib::MainLoop::new(None, false);
let pipeline = gst::Pipeline::new(None);
let rtspsrc = gst::ElementFactory::make("rtspsrc", None).unwrap();
let rtph264depay = gst::ElementFactory::make("rtph264depay", None).unwrap();
let h264parse = gst::ElementFactory::make("h264parse", None).unwrap();
let hlssink2 = gst::ElementFactory::make("hlssink2", None).unwrap();
rtspsrc.set_property_from_str("location", "rtsp://<rtspurl>");
hlssink2.set_property("playlist-location", "output.m3u8");
hlssink2.set_property("location", "output%05d.ts");
hlssink2.set_property("max-files", 10 as u32);
hlssink2.set_property("target-duration", 10 as u32);
pipeline.add_many(&[
&rtspsrc,
&rtph264depay,
&h264parse,
&hlssink2,
]).unwrap();
gst::Element::link_many(&[
&rtph264depay,
&h264parse,
&hlssink2,
]
).unwrap();
rtspsrc.connect_pad_added(move |src, src_pad| {
println!("inside connect_pad_added src_pad={:#?}", src_pad.current_caps());
let is_video = if src_pad.name().ends_with("_96") {
true
} else {
false
};
let connect_remote_stream = || -> Result<(), Error> {
let video_sink_pad = rtph264depay.static_pad("sink").expect("could not get sink pad from rtph264depay");
src_pad.link(&video_sink_pad).expect("failed to link rtspsrc.video->rtph264depay.sink");
println!("linked rtspsrc->rtph264depay");
Ok(())
};
if is_video {
match connect_remote_stream() {
Ok(_) => println!("rtsp stream is setup!"),
Err(e) => log::error!("could not setup rtspsrc->rtph264depay")
}
}
});
// connect to the hlssink to know when ts segment is written
let hlssink_clone = hlssink2.clone();
hlssink_clone.connect("get-fragment-stream", false, move |args| {
let fragment_stream = args[0].get::<gst::Stream>().expect("Failed to extract fragment stream from get-fragment-stream signal");
let segment_number = args[1].get::<u32>().expect("failed to extract segment number from gst-fragment-stream");
//println!("segment-number={}", segment_number);
let mut custom_tags = gst::tags::TagList::new();
custom_tags.add::<gst::tags::Title>(&format!("Custom Title {}", segment_number).as_str(), gst::TagMergeMode::Replace);
custom_tags.add::<gst::tags::Artist>(&format!("Custom artist {}", "blahblah").as_str(), gst::TagMergeMode::Replace);
fragment_stream.set_tags(Some(&custom_tags));
Some(
gio::WriteOutputStream::new(fragment_stream)
)
});
let bus = pipeline.bus().unwrap();
let main_loop_clone = main_loop.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("eos received");
main_loop_clone.quit()
}
MessageView::Error(err) => {
println!("error from {:#?}: {} ({:#?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug());
}
MessageView::StateChanged(state_changed) => {
println!("{} state changed from {:?} to {:?}", msg.src().unwrap(), state_changed.old(), state_changed.current());
}
MessageView::StreamStart(stream_status) => {
println!("stream start={:?}", stream_status.seqnum());
}
_=> (),
}
glib::Continue(true)
}).expect("failed to add bus watch");
match pipeline.set_state(gst::State::Playing) {
Ok(result) => println!("pipeline set to playing"),
Err(e) => {
println!("unable to set pipeline to playing e: {}", e);
exit(0);
}
}
main_loop.run();
pipeline.set_state(gst::State::Null).unwrap();
}
fn main() {
println!("Hello, world!");
run(k_main);
}