summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Alexander Steffens (heftig) <jan.steffens@gmail.com>2018-02-16 09:17:40 +0100
committerOlivier CrĂȘte <olivier.crete@collabora.com>2018-02-23 15:29:57 -0500
commit37a9e0fff916f059644c2ec4013947d0eefd83c3 (patch)
tree6dcc3b335b39a3830cff0ef76e6566e82cbcdaa7
parent8a5dab1c060ca05b3fbb8492c293117e101ecf70 (diff)
srt: Add support for streamheaders to sinks
https://bugzilla.gnome.org/show_bug.cgi?id=793503
-rw-r--r--ext/srt/gstsrtbasesink.c102
-rw-r--r--ext/srt/gstsrtbasesink.h7
-rw-r--r--ext/srt/gstsrtclientsink.c12
-rw-r--r--ext/srt/gstsrtserversink.c7
4 files changed, 128 insertions, 0 deletions
diff --git a/ext/srt/gstsrtbasesink.c b/ext/srt/gstsrtbasesink.c
index 8c465a199..d5521c8bb 100644
--- a/ext/srt/gstsrtbasesink.c
+++ b/ext/srt/gstsrtbasesink.c
@@ -123,6 +123,7 @@ gst_srt_base_sink_finalize (GObject * object)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (object);
+ g_clear_pointer (&self->headers, gst_buffer_list_unref);
g_clear_pointer (&self->uri, gst_uri_unref);
g_clear_pointer (&self->passphrase, g_free);
@@ -130,8 +131,62 @@ gst_srt_base_sink_finalize (GObject * object)
}
static gboolean
+gst_srt_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
+{
+ GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink);
+ GstStructure *s;
+ const GValue *streamheader;
+
+ GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps);
+
+ g_clear_pointer (&self->headers, gst_buffer_list_unref);
+
+ s = gst_caps_get_structure (caps, 0);
+ streamheader = gst_structure_get_value (s, "streamheader");
+
+ if (!streamheader) {
+ GST_DEBUG_OBJECT (self, "'streamheader' field not present");
+ } else if (GST_VALUE_HOLDS_BUFFER (streamheader)) {
+ GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer");
+ self->headers = gst_buffer_list_new_sized (1);
+ gst_buffer_list_add (self->headers, g_value_dup_boxed (streamheader));
+ } else if (GST_VALUE_HOLDS_ARRAY (streamheader)) {
+ guint i, size;
+
+ GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
+
+ size = gst_value_array_get_size (streamheader);
+ self->headers = gst_buffer_list_new_sized (size);
+
+ for (i = 0; i < size; i++) {
+ const GValue *v = gst_value_array_get_value (streamheader, i);
+ if (!GST_VALUE_HOLDS_BUFFER (v)) {
+ GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
+ G_VALUE_TYPE_NAME (v));
+ return FALSE;
+ }
+
+ gst_buffer_list_add (self->headers, g_value_dup_boxed (v));
+ }
+ } else {
+ GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'",
+ G_VALUE_TYPE_NAME (streamheader));
+ return FALSE;
+ }
+
+ GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers",
+ self->headers ? gst_buffer_list_length (self->headers) : 0);
+
+ return TRUE;
+}
+
+static gboolean
gst_srt_base_sink_stop (GstBaseSink * sink)
{
+ GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink);
+
+ g_clear_pointer (&self->headers, gst_buffer_list_unref);
+
return TRUE;
}
@@ -143,6 +198,12 @@ gst_srt_base_sink_render (GstBaseSink * sink, GstBuffer * buffer)
GstSRTBaseSinkClass *bclass = GST_SRT_BASE_SINK_GET_CLASS (sink);
GstFlowReturn ret = GST_FLOW_OK;
+ if (self->headers && GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
+ GST_DEBUG_OBJECT (self, "Have streamheaders,"
+ " ignoring header %" GST_PTR_FORMAT, buffer);
+ return GST_FLOW_OK;
+ }
+
GST_TRACE_OBJECT (self, "sending buffer %p, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
@@ -203,6 +264,7 @@ gst_srt_base_sink_class_init (GstSRTBaseSinkClass * klass)
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
+ gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_srt_base_sink_set_caps);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_base_sink_stop);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_srt_base_sink_render);
}
@@ -283,6 +345,46 @@ gst_srt_base_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
iface->set_uri = gst_srt_base_sink_uri_set_uri;
}
+gboolean
+gst_srt_base_sink_send_headers (GstSRTBaseSink * self,
+ GstSRTBaseSinkSendCallback send_cb, gpointer user_data)
+{
+ guint size, i;
+
+ g_return_val_if_fail (GST_IS_SRT_BASE_SINK (self), FALSE);
+ g_return_val_if_fail (send_cb, FALSE);
+
+ if (!self->headers)
+ return TRUE;
+
+ size = gst_buffer_list_length (self->headers);
+
+ GST_DEBUG_OBJECT (self, "Sending %u stream headers", size);
+
+ for (i = 0; i < size; i++) {
+ GstBuffer *buffer = gst_buffer_list_get (self->headers, i);
+ GstMapInfo info;
+ gboolean ret;
+
+ GST_TRACE_OBJECT (self, "sending header %u %" GST_PTR_FORMAT, i, buffer);
+
+ if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, READ,
+ ("Could not map the input stream"), (NULL));
+ return FALSE;
+ }
+
+ ret = send_cb (self, &info, user_data);
+
+ gst_buffer_unmap (buffer, &info);
+
+ if (!ret)
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
GstStructure *
gst_srt_base_sink_get_stats (GSocketAddress * sockaddr, SRTSOCKET sock)
{
diff --git a/ext/srt/gstsrtbasesink.h b/ext/srt/gstsrtbasesink.h
index 453a0d557..9d617d5fd 100644
--- a/ext/srt/gstsrtbasesink.h
+++ b/ext/srt/gstsrtbasesink.h
@@ -45,6 +45,7 @@ struct _GstSRTBaseSink {
GstBaseSink parent;
GstUri *uri;
+ GstBufferList *headers;
gint latency;
gchar *passphrase;
gint key_length;
@@ -65,6 +66,12 @@ struct _GstSRTBaseSinkClass {
GST_EXPORT
GType gst_srt_base_sink_get_type (void);
+typedef gboolean (*GstSRTBaseSinkSendCallback) (GstSRTBaseSink *sink,
+ const GstMapInfo *mapinfo, gpointer user_data);
+
+gboolean gst_srt_base_sink_send_headers (GstSRTBaseSink *sink,
+ GstSRTBaseSinkSendCallback send_cb, gpointer user_data);
+
GstStructure * gst_srt_base_sink_get_stats (GSocketAddress *sockaddr,
SRTSOCKET sock);
diff --git a/ext/srt/gstsrtclientsink.c b/ext/srt/gstsrtclientsink.c
index 3bafb1eaf..09de74eb6 100644
--- a/ext/srt/gstsrtclientsink.c
+++ b/ext/srt/gstsrtclientsink.c
@@ -68,6 +68,8 @@ struct _GstSRTClientSinkPrivate
gboolean rendez_vous;
gchar *bind_address;
guint16 bind_port;
+
+ gboolean sent_headers;
};
#define GST_SRT_CLIENT_SINK_GET_PRIVATE(obj) \
@@ -190,6 +192,14 @@ gst_srt_client_sink_send_buffer (GstSRTBaseSink * sink,
GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
+ if (!priv->sent_headers) {
+ if (!gst_srt_base_sink_send_headers (sink, send_buffer_internal,
+ GINT_TO_POINTER (priv->sock)))
+ return FALSE;
+
+ priv->sent_headers = TRUE;
+ }
+
return send_buffer_internal (sink, mapinfo, GINT_TO_POINTER (priv->sock));
}
@@ -214,6 +224,8 @@ gst_srt_client_sink_stop (GstBaseSink * sink)
g_clear_object (&priv->sockaddr);
+ priv->sent_headers = FALSE;
+
return GST_BASE_SINK_CLASS (parent_class)->stop (sink);
}
diff --git a/ext/srt/gstsrtserversink.c b/ext/srt/gstsrtserversink.c
index f7b76cf33..d704000e5 100644
--- a/ext/srt/gstsrtserversink.c
+++ b/ext/srt/gstsrtserversink.c
@@ -103,6 +103,7 @@ typedef struct
{
int sock;
GSocketAddress *sockaddr;
+ gboolean sent_headers;
} SRTClient;
static SRTClient *
@@ -418,6 +419,12 @@ gst_srt_server_sink_send_buffer (GstSRTBaseSink * sink,
SRTClient *client = clients->data;
clients = clients->next;
+ if (!client->sent_headers) {
+ if (!gst_srt_base_sink_send_headers (sink, send_buffer_internal, client))
+ goto err;
+
+ client->sent_headers = TRUE;
+ }
if (!send_buffer_internal (sink, mapinfo, client))
goto err;