diff options
author | Jan Alexander Steffens (heftig) <jan.steffens@gmail.com> | 2018-02-16 09:17:40 +0100 |
---|---|---|
committer | Olivier CrĂȘte <olivier.crete@collabora.com> | 2018-02-23 15:29:57 -0500 |
commit | 37a9e0fff916f059644c2ec4013947d0eefd83c3 (patch) | |
tree | 6dcc3b335b39a3830cff0ef76e6566e82cbcdaa7 | |
parent | 8a5dab1c060ca05b3fbb8492c293117e101ecf70 (diff) |
srt: Add support for streamheaders to sinks
https://bugzilla.gnome.org/show_bug.cgi?id=793503
-rw-r--r-- | ext/srt/gstsrtbasesink.c | 102 | ||||
-rw-r--r-- | ext/srt/gstsrtbasesink.h | 7 | ||||
-rw-r--r-- | ext/srt/gstsrtclientsink.c | 12 | ||||
-rw-r--r-- | ext/srt/gstsrtserversink.c | 7 |
4 files changed, 128 insertions, 0 deletions
diff --git a/ext/srt/gstsrtbasesink.c b/ext/srt/gstsrtbasesink.c index 8c465a199..d5521c8bb 100644 --- a/ext/srt/gstsrtbasesink.c +++ b/ext/srt/gstsrtbasesink.c @@ -123,6 +123,7 @@ gst_srt_base_sink_finalize (GObject * object) { GstSRTBaseSink *self = GST_SRT_BASE_SINK (object); + g_clear_pointer (&self->headers, gst_buffer_list_unref); g_clear_pointer (&self->uri, gst_uri_unref); g_clear_pointer (&self->passphrase, g_free); @@ -130,8 +131,62 @@ gst_srt_base_sink_finalize (GObject * object) } static gboolean +gst_srt_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps) +{ + GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink); + GstStructure *s; + const GValue *streamheader; + + GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps); + + g_clear_pointer (&self->headers, gst_buffer_list_unref); + + s = gst_caps_get_structure (caps, 0); + streamheader = gst_structure_get_value (s, "streamheader"); + + if (!streamheader) { + GST_DEBUG_OBJECT (self, "'streamheader' field not present"); + } else if (GST_VALUE_HOLDS_BUFFER (streamheader)) { + GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer"); + self->headers = gst_buffer_list_new_sized (1); + gst_buffer_list_add (self->headers, g_value_dup_boxed (streamheader)); + } else if (GST_VALUE_HOLDS_ARRAY (streamheader)) { + guint i, size; + + GST_DEBUG_OBJECT (self, "'streamheader' field holds array"); + + size = gst_value_array_get_size (streamheader); + self->headers = gst_buffer_list_new_sized (size); + + for (i = 0; i < size; i++) { + const GValue *v = gst_value_array_get_value (streamheader, i); + if (!GST_VALUE_HOLDS_BUFFER (v)) { + GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'", + G_VALUE_TYPE_NAME (v)); + return FALSE; + } + + gst_buffer_list_add (self->headers, g_value_dup_boxed (v)); + } + } else { + GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'", + G_VALUE_TYPE_NAME (streamheader)); + return FALSE; + } + + GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers", + self->headers ? gst_buffer_list_length (self->headers) : 0); + + return TRUE; +} + +static gboolean gst_srt_base_sink_stop (GstBaseSink * sink) { + GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink); + + g_clear_pointer (&self->headers, gst_buffer_list_unref); + return TRUE; } @@ -143,6 +198,12 @@ gst_srt_base_sink_render (GstBaseSink * sink, GstBuffer * buffer) GstSRTBaseSinkClass *bclass = GST_SRT_BASE_SINK_GET_CLASS (sink); GstFlowReturn ret = GST_FLOW_OK; + if (self->headers && GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) { + GST_DEBUG_OBJECT (self, "Have streamheaders," + " ignoring header %" GST_PTR_FORMAT, buffer); + return GST_FLOW_OK; + } + GST_TRACE_OBJECT (self, "sending buffer %p, offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT @@ -203,6 +264,7 @@ gst_srt_base_sink_class_init (GstSRTBaseSinkClass * klass) g_object_class_install_properties (gobject_class, PROP_LAST, properties); + gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_srt_base_sink_set_caps); gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_base_sink_stop); gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_srt_base_sink_render); } @@ -283,6 +345,46 @@ gst_srt_base_sink_uri_handler_init (gpointer g_iface, gpointer iface_data) iface->set_uri = gst_srt_base_sink_uri_set_uri; } +gboolean +gst_srt_base_sink_send_headers (GstSRTBaseSink * self, + GstSRTBaseSinkSendCallback send_cb, gpointer user_data) +{ + guint size, i; + + g_return_val_if_fail (GST_IS_SRT_BASE_SINK (self), FALSE); + g_return_val_if_fail (send_cb, FALSE); + + if (!self->headers) + return TRUE; + + size = gst_buffer_list_length (self->headers); + + GST_DEBUG_OBJECT (self, "Sending %u stream headers", size); + + for (i = 0; i < size; i++) { + GstBuffer *buffer = gst_buffer_list_get (self->headers, i); + GstMapInfo info; + gboolean ret; + + GST_TRACE_OBJECT (self, "sending header %u %" GST_PTR_FORMAT, i, buffer); + + if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, + ("Could not map the input stream"), (NULL)); + return FALSE; + } + + ret = send_cb (self, &info, user_data); + + gst_buffer_unmap (buffer, &info); + + if (!ret) + return FALSE; + } + + return TRUE; +} + GstStructure * gst_srt_base_sink_get_stats (GSocketAddress * sockaddr, SRTSOCKET sock) { diff --git a/ext/srt/gstsrtbasesink.h b/ext/srt/gstsrtbasesink.h index 453a0d557..9d617d5fd 100644 --- a/ext/srt/gstsrtbasesink.h +++ b/ext/srt/gstsrtbasesink.h @@ -45,6 +45,7 @@ struct _GstSRTBaseSink { GstBaseSink parent; GstUri *uri; + GstBufferList *headers; gint latency; gchar *passphrase; gint key_length; @@ -65,6 +66,12 @@ struct _GstSRTBaseSinkClass { GST_EXPORT GType gst_srt_base_sink_get_type (void); +typedef gboolean (*GstSRTBaseSinkSendCallback) (GstSRTBaseSink *sink, + const GstMapInfo *mapinfo, gpointer user_data); + +gboolean gst_srt_base_sink_send_headers (GstSRTBaseSink *sink, + GstSRTBaseSinkSendCallback send_cb, gpointer user_data); + GstStructure * gst_srt_base_sink_get_stats (GSocketAddress *sockaddr, SRTSOCKET sock); diff --git a/ext/srt/gstsrtclientsink.c b/ext/srt/gstsrtclientsink.c index 3bafb1eaf..09de74eb6 100644 --- a/ext/srt/gstsrtclientsink.c +++ b/ext/srt/gstsrtclientsink.c @@ -68,6 +68,8 @@ struct _GstSRTClientSinkPrivate gboolean rendez_vous; gchar *bind_address; guint16 bind_port; + + gboolean sent_headers; }; #define GST_SRT_CLIENT_SINK_GET_PRIVATE(obj) \ @@ -190,6 +192,14 @@ gst_srt_client_sink_send_buffer (GstSRTBaseSink * sink, GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink); GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self); + if (!priv->sent_headers) { + if (!gst_srt_base_sink_send_headers (sink, send_buffer_internal, + GINT_TO_POINTER (priv->sock))) + return FALSE; + + priv->sent_headers = TRUE; + } + return send_buffer_internal (sink, mapinfo, GINT_TO_POINTER (priv->sock)); } @@ -214,6 +224,8 @@ gst_srt_client_sink_stop (GstBaseSink * sink) g_clear_object (&priv->sockaddr); + priv->sent_headers = FALSE; + return GST_BASE_SINK_CLASS (parent_class)->stop (sink); } diff --git a/ext/srt/gstsrtserversink.c b/ext/srt/gstsrtserversink.c index f7b76cf33..d704000e5 100644 --- a/ext/srt/gstsrtserversink.c +++ b/ext/srt/gstsrtserversink.c @@ -103,6 +103,7 @@ typedef struct { int sock; GSocketAddress *sockaddr; + gboolean sent_headers; } SRTClient; static SRTClient * @@ -418,6 +419,12 @@ gst_srt_server_sink_send_buffer (GstSRTBaseSink * sink, SRTClient *client = clients->data; clients = clients->next; + if (!client->sent_headers) { + if (!gst_srt_base_sink_send_headers (sink, send_buffer_internal, client)) + goto err; + + client->sent_headers = TRUE; + } if (!send_buffer_internal (sink, mapinfo, client)) goto err; |