summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2010-12-15 19:43:10 +0100
committerWim Taymans <wim.taymans@collabora.co.uk>2010-12-20 12:50:40 +0100
commit28b15a60ebb713a42a214d70b55b7f4a7a7ea10b (patch)
tree1dc18bff56872e295ce3b1b2e4402159a7d53ddd
parent2286326203b6ac229828f46662f6498b1044d413 (diff)
wj2depay: rwrite depayloader
Make depayloader more resilient to packet loss.
-rw-r--r--gst/rtp/gstrtpwj2depay.c895
-rw-r--r--gst/rtp/gstrtpwj2depay.h28
2 files changed, 560 insertions, 363 deletions
diff --git a/gst/rtp/gstrtpwj2depay.c b/gst/rtp/gstrtpwj2depay.c
index 9d0ea73ce..96d609c9f 100644
--- a/gst/rtp/gstrtpwj2depay.c
+++ b/gst/rtp/gstrtpwj2depay.c
@@ -1,5 +1,5 @@
/* GStreamer
- * Copyright (C) <2010> Wim Taymans <wim.taymans@gmail.com>
+ * Copyright (C) <2009> Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@@ -46,11 +46,37 @@ GST_STATIC_PAD_TEMPLATE ("sink",
"clock-rate = (int) 90000, " "encoding-name = (string) \"X-WJ2\"")
);
+#define HEADER_SIZE 10
+
+typedef enum
+{
+ WJ2_MARKER = 0xFF,
+ WJ2_MARKER_SOC = 0x4F,
+ WJ2_MARKER_SOT = 0x90,
+ WJ2_MARKER_SOP = 0x91,
+ WJ2_MARKER_SOD = 0x93,
+ WJ2_MARKER_EOC = 0xD9
+} RtpWJ2Marker;
+
+#define DEFAULT_BUFFER_LIST TRUE
+
+enum
+{
+ PROP_0,
+ PROP_BUFFER_LIST,
+ PROP_LAST
+};
+
GST_BOILERPLATE (GstRtpWJ2Depay, gst_rtp_wj2_depay, GstBaseRTPDepayload,
GST_TYPE_BASE_RTP_DEPAYLOAD);
static void gst_rtp_wj2_depay_finalize (GObject * object);
+static void gst_rtp_wj2_depay_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_rtp_wj2_depay_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
static GstStateChangeReturn
gst_rtp_wj2_depay_change_state (GstElement * element,
GstStateChange transition);
@@ -89,6 +115,14 @@ gst_rtp_wj2_depay_class_init (GstRtpWJ2DepayClass * klass)
gobject_class->finalize = gst_rtp_wj2_depay_finalize;
+ gobject_class->set_property = gst_rtp_wj2_depay_set_property;
+ gobject_class->get_property = gst_rtp_wj2_depay_get_property;
+
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_LIST,
+ g_param_spec_boolean ("buffer-list", "Buffer List",
+ "Use Buffer Lists",
+ DEFAULT_BUFFER_LIST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gstelement_class->change_state = gst_rtp_wj2_depay_change_state;
gstbasertpdepayload_class->set_caps = gst_rtp_wj2_depay_setcaps;
@@ -99,59 +133,28 @@ gst_rtp_wj2_depay_class_init (GstRtpWJ2DepayClass * klass)
}
static void
-gst_rtp_wj2_store_wheader (GstRtpWJ2Depay * rtpwj2depay, guint idx,
- GstBuffer * buf)
-{
- GstBuffer *old;
-
- GST_DEBUG_OBJECT (rtpwj2depay, "storing WJ2 header %p at index %u", buf, idx);
- if ((old = rtpwj2depay->WH[idx]))
- gst_buffer_unref (old);
- rtpwj2depay->WH[idx] = buf;
- /* after we completed the header, we can accept packets with any WH_id */
- rtpwj2depay->WH_id = -1;
-}
-
-static void
-gst_rtp_wj2_store_mheader (GstRtpWJ2Depay * rtpwj2depay, guint idx,
- GstBuffer * buf)
+free_buffer (gpointer data)
{
- GstBuffer *old;
-
- GST_DEBUG_OBJECT (rtpwj2depay, "storing main header %p at index %u", buf,
- idx);
- if ((old = rtpwj2depay->MH[idx]))
- gst_buffer_unref (old);
- rtpwj2depay->MH[idx] = buf;
+ if (data)
+ gst_buffer_unref (GST_BUFFER_CAST (data));
}
static void
-gst_rtp_wj2_clear_caches (GstRtpWJ2Depay * rtpwj2depay)
+gst_rtp_wj2_depay_init (GstRtpWJ2Depay * rtpwj2depay,
+ GstRtpWJ2DepayClass * klass)
{
- guint i;
+ rtpwj2depay->buffer_list = DEFAULT_BUFFER_LIST;
- for (i = 0; i < 16; i++)
- gst_rtp_wj2_store_wheader (rtpwj2depay, i, NULL);
+ rtpwj2depay->tiles = g_ptr_array_new_with_free_func (free_buffer);
- for (i = 0; i < 8; i++)
- gst_rtp_wj2_store_mheader (rtpwj2depay, i, NULL);
+ rtpwj2depay->pu_adapter = gst_adapter_new ();
+ rtpwj2depay->t_adapter = gst_adapter_new ();
+ rtpwj2depay->f_adapter = gst_adapter_new ();
}
-static void
-gst_rtp_wj2_clear_adapter (GstRtpWJ2Depay * rtpwj2depay, gboolean reset_offset)
-{
- GST_LOG_OBJECT (rtpwj2depay, "clearing adapter");
- gst_adapter_clear (rtpwj2depay->adapter);
- rtpwj2depay->WH_id = -1;
- rtpwj2depay->MH_id = -1;
- if (reset_offset)
- rtpwj2depay->offset = 0;
- rtpwj2depay->state = GST_RTP_WJ2_STATE_NONE;
-}
static void
-gst_rtp_wj2_store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx,
- GstBuffer * buf)
+store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx, GstBuffer * buf)
{
if (idx < rtpwj2depay->tiles->len) {
GstBuffer *old;
@@ -160,6 +163,7 @@ gst_rtp_wj2_store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx,
if ((old = g_ptr_array_index (rtpwj2depay->tiles, idx)))
gst_buffer_unref (old);
+
g_ptr_array_index (rtpwj2depay->tiles, idx) = buf;
} else {
GST_DEBUG_OBJECT (rtpwj2depay, "invalid tile index %u", idx);
@@ -169,28 +173,66 @@ gst_rtp_wj2_store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx,
}
static void
-gst_rtp_wj2_clear_tiles (GstRtpWJ2Depay * rtpwj2depay)
+clear_tiles (GstRtpWJ2Depay * rtpwj2depay)
{
guint i, len;
len = rtpwj2depay->tiles->len;
for (i = 0; i < len; i++)
- gst_rtp_wj2_store_tile (rtpwj2depay, i, NULL);
+ store_tile (rtpwj2depay, i, NULL);
}
static void
-free_buffer (gpointer data)
+store_wheader (GstRtpWJ2Depay * rtpwj2depay, guint idx, GstBuffer * buf)
{
- if (data)
- gst_buffer_unref (GST_BUFFER_CAST (data));
+ GstBuffer *old;
+
+ GST_DEBUG_OBJECT (rtpwj2depay, "storing WJ2 header %p at index %u", buf, idx);
+ if ((old = rtpwj2depay->WH[idx]))
+ gst_buffer_unref (old);
+ rtpwj2depay->WH[idx] = buf;
}
static void
-gst_rtp_wj2_depay_init (GstRtpWJ2Depay * rtpwj2depay,
- GstRtpWJ2DepayClass * klass)
+clear_wheaders (GstRtpWJ2Depay * rtpwj2depay)
{
- rtpwj2depay->adapter = gst_adapter_new ();
- rtpwj2depay->tiles = g_ptr_array_new_with_free_func (free_buffer);
+ guint i;
+
+ for (i = 0; i < 16; i++)
+ store_wheader (rtpwj2depay, i, NULL);
+}
+
+static void
+store_mheader (GstRtpWJ2Depay * rtpwj2depay, guint idx, GstBuffer * buf)
+{
+ GstBuffer *old;
+
+ GST_DEBUG_OBJECT (rtpwj2depay, "storing main header %p at index %u", buf,
+ idx);
+ if ((old = rtpwj2depay->MH[idx]))
+ gst_buffer_unref (old);
+ rtpwj2depay->MH[idx] = buf;
+}
+
+static void
+clear_mheaders (GstRtpWJ2Depay * rtpwj2depay)
+{
+ guint i;
+
+ for (i = 0; i < 8; i++)
+ store_mheader (rtpwj2depay, i, NULL);
+}
+
+static void
+gst_rtp_wj2_depay_reset (GstRtpWJ2Depay * rtpwj2depay)
+{
+ clear_tiles (rtpwj2depay);
+ clear_wheaders (rtpwj2depay);
+ clear_mheaders (rtpwj2depay);
+ gst_adapter_clear (rtpwj2depay->pu_adapter);
+ gst_adapter_clear (rtpwj2depay->t_adapter);
+ gst_adapter_clear (rtpwj2depay->f_adapter);
+ rtpwj2depay->next_frag = 0;
}
static void
@@ -200,11 +242,11 @@ gst_rtp_wj2_depay_finalize (GObject * object)
rtpwj2depay = GST_RTP_WJ2_DEPAY (object);
- gst_rtp_wj2_clear_caches (rtpwj2depay);
- gst_rtp_wj2_clear_tiles (rtpwj2depay);
+ clear_mheaders (rtpwj2depay);
- g_object_unref (rtpwj2depay->adapter);
- g_ptr_array_free (rtpwj2depay->tiles, TRUE);
+ g_object_unref (rtpwj2depay->pu_adapter);
+ g_object_unref (rtpwj2depay->t_adapter);
+ g_object_unref (rtpwj2depay->f_adapter);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@@ -230,11 +272,230 @@ gst_rtp_wj2_depay_setcaps (GstBaseRTPDepayload * depayload, GstCaps * caps)
return res;
}
+static void
+gst_rtp_wj2_depay_clear_pu (GstRtpWJ2Depay * rtpwj2depay)
+{
+ gst_adapter_clear (rtpwj2depay->pu_adapter);
+ rtpwj2depay->have_sync = FALSE;
+}
+
+/* calculate how many tiles there are in the wj2 header */
+static void
+gst_rtp_wj2_ensure_ntiles (GstRtpWJ2Depay * rtpwj2depay, GstBuffer * buf)
+{
+ guint size, tiles;
+
+ size = GST_BUFFER_SIZE (buf);
+ if (size < 32)
+ return;
+
+ tiles = (size - 32) / 4;
+ GST_DEBUG_OBJECT (rtpwj2depay, "ensure %u tiles from size %u", tiles, size);
+ g_ptr_array_set_size (rtpwj2depay->tiles, tiles);
+}
+
+static GstFlowReturn
+gst_rtp_wj2_depay_flush_pu (GstBaseRTPDepayload * depayload)
+{
+ GstRtpWJ2Depay *rtpwj2depay;
+ GstBuffer *header;
+ guint avail, MHF, MH_id, tp, tci;
+
+ rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload);
+
+ /* take all available buffers */
+ avail = gst_adapter_available (rtpwj2depay->pu_adapter);
+ if (avail == 0)
+ goto done;
+
+ tp = rtpwj2depay->pu_tp;
+ tci = rtpwj2depay->pu_tci;
+ MHF = rtpwj2depay->pu_MHF;
+ MH_id = rtpwj2depay->last_MH_id;
+
+ GST_DEBUG_OBJECT (rtpwj2depay, "flushing PU of size %u", avail);
+
+ if (tp == 3) {
+ /* we have a WJ2 header */
+ GST_DEBUG_OBJECT (rtpwj2depay, "keeping WJ2 header %u", MH_id);
+ header = gst_adapter_take_buffer (rtpwj2depay->pu_adapter, avail);
+
+ gst_rtp_wj2_ensure_ntiles (rtpwj2depay, header);
+ store_wheader (rtpwj2depay, MH_id, header);
+ } else if (MHF == 0) {
+ GList *packets, *walk;
+
+ packets = gst_adapter_take_list (rtpwj2depay->pu_adapter, avail);
+ /* append packets */
+ for (walk = packets; walk; walk = g_list_next (walk)) {
+ GstBuffer *buf = GST_BUFFER_CAST (walk->data);
+ GST_DEBUG_OBJECT (rtpwj2depay, "append pu packet of size %u",
+ GST_BUFFER_SIZE (buf));
+ gst_adapter_push (rtpwj2depay->t_adapter, buf);
+ }
+ g_list_free (packets);
+ } else {
+ /* we have a header */
+ GST_DEBUG_OBJECT (rtpwj2depay, "keeping header %u", MH_id);
+ /* we managed to see the start and end of the header, take all from
+ * adapter and keep in header */
+ header = gst_adapter_take_buffer (rtpwj2depay->pu_adapter, avail);
+
+ store_mheader (rtpwj2depay, MH_id, header);
+ }
+
+done:
+ rtpwj2depay->have_sync = FALSE;
+
+ return GST_FLOW_OK;
+}
+
+static GstFlowReturn
+gst_rtp_wj2_depay_flush_tile (GstBaseRTPDepayload * depayload)
+{
+ GstRtpWJ2Depay *rtpwj2depay;
+ guint avail, MH_id;
+ GList *packets, *walk;
+ guint8 end[2];
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload);
+
+ /* flush pending PU */
+ gst_rtp_wj2_depay_flush_pu (depayload);
+
+ /* take all available buffers */
+ avail = gst_adapter_available (rtpwj2depay->t_adapter);
+ if (avail == 0)
+ goto done;
+
+ MH_id = rtpwj2depay->last_MH_id;
+
+ GST_DEBUG_OBJECT (rtpwj2depay, "flushing tile of size %u", avail);
+
+ if (gst_adapter_available (rtpwj2depay->f_adapter) == 0) {
+ GstBuffer *mheader;
+
+ /* we need a header now */
+ if ((mheader = rtpwj2depay->MH[MH_id]) == NULL)
+ goto waiting_header;
+
+ rtpwj2depay->frame_tci = rtpwj2depay->pu_tci;
+
+ /* push header in the adapter */
+ GST_DEBUG_OBJECT (rtpwj2depay, "pushing header %u", MH_id);
+ gst_adapter_push (rtpwj2depay->f_adapter, gst_buffer_ref (mheader));
+ }
+
+ /* check for last bytes */
+ gst_adapter_copy (rtpwj2depay->t_adapter, end, avail - 2, 2);
+
+ /* now append the tile packets to the frame */
+ packets = gst_adapter_take_list (rtpwj2depay->t_adapter, avail);
+ for (walk = packets; walk; walk = g_list_next (walk)) {
+ GstBuffer *buf = GST_BUFFER_CAST (walk->data);
+
+ if (walk == packets) {
+ guint8 *data;
+ guint size;
+
+ /* first buffer should contain the SOT */
+ data = GST_BUFFER_DATA (buf);
+ size = GST_BUFFER_SIZE (buf);
+
+ if (size < 12)
+ goto invalid_tile;
+
+ if (data[0] == 0xff && data[1] == WJ2_MARKER_SOT) {
+ guint Psot, nPsot;
+
+ if (end[0] == 0xff && end[1] == WJ2_MARKER_EOC)
+ nPsot = avail - 2;
+ else
+ nPsot = avail;
+
+ Psot = GST_READ_UINT32_BE (&data[6]);
+ if (Psot != nPsot && Psot != 0) {
+ /* Psot must match the size of the tile */
+ GST_DEBUG_OBJECT (rtpwj2depay, "set Psot from %u to %u", Psot, nPsot);
+ buf = gst_buffer_make_writable (buf);
+ data = GST_BUFFER_DATA (buf);
+ GST_WRITE_UINT32_BE (&data[6], nPsot);
+ }
+ }
+ }
+
+ GST_DEBUG_OBJECT (rtpwj2depay, "append pu packet of size %u",
+ GST_BUFFER_SIZE (buf));
+ gst_adapter_push (rtpwj2depay->f_adapter, buf);
+ }
+ g_list_free (packets);
+
+done:
+ rtpwj2depay->last_tile = -1;
+
+ return ret;
+
+ /* errors */
+waiting_header:
+ {
+ GST_DEBUG_OBJECT (rtpwj2depay, "waiting for header %u", MH_id);
+ gst_adapter_clear (rtpwj2depay->t_adapter);
+ rtpwj2depay->last_tile = -1;
+ return ret;
+ }
+invalid_tile:
+ {
+ GST_ELEMENT_WARNING (rtpwj2depay, STREAM, DECODE, ("Invalid tile"), (NULL));
+ gst_adapter_clear (rtpwj2depay->t_adapter);
+ rtpwj2depay->last_tile = -1;
+ return ret;
+ }
+}
+
+static GstFlowReturn
+gst_rtp_wj2_depay_flush_frame (GstBaseRTPDepayload * depayload)
+{
+ GstRtpWJ2Depay *rtpwj2depay;
+ guint avail;
+ GstBuffer *outbuf;
+
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload);
+
+ /* flush pending tile */
+ gst_rtp_wj2_depay_flush_tile (depayload);
+
+ /* last buffer take all data out of the adapter */
+ avail = gst_adapter_available (rtpwj2depay->f_adapter);
+ if (avail == 0)
+ goto done;
+
+ outbuf = gst_adapter_take_buffer (rtpwj2depay->f_adapter, avail);
+ store_tile (rtpwj2depay, rtpwj2depay->frame_tci, outbuf);
+
+ /* we accept any tci now */
+ rtpwj2depay->frame_tci = -1;
+
+ /* reset state */
+ rtpwj2depay->next_frag = 0;
+ rtpwj2depay->have_sync = FALSE;
+
+done:
+ /* we can't keep headers with MH_id of 0 */
+ store_mheader (rtpwj2depay, 0, NULL);
+
+ return ret;
+}
+
/* parse and store the WJ2 header packet */
-static GstBuffer *
-gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay)
+static GstFlowReturn
+gst_rtp_wj2_depay_flush_wj2 (GstBaseRTPDepayload * depayload)
{
- GstBuffer *wheader, *buf, *out = NULL;
+ GstRtpWJ2Depay *rtpwj2depay;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstBuffer *wheader, *buf;
gint WH_id, MH_id, size, wsize;
guint Xsiz, Ysiz, XOsiz, YOsiz, XTsiz, YTsiz, XTOsiz, YTOsiz;
guint TC_count, T_count, C_count;
@@ -243,8 +504,13 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay)
GstBufferList *blist;
GstBufferListIterator *it;
- WH_id = rtpwj2depay->WH_id;
- MH_id = rtpwj2depay->MH_id;
+ rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload);
+
+ /* flush pending tile */
+ gst_rtp_wj2_depay_flush_frame (depayload);
+
+ WH_id = rtpwj2depay->last_WH_id;
+ MH_id = rtpwj2depay->last_MH_id;
if (WH_id == -1 || MH_id == -1)
goto done;
@@ -272,6 +538,15 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay)
T_count = (Xsiz / XTsiz) * (Ysiz / YTsiz);
C_count = TC_count / T_count;
+ if (rtpwj2depay->tiles->len < TC_count)
+ goto missing_buffer;
+
+ /* check if we have all tiles */
+ for (i = 0; i < TC_count; i++) {
+ if (g_ptr_array_index (rtpwj2depay->tiles, i) == NULL)
+ goto missing_buffer;
+ }
+
GST_DEBUG_OBJECT (rtpwj2depay, "TC %u, T %u, C %u", TC_count, T_count,
C_count);
@@ -280,7 +555,6 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay)
buf = gst_buffer_new_and_alloc (wsize);
wdata = GST_BUFFER_DATA (buf);
offset = wsize;
-
GST_DEBUG_OBJECT (rtpwj2depay, "Header size %u", wsize);
/* fill partial header */
@@ -309,10 +583,9 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay)
wdata[74 + (9 * i)] = data[32 + (4 * i)] & 0xf;
/* fill in offset */
GST_WRITE_UINT32_LE (wdata + 75 + (9 * i), offset);
+
/* take the data buffer and its length */
J_buf = g_ptr_array_index (rtpwj2depay->tiles, i);
- if (J_buf == NULL)
- goto missing_buffer;
length = GST_BUFFER_SIZE (J_buf);
/* align length and add ADV header length */
@@ -355,6 +628,8 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay)
/* take the data buffer and its length */
J_buf = g_ptr_array_index (rtpwj2depay->tiles, i);
+ g_ptr_array_index (rtpwj2depay->tiles, i) = NULL;
+
wsize = GST_BUFFER_SIZE (J_buf);
/* align length and add ADV header length */
length = GST_ROUND_UP_32 (wsize);
@@ -372,16 +647,20 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay)
}
gst_buffer_list_iterator_free (it);
- it = gst_buffer_list_iterate (blist);
- gst_buffer_list_iterator_next_group (it);
- out = gst_buffer_list_iterator_merge_group (it);
- gst_buffer_list_iterator_free (it);
+ ret = gst_base_rtp_depayload_push_list (depayload, blist);
+
+ /* we accept any WH_id and MH_id now */
+ rtpwj2depay->last_WH_id = -1;
+ rtpwj2depay->last_MH_id = -1;
+
+ /* reset state */
+ rtpwj2depay->next_frag = 0;
+ rtpwj2depay->have_sync = FALSE;
done:
- gst_rtp_wj2_clear_tiles (rtpwj2depay);
- gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE);
+ clear_tiles (rtpwj2depay);
- return out;
+ return ret;
/* ERRORS */
missing_buffer:
@@ -391,305 +670,186 @@ missing_buffer:
}
}
-static void
-gst_rtp_wj2_new_state (GstRtpWJ2Depay * rtpwj2depay, GstRtpWJ2State state)
-{
- GstRtpWJ2State old = rtpwj2depay->state;
-
- GST_DEBUG_OBJECT (rtpwj2depay, "state %d -> %d", old, state);
-
- if (old == GST_RTP_WJ2_STATE_NONE || old == state) {
- /* we're at the desired state, all fine */
- } else if (state == GST_RTP_WJ2_STATE_WHEADER) {
- if (old == GST_RTP_WJ2_STATE_MHEADER) {
- /* we found a wj2 header and we were doing Main headers,
- * clear main headers */
- gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE);
- } else if (old == GST_RTP_WJ2_STATE_MDATA) {
- /* we found a wj2 header and we were doing J2K data,
- * flush the accumulated data */
- }
- } else if (state == GST_RTP_WJ2_STATE_MHEADER) {
- if (old == GST_RTP_WJ2_STATE_WHEADER) {
- /* we found a main header and we were doing wj2 headers,
- * clear wj2 headers */
- gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE);
- } else if (old == GST_RTP_WJ2_STATE_MDATA) {
- /* we found a main header and we were doing j2k data,
- * flush data */
- rtpwj2depay->offset = 0;
- }
- } else if (state == GST_RTP_WJ2_STATE_MDATA) {
- if (old == GST_RTP_WJ2_STATE_WHEADER) {
- /* we found j2k data and we were doing w2j headers,
- * flush headers */
- } else if (old == GST_RTP_WJ2_STATE_MHEADER) {
- /* we found j2k data and we were doing main headers,
- * flush headers */
- }
- }
- rtpwj2depay->state = state;;
-}
-/* parse and store the WJ2 header packet */
-static void
-gst_rtp_wj2_handle_w_header (GstRtpWJ2Depay * rtpwj2depay, GstBuffer * buf,
- guint8 * payload)
+static GstBuffer *
+gst_rtp_wj2_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
{
- GstBuffer *WH_buf;
- guint offset, WHF, avail, WH_id;
+ GstRtpWJ2Depay *rtpwj2depay;
+ guint8 *payload;
+ guint header_size, frag_offset, tile, payload_len, wj2len;
+ gint gap;
+ guint32 rtptime;
+ guint tp, MHF, L, MH_id, WH_id, tci;
+
+ rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload);
+
+ payload = gst_rtp_buffer_get_payload (buf);
+ payload_len = gst_rtp_buffer_get_payload_len (buf);
+
+ /* we need at least a header */
+ if (payload_len < 8)
+ goto empty_packet;
+
+ rtptime = gst_rtp_buffer_get_timestamp (buf);
- /* first update our state */
- gst_rtp_wj2_new_state (rtpwj2depay, GST_RTP_WJ2_STATE_WHEADER);
+ /* new timestamp marks new frame */
+ if (rtpwj2depay->last_rtptime != rtptime) {
+ rtpwj2depay->last_rtptime = rtptime;
+ /* flush pending WJ2 frame */
+ gst_rtp_wj2_depay_flush_wj2 (depayload);
+ }
/* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |tp |MHF|MH_id|T| priority | tile number |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | MBZ |L| WH_id | fragment offset |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Tile Component index |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ *
+ * With tp = 3:
+ *
+ * 0 1 2 3
+ * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |1 1|WHF| WH_id | MBZ | fragment offset |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*
* WHF = 1 piece, 2 end, 3 complete
*/
- WHF = (payload[0] & 0x30) >> 4;
- WH_id = (payload[0] & 0xf);
- offset = (payload[2] << 8) | payload[3];
-
- GST_DEBUG_OBJECT (rtpwj2depay, "WHF %u, WH_id %u, offset %u, expect %u",
- WHF, WH_id, offset, rtpwj2depay->offset);
-
- /* we have a valid WH_id if we didn't have a previous one of it it matched the
- * previous one */
- if (rtpwj2depay->WH_id == -1)
- rtpwj2depay->WH_id = WH_id;
- else if (rtpwj2depay->WH_id != WH_id)
- goto unexpected_header;
-
- /* valid offset when it matches our expected offset, we can't really generate
- * dummy data for this header*/
- if (rtpwj2depay->offset != offset)
- goto unexpected_header;
-
- /* take WJ2 header data, push in the adapter */
- WH_buf = gst_rtp_buffer_get_payload_subbuffer (buf, 4, -1);
- gst_adapter_push (rtpwj2depay->adapter, WH_buf);
-
- /* adjust the expected next offset */
- rtpwj2depay->offset += GST_BUFFER_SIZE (WH_buf);
+ tp = payload[0] >> 6;
+ MHF = (payload[0] & 0x30) >> 4;
- if ((WHF & 2) == 2) {
- /* we have a complete WJ2 header */
- avail = gst_adapter_available (rtpwj2depay->adapter);
- WH_buf = gst_adapter_take_buffer (rtpwj2depay->adapter, avail);
+ if (tp == 3) {
+ /* WJ2 header */
+ MH_id = (payload[0] & 0xf);
+ frag_offset = (payload[2] << 8) | payload[3];
+ tile = 0;
+ header_size = 4;
+ tci = 0;
+ } else {
+ MH_id = (payload[0] & 0xe) >> 1;
+ /* else jpeg 2000 packet */
+ tile = (payload[2] << 8) | payload[3];
+ L = (payload[4] & 0x10) >> 4;
+ WH_id = payload[4] & 0xf;
+ frag_offset = (payload[5] << 16) | (payload[6] << 8) | payload[7];
+ tci = (payload[8] << 8) | payload[9];
- GST_LOG_OBJECT (rtpwj2depay, "complete header %u of size %u", WH_id, avail);
+ if (rtpwj2depay->last_WH_id == -1)
+ rtpwj2depay->last_WH_id = WH_id;
+ else if (rtpwj2depay->last_WH_id != WH_id)
+ goto wrong_WH_id;
- /* store the new header */
- gst_rtp_wj2_store_wheader (rtpwj2depay, WH_id, WH_buf);
+ header_size = HEADER_SIZE;
}
- return;
-
- /* ERRORS */
-unexpected_header:
- {
- /* doesn't match, we need to discard data */
- gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE);
- return;
- }
-}
-
-/* calculate how many tiles there are in the wj2 header */
-static void
-gst_rtp_wj2_ensure_ntiles (GstRtpWJ2Depay * rtpwj2depay, GstBuffer * buf)
-{
- guint size, tiles;
- if (buf == NULL)
- return;
-
- size = GST_BUFFER_SIZE (buf);
- if (size < 32)
- return;
+ wj2len = payload_len - header_size;
- tiles = (size - 32) / 4;
- GST_DEBUG_OBJECT (rtpwj2depay, "ensure %u tiles from size %u", tiles, size);
- g_ptr_array_set_size (rtpwj2depay->tiles, tiles);
-}
+ if (rtpwj2depay->last_MH_id == -1)
+ rtpwj2depay->last_MH_id = MH_id;
+ else if (rtpwj2depay->last_MH_id != MH_id)
+ goto wrong_MH_id;
-static GstBuffer *
-gst_rtp_wj2_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
-{
- GstRtpWJ2Depay *rtpwj2depay;
- guint8 *payload;
- guint tp;
- GstBuffer *out = NULL;
+ GST_DEBUG_OBJECT (rtpwj2depay,
+ "tp %u, tci %u, MHF %u, MH_id %u, tile %u, frag %u, expected %u", tp, tci,
+ MHF, MH_id, tile, frag_offset, rtpwj2depay->next_frag);
- rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload);
+ /* calculate the gap between expected frag */
+ gap = frag_offset - rtpwj2depay->next_frag;
+ /* calculate next frag */
+ rtpwj2depay->next_frag = frag_offset + wj2len;
- /* flush everything on discont for now */
- if (GST_BUFFER_IS_DISCONT (buf)) {
- if (rtpwj2depay->state != GST_RTP_WJ2_STATE_MDATA) {
- GST_DEBUG_OBJECT (rtpwj2depay, "DISCONT, flushing data");
- gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE);
- } else {
- GST_DEBUG_OBJECT (rtpwj2depay,
- "DISCONT, not flushing data, we in data mode");
- }
+ if (gap != 0) {
+ GST_DEBUG_OBJECT (rtpwj2depay, "discont of %d, clear PU", gap);
+ /* discont, clear pu adapter and resync */
+ gst_rtp_wj2_depay_clear_pu (rtpwj2depay);
}
- if (gst_rtp_buffer_get_payload_len (buf) < 8)
- goto empty_packet;
-
- payload = gst_rtp_buffer_get_payload (buf);
-
- /* look at the header type */
- tp = payload[0] >> 6;
-
- GST_LOG_OBJECT (rtpwj2depay, "GOT tp %d", tp);
-
+ /* check for sync code */
if (tp == 3) {
- /* handle a WJ2 header packet */
- gst_rtp_wj2_handle_w_header (rtpwj2depay, buf, payload);
- } else {
- guint MHF, L, offset, MH_id, WH_id, tci;
- GstBuffer *J_buf;
- /* 0 1 2 3
- * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- * |tp |MHF|MH_id|T| priority | tile number |
- * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- * | MBZ |L| WH_id | fragment offset |
- * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- * | Tile Component index |
- * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- */
- MHF = (payload[0] & 0x30) >> 4;
- MH_id = (payload[0] & 0xe) >> 1;
- L = (payload[4] & 0x10) >> 4;
- WH_id = payload[4] & 0xf;
- offset = (payload[5] << 16) | (payload[6] << 8) | payload[7];
- tci = (payload[8] << 8) | payload[9];
+ GST_DEBUG_OBJECT (rtpwj2depay, "found WJ2 header packet");
- GST_DEBUG_OBJECT (rtpwj2depay,
- "MHF %u, L %u, MH %u, WH %u, offset %u, TCI %u", MHF, L, MH_id, WH_id,
- offset, tci);
-
- GST_DEBUG_OBJECT (rtpwj2depay, "current WH_id %d, MH_id %d",
- rtpwj2depay->WH_id, rtpwj2depay->MH_id);
-
- if (MHF != 0) {
- gst_rtp_wj2_new_state (rtpwj2depay, GST_RTP_WJ2_STATE_MHEADER);
-
- /* we have a Main JPEG 2000 header */
- GST_DEBUG_OBJECT (rtpwj2depay,
- "we have a main header, expected offset %u", rtpwj2depay->offset);
- /* valid offset when it matches our expected offset, we can't really generate
- * dummy data for this header*/
- if (rtpwj2depay->offset != offset)
- goto invalid_m_header;
- } else {
- GstBuffer *mheader = NULL;
-
- gst_rtp_wj2_new_state (rtpwj2depay, GST_RTP_WJ2_STATE_MDATA);
-
- GST_DEBUG_OBJECT (rtpwj2depay,
- "we have JPEG2000 data, expected offset %u", rtpwj2depay->offset);
-
- /* check if we have the WJ2 header */
- if (rtpwj2depay->WH[WH_id] == NULL)
- goto waiting_header;
-
- if (MH_id != 0) {
- /* check if we have the main header */
- if ((mheader = rtpwj2depay->MH[MH_id]) == NULL)
- goto waiting_header;
- } else if (rtpwj2depay->MH_id == -1)
- goto waiting_header;
-
- if (rtpwj2depay->offset != offset) {
- guint fillsize;
-
- if (offset < rtpwj2depay->offset)
- goto invalid_packet;
-
- fillsize = offset - rtpwj2depay->offset;
-
- /* gap in the data, this could either be a missing header or a gap in the
- * data, check for missing header first */
- if (mheader && fillsize == GST_BUFFER_SIZE (mheader)) {
- GST_DEBUG_OBJECT (rtpwj2depay, "missing main header of %u bytes",
- fillsize);
- /* push header in the adapter */
- gst_adapter_push (rtpwj2depay->adapter, mheader);
+ if (frag_offset == 0) {
+ /* flush the previous frame, should have happened when the timestamp
+ * changed above. */
+ gst_rtp_wj2_depay_flush_wj2 (depayload);
+ rtpwj2depay->have_sync = TRUE;
+ }
+ } else if (wj2len > 2 && payload[HEADER_SIZE] == 0xff) {
+ guint marker = payload[HEADER_SIZE + 1];
+
+ /* packets must start with SOC, SOT or SOP */
+ switch (marker) {
+ case WJ2_MARKER_SOC:
+ GST_DEBUG_OBJECT (rtpwj2depay, "found SOC packet");
+ /* flush the previous frame */
+ gst_rtp_wj2_depay_flush_frame (depayload);
+ rtpwj2depay->have_sync = TRUE;
+ break;
+ case WJ2_MARKER_SOT:
+ /* flush the previous tile */
+ gst_rtp_wj2_depay_flush_tile (depayload);
+ GST_DEBUG_OBJECT (rtpwj2depay, "found SOT packet");
+ rtpwj2depay->have_sync = TRUE;
+ /* we sync on the tile now */
+ rtpwj2depay->last_tile = tile;
+ break;
+ case WJ2_MARKER_SOP:
+ GST_DEBUG_OBJECT (rtpwj2depay, "found SOP packet");
+ /* flush the previous PU */
+ gst_rtp_wj2_depay_flush_pu (depayload);
+ if (rtpwj2depay->last_tile != tile) {
+ /* wrong tile, we lose sync and we need a new SOT or SOC to regain
+ * sync. First flush out the previous tile if we have one. */
+ if (rtpwj2depay->last_tile != -1)
+ gst_rtp_wj2_depay_flush_tile (depayload);
+ /* now we have no more valid tile and no sync */
+ rtpwj2depay->last_tile = -1;
+ rtpwj2depay->have_sync = FALSE;
} else {
- /* fill with 0 */
- GST_DEBUG_OBJECT (rtpwj2depay, "filling gap of %u bytes", fillsize);
-
- J_buf = gst_buffer_new_and_alloc (fillsize);
- gst_adapter_push (rtpwj2depay->adapter, J_buf);
+ rtpwj2depay->have_sync = TRUE;
}
- }
+ break;
+ default:
+ GST_DEBUG_OBJECT (rtpwj2depay, "no sync packet 0x%02d", marker);
+ break;
}
+ }
- if (rtpwj2depay->WH_id == -1) {
- /* first time we lock onto a WH_id */
- rtpwj2depay->WH_id = WH_id;
- gst_rtp_wj2_ensure_ntiles (rtpwj2depay, rtpwj2depay->WH[WH_id]);
- } else if (rtpwj2depay->WH_id != WH_id)
- goto invalid_m_header;
-
- /* we have a valid MH_id if we didn't have a previous one or if it matched the
- * previous one */
- if (rtpwj2depay->MH_id == -1)
- rtpwj2depay->MH_id = MH_id;
- else if (rtpwj2depay->MH_id != MH_id)
- goto invalid_m_header;
-
- /* take JPEG2000 data, push in the adapter */
- J_buf = gst_rtp_buffer_get_payload_subbuffer (buf, 10, -1);
- GST_DEBUG_OBJECT (rtpwj2depay, "adding buffer of %u bytes",
- GST_BUFFER_SIZE (J_buf));
- rtpwj2depay->offset += GST_BUFFER_SIZE (J_buf);
- gst_adapter_push (rtpwj2depay->adapter, J_buf);
-
- if (MHF != 0) {
- if ((MHF & 2) == 2) {
- guint avail;
-
- /* we have a complete JPEG2000 header */
- avail = gst_adapter_available (rtpwj2depay->adapter);
- GST_LOG_OBJECT (rtpwj2depay, "complete main header %u of size %u",
- MH_id, avail);
-
- /* only cache headers when != 0 */
- if (MH_id != 0) {
- J_buf = gst_adapter_take_buffer (rtpwj2depay->adapter, avail);
- /* store the new header */
- gst_rtp_wj2_store_mheader (rtpwj2depay, MH_id, J_buf);
- /* and push it back in */
- gst_adapter_push (rtpwj2depay->adapter, J_buf);
- }
- }
- } else {
- /* we have a complete JPEG2000 packet */
- if (L) {
- guint avail;
-
- avail = gst_adapter_available (rtpwj2depay->adapter);
- J_buf = gst_adapter_take_buffer (rtpwj2depay->adapter, avail);
-
- GST_LOG_OBJECT (rtpwj2depay, "complete data of size %u", avail);
+ if (rtpwj2depay->have_sync) {
+ GstBuffer *pu_frag;
- gst_rtp_wj2_store_tile (rtpwj2depay, tci, J_buf);
- }
+ if (gst_adapter_available (rtpwj2depay->pu_adapter) == 0) {
+ /* first part of pu, record state */
+ GST_DEBUG_OBJECT (rtpwj2depay, "first PU");
+ rtpwj2depay->pu_tp = tp;
+ rtpwj2depay->pu_tci = tci;
+ rtpwj2depay->pu_MHF = MHF;
}
-
- /* last packet, construct WJ2 image from collected data */
- if (gst_rtp_buffer_get_marker (buf)) {
- GST_LOG_OBJECT (rtpwj2depay, "complete wj2 frame");
- out = gst_rtp_wj2_flush_frame (rtpwj2depay);
+ /* and push in pu adapter */
+ GST_DEBUG_OBJECT (rtpwj2depay, "push pu of size %u in adapter", wj2len);
+ pu_frag = gst_rtp_buffer_get_payload_subbuffer (buf, header_size, -1);
+ gst_adapter_push (rtpwj2depay->pu_adapter, pu_frag);
+
+ if (MHF & 2) {
+ /* last part of main header received, we can flush it */
+ GST_DEBUG_OBJECT (rtpwj2depay, "header end, flush pu");
+ gst_rtp_wj2_depay_flush_pu (depayload);
}
+ } else {
+ GST_DEBUG_OBJECT (rtpwj2depay, "discard packet, no sync");
+ }
+
+ /* marker bit finishes the frame */
+ if (gst_rtp_buffer_get_marker (buf)) {
+ GST_DEBUG_OBJECT (rtpwj2depay, "marker set, last buffer");
+ /* then flush frame */
+ gst_rtp_wj2_depay_flush_frame (depayload);
}
- return out;
+ return NULL;
/* ERRORS */
empty_packet:
@@ -698,24 +858,57 @@ empty_packet:
("Empty Payload."), (NULL));
return NULL;
}
-waiting_header:
+wrong_WH_id:
{
- GST_DEBUG_OBJECT (rtpwj2depay, "we are waiting for a header");
+ GST_ELEMENT_WARNING (rtpwj2depay, STREAM, DECODE,
+ ("Invalid WH_id %u, expected %u", WH_id, rtpwj2depay->last_WH_id),
+ (NULL));
+ gst_rtp_wj2_depay_clear_pu (rtpwj2depay);
return NULL;
}
-invalid_m_header:
+wrong_MH_id:
{
- /* doesn't match, we need to discard data */
- GST_DEBUG_OBJECT (rtpwj2depay, "invalid header");
- gst_adapter_clear (rtpwj2depay->adapter);
- rtpwj2depay->offset = 0;
- rtpwj2depay->MH_id = -1;
- rtpwj2depay->WH_id = -1;
+ GST_ELEMENT_WARNING (rtpwj2depay, STREAM, DECODE,
+ ("Invalid MH_id %u, expected %u", MH_id, rtpwj2depay->last_MH_id),
+ (NULL));
+ gst_rtp_wj2_depay_clear_pu (rtpwj2depay);
return NULL;
}
-invalid_packet:
- {
- return NULL;
+}
+
+static void
+gst_rtp_wj2_depay_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtpWJ2Depay *rtpwj2depay;
+
+ rtpwj2depay = GST_RTP_WJ2_DEPAY (object);
+
+ switch (prop_id) {
+ case PROP_BUFFER_LIST:
+ rtpwj2depay->buffer_list = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtp_wj2_depay_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtpWJ2Depay *rtpwj2depay;
+
+ rtpwj2depay = GST_RTP_WJ2_DEPAY (object);
+
+ switch (prop_id) {
+ case PROP_BUFFER_LIST:
+ g_value_set_boolean (value, rtpwj2depay->buffer_list);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
}
}
@@ -731,7 +924,7 @@ gst_rtp_wj2_depay_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
- gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE);
+ gst_rtp_wj2_depay_reset (rtpwj2depay);
break;
default:
break;
@@ -741,7 +934,7 @@ gst_rtp_wj2_depay_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
- gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE);
+ gst_rtp_wj2_depay_reset (rtpwj2depay);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
diff --git a/gst/rtp/gstrtpwj2depay.h b/gst/rtp/gstrtpwj2depay.h
index 5b118d0cd..df5be0232 100644
--- a/gst/rtp/gstrtpwj2depay.h
+++ b/gst/rtp/gstrtpwj2depay.h
@@ -40,27 +40,31 @@ G_BEGIN_DECLS
typedef struct _GstRtpWJ2Depay GstRtpWJ2Depay;
typedef struct _GstRtpWJ2DepayClass GstRtpWJ2DepayClass;
-typedef enum {
- GST_RTP_WJ2_STATE_NONE,
- GST_RTP_WJ2_STATE_WHEADER,
- GST_RTP_WJ2_STATE_MHEADER,
- GST_RTP_WJ2_STATE_MDATA
-} GstRtpWJ2State;
-
struct _GstRtpWJ2Depay
{
GstBaseRTPDepayload depayload;
+ guint64 last_rtptime;
+ guint last_WH_id;
+ guint last_MH_id;
+ guint last_tile;
+
GstBuffer *WH[16];
GstBuffer *MH[8];
GPtrArray *tiles;
- gint WH_id;
- gint MH_id;
- guint offset;
- GstAdapter *adapter;
- GstRtpWJ2State state;
+ guint pu_tp;
+ guint pu_MHF;
+ guint pu_tci;
+ GstAdapter *pu_adapter;
+ GstAdapter *t_adapter;
+ guint frame_tci;
+ GstAdapter *f_adapter;
+
+ guint next_frag;
+ gboolean have_sync;
+ gboolean buffer_list;
gint width, height;
};