summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Waters <matthew@centricular.com>2018-09-10 23:52:05 +1000
committerMatthew Waters <matthew@centricular.com>2018-09-21 19:45:12 +1000
commit07e9374eff7967713ab2dc47b1eb2843d0bfcaa0 (patch)
treee745fcad33ad9bdbdfe1be64b94804ccb7e3697d
parentcf46d49b1e94587b15093dfdf12431c1ab00ba57 (diff)
webrtcbin: add support for data channels based on SCTP
Mostly follows the W3C specification https://www.w3.org/TR/webrtc/#peer-to-peer-data-api With contributions from: Mathieu Duponchelle <mathieu@centricular.com> https://bugzilla.gnome.org/show_bug.cgi?id=794351
-rw-r--r--ext/webrtc/Makefile.am8
-rw-r--r--ext/webrtc/fwd.h4
-rw-r--r--ext/webrtc/gstwebrtcbin.c1058
-rw-r--r--ext/webrtc/gstwebrtcbin.h13
-rw-r--r--ext/webrtc/meson.build4
-rw-r--r--ext/webrtc/sctptransport.c270
-rw-r--r--ext/webrtc/sctptransport.h65
-rw-r--r--ext/webrtc/transportreceivebin.c58
-rw-r--r--ext/webrtc/transportsendbin.c26
-rw-r--r--ext/webrtc/webrtcdatachannel.c1296
-rw-r--r--ext/webrtc/webrtcdatachannel.h83
-rw-r--r--ext/webrtc/webrtcsdp.c35
-rw-r--r--ext/webrtc/webrtcsdp.h4
-rw-r--r--gst-libs/gst/webrtc/webrtc_fwd.h53
-rw-r--r--tests/check/elements/webrtcbin.c500
15 files changed, 3286 insertions, 191 deletions
diff --git a/ext/webrtc/Makefile.am b/ext/webrtc/Makefile.am
index 5f9a71488..778f112d0 100644
--- a/ext/webrtc/Makefile.am
+++ b/ext/webrtc/Makefile.am
@@ -7,11 +7,13 @@ noinst_HEADERS = \
gstwebrtcstats.h \
icestream.h \
nicetransport.h \
+ sctptransport.h \
transportstream.h \
transportsendbin.h \
transportreceivebin.h \
utils.h \
webrtcsdp.h \
+ webrtcdatachannel.h \
webrtctransceiver.h
libgstwebrtc_la_SOURCES = \
@@ -21,11 +23,13 @@ libgstwebrtc_la_SOURCES = \
gstwebrtcstats.c \
icestream.c \
nicetransport.c \
+ sctptransport.c \
transportstream.c \
transportsendbin.c \
transportreceivebin.c \
utils.c \
webrtcsdp.c \
+ webrtcdatachannel.c \
webrtctransceiver.c
libgstwebrtc_la_SOURCES += $(BUILT_SOURCES)
@@ -40,12 +44,14 @@ libgstwebrtc_la_CFLAGS = \
$(GST_SDP_CFLAGS) \
$(NICE_CFLAGS)
libgstwebrtc_la_LIBADD = \
+ -lgstapp-@GST_API_VERSION@ \
$(GST_PLUGINS_BASE_LIBS) \
$(GST_BASE_LIBS) \
$(GST_LIBS) \
$(GST_SDP_LIBS) \
$(NICE_LIBS) \
- $(top_builddir)/gst-libs/gst/webrtc/libgstwebrtc-@GST_API_VERSION@.la
+ $(top_builddir)/gst-libs/gst/webrtc/libgstwebrtc-@GST_API_VERSION@.la \
+ $(top_builddir)/gst-libs/gst/sctp/libgstsctp-@GST_API_VERSION@.la
libgstwebrtc_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgstwebrtc_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)
diff --git a/ext/webrtc/fwd.h b/ext/webrtc/fwd.h
index 903145fbf..aa26ec6de 100644
--- a/ext/webrtc/fwd.h
+++ b/ext/webrtc/fwd.h
@@ -41,6 +41,10 @@ typedef struct _GstWebRTCNiceTransport GstWebRTCNiceTransport;
typedef struct _GstWebRTCNiceTransportClass GstWebRTCNiceTransportClass;
typedef struct _GstWebRTCNiceTransportPrivate GstWebRTCNiceTransportPrivate;
+typedef struct _GstWebRTCSCTPTransport GstWebRTCSCTPTransport;
+typedef struct _GstWebRTCSCTPTransportClass GstWebRTCSCTPTransportClass;
+typedef struct _GstWebRTCSCTPTransportPrivate GstWebRTCSCTPTransportPrivate;
+
typedef struct _TransportStream TransportStream;
typedef struct _TransportStreamClass TransportStreamClass;
diff --git a/ext/webrtc/gstwebrtcbin.c b/ext/webrtc/gstwebrtcbin.c
index 268432caa..60614b829 100644
--- a/ext/webrtc/gstwebrtcbin.c
+++ b/ext/webrtc/gstwebrtcbin.c
@@ -28,6 +28,8 @@
#include "utils.h"
#include "webrtcsdp.h"
#include "webrtctransceiver.h"
+#include "webrtcdatachannel.h"
+#include "sctptransport.h"
#include <stdio.h>
#include <stdlib.h>
@@ -50,8 +52,8 @@
/*
* This webrtcbin implements the majority of the W3's peerconnection API and
* implementation guide where possible. Generating offers, answers and setting
- * local and remote SDP's are all supported. To start with, only the media
- * interface has been implemented (no datachannel yet).
+ * local and remote SDP's are all supported. Both media descriptions and
+ * descriptions involving data channels are supported.
*
* Each input/output pad is equivalent to a Track in W3 parlance which are
* added/removed from the bin. The number of requested sink pads is the number
@@ -70,7 +72,6 @@
* LS groups
* bundling
* setting custom DTLS certificates
- * data channel
*
* seperate session id's from mlineindex properly
* how to deal with replacing a input/output track/stream
@@ -108,6 +109,32 @@ _have_nice_elements (GstWebRTCBin * webrtc)
}
static gboolean
+_have_sctp_elements (GstWebRTCBin * webrtc)
+{
+ GstPluginFeature *feature;
+
+ feature = gst_registry_lookup_feature (gst_registry_get (), "sctpdec");
+ if (feature) {
+ gst_object_unref (feature);
+ } else {
+ GST_ELEMENT_ERROR (webrtc, CORE, MISSING_PLUGIN, NULL,
+ ("%s", "sctp elements are not available"));
+ return FALSE;
+ }
+
+ feature = gst_registry_lookup_feature (gst_registry_get (), "sctpenc");
+ if (feature) {
+ gst_object_unref (feature);
+ } else {
+ GST_ELEMENT_ERROR (webrtc, CORE, MISSING_PLUGIN, NULL,
+ ("%s", "sctp elements are not available"));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
_have_dtls_elements (GstWebRTCBin * webrtc)
{
GstPluginFeature *feature;
@@ -273,7 +300,8 @@ gst_webrtc_bin_pad_new (const gchar * name, GstPadDirection direction)
G_DEFINE_TYPE_WITH_CODE (GstWebRTCBin, gst_webrtc_bin, GST_TYPE_BIN,
G_ADD_PRIVATE (GstWebRTCBin)
GST_DEBUG_CATEGORY_INIT (gst_webrtc_bin_debug, "webrtcbin", 0,
- "webrtcbin element"););
+ "webrtcbin element");
+ );
static GstPad *_connect_input_stream (GstWebRTCBin * webrtc,
GstWebRTCBinPad * pad);
@@ -303,6 +331,8 @@ enum
ADD_TRANSCEIVER_SIGNAL,
GET_TRANSCEIVERS_SIGNAL,
ADD_TURN_SERVER_SIGNAL,
+ CREATE_DATA_CHANNEL_SIGNAL,
+ ON_DATA_CHANNEL_SIGNAL,
LAST_SIGNAL,
};
@@ -524,6 +554,47 @@ _find_pad (GstWebRTCBin * webrtc, gconstpointer data, FindPadFunc func)
return NULL;
}
+typedef gboolean (*FindDataChannelFunc) (GstWebRTCDataChannel * p1,
+ gconstpointer data);
+
+static GstWebRTCDataChannel *
+_find_data_channel (GstWebRTCBin * webrtc, gconstpointer data,
+ FindDataChannelFunc func)
+{
+ int i;
+
+ for (i = 0; i < webrtc->priv->data_channels->len; i++) {
+ GstWebRTCDataChannel *channel =
+ g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *,
+ i);
+
+ if (func (channel, data))
+ return channel;
+ }
+
+ return NULL;
+}
+
+static gboolean
+data_channel_match_for_id (GstWebRTCDataChannel * channel, gint * id)
+{
+ return channel->id == *id;
+}
+
+static GstWebRTCDataChannel *
+_find_data_channel_for_id (GstWebRTCBin * webrtc, gint id)
+{
+ GstWebRTCDataChannel *channel;
+
+ channel = _find_data_channel (webrtc, &id,
+ (FindDataChannelFunc) data_channel_match_for_id);
+
+ GST_TRACE_OBJECT (webrtc,
+ "Found data channel %" GST_PTR_FORMAT " for id %i", channel, id);
+
+ return channel;
+}
+
static void
_add_pad_to_list (GstWebRTCBin * webrtc, GstWebRTCBinPad * pad)
{
@@ -1415,7 +1486,6 @@ _create_transport_channel (GstWebRTCBin * webrtc, guint session_id)
{
GstWebRTCDTLSTransport *transport;
TransportStream *ret;
- gchar *pad_name;
/* FIXME: how to parametrize the sender and the receiver */
ret = transport_stream_new (webrtc, session_id);
@@ -1439,6 +1509,44 @@ _create_transport_channel (GstWebRTCBin * webrtc, guint session_id)
G_CALLBACK (_on_dtls_transport_notify_state), webrtc);
}
+ GST_TRACE_OBJECT (webrtc,
+ "Create transport %" GST_PTR_FORMAT " for session %u", ret, session_id);
+
+ return ret;
+}
+
+static gboolean
+_message_media_is_datachannel (const GstSDPMessage * msg, guint media_id)
+{
+ const GstSDPMedia *media;
+
+ if (!msg)
+ return FALSE;
+
+ if (gst_sdp_message_medias_len (msg) <= media_id)
+ return FALSE;
+
+ media = gst_sdp_message_get_media (msg, media_id);
+
+ if (g_strcmp0 (gst_sdp_media_get_media (media), "application") != 0)
+ return FALSE;
+
+ if (gst_sdp_media_formats_len (media) != 1)
+ return FALSE;
+
+ if (g_strcmp0 (gst_sdp_media_get_format (media, 0),
+ "webrtc-datachannel") != 0)
+ return FALSE;
+
+ return TRUE;
+}
+
+static TransportStream *
+_create_rtp_transport_channel (GstWebRTCBin * webrtc, guint session_id)
+{
+ TransportStream *ret = _create_transport_channel (webrtc, session_id);
+ gchar *pad_name;
+
gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (ret->send_bin));
gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (ret->receive_bin));
@@ -1456,15 +1564,217 @@ _create_transport_channel (GstWebRTCBin * webrtc, guint session_id)
g_array_append_val (webrtc->priv->transports, ret);
- GST_TRACE_OBJECT (webrtc,
- "Create transport %" GST_PTR_FORMAT " for session %u", ret, session_id);
-
gst_element_sync_state_with_parent (GST_ELEMENT (ret->send_bin));
gst_element_sync_state_with_parent (GST_ELEMENT (ret->receive_bin));
return ret;
}
+/* this is called from the webrtc thread with the pc lock held */
+static void
+_on_data_channel_ready_state (GstWebRTCDataChannel * channel,
+ GParamSpec * pspec, GstWebRTCBin * webrtc)
+{
+ GstWebRTCDataChannelState ready_state;
+ guint i;
+
+ g_object_get (channel, "ready-state", &ready_state, NULL);
+
+ if (ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) {
+ gboolean found = FALSE;
+
+ for (i = 0; i < webrtc->priv->pending_data_channels->len; i++) {
+ GstWebRTCDataChannel *c;
+
+ c = g_array_index (webrtc->priv->pending_data_channels,
+ GstWebRTCDataChannel *, i);
+ if (c == channel) {
+ found = TRUE;
+ g_array_remove_index (webrtc->priv->pending_data_channels, i);
+ break;
+ }
+ }
+ if (found == FALSE) {
+ GST_FIXME_OBJECT (webrtc, "Received open for unknown data channel");
+ return;
+ }
+
+ g_array_append_val (webrtc->priv->data_channels, channel);
+
+ g_signal_emit (webrtc, gst_webrtc_bin_signals[ON_DATA_CHANNEL_SIGNAL], 0,
+ gst_object_ref (channel));
+ }
+}
+
+static void
+_link_data_channel_to_sctp (GstWebRTCBin * webrtc,
+ GstWebRTCDataChannel * channel)
+{
+ if (webrtc->priv->sctp_transport && !channel->sctp_transport) {
+ gint id;
+
+ g_object_get (channel, "id", &id, NULL);
+
+ if (webrtc->priv->sctp_transport->association_established && id != -1) {
+ gchar *pad_name;
+
+ gst_webrtc_data_channel_set_sctp_transport (channel,
+ webrtc->priv->sctp_transport);
+ pad_name = g_strdup_printf ("sink_%u", id);
+ if (!gst_element_link_pads (channel->appsrc, "src",
+ channel->sctp_transport->sctpenc, pad_name))
+ g_warn_if_reached ();
+ g_free (pad_name);
+ }
+ }
+}
+
+static void
+_on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad,
+ GstWebRTCBin * webrtc)
+{
+ GstWebRTCDataChannel *channel;
+ guint stream_id;
+ GstPad *sink_pad;
+
+ if (sscanf (GST_PAD_NAME (pad), "src_%u", &stream_id) != 1)
+ return;
+
+ PC_LOCK (webrtc);
+ channel = _find_data_channel_for_id (webrtc, stream_id);
+ if (!channel) {
+ channel = g_object_new (GST_TYPE_WEBRTC_DATA_CHANNEL, NULL);
+ channel->id = stream_id;
+ channel->webrtcbin = webrtc;
+
+ gst_bin_add (GST_BIN (webrtc), channel->appsrc);
+ gst_bin_add (GST_BIN (webrtc), channel->appsink);
+
+ gst_element_sync_state_with_parent (channel->appsrc);
+ gst_element_sync_state_with_parent (channel->appsink);
+
+ _link_data_channel_to_sctp (webrtc, channel);
+
+ g_array_append_val (webrtc->priv->pending_data_channels, channel);
+ }
+
+ g_signal_connect (channel, "notify::ready-state",
+ G_CALLBACK (_on_data_channel_ready_state), webrtc);
+
+ sink_pad = gst_element_get_static_pad (channel->appsink, "sink");
+ if (gst_pad_link (pad, sink_pad) != GST_PAD_LINK_OK)
+ GST_WARNING_OBJECT (channel, "Failed to link sctp pad %s with channel %"
+ GST_PTR_FORMAT, GST_PAD_NAME (pad), channel);
+ gst_object_unref (sink_pad);
+ PC_UNLOCK (webrtc);
+}
+
+static void
+_on_sctp_state_notify (GstWebRTCSCTPTransport * sctp, GParamSpec * pspec,
+ GstWebRTCBin * webrtc)
+{
+ GstWebRTCSCTPTransportState state;
+
+ g_object_get (sctp, "state", &state, NULL);
+
+ if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
+ int i;
+
+ PC_LOCK (webrtc);
+ GST_DEBUG_OBJECT (webrtc, "SCTP association established");
+
+ for (i = 0; i < webrtc->priv->data_channels->len; i++) {
+ GstWebRTCDataChannel *channel;
+
+ channel =
+ g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *,
+ i);
+
+ _link_data_channel_to_sctp (webrtc, channel);
+
+ if (!channel->negotiated && !channel->opened)
+ gst_webrtc_data_channel_start_negotiation (channel);
+ }
+ PC_UNLOCK (webrtc);
+ }
+}
+
+static TransportStream *
+_create_data_channel_transports (GstWebRTCBin * webrtc, guint session_id)
+{
+ if (!webrtc->priv->data_channel_transport) {
+ TransportStream *stream = _create_transport_channel (webrtc, session_id);
+ GstWebRTCSCTPTransport *sctp_transport;
+ int i;
+
+ webrtc->priv->data_channel_transport = stream;
+
+ g_object_set (stream, "rtcp-mux", TRUE, NULL);
+
+ gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (stream->send_bin));
+ gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (stream->receive_bin));
+
+ if (!(sctp_transport = webrtc->priv->sctp_transport)) {
+ sctp_transport = gst_webrtc_sctp_transport_new ();
+ sctp_transport->transport =
+ g_object_ref (webrtc->priv->data_channel_transport->transport);
+ sctp_transport->webrtcbin = webrtc;
+
+ gst_bin_add (GST_BIN (webrtc), sctp_transport->sctpdec);
+ gst_bin_add (GST_BIN (webrtc), sctp_transport->sctpenc);
+ }
+
+ g_signal_connect (sctp_transport->sctpdec, "pad-added",
+ G_CALLBACK (_on_sctpdec_pad_added), webrtc);
+ g_signal_connect (sctp_transport, "notify::state",
+ G_CALLBACK (_on_sctp_state_notify), webrtc);
+
+ if (!gst_element_link_pads (GST_ELEMENT (stream->receive_bin), "data_src",
+ GST_ELEMENT (sctp_transport->sctpdec), "sink"))
+ g_warn_if_reached ();
+
+ if (!gst_element_link_pads (GST_ELEMENT (sctp_transport->sctpenc), "src",
+ GST_ELEMENT (stream->send_bin), "data_sink"))
+ g_warn_if_reached ();
+
+ for (i = 0; i < webrtc->priv->data_channels->len; i++) {
+ GstWebRTCDataChannel *channel;
+
+ channel =
+ g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *,
+ i);
+
+ _link_data_channel_to_sctp (webrtc, channel);
+ }
+
+ gst_element_sync_state_with_parent (GST_ELEMENT (stream->send_bin));
+ gst_element_sync_state_with_parent (GST_ELEMENT (stream->receive_bin));
+
+ if (!webrtc->priv->sctp_transport) {
+ gst_element_sync_state_with_parent (GST_ELEMENT
+ (sctp_transport->sctpdec));
+ gst_element_sync_state_with_parent (GST_ELEMENT
+ (sctp_transport->sctpenc));
+ }
+
+ g_array_append_val (webrtc->priv->transports, stream);
+
+ webrtc->priv->sctp_transport = sctp_transport;
+ }
+
+ return webrtc->priv->data_channel_transport;
+}
+
+static TransportStream *
+_create_transport_stream (GstWebRTCBin * webrtc, guint session_id,
+ gboolean is_datachannel)
+{
+ if (is_datachannel)
+ return _create_data_channel_transports (webrtc, session_id);
+ else
+ return _create_rtp_transport_channel (webrtc, session_id);
+}
+
static guint
g_array_find_uint (GArray * array, guint val)
{
@@ -1797,7 +2107,7 @@ sdp_media_from_transceiver (GstWebRTCBin * webrtc, GstSDPMedia * media,
/* FIXME: bundle */
item = _find_transport_for_session (webrtc, media_idx);
if (!item)
- item = _create_transport_channel (webrtc, media_idx);
+ item = _create_transport_stream (webrtc, media_idx, FALSE);
webrtc_transceiver_set_transport (WEBRTC_TRANSCEIVER (trans), item);
}
@@ -1873,6 +2183,58 @@ _create_offer_task (GstWebRTCBin * webrtc, const GstStructure * options)
gst_sdp_media_uninit (&media);
}
+ /* add data channel support */
+ if (webrtc->priv->data_channels->len > 0) {
+ GstSDPMedia media = { 0, };
+ gchar *ufrag, *pwd, *sdp_mid;
+
+ gst_sdp_media_init (&media);
+ /* mandated by JSEP */
+ gst_sdp_media_add_attribute (&media, "setup", "actpass");
+
+ /* FIXME: only needed when restarting ICE */
+ _generate_ice_credentials (&ufrag, &pwd);
+ gst_sdp_media_add_attribute (&media, "ice-ufrag", ufrag);
+ gst_sdp_media_add_attribute (&media, "ice-pwd", pwd);
+ g_free (ufrag);
+ g_free (pwd);
+
+ gst_sdp_media_set_media (&media, "application");
+ gst_sdp_media_set_port_info (&media, 9, 0);
+ gst_sdp_media_set_proto (&media, "UDP/DTLS/SCTP");
+ gst_sdp_media_add_connection (&media, "IN", "IP4", "0.0.0.0", 0, 0);
+ gst_sdp_media_add_format (&media, "webrtc-datachannel");
+
+ sdp_mid = g_strdup_printf ("%s%u", gst_sdp_media_get_media (&media),
+ webrtc->priv->media_counter++);
+ gst_sdp_media_add_attribute (&media, "mid", sdp_mid);
+ g_free (sdp_mid);
+
+ /* FIXME: negotiate this properly */
+ gst_sdp_media_add_attribute (&media, "sctp-port", "5000");
+
+ _create_data_channel_transports (webrtc, webrtc->priv->transceivers->len);
+ {
+ gchar *cert, *fingerprint, *val;
+
+ g_object_get (webrtc->priv->sctp_transport->transport, "certificate",
+ &cert, NULL);
+
+ fingerprint =
+ _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256);
+ g_free (cert);
+ val =
+ g_strdup_printf ("%s %s",
+ _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint);
+ g_free (fingerprint);
+
+ gst_sdp_media_add_attribute (&media, "fingerprint", val);
+ g_free (val);
+ }
+
+ gst_sdp_message_add_media (ret, &media);
+ }
+
/* FIXME: pre-emptively setup receiving elements when needed */
/* XXX: only true for the initial offerer */
@@ -2045,7 +2407,6 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options)
gst_sdp_media_new (&media);
gst_sdp_media_set_port_info (media, 9, 0);
- gst_sdp_media_set_proto (media, "UDP/TLS/RTP/SAVPF");
gst_sdp_media_add_connection (media, "IN", "IP4", "0.0.0.0", 0, 0);
{
@@ -2060,6 +2421,7 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options)
offer_media =
(GstSDPMedia *) gst_sdp_message_get_media (pending_remote->sdp, i);
+
for (j = 0; j < gst_sdp_media_attributes_len (offer_media); j++) {
const GstSDPAttribute *attr =
gst_sdp_media_get_attribute (offer_media, j);
@@ -2071,155 +2433,217 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options)
}
}
- offer_caps = gst_caps_new_empty ();
- for (j = 0; j < gst_sdp_media_formats_len (offer_media); j++) {
- guint pt = atoi (gst_sdp_media_get_format (offer_media, j));
- GstCaps *caps;
+ /* set the a=setup: attribute */
+ offer_setup = _get_dtls_setup_from_media (offer_media);
+ answer_setup = _intersect_dtls_setup (offer_setup);
+ if (answer_setup == GST_WEBRTC_DTLS_SETUP_NONE) {
+ GST_WARNING_OBJECT (webrtc, "Could not intersect offer setup with "
+ "transceiver direction");
+ goto rejected;
+ }
+ _media_replace_setup (media, answer_setup);
- caps = gst_sdp_media_get_caps_from_media (offer_media, pt);
+ if (g_strcmp0 (gst_sdp_media_get_media (offer_media), "application") == 0) {
+ int sctp_port;
- /* gst_sdp_media_get_caps_from_media() produces caps with name
- * "application/x-unknown" which will fail intersection with
- * "application/x-rtp" caps so mangle the returns caps to have the
- * correct name here */
- for (k = 0; k < gst_caps_get_size (caps); k++) {
- GstStructure *s = gst_caps_get_structure (caps, k);
- gst_structure_set_name (s, "application/x-rtp");
+ if (gst_sdp_media_formats_len (offer_media) != 1) {
+ GST_WARNING_OBJECT (webrtc, "Could not find a format in the m= line "
+ "for webrtc-datachannel");
+ goto rejected;
+ }
+ if (g_strcmp0 (gst_sdp_media_get_format (offer_media, 0),
+ "webrtc-datachannel") != 0) {
+ GST_WARNING_OBJECT (webrtc,
+ "format field of data channel m= line "
+ "is not \'webrtc-datachannel\'");
+ goto rejected;
+ }
+ sctp_port = _get_sctp_port_from_media (offer_media);
+ if (sctp_port == -1) {
+ GST_WARNING_OBJECT (webrtc, "media does not contain a sctp port");
+ goto rejected;
}
- gst_caps_append (offer_caps, caps);
- }
+ /* XXX: older browsers will produce a different SDP format for data
+ * channel that is currently not parsed correctly */
+ gst_sdp_media_set_proto (media, "UDP/DTLS/SCTP");
- for (j = 0; j < webrtc->priv->transceivers->len; j++) {
- GstCaps *trans_caps;
+ gst_sdp_media_set_media (media, "application");
+ gst_sdp_media_set_port_info (media, 9, 0);
+ gst_sdp_media_add_format (media, "webrtc-datachannel");
- rtp_trans =
- g_array_index (webrtc->priv->transceivers, GstWebRTCRTPTransceiver *,
- j);
- trans_caps = _find_codec_preferences (webrtc, rtp_trans, GST_PAD_SINK, j);
+ /* FIXME: negotiate this properly on renegotiation */
+ gst_sdp_media_add_attribute (media, "sctp-port", "5000");
- GST_TRACE_OBJECT (webrtc, "trying to compare %" GST_PTR_FORMAT
- " and %" GST_PTR_FORMAT, offer_caps, trans_caps);
+ _create_data_channel_transports (webrtc, i);
- /* FIXME: technically this is a little overreaching as some fields we
- * we can deal with not having and/or we may have unrecognized fields
- * that we cannot actually support */
- if (trans_caps) {
- answer_caps = gst_caps_intersect (offer_caps, trans_caps);
- if (answer_caps && !gst_caps_is_empty (answer_caps)) {
- GST_LOG_OBJECT (webrtc,
- "found compatible transceiver %" GST_PTR_FORMAT
- " for offer media %u", trans, i);
- if (trans_caps)
- gst_caps_unref (trans_caps);
- break;
- } else {
- if (answer_caps) {
- gst_caps_unref (answer_caps);
- answer_caps = NULL;
+ {
+ gchar *cert, *fingerprint, *val;
+
+ g_object_get (webrtc->priv->sctp_transport->transport, "certificate",
+ &cert, NULL);
+
+ fingerprint =
+ _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256);
+ g_free (cert);
+ val =
+ g_strdup_printf ("%s %s",
+ _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint);
+ g_free (fingerprint);
+
+ gst_sdp_media_add_attribute (media, "fingerprint", val);
+ g_free (val);
+ }
+ } else if (g_strcmp0 (gst_sdp_media_get_media (offer_media), "audio") == 0
+ || g_strcmp0 (gst_sdp_media_get_media (offer_media), "video") == 0) {
+ gst_sdp_media_set_proto (media, "UDP/TLS/RTP/SAVPF");
+
+ offer_caps = gst_caps_new_empty ();
+ for (j = 0; j < gst_sdp_media_formats_len (offer_media); j++) {
+ guint pt = atoi (gst_sdp_media_get_format (offer_media, j));
+ GstCaps *caps;
+
+ caps = gst_sdp_media_get_caps_from_media (offer_media, pt);
+
+ /* gst_sdp_media_get_caps_from_media() produces caps with name
+ * "application/x-unknown" which will fail intersection with
+ * "application/x-rtp" caps so mangle the returns caps to have the
+ * correct name here */
+ for (k = 0; k < gst_caps_get_size (caps); k++) {
+ GstStructure *s = gst_caps_get_structure (caps, k);
+ gst_structure_set_name (s, "application/x-rtp");
+ }
+
+ gst_caps_append (offer_caps, caps);
+ }
+
+ for (j = 0; j < webrtc->priv->transceivers->len; j++) {
+ GstCaps *trans_caps;
+
+ rtp_trans =
+ g_array_index (webrtc->priv->transceivers,
+ GstWebRTCRTPTransceiver *, j);
+ trans_caps =
+ _find_codec_preferences (webrtc, rtp_trans, GST_PAD_SINK, j);
+
+ GST_TRACE_OBJECT (webrtc, "trying to compare %" GST_PTR_FORMAT
+ " and %" GST_PTR_FORMAT, offer_caps, trans_caps);
+
+ /* FIXME: technically this is a little overreaching as some fields we
+ * we can deal with not having and/or we may have unrecognized fields
+ * that we cannot actually support */
+ if (trans_caps) {
+ answer_caps = gst_caps_intersect (offer_caps, trans_caps);
+ if (answer_caps && !gst_caps_is_empty (answer_caps)) {
+ GST_LOG_OBJECT (webrtc,
+ "found compatible transceiver %" GST_PTR_FORMAT
+ " for offer media %u", trans, i);
+ if (trans_caps)
+ gst_caps_unref (trans_caps);
+ break;
+ } else {
+ if (answer_caps) {
+ gst_caps_unref (answer_caps);
+ answer_caps = NULL;
+ }
+ if (trans_caps)
+ gst_caps_unref (trans_caps);
+ rtp_trans = NULL;
}
- if (trans_caps)
- gst_caps_unref (trans_caps);
+ } else {
rtp_trans = NULL;
}
- } else {
- rtp_trans = NULL;
}
- }
- if (rtp_trans) {
- answer_dir = rtp_trans->direction;
- g_assert (answer_caps != NULL);
- } else {
- /* if no transceiver, then we only receive that stream and respond with
- * the exact same caps */
- /* FIXME: how to validate that subsequent elements can actually receive
- * this payload/format */
- answer_dir = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY;
- answer_caps = gst_caps_ref (offer_caps);
- }
+ if (rtp_trans) {
+ answer_dir = rtp_trans->direction;
+ g_assert (answer_caps != NULL);
+ } else {
+ /* if no transceiver, then we only receive that stream and respond with
+ * the exact same caps */
+ /* FIXME: how to validate that subsequent elements can actually receive
+ * this payload/format */
+ answer_dir = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY;
+ answer_caps = gst_caps_ref (offer_caps);
+ }
- if (!rtp_trans) {
- trans = _create_webrtc_transceiver (webrtc, answer_dir, i);
- rtp_trans = GST_WEBRTC_RTP_TRANSCEIVER (trans);
- } else {
- trans = WEBRTC_TRANSCEIVER (rtp_trans);
- }
+ if (!rtp_trans) {
+ trans = _create_webrtc_transceiver (webrtc, answer_dir, i);
+ rtp_trans = GST_WEBRTC_RTP_TRANSCEIVER (trans);
+ } else {
+ trans = WEBRTC_TRANSCEIVER (rtp_trans);
+ }
- if (!trans->do_nack) {
- answer_caps = gst_caps_make_writable (answer_caps);
- for (k = 0; k < gst_caps_get_size (answer_caps); k++) {
- GstStructure *s = gst_caps_get_structure (answer_caps, k);
- gst_structure_remove_fields (s, "rtcp-fb-nack", NULL);
+ if (!trans->do_nack) {
+ answer_caps = gst_caps_make_writable (answer_caps);
+ for (k = 0; k < gst_caps_get_size (answer_caps); k++) {
+ GstStructure *s = gst_caps_get_structure (answer_caps, k);
+ gst_structure_remove_fields (s, "rtcp-fb-nack", NULL);
+ }
}
- }
- gst_sdp_media_set_media_from_caps (answer_caps, media);
+ gst_sdp_media_set_media_from_caps (answer_caps, media);
- _get_rtx_target_pt_and_ssrc_from_caps (answer_caps, &target_pt,
- &target_ssrc);
+ _get_rtx_target_pt_and_ssrc_from_caps (answer_caps, &target_pt,
+ &target_ssrc);
- original_target_pt = target_pt;
+ original_target_pt = target_pt;
- _media_add_fec (media, trans, offer_caps, &target_pt);
- if (trans->do_nack) {
- _media_add_rtx (media, trans, offer_caps, target_pt, target_ssrc);
- if (target_pt != original_target_pt)
- _media_add_rtx (media, trans, offer_caps, original_target_pt,
- target_ssrc);
- }
+ _media_add_fec (media, trans, offer_caps, &target_pt);
+ if (trans->do_nack) {
+ _media_add_rtx (media, trans, offer_caps, target_pt, target_ssrc);
+ if (target_pt != original_target_pt)
+ _media_add_rtx (media, trans, offer_caps, original_target_pt,
+ target_ssrc);
+ }
- if (answer_dir != GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY)
- _media_add_ssrcs (media, answer_caps, webrtc,
- WEBRTC_TRANSCEIVER (rtp_trans));
+ if (answer_dir != GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY)
+ _media_add_ssrcs (media, answer_caps, webrtc,
+ WEBRTC_TRANSCEIVER (rtp_trans));
- gst_caps_unref (answer_caps);
- answer_caps = NULL;
+ gst_caps_unref (answer_caps);
+ answer_caps = NULL;
- /* set the new media direction */
- offer_dir = _get_direction_from_media (offer_media);
- answer_dir = _intersect_answer_directions (offer_dir, answer_dir);
- if (answer_dir == GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_NONE) {
- GST_WARNING_OBJECT (webrtc, "Could not intersect offer direction with "
- "transceiver direction");
- goto rejected;
- }
- _media_replace_direction (media, answer_dir);
+ /* set the new media direction */
+ offer_dir = _get_direction_from_media (offer_media);
+ answer_dir = _intersect_answer_directions (offer_dir, answer_dir);
+ if (answer_dir == GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_NONE) {
+ GST_WARNING_OBJECT (webrtc, "Could not intersect offer direction with "
+ "transceiver direction");
+ goto rejected;
+ }
+ _media_replace_direction (media, answer_dir);
+
+ /* FIXME: bundle! */
+ if (!trans->stream) {
+ TransportStream *item = _find_transport_for_session (webrtc, i);
+ if (!item)
+ item = _create_transport_stream (webrtc, i, FALSE);
+ webrtc_transceiver_set_transport (trans, item);
+ }
+ /* set the a=fingerprint: for this transport */
+ g_object_get (trans->stream->transport, "certificate", &cert, NULL);
+
+ {
+ gchar *fingerprint, *val;
+
+ fingerprint =
+ _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256);
+ g_free (cert);
+ val =
+ g_strdup_printf ("%s %s",
+ _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint);
+ g_free (fingerprint);
+
+ gst_sdp_media_add_attribute (media, "fingerprint", val);
+ g_free (val);
+ }
- /* set the a=setup: attribute */
- offer_setup = _get_dtls_setup_from_media (offer_media);
- answer_setup = _intersect_dtls_setup (offer_setup);
- if (answer_setup == GST_WEBRTC_DTLS_SETUP_NONE) {
- GST_WARNING_OBJECT (webrtc, "Could not intersect offer direction with "
- "transceiver direction");
+ gst_caps_unref (offer_caps);
+ } else {
+ GST_WARNING_OBJECT (webrtc, "unknown m= line media name");
goto rejected;
}
- _media_replace_setup (media, answer_setup);
-
- /* FIXME: bundle! */
- if (!trans->stream) {
- TransportStream *item = _find_transport_for_session (webrtc, i);
- if (!item)
- item = _create_transport_channel (webrtc, i);
- webrtc_transceiver_set_transport (trans, item);
- }
- /* set the a=fingerprint: for this transport */
- g_object_get (trans->stream->transport, "certificate", &cert, NULL);
-
- {
- gchar *fingerprint, *val;
-
- fingerprint =
- _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256);
- g_free (cert);
- val =
- g_strdup_printf ("%s %s",
- _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint);
- g_free (fingerprint);
-
- gst_sdp_media_add_attribute (media, "fingerprint", val);
- g_free (val);
- }
if (0) {
rejected:
@@ -2230,8 +2654,6 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options)
}
gst_sdp_message_add_media (ret, media);
gst_sdp_media_free (media);
-
- gst_caps_unref (offer_caps);
}
/* FIXME: can we add not matched transceivers? */
@@ -2410,7 +2832,7 @@ _connect_input_stream (GstWebRTCBin * webrtc, GstWebRTCBinPad * pad)
/* FIXME: bundle */
item = _find_transport_for_session (webrtc, pad->mlineindex);
if (!item)
- item = _create_transport_channel (webrtc, pad->mlineindex);
+ item = _create_transport_stream (webrtc, pad->mlineindex, FALSE);
webrtc_transceiver_set_transport (trans, item);
}
@@ -2454,7 +2876,7 @@ _connect_output_stream (GstWebRTCBin * webrtc, GstWebRTCBinPad * pad)
/* FIXME: bundle */
item = _find_transport_for_session (webrtc, pad->mlineindex);
if (!item)
- item = _create_transport_channel (webrtc, pad->mlineindex);
+ item = _create_transport_stream (webrtc, pad->mlineindex, FALSE);
webrtc_transceiver_set_transport (trans, item);
}
@@ -2512,10 +2934,9 @@ _filter_sdp_fields (GQuark field_id, const GValue * value,
static void
_update_transceiver_from_sdp_media (GstWebRTCBin * webrtc,
const GstSDPMessage * sdp, guint media_idx,
- GstWebRTCRTPTransceiver * rtp_trans)
+ TransportStream * stream, GstWebRTCRTPTransceiver * rtp_trans)
{
WebRTCTransceiver *trans = WEBRTC_TRANSCEIVER (rtp_trans);
- TransportStream *stream = trans->stream;
GstWebRTCRTPTransceiverDirection prev_dir = rtp_trans->current_direction;
GstWebRTCRTPTransceiverDirection new_dir;
const GstSDPMedia *media = gst_sdp_message_get_media (sdp, media_idx);
@@ -2534,14 +2955,6 @@ _update_transceiver_from_sdp_media (GstWebRTCBin * webrtc,
}
}
- if (!stream) {
- /* FIXME: find an existing transport for e.g. bundle/reconfiguration */
- stream = _find_transport_for_session (webrtc, media_idx);
- if (!stream)
- stream = _create_transport_channel (webrtc, media_idx);
- webrtc_transceiver_set_transport (trans, stream);
- }
-
{
const GstSDPMedia *local_media, *remote_media;
GstWebRTCRTPTransceiverDirection local_dir, remote_dir;
@@ -2725,6 +3138,121 @@ _update_transceiver_from_sdp_media (GstWebRTCBin * webrtc,
}
}
+/* must be called with the pc lock held */
+static gint
+_generate_data_channel_id (GstWebRTCBin * webrtc)
+{
+ gboolean is_client;
+ gint new_id = -1, max_channels = 0;
+
+ if (webrtc->priv->sctp_transport) {
+ g_object_get (webrtc->priv->sctp_transport, "max-channels", &max_channels,
+ NULL);
+ }
+ if (max_channels <= 0) {
+ max_channels = 65534;
+ }
+
+ g_object_get (webrtc->priv->sctp_transport->transport, "client", &is_client,
+ NULL);
+
+ /* TODO: a better search algorithm */
+ do {
+ GstWebRTCDataChannel *channel;
+
+ new_id++;
+
+ if (new_id < 0 || new_id >= max_channels) {
+ /* exhausted id space */
+ GST_WARNING_OBJECT (webrtc, "Could not find a suitable "
+ "data channel id (max %i)", max_channels);
+ return -1;
+ }
+
+ /* client must generate even ids, server must generate odd ids */
+ if (new_id % 2 == ! !is_client)
+ continue;
+
+ channel = _find_data_channel_for_id (webrtc, new_id);
+ if (!channel)
+ break;
+ } while (TRUE);
+
+ return new_id;
+}
+
+static void
+_update_data_channel_from_sdp_media (GstWebRTCBin * webrtc,
+ const GstSDPMessage * sdp, guint media_idx, TransportStream * stream)
+{
+ const GstSDPMedia *local_media, *remote_media;
+ GstWebRTCDTLSSetup local_setup, remote_setup, new_setup;
+ TransportReceiveBin *receive;
+ int local_port, remote_port;
+ guint64 local_max_size, remote_max_size, max_size;
+ int i;
+
+ local_media =
+ gst_sdp_message_get_media (webrtc->current_local_description->sdp,
+ media_idx);
+ remote_media =
+ gst_sdp_message_get_media (webrtc->current_remote_description->sdp,
+ media_idx);
+
+ local_setup = _get_dtls_setup_from_media (local_media);
+ remote_setup = _get_dtls_setup_from_media (remote_media);
+ new_setup = _get_final_setup (local_setup, remote_setup);
+ if (new_setup == GST_WEBRTC_DTLS_SETUP_NONE)
+ return;
+
+ /* data channel is always rtcp-muxed to avoid generating ICE candidates
+ * for RTCP */
+ g_object_set (stream, "rtcp-mux", TRUE, "dtls-client",
+ new_setup == GST_WEBRTC_DTLS_SETUP_ACTIVE, NULL);
+
+ local_port = _get_sctp_port_from_media (local_media);
+ remote_port = _get_sctp_port_from_media (local_media);
+ if (local_port == -1 || remote_port == -1)
+ return;
+
+ if (0 == (local_max_size =
+ _get_sctp_max_message_size_from_media (local_media)))
+ local_max_size = G_MAXUINT64;
+ if (0 == (remote_max_size =
+ _get_sctp_max_message_size_from_media (remote_media)))
+ remote_max_size = G_MAXUINT64;
+ max_size = MIN (local_max_size, remote_max_size);
+
+ webrtc->priv->sctp_transport->max_message_size = max_size;
+
+ g_object_set (webrtc->priv->sctp_transport->sctpdec, "local-sctp-port",
+ local_port, NULL);
+ g_object_set (webrtc->priv->sctp_transport->sctpenc, "remote-sctp-port",
+ remote_port, NULL);
+
+ for (i = 0; i < webrtc->priv->data_channels->len; i++) {
+ GstWebRTCDataChannel *channel;
+
+ channel =
+ g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *, i);
+
+ if (channel->id == -1)
+ channel->id = _generate_data_channel_id (webrtc);
+ if (channel->id == -1)
+ GST_ELEMENT_WARNING (webrtc, RESOURCE, NOT_FOUND,
+ ("%s", "Failed to generate an identifier for a data channel"), NULL);
+
+ if (webrtc->priv->sctp_transport->association_established
+ && !channel->negotiated && !channel->opened) {
+ _link_data_channel_to_sctp (webrtc, channel);
+ gst_webrtc_data_channel_start_negotiation (channel);
+ }
+ }
+
+ receive = TRANSPORT_RECEIVE_BIN (stream->receive_bin);
+ transport_receive_bin_set_receive_state (receive, RECEIVE_STATE_PASS);
+}
+
static gboolean
_find_compatible_unassociated_transceiver (GstWebRTCRTPTransceiver * p1,
gconstpointer data)
@@ -2745,6 +3273,7 @@ _update_transceivers_from_sdp (GstWebRTCBin * webrtc, SDPSource source,
for (i = 0; i < gst_sdp_message_medias_len (sdp->sdp); i++) {
const GstSDPMedia *media = gst_sdp_message_get_media (sdp->sdp, i);
+ TransportStream *stream;
GstWebRTCRTPTransceiver *trans;
/* skip rejected media */
@@ -2753,24 +3282,41 @@ _update_transceivers_from_sdp (GstWebRTCBin * webrtc, SDPSource source,
trans = _find_transceiver_for_sdp_media (webrtc, sdp->sdp, i);
+ stream = _find_transport_for_session (webrtc, i);
+ if (!stream) {
+ stream = _create_transport_stream (webrtc, i,
+ _message_media_is_datachannel (sdp->sdp, i));
+ if (trans)
+ webrtc_transceiver_set_transport ((WebRTCTransceiver *) trans, stream);
+ }
+
if (source == SDP_LOCAL && sdp->type == GST_WEBRTC_SDP_TYPE_OFFER && !trans) {
GST_ERROR ("State mismatch. Could not find local transceiver by mline.");
return FALSE;
} else {
- if (trans) {
- _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, trans);
+ if (g_strcmp0 (gst_sdp_media_get_media (media), "audio") == 0 ||
+ g_strcmp0 (gst_sdp_media_get_media (media), "video") == 0) {
+ if (trans) {
+ _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, stream,
+ trans);
+ } else {
+ trans = _find_transceiver (webrtc, NULL,
+ (FindTransceiverFunc) _find_compatible_unassociated_transceiver);
+ /* XXX: default to the advertised direction in the sdp for new
+ * transceviers. The spec doesn't actually say what happens here, only
+ * that calls to setDirection will change the value. Nothing about
+ * a default value when the transceiver is created internally */
+ if (!trans)
+ trans =
+ GST_WEBRTC_RTP_TRANSCEIVER (_create_webrtc_transceiver (webrtc,
+ _get_direction_from_media (media), i));
+ _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, stream,
+ trans);
+ }
+ } else if (_message_media_is_datachannel (sdp->sdp, i)) {
+ _update_data_channel_from_sdp_media (webrtc, sdp->sdp, i, stream);
} else {
- trans = _find_transceiver (webrtc, NULL,
- (FindTransceiverFunc) _find_compatible_unassociated_transceiver);
- /* XXX: default to the advertised direction in the sdp for new
- * transceviers. The spec doesn't actually say what happens here, only
- * that calls to setDirection will change the value. Nothing about
- * a default value when the transceiver is created internally */
- if (!trans)
- trans =
- GST_WEBRTC_RTP_TRANSCEIVER (_create_webrtc_transceiver (webrtc,
- _get_direction_from_media (media), i));
- _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, trans);
+ GST_ERROR_OBJECT (webrtc, "Unknown media type in SDP at index %u", i);
}
}
}
@@ -2982,8 +3528,6 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd)
g_free (to);
}
- /* TODO: necessary data channel modifications */
-
if (sd->sdp->type == GST_WEBRTC_SDP_TYPE_ROLLBACK) {
/* FIXME:
* If the mid value of an RTCRtpTransceiver was set to a non-null value
@@ -3001,8 +3545,8 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd)
}
if (webrtc->signaling_state == GST_WEBRTC_SIGNALING_STATE_STABLE) {
- GList *tmp;
gboolean prev_need_negotiation = webrtc->priv->need_negotiation;
+ GList *tmp;
/* media modifications */
_update_transceivers_from_sdp (webrtc, sd->source, sd->sdp);
@@ -3040,7 +3584,9 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd)
/* FIXME: bundle */
item = _find_transport_for_session (webrtc, i);
if (!item)
- item = _create_transport_channel (webrtc, i);
+ item =
+ _create_transport_stream (webrtc, i,
+ _message_media_is_datachannel (sd->sdp->sdp, i));
_get_ice_credentials_from_sdp_media (sd->sdp->sdp, i, &ufrag, &pwd);
gst_webrtc_ice_set_local_credentials (webrtc->priv->ice,
@@ -3060,7 +3606,9 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd)
/* FIXME: bundle */
item = _find_transport_for_session (webrtc, i);
if (!item)
- item = _create_transport_channel (webrtc, i);
+ item =
+ _create_transport_stream (webrtc, i,
+ _message_media_is_datachannel (sd->sdp->sdp, i));
_get_ice_credentials_from_sdp_media (sd->sdp->sdp, i, &ufrag, &pwd);
gst_webrtc_ice_set_remote_credentials (webrtc->priv->ice,
@@ -3374,6 +3922,130 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
return TRUE;
}
+static GstWebRTCDataChannel *
+gst_webrtc_bin_create_data_channel (GstWebRTCBin * webrtc, const gchar * label,
+ GstStructure * init_params)
+{
+ gboolean ordered;
+ gint max_packet_lifetime;
+ gint max_retransmits;
+ const gchar *protocol;
+ gboolean negotiated;
+ gint id;
+ GstWebRTCPriorityType priority;
+ GstWebRTCDataChannel *ret;
+ gint max_channels = 65534;
+
+ g_return_val_if_fail (GST_IS_WEBRTC_BIN (webrtc), NULL);
+ g_return_val_if_fail (label != NULL, NULL);
+ g_return_val_if_fail (strlen (label) <= 65535, NULL);
+ g_return_val_if_fail (webrtc->priv->is_closed != TRUE, NULL);
+
+ if (!init_params
+ || !gst_structure_get_boolean (init_params, "ordered", &ordered))
+ ordered = TRUE;
+ if (!init_params
+ || !gst_structure_get_int (init_params, "max-packet-lifetime",
+ &max_packet_lifetime))
+ max_packet_lifetime = -1;
+ if (!init_params
+ || !gst_structure_get_boolean (init_params, "max-retransmits",
+ &max_retransmits))
+ max_retransmits = -1;
+ /* both retransmits and lifetime cannot be set */
+ g_return_val_if_fail ((max_packet_lifetime == -1)
+ || (max_retransmits == -1), NULL);
+
+ if (!init_params
+ || !(protocol = gst_structure_get_string (init_params, "protocol")))
+ protocol = "";
+ g_return_val_if_fail (strlen (protocol) <= 65535, NULL);
+
+ if (!init_params
+ || !gst_structure_get_boolean (init_params, "negotiated", &negotiated))
+ negotiated = FALSE;
+ if (!negotiated || !init_params
+ || !gst_structure_get_int (init_params, "id", &id))
+ id = -1;
+ if (negotiated)
+ g_return_val_if_fail (id != -1, NULL);
+ g_return_val_if_fail (id < 65535, NULL);
+
+ if (!init_params
+ || !gst_structure_get_enum (init_params, "priority",
+ GST_TYPE_WEBRTC_PRIORITY_TYPE, (gint *) & priority))
+ priority = GST_WEBRTC_PRIORITY_TYPE_LOW;
+
+ /* FIXME: clamp max-retransmits and max-packet-lifetime */
+
+ if (webrtc->priv->sctp_transport) {
+ /* Let transport be the connection's [[SctpTransport]] slot.
+ *
+ * If the [[DataChannelId]] slot is not null, transport is in
+ * connected state and [[DataChannelId]] is greater or equal to the
+ * transport's [[MaxChannels]] slot, throw an OperationError.
+ */
+ g_object_get (webrtc->priv->sctp_transport, "max-channels", &max_channels,
+ NULL);
+
+ g_return_val_if_fail (id <= max_channels, NULL);
+ }
+
+ if (!_have_nice_elements (webrtc) || !_have_dtls_elements (webrtc) ||
+ !_have_sctp_elements (webrtc))
+ return NULL;
+
+ PC_LOCK (webrtc);
+ /* check if the id has been used already */
+ if (id != -1) {
+ GstWebRTCDataChannel *channel = _find_data_channel_for_id (webrtc, id);
+ if (channel) {
+ GST_ELEMENT_WARNING (webrtc, LIBRARY, SETTINGS,
+ ("Attempting to add a data channel with a duplicate ID: %i", id),
+ NULL);
+ PC_UNLOCK (webrtc);
+ return NULL;
+ }
+ } else if (webrtc->current_local_description
+ && webrtc->current_remote_description && webrtc->priv->sctp_transport
+ && webrtc->priv->sctp_transport->transport) {
+ /* else we can only generate an id if we're configured already. The other
+ * case for generating an id is on sdp setting */
+ id = _generate_data_channel_id (webrtc);
+ if (id == -1) {
+ GST_ELEMENT_WARNING (webrtc, RESOURCE, NOT_FOUND,
+ ("%s", "Failed to generate an identifier for a data channel"), NULL);
+ PC_UNLOCK (webrtc);
+ return NULL;
+ }
+ }
+
+ ret = g_object_new (GST_TYPE_WEBRTC_DATA_CHANNEL, "label", label,
+ "ordered", ordered, "max-packet-lifetime", max_packet_lifetime,
+ "max-retransmits", max_retransmits, "protocol", protocol,
+ "negotiated", negotiated, "id", id, "priority", priority, NULL);
+
+ if (ret) {
+ gst_bin_add (GST_BIN (webrtc), ret->appsrc);
+ gst_bin_add (GST_BIN (webrtc), ret->appsink);
+
+ gst_element_sync_state_with_parent (ret->appsrc);
+ gst_element_sync_state_with_parent (ret->appsink);
+
+ ret = gst_object_ref (ret);
+ ret->webrtcbin = webrtc;
+ g_array_append_val (webrtc->priv->data_channels, ret);
+ _link_data_channel_to_sctp (webrtc, ret);
+ if (webrtc->priv->sctp_transport &&
+ webrtc->priv->sctp_transport->association_established
+ && !ret->negotiated)
+ gst_webrtc_data_channel_start_negotiation (ret);
+ }
+
+ PC_UNLOCK (webrtc);
+ return ret;
+}
+
/* === rtpbin signal implementations === */
static void
@@ -4001,6 +4673,8 @@ gst_webrtc_bin_dispose (GObject * object)
g_array_free (webrtc->priv->ice_stream_map, TRUE);
webrtc->priv->ice_stream_map = NULL;
+ g_clear_object (&webrtc->priv->sctp_transport);
+
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@@ -4017,6 +4691,14 @@ gst_webrtc_bin_finalize (GObject * object)
g_array_free (webrtc->priv->transceivers, TRUE);
webrtc->priv->transceivers = NULL;
+ if (webrtc->priv->data_channels)
+ g_array_free (webrtc->priv->data_channels, TRUE);
+ webrtc->priv->data_channels = NULL;
+
+ if (webrtc->priv->pending_data_channels)
+ g_array_free (webrtc->priv->pending_data_channels, TRUE);
+ webrtc->priv->pending_data_channels = NULL;
+
if (webrtc->priv->pending_ice_candidates)
g_array_free (webrtc->priv->pending_ice_candidates, TRUE);
webrtc->priv->pending_ice_candidates = NULL;
@@ -4313,6 +4995,16 @@ gst_webrtc_bin_class_init (GstWebRTCBinClass * klass)
G_TYPE_NONE, 1, GST_TYPE_WEBRTC_RTP_TRANSCEIVER);
/**
+ * GstWebRTCBin::on-data-channel:
+ * @object: the #GstWebRtcBin
+ * @candidate: the new #GstWebRTCDataChannel
+ */
+ gst_webrtc_bin_signals[ON_DATA_CHANNEL_SIGNAL] =
+ g_signal_new ("on-data-channel", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_WEBRTC_DATA_CHANNEL);
+
+ /**
* GstWebRTCBin::add-transceiver:
* @object: the #GstWebRtcBin
* @direction: the direction of the new transceiver
@@ -4351,6 +5043,33 @@ gst_webrtc_bin_class_init (GstWebRTCBinClass * klass)
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_webrtc_bin_add_turn_server), NULL, NULL,
g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_STRING);
+
+ /*
+ * GstWebRTCBin::create-data-channel:
+ * @object: the #GstWebRtcBin
+ * @label: the label for the data channel
+ * @options: a #GstStructure of options for creating the data channel
+ *
+ * The options dictionary is the same format as the RTCDataChannelInit
+ * members outlined https://www.w3.org/TR/webrtc/#dom-rtcdatachannelinit and
+ * and reproduced below
+ *
+ * ordered G_TYPE_BOOLEAN Whether the channal will send data with guarenteed ordering
+ * max-packet-lifetime G_TYPE_INT The time in milliseconds to attempt transmitting unacknowledged data. -1 for unset
+ * max-retransmits G_TYPE_INT The number of times data will be attempted to be transmitted without acknowledgement before dropping
+ * protocol G_TYPE_STRING The subprotocol used by this channel
+ * negotiated G_TYPE_BOOLEAN Whether the created data channel should not perform in-band chnanel announcment. If %TRUE, then application must negotiate the channel itself and create the corresponding channel on the peer with the same id.
+ * id G_TYPE_INT Override the default identifier selection of this channel
+ * priority GST_TYPE_WEBRTC_PRIORITY_TYPE The priority to use for this channel
+ *
+ * Returns: a new data channel object
+ */
+ gst_webrtc_bin_signals[CREATE_DATA_CHANNEL_SIGNAL] =
+ g_signal_new_class_handler ("create-data-channel",
+ G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_CALLBACK (gst_webrtc_bin_create_data_channel), NULL, NULL,
+ g_cclosure_marshal_generic, GST_TYPE_WEBRTC_DATA_CHANNEL, 2,
+ G_TYPE_STRING, GST_TYPE_STRUCTURE);
}
static void
@@ -4404,6 +5123,15 @@ gst_webrtc_bin_init (GstWebRTCBin * webrtc)
g_array_set_clear_func (webrtc->priv->transports,
(GDestroyNotify) _transport_free);
+ webrtc->priv->data_channels = g_array_new (FALSE, TRUE, sizeof (gpointer));
+ g_array_set_clear_func (webrtc->priv->data_channels,
+ (GDestroyNotify) _deref_and_unref);
+
+ webrtc->priv->pending_data_channels =
+ g_array_new (FALSE, TRUE, sizeof (gpointer));
+ g_array_set_clear_func (webrtc->priv->pending_data_channels,
+ (GDestroyNotify) _deref_and_unref);
+
webrtc->priv->session_mid_map =
g_array_new (FALSE, TRUE, sizeof (SessionMidItem));
g_array_set_clear_func (webrtc->priv->session_mid_map,
diff --git a/ext/webrtc/gstwebrtcbin.h b/ext/webrtc/gstwebrtcbin.h
index 49603ec5e..a0cc69446 100644
--- a/ext/webrtc/gstwebrtcbin.h
+++ b/ext/webrtc/gstwebrtcbin.h
@@ -23,6 +23,7 @@
#include <gst/sdp/sdp.h>
#include "fwd.h"
#include "gstwebrtcice.h"
+#include "transportstream.h"
G_BEGIN_DECLS
@@ -37,7 +38,9 @@ typedef enum
GST_WEBRTC_BIN_ERROR_INVALID_STATE,
GST_WEBRTC_BIN_ERROR_BAD_SDP,
GST_WEBRTC_BIN_ERROR_FINGERPRINT,
-} GstWebRTCJSEPSDPError;
+ GST_WEBRTC_BIN_ERROR_SCTP_FAILURE,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+} GstWebRTCError;
GType gst_webrtc_bin_pad_get_type(void);
#define GST_TYPE_WEBRTC_BIN_PAD (gst_webrtc_bin_pad_get_type())
@@ -107,6 +110,13 @@ struct _GstWebRTCBinPrivate
GArray *transceivers;
GArray *session_mid_map;
GArray *transports;
+ GArray *data_channels;
+ /* list of data channels we've received a sctp stream for but no data
+ * channel protocol for */
+ GArray *pending_data_channels;
+
+ GstWebRTCSCTPTransport *sctp_transport;
+ TransportStream *data_channel_transport;
GstWebRTCICE *ice;
GArray *ice_stream_map;
@@ -115,7 +125,6 @@ struct _GstWebRTCBinPrivate
/* peerconnection variables */
gboolean is_closed;
gboolean need_negotiation;
- gpointer sctp_transport; /* FIXME */
/* peerconnection helper thread for promises */
GMainContext *main_context;
diff --git a/ext/webrtc/meson.build b/ext/webrtc/meson.build
index 04145eb05..b2f485be5 100644
--- a/ext/webrtc/meson.build
+++ b/ext/webrtc/meson.build
@@ -4,6 +4,7 @@ webrtc_sources = [
'gstwebrtcstats.c',
'icestream.c',
'nicetransport.c',
+ 'sctptransport.c',
'gstwebrtcbin.c',
'transportreceivebin.c',
'transportsendbin.c',
@@ -11,6 +12,7 @@ webrtc_sources = [
'utils.c',
'webrtcsdp.c',
'webrtctransceiver.c',
+ 'webrtcdatachannel.c',
]
libnice_dep = dependency('nice', version : '>=0.1.14', required : get_option('webrtc'),
@@ -22,7 +24,7 @@ if libnice_dep.found()
webrtc_sources,
c_args : gst_plugins_bad_args + ['-DGST_USE_UNSTABLE_API'],
include_directories : [configinc],
- dependencies : [libnice_dep, gstbase_dep, gstsdp_dep, gstwebrtc_dep],
+ dependencies : [libnice_dep, gstbase_dep, gstsdp_dep, gstapp_dep, gstwebrtc_dep, gstsctp_dep],
install : true,
install_dir : plugins_install_dir,
)
diff --git a/ext/webrtc/sctptransport.c b/ext/webrtc/sctptransport.c
new file mode 100644
index 000000000..f5643e9fe
--- /dev/null
+++ b/ext/webrtc/sctptransport.c
@@ -0,0 +1,270 @@
+/* GStreamer
+ * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <stdio.h>
+
+#include "sctptransport.h"
+#include "gstwebrtcbin.h"
+
+#define GST_CAT_DEFAULT gst_webrtc_sctp_transport_debug
+GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
+
+enum
+{
+ SIGNAL_0,
+ ON_RESET_STREAM_SIGNAL,
+ LAST_SIGNAL,
+};
+
+enum
+{
+ PROP_0,
+ PROP_TRANSPORT,
+ PROP_STATE,
+ PROP_MAX_MESSAGE_SIZE,
+ PROP_MAX_CHANNELS,
+};
+
+static guint gst_webrtc_sctp_transport_signals[LAST_SIGNAL] = { 0 };
+
+#define gst_webrtc_sctp_transport_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstWebRTCSCTPTransport, gst_webrtc_sctp_transport,
+ GST_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (gst_webrtc_sctp_transport_debug,
+ "webrtcsctptransport", 0, "webrtcsctptransport"););
+
+typedef void (*SCTPTask) (GstWebRTCSCTPTransport * sctp, gpointer user_data);
+
+struct task
+{
+ GstWebRTCSCTPTransport *sctp;
+ SCTPTask func;
+ gpointer user_data;
+ GDestroyNotify notify;
+};
+
+static void
+_execute_task (GstWebRTCBin * webrtc, struct task *task)
+{
+ if (task->func)
+ task->func (task->sctp, task->user_data);
+}
+
+static void
+_free_task (struct task *task)
+{
+ gst_object_unref (task->sctp);
+
+ if (task->notify)
+ task->notify (task->user_data);
+ g_free (task);
+}
+
+static void
+_sctp_enqueue_task (GstWebRTCSCTPTransport * sctp, SCTPTask func,
+ gpointer user_data, GDestroyNotify notify)
+{
+ struct task *task = g_new0 (struct task, 1);
+
+ task->sctp = gst_object_ref (sctp);
+ task->func = func;
+ task->user_data = user_data;
+ task->notify = notify;
+
+ gst_webrtc_bin_enqueue_task (sctp->webrtcbin,
+ (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task);
+}
+
+static void
+_emit_stream_reset (GstWebRTCSCTPTransport * sctp, gpointer user_data)
+{
+ guint stream_id = GPOINTER_TO_UINT (user_data);
+
+ g_signal_emit (sctp,
+ gst_webrtc_sctp_transport_signals[ON_RESET_STREAM_SIGNAL], 0, stream_id);
+}
+
+static void
+_on_sctp_dec_pad_removed (GstElement * sctpdec, GstPad * pad,
+ GstWebRTCSCTPTransport * sctp)
+{
+ guint stream_id;
+
+ if (sscanf (GST_PAD_NAME (pad), "src_%u", &stream_id) != 1)
+ return;
+
+ _sctp_enqueue_task (sctp, (SCTPTask) _emit_stream_reset,
+ GUINT_TO_POINTER (stream_id), NULL);
+}
+
+static void
+_on_sctp_association_established (GstElement * sctpenc, gboolean established,
+ GstWebRTCSCTPTransport * sctp)
+{
+ GST_OBJECT_LOCK (sctp);
+ if (established)
+ sctp->state = GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED;
+ else
+ sctp->state = GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED;
+ sctp->association_established = established;
+ GST_OBJECT_UNLOCK (sctp);
+
+ g_object_notify (G_OBJECT (sctp), "state");
+}
+
+static void
+gst_webrtc_sctp_transport_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+// GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_webrtc_sctp_transport_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
+
+ switch (prop_id) {
+ case PROP_TRANSPORT:
+ g_value_set_object (value, sctp->transport);
+ break;
+ case PROP_STATE:
+ g_value_set_enum (value, sctp->state);
+ break;
+ case PROP_MAX_MESSAGE_SIZE:
+ g_value_set_uint64 (value, sctp->max_message_size);
+ break;
+ case PROP_MAX_CHANNELS:
+ g_value_set_uint (value, sctp->max_channels);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_webrtc_sctp_transport_finalize (GObject * object)
+{
+ GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
+
+ g_signal_handlers_disconnect_by_data (sctp->sctpdec, sctp);
+ g_signal_handlers_disconnect_by_data (sctp->sctpenc, sctp);
+
+ gst_object_unref (sctp->sctpdec);
+ gst_object_unref (sctp->sctpenc);
+
+ g_clear_object (&sctp->transport);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_webrtc_sctp_transport_constructed (GObject * object)
+{
+ GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
+ guint association_id;
+
+ association_id = g_random_int_range (0, G_MAXUINT16);
+
+ sctp->sctpdec =
+ g_object_ref_sink (gst_element_factory_make ("sctpdec", NULL));
+ g_object_set (sctp->sctpdec, "sctp-association-id", association_id, NULL);
+ sctp->sctpenc =
+ g_object_ref_sink (gst_element_factory_make ("sctpenc", NULL));
+ g_object_set (sctp->sctpenc, "sctp-association-id", association_id, NULL);
+
+ g_signal_connect (sctp->sctpdec, "pad-removed",
+ G_CALLBACK (_on_sctp_dec_pad_removed), sctp);
+ g_signal_connect (sctp->sctpenc, "sctp-association-established",
+ G_CALLBACK (_on_sctp_association_established), sctp);
+
+ G_OBJECT_CLASS (parent_class)->constructed (object);
+}
+
+static void
+gst_webrtc_sctp_transport_class_init (GstWebRTCSCTPTransportClass * klass)
+{
+ GObjectClass *gobject_class = (GObjectClass *) klass;
+
+ gobject_class->constructed = gst_webrtc_sctp_transport_constructed;
+ gobject_class->get_property = gst_webrtc_sctp_transport_get_property;
+ gobject_class->set_property = gst_webrtc_sctp_transport_set_property;
+ gobject_class->finalize = gst_webrtc_sctp_transport_finalize;
+
+ g_object_class_install_property (gobject_class,
+ PROP_TRANSPORT,
+ g_param_spec_object ("transport",
+ "WebRTC DTLS Transport",
+ "DTLS transport used for this SCTP transport",
+ GST_TYPE_WEBRTC_DTLS_TRANSPORT,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_STATE,
+ g_param_spec_enum ("state",
+ "WebRTC SCTP Transport state", "WebRTC SCTP Transport state",
+ GST_TYPE_WEBRTC_SCTP_TRANSPORT_STATE,
+ GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_MAX_MESSAGE_SIZE,
+ g_param_spec_uint64 ("max-message-size",
+ "Maximum message size",
+ "Maximum message size as reported by the transport", 0, G_MAXUINT64,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_MAX_CHANNELS,
+ g_param_spec_uint ("max-channels",
+ "Maximum number of channels", "Maximum number of channels",
+ 0, G_MAXUINT16, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstWebRTCSCTPTransport::reset-stream:
+ * @object: the #GstWebRTCSCTPTransport
+ * @stream_id: the SCTP stream that was reset
+ */
+ gst_webrtc_sctp_transport_signals[ON_RESET_STREAM_SIGNAL] =
+ g_signal_new ("stream-reset", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, G_TYPE_UINT);
+}
+
+static void
+gst_webrtc_sctp_transport_init (GstWebRTCSCTPTransport * nice)
+{
+}
+
+GstWebRTCSCTPTransport *
+gst_webrtc_sctp_transport_new (void)
+{
+ return g_object_new (GST_TYPE_WEBRTC_SCTP_TRANSPORT, NULL);
+}
diff --git a/ext/webrtc/sctptransport.h b/ext/webrtc/sctptransport.h
new file mode 100644
index 000000000..d5327a77e
--- /dev/null
+++ b/ext/webrtc/sctptransport.h
@@ -0,0 +1,65 @@
+/* GStreamer
+ * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_WEBRTC_SCTP_TRANSPORT_H__
+#define __GST_WEBRTC_SCTP_TRANSPORT_H__
+
+#include <gst/gst.h>
+/* libnice */
+#include <agent.h>
+#include <gst/webrtc/webrtc.h>
+#include "gstwebrtcice.h"
+
+G_BEGIN_DECLS
+
+GType gst_webrtc_sctp_transport_get_type(void);
+#define GST_TYPE_WEBRTC_SCTP_TRANSPORT (gst_webrtc_sctp_transport_get_type())
+#define GST_WEBRTC_SCTP_TRANSPORT(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransport))
+#define GST_IS_WEBRTC_SCTP_TRANSPORT(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WEBRTC_SCTP_TRANSPORT))
+#define GST_WEBRTC_SCTP_TRANSPORT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransportClass))
+#define GST_IS_WEBRTC_SCTP_TRANSPORT_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT))
+#define GST_WEBRTC_SCTP_TRANSPORT_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransportClass))
+
+struct _GstWebRTCSCTPTransport
+{
+ GstObject parent;
+
+ GstWebRTCDTLSTransport *transport;
+ GstWebRTCSCTPTransportState state;
+ guint64 max_message_size;
+ guint max_channels;
+
+ gboolean association_established;
+
+ GstElement *sctpdec;
+ GstElement *sctpenc;
+
+ GstWebRTCBin *webrtcbin;
+};
+
+struct _GstWebRTCSCTPTransportClass
+{
+ GstObjectClass parent_class;
+};
+
+GstWebRTCSCTPTransport * gst_webrtc_sctp_transport_new (void);
+
+G_END_DECLS
+
+#endif /* __GST_WEBRTC_SCTP_TRANSPORT_H__ */
diff --git a/ext/webrtc/transportreceivebin.c b/ext/webrtc/transportreceivebin.c
index 6730b1fb7..4038cda0d 100644
--- a/ext/webrtc/transportreceivebin.c
+++ b/ext/webrtc/transportreceivebin.c
@@ -25,23 +25,24 @@
#include "utils.h"
/*
- * ,----------------------------transport_receive_%u-----------------------------,
- * ; (rtp) ;
- * ; ,---nicesrc----, ,-capsfilter-, ,----dtlssrtpdec----, ,--funnel--, ;
- * ; ; src o--o sink src o--o sink rtp_src o------o sink_0 ; ;
- * ; '--------------' '------------' ; ; ; src o--o rtp_src
- * ; ; rtcp_src o-, ,--o sink_1 ; ;
- * ; '-------------------' ; ; '----------' ;
- * ; ; ; ,--funnel--, ;
- * ; '-+--o sink_0 ; ;
- * ; ,-' ; src o--o rtcp_src
- * ; (rtcp) ; ,-o sink_1 ; ;
- * ; ,---nicesrc----, ,-capsfilter-, ,----dtlssrtpdec----, ; ; '----------' ;
- * ; ; src o--o sink src o--o sink rtp_src o-' ; ;
- * ; '--------------' '------------' ; ; ; ;
- * ; ; rtcp_src o----' ;
- * ; '-------------------' ;
- * '-----------------------------------------------------------------------------'
+ * ,----------------------------transport_receive_%u----------------------------,
+ * ; (rtp/data) ;
+ * ; ,---nicesrc----, ,-capsfilter-, ,---dtlssrtpdec---, ,--funnel--, ;
+ * ; ; src o--o sink src o--o sink rtp_src o-------o sink_0 ; ;
+ * ; '--------------' '------------' ; ; ; src o--o rtp_src
+ * ; ; rtcp_src o---, ,-o sink_1 ; ;
+ * ; ; ; ; ; '----------' ;
+ * ; ; data_src o-, ; ; ,--funnel--, ;
+ * ; '-----------------' ; '-+-o sink_0 ; ;
+ * ; ,---dtlssrtpdec---, ; ,-' ; src o--o rtcp_src
+ * ; (rtcp) ; rtp_src o-+-' ,-o sink_1 ; ;
+ * ; ,---nicesrc----, ,-capsfilter-, ; ; ; ; '----------' ;
+ * ; ; src o--o sink src o--o sink rtcp_src o-+---' ,--funnel--, ;
+ * ; '--------------' '------------' ; ; '-----o sink_0 ; ;
+ * ; ; data_src o-, ; src o--o data_src
+ * ; '-----------------' '-----o sink_1 ; ;
+ * ; '----------' ;
+ * '----------------------------------------------------------------------------'
*
* Do we really wnat to be *that* permissive in what we accept?
*
@@ -70,6 +71,12 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src",
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtp"));
+static GstStaticPadTemplate data_sink_template =
+GST_STATIC_PAD_TEMPLATE ("data_src",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
enum
{
PROP_0,
@@ -336,6 +343,21 @@ transport_receive_bin_constructed (GObject * object)
gst_element_add_pad (GST_ELEMENT (receive), ghost);
gst_object_unref (pad);
+ /* create funnel for data_src */
+ funnel = gst_element_factory_make ("funnel", NULL);
+ gst_bin_add (GST_BIN (receive), funnel);
+ if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec,
+ "data_src", funnel, "sink_0"))
+ g_warn_if_reached ();
+ if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec,
+ "data_src", funnel, "sink_1"))
+ g_warn_if_reached ();
+
+ pad = gst_element_get_static_pad (funnel, "src");
+ ghost = gst_ghost_pad_new ("data_src", pad);
+ gst_element_add_pad (GST_ELEMENT (receive), ghost);
+ gst_object_unref (pad);
+
G_OBJECT_CLASS (parent_class)->constructed (object);
}
@@ -350,6 +372,8 @@ transport_receive_bin_class_init (TransportReceiveBinClass * klass)
gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
gst_element_class_add_static_pad_template (element_class,
&rtcp_sink_template);
+ gst_element_class_add_static_pad_template (element_class,
+ &data_sink_template);
gst_element_class_set_metadata (element_class, "WebRTC Transport Receive Bin",
"Filter/Network/WebRTC", "A bin for webrtc connections",
diff --git a/ext/webrtc/transportsendbin.c b/ext/webrtc/transportsendbin.c
index be1b8aa37..36522d336 100644
--- a/ext/webrtc/transportsendbin.c
+++ b/ext/webrtc/transportsendbin.c
@@ -27,9 +27,11 @@
/*
* ,------------------------transport_send_%u-------------------------,
* ; ,-----dtlssrtpenc---, ;
- * rtp_sink o--------------------------o rtp_sink_0 ; ,---nicesink---, ;
- * ; ; src o--o sink ; ;
- * ; ,--outputselector--, ,-o rtcp_sink_0 ; '--------------' ;
+ * data_sink o--------------------------o data_sink ; ;
+ * ; ; ; ,---nicesink---, ;
+ * rtp_sink o--------------------------o rtp_sink_0 src o--o sink ; ;
+ * ; ; ; '--------------' ;
+ * ; ,--outputselector--, ,-o rtcp_sink_0 ; ;
* ; ; src_0 o-' '-------------------' ;
* rtcp_sink ;---o sink ; ,----dtlssrtpenc----, ,---nicesink---, ;
* ; ; src_1 o---o rtcp_sink_0 src o--o sink ; ;
@@ -61,6 +63,12 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtp"));
+static GstStaticPadTemplate data_sink_template =
+GST_STATIC_PAD_TEMPLATE ("data_sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
enum
{
PROP_0,
@@ -422,6 +430,16 @@ transport_send_bin_constructed (GObject * object)
gst_element_add_pad (GST_ELEMENT (send), ghost);
gst_object_unref (pad);
+ /* push the data stream onto the RTP dtls element */
+ templ = _find_pad_template (transport->dtlssrtpenc,
+ GST_PAD_SINK, GST_PAD_REQUEST, "data_sink");
+ pad = gst_element_request_pad (transport->dtlssrtpenc, templ, "data_sink",
+ NULL);
+
+ ghost = gst_ghost_pad_new ("data_sink", pad);
+ gst_element_add_pad (GST_ELEMENT (send), ghost);
+ gst_object_unref (pad);
+
/* RTCP */
transport = send->stream->rtcp_transport;
/* Do the common init for the context struct */
@@ -509,6 +527,8 @@ transport_send_bin_class_init (TransportSendBinClass * klass)
gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
gst_element_class_add_static_pad_template (element_class,
&rtcp_sink_template);
+ gst_element_class_add_static_pad_template (element_class,
+ &data_sink_template);
gst_element_class_set_metadata (element_class, "WebRTC Transport Send Bin",
"Filter/Network/WebRTC", "A bin for webrtc connections",
diff --git a/ext/webrtc/webrtcdatachannel.c b/ext/webrtc/webrtcdatachannel.c
new file mode 100644
index 000000000..2f46ee43e
--- /dev/null
+++ b/ext/webrtc/webrtcdatachannel.c
@@ -0,0 +1,1296 @@
+/* GStreamer
+ * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:gstwebrtc-datachannel
+ * @short_description: RTCDataChannel object
+ * @title: GstWebRTCDataChannel
+ * @see_also: #GstWebRTCRTPTransceiver
+ *
+ * <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport">http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport</ulink>
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "webrtcdatachannel.h"
+#include <gst/app/gstappsink.h>
+#include <gst/app/gstappsrc.h>
+#include <gst/base/gstbytereader.h>
+#include <gst/base/gstbytewriter.h>
+#include <gst/sctp/sctpreceivemeta.h>
+#include <gst/sctp/sctpsendmeta.h>
+
+#include "gstwebrtcbin.h"
+
+#define GST_CAT_DEFAULT gst_webrtc_data_channel_debug
+GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
+
+#define gst_webrtc_data_channel_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstWebRTCDataChannel, gst_webrtc_data_channel,
+ GST_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (gst_webrtc_data_channel_debug,
+ "webrtcdatachannel", 0, "webrtcdatachannel");
+ );
+
+enum
+{
+ SIGNAL_0,
+ SIGNAL_ON_OPEN,
+ SIGNAL_ON_CLOSE,
+ SIGNAL_ON_ERROR,
+ SIGNAL_ON_MESSAGE_DATA,
+ SIGNAL_ON_MESSAGE_STRING,
+ SIGNAL_ON_BUFFERED_AMOUNT_LOW,
+ SIGNAL_SEND_DATA,
+ SIGNAL_SEND_STRING,
+ SIGNAL_CLOSE,
+ LAST_SIGNAL,
+};
+
+enum
+{
+ PROP_0,
+ PROP_LABEL,
+ PROP_ORDERED,
+ PROP_MAX_PACKET_LIFETIME,
+ PROP_MAX_RETRANSMITS,
+ PROP_PROTOCOL,
+ PROP_NEGOTIATED,
+ PROP_ID,
+ PROP_PRIORITY,
+ PROP_READY_STATE,
+ PROP_BUFFERED_AMOUNT,
+ PROP_BUFFERED_AMOUNT_LOW_THRESHOLD,
+};
+
+static guint gst_webrtc_data_channel_signals[LAST_SIGNAL] = { 0 };
+
+typedef enum
+{
+ DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
+ DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
+ DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */
+ DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
+ DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */
+ DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
+ DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
+} DataChannelPPID;
+
+typedef enum
+{
+ CHANNEL_TYPE_RELIABLE = 0x00,
+ CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
+ CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
+ CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
+ CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
+ CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
+} DataChannelReliabilityType;
+
+typedef enum
+{
+ CHANNEL_MESSAGE_ACK = 0x02,
+ CHANNEL_MESSAGE_OPEN = 0x03,
+} DataChannelMessage;
+
+static guint16
+priority_type_to_uint (GstWebRTCPriorityType pri)
+{
+ switch (pri) {
+ case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
+ return 64;
+ case GST_WEBRTC_PRIORITY_TYPE_LOW:
+ return 192;
+ case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
+ return 384;
+ case GST_WEBRTC_PRIORITY_TYPE_HIGH:
+ return 768;
+ }
+ g_assert_not_reached ();
+ return 0;
+}
+
+static GstWebRTCPriorityType
+priority_uint_to_type (guint16 val)
+{
+ if (val <= 128)
+ return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
+ if (val <= 256)
+ return GST_WEBRTC_PRIORITY_TYPE_LOW;
+ if (val <= 512)
+ return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
+ return GST_WEBRTC_PRIORITY_TYPE_HIGH;
+}
+
+static GstBuffer *
+construct_open_packet (GstWebRTCDataChannel * channel)
+{
+ GstByteWriter w;
+ gsize label_len = strlen (channel->label);
+ gsize proto_len = strlen (channel->protocol);
+ gsize size = 12 + label_len + proto_len;
+ DataChannelReliabilityType reliability = 0;
+ guint32 reliability_param = 0;
+ guint16 priority;
+ GstBuffer *buf;
+
+/*
+ * 0 1 2 3
+ * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Message Type | Channel Type | Priority |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Reliability Parameter |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Label Length | Protocol Length |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * \ /
+ * | Label |
+ * / \
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * \ /
+ * | Protocol |
+ * / \
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ */
+
+ gst_byte_writer_init_with_size (&w, size, FALSE);
+
+ if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
+ g_return_val_if_reached (NULL);
+
+ if (!channel->ordered)
+ reliability |= 0x80;
+ if (channel->max_retransmits != -1) {
+ reliability |= 0x01;
+ reliability_param = channel->max_retransmits;
+ }
+ if (channel->max_packet_lifetime != -1) {
+ reliability |= 0x02;
+ reliability_param = channel->max_packet_lifetime;
+ }
+
+ priority = priority_type_to_uint (channel->priority);
+
+ if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
+ g_return_val_if_reached (NULL);
+ if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
+ g_return_val_if_reached (NULL);
+ if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
+ g_return_val_if_reached (NULL);
+ if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
+ g_return_val_if_reached (NULL);
+ if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
+ g_return_val_if_reached (NULL);
+ if (!gst_byte_writer_put_data (&w, (guint8 *) channel->label, label_len))
+ g_return_val_if_reached (NULL);
+ if (!gst_byte_writer_put_data (&w, (guint8 *) channel->protocol, proto_len))
+ g_return_val_if_reached (NULL);
+
+ buf = gst_byte_writer_reset_and_get_buffer (&w);
+
+ /* send reliable and ordered */
+ gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
+ GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
+
+ return buf;
+}
+
+static GstBuffer *
+construct_ack_packet (GstWebRTCDataChannel * channel)
+{
+ GstByteWriter w;
+ GstBuffer *buf;
+
+/*
+ * 0 1 2 3
+ * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Message Type |
+ * +-+-+-+-+-+-+-+-+
+ */
+
+ gst_byte_writer_init_with_size (&w, 1, FALSE);
+
+ if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
+ g_return_val_if_reached (NULL);
+
+ buf = gst_byte_writer_reset_and_get_buffer (&w);
+
+ /* send reliable and ordered */
+ gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
+ GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
+
+ return buf;
+}
+
+typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
+ gpointer user_data);
+
+struct task
+{
+ GstWebRTCDataChannel *channel;
+ ChannelTask func;
+ gpointer user_data;
+ GDestroyNotify notify;
+};
+
+static void
+_execute_task (GstWebRTCBin * webrtc, struct task *task)
+{
+ if (task->func)
+ task->func (task->channel, task->user_data);
+}
+
+static void
+_free_task (struct task *task)
+{
+ gst_object_unref (task->channel);
+
+ if (task->notify)
+ task->notify (task->user_data);
+ g_free (task);
+}
+
+static void
+_channel_enqueue_task (GstWebRTCDataChannel * channel, ChannelTask func,
+ gpointer user_data, GDestroyNotify notify)
+{
+ struct task *task = g_new0 (struct task, 1);
+
+ task->channel = gst_object_ref (channel);
+ task->func = func;
+ task->user_data = user_data;
+ task->notify = notify;
+
+ gst_webrtc_bin_enqueue_task (channel->webrtcbin,
+ (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task);
+}
+
+static void
+_channel_store_error (GstWebRTCDataChannel * channel, GError * error)
+{
+ GST_OBJECT_LOCK (channel);
+ if (error) {
+ GST_WARNING_OBJECT (channel, "Error: %s",
+ error ? error->message : "Unknown");
+ if (!channel->stored_error)
+ channel->stored_error = error;
+ else
+ g_clear_error (&error);
+ }
+ GST_OBJECT_UNLOCK (channel);
+}
+
+static void
+_maybe_emit_on_error (GstWebRTCDataChannel * channel, GError * error)
+{
+ if (error) {
+ GST_WARNING_OBJECT (channel, "error thrown");
+ g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_ERROR], 0,
+ error);
+ }
+}
+
+static void
+_emit_on_open (GstWebRTCDataChannel * channel, gpointer user_data)
+{
+ GST_OBJECT_LOCK (channel);
+ if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING ||
+ channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) {
+ GST_OBJECT_UNLOCK (channel);
+ return;
+ }
+
+ if (channel->ready_state != GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) {
+ channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_OPEN;
+ GST_OBJECT_UNLOCK (channel);
+ g_object_notify (G_OBJECT (channel), "ready-state");
+
+ GST_INFO_OBJECT (channel, "We are open and ready for data!");
+ g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_OPEN], 0,
+ NULL);
+ } else {
+ GST_OBJECT_UNLOCK (channel);
+ }
+}
+
+static void
+_transport_closed_unlocked (GstWebRTCDataChannel * channel)
+{
+ GError *error;
+
+ if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED)
+ return;
+
+ channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED;
+
+ error = channel->stored_error;
+ channel->stored_error = NULL;
+ GST_OBJECT_UNLOCK (channel);
+
+ g_object_notify (G_OBJECT (channel), "ready-state");
+ GST_INFO_OBJECT (channel, "We are closed for data");
+
+ _maybe_emit_on_error (channel, error);
+
+ g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_CLOSE], 0,
+ NULL);
+ GST_OBJECT_LOCK (channel);
+}
+
+static void
+_transport_closed (GstWebRTCDataChannel * channel, gpointer user_data)
+{
+ GST_OBJECT_LOCK (channel);
+ _transport_closed_unlocked (channel);
+ GST_OBJECT_UNLOCK (channel);
+}
+
+static void
+_close_sctp_stream (GstWebRTCDataChannel * channel, gpointer user_data)
+{
+ GstPad *pad, *peer;
+
+ pad = gst_element_get_static_pad (channel->appsrc, "src");
+ peer = gst_pad_get_peer (pad);
+ gst_object_unref (pad);
+
+ if (peer) {
+ GstElement *sctpenc = gst_pad_get_parent_element (peer);
+
+ if (sctpenc) {
+ gst_element_release_request_pad (sctpenc, peer);
+ gst_object_unref (sctpenc);
+ }
+ gst_object_unref (peer);
+ }
+
+ _transport_closed (channel, NULL);
+}
+
+static void
+_close_procedure (GstWebRTCDataChannel * channel, gpointer user_data)
+{
+ /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
+ GST_OBJECT_LOCK (channel);
+ if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED
+ || channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
+ GST_OBJECT_UNLOCK (channel);
+ return;
+ }
+ channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
+ GST_OBJECT_UNLOCK (channel);
+ g_object_notify (G_OBJECT (channel), "ready-state");
+
+ GST_OBJECT_LOCK (channel);
+ if (channel->buffered_amount <= 0) {
+ _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
+ NULL, NULL);
+ }
+
+ GST_OBJECT_UNLOCK (channel);
+}
+
+static void
+_on_sctp_reset_stream (GstWebRTCSCTPTransport * sctp, guint stream_id,
+ GstWebRTCDataChannel * channel)
+{
+ if (channel->id == stream_id)
+ _channel_enqueue_task (channel, (ChannelTask) _transport_closed,
+ GUINT_TO_POINTER (stream_id), NULL);
+}
+
+static void
+gst_webrtc_data_channel_close (GstWebRTCDataChannel * channel)
+{
+ _close_procedure (channel, NULL);
+}
+
+static GstFlowReturn
+_parse_control_packet (GstWebRTCDataChannel * channel, guint8 * data,
+ gsize size, GError ** error)
+{
+ GstByteReader r;
+ guint8 message_type;
+
+ if (!data)
+ g_return_val_if_reached (GST_FLOW_ERROR);
+ if (size < 1)
+ g_return_val_if_reached (GST_FLOW_ERROR);
+
+ gst_byte_reader_init (&r, data, size);
+
+ if (!gst_byte_reader_get_uint8 (&r, &message_type))
+ g_return_val_if_reached (GST_FLOW_ERROR);
+
+ if (message_type == CHANNEL_MESSAGE_ACK) {
+ /* all good */
+ GST_INFO_OBJECT (channel, "Received channel ack");
+ return GST_FLOW_OK;
+ } else if (message_type == CHANNEL_MESSAGE_OPEN) {
+ guint8 reliability;
+ guint32 reliability_param;
+ guint16 priority, label_len, proto_len;
+ const guint8 *src;
+ gchar *label, *proto;
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ GST_INFO_OBJECT (channel, "Received channel open");
+
+ if (channel->negotiated) {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Data channel was signalled as negotiated already");
+ g_return_val_if_reached (GST_FLOW_ERROR);
+ }
+
+ if (channel->opened)
+ return GST_FLOW_OK;
+
+ if (!gst_byte_reader_get_uint8 (&r, &reliability))
+ goto parse_error;
+ if (!gst_byte_reader_get_uint16_be (&r, &priority))
+ goto parse_error;
+ if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
+ goto parse_error;
+ if (!gst_byte_reader_get_uint16_be (&r, &label_len))
+ goto parse_error;
+ if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
+ goto parse_error;
+
+ label = g_new0 (gchar, (gsize) label_len + 1);
+ proto = g_new0 (gchar, (gsize) proto_len + 1);
+
+ if (!gst_byte_reader_get_data (&r, label_len, &src))
+ goto parse_error;
+ memcpy (label, src, label_len);
+ label[label_len] = '\0';
+ if (!gst_byte_reader_get_data (&r, proto_len, &src))
+ goto parse_error;
+ memcpy (proto, src, proto_len);
+ proto[proto_len] = '\0';
+
+ channel->label = label;
+ channel->protocol = proto;
+ channel->priority = priority_uint_to_type (priority);
+ channel->ordered = !(reliability & 0x80);
+ if (reliability & 0x01) {
+ channel->max_retransmits = reliability_param;
+ channel->max_packet_lifetime = -1;
+ } else if (reliability & 0x02) {
+ channel->max_retransmits = -1;
+ channel->max_packet_lifetime = reliability_param;
+ } else {
+ channel->max_retransmits = -1;
+ channel->max_packet_lifetime = -1;
+ }
+ channel->opened = TRUE;
+
+ GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
+ "label %s protocol %s ordered %s", channel->id, channel->label,
+ channel->protocol, channel->ordered ? "true" : "false");
+
+ _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
+
+ GST_INFO_OBJECT (channel, "Sending channel ack");
+ buffer = construct_ack_packet (channel);
+
+ GST_OBJECT_LOCK (channel);
+ channel->buffered_amount += gst_buffer_get_size (buffer);
+ GST_OBJECT_UNLOCK (channel);
+
+ ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
+ if (ret != GST_FLOW_OK) {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Could not send ack packet");
+ }
+ return ret;
+ } else {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Unknown message type in control protocol");
+ return GST_FLOW_ERROR;
+ }
+
+parse_error:
+ {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
+ g_return_val_if_reached (GST_FLOW_ERROR);
+ }
+}
+
+static void
+on_sink_eos (GstAppSink * sink, gpointer user_data)
+{
+}
+
+struct map_info
+{
+ GstBuffer *buffer;
+ GstMapInfo map_info;
+};
+
+static void
+buffer_unmap_and_unref (struct map_info *info)
+{
+ gst_buffer_unmap (info->buffer, &info->map_info);
+ gst_buffer_unref (info->buffer);
+ g_free (info);
+}
+
+static void
+_emit_have_data (GstWebRTCDataChannel * channel, GBytes * data)
+{
+ GST_LOG_OBJECT (channel, "Have data %p", data);
+ g_signal_emit (channel,
+ gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_DATA], 0, data);
+}
+
+static void
+_emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
+{
+ GST_LOG_OBJECT (channel, "Have string %p", str);
+ g_signal_emit (channel,
+ gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_STRING], 0, str);
+}
+
+static GstFlowReturn
+_data_channel_have_sample (GstWebRTCDataChannel * channel, GstSample * sample,
+ GError ** error)
+{
+ GstSctpReceiveMeta *receive;
+ GstBuffer *buffer;
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
+
+ g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
+
+ buffer = gst_sample_get_buffer (sample);
+ if (!buffer) {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
+ return GST_FLOW_ERROR;
+ }
+ receive = gst_sctp_buffer_get_receive_meta (buffer);
+ if (!receive) {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "No SCTP Receive meta on the buffer");
+ return GST_FLOW_ERROR;
+ }
+
+ switch (receive->ppid) {
+ case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
+ GstMapInfo info = GST_MAP_INFO_INIT;
+ if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Failed to map received buffer");
+ ret = GST_FLOW_ERROR;
+ } else {
+ ret = _parse_control_packet (channel, info.data, info.size, error);
+ }
+ break;
+ }
+ case DATA_CHANNEL_PPID_WEBRTC_STRING:
+ case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
+ GstMapInfo info = GST_MAP_INFO_INIT;
+ if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Failed to map received buffer");
+ ret = GST_FLOW_ERROR;
+ } else {
+ gchar *str = g_strndup ((gchar *) info.data, info.size);
+ _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
+ g_free);
+ }
+ break;
+ }
+ case DATA_CHANNEL_PPID_WEBRTC_BINARY:
+ case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
+ struct map_info *info = g_new0 (struct map_info, 1);
+ if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Failed to map received buffer");
+ ret = GST_FLOW_ERROR;
+ } else {
+ GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
+ info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
+ info->buffer = gst_buffer_ref (buffer);
+ _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
+ (GDestroyNotify) g_bytes_unref);
+ }
+ break;
+ }
+ case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
+ _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
+ NULL);
+ break;
+ case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
+ _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
+ NULL);
+ break;
+ default:
+ g_set_error (error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Unknown SCTP PPID %u received", receive->ppid);
+ ret = GST_FLOW_ERROR;
+ break;
+ }
+
+ return ret;
+}
+
+static GstFlowReturn
+on_sink_preroll (GstAppSink * sink, gpointer user_data)
+{
+ GstWebRTCDataChannel *channel = user_data;
+ GstSample *sample = gst_app_sink_pull_preroll (sink);
+ GstFlowReturn ret;
+
+ if (sample) {
+ /* This sample also seems to be provided by the sample callback
+ ret = _data_channel_have_sample (channel, sample); */
+ ret = GST_FLOW_OK;
+ gst_sample_unref (sample);
+ } else if (gst_app_sink_is_eos (sink)) {
+ ret = GST_FLOW_EOS;
+ } else {
+ ret = GST_FLOW_ERROR;
+ }
+
+ if (ret != GST_FLOW_OK) {
+ _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
+ }
+
+ return ret;
+}
+
+static GstFlowReturn
+on_sink_sample (GstAppSink * sink, gpointer user_data)
+{
+ GstWebRTCDataChannel *channel = user_data;
+ GstSample *sample = gst_app_sink_pull_sample (sink);
+ GstFlowReturn ret;
+ GError *error = NULL;
+
+ if (sample) {
+ ret = _data_channel_have_sample (channel, sample, &error);
+ gst_sample_unref (sample);
+ } else if (gst_app_sink_is_eos (sink)) {
+ ret = GST_FLOW_EOS;
+ } else {
+ ret = GST_FLOW_ERROR;
+ }
+
+ if (error)
+ _channel_store_error (channel, error);
+
+ if (ret != GST_FLOW_OK) {
+ _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
+ }
+
+ return ret;
+}
+
+static GstAppSinkCallbacks sink_callbacks = {
+ on_sink_eos,
+ on_sink_preroll,
+ on_sink_sample,
+};
+
+void
+gst_webrtc_data_channel_start_negotiation (GstWebRTCDataChannel * channel)
+{
+ GstBuffer *buffer;
+
+ g_return_if_fail (!channel->negotiated);
+ g_return_if_fail (channel->id != -1);
+ g_return_if_fail (channel->sctp_transport != NULL);
+
+ buffer = construct_open_packet (channel);
+
+ GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
+ "label %s protocol %s ordered %s", channel->id, channel->label,
+ channel->protocol, channel->ordered ? "true" : "false");
+
+ GST_OBJECT_LOCK (channel);
+ channel->buffered_amount += gst_buffer_get_size (buffer);
+ GST_OBJECT_UNLOCK (channel);
+
+ if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
+ buffer) == GST_FLOW_OK) {
+ channel->opened = TRUE;
+ _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
+ } else {
+ GError *error = NULL;
+ g_set_error (&error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Failed to send DCEP open packet");
+ _channel_store_error (channel, error);
+ _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
+ }
+}
+
+static void
+_get_sctp_reliability (GstWebRTCDataChannel * channel,
+ GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
+{
+ if (channel->max_retransmits != -1) {
+ *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
+ *rel_param = channel->max_retransmits;
+ } else if (channel->max_packet_lifetime != -1) {
+ *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
+ *rel_param = channel->max_packet_lifetime;
+ } else {
+ *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
+ *rel_param = 0;
+ }
+}
+
+static gboolean
+_is_within_max_message_size (GstWebRTCDataChannel * channel, gsize size)
+{
+ return size <= channel->sctp_transport->max_message_size;
+}
+
+static void
+gst_webrtc_data_channel_send_data (GstWebRTCDataChannel * channel,
+ GBytes * bytes)
+{
+ GstSctpSendMetaPartiallyReliability reliability;
+ guint rel_param;
+ guint32 ppid;
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ if (!bytes) {
+ buffer = gst_buffer_new ();
+ ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
+ } else {
+ gsize size;
+ guint8 *data;
+
+ data = (guint8 *) g_bytes_get_data (bytes, &size);
+ g_return_if_fail (data != NULL);
+ if (!_is_within_max_message_size (channel, size)) {
+ GError *error = NULL;
+ g_set_error (&error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Requested to send data that is too large");
+ _channel_store_error (channel, error);
+ _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
+ NULL);
+ return;
+ }
+
+ buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
+ 0, size, bytes, (GDestroyNotify) g_bytes_unref);
+ ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
+ }
+
+ _get_sctp_reliability (channel, &reliability, &rel_param);
+ gst_sctp_buffer_add_send_meta (buffer, ppid, channel->ordered, reliability,
+ rel_param);
+
+ GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
+ buffer);
+
+ GST_OBJECT_LOCK (channel);
+ channel->buffered_amount += gst_buffer_get_size (buffer);
+ GST_OBJECT_UNLOCK (channel);
+
+ ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
+
+ if (ret != GST_FLOW_OK) {
+ GError *error = NULL;
+ g_set_error (&error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
+ _channel_store_error (channel, error);
+ _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
+ }
+}
+
+static void
+gst_webrtc_data_channel_send_string (GstWebRTCDataChannel * channel,
+ gchar * str)
+{
+ GstSctpSendMetaPartiallyReliability reliability;
+ guint rel_param;
+ guint32 ppid;
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ g_return_if_fail (!channel->negotiated && channel->opened);
+ g_return_if_fail (channel->sctp_transport != NULL);
+
+ if (!str) {
+ buffer = gst_buffer_new ();
+ ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
+ } else {
+ gsize size = strlen (str);
+ gchar *str_copy = g_strdup (str);
+
+ if (!_is_within_max_message_size (channel, size)) {
+ GError *error = NULL;
+ g_set_error (&error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ "Requested to send a string that is too large");
+ _channel_store_error (channel, error);
+ _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
+ NULL);
+ return;
+ }
+
+ buffer =
+ gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
+ size, 0, size, str_copy, g_free);
+ ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
+ }
+
+ _get_sctp_reliability (channel, &reliability, &rel_param);
+ gst_sctp_buffer_add_send_meta (buffer, ppid, channel->ordered, reliability,
+ rel_param);
+
+ GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
+ buffer);
+
+ GST_OBJECT_LOCK (channel);
+ channel->buffered_amount += gst_buffer_get_size (buffer);
+ GST_OBJECT_UNLOCK (channel);
+
+ ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
+
+ if (ret != GST_FLOW_OK) {
+ GError *error = NULL;
+ g_set_error (&error, GST_WEBRTC_BIN_ERROR,
+ GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
+ _channel_store_error (channel, error);
+ _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
+ }
+}
+
+void
+gst_webrtc_data_channel_set_sctp_transport (GstWebRTCDataChannel * channel,
+ GstWebRTCSCTPTransport * sctp)
+{
+ g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
+ g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
+
+ GST_OBJECT_LOCK (channel);
+ if (channel->sctp_transport)
+ g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
+
+ gst_object_replace ((GstObject **) & channel->sctp_transport,
+ GST_OBJECT (sctp));
+
+ if (sctp)
+ g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_reset_stream),
+ channel);
+ GST_OBJECT_UNLOCK (channel);
+}
+
+static void
+gst_webrtc_data_channel_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
+
+ GST_OBJECT_LOCK (channel);
+ switch (prop_id) {
+ case PROP_LABEL:
+ channel->label = g_value_dup_string (value);
+ break;
+ case PROP_ORDERED:
+ channel->ordered = g_value_get_boolean (value);
+ break;
+ case PROP_MAX_PACKET_LIFETIME:
+ channel->max_packet_lifetime = g_value_get_int (value);
+ break;
+ case PROP_MAX_RETRANSMITS:
+ channel->max_retransmits = g_value_get_int (value);
+ break;
+ case PROP_PROTOCOL:
+ channel->protocol = g_value_dup_string (value);
+ break;
+ case PROP_NEGOTIATED:
+ channel->negotiated = g_value_get_boolean (value);
+ break;
+ case PROP_ID:
+ channel->id = g_value_get_int (value);
+ break;
+ case PROP_PRIORITY:
+ channel->priority = g_value_get_enum (value);
+ break;
+ case PROP_BUFFERED_AMOUNT_LOW_THRESHOLD:
+ channel->buffered_amount_low_threshold = g_value_get_uint64 (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+ GST_OBJECT_UNLOCK (channel);
+}
+
+static void
+gst_webrtc_data_channel_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
+
+ GST_OBJECT_LOCK (channel);
+ switch (prop_id) {
+ case PROP_LABEL:
+ g_value_set_string (value, channel->label);
+ break;
+ case PROP_ORDERED:
+ g_value_set_boolean (value, channel->ordered);
+ break;
+ case PROP_MAX_PACKET_LIFETIME:
+ g_value_set_int (value, channel->max_packet_lifetime);
+ break;
+ case PROP_MAX_RETRANSMITS:
+ g_value_set_int (value, channel->max_retransmits);
+ break;
+ case PROP_PROTOCOL:
+ g_value_set_string (value, channel->protocol);
+ break;
+ case PROP_NEGOTIATED:
+ g_value_set_boolean (value, channel->negotiated);
+ break;
+ case PROP_ID:
+ g_value_set_int (value, channel->id);
+ break;
+ case PROP_PRIORITY:
+ g_value_set_enum (value, channel->priority);
+ break;
+ case PROP_READY_STATE:
+ g_value_set_enum (value, channel->ready_state);
+ break;
+ case PROP_BUFFERED_AMOUNT:
+ g_value_set_uint64 (value, channel->buffered_amount);
+ break;
+ case PROP_BUFFERED_AMOUNT_LOW_THRESHOLD:
+ g_value_set_uint64 (value, channel->buffered_amount_low_threshold);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+ GST_OBJECT_UNLOCK (channel);
+}
+
+static void
+_emit_low_threshold (GstWebRTCDataChannel * channel, gpointer user_data)
+{
+ GST_LOG_OBJECT (channel, "Low threshold reached");
+ g_signal_emit (channel,
+ gst_webrtc_data_channel_signals[SIGNAL_ON_BUFFERED_AMOUNT_LOW], 0);
+}
+
+static GstPadProbeReturn
+on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ GstWebRTCDataChannel *channel = user_data;
+ guint64 prev_amount;
+ guint64 size = 0;
+
+ if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
+ GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
+ size = gst_buffer_get_size (buffer);
+ } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
+ GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
+ size = gst_buffer_list_calculate_size (list);
+ }
+
+ if (size > 0) {
+ GST_OBJECT_LOCK (channel);
+ prev_amount = channel->buffered_amount;
+ channel->buffered_amount -= size;
+ if (prev_amount > channel->buffered_amount_low_threshold &&
+ channel->buffered_amount < channel->buffered_amount_low_threshold) {
+ _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold,
+ NULL, NULL);
+ }
+
+ if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
+ && channel->buffered_amount <= 0) {
+ _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
+ NULL);
+ }
+ GST_OBJECT_UNLOCK (channel);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
+static void
+gst_webrtc_data_channel_constructed (GObject * object)
+{
+ GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
+ GstPad *pad;
+ GstCaps *caps;
+
+ caps = gst_caps_new_any ();
+
+ channel->appsrc = gst_element_factory_make ("appsrc", NULL);
+ gst_object_ref_sink (channel->appsrc);
+ pad = gst_element_get_static_pad (channel->appsrc, "src");
+
+ channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
+ (GstPadProbeCallback) on_appsrc_data, channel, NULL);
+
+ channel->appsink = gst_element_factory_make ("appsink", NULL);
+ gst_object_ref_sink (channel->appsink);
+ g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
+ NULL);
+ gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
+ channel, NULL);
+
+ gst_object_unref (pad);
+ gst_caps_unref (caps);
+}
+
+static void
+gst_webrtc_data_channel_finalize (GObject * object)
+{
+ GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
+
+ if (channel->src_probe) {
+ GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
+ gst_pad_remove_probe (pad, channel->src_probe);
+ gst_object_unref (pad);
+ channel->src_probe = 0;
+ }
+
+ g_free (channel->label);
+ channel->label = NULL;
+
+ g_free (channel->protocol);
+ channel->protocol = NULL;
+
+ if (channel->sctp_transport)
+ g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
+ g_clear_object (&channel->sctp_transport);
+
+ g_clear_object (&channel->appsrc);
+ g_clear_object (&channel->appsink);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_webrtc_data_channel_class_init (GstWebRTCDataChannelClass * klass)
+{
+ GObjectClass *gobject_class = (GObjectClass *) klass;
+
+ gobject_class->constructed = gst_webrtc_data_channel_constructed;
+ gobject_class->get_property = gst_webrtc_data_channel_get_property;
+ gobject_class->set_property = gst_webrtc_data_channel_set_property;
+ gobject_class->finalize = gst_webrtc_data_channel_finalize;
+
+ g_object_class_install_property (gobject_class,
+ PROP_LABEL,
+ g_param_spec_string ("label",
+ "Label", "Data channel label",
+ NULL,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_ORDERED,
+ g_param_spec_boolean ("ordered",
+ "Ordered", "Using ordered transmission mode",
+ FALSE,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_MAX_PACKET_LIFETIME,
+ g_param_spec_int ("max-packet-lifetime",
+ "Maximum Packet Lifetime",
+ "Maximum number of milliseconds that transmissions and "
+ "retransmissions may occur in unreliable mode (-1 = unset)",
+ -1, G_MAXUINT16, -1,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_MAX_RETRANSMITS,
+ g_param_spec_int ("max-retransmits",
+ "Maximum Retransmits",
+ "Maximum number of retransmissions attempted in unreliable mode",
+ -1, G_MAXUINT16, 0,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_PROTOCOL,
+ g_param_spec_string ("protocol",
+ "Protocol", "Data channel protocol",
+ "",
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_NEGOTIATED,
+ g_param_spec_boolean ("negotiated",
+ "Negotiated",
+ "Whether this data channel was negotiated by the application", FALSE,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_ID,
+ g_param_spec_int ("id",
+ "ID",
+ "ID negotiated by this data channel (-1 = unset)",
+ -1, G_MAXUINT16, -1,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_PRIORITY,
+ g_param_spec_enum ("priority",
+ "Priority",
+ "The priority of data sent using this data channel",
+ GST_TYPE_WEBRTC_PRIORITY_TYPE,
+ GST_WEBRTC_PRIORITY_TYPE_LOW,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_READY_STATE,
+ g_param_spec_enum ("ready-state",
+ "Ready State",
+ "The Ready state of this data channel",
+ GST_TYPE_WEBRTC_DATA_CHANNEL_STATE,
+ GST_WEBRTC_DATA_CHANNEL_STATE_NEW,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_BUFFERED_AMOUNT,
+ g_param_spec_uint64 ("buffered-amount",
+ "Buffered Amount",
+ "The amount of data in bytes currently buffered",
+ 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_BUFFERED_AMOUNT_LOW_THRESHOLD,
+ g_param_spec_uint64 ("buffered-amount-low-threshold",
+ "Buffered Amount Low Threshold",
+ "The threshold at which the buffered amount is considered low and "
+ "the buffered-amount-low signal is emitted",
+ 0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstWebRTCDataChannel::on-open:
+ * @object: the #GstWebRTCDataChannel
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_ON_OPEN] =
+ g_signal_new ("on-open", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 0);
+
+ /**
+ * GstWebRTCDataChannel::on-close:
+ * @object: the #GstWebRTCDataChannel
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_ON_CLOSE] =
+ g_signal_new ("on-close", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 0);
+
+ /**
+ * GstWebRTCDataChannel::on-error:
+ * @object: the #GstWebRTCDataChannel
+ * @error: the #GError thrown
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_ON_ERROR] =
+ g_signal_new ("on-error", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, G_TYPE_ERROR);
+
+ /**
+ * GstWebRTCDataChannel::on-message-data:
+ * @object: the #GstWebRTCDataChannel
+ * @data: (nullable): a #GBytes of the data received
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_DATA] =
+ g_signal_new ("on-message-data", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, G_TYPE_BYTES);
+
+ /**
+ * GstWebRTCDataChannel::on-message-string:
+ * @object: the #GstWebRTCDataChannel
+ * @data: (nullable): the data received as a string
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_STRING] =
+ g_signal_new ("on-message-string", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, G_TYPE_STRING);
+
+ /**
+ * GstWebRTCDataChannel::on-buffered-amount-low:
+ * @object: the #GstWebRTCDataChannel
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_ON_BUFFERED_AMOUNT_LOW] =
+ g_signal_new ("on-buffered-amount-low", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 0);
+
+ /**
+ * GstWebRTCDataChannel::send-data:
+ * @object: the #GstWebRTCDataChannel
+ * @data: (nullable): a #GBytes with the data
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_SEND_DATA] =
+ g_signal_new_class_handler ("send-data", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_CALLBACK (gst_webrtc_data_channel_send_data), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BYTES);
+
+ /**
+ * GstWebRTCDataChannel::send-string:
+ * @object: the #GstWebRTCDataChannel
+ * @data: (nullable): a #GBytes with the data
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_SEND_STRING] =
+ g_signal_new_class_handler ("send-string", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_CALLBACK (gst_webrtc_data_channel_send_string), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_STRING);
+
+ /**
+ * GstWebRTCDataChannel::close:
+ * @object: the #GstWebRTCDataChannel
+ *
+ * Close the data channel
+ */
+ gst_webrtc_data_channel_signals[SIGNAL_CLOSE] =
+ g_signal_new_class_handler ("close", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_CALLBACK (gst_webrtc_data_channel_close), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_NONE, 0);
+}
+
+static void
+gst_webrtc_data_channel_init (GstWebRTCDataChannel * channel)
+{
+}
diff --git a/ext/webrtc/webrtcdatachannel.h b/ext/webrtc/webrtcdatachannel.h
new file mode 100644
index 000000000..769844171
--- /dev/null
+++ b/ext/webrtc/webrtcdatachannel.h
@@ -0,0 +1,83 @@
+/* GStreamer
+ * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_WEBRTC_DATA_CHANNEL_H__
+#define __GST_WEBRTC_DATA_CHANNEL_H__
+
+#include <gst/gst.h>
+#include <gst/webrtc/webrtc_fwd.h>
+#include <gst/webrtc/dtlstransport.h>
+#include "sctptransport.h"
+
+G_BEGIN_DECLS
+
+GST_WEBRTC_API
+GType gst_webrtc_data_channel_get_type(void);
+#define GST_TYPE_WEBRTC_DATA_CHANNEL (gst_webrtc_data_channel_get_type())
+#define GST_WEBRTC_DATA_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannel))
+#define GST_IS_WEBRTC_DATA_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WEBRTC_DATA_CHANNEL))
+#define GST_WEBRTC_DATA_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannelClass))
+#define GST_IS_WEBRTC_DATA_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_WEBRTC_DATA_CHANNEL))
+#define GST_WEBRTC_DATA_CHANNEL_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannelClass))
+
+typedef struct _GstWebRTCDataChannel GstWebRTCDataChannel;
+typedef struct _GstWebRTCDataChannelClass GstWebRTCDataChannelClass;
+
+struct _GstWebRTCDataChannel
+{
+ GstObject parent;
+
+ GstWebRTCSCTPTransport *sctp_transport;
+ GstElement *appsrc;
+ GstElement *appsink;
+
+ gchar *label;
+ gboolean ordered;
+ guint max_packet_lifetime;
+ guint max_retransmits;
+ gchar *protocol;
+ gboolean negotiated;
+ gint id;
+ GstWebRTCPriorityType priority;
+ GstWebRTCDataChannelState ready_state;
+ guint64 buffered_amount;
+ guint64 buffered_amount_low_threshold;
+
+ GstWebRTCBin *webrtcbin;
+ gboolean opened;
+ gulong src_probe;
+ GError *stored_error;
+
+ gpointer _padding[GST_PADDING];
+};
+
+struct _GstWebRTCDataChannelClass
+{
+ GstObjectClass parent_class;
+
+ gpointer _padding[GST_PADDING];
+};
+
+void gst_webrtc_data_channel_start_negotiation (GstWebRTCDataChannel *channel);
+void gst_webrtc_data_channel_set_sctp_transport (GstWebRTCDataChannel *channel,
+ GstWebRTCSCTPTransport *sctp);
+
+G_END_DECLS
+
+#endif /* __GST_WEBRTC_DATA_CHANNEL_H__ */
diff --git a/ext/webrtc/webrtcsdp.c b/ext/webrtc/webrtcsdp.c
index 5584d9bd1..042a34282 100644
--- a/ext/webrtc/webrtcsdp.c
+++ b/ext/webrtc/webrtcsdp.c
@@ -714,3 +714,38 @@ _generate_ice_credentials (gchar ** ufrag, gchar ** password)
ice_credential_chars[g_random_int_range (0,
strlen (ice_credential_chars))];
}
+
+int
+_get_sctp_port_from_media (const GstSDPMedia * media)
+{
+ int sctpmap = -1, i;
+
+ for (i = 0; i < gst_sdp_media_attributes_len (media); i++) {
+ const GstSDPAttribute *attr = gst_sdp_media_get_attribute (media, i);
+
+ if (g_strcmp0 (attr->key, "sctp-port") == 0) {
+ return atoi (attr->value);
+ } else if (g_strcmp0 (attr->key, "sctpmap") == 0) {
+ sctpmap = atoi (attr->value);
+ }
+ }
+
+ if (sctpmap >= 0)
+ GST_LOG ("no sctp-port attribute in media");
+ return sctpmap;
+}
+
+guint64
+_get_sctp_max_message_size_from_media (const GstSDPMedia * media)
+{
+ int i;
+
+ for (i = 0; i < gst_sdp_media_attributes_len (media); i++) {
+ const GstSDPAttribute *attr = gst_sdp_media_get_attribute (media, i);
+
+ if (g_strcmp0 (attr->key, "max-message-size") == 0)
+ return atoi (attr->value);
+ }
+
+ return 65536;
+}
diff --git a/ext/webrtc/webrtcsdp.h b/ext/webrtc/webrtcsdp.h
index 779dcc276..d5ea777b3 100644
--- a/ext/webrtc/webrtcsdp.h
+++ b/ext/webrtc/webrtcsdp.h
@@ -76,5 +76,9 @@ G_GNUC_INTERNAL
gboolean _media_has_attribute_key (const GstSDPMedia * media,
const gchar * key);
+G_GNUC_INTERNAL
+int _get_sctp_port_from_media (const GstSDPMedia * media);
+G_GNUC_INTERNAL
+guint64 _get_sctp_max_message_size_from_media (const GstSDPMedia * media);
#endif /* __WEBRTC_UTILS_H__ */
diff --git a/gst-libs/gst/webrtc/webrtc_fwd.h b/gst-libs/gst/webrtc/webrtc_fwd.h
index be0a3c334..2e50125f5 100644
--- a/gst-libs/gst/webrtc/webrtc_fwd.h
+++ b/gst-libs/gst/webrtc/webrtc_fwd.h
@@ -264,4 +264,57 @@ typedef enum /*< underscore_name=gst_webrtc_fec_type >*/
GST_WEBRTC_FEC_TYPE_ULP_RED,
} GstWebRTCFECType;
+/**
+ * GstWebRTCSCTPTransportState:
+ * GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW: new
+ * GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTING: connecting
+ * GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED: connected
+ * GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED: closed
+ *
+ * See <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcsctptransportstate">http://w3c.github.io/webrtc-pc/#dom-rtcsctptransportstate</ulink>
+ */
+typedef enum /*< underscore_name=gst_webrtc_sctp_transport_state >*/
+{
+ GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW,
+ GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTING,
+ GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED,
+ GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED,
+} GstWebRTCSCTPTransportState;
+
+/**
+ * GstWebRTCPriorityType:
+ * GST_WEBRTC_PRIORITY_TYPE_VERY_LOW: very-low
+ * GST_WEBRTC_PRIORITY_TYPE_LOW: low
+ * GST_WEBRTC_PRIORITY_TYPE_MEDIUM: medium
+ * GST_WEBRTC_PRIORITY_TYPE_HIGH: high
+ *
+ * See <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcprioritytype">http://w3c.github.io/webrtc-pc/#dom-rtcprioritytype</ulink>
+ */
+typedef enum /*< underscore_name=gst_webrtc_priority_type >*/
+{
+ GST_WEBRTC_PRIORITY_TYPE_VERY_LOW = 1,
+ GST_WEBRTC_PRIORITY_TYPE_LOW,
+ GST_WEBRTC_PRIORITY_TYPE_MEDIUM,
+ GST_WEBRTC_PRIORITY_TYPE_HIGH,
+} GstWebRTCPriorityType;
+
+/**
+ * GstWebRTCDataChannelState:
+ * GST_WEBRTC_DATA_CHANNEL_STATE_NEW: new
+ * GST_WEBRTC_DATA_CHANNEL_STATE_CONNECTING: connection
+ * GST_WEBRTC_DATA_CHANNEL_STATE_OPEN: open
+ * GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING: closing
+ * GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED: closed
+ *
+ * See <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcdatachannelstate">http://w3c.github.io/webrtc-pc/#dom-rtcdatachannelstate</ulink>
+ */
+typedef enum /*< underscore_name=gst_webrtc_data_channel_state >*/
+{
+ GST_WEBRTC_DATA_CHANNEL_STATE_NEW,
+ GST_WEBRTC_DATA_CHANNEL_STATE_CONNECTING,
+ GST_WEBRTC_DATA_CHANNEL_STATE_OPEN,
+ GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING,
+ GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED,
+} GstWebRTCDataChannelState;
+
#endif /* __GST_WEBRTC_FWD_H__ */
diff --git a/tests/check/elements/webrtcbin.c b/tests/check/elements/webrtcbin.c
index 6f11914c3..2c3b61157 100644
--- a/tests/check/elements/webrtcbin.c
+++ b/tests/check/elements/webrtcbin.c
@@ -86,6 +86,12 @@ struct test_webrtc
GstElement * element,
GstPromise * promise,
gpointer user_data);
+ gpointer data_channel_data;
+ GDestroyNotify data_channel_notify;
+ void (*on_data_channel) (struct test_webrtc * t,
+ GstElement * element,
+ GObject *data_channel,
+ gpointer user_data);
gpointer answer_data;
GDestroyNotify answer_notify;
void (*on_pad_added) (struct test_webrtc * t,
@@ -291,6 +297,16 @@ _on_pad_added (GstElement * webrtc, GstPad * new_pad, struct test_webrtc *t)
}
static void
+_on_data_channel (GstElement * webrtc, GObject * data_channel,
+ struct test_webrtc *t)
+{
+ g_mutex_lock (&t->lock);
+ if (t->on_data_channel)
+ t->on_data_channel (t, webrtc, data_channel, t->data_channel_data);
+ g_mutex_unlock (&t->lock);
+}
+
+static void
_pad_added_not_reached (struct test_webrtc *t, GstElement * element,
GstPad * pad, gpointer user_data)
{
@@ -333,6 +349,13 @@ _offer_answer_not_reached (struct test_webrtc *t, GstElement * element,
}
static void
+_on_data_channel_not_reached (struct test_webrtc *t, GstElement * element,
+ GObject * data_channel, gpointer user_data)
+{
+ g_assert_not_reached ();
+}
+
+static void
_broadcast (struct test_webrtc *t)
{
g_mutex_lock (&t->lock);
@@ -387,6 +410,7 @@ test_webrtc_new (void)
ret->on_pad_added = _pad_added_not_reached;
ret->on_offer_created = _offer_answer_not_reached;
ret->on_answer_created = _offer_answer_not_reached;
+ ret->on_data_channel = _on_data_channel_not_reached;
ret->bus_message = _bus_no_errors;
g_mutex_init (&ret->lock);
@@ -415,6 +439,10 @@ test_webrtc_new (void)
G_CALLBACK (_on_ice_candidate), ret);
g_signal_connect (ret->webrtc2, "on-ice-candidate",
G_CALLBACK (_on_ice_candidate), ret);
+ g_signal_connect (ret->webrtc1, "on-data-channel",
+ G_CALLBACK (_on_data_channel), ret);
+ g_signal_connect (ret->webrtc2, "on-data-channel",
+ G_CALLBACK (_on_data_channel), ret);
g_signal_connect (ret->webrtc1, "pad-added", G_CALLBACK (_on_pad_added), ret);
g_signal_connect (ret->webrtc2, "pad-added", G_CALLBACK (_on_pad_added), ret);
g_signal_connect_swapped (ret->webrtc1, "notify::ice-gathering-state",
@@ -475,6 +503,8 @@ test_webrtc_free (struct test_webrtc *t)
t->answer_notify (t->answer_data);
if (t->pad_added_notify)
t->pad_added_notify (t->pad_added_data);
+ if (t->data_channel_notify)
+ t->data_channel_notify (t->data_channel_data);
fail_unless_equals_int (GST_STATE_CHANGE_SUCCESS,
gst_element_set_state (t->webrtc1, GST_STATE_NULL));
@@ -523,11 +553,17 @@ test_webrtc_wait_for_answer_error_eos (struct test_webrtc *t)
}
static void
-test_webrtc_signal_state (struct test_webrtc *t, TestState state)
+test_webrtc_signal_state_unlocked (struct test_webrtc *t, TestState state)
{
- g_mutex_lock (&t->lock);
t->state = state;
g_cond_broadcast (&t->cond);
+}
+
+static void
+test_webrtc_signal_state (struct test_webrtc *t, TestState state)
+{
+ g_mutex_lock (&t->lock);
+ test_webrtc_signal_state_unlocked (t, state);
g_mutex_unlock (&t->lock);
}
@@ -1403,6 +1439,459 @@ GST_START_TEST (test_recvonly_sendonly)
GST_END_TEST;
+static gboolean
+_message_media_is_datachannel (const GstSDPMessage * msg, guint media_id)
+{
+ const GstSDPMedia *media;
+
+ if (!msg)
+ return FALSE;
+
+ if (gst_sdp_message_medias_len (msg) <= media_id)
+ return FALSE;
+
+ media = gst_sdp_message_get_media (msg, media_id);
+
+ if (g_strcmp0 (gst_sdp_media_get_media (media), "application") != 0)
+ return FALSE;
+
+ if (gst_sdp_media_formats_len (media) != 1)
+ return FALSE;
+
+ if (g_strcmp0 (gst_sdp_media_get_format (media, 0),
+ "webrtc-datachannel") != 0)
+ return FALSE;
+
+ return TRUE;
+}
+
+static void
+on_sdp_has_datachannel (struct test_webrtc *t, GstElement * element,
+ GstWebRTCSessionDescription * desc, gpointer user_data)
+{
+ gboolean have_data_channel = FALSE;
+ int i;
+
+ for (i = 0; i < gst_sdp_message_medias_len (desc->sdp); i++) {
+ if (_message_media_is_datachannel (desc->sdp, i)) {
+ /* there should only be one data channel m= section */
+ fail_unless_equals_int (FALSE, have_data_channel);
+ have_data_channel = TRUE;
+ }
+ }
+
+ fail_unless_equals_int (TRUE, have_data_channel);
+}
+
+static void
+on_channel_error_not_reached (GObject * channel, GError * error,
+ gpointer user_data)
+{
+ g_assert_not_reached ();
+}
+
+GST_START_TEST (test_data_channel_create)
+{
+ struct test_webrtc *t = test_webrtc_new ();
+ GObject *channel = NULL;
+ struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
+ struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
+ gchar *label;
+
+ t->on_negotiation_needed = NULL;
+ t->offer_data = &offer;
+ t->on_offer_created = validate_sdp;
+ t->answer_data = &answer;
+ t->on_answer_created = validate_sdp;
+ t->on_ice_candidate = NULL;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+ &channel);
+ g_assert_nonnull (channel);
+ g_object_get (channel, "label", &label, NULL);
+ g_assert_cmpstr (label, ==, "label");
+ g_signal_connect (channel, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+
+ test_webrtc_create_offer (t, t->webrtc1);
+
+ test_webrtc_wait_for_answer_error_eos (t);
+ fail_unless_equals_int (STATE_ANSWER_CREATED, t->state);
+ g_object_unref (channel);
+ g_free (label);
+ test_webrtc_free (t);
+}
+
+GST_END_TEST;
+
+static void
+have_data_channel (struct test_webrtc *t, GstElement * element,
+ GObject * our, gpointer user_data)
+{
+ GObject *other = user_data;
+ gchar *our_label, *other_label;
+
+ g_signal_connect (our, "on-error", G_CALLBACK (on_channel_error_not_reached),
+ NULL);
+
+ g_object_get (our, "label", &our_label, NULL);
+ g_object_get (other, "label", &other_label, NULL);
+
+ g_assert_cmpstr (our_label, ==, other_label);
+
+ g_free (our_label);
+ g_free (other_label);
+
+ test_webrtc_signal_state_unlocked (t, STATE_CUSTOM);
+}
+
+GST_START_TEST (test_data_channel_remote_notify)
+{
+ struct test_webrtc *t = test_webrtc_new ();
+ GObject *channel = NULL;
+ struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
+ struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
+
+ t->on_negotiation_needed = NULL;
+ t->offer_data = &offer;
+ t->on_offer_created = validate_sdp;
+ t->answer_data = &answer;
+ t->on_answer_created = validate_sdp;
+ t->on_ice_candidate = NULL;
+ t->on_data_channel = have_data_channel;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+ &channel);
+ g_assert_nonnull (channel);
+ t->data_channel_data = channel;
+ g_signal_connect (channel, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+
+ gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
+ gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
+
+ test_webrtc_create_offer (t, t->webrtc1);
+
+ test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
+
+ g_object_unref (channel);
+ test_webrtc_free (t);
+}
+
+GST_END_TEST;
+
+static const gchar *test_string = "GStreamer WebRTC is awesome!";
+
+static void
+on_message_string (GObject * channel, const gchar * str, struct test_webrtc *t)
+{
+ gchar *expected = g_object_steal_data (channel, "expected");
+ g_assert_cmpstr (expected, ==, str);
+ g_free (expected);
+
+ test_webrtc_signal_state (t, STATE_CUSTOM);
+}
+
+static void
+have_data_channel_transfer_string (struct test_webrtc *t, GstElement * element,
+ GObject * our, gpointer user_data)
+{
+ GObject *other = user_data;
+ GstWebRTCDataChannelState state;
+
+ g_object_get (our, "ready-state", &state, NULL);
+ fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
+ g_object_get (other, "ready-state", &state, NULL);
+ fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
+
+ g_object_set_data_full (our, "expected", g_strdup (test_string), g_free);
+ g_signal_connect (our, "on-message-string", G_CALLBACK (on_message_string),
+ t);
+
+ g_signal_connect (other, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+ g_signal_emit_by_name (other, "send-string", test_string);
+}
+
+GST_START_TEST (test_data_channel_transfer_string)
+{
+ struct test_webrtc *t = test_webrtc_new ();
+ GObject *channel = NULL;
+ struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
+ struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
+
+ t->on_negotiation_needed = NULL;
+ t->offer_data = &offer;
+ t->on_offer_created = validate_sdp;
+ t->answer_data = &answer;
+ t->on_answer_created = validate_sdp;
+ t->on_ice_candidate = NULL;
+ t->on_data_channel = have_data_channel_transfer_string;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+ &channel);
+ g_assert_nonnull (channel);
+ t->data_channel_data = channel;
+ g_signal_connect (channel, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+
+ gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
+ gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
+
+ test_webrtc_create_offer (t, t->webrtc1);
+
+ test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
+
+ g_object_unref (channel);
+ test_webrtc_free (t);
+}
+
+GST_END_TEST;
+
+#define g_assert_cmpbytes(b1, b2) \
+ G_STMT_START { \
+ gsize l1, l2; \
+ const guint8 *d1 = g_bytes_get_data (b1, &l1); \
+ const guint8 *d2 = g_bytes_get_data (b2, &l2); \
+ g_assert_cmpmem (d1, l1, d2, l2); \
+ } G_STMT_END;
+
+static void
+on_message_data (GObject * channel, GBytes * data, struct test_webrtc *t)
+{
+ GBytes *expected = g_object_steal_data (channel, "expected");
+ g_assert_cmpbytes (data, expected);
+ g_bytes_unref (expected);
+
+ test_webrtc_signal_state (t, STATE_CUSTOM);
+}
+
+static void
+have_data_channel_transfer_data (struct test_webrtc *t, GstElement * element,
+ GObject * our, gpointer user_data)
+{
+ GObject *other = user_data;
+ GBytes *data = g_bytes_new_static (test_string, strlen (test_string));
+ GstWebRTCDataChannelState state;
+
+ g_object_get (our, "ready-state", &state, NULL);
+ fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
+ g_object_get (other, "ready-state", &state, NULL);
+ fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
+
+ g_object_set_data_full (our, "expected", g_bytes_ref (data),
+ (GDestroyNotify) g_bytes_unref);
+ g_signal_connect (our, "on-message-data", G_CALLBACK (on_message_data), t);
+
+ g_signal_connect (other, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+ g_signal_emit_by_name (other, "send-data", data);
+}
+
+GST_START_TEST (test_data_channel_transfer_data)
+{
+ struct test_webrtc *t = test_webrtc_new ();
+ GObject *channel = NULL;
+ struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
+ struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
+
+ t->on_negotiation_needed = NULL;
+ t->offer_data = &offer;
+ t->on_offer_created = validate_sdp;
+ t->answer_data = &answer;
+ t->on_answer_created = validate_sdp;
+ t->on_ice_candidate = NULL;
+ t->on_data_channel = have_data_channel_transfer_data;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+ &channel);
+ g_assert_nonnull (channel);
+ t->data_channel_data = channel;
+ g_signal_connect (channel, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+
+ gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
+ gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
+
+ test_webrtc_create_offer (t, t->webrtc1);
+
+ test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
+
+ g_object_unref (channel);
+ test_webrtc_free (t);
+}
+
+GST_END_TEST;
+
+static void
+have_data_channel_create_data_channel (struct test_webrtc *t,
+ GstElement * element, GObject * our, gpointer user_data)
+{
+ GObject *another;
+
+ t->on_data_channel = have_data_channel_transfer_string;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+ &another);
+ g_assert_nonnull (another);
+ t->data_channel_data = another;
+ g_signal_connect (another, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+}
+
+GST_START_TEST (test_data_channel_create_after_negotiate)
+{
+ struct test_webrtc *t = test_webrtc_new ();
+ GObject *channel = NULL;
+ struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
+ struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
+
+ t->on_negotiation_needed = NULL;
+ t->offer_data = &offer;
+ t->on_offer_created = validate_sdp;
+ t->answer_data = &answer;
+ t->on_answer_created = validate_sdp;
+ t->on_ice_candidate = NULL;
+ t->on_data_channel = have_data_channel_create_data_channel;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "prev-label", NULL,
+ &channel);
+ g_assert_nonnull (channel);
+ t->data_channel_data = channel;
+ g_signal_connect (channel, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+
+ gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
+ gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
+
+ test_webrtc_create_offer (t, t->webrtc1);
+
+ test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
+
+ g_object_unref (channel);
+ test_webrtc_free (t);
+}
+
+GST_END_TEST;
+
+static void
+on_buffered_amount_low_emitted (GObject * channel, struct test_webrtc *t)
+{
+ test_webrtc_signal_state (t, STATE_CUSTOM);
+}
+
+static void
+have_data_channel_check_low_threshold_emitted (struct test_webrtc *t,
+ GstElement * element, GObject * our, gpointer user_data)
+{
+ g_signal_connect (our, "on-buffered-amount-low",
+ G_CALLBACK (on_buffered_amount_low_emitted), t);
+ g_object_set (our, "buffered-amount-low-threshold", 1, NULL);
+
+ g_signal_connect (our, "on-error", G_CALLBACK (on_channel_error_not_reached),
+ NULL);
+ g_signal_emit_by_name (our, "send-string", "DATA");
+}
+
+GST_START_TEST (test_data_channel_low_threshold)
+{
+ struct test_webrtc *t = test_webrtc_new ();
+ GObject *channel = NULL;
+ struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
+ struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
+
+ t->on_negotiation_needed = NULL;
+ t->offer_data = &offer;
+ t->on_offer_created = validate_sdp;
+ t->answer_data = &answer;
+ t->on_answer_created = validate_sdp;
+ t->on_ice_candidate = NULL;
+ t->on_data_channel = have_data_channel_check_low_threshold_emitted;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+ &channel);
+ g_assert_nonnull (channel);
+ t->data_channel_data = channel;
+ g_signal_connect (channel, "on-error",
+ G_CALLBACK (on_channel_error_not_reached), NULL);
+
+ gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
+ gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
+
+ test_webrtc_create_offer (t, t->webrtc1);
+
+ test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
+
+ g_object_unref (channel);
+ test_webrtc_free (t);
+}
+
+GST_END_TEST;
+
+static void
+on_channel_error (GObject * channel, GError * error, struct test_webrtc *t)
+{
+ g_assert_nonnull (error);
+
+ test_webrtc_signal_state (t, STATE_CUSTOM);
+}
+
+static void
+have_data_channel_transfer_large_data (struct test_webrtc *t,
+ GstElement * element, GObject * our, gpointer user_data)
+{
+ GObject *other = user_data;
+ const gsize size = 1024 * 1024;
+ guint8 *random_data = g_new (guint8, size);
+ GBytes *data;
+ gsize i;
+
+ for (i = 0; i < size; i++)
+ random_data[i] = (guint8) (i & 0xff);
+
+ data = g_bytes_new_static (random_data, size);
+
+ g_object_set_data_full (our, "expected", g_bytes_ref (data),
+ (GDestroyNotify) g_bytes_unref);
+ g_signal_connect (our, "on-message-data", G_CALLBACK (on_message_data), t);
+
+ g_signal_connect (other, "on-error", G_CALLBACK (on_channel_error), t);
+ g_signal_emit_by_name (other, "send-data", data);
+}
+
+GST_START_TEST (test_data_channel_max_message_size)
+{
+ struct test_webrtc *t = test_webrtc_new ();
+ GObject *channel = NULL;
+ struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
+ struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
+
+ t->on_negotiation_needed = NULL;
+ t->offer_data = &offer;
+ t->on_offer_created = validate_sdp;
+ t->answer_data = &answer;
+ t->on_answer_created = validate_sdp;
+ t->on_ice_candidate = NULL;
+ t->on_data_channel = have_data_channel_transfer_large_data;
+
+ g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+ &channel);
+ g_assert_nonnull (channel);
+ t->data_channel_data = channel;
+
+ gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
+ gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
+
+ test_webrtc_create_offer (t, t->webrtc1);
+
+ test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
+
+ g_object_unref (channel);
+ test_webrtc_free (t);
+}
+
+GST_END_TEST;
+
static Suite *
webrtcbin_suite (void)
{
@@ -1429,6 +1918,13 @@ webrtcbin_suite (void)
tcase_add_test (tc, test_add_recvonly_transceiver);
tcase_add_test (tc, test_recvonly_sendonly);
tcase_add_test (tc, test_payload_types);
+ tcase_add_test (tc, test_data_channel_create);
+ tcase_add_test (tc, test_data_channel_remote_notify);
+ tcase_add_test (tc, test_data_channel_transfer_string);
+ tcase_add_test (tc, test_data_channel_transfer_data);
+ tcase_add_test (tc, test_data_channel_create_after_negotiate);
+ tcase_add_test (tc, test_data_channel_low_threshold);
+ tcase_add_test (tc, test_data_channel_max_message_size);
}
if (nicesrc)