From 373efd364d81bdeaa1ccd7f4c6b009afffc5df92 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 1 Nov 2012 15:54:58 +0000 Subject: gstpay: rewrite payloader Use adapter to assmeble the payload and make a flush function to turn this payload into (fragmented) packets. --- gst/rtp/gstrtpgstpay.c | 270 +++++++++++++++++++++++-------------------------- gst/rtp/gstrtpgstpay.h | 6 +- 2 files changed, 130 insertions(+), 146 deletions(-) diff --git a/gst/rtp/gstrtpgstpay.c b/gst/rtp/gstrtpgstpay.c index 7b94a8e7b..8405360b2 100644 --- a/gst/rtp/gstrtpgstpay.c +++ b/gst/rtp/gstrtpgstpay.c @@ -117,6 +117,7 @@ gst_rtp_gst_pay_class_init (GstRtpGSTPayClass * klass) static void gst_rtp_gst_pay_init (GstRtpGSTPay * rtpgstpay, GstRtpGSTPayClass * klass) { + rtpgstpay->adapter = gst_adapter_new (); } static void @@ -126,34 +127,142 @@ gst_rtp_gst_pay_finalize (GObject * obj) rtpgstpay = GST_RTP_GST_PAY (obj); - g_free (rtpgstpay->capsstr); + g_object_unref (rtpgstpay->adapter); G_OBJECT_CLASS (parent_class)->finalize (obj); } +static GstFlowReturn +gst_rtp_gst_pay_flush (GstRtpGSTPay * rtpgstpay, GstClockTime timestamp) +{ + GstFlowReturn ret; + guint avail; + guint frag_offset; + + frag_offset = 0; + avail = gst_adapter_available (rtpgstpay->adapter); + + while (avail) { + guint towrite; + guint8 *payload; + guint payload_len; + guint packet_len; + GstBuffer *outbuf; + + /* this will be the total lenght of the packet */ + packet_len = gst_rtp_buffer_calc_packet_len (8 + avail, 0, 0); + + /* fill one MTU or all available bytes */ + towrite = MIN (packet_len, GST_BASE_RTP_PAYLOAD_MTU (rtpgstpay)); + + /* this is the payload length */ + payload_len = gst_rtp_buffer_calc_payload_len (towrite, 0, 0); + + /* create buffer to hold the payload */ + outbuf = gst_rtp_buffer_new_allocate (payload_len, 0, 0); + payload = gst_rtp_buffer_get_payload (outbuf); + + GST_DEBUG_OBJECT (rtpgstpay, "new packet len %u, frag %u", packet_len, + frag_offset); + + /* + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * |C| CV |D|X|Y|Z| MBZ | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Frag_offset | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + payload[0] = rtpgstpay->flags; + payload[1] = payload[2] = payload[3] = 0; + payload[4] = frag_offset >> 24; + payload[5] = frag_offset >> 16; + payload[6] = frag_offset >> 8; + payload[7] = frag_offset & 0xff; + + payload += 8; + payload_len -= 8; + + GST_DEBUG_OBJECT (rtpgstpay, "copy %u bytes from adapter", payload_len); + + gst_adapter_copy (rtpgstpay->adapter, payload, 0, payload_len); + gst_adapter_flush (rtpgstpay->adapter, payload_len); + + frag_offset += payload_len; + avail -= payload_len; + + if (avail == 0) + gst_rtp_buffer_set_marker (outbuf, TRUE); + + GST_BUFFER_TIMESTAMP (outbuf) = timestamp; + + ret = gst_basertppayload_push (GST_BASE_RTP_PAYLOAD (rtpgstpay), outbuf); + if (ret != GST_FLOW_OK) + goto push_failed; + } + rtpgstpay->flags &= 0x70; + + return GST_FLOW_OK; + + /* ERRORS */ +push_failed: + { + GST_DEBUG_OBJECT (rtpgstpay, "push failed %d (%s)", ret, + gst_flow_get_name (ret)); + gst_adapter_clear (rtpgstpay->adapter); + rtpgstpay->flags &= 0x70; + return ret; + } +} + static gboolean gst_rtp_gst_pay_setcaps (GstBaseRTPPayload * payload, GstCaps * caps) { GstRtpGSTPay *rtpgstpay; gboolean res; - gchar *capsenc, *capsver; + gchar *capsstr, *capsenc, *capsver; + guint capslen, capslen_prefix_len; + guint8 *ptr; + GstBuffer *outbuf; rtpgstpay = GST_RTP_GST_PAY (payload); - g_free (rtpgstpay->capsstr); - rtpgstpay->capsstr = gst_caps_to_string (caps); - rtpgstpay->capslen = strlen (rtpgstpay->capsstr); + capsstr = gst_caps_to_string (caps); + capslen = strlen (capsstr); + rtpgstpay->current_CV = rtpgstpay->next_CV; /* encode without 0 byte */ - capsenc = g_base64_encode ((guchar *) rtpgstpay->capsstr, rtpgstpay->capslen); - GST_DEBUG_OBJECT (payload, "caps=%s, caps(base64)=%s", - rtpgstpay->capsstr, capsenc); + capsenc = g_base64_encode ((guchar *) capsstr, capslen); + GST_DEBUG_OBJECT (payload, "caps=%s, caps(base64)=%s", capsstr, capsenc); /* for 0 byte */ - rtpgstpay->capslen++; + capslen++; - capsver = g_strdup_printf ("%d", rtpgstpay->current_CV); + /* start of buffer, calculate length */ + capslen_prefix_len = 1; + while (capslen >> (7 * capslen_prefix_len)) + capslen_prefix_len++; + + outbuf = gst_buffer_new_and_alloc (capslen + capslen_prefix_len); + ptr = GST_BUFFER_DATA (outbuf); + + /* write caps length */ + while (capslen_prefix_len) { + capslen_prefix_len--; + *ptr++ = ((capslen_prefix_len > 0) ? 0x80 : 0) | + ((capslen >> (7 * capslen_prefix_len)) & 0x7f); + } + memcpy (ptr, capsstr, capslen); + g_free (capsstr); + + /* store in adapter, we don't flush yet, buffer will follow */ + rtpgstpay->flags = (1 << 7) | (rtpgstpay->current_CV << 4); + rtpgstpay->next_CV = (rtpgstpay->next_CV + 1) & 0x7; + gst_adapter_push (rtpgstpay->adapter, outbuf); + /* make caps for SDP */ + capsver = g_strdup_printf ("%d", rtpgstpay->current_CV); gst_basertppayload_set_options (payload, "application", TRUE, "X-GST", 90000); res = gst_basertppayload_set_outcaps (payload, "caps", G_TYPE_STRING, capsenc, @@ -168,153 +277,26 @@ static GstFlowReturn gst_rtp_gst_pay_handle_buffer (GstBaseRTPPayload * basepayload, GstBuffer * buffer) { - GstRtpGSTPay *rtpgstpay; - guint8 *data; - guint size; - GstBuffer *outbuf; GstFlowReturn ret; + GstRtpGSTPay *rtpgstpay; GstClockTime timestamp; - guint32 frag_offset; - guint flags; - gchar *capsstr; - guint capslen; - guint capslen_prefix_len; rtpgstpay = GST_RTP_GST_PAY (basepayload); - size = GST_BUFFER_SIZE (buffer); - data = GST_BUFFER_DATA (buffer); timestamp = GST_BUFFER_TIMESTAMP (buffer); - ret = GST_FLOW_OK; - /* caps always from SDP for now */ - flags = 0; if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) - flags |= (1 << 3); + rtpgstpay->flags |= (1 << 3); if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_MEDIA1)) - flags |= (1 << 2); + rtpgstpay->flags |= (1 << 2); if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_MEDIA2)) - flags |= (1 << 1); + rtpgstpay->flags |= (1 << 1); if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_MEDIA3)) - flags |= (1 << 0); - - capsstr = rtpgstpay->capsstr; - capslen = rtpgstpay->capslen; - if (capslen) { - /* start of buffer, calculate length */ - capslen_prefix_len = 1; - while (capslen >> (7 * capslen_prefix_len)) - capslen_prefix_len++; - - GST_DEBUG_OBJECT (rtpgstpay, "sending inline caps"); - rtpgstpay->next_CV++; - - flags |= (1 << 7); - } else { - capslen_prefix_len = 0; - } - - flags |= (rtpgstpay->current_CV << 4); - - /* - * 0 1 2 3 - * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * |C| CV |D|X|Y|Z| MBZ | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * | Frag_offset | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ - frag_offset = 0; - GST_DEBUG_OBJECT (basepayload, "buffer size=%u", size); - - while (size > 0) { - guint towrite; - guint8 *payload; - guint payload_len; - guint packet_len; - - /* this will be the total lenght of the packet */ - packet_len = - gst_rtp_buffer_calc_packet_len (8 + capslen + capslen_prefix_len + size, - 0, 0); - - /* fill one MTU or all available bytes */ - towrite = MIN (packet_len, GST_BASE_RTP_PAYLOAD_MTU (rtpgstpay)); + rtpgstpay->flags |= (1 << 0); - /* this is the payload length */ - payload_len = gst_rtp_buffer_calc_payload_len (towrite, 0, 0); - - /* create buffer to hold the payload */ - outbuf = gst_rtp_buffer_new_allocate (payload_len, 0, 0); - payload = gst_rtp_buffer_get_payload (outbuf); - - GST_DEBUG_OBJECT (basepayload, "new packet len %u, frag %u", packet_len, - frag_offset); - - payload[0] = flags; - payload[1] = payload[2] = payload[3] = 0; - payload[4] = frag_offset >> 24; - payload[5] = frag_offset >> 16; - payload[6] = frag_offset >> 8; - payload[7] = frag_offset & 0xff; - - payload += 8; - payload_len -= 8; - - if (capslen) { - guint tocopy; - - /* we need to write caps */ - if (frag_offset == 0) { - /* write caps length */ - while (capslen_prefix_len) { - capslen_prefix_len--; - *payload++ = ((capslen_prefix_len > 0) ? 0x80 : 0) | - ((capslen >> (7 * capslen_prefix_len)) & 0x7f); - payload_len--; - frag_offset++; - } - } - - tocopy = MIN (payload_len, capslen); - GST_DEBUG_OBJECT (basepayload, "copy %u bytes from caps to payload", - tocopy); - memcpy (payload, capsstr, tocopy); - - capsstr += tocopy; - capslen -= tocopy; - payload += tocopy; - payload_len -= tocopy; - frag_offset += tocopy; - - if (capslen == 0) { - rtpgstpay->capslen = 0; - g_free (rtpgstpay->capsstr); - rtpgstpay->capsstr = NULL; - } - } - - if (capslen == 0) { - /* no more caps, continue with data */ - GST_DEBUG_OBJECT (basepayload, "copy %u bytes from buffer to payload", - payload_len); - memcpy (payload, data, payload_len); - - data += payload_len; - size -= payload_len; - frag_offset += payload_len; - } - - if (size == 0) - gst_rtp_buffer_set_marker (outbuf, TRUE); - - GST_BUFFER_TIMESTAMP (outbuf) = timestamp; - - ret = gst_basertppayload_push (basepayload, outbuf); - } - gst_buffer_unref (buffer); + gst_adapter_push (rtpgstpay->adapter, buffer); + ret = gst_rtp_gst_pay_flush (rtpgstpay, timestamp); return ret; } diff --git a/gst/rtp/gstrtpgstpay.h b/gst/rtp/gstrtpgstpay.h index 6eeb8cc7b..ed54a7e9a 100644 --- a/gst/rtp/gstrtpgstpay.h +++ b/gst/rtp/gstrtpgstpay.h @@ -22,6 +22,7 @@ #include #include +#include G_BEGIN_DECLS @@ -43,8 +44,9 @@ struct _GstRtpGSTPay { GstBaseRTPPayload payload; - gchar *capsstr; - guint capslen; + GstAdapter *adapter; + guint8 flags; + guint8 current_CV; /* CV field of incoming caps*/ guint8 next_CV; }; -- cgit v1.2.3