Skip to content

Commit

Permalink
src: stream: Move from shm to proxy pair
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Apr 18, 2023
1 parent 21614db commit c894b6a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 85 deletions.
15 changes: 14 additions & 1 deletion src/stream/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,23 @@ impl PipelineState {
.expect("No static sink pad found on capsfilter")
.current_caps()
.context("Failed to get caps from capsfilter sink pad")?;
let Some(encode) = caps
.iter()
.find_map(|structure| {
structure.iter().find_map(|(key, sendvalue)| {
if key == "encoding-name" {
Some(sendvalue.to_value().get::<String>().expect("Failed accessing encoding-name parameter"))
} else {
None
}
})
}) else {
return Err(anyhow!("Cannot find 'media' in caps"));
};

debug!("caps: {:#?}", caps.to_string());

RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?;
RTSPServer::add_pipeline(&sink.path(), sink.proxysink(), &encode)?;

RTSPServer::start_pipeline(&sink.path())?;
}
Expand Down
104 changes: 45 additions & 59 deletions src/stream/rtsp/rtsp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use gst_rtsp::RTSPLowerTrans;
use gst_rtsp_server::{prelude::*, RTSPTransportMode};
use tracing::*;

use crate::stream::rtsp::rtsp_media_factory;

#[allow(dead_code)]
pub struct RTSPServer {
pub server: gst_rtsp_server::RTSPServer,
host: String,
port: u16,
run: bool,
pub path_to_factory: HashMap<String, gst_rtsp_server::RTSPMediaFactory>,
pub path_to_factory: HashMap<String, rtsp_media_factory::Factory>,
main_loop_thread: Option<std::thread::JoinHandle<()>>,
main_loop_thread_rx_channel: std::sync::mpsc::Receiver<String>,
}
Expand Down Expand Up @@ -105,70 +107,30 @@ impl RTSPServer {
}

#[instrument(level = "debug")]
pub fn add_pipeline(path: &str, socket_path: &str, rtp_caps: &gst::Caps) -> Result<()> {
// Initialize the singleton before calling gst factory
let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap();

let factory = gst_rtsp_server::RTSPMediaFactory::new();
factory.set_shared(true);
factory.set_buffer_size(0);
factory.set_latency(0u32);
factory.set_transport_mode(RTSPTransportMode::PLAY);
factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST);

let Some(encode) = rtp_caps
.iter()
.find_map(|structure| {
structure.iter().find_map(|(key, sendvalue)| {
if key == "encoding-name" {
Some(sendvalue.to_value().get::<String>().expect("Failed accessing encoding-name parameter"))
} else {
None
}
})
}) else {
return Err(anyhow!("Cannot find 'media' in caps"));
};

let rtp_caps = rtp_caps.to_string();
let description = match encode.as_str() {
fn create_rtsp_bin(proxysink: &gst::Element, encode: &str) -> Result<gst::Bin> {
let description = match encode {
"H264" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtph264depay",
" ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96",
),
socket_path = socket_path,
rtp_caps = rtp_caps,
concat!(
"proxysrc name=ProxySrc message-forward=true",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! rtph264depay",
" ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96",
)
}
"RAW" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtpvrawdepay",
" ! rtpvrawpay name=pay0 pt=96",
),
socket_path = socket_path,
rtp_caps = rtp_caps,
concat!(
"proxysrc name=ProxySrc",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! rtpvrawdepay",
" ! rtpvrawpay name=pay0 pt=96",
)
}
"JPEG" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtpjpegdepay",
" ! rtpjpegpay name=pay0 pt=96",
),
socket_path = socket_path,
rtp_caps = rtp_caps,
concat!(
"proxysrc name=ProxySrc",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! rtpjpegdepay",
" ! rtpjpegpay name=pay0 pt=96",
)
}
unsupported => {
Expand All @@ -180,7 +142,31 @@ impl RTSPServer {

debug!("RTSP Server description: {description:#?}");

factory.set_launch(&description);
let rtsp_bin = gst::parse_bin_from_description(&description, true)?;
{
let proxysrc = rtsp_bin
.by_name("ProxySrc")
.expect("Failed to find proxysrc by name: wrong name?");
proxysrc.set_property("proxysink", proxysink);
let _ = rtsp_bin.set_state(gst::State::Playing);
}

Ok(rtsp_bin)
}

#[instrument(level = "debug")]
pub fn add_pipeline(path: &str, proxysink: &gst::Element, encode: &str) -> Result<()> {
// Initialize the singleton before calling gst factory
let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap();

let rtsp_bin = Self::create_rtsp_bin(proxysink, encode)?;

let factory = rtsp_media_factory::Factory::new(rtsp_bin);
factory.set_shared(true);
factory.set_buffer_size(0);
factory.set_latency(0u32);
factory.set_transport_mode(RTSPTransportMode::PLAY);
factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST);

if let Some(server) = rtsp_server
.path_to_factory
Expand Down
43 changes: 18 additions & 25 deletions src/stream/sink/rtsp_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ use super::SinkInterface;
pub struct RtspSink {
sink_id: uuid::Uuid,
queue: gst::Element,
sink: gst::Element,
proxysink: gst::Element,
sink_sink_pad: gst::Pad,
tee_src_pad: Option<gst::Pad>,
path: String,
socket_path: String,
}
impl SinkInterface for RtspSink {
#[instrument(level = "debug", skip(self))]
Expand All @@ -26,8 +25,6 @@ impl SinkInterface for RtspSink {
) -> Result<()> {
let sink_id = &self.get_id();

let _ = std::fs::remove_file(&self.socket_path); // Remove if already exists

// Set Tee's src pad
if self.tee_src_pad.is_some() {
return Err(anyhow!(
Expand Down Expand Up @@ -55,7 +52,7 @@ impl SinkInterface for RtspSink {
};

// Add the Sink elements to the Pipeline
let elements = &[&self.queue, &self.sink];
let elements = &[&self.queue, &self.proxysink];
if let Err(add_err) = pipeline.add_many(elements) {
let msg = format!("Failed to add WebRTCSink's elements to the Pipeline: {add_err:?}");
error!(msg);
Expand Down Expand Up @@ -143,10 +140,6 @@ impl SinkInterface for RtspSink {

#[instrument(level = "debug", skip(self))]
fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> {
if let Err(error) = std::fs::remove_file(&self.socket_path) {
warn!("Failed removing the RTSP Sink socket file. Reason: {error:?}");
}

let Some(tee_src_pad) = &self.tee_src_pad else {
warn!("Tried to unlink Sink from a pipeline without a Tee src pad.");
return Ok(());
Expand Down Expand Up @@ -180,7 +173,7 @@ impl SinkInterface for RtspSink {
}

// Remove the Sink's elements from the Source's pipeline
let elements = &[&self.queue, &self.sink];
let elements = &[&self.queue, &self.proxysink];
if let Err(remove_err) = pipeline.remove_many(elements) {
warn!("Failed removing RtspSink's elements from pipeline: {remove_err:?}");
}
Expand All @@ -191,7 +184,7 @@ impl SinkInterface for RtspSink {
}

// Set Sink to null
if let Err(state_err) = self.sink.set_state(gst::State::Null) {
if let Err(state_err) = self.proxysink.set_state(gst::State::Null) {
warn!("Failed to set RtspSink's to NULL: {state_err:#?}");
}

Expand Down Expand Up @@ -227,34 +220,34 @@ impl RtspSink {
"Failed to find RTSP compatible address. Example: \"rtsp://0.0.0.0:8554/test\"",
)?;

let socket_path = format!("/tmp/{id}");
let sink = gst::ElementFactory::make("shmsink")
.property_from_str("socket-path", &socket_path)
.property("sync", true)
.property("wait-for-connection", false)
.property("shm-size", 10_000_000u32)
.build()?;
let proxysink = gst::ElementFactory::make("proxysink").build()?;

let sink_sink_pad = sink.static_pad("sink").context("Failed to get Sink Pad")?;
let sink_sink_pad = proxysink
.static_pad("sink")
.context("Failed to get Sink Pad")?;

Ok(Self {
sink_id: id,
queue,
sink,
proxysink,
sink_sink_pad,
path,
socket_path,
tee_src_pad: Default::default(),
})
}

#[instrument(level = "trace", skip(self))]
pub fn path(&self) -> String {
self.path.clone()
pub fn proxysink(&self) -> &gst::Element {
&self.proxysink
}

// #[instrument(level = "trace", skip(self))]
// pub fn link_proxy(&self, proxysrc: &gst::Element) {
// proxysrc.set_property("proxysink", &self.proxysink)
// }

#[instrument(level = "trace", skip(self))]
pub fn socket_path(&self) -> String {
self.socket_path.clone()
pub fn path(&self) -> String {
self.path.clone()
}
}

0 comments on commit c894b6a

Please sign in to comment.